wgctl/core/lib/accept_events.py
Nuno Duque Nunes b892298259 feat: accept logging, conntrack daemon, activity integration
- daemon/wgctl-conntrack: Go daemon for conntrack DESTROY events
- wgctl-conntrack.service: systemd service
- core/lib/accept_events.py: accept_events(), accept_aggregate()
- ctx::accept_events_log: .wgctl/daemon/accept_events.log
- activity: ACCEPT row with bytes in/out and conn count
- activity: accept dest rows with ↓/↑ bytes at end
- activity: --accept, --drop, --external flags
- activity: unified w_count for drop/accept alignment
- activity: drop service rows in red
- activity: accept dest rows in green
- sysctl: nf_conntrack_acct=1 for byte counting
- note: --exclude-service/--include-service deferred
2026-05-28 23:31:10 +00:00

228 lines
No EOL
8.6 KiB
Python

"""
accept_events.py — conntrack accept event processing.
Reads accept_events.log written by wgctl-conntrack daemon.
Each line is a JSON object with fields:
ts, peer, src_ip, dst_ip, dst_port, proto,
bytes_orig, bytes_reply, packets_orig, packets_reply,
duration_sec, service, event, external
"""
import os
import json
from collections import defaultdict
from datetime import datetime
from lib.util import (
DATETIME_FMT,
load_net_data, load_hosts_data,
reverse_lookup, hosts_lookup,
fmt_ts, fmt_ts_hour, ts_to_unix, parse_since,
)
def accept_events(file, filter_peer, filter_type, net_file,
                  limit, collapse='1', since='', filter_external='0',
                  sort_order='desc'):
    """
    Print accept events from the conntrack log, optionally aggregated.

    Args:
        file: Path of the JSON-lines accept event log. If it cannot be
            opened or read, nothing is printed.
        filter_peer: When non-empty, keep only events whose ``peer`` equals
            this value.
        filter_type: When non-empty, keep only events whose ``peer`` starts
            with ``filter_type + '-'``.
        net_file: Not used by this function; kept for signature
            compatibility.
        limit: Maximum number of rows to print (the most recent ones).
            Falsy values (None, '', 0) select the default of 100; a value
            that parses to 0 or less (e.g. the string '0') disables the
            limit.
        collapse: '0' prints one row per event; anything else aggregates.
        since: When non-empty, passed to ``parse_since``; events older than
            the result are dropped.
        filter_external: '1' keeps only events whose ``external`` flag is
            truthy; anything else keeps only events where it is falsy.
        sort_order: 'asc' prints oldest first; anything else newest first.

    Output per line (collapse != '0'), one row per
    peer + dst_ip + dst_port + proto + hour bucket:
        ts|peer|dst_ip|dst_port|proto|bytes_total|packets_total|count|duration_avg
    Output per line (collapse == '0'):
        ts|peer|dst_ip|dst_port|proto|bytes_orig|bytes_reply|packets_orig|packets_reply|duration_sec

    Lines that are not valid JSON objects, and events whose byte/packet/
    duration fields are present but not numeric, are skipped.
    """
    do_collapse = str(collapse) != '0'
    external_only = str(filter_external) == '1'
    limit = int(limit) if limit else 100
    since_dt = parse_since(since) if since else None
    descending = sort_order != 'asc'

    counter_fields = ('bytes_orig', 'bytes_reply',
                      'packets_orig', 'packets_reply', 'duration_sec')
    # What a malformed line can raise: bad JSON / bad timestamp (ValueError),
    # a non-dict JSON value or non-string field (AttributeError), or
    # incomparable / unhashable values (TypeError).
    malformed = (ValueError, TypeError, AttributeError)

    events = []
    try:
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line.strip())
                    peer = e.get('peer')
                    if not peer:
                        continue
                    if filter_peer and peer != filter_peer:
                        continue
                    if filter_type and not peer.startswith(filter_type + '-'):
                        continue
                    # Keep exactly one side of the external/internal split.
                    if bool(e.get('external', False)) != external_only:
                        continue
                    # Reject bad counters here so the arithmetic and number
                    # formatting further down cannot raise.
                    if not all(isinstance(e.get(k, 0), (int, float))
                               for k in counter_fields):
                        continue
                    if since_dt:
                        try:
                            ev_dt = datetime.fromisoformat(
                                e.get('ts', '').replace('Z', '+00:00'))
                            if ev_dt < since_dt:
                                continue
                        except malformed:
                            # An unparseable or incomparable timestamp does
                            # not exclude the event.
                            # NOTE(review): if parse_since() returns a naive
                            # datetime while the log carries offsets, the
                            # comparison raises and the filter is a no-op
                            # -- confirm against lib.util.
                            pass
                    events.append(e)
                except malformed:
                    continue
    except (OSError, ValueError):
        # Best effort: a missing or unreadable log means "no events".
        pass

    if do_collapse:
        # Aggregate by peer + dst_ip + dst_port + proto + hour.
        buckets = defaultdict(
            lambda: {'count': 0, 'bytes': 0, 'packets': 0, 'duration': 0.0})
        bucket_ts = {}
        for e in events:
            try:
                dt = datetime.fromisoformat(
                    e.get('ts', '').replace('Z', '+00:00'))
                hour_key = (e.get('peer', ''), e.get('dst_ip', ''),
                            str(e.get('dst_port', '')), e.get('proto', ''),
                            dt.strftime('%Y-%m-%d %H'))
                b = buckets[hour_key]
            except malformed:
                continue
            b['count'] += 1
            b['bytes'] += e.get('bytes_orig', 0) + e.get('bytes_reply', 0)
            b['packets'] += e.get('packets_orig', 0) + e.get('packets_reply', 0)
            b['duration'] += e.get('duration_sec', 0.0)
            # Each bucket is ordered by the first event that landed in it.
            bucket_ts.setdefault(hour_key, dt)

        rows = sorted(bucket_ts.items(), key=lambda item: item[1])
        if limit > 0:
            rows = rows[-limit:]
        if descending:
            rows.reverse()
        for hour_key, dt in rows:
            peer, dst_ip, dst_port, proto, _ = hour_key
            b = buckets[hour_key]
            ts_fmt = fmt_ts_hour(dt.isoformat())
            dur_avg = b['duration'] / b['count'] if b['count'] > 0 else 0.0
            print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b['bytes']}|{b['packets']}|{b['count']}|{dur_avg:.1f}")
    else:
        # Detailed -- one row per event, oldest to newest before limiting.
        keyed = [(ts_to_unix(e.get('ts', '')), e) for e in events]
        try:
            # Stable sort: events with equal timestamps keep log order.
            keyed = sorted(keyed, key=lambda pair: pair[0])
        except TypeError:
            # Keys are not mutually comparable; fall back to log order.
            pass
        if limit > 0:
            keyed = keyed[-limit:]
        if descending:
            keyed.reverse()
        for _, e in keyed:
            ts_fmt = fmt_ts(e.get('ts', ''))
            peer = e.get('peer', '')
            dst_ip = e.get('dst_ip', '')
            dst_port = str(e.get('dst_port', ''))
            proto = e.get('proto', '')
            b_orig = e.get('bytes_orig', 0)
            b_reply = e.get('bytes_reply', 0)
            p_orig = e.get('packets_orig', 0)
            p_reply = e.get('packets_reply', 0)
            dur = e.get('duration_sec', 0.0)
            print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b_orig}|{b_reply}|{p_orig}|{p_reply}|{dur:.1f}")
def accept_aggregate(file, net_file, clients_dir, since='',
                     filter_peer='', external_only='0'):
    """
    Aggregate accept events per peer and per destination, and print them.

    Args:
        file: Path of the JSON-lines accept event log. If it cannot be
            opened or read, nothing is printed.
        net_file: Not used by this function; kept for signature
            compatibility.
        clients_dir: Not used by this function; kept for signature
            compatibility.
        since: When non-empty, passed to ``parse_since``; events older than
            the result are dropped.
        filter_peer: When non-empty, keep only events whose ``peer`` equals
            this value.
        external_only: '1' keeps only events whose ``external`` flag is
            truthy; anything else keeps only events where it is falsy.

    Output, all peer rows first (sorted by peer), then destination rows
    grouped by peer and ordered by descending total bytes, at most 20 per
    peer:
        peer|<peer>|bytes_in|bytes_out|packets_in|packets_out|conn_count
        dest|<peer>|dst_ip|dst_port|proto|bytes_orig|bytes_reply|conn_count

    ``bytes_out``/``packets_out`` sum the ``*_orig`` fields and
    ``bytes_in``/``packets_in`` sum the ``*_reply`` fields.

    Lines that are not valid JSON objects, and events whose byte/packet
    fields are present but not numeric, are skipped entirely -- they never
    contribute to any total.
    """
    from itertools import groupby, islice

    since_dt = parse_since(since) if since else None
    show_external = str(external_only) == '1'
    max_dests_per_peer = 20
    counter_fields = ('bytes_orig', 'bytes_reply',
                      'packets_orig', 'packets_reply')
    # What a malformed line can raise: bad JSON / bad timestamp (ValueError),
    # a non-dict JSON value or non-string field (AttributeError), or
    # incomparable / unhashable values (TypeError).
    malformed = (ValueError, TypeError, AttributeError)

    peer_stats = defaultdict(lambda: {
        'bytes_in': 0, 'bytes_out': 0,
        'packets_in': 0, 'packets_out': 0,
        'conn_count': 0
    })
    dest_stats = defaultdict(
        lambda: {'bytes_orig': 0, 'bytes_reply': 0, 'count': 0})

    try:
        with open(file) as f:
            for line in f:
                # Everything that can fail happens inside this try, before
                # any accumulator is modified, so a bad line can never leave
                # the totals half-updated.
                try:
                    e = json.loads(line.strip())
                    peer = e.get('peer', '')
                    if not peer:
                        continue
                    if filter_peer and peer != filter_peer:
                        continue
                    # Keep exactly one side of the external/internal split.
                    if bool(e.get('external', False)) != show_external:
                        continue
                    if since_dt:
                        try:
                            ev_dt = datetime.fromisoformat(
                                e.get('ts', '').replace('Z', '+00:00'))
                            if ev_dt < since_dt:
                                continue
                        except malformed:
                            # An unparseable or incomparable timestamp does
                            # not exclude the event.
                            # NOTE(review): if parse_since() returns a naive
                            # datetime while the log carries offsets, the
                            # comparison raises and the filter is a no-op
                            # -- confirm against lib.util.
                            pass
                    b_orig, b_reply, p_orig, p_reply = (
                        e.get(k, 0) for k in counter_fields)
                    if not all(isinstance(v, (int, float))
                               for v in (b_orig, b_reply, p_orig, p_reply)):
                        continue
                    dest_key = (peer, e.get('dst_ip', ''),
                                str(e.get('dst_port', '')), e.get('proto', ''))
                    # Looking up the destination first rejects unhashable
                    # keys before the peer entry is created.
                    ds = dest_stats[dest_key]
                    ps = peer_stats[peer]
                except malformed:
                    continue

                ps['bytes_out'] += b_orig
                ps['bytes_in'] += b_reply
                ps['packets_out'] += p_orig
                ps['packets_in'] += p_reply
                ps['conn_count'] += 1
                ds['bytes_orig'] += b_orig
                ds['bytes_reply'] += b_reply
                ds['count'] += 1
    except (OSError, ValueError):
        # Best effort: a missing or unreadable log means "no events".
        pass

    # Peer summaries.
    for peer, ps in sorted(peer_stats.items()):
        print(f"peer|{peer}|{ps['bytes_in']}|{ps['bytes_out']}|"
              f"{ps['packets_in']}|{ps['packets_out']}|{ps['conn_count']}")

    # Top destinations per peer, largest total byte count first. The sort
    # key leads with the peer, which is what groupby() needs.
    dest_items = sorted(
        dest_stats.items(),
        key=lambda x: (x[0][0], -(x[1]['bytes_orig'] + x[1]['bytes_reply']))
    )
    for _, group in groupby(dest_items, key=lambda x: x[0][0]):
        for (p, dst_ip, dst_port, proto), stats in islice(group, max_dests_per_peer):
            print(f"dest|{p}|{dst_ip}|{dst_port}|{proto}|"
                  f"{stats['bytes_orig']}|{stats['bytes_reply']}|{stats['count']}")