""" accept_events.py — conntrack accept event processing. Reads accept_events.log written by wgctl-conntrack daemon. Each line is a JSON object with fields: ts, peer, src_ip, dst_ip, dst_port, proto, bytes_orig, bytes_reply, packets_orig, packets_reply, duration_sec, service, event, external """ import os import json from collections import defaultdict from datetime import datetime from lib.util import ( DATETIME_FMT, load_net_data, load_hosts_data, reverse_lookup, hosts_lookup, fmt_ts, fmt_ts_hour, ts_to_unix, parse_since, ) def accept_events(file, filter_peer, filter_type, net_file, limit, collapse='1', since='', filter_external='0', sort_order='desc'): """ Format accept events with optional aggregation. Output per line (collapse=1): ts|peer|dst_ip|dst_port|proto|bytes_total|packets_total|count|duration_avg Output per line (collapse=0): ts|peer|dst_ip|dst_port|proto|bytes_orig|bytes_reply|packets_orig|packets_reply|duration_sec """ do_collapse = str(collapse) != '0' external_only = str(filter_external) == '1' limit = int(limit) if limit else 100 since_dt = parse_since(since) if since else None descending = sort_order != 'asc' events = [] try: with open(file) as f: for line in f: try: e = json.loads(line.strip()) if not e.get('peer'): continue if filter_peer and e.get('peer') != filter_peer: continue if filter_type and not e.get('peer', '').startswith(filter_type + '-'): continue if external_only and not e.get('external', False): continue if not external_only and e.get('external', False): continue if since_dt: ts_str = e.get('ts', '') try: from datetime import timezone ev_dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) if ev_dt < since_dt: continue except Exception: pass events.append(e) except Exception: pass except Exception: pass if do_collapse: # Aggregate by peer + dst_ip + dst_port + proto + hour buckets = defaultdict(lambda: {'count': 0, 'bytes': 0, 'packets': 0, 'duration': 0.0}) bucket_ts = {} for e in events: ts_str = e.get('ts', '') peer = e.get('peer', '') dst_ip = e.get('dst_ip', '') dst_port = str(e.get('dst_port', '')) proto = e.get('proto', '') try: dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) hour_key = (peer, dst_ip, dst_port, proto, dt.strftime('%Y-%m-%d %H')) except Exception: continue b = buckets[hour_key] b['count'] += 1 b['bytes'] += e.get('bytes_orig', 0) + e.get('bytes_reply', 0) b['packets'] += e.get('packets_orig', 0) + e.get('packets_reply', 0) b['duration'] += e.get('duration_sec', 0.0) if hour_key not in bucket_ts: bucket_ts[hour_key] = dt # Sort and limit sorted_buckets = sorted(bucket_ts.items(), key=lambda x: x[1]) output = sorted_buckets[-limit:] if descending: output = list(reversed(output)) for hour_key, dt in output: peer, dst_ip, dst_port, proto, _ = hour_key b = buckets[hour_key] ts_fmt = fmt_ts_hour(dt.isoformat()) dur_avg = b['duration'] / b['count'] if b['count'] > 0 else 0.0 print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b['bytes']}|{b['packets']}|{b['count']}|{dur_avg:.1f}") else: # Detailed — one row per event result = [(ts_to_unix(e.get('ts', '')), e) for e in events] result = result[-limit:] if descending: result.reverse() for _, e in result: ts_fmt = fmt_ts(e.get('ts', '')) peer = e.get('peer', '') dst_ip = e.get('dst_ip', '') dst_port = str(e.get('dst_port', '')) proto = e.get('proto', '') b_orig = e.get('bytes_orig', 0) b_reply = e.get('bytes_reply', 0) p_orig = e.get('packets_orig', 0) p_reply = e.get('packets_reply', 0) dur = e.get('duration_sec', 0.0) print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b_orig}|{b_reply}|{p_orig}|{p_reply}|{dur:.1f}") def accept_aggregate(file, net_file, clients_dir, since='', filter_peer='', external_only='0', exclude_services=''): """ Aggregate accept events per peer — total bytes, packets, top destinations. Used by wgctl activity to show accepted traffic alongside drops. external_only='1': only show traffic to external IPs (non-private) external_only='0': only show traffic to internal IPs (default) Output: peer|peer_name|bytes_in|bytes_out|packets_in|packets_out|conn_count dest|peer_name|dst_ip|dst_port|proto|bytes_total|conn_count """ from collections import defaultdict from itertools import groupby from lib.util import load_net_data, hosts_lookup, reverse_lookup since_dt = parse_since(since) if since else None show_external = str(external_only) == '1' peer_stats = defaultdict(lambda: { 'bytes_in': 0, 'bytes_out': 0, 'packets_in': 0, 'packets_out': 0, 'conn_count': 0 }) # dest_stats = defaultdict(lambda: {'bytes': 0, 'count': 0}) dest_stats = defaultdict(lambda: {'bytes_orig': 0, 'bytes_reply': 0, 'count': 0}) # Build exclusion set — supports service names and ip:port:proto exclude_set = set() if exclude_services: for svc in exclude_services.split(): exclude_set.add(svc.strip()) net_data = load_net_data(net_file) if (net_file and exclude_set) else {} try: with open(file) as f: for line in f: try: e = json.loads(line.strip()) peer = e.get('peer', '') if not peer: continue if filter_peer and peer != filter_peer: continue # Filter by external/internal is_external = e.get('external', False) if show_external and not is_external: continue if not show_external and is_external: continue if since_dt: ts_str = e.get('ts', '') try: from datetime import timezone ev_dt = datetime.fromisoformat( ts_str.replace('Z', '+00:00')) if ev_dt < since_dt: continue except Exception: pass dst_ip = e.get('dst_ip', '') dst_port = str(e.get('dst_port', '')) proto = e.get('proto', '') b_orig = e.get('bytes_orig', 0) b_reply = e.get('bytes_reply', 0) p_orig = e.get('packets_orig', 0) p_reply = e.get('packets_reply', 0) ps = peer_stats[peer] ps['bytes_out'] += b_orig ps['bytes_in'] += b_reply ps['packets_out'] += p_orig ps['packets_in'] += p_reply ps['conn_count'] += 1 if _is_excluded(dst_ip, dst_port, proto, exclude_set, net_data): continue dest_key = (peer, dst_ip, dst_port, proto) dest_stats[dest_key]['bytes_orig'] += b_orig dest_stats[dest_key]['bytes_reply'] += b_reply dest_stats[dest_key]['count'] += 1 except Exception: pass except Exception: pass # Output peer summaries for peer, ps in sorted(peer_stats.items()): print(f"peer|{peer}|{ps['bytes_in']}|{ps['bytes_out']}|" f"{ps['packets_in']}|{ps['packets_out']}|{ps['conn_count']}") # Output top 5 destinations per peer sorted by byte count dest_items = sorted( dest_stats.items(), key=lambda x: (x[0][0], -(x[1]['bytes_orig'] + x[1]['bytes_reply'])) ) for peer, group in groupby(dest_items, key=lambda x: x[0][0]): top = list(group)[:20] for (p, dst_ip, dst_port, proto), stats in top: print(f"dest|{p}|{dst_ip}|{dst_port}|{proto}|" f"{stats['bytes_orig']}|{stats['bytes_reply']}|{stats['count']}") def _is_excluded(ip, port, proto, exclude_set, net_data): if not exclude_set: return False # Check raw ip:port:proto if f"{ip}:{port}:{proto}" in exclude_set: return True # Check service name svc = reverse_lookup(net_data, ip, str(port), proto) if net_data else '' if svc and svc in exclude_set: return True # Check service:proto format (e.g. "pihole:dns-udp" -> "pihole" + "dns-udp") if svc: for excl in exclude_set: if ':' in excl: excl_svc, excl_port = excl.rsplit(':', 1) if excl_svc == svc and excl_port in (f"{proto}-{port}", f"dns-{proto}"): return True return False