# Change summary:
# - daemon/wgctl-conntrack: Go daemon for conntrack DESTROY events
# - wgctl-conntrack.service: systemd service
# - core/lib/accept_events.py: accept_events(), accept_aggregate()
# - ctx::accept_events_log: .wgctl/daemon/accept_events.log
# - activity: ACCEPT row with bytes in/out and conn count
# - activity: accept dest rows with ↓/↑ bytes at end
# - activity: --accept, --drop, --external flags
# - activity: unified w_count for drop/accept alignment
# - activity: drop service rows in red
# - activity: accept dest rows in green
# - sysctl: nf_conntrack_acct=1 for byte counting
# - note: --exclude-service/--include-service deferred
228 lines
No EOL
8.6 KiB
Python
228 lines
No EOL
8.6 KiB
Python
"""
accept_events.py — conntrack accept event processing.

Reads accept_events.log written by the wgctl-conntrack daemon.
Each line is a JSON object with fields:
    ts, peer, src_ip, dst_ip, dst_port, proto,
    bytes_orig, bytes_reply, packets_orig, packets_reply,
    duration_sec, service, event, external
"""
|
|
|
|
import os
|
|
import json
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
from lib.util import (
|
|
DATETIME_FMT,
|
|
load_net_data, load_hosts_data,
|
|
reverse_lookup, hosts_lookup,
|
|
fmt_ts, fmt_ts_hour, ts_to_unix, parse_since,
|
|
)
|
|
|
|
|
|
def _event_num(value):
    """Return *value* if it is a real number, otherwise 0.

    Counters that are JSON ``null`` or otherwise non-numeric are treated
    like missing fields, so one malformed record cannot abort the report.
    """
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0
    return value


def _event_is_older(ts_str, since_dt):
    """Return True when *ts_str* parses to a moment strictly before *since_dt*.

    Timestamps that cannot be parsed or compared are never reported as
    older, so such events are kept by the ``since`` filter.
    """
    try:
        ev_dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
        if (ev_dt.utcoffset() is None) != (since_dt.utcoffset() is None):
            # Comparing a naive with an aware datetime raises TypeError,
            # which would silently disable the filter. astimezone() reads
            # a naive value as local time.
            # NOTE(review): assumes a naive cutoff from parse_since means
            # local time — confirm against lib.util.
            ev_dt = ev_dt.astimezone()
            since_dt = since_dt.astimezone()
        return ev_dt < since_dt
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        return False


def accept_events(file, filter_peer, filter_type, net_file,
                  limit, collapse='1', since='', filter_external='0',
                  sort_order='desc'):
    """
    Print accept events from the log, optionally aggregated per hour.

    Parameters:
        file            path of the JSON-lines accept event log; a missing
                        or unreadable file produces no output
        filter_peer     if non-empty, keep only events of exactly this peer
        filter_type     if non-empty, keep only peers named "<type>-..."
        net_file        unused here; kept for interface compatibility
        limit           maximum number of rows (the most recent ones);
                        empty/None means 100, zero or negative means no limit
        collapse        '0' for one row per event, anything else aggregates
        since           if non-empty, passed to parse_since(); events older
                        than the resulting cutoff are dropped
        filter_external '1' keeps only events flagged external, anything
                        else keeps only events not flagged external
        sort_order      'asc' for oldest first, anything else newest first

    Output per line (collapse=1), one row per
    peer + dst_ip + dst_port + proto + hour:
        ts|peer|dst_ip|dst_port|proto|bytes_total|packets_total|count|duration_avg

    Output per line (collapse=0):
        ts|peer|dst_ip|dst_port|proto|bytes_orig|bytes_reply|packets_orig|packets_reply|duration_sec
    """
    do_collapse = str(collapse) != '0'
    external_only = str(filter_external) == '1'
    limit = int(limit) if limit else 100
    since_dt = parse_since(since) if since else None
    descending = sort_order != 'asc'
    type_prefix = f"{filter_type}-" if filter_type else ''

    events = []
    try:
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line)
                except ValueError:
                    continue  # blank or malformed line
                if not isinstance(e, dict):
                    continue
                peer = e.get('peer')
                if not peer or not isinstance(peer, str):
                    continue
                if filter_peer and peer != filter_peer:
                    continue
                if type_prefix and not peer.startswith(type_prefix):
                    continue
                # Internal and external traffic are never mixed in one report.
                if external_only != bool(e.get('external', False)):
                    continue
                if since_dt and _event_is_older(e.get('ts', ''), since_dt):
                    continue
                events.append(e)
    except (OSError, ValueError):
        # Best effort: a missing, unreadable or undecodable log is reported
        # as "no (further) events" rather than as an error.
        pass

    if do_collapse:
        # Aggregate by peer + dst_ip + dst_port + proto + hour. Each bucket
        # remembers the timestamp of its first event for sorting/printing.
        buckets = {}
        for e in events:
            try:
                dt = datetime.fromisoformat(
                    e.get('ts', '').replace('Z', '+00:00'))
                hour = dt.strftime('%Y-%m-%d %H')
            except (AttributeError, TypeError, ValueError):
                continue  # events without a usable timestamp are skipped
            key = (
                e['peer'],
                str(e.get('dst_ip', '')),
                str(e.get('dst_port', '')),
                str(e.get('proto', '')),
                hour,
            )
            b = buckets.get(key)
            if b is None:
                b = buckets[key] = {
                    'first_ts': dt, 'count': 0,
                    'bytes': 0, 'packets': 0, 'duration': 0.0,
                }
            b['count'] += 1
            b['bytes'] += (_event_num(e.get('bytes_orig'))
                           + _event_num(e.get('bytes_reply')))
            b['packets'] += (_event_num(e.get('packets_orig'))
                             + _event_num(e.get('packets_reply')))
            b['duration'] += _event_num(e.get('duration_sec'))

        # Keep the most recent `limit` buckets, then apply display order.
        ordered = sorted(buckets.items(), key=lambda item: item[1]['first_ts'])
        if limit > 0:
            ordered = ordered[-limit:]
        if descending:
            ordered.reverse()

        for (peer, dst_ip, dst_port, proto, _hour), b in ordered:
            ts_fmt = fmt_ts_hour(b['first_ts'].isoformat())
            dur_avg = b['duration'] / b['count']  # count is always >= 1
            print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b['bytes']}|{b['packets']}|{b['count']}|{dur_avg:.1f}")

    else:
        # Detailed — one row per event. Rows keep the order in which they
        # appear in the log (no re-sorting); the last `limit` are shown.
        rows = events[-limit:] if limit > 0 else list(events)
        if descending:
            rows.reverse()

        for e in rows:
            ts_fmt = fmt_ts(e.get('ts', ''))
            peer = e.get('peer', '')
            dst_ip = e.get('dst_ip', '')
            dst_port = str(e.get('dst_port', ''))
            proto = e.get('proto', '')
            b_orig = _event_num(e.get('bytes_orig'))
            b_reply = _event_num(e.get('bytes_reply'))
            p_orig = _event_num(e.get('packets_orig'))
            p_reply = _event_num(e.get('packets_reply'))
            dur = _event_num(e.get('duration_sec', 0.0))
            print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b_orig}|{b_reply}|{p_orig}|{p_reply}|{dur:.1f}")
|
|
|
|
|
|
def _agg_num(value):
    """Return *value* if it is a real number, otherwise 0.

    Counters that are JSON ``null`` or otherwise non-numeric are treated
    like missing fields, so a record is always counted as a whole.
    """
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0
    return value


def _agg_is_older(ts_str, since_dt):
    """Return True when *ts_str* parses to a moment strictly before *since_dt*.

    Timestamps that cannot be parsed or compared are never reported as
    older, so such events are kept by the ``since`` filter.
    """
    try:
        ev_dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
        if (ev_dt.utcoffset() is None) != (since_dt.utcoffset() is None):
            # Comparing a naive with an aware datetime raises TypeError,
            # which would silently disable the filter. astimezone() reads
            # a naive value as local time.
            # NOTE(review): assumes a naive cutoff from parse_since means
            # local time — confirm against lib.util.
            ev_dt = ev_dt.astimezone()
            since_dt = since_dt.astimezone()
        return ev_dt < since_dt
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        return False


def accept_aggregate(file, net_file, clients_dir, since='',
                     filter_peer='', external_only='0', top_n=20):
    """
    Aggregate accept events per peer — total bytes, packets, top destinations.
    Used by wgctl activity to show accepted traffic alongside drops.

    Parameters:
        file          path of the JSON-lines accept event log; a missing or
                      unreadable file produces no output
        net_file      unused here; kept for interface compatibility
        clients_dir   unused here; kept for interface compatibility
        since         if non-empty, passed to parse_since(); events older
                      than the resulting cutoff are dropped
        filter_peer   if non-empty, keep only events of exactly this peer
        external_only '1': only events flagged external
                      '0': only events not flagged external (default)
        top_n         maximum number of destination rows per peer
                      (default 20); None means no limit

    Output, peers in sorted order:
        peer|peer_name|bytes_in|bytes_out|packets_in|packets_out|conn_count
    followed by, per peer, destinations by descending total bytes:
        dest|peer_name|dst_ip|dst_port|proto|bytes_orig|bytes_reply|conn_count

    bytes_out/packets_out sum the *_orig counters, bytes_in/packets_in the
    *_reply counters.
    """
    from itertools import groupby  # not imported at module level

    since_dt = parse_since(since) if since else None
    show_external = str(external_only) == '1'
    top_n = None if top_n is None else max(int(top_n), 0)

    peer_stats = defaultdict(lambda: {
        'bytes_in': 0, 'bytes_out': 0,
        'packets_in': 0, 'packets_out': 0,
        'conn_count': 0,
    })
    dest_stats = defaultdict(
        lambda: {'bytes_orig': 0, 'bytes_reply': 0, 'count': 0})

    try:
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line)
                except ValueError:
                    continue  # blank or malformed line
                if not isinstance(e, dict):
                    continue
                peer = e.get('peer', '')
                if not peer or not isinstance(peer, str):
                    continue
                if filter_peer and peer != filter_peer:
                    continue
                # Internal and external traffic are never mixed in one report.
                if show_external != bool(e.get('external', False)):
                    continue
                if since_dt and _agg_is_older(e.get('ts', ''), since_dt):
                    continue

                # Extract and sanitise every field first so the record is
                # either counted completely or not at all.
                dest_key = (
                    peer,
                    str(e.get('dst_ip', '')),
                    str(e.get('dst_port', '')),
                    str(e.get('proto', '')),
                )
                b_orig = _agg_num(e.get('bytes_orig'))
                b_reply = _agg_num(e.get('bytes_reply'))
                p_orig = _agg_num(e.get('packets_orig'))
                p_reply = _agg_num(e.get('packets_reply'))

                ps = peer_stats[peer]
                ps['bytes_out'] += b_orig
                ps['bytes_in'] += b_reply
                ps['packets_out'] += p_orig
                ps['packets_in'] += p_reply
                ps['conn_count'] += 1

                ds = dest_stats[dest_key]
                ds['bytes_orig'] += b_orig
                ds['bytes_reply'] += b_reply
                ds['count'] += 1
    except (OSError, ValueError):
        # Best effort: a missing, unreadable or undecodable log is reported
        # as "no (further) events" rather than as an error.
        pass

    # Output peer summaries
    for peer, ps in sorted(peer_stats.items()):
        print(f"peer|{peer}|{ps['bytes_in']}|{ps['bytes_out']}|"
              f"{ps['packets_in']}|{ps['packets_out']}|{ps['conn_count']}")

    # Output the top_n destinations per peer, largest byte total first.
    # groupby needs its input pre-sorted by the grouping key (the peer).
    dest_items = sorted(
        dest_stats.items(),
        key=lambda x: (x[0][0], -(x[1]['bytes_orig'] + x[1]['bytes_reply']))
    )
    for _peer, group in groupby(dest_items, key=lambda x: x[0][0]):
        for (p, dst_ip, dst_port, proto), stats in list(group)[:top_n]:
            print(f"dest|{p}|{dst_ip}|{dst_port}|{proto}|"
                  f"{stats['bytes_orig']}|{stats['bytes_reply']}|{stats['count']}")