"""
events.py — WireGuard and firewall event processing.
"""

import os
import json
import sys
import time
from collections import defaultdict
from datetime import datetime, timezone

from lib.util import (
    DATETIME_FMT, PROTO_MAP,
    build_ip_to_name, load_net_data, load_hosts_data,
    reverse_lookup, hosts_lookup, resolve_display,
    fmt_ts, fmt_ts_hour, ts_to_unix, parse_since,
    make_dest_display,
)


# ──────────────────────────────────────────
# Shared helpers
# ──────────────────────────────────────────

# Dedup windows in seconds, keyed by the event's 'ip.protocol' number
# (1/6/17 are ICMP/TCP/UDP in the IANA registry). Other protocols use the
# default window.
_DEDUP_WINDOWS = {1: 5, 6: 30, 17: 10}
_DEFAULT_DEDUP_WINDOW = 10

# Consecutive events with the same key closer than this many seconds to the
# first event of the run are folded into that event's count.
_RUN_WINDOW = 300


def _load_endpoint_cache(path):
    """Return the JSON object stored at *path*, or {} when unavailable.

    A missing path, unreadable file, invalid JSON or a JSON value that is
    not an object all yield an empty dict, so callers can always use .get().
    """
    if not path or not os.path.exists(path):
        return {}
    try:
        with open(path) as f:
            data = json.load(f)
    except (OSError, ValueError):
        return {}
    return data if isinstance(data, dict) else {}


def _as_utc(dt):
    """Return *dt* unchanged when it carries a tzinfo, else tagged as UTC."""
    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)


def _older_than(ts_str, since_dt):
    """Return True when ISO timestamp *ts_str* is strictly before *since_dt*.

    Unparseable timestamps and incomparable datetimes return False, i.e. the
    event is kept rather than filtered out.
    """
    try:
        return _as_utc(datetime.fromisoformat(ts_str)) < since_dt
    except (TypeError, ValueError):
        return False


def _proto_name(raw):
    """Return (number, display_name) for a raw 'ip.protocol' value.

    The value is coerced to int when possible so that a numeric string
    resolves through PROTO_MAP the same way an integer does; anything that
    cannot be coerced is looked up (and displayed) as-is.
    """
    try:
        num = int(raw)
    except (TypeError, ValueError):
        num = raw
    return num, PROTO_MAP.get(num, str(num))


def _collapse_runs(events, key_fn, window=_RUN_WINDOW):
    """Fold consecutive events with equal key_fn(e) into one counted entry.

    An event joins the current run when its key equals the key of the run's
    first (kept) event and it is less than *window* seconds after it.
    Returns (kept_events, counts) as two parallel lists.
    """
    kept = []
    counts = []
    for e in events:
        if kept:
            head = kept[-1]
            if key_fn(e) == key_fn(head):
                delta = (ts_to_unix(e.get('timestamp', ''))
                         - ts_to_unix(head.get('timestamp', '')))
                if delta < window:
                    counts[-1] += 1
                    continue
        kept.append(e)
        counts.append(1)
    return kept, counts


def _rows_with_gaps(events, counts, endpoint_fn):
    """Format WireGuard events as (unix_ts, line) rows in input order.

    For 'handshake' events the last field is the number of seconds since the
    previous handshake row of the same client (empty for the first one or
    when the difference is not positive). Other events get an empty gap.
    """
    last_handshake_ts = {}
    rows = []
    for e, count in zip(events, counts):
        ts_str = e.get('timestamp', '')
        client = e.get('client', '')
        endpoint = endpoint_fn(e)
        event = e.get('event', '')
        ts = ts_to_unix(ts_str)
        ts_fmt = fmt_ts(ts_str)

        gap_seconds = ''
        if event == 'handshake':
            prev_ts = last_handshake_ts.get(client, 0)
            if prev_ts > 0:
                gap = int(ts - prev_ts)
                if gap > 0:
                    gap_seconds = str(gap)
            last_handshake_ts[client] = ts

        rows.append(
            (ts, f"{ts_fmt}|{client}|{endpoint}|{event}|{count}|{gap_seconds}"))
    return rows


def _rewrite_without(path, should_remove):
    """Drop every JSONL line of *path* whose parsed object matches.

    Lines that cannot be parsed, or for which *should_remove* raises, are
    preserved. The file is rewritten in place (same inode, so a process
    holding it open keeps writing to the same file), and only when at least
    one line was actually removed. Returns the number of removed lines.
    """
    kept = []
    removed = 0
    with open(path) as f:
        for line in f:
            try:
                if should_remove(json.loads(line.strip())):
                    removed += 1
                    continue
            except Exception:
                # Unparseable or unexpected content is never deleted.
                pass
            kept.append(line)
    if removed:
        with open(path, 'w') as f:
            f.writelines(kept)
    return removed


# ──────────────────────────────────────────
# fw_events
# ──────────────────────────────────────────

def fw_events(file, filter_ip, filter_type, clients_dir, net_file, limit,
              collapse='1', since='', filter_dest_ip='', filter_dest_port='',
              sort_order='desc', endpoint_cache_file=''):
    """
    Format firewall drop events with dedup, counts, and service annotation.

    collapse='1' (default): hourly aggregation
    collapse='0': show all deduplicated events (--detailed mode)
    since: relative or absolute time string (e.g. '2h', '23/05', '2026-05-23')
    filter_dest_ip: filter by destination IP (optional)
    filter_dest_port: filter by destination port (optional)
    sort_order: 'asc' prints oldest first; any other value newest first
    endpoint_cache_file: optional JSON file mapping client name -> endpoint

    filter_type is accepted for signature compatibility but is not applied
    by this function.

    At most `limit` rows (default 50) are printed, always the most recent.

    Output per line:
      ts|client|dest_ip|dest_port|proto|service_name|count|src_endpoint
    """
    do_collapse = str(collapse) != '0'
    limit = int(limit) if limit else 50

    # Preload lookups once
    ip_to_name = build_ip_to_name(clients_dir)
    net_data = load_net_data(net_file)
    hosts_data = load_hosts_data(None)  # hosts lookup done in bash for now
    endpoint_cache = _load_endpoint_cache(endpoint_cache_file)
    since_dt = parse_since(since) if since else None

    # ── Parse and first-pass dedup (time-window per key) ──
    events = []
    last_seen = {}
    try:
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line.strip())
                    src = e.get('src_ip', '')
                    if not src:
                        continue
                    if filter_ip and src != filter_ip:
                        continue

                    proto_num = int(e.get('ip.protocol', 0))
                    dst = e.get('dest_ip', '')
                    port = str(e.get('dest_port', ''))
                    if filter_dest_ip and dst != filter_dest_ip:
                        continue
                    if filter_dest_port and port != filter_dest_port:
                        continue

                    ts_str = e.get('timestamp', '')
                    ts = ts_to_unix(ts_str)
                    if since_dt and _older_than(ts_str, since_dt):
                        continue

                    # Skip repeats of the same flow inside its protocol window.
                    key = (src, dst, port, proto_num)
                    window = _DEDUP_WINDOWS.get(proto_num,
                                                _DEFAULT_DEDUP_WINDOW)
                    if key in last_seen and (ts - last_seen[key]) < window:
                        continue
                    last_seen[key] = ts
                    events.append(e)
                except Exception:
                    # Best effort: a malformed line must not abort the report.
                    continue
    except Exception:
        # Best effort: an unreadable log yields whatever was parsed so far.
        pass

    # ── Collapse or detailed output ──
    if do_collapse:
        hourly = defaultdict(int)
        hourly_ts = {}  # bucket -> datetime of its first event
        for e in events:
            src = e.get('src_ip', '')
            dst = e.get('dest_ip', '')
            port = str(e.get('dest_port', ''))
            proto_num = int(e.get('ip.protocol', 0))
            proto = PROTO_MAP.get(proto_num, str(proto_num))
            ts_str = e.get('timestamp', '')
            client = ip_to_name.get(src, src)
            svc_name = reverse_lookup(net_data, dst, port, proto)
            try:
                dt = datetime.fromisoformat(ts_str)
                hour_key = (client, dst, port, proto, svc_name,
                            dt.strftime('%Y-%m-%d %H'))
                hourly[hour_key] += 1
                hourly_ts.setdefault(hour_key, dt)
            except (TypeError, ValueError):
                continue

        # Naive datetimes are compared as UTC so that a log mixing naive and
        # offset-aware timestamps sorts instead of raising TypeError.
        sorted_buckets = sorted(hourly_ts.items(),
                                key=lambda item: _as_utc(item[1]))
        output = sorted_buckets[-limit:]
        if sort_order != 'asc':
            output.reverse()

        for hour_key, dt in output:
            client, dst, port, proto, svc_name, _ = hour_key
            count = hourly[hour_key]
            ts_fmt = fmt_ts_hour(dt.isoformat())
            src_endpoint = endpoint_cache.get(client, '')
            print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}|{src_endpoint}")

    else:
        # Detailed — consecutive dedup only
        def _flow_key(e):
            return (
                e.get('src_ip', ''),
                e.get('dest_ip', ''),
                str(e.get('dest_port', '')),
                int(e.get('ip.protocol', 0)),
            )

        deduped, counts = _collapse_runs(events, _flow_key)
        pairs = list(zip(deduped, counts))[-limit:]
        if sort_order != 'asc':
            pairs.reverse()

        for e, count in pairs:
            src = e.get('src_ip', '')
            dst = e.get('dest_ip', '')
            port = str(e.get('dest_port', ''))
            proto_num = int(e.get('ip.protocol', 0))
            proto = PROTO_MAP.get(proto_num, str(proto_num))
            client = ip_to_name.get(src, src)
            svc_name = reverse_lookup(net_data, dst, port, proto)
            src_endpoint = endpoint_cache.get(client, '')
            raw_ts = e.get('timestamp', '')
            try:
                ts_fmt = datetime.fromisoformat(raw_ts).strftime(DATETIME_FMT)
            except (TypeError, ValueError):
                # Fall back to the raw value when it is not ISO formatted.
                ts_fmt = raw_ts
            print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}|{src_endpoint}")


# ──────────────────────────────────────────
# wg_events
# ──────────────────────────────────────────

def wg_events(file, filter_client, filter_type, limit, collapse='1', since='',
              filter_event='', endpoint_cache_file='', sort_order='desc'):
    """
    Format WireGuard events with dedup, counts, gap and endpoint resolution.

    filter_client: keep only this client name (optional)
    filter_type: keep only clients whose name starts with '<filter_type>-'
    filter_event: keep only this event name (optional)
    collapse='1' (default): 'attempt' events are aggregated per hour, other
      events are folded into consecutive runs
    collapse='0': every event type is folded into consecutive runs
    since: time string handed to parse_since (optional)
    endpoint_cache_file: optional JSON file mapping client name -> endpoint,
      used when an event has no 'endpoint' of its own
    sort_order: 'desc' (default, newest first) | 'asc' (oldest first)

    At most `limit` rows (default 50) are printed, always the most recent.

    Output per line:
      ts|client|endpoint|event|count|gap_seconds
    """
    do_collapse = str(collapse) != '0'
    limit = int(limit) if limit else 50
    since_dt = parse_since(since) if since else None
    descending = sort_order != 'asc'

    # Load endpoint cache once
    endpoint_cache = _load_endpoint_cache(endpoint_cache_file)

    events = []
    try:
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line.strip())
                    client = e.get('client', '')
                    if not client:
                        continue
                    if filter_client and client != filter_client:
                        continue
                    if filter_type and not client.startswith(filter_type + '-'):
                        continue
                    if filter_event and e.get('event', '') != filter_event:
                        continue
                    if since_dt and _older_than(e.get('timestamp', ''),
                                                since_dt):
                        continue
                    events.append(e)
                except Exception:
                    # Best effort: skip lines that are not usable events.
                    pass
    except Exception:
        # Best effort: an unreadable log yields whatever was parsed so far.
        pass

    def _endpoint(e):
        return e.get('endpoint', '') or endpoint_cache.get(e.get('client', ''), '')

    def _run_key(e):
        # Only the first 15 characters of the endpoint take part in the key.
        # NOTE(review): presumably so a changing source port does not split
        # a run — confirm.
        return (e.get('client', ''), e.get('event', ''), _endpoint(e)[:15])

    if do_collapse:
        hourly_attempts = defaultdict(int)
        hourly_ts = {}  # bucket -> datetime of its first attempt
        others = []
        for e in events:
            if e.get('event', '') != 'attempt':
                others.append(e)
                continue
            try:
                dt = datetime.fromisoformat(e.get('timestamp', ''))
            except (TypeError, ValueError):
                # Attempts without a parseable timestamp cannot be bucketed.
                continue
            hour_key = (e.get('client', ''), _endpoint(e), 'attempt',
                        dt.strftime('%Y-%m-%d %H'))
            hourly_attempts[hour_key] += 1
            hourly_ts.setdefault(hour_key, dt)

        handshakes, handshake_counts = _collapse_runs(others, _run_key)

        # Build output list — always ascending first for correct gap calculation
        rows = []
        for hour_key, dt in hourly_ts.items():
            client, endpoint, event, _ = hour_key
            count = hourly_attempts[hour_key]
            ts_fmt = fmt_ts_hour(dt.isoformat())
            rows.append((dt.timestamp(),
                         f"{ts_fmt}|{client}|{endpoint}|{event}|{count}|"))
        rows.extend(_rows_with_gaps(handshakes, handshake_counts, _endpoint))

        # Sort ascending first to get correct limit slice, then reverse if needed
        rows.sort(key=lambda row: row[0])
        rows = rows[-limit:]
    else:
        # Compute gaps ascending, then slice, then reverse if needed
        deduped, counts = _collapse_runs(events, _run_key)
        rows = _rows_with_gaps(deduped, counts, _endpoint)[-limit:]

    if descending:
        rows.reverse()
    for _, line in rows:
        print(line)


# ──────────────────────────────────────────
# Single event parsers (used by watch)
# ──────────────────────────────────────────

def parse_event(line):
    """Parse a single JSON wg event line.

    Prints 'timestamp|client|endpoint|event'; prints nothing when the line
    is not a JSON object.
    """
    try:
        e = json.loads(line)
        print(f"{e.get('timestamp','')}|{e.get('client','')}|"
              f"{e.get('endpoint','')}|{e.get('event','')}")
    except (AttributeError, TypeError, ValueError):
        pass


def parse_fw_event(line):
    """Parse a single fw_events.log JSON line.

    Prints 'timestamp|src_ip|dest_ip|dest_port|proto'; prints nothing when
    the line is not a JSON object. The protocol number is coerced to int
    before the PROTO_MAP lookup, as fw_events does.
    """
    try:
        e = json.loads(line)
        _, proto = _proto_name(e.get('ip.protocol', 0))
        print(f"{e.get('timestamp','')}|{e.get('src_ip','')}|"
              f"{e.get('dest_ip','')}|{e.get('dest_port','')}|{proto}")
    except (AttributeError, TypeError, ValueError):
        pass


def format_fw_event(line, clients_dir, ip_to_name=None):
    """Format a single fw_event line for display.

    Returns 'ts|client|dest[:port]|proto', or None when the line is not a
    usable event (invalid JSON, no 'src_ip', or formatting failed).

    ip_to_name: optional prebuilt IP -> client-name mapping. When omitted it
    is built from clients_dir on each call; callers formatting many lines
    can pass it to avoid rebuilding the mapping every time.
    """
    try:
        e = json.loads(line.strip())
        src = e.get('src_ip', '')
    except (AttributeError, TypeError, ValueError):
        return None
    if not src:
        return None

    if ip_to_name is None:
        ip_to_name = build_ip_to_name(clients_dir)

    try:
        dst = e.get('dest_ip', '—')
        port = e.get('dest_port', '')
        _, proto = _proto_name(e.get('ip.protocol', 0))
        dst_str = f"{dst}:{port}" if port else dst
        client = ip_to_name.get(src, src)
        ts_fmt = fmt_ts(e.get('timestamp', ''))
        return f"{ts_fmt}|{client}|{dst_str}|{proto}"
    except Exception:
        return None


def format_wg_event(line):
    """Format a single wg_event line for display.

    Returns 'ts|client|endpoint|event|wg', or None when the line is not a
    usable event (invalid JSON, no 'client', or formatting failed).
    """
    try:
        e = json.loads(line.strip())
        client = e.get('client', '')
        if not client:
            return None
        ts_fmt = fmt_ts(e.get('timestamp', ''))
        endpoint = e.get('endpoint', '—')
        event = e.get('event', '—')
        return f"{ts_fmt}|{client}|{endpoint}|{event}|wg"
    except Exception:
        return None


# ──────────────────────────────────────────
# Event removal
# ──────────────────────────────────────────

def remove_events(file, identifier):
    """Remove all events for a client/ip from a JSONL file.

    A line is removed when its 'client' or its 'src_ip' equals identifier.
    Lines that are not valid JSON objects are kept. On any I/O failure the
    error is printed to stderr and the process exits with status 1.
    """
    def _matches(e):
        return (e.get('client') == identifier
                or e.get('src_ip') == identifier)

    try:
        _rewrite_without(file, _matches)
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)


def remove_events_filtered(wg_file, fw_file, filter_name, filter_ip,
                           filter_fw, filter_wg, before_days):
    """Remove events with filters: by name/ip, source, or age.

    filter_name: only remove WireGuard events of this client (optional)
    filter_ip: only remove firewall events from this source IP (optional)
    filter_fw: when truthy, the WireGuard log is left untouched
    filter_wg: when truthy, the firewall log is left untouched
    before_days: only remove events older than this many days (optional)

    With no name/ip and no age filter, every event of the selected log(s) is
    removed. Prints '<removed_wg>|<removed_fw>'.
    """
    cutoff_ts = None
    if before_days:
        cutoff_ts = time.time() - (float(before_days) * 86400)

    def _predicate(field, wanted):
        """Build the removal test for one log (identity field + age)."""
        def should_remove(e):
            if wanted and e.get(field) != wanted:
                return False
            if cutoff_ts:
                try:
                    return ts_to_unix(e.get('timestamp', '')) < cutoff_ts
                except Exception:
                    # An event whose age cannot be determined is kept.
                    return False
            return True
        return should_remove

    removed_wg = removed_fw = 0

    if not filter_fw and os.path.exists(wg_file):
        removed_wg = _rewrite_without(wg_file,
                                      _predicate('client', filter_name))

    if not filter_wg and os.path.exists(fw_file):
        removed_fw = _rewrite_without(fw_file,
                                      _predicate('src_ip', filter_ip))

    print(f"{removed_wg}|{removed_fw}")


# ──────────────────────────────────────────
# Log follower (used by old follow_logs)
# ──────────────────────────────────────────

def follow_logs(fw_file, wg_file, filter_ip, filter_type, clients_dir,
                filter_peers=''):
    """Follow both log files and output formatted events.

    Starts at the current end of each existing file and prints, flushed:
      fw|ts|client|dest[:port]|proto
      wg|ts|client|endpoint|event

    filter_ip: keep fw events from this source IP and wg events of the
      client mapped to it (or literally named like it)
    filter_type: keep only clients whose name starts with '<filter_type>-'
    filter_peers: comma-separated client names to keep (optional)

    Firewall events repeating the same (src, dst, port, protocol) within the
    protocol's dedup window, measured by arrival time, are suppressed.
    Runs until interrupted with Ctrl-C; the log files are closed on exit.
    """
    peer_filter = set(filter_peers.split(',')) if filter_peers else set()
    ip_to_name = build_ip_to_name(clients_dir)

    files = {}
    pending = {}  # label -> text of a line whose newline has not arrived yet
    dedup = {}

    try:
        for label, path in (('fw', fw_file), ('wg', wg_file)):
            if path and os.path.exists(path):
                f = open(path)
                f.seek(0, 2)  # start at end of file: only new events
                files[label] = f
                pending[label] = ''

        while True:
            progressed = False
            for label, f in files.items():
                chunk = f.readline()
                if not chunk:
                    continue
                if not chunk.endswith('\n'):
                    # The writer is mid-line; wait for the rest instead of
                    # parsing (and losing) a truncated JSON document.
                    pending[label] += chunk
                    continue
                line = pending[label] + chunk
                pending[label] = ''
                progressed = True

                try:
                    e = json.loads(line.strip())
                except ValueError:
                    continue
                if not isinstance(e, dict):
                    continue

                if label == 'fw':
                    src = e.get('src_ip', '')
                    if not src:
                        continue
                    if filter_ip and src != filter_ip:
                        continue
                    if peer_filter and ip_to_name.get(src, '') not in peer_filter:
                        continue

                    dst = e.get('dest_ip', '—')
                    port = e.get('dest_port', '')
                    proto_num, proto = _proto_name(e.get('ip.protocol', 0))

                    key = (src, dst, port, proto_num)
                    window = _DEDUP_WINDOWS.get(proto_num,
                                                _DEFAULT_DEDUP_WINDOW)
                    now = time.time()
                    if key in dedup and (now - dedup[key]) < window:
                        continue
                    dedup[key] = now

                    client = ip_to_name.get(src, src)
                    if filter_type and not client.startswith(filter_type + '-'):
                        continue
                    dst_str = f"{dst}:{port}" if port else dst
                    ts = e.get('timestamp', '')[:16].replace('T', ' ')
                    print(f"fw|{ts}|{client}|{dst_str}|{proto}", flush=True)

                elif label == 'wg':
                    client = e.get('client', '')
                    if not client:
                        continue
                    if filter_ip:
                        name_for_ip = ip_to_name.get(filter_ip, '')
                        if client != name_for_ip and client != filter_ip:
                            continue
                    if peer_filter and client not in peer_filter:
                        continue
                    if filter_type and not client.startswith(filter_type + '-'):
                        continue
                    ts = e.get('timestamp', '')[:16].replace('T', ' ')
                    endpoint = e.get('endpoint', '—')
                    event = e.get('event', '—')
                    print(f"wg|{ts}|{client}|{endpoint}|{event}", flush=True)

            # Sleep only when idle, so a burst of events is drained
            # immediately rather than at one line per file per 100 ms.
            if not progressed:
                time.sleep(0.1)
    except KeyboardInterrupt:
        pass
    finally:
        for f in files.values():
            f.close()


# ──────────────────────────────────────────
# Misc
# ──────────────────────────────────────────

def last_event(file, key, field, client):
    """Get last event field for a client.

    Prints the value of `field` from the last line of the JSONL `file` whose
    `key` entry equals `client`. Prints nothing when there is no such line
    or the file cannot be read.
    """
    try:
        last = None
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line.strip())
                    if e.get(key) == client:
                        last = e
                except Exception:
                    pass
        if last:
            print(last.get(field, ''))
    except Exception:
        pass


def events_for(file, ip, limit):
    """Format events for a given IP.

    Prints the last `limit` lines of the JSONL `file` whose 'ip' entry
    equals `ip`, as aligned 'ts client endpoint event' columns. Prints
    nothing when the file cannot be read.
    """
    try:
        events = []
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line.strip())
                    if e.get('ip') == ip:
                        events.append(e)
                except Exception:
                    pass
        for e in events[-int(limit):]:
            ts_fmt = fmt_ts(e.get('timestamp', ''))
            endpoint = e.get('endpoint', '—')
            client = e.get('client', '—')
            event = e.get('event', '—')
            print(f' {ts_fmt} {client:<20} {endpoint:<20} {event}')
    except Exception:
        pass


def iso_to_ts(iso_str):
    """Convert ISO timestamp to unix timestamp.

    Prints the integer unix timestamp; a timestamp without an offset is
    interpreted as UTC. Prints 0 when the input cannot be converted.
    """
    try:
        print(int(_as_utc(datetime.fromisoformat(iso_str)).timestamp()))
    except Exception:
        print(0)