""" activity.py — activity aggregation for wgctl activity command. """ import os import json import glob import subprocess from collections import defaultdict from datetime import datetime, timezone, timedelta from lib.util import ( PROTO_MAP, build_ip_to_name, build_pubkey_to_name, load_net_data, load_hosts_data, reverse_lookup, resolve_display, make_dest_display, ts_to_unix, parse_since, ) def activity_aggregate(fw_file, wg_file, wg_interface, net_file, clients_dir, meta_dir, hours, filter_peer, filter_service_ip): """ Aggregate activity data for wgctl activity. Output: peer|name|rx_bytes|tx_bytes|drop_count service|peer_name|dest_display|drop_count """ hours = int(hours) if hours else 24 cutoff = None if hours > 0: cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) # Preload lookups once ip_to_peer = build_ip_to_name(clients_dir) pubkey_to_peer = build_pubkey_to_name(clients_dir) net_data = load_net_data(net_file) def _reverse(dest_ip, dest_port, proto): return reverse_lookup(net_data, dest_ip, dest_port, proto) # WireGuard transfer totals peer_rx = defaultdict(int) peer_tx = defaultdict(int) try: result = subprocess.run( ['wg', 'show', wg_interface, 'transfer'], capture_output=True, text=True ) for line in result.stdout.strip().splitlines(): parts = line.split() if len(parts) >= 3: pubkey, rx, tx = parts[0], int(parts[1]), int(parts[2]) peer = pubkey_to_peer.get(pubkey) if peer: peer_rx[peer] += rx peer_tx[peer] += tx except Exception: pass # Parse fw_events for drops peer_drops = defaultdict(int) service_drops = defaultdict(lambda: defaultdict(int)) if os.path.exists(fw_file): try: with open(fw_file) as f: for line in f: line = line.strip() if not line: continue try: ev = json.loads(line) if cutoff: ts_str = ev.get('timestamp', '') try: ts = datetime.fromisoformat(ts_str) if ts.tzinfo is None: ts = ts.replace(tzinfo=timezone.utc) if ts < cutoff: continue except Exception: pass src_ip = ev.get('src_ip', '') if not src_ip: continue dest_ip = ev.get('dest_ip', '') dest_port = str(ev.get('dest_port', '')) proto_num = ev.get('ip.protocol', 0) proto = PROTO_MAP.get(int(proto_num), str(proto_num)) peer = ip_to_peer.get(src_ip) if not peer: continue if filter_peer and peer != filter_peer: continue if filter_service_ip and dest_ip != filter_service_ip: continue svc_name = _reverse(dest_ip, dest_port, proto) dest_display = make_dest_display(dest_ip, dest_port, proto, svc_name) peer_drops[peer] += 1 service_drops[peer][dest_display] += 1 except Exception: continue except Exception: pass # Collect peers with any activity all_peers = set() all_peers.update(k for k in peer_rx if peer_rx[k] > 0) all_peers.update(k for k in peer_tx if peer_tx[k] > 0) all_peers.update(peer_drops.keys()) if filter_peer: all_peers = {p for p in all_peers if p == filter_peer} for peer in sorted(all_peers): rx = peer_rx.get(peer, 0) tx = peer_tx.get(peer, 0) drops = peer_drops.get(peer, 0) print(f"peer|{peer}|{rx}|{tx}|{drops}") svc_map = service_drops.get(peer, {}) for dest_display, count in sorted(svc_map.items(), key=lambda x: -x[1]): print(f"service|{peer}|{dest_display}|{count}")