- core/lib/util.py: shared utilities, ip_to_name, reverse_lookup, parse_since - core/lib/events.py: fw_events, wg_events, follow_logs, event parsers - core/lib/peers.py: peer_data, peer_transfer, peer_transfer_delta - core/lib/activity.py: activity_aggregate - json_helper.py: thin dispatcher importing from lib/ - events.py: --since, --filter-event, --filter-dest-ip/port query flags - util.py: parse_since supporting relative (2h/7d) and EU/ISO date formats
255 lines
No EOL
8.3 KiB
Python
255 lines
No EOL
8.3 KiB
Python
"""
|
|
util.py — shared utilities for wgctl json_helper modules.
|
|
Imported by all other lib modules.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import sys
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
# ──────────────────────────────────────────
|
|
# Global config (read from environment)
|
|
# ──────────────────────────────────────────
|
|
|
|
# strftime pattern applied when timestamps are rendered for display;
# overridable through the WGCTL_DATETIME_FMT environment variable.
DATETIME_FMT = os.getenv('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M')

# Date style selector taken from WGCTL_DATE_FORMAT: eu | iso
DATE_FORMAT = os.getenv('WGCTL_DATE_FORMAT', 'eu')

# Numeric protocol id -> lowercase protocol name.
PROTO_MAP = {
    1: 'icmp',
    6: 'tcp',
    17: 'udp',
}
|
|
|
|
# ──────────────────────────────────────────
|
|
# IP → Peer name map
|
|
# ──────────────────────────────────────────
|
|
|
|
def build_ip_to_name(clients_dir):
    """
    Build a dict mapping peer IP -> peer name from the *.conf files
    located directly inside clients_dir.

    The peer name is the file name without its trailing '.conf'.
    Only the first 'Address' line of each file is used; every
    comma-separated address on that line is mapped, with any '/prefix'
    part removed. Files that cannot be read or decoded are skipped.

    Nothing is cached here: every call re-reads the directory, so
    callers should build the map once and reuse it.
    """
    import glob

    suffix = '.conf'
    ip_to_name = {}
    # glob.escape keeps directories containing '[', '*' or '?' working.
    for conf in glob.glob(f"{glob.escape(str(clients_dir))}/*{suffix}"):
        # Strip only the trailing extension: str.replace would also eat a
        # '.conf' that occurs inside the peer name (e.g. 'a.config.conf').
        name = os.path.basename(conf)[:-len(suffix)]
        try:
            with open(conf) as f:
                for line in f:
                    key, sep, raw = line.partition('=')
                    if not sep or key.strip() != 'Address':
                        continue
                    # Drop a trailing '# comment', then map each address.
                    for addr in raw.split('#')[0].split(','):
                        ip = addr.strip().split('/')[0]
                        if ip:
                            ip_to_name[ip] = name
                    break
        except (OSError, UnicodeDecodeError):
            # Best effort: one unreadable peer file must not break the map.
            continue
    return ip_to_name
|
|
|
|
|
|
def build_pubkey_to_name(clients_dir):
    """
    Build a dict mapping public key -> peer name from the *_public.key
    files located directly inside clients_dir.

    The peer name is the file name without its trailing '_public.key';
    the key is the stripped file content. Empty, unreadable or
    undecodable files are skipped.
    """
    import glob

    suffix = '_public.key'
    pubkey_to_peer = {}
    # glob.escape keeps directories containing '[', '*' or '?' working.
    for kf in glob.glob(f"{glob.escape(str(clients_dir))}/*{suffix}"):
        # Remove the suffix only at the end; str.replace would strip every
        # occurrence and could mangle the peer name.
        name = os.path.basename(kf)[:-len(suffix)]
        try:
            with open(kf) as f:
                key = f.read().strip()
        except (OSError, UnicodeDecodeError):
            # Best effort: skip key files that cannot be read.
            continue
        if key:
            pubkey_to_peer[key] = name
    return pubkey_to_peer
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# Service reverse lookup
|
|
# ──────────────────────────────────────────
|
|
|
|
def load_net_data(net_file):
    """
    Load services.json into a dict.

    Returns {} when net_file is empty, the file is missing or unreadable,
    the content is not valid JSON, or the top-level JSON value is not an
    object (so callers can always call .items() on the result).
    """
    if not net_file:
        return {}
    try:
        with open(net_file) as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable file; ValueError: bad JSON or bad
        # encoding; RecursionError: pathologically nested document.
        return {}
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def reverse_lookup(net_data, dest_ip, dest_port='', proto=''):
    """
    Resolve dest_ip[:port] to a service name using services.json data.

    Every service whose 'ip' equals dest_ip is considered. When dest_port
    is given, the first port entry whose 'port' equals dest_port (compared
    as strings) and whose 'proto' (default 'tcp') equals proto wins and
    '<service>:<port name>' is returned. If no port entry matches, or no
    dest_port was given, the name of the first service with that IP is
    returned.

    Returns '' if no service has that IP, if dest_ip is empty, or if
    net_data is not a dict.
    """
    if not dest_ip or not isinstance(net_data, dict):
        return ''

    wanted_port = str(dest_port) if dest_port else ''
    fallback = None  # first service that matched on IP alone

    for svc_name, svc in net_data.items():
        if not isinstance(svc, dict) or svc.get('ip', '') != dest_ip:
            continue
        if fallback is None:
            fallback = svc_name
        if not wanted_port:
            break
        ports = svc.get('ports', {})
        if not isinstance(ports, dict):
            continue
        # Keep scanning later services too: several services may share
        # one IP and only one of them owns the requested port.
        for port_name, port_def in ports.items():
            if not isinstance(port_def, dict):
                continue
            if (str(port_def.get('port', '')) == wanted_port
                    and port_def.get('proto', 'tcp') == proto):
                return f"{svc_name}:{port_name}"

    return '' if fallback is None else fallback
|
|
|
|
|
|
def load_hosts_data(hosts_file):
    """
    Load hosts.json into a dict.

    The result always contains the keys 'hosts', 'subnets' and 'ports',
    each holding a dict: a section that is missing, null or not a JSON
    object is replaced by {}. Any other top-level keys are kept as-is.

    Returns the empty structure when hosts_file is empty, the file is
    missing or unreadable, the content is not valid JSON, or the
    top-level JSON value is not an object.
    """
    data = None
    if hosts_file:
        try:
            with open(hosts_file) as f:
                data = json.load(f)
        except (OSError, ValueError, RecursionError):
            # OSError: missing/unreadable file; ValueError: bad JSON or
            # bad encoding; RecursionError: pathologically nested input.
            data = None

    if not isinstance(data, dict):
        data = {}
    for section in ("hosts", "subnets", "ports"):
        # setdefault alone would keep an explicit null, which then breaks
        # lookups that call .get() on the section.
        if not isinstance(data.get(section), dict):
            data[section] = {}
    return data
|
|
|
|
|
|
def hosts_lookup(hosts_data, ip):
    """
    Look up the display name recorded for ip in the 'hosts' section of
    hosts.json data.

    A dict entry yields its 'name' field ('' when absent); any other
    non-empty entry is returned as a string. Returns '' when ip has no
    entry or the entry is empty.
    """
    known_hosts = hosts_data.get("hosts", {})
    record = known_hosts.get(ip)
    if record and isinstance(record, dict):
        return record.get('name', '')
    return str(record) if record else ''
|
|
|
|
|
|
def resolve_display(net_data, hosts_data, dest_ip, dest_port='', proto=''):
    """
    Resolve a destination to a display string, trying in order:
      1. an exact IP entry in hosts.json data
      2. a service match in services.json data
      3. the raw dest_ip itself
    The first non-empty result is returned.
    """
    # `or` short-circuits, so the services lookup only runs when the
    # hosts lookup came back empty, and dest_ip is the last resort.
    return (
        hosts_lookup(hosts_data, dest_ip)
        or reverse_lookup(net_data, dest_ip, dest_port, proto)
        or dest_ip
    )
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# Timestamp utilities
|
|
# ──────────────────────────────────────────
|
|
|
|
def fmt_ts(ts_str, fmt=None):
    """
    Render an ISO timestamp string with a strftime pattern.

    fmt overrides the module-level DATETIME_FMT when given.
    If parsing or formatting fails, ts_str is handed back untouched.
    """
    pattern = fmt if fmt else DATETIME_FMT
    try:
        return datetime.fromisoformat(ts_str).strftime(pattern)
    except Exception:
        return ts_str
|
|
|
|
|
|
def fmt_ts_hour(ts_str, fmt=None):
    """
    Render an ISO timestamp string at hour precision: every '%M' in the
    pattern is replaced by the literal '00' before formatting.

    fmt overrides the module-level DATETIME_FMT when given.
    If parsing or formatting fails, ts_str is handed back untouched.
    """
    hour_pattern = (fmt if fmt else DATETIME_FMT).replace('%M', '00')
    try:
        return datetime.fromisoformat(ts_str).strftime(hour_pattern)
    except Exception:
        return ts_str
|
|
|
|
|
|
def ts_to_unix(ts_str):
    """
    Convert an ISO timestamp string to a unix timestamp (float).

    A timestamp without an offset is treated as UTC.
    Returns 0.0 when the value cannot be converted.
    """
    try:
        moment = datetime.fromisoformat(ts_str)
        aware = (moment if moment.tzinfo is not None
                 else moment.replace(tzinfo=timezone.utc))
        return aware.timestamp()
    except Exception:
        return 0.0
|
|
|
|
|
|
def parse_since(value, date_format=None):
    """
    Parse a --since value to a datetime (UTC-aware).

    Accepts:
      Relative: 2h, 30m, 7d            (that long before the current time)
      EU date:  23/05, 23/05/2026, 23-05, 23-05-2026
                (a missing year means the current UTC year)
      ISO date: 2026-05-23, 2026-05-23 03:00, 2026-05-23T03:00,
                optionally with :SS seconds

    Absolute dates are stamped as UTC without any conversion.

    date_format is accepted for interface compatibility only; parsing
    does not depend on it, all formats above are always recognised.

    Returns None on failure (non-string input, unknown format, invalid
    calendar date, or a relative offset too large to represent).
    """
    import re

    if not isinstance(value, str):
        return None
    text = value.strip()
    now = datetime.now(timezone.utc)

    # Relative: e.g. 2h, 30m, 7d
    rel = re.fullmatch(r'(\d+)([mhd])', text)
    if rel:
        unit_kw = {'m': 'minutes', 'h': 'hours', 'd': 'days'}[rel.group(2)]
        try:
            # Build only the timedelta that is needed; huge counts raise
            # OverflowError, which is reported as a parse failure.
            return now - timedelta(**{unit_kw: int(rel.group(1))})
        except OverflowError:
            return None

    # EU formats: DD/MM[/YYYY] or DD-MM[-YYYY]; the separator must be the
    # same on both sides (back-reference \2).
    eu = re.fullmatch(r'(\d{1,2})([/-])(\d{1,2})(?:\2(\d{4}))?', text)
    if eu:
        day, month = int(eu.group(1)), int(eu.group(3))
        # Set the year explicitly: strptime without a %Y directive would
        # silently leave it at its default of 1900.
        year = int(eu.group(4)) if eu.group(4) else now.year
        try:
            return datetime(year, month, day, tzinfo=timezone.utc)
        except ValueError:
            return None

    # ISO formats, date only or date + time
    for fmt in ('%Y-%m-%d',
                '%Y-%m-%d %H:%M',
                '%Y-%m-%dT%H:%M',
                '%Y-%m-%d %H:%M:%S',
                '%Y-%m-%dT%H:%M:%S'):
        try:
            return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue

    return None
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# Dest display formatting
|
|
# ──────────────────────────────────────────
|
|
|
|
def make_dest_display(dest_ip, dest_port, proto, svc_name):
    """
    Build a human-readable destination string.

    Order of preference:
      - svc_name, when it is set and differs from dest_ip
      - 'ip:port/proto' when a port is known ('ip:port' if proto is empty,
        so no dangling '/' is produced)
      - 'ip (proto)' when there is no port and proto is set and not 'tcp'
      - the bare dest_ip otherwise
    """
    if svc_name and svc_name != dest_ip:
        return svc_name
    if dest_port:
        endpoint = f"{dest_ip}:{dest_port}"
        return f"{endpoint}/{proto}" if proto else endpoint
    if proto and proto != 'tcp':
        return f"{dest_ip} ({proto})"
    return dest_ip