wgctl/core/lib/util.py
Nuno Duque Nunes 1308f9e07a refactor: split json_helper.py into lib/ modules
- core/lib/util.py: shared utilities, ip_to_name, reverse_lookup, parse_since
- core/lib/events.py: fw_events, wg_events, follow_logs, event parsers
- core/lib/peers.py: peer_data, peer_transfer, peer_transfer_delta
- core/lib/activity.py: activity_aggregate
- json_helper.py: thin dispatcher importing from lib/
- events.py: --since, --filter-event, --filter-dest-ip/port query flags
- util.py: parse_since supporting relative (2h/7d) and EU/ISO date formats
2026-05-24 22:02:50 +00:00

255 lines
No EOL
8.3 KiB
Python

"""
util.py — shared utilities for wgctl json_helper modules.
Imported by all other lib modules.
"""
import os
import json
import sys
from datetime import datetime, timezone, timedelta
# ──────────────────────────────────────────
# Global config (read from environment)
# ──────────────────────────────────────────
# strftime pattern for rendered timestamps; overridable via WGCTL_DATETIME_FMT.
DATETIME_FMT = os.getenv('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M')
# Date format selector; overridable via WGCTL_DATE_FORMAT.
DATE_FORMAT = os.getenv('WGCTL_DATE_FORMAT', 'eu')  # eu | iso
# Protocol number -> lowercase protocol name.
PROTO_MAP = {
    1: 'icmp',
    6: 'tcp',
    17: 'udp',
}
# ──────────────────────────────────────────
# IP → Peer name map
# ──────────────────────────────────────────
def build_ip_to_name(clients_dir):
    """
    Build a dict mapping peer IP -> peer name from .conf files.

    Every ``<clients_dir>/*.conf`` file is scanned. The peer name is the
    file name without its trailing ``.conf``; the IP is the first address
    on the first usable ``Address = ...`` line, with any ``/prefix``
    suffix and inline ``#`` comment removed.

    Nothing is cached here: each call re-reads the directory, so callers
    should build the map once and reuse it.

    Files that cannot be read or decoded, and files without a usable
    Address line, are skipped. Files are processed in sorted order so the
    result is deterministic when two peers declare the same IP (the later
    file name wins).
    """
    import glob

    suffix = '.conf'
    ip_to_name = {}
    for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
        # Strip only the trailing extension: str.replace() would also
        # remove a '.conf' that appears inside the peer name itself.
        name = os.path.basename(conf)[:-len(suffix)]
        try:
            with open(conf) as f:
                for line in f:
                    key, sep, raw = line.partition('=')
                    # Tolerate indentation and key casing; ignore lines
                    # that are not a "key = value" Address assignment.
                    if not sep or key.strip().lower() != 'address':
                        continue
                    # Drop an inline comment, keep the first address of a
                    # comma-separated list, then drop the CIDR suffix.
                    first = raw.split('#')[0].split(',')[0]
                    ip = first.split('/')[0].strip()
                    if ip:
                        ip_to_name[ip] = name
                        break
        except (OSError, UnicodeError):
            # Best effort: an unreadable peer file must not abort the scan.
            continue
    return ip_to_name
def build_pubkey_to_name(clients_dir):
    """
    Build a dict mapping public key -> peer name from *_public.key files.

    The peer name is the file name without its trailing ``_public.key``;
    the key is the file content with surrounding whitespace stripped.
    Empty key files and files that cannot be read or decoded are skipped.
    Files are processed in sorted order so that a duplicated key resolves
    deterministically (the later file name wins).
    """
    import glob

    suffix = '_public.key'
    pubkey_to_peer = {}
    for kf in sorted(glob.glob(f"{clients_dir}/*_public.key")):
        # Strip only the trailing suffix: str.replace() would also remove
        # the same text if it occurred inside the peer name.
        name = os.path.basename(kf)[:-len(suffix)]
        try:
            with open(kf) as f:
                key = f.read().strip()
        except (OSError, UnicodeError):
            # Best effort: one unreadable key file must not abort the scan.
            continue
        if key:
            pubkey_to_peer[key] = name
    return pubkey_to_peer
# ──────────────────────────────────────────
# Service reverse lookup
# ──────────────────────────────────────────
def load_net_data(net_file):
    """
    Load services.json into a dict.

    Returns {} when net_file is empty/None, when the file cannot be read
    or parsed, or when its top-level JSON value is not an object. The last
    case matters because consumers iterate the result with ``.items()``.
    """
    if not net_file:
        return {}
    try:
        with open(net_file) as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError):
        # Missing/unreadable file, invalid JSON or bad encoding
        # (JSONDecodeError and UnicodeDecodeError are ValueErrors), or
        # pathologically nested input.
        return {}
    return data if isinstance(data, dict) else {}
def reverse_lookup(net_data, dest_ip, dest_port='', proto=''):
    """
    Resolve dest_ip[:port] to a service name using services.json data.

    net_data maps service name -> definition dict with an 'ip' and an
    optional 'ports' dict of port name -> {'port': ..., 'proto': ...}
    (a port definition without 'proto' is treated as 'tcp').

    Resolution order:
      1. If dest_port is given, the first port definition on any service
         with that IP whose port (and proto, when proto is given) matches
         yields "service:port_name". All services sharing the IP are
         checked, so a matching port on a later service is still found.
      2. Otherwise the first service whose IP matches yields "service".

    Entries that are not dicts (services, 'ports', or port definitions)
    are ignored. Returns '' if no match found.
    """
    fallback = None  # first service that matched on IP alone
    for svc_name, svc in net_data.items():
        if not isinstance(svc, dict) or svc.get('ip', '') != dest_ip:
            continue
        if not dest_port:
            # No port to refine on: the first IP match is the answer.
            return svc_name
        ports = svc.get('ports', {})
        if isinstance(ports, dict):
            for port_name, port_def in ports.items():
                if not isinstance(port_def, dict):
                    continue
                if str(port_def.get('port', '')) != str(dest_port):
                    continue
                # An empty proto means "any protocol"; otherwise it must
                # equal the definition's proto (default 'tcp').
                if not proto or port_def.get('proto', 'tcp') == proto:
                    return f"{svc_name}:{port_name}"
        if fallback is None:
            fallback = svc_name
    # IP matched but no port match — return the service name.
    return '' if fallback is None else fallback
def load_hosts_data(hosts_file):
    """
    Read hosts.json and return its content as a dict.

    The "hosts", "subnets" and "ports" sections are added as empty dicts
    when the file does not define them. If the path is empty, does not
    exist, or loading fails for any reason, a structure containing only
    those three empty sections is returned.
    """
    sections = ("hosts", "subnets", "ports")

    def _blank():
        # Fresh dicts on every call so callers never share state.
        return {section: {} for section in sections}

    if not (hosts_file and os.path.exists(hosts_file)):
        return _blank()
    try:
        with open(hosts_file) as fh:
            loaded = json.load(fh)
        for section in sections:
            loaded.setdefault(section, {})
    except Exception:
        return _blank()
    return loaded
def hosts_lookup(hosts_data, ip):
    """
    Look up the display name for an IP in hosts.json data.

    The entry under hosts_data["hosts"][ip] may be a dict (its 'name'
    value is returned, '' when absent) or any other truthy value (its
    str() is returned). Missing or falsy entries yield ''.
    """
    record = hosts_data.get("hosts", {}).get(ip)
    if record and isinstance(record, dict):
        return record.get('name', '')
    return str(record) if record else ''
def resolve_display(net_data, hosts_data, dest_ip, dest_port='', proto=''):
    """
    Resolve a destination to the best available display string.

    Candidates are tried in order and the first non-empty one wins:
      1. hosts.json exact IP match (hosts_lookup)
      2. services.json match (reverse_lookup)
      3. the raw dest_ip
    """
    # `or` short-circuits, so the services lookup only runs when the
    # hosts lookup came back empty.
    return (
        hosts_lookup(hosts_data, dest_ip)
        or reverse_lookup(net_data, dest_ip, dest_port, proto)
        or dest_ip
    )
# ──────────────────────────────────────────
# Timestamp utilities
# ──────────────────────────────────────────
def fmt_ts(ts_str, fmt=None):
    """
    Render an ISO timestamp string with a strftime pattern.

    fmt overrides the module-wide DATETIME_FMT when given (and non-empty).
    If the timestamp cannot be parsed or formatted, ts_str is returned
    unchanged.
    """
    pattern = fmt if fmt else DATETIME_FMT
    try:
        return datetime.fromisoformat(ts_str).strftime(pattern)
    except Exception:
        return ts_str
def fmt_ts_hour(ts_str, fmt=None):
    """
    Format an ISO timestamp truncated to hour precision.

    The parsed time has its minutes, seconds and microseconds set to zero
    before formatting, so every timestamp within the same hour renders
    identically whatever fields the format contains (a plain textual
    replacement of '%M' would let '%S' / '%f' leak through).

    fmt overrides the module-wide DATETIME_FMT when given (and non-empty).
    Returns ts_str unchanged if it cannot be parsed or formatted.
    """
    fmt = fmt or DATETIME_FMT
    try:
        dt = datetime.fromisoformat(ts_str)
        return dt.replace(minute=0, second=0, microsecond=0).strftime(fmt)
    except Exception:
        # Deliberate best effort: display formatting must never raise.
        return ts_str
def ts_to_unix(ts_str):
    """
    Convert an ISO timestamp string to a unix timestamp (float seconds).

    Timestamps without a UTC offset are taken to be UTC. Any failure
    yields 0.0.
    """
    try:
        moment = datetime.fromisoformat(ts_str)
        aware = (moment if moment.tzinfo is not None
                 else moment.replace(tzinfo=timezone.utc))
        return aware.timestamp()
    except Exception:
        return 0.0
def parse_since(value, date_format=None):
    """
    Parse a --since value to a datetime (UTC-aware).

    Accepts:
        Relative: 2h, 30m, 7d (subtracted from the current UTC time)
        EU date:  23/05, 23/05/2026, 23-05, 23-05-2026
        ISO date: 2026-05-23, 2026-05-23 03:00, 2026-05-23T03:00
                  (time may also carry :SS seconds)

    Dates given without a year use the current UTC year. Absolute dates
    and times are interpreted as UTC.

    date_format is accepted for interface compatibility but is not
    consulted: the EU layouts start with a 1-2 digit day and the ISO
    layouts with a 4-digit year, so they cannot be confused and every
    layout is always tried.

    Returns None on failure (unrecognised text, impossible dates,
    out-of-range relative offsets, or non-string input).
    """
    import re

    if not isinstance(value, str):
        return None
    value = value.strip()

    # Relative: e.g. 2h, 30m, 7d
    rel = re.fullmatch(r'(\d+)([mhd])', value)
    if rel:
        unit = {'m': 'minutes', 'h': 'hours', 'd': 'days'}[rel.group(2)]
        try:
            delta = timedelta(**{unit: int(rel.group(1))})
            return datetime.now(timezone.utc) - delta
        except OverflowError:
            # Offset too large for timedelta/datetime.
            return None

    # EU formats: 23/05, 23/05/2026, 23-05, 23-05-2026.
    # The back-reference forces the same separator in both positions.
    eu = re.fullmatch(r'(\d{1,2})([/-])(\d{1,2})(?:\2(\d{4}))?', value)
    if eu:
        # Build the datetime from numbers instead of strptime: putting the
        # year into the format as literal text leaves the parsed year at
        # strptime's default of 1900.
        year = (int(eu.group(4)) if eu.group(4)
                else datetime.now(timezone.utc).year)
        try:
            return datetime(year, int(eu.group(3)), int(eu.group(1)),
                            tzinfo=timezone.utc)
        except ValueError:
            pass  # e.g. 31/02 — fall through and ultimately return None

    # ISO formats: date only, or date + time with ' ' or 'T' separator.
    for fmt in ('%Y-%m-%d',
                '%Y-%m-%d %H:%M', '%Y-%m-%dT%H:%M',
                '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S'):
        try:
            return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    return None
# ──────────────────────────────────────────
# Dest display formatting
# ──────────────────────────────────────────
def make_dest_display(dest_ip, dest_port, proto, svc_name):
    """
    Build a human-readable destination string.

    Precedence:
      1. svc_name, when set and different from dest_ip
      2. "ip:port/proto" when a port is known ("ip:port" if proto is
         empty, so no dangling slash is rendered)
      3. "ip (proto)" for port-less traffic that is not tcp
      4. the bare dest_ip
    """
    if svc_name and svc_name != dest_ip:
        return svc_name
    if dest_port:
        endpoint = f"{dest_ip}:{dest_port}"
        return f"{endpoint}/{proto}" if proto else endpoint
    if proto and proto != 'tcp':
        return f"{dest_ip} ({proto})"
    return dest_ip