wgctl/core/lib/activity.py
Nuno Duque Nunes 1308f9e07a refactor: split json_helper.py into lib/ modules
- core/lib/util.py: shared utilities, ip_to_name, reverse_lookup, parse_since
- core/lib/events.py: fw_events, wg_events, follow_logs, event parsers
- core/lib/peers.py: peer_data, peer_transfer, peer_transfer_delta
- core/lib/activity.py: activity_aggregate
- json_helper.py: thin dispatcher importing from lib/
- events.py: --since, --filter-event, --filter-dest-ip/port query flags
- util.py: parse_since supporting relative (2h/7d) and EU/ISO date formats
2026-05-24 22:02:50 +00:00

129 lines
No EOL
4.5 KiB
Python

"""
activity.py — activity aggregation for wgctl activity command.
"""
import os
import json
import glob
import subprocess
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from lib.util import (
PROTO_MAP, build_ip_to_name, build_pubkey_to_name,
load_net_data, load_hosts_data,
reverse_lookup, resolve_display, make_dest_display,
ts_to_unix, parse_since,
)
def _parse_event_time(ts_str):
    """Parse an event timestamp into a timezone-aware datetime.

    Returns None when the value is not a string or cannot be parsed, so
    the caller can decide what to do with undated events. Naive
    timestamps are treated as UTC.
    """
    if not isinstance(ts_str, str) or not ts_str:
        return None
    text = ts_str
    # Older datetime.fromisoformat() versions reject a trailing "Z".
    if text.endswith('Z'):
        text = text[:-1] + '+00:00'
    try:
        ts = datetime.fromisoformat(text)
    except ValueError:
        # Fallback for forms fromisoformat() may reject on older Pythons,
        # e.g. a UTC offset written without a colon ("+0000").
        ts = None
        for fmt in ('%Y-%m-%dT%H:%M:%S.%f%z', '%Y-%m-%dT%H:%M:%S%z'):
            try:
                ts = datetime.strptime(text, fmt)
                break
            except ValueError:
                continue
        if ts is None:
            return None
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return ts


def _wg_transfer_totals(wg_interface, pubkey_to_peer):
    """Return (peer_rx, peer_tx) byte totals keyed by peer name.

    Runs `wg show <wg_interface> transfer` and sums columns 2 and 3 of
    each output line for public keys present in pubkey_to_peer. This is
    best effort: if the command cannot be run, both mappings are empty.
    """
    peer_rx = defaultdict(int)
    peer_tx = defaultdict(int)
    try:
        result = subprocess.run(
            ['wg', 'show', wg_interface, 'transfer'],
            capture_output=True, text=True,
        )
    except (OSError, ValueError, TypeError, subprocess.SubprocessError):
        return peer_rx, peer_tx

    for line in (result.stdout or '').splitlines():
        parts = line.split()
        if len(parts) < 3:
            continue
        try:
            rx, tx = int(parts[1]), int(parts[2])
        except ValueError:
            # Skip only the malformed line; keep totals for the others.
            continue
        peer = pubkey_to_peer.get(parts[0])
        if peer:
            peer_rx[peer] += rx
            peer_tx[peer] += tx
    return peer_rx, peer_tx


def activity_aggregate(fw_file, wg_file, wg_interface, net_file,
                       clients_dir, meta_dir, hours, filter_peer,
                       filter_service_ip):
    """
    Aggregate activity data for wgctl activity.

    Combines WireGuard transfer counters with the events recorded in
    fw_file (one JSON object per line) and prints the result to stdout.

    Parameters:
        fw_file: path of the JSON-lines firewall event log.
        wg_file: accepted for interface compatibility; not used here.
        wg_interface: interface name passed to `wg show ... transfer`.
        net_file: passed to load_net_data() for reverse service lookups.
        clients_dir: passed to build_ip_to_name()/build_pubkey_to_name().
        meta_dir: accepted for interface compatibility; not used here.
        hours: look-back window in hours; a falsy value means 24, and a
            value <= 0 disables the time filter.
        filter_peer: if truthy, only this peer name is reported.
        filter_service_ip: if truthy, only events whose dest_ip equals
            this value are counted.

    Output:
        peer|name|rx_bytes|tx_bytes|drop_count
        service|peer_name|dest_display|drop_count

    A peer is listed when it has non-zero transfer or at least one
    counted event. Service lines follow their peer line, highest count
    first. Events whose timestamp cannot be parsed are counted
    regardless of the time window.
    """
    hours = int(hours) if hours else 24
    cutoff = None
    if hours > 0:
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)

    # Preload lookups once
    ip_to_peer = build_ip_to_name(clients_dir) or {}
    pubkey_to_peer = build_pubkey_to_name(clients_dir) or {}
    net_data = load_net_data(net_file)

    # WireGuard transfer totals
    peer_rx, peer_tx = _wg_transfer_totals(wg_interface, pubkey_to_peer)

    # Parse fw_events for drops
    peer_drops = defaultdict(int)
    service_drops = defaultdict(lambda: defaultdict(int))
    try:
        # errors='replace' keeps one undecodable byte from ending the scan.
        with open(fw_file, encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    ev = json.loads(line)
                    if cutoff:
                        ts = _parse_event_time(ev.get('timestamp', ''))
                        if ts is not None and ts < cutoff:
                            continue
                    src_ip = ev.get('src_ip', '')
                    if not src_ip:
                        continue
                    dest_ip = ev.get('dest_ip', '')
                    dest_port = str(ev.get('dest_port', ''))
                    proto_num = ev.get('ip.protocol', 0)
                    proto = PROTO_MAP.get(int(proto_num), str(proto_num))
                    peer = ip_to_peer.get(src_ip)
                    if not peer:
                        continue
                    if filter_peer and peer != filter_peer:
                        continue
                    if filter_service_ip and dest_ip != filter_service_ip:
                        continue
                    svc_name = reverse_lookup(net_data, dest_ip, dest_port, proto)
                    dest_display = make_dest_display(
                        dest_ip, dest_port, proto, svc_name)
                    peer_drops[peer] += 1
                    service_drops[peer][dest_display] += 1
                except Exception:
                    # Deliberately broad: a malformed record (bad JSON,
                    # non-object line, odd field types) or a failing
                    # lookup helper must not abort the whole report.
                    continue
    except OSError:
        # Missing or unreadable log: report transfer data only.
        pass

    # Collect peers with any activity
    all_peers = {p for p in peer_rx if peer_rx[p] > 0}
    all_peers.update(p for p in peer_tx if peer_tx[p] > 0)
    all_peers.update(peer_drops)
    if filter_peer:
        all_peers = {p for p in all_peers if p == filter_peer}

    for peer in sorted(all_peers):
        rx = peer_rx.get(peer, 0)
        tx = peer_tx.get(peer, 0)
        drops = peer_drops.get(peer, 0)
        print(f"peer|{peer}|{rx}|{tx}|{drops}")
        svc_map = service_drops.get(peer, {})
        for dest_display, count in sorted(svc_map.items(), key=lambda x: -x[1]):
            print(f"service|{peer}|{dest_display}|{count}")