""" util.py — shared utilities for wgctl json_helper modules. Imported by all other lib modules. """ import os import json import sys from datetime import datetime, timezone, timedelta # ────────────────────────────────────────── # Global config (read from environment) # ────────────────────────────────────────── DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M') DATE_FORMAT = os.environ.get('WGCTL_DATE_FORMAT', 'eu') # eu | iso PROTO_MAP = {1: 'icmp', 6: 'tcp', 17: 'udp'} # ────────────────────────────────────────── # IP → Peer name map # ────────────────────────────────────────── def build_ip_to_name(clients_dir): """ Build a dict mapping peer IP -> peer name from .conf files. Cached per process — call once, reuse. """ import glob ip_to_name = {} for conf in glob.glob(f"{clients_dir}/*.conf"): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] ip_to_name[ip] = name break except Exception: pass return ip_to_name def build_pubkey_to_name(clients_dir): """ Build a dict mapping public key -> peer name from *_public.key files. """ import glob pubkey_to_peer = {} for kf in glob.glob(f"{clients_dir}/*_public.key"): name = os.path.basename(kf).replace('_public.key', '') try: with open(kf) as f: key = f.read().strip() if key: pubkey_to_peer[key] = name except Exception: pass return pubkey_to_peer # ────────────────────────────────────────── # Service reverse lookup # ────────────────────────────────────────── def load_net_data(net_file): """Load services.json into a dict. Returns {} on failure.""" if not net_file or not os.path.exists(net_file): return {} try: with open(net_file) as f: return json.load(f) except Exception: return {} def reverse_lookup(net_data, dest_ip, dest_port='', proto=''): """ Resolve dest_ip[:port] to a service name using services.json data. Returns '' if no match found. """ for svc_name, svc in net_data.items(): if not isinstance(svc, dict): continue if svc.get('ip', '') != dest_ip: continue ports = svc.get('ports', {}) if dest_port: for port_name, port_def in ports.items(): if not isinstance(port_def, dict): continue if (str(port_def.get('port', '')) == str(dest_port) and port_def.get('proto', 'tcp') == proto): return f"{svc_name}:{port_name}" # IP matched but no port match — return service name return svc_name return svc_name return '' def load_hosts_data(hosts_file): """Load hosts.json into a dict. Returns empty structure on failure.""" if not hosts_file or not os.path.exists(hosts_file): return {"hosts": {}, "subnets": {}, "ports": {}} try: with open(hosts_file) as f: data = json.load(f) data.setdefault("hosts", {}) data.setdefault("subnets", {}) data.setdefault("ports", {}) return data except Exception: return {"hosts": {}, "subnets": {}, "ports": {}} def hosts_lookup(hosts_data, ip): """ Resolve IP to display name using hosts.json data. Returns '' if no match. """ entry = hosts_data.get("hosts", {}).get(ip) if not entry: return '' if isinstance(entry, dict): return entry.get('name', '') return str(entry) def resolve_display(net_data, hosts_data, dest_ip, dest_port='', proto=''): """ Full resolution chain: 1. hosts.json exact IP match 2. services.json match 3. raw IP fallback (returns dest_ip) """ # 1. hosts.json name = hosts_lookup(hosts_data, dest_ip) if name: return name # 2. services.json name = reverse_lookup(net_data, dest_ip, dest_port, proto) if name: return name # 3. raw fallback return dest_ip # ────────────────────────────────────────── # Timestamp utilities # ────────────────────────────────────────── def fmt_ts(ts_str, fmt=None): """ Format an ISO timestamp string using DATETIME_FMT (or override fmt). Returns ts_str unchanged on failure. """ fmt = fmt or DATETIME_FMT try: dt = datetime.fromisoformat(ts_str) return dt.strftime(fmt) except Exception: return ts_str def fmt_ts_hour(ts_str, fmt=None): """ Format an ISO timestamp to hour precision (minutes replaced with 00). """ fmt = fmt or DATETIME_FMT hour_fmt = fmt.replace('%M', '00') try: dt = datetime.fromisoformat(ts_str) return dt.strftime(hour_fmt) except Exception: return ts_str def ts_to_unix(ts_str): """Convert ISO timestamp to unix float. Returns 0.0 on failure.""" try: dt = datetime.fromisoformat(ts_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.timestamp() except Exception: return 0.0 def parse_since(value, date_format=None): """ Parse a --since value to a datetime (UTC-aware). Accepts: Relative: 2h, 30m, 7d EU date: 23/05, 23/05/2026, 23-05, 23-05-2026 ISO date: 2026-05-23, 2026-05-23 03:00 Returns None on failure. """ import re date_format = date_format or DATE_FORMAT value = value.strip() # Relative: e.g. 2h, 30m, 7d m = re.fullmatch(r'(\d+)([mhd])', value) if m: n, unit = int(m.group(1)), m.group(2) delta = {'m': timedelta(minutes=n), 'h': timedelta(hours=n), 'd': timedelta(days=n)}[unit] return datetime.now(timezone.utc) - delta now_year = datetime.now().year # EU formats: 23/05, 23/05/2026, 23-05, 23-05-2026 for pattern, fmt in [ (r'(\d{1,2})/(\d{1,2})$', f'%d/%m/{now_year}'), (r'(\d{1,2})/(\d{1,2})/(\d{4})$', '%d/%m/%Y'), (r'(\d{1,2})-(\d{1,2})$', f'%d-%m-{now_year}'), (r'(\d{1,2})-(\d{1,2})-(\d{4})$', '%d-%m-%Y'), ]: if re.fullmatch(pattern, value): try: if f'/{now_year}' in fmt or f'-{now_year}' in fmt: dt = datetime.strptime(f"{value}/{now_year}" if '/' in value else f"{value}-{now_year}", fmt) else: dt = datetime.strptime(value, fmt) return dt.replace(tzinfo=timezone.utc) except Exception: pass # ISO formats: 2026-05-23, 2026-05-23 03:00 for fmt in ('%Y-%m-%d', '%Y-%m-%d %H:%M'): try: dt = datetime.strptime(value, fmt) return dt.replace(tzinfo=timezone.utc) except Exception: pass return None # ────────────────────────────────────────── # Dest display formatting # ────────────────────────────────────────── def make_dest_display(dest_ip, dest_port, proto, svc_name): """Build a human-readable destination string.""" if svc_name and svc_name != dest_ip: return svc_name if dest_port: return f"{dest_ip}:{dest_port}/{proto}" if proto and proto not in ('tcp',): return f"{dest_ip} ({proto})" return dest_ip