- core/lib/util.py: shared utilities, ip_to_name, reverse_lookup, parse_since
- core/lib/events.py: fw_events, wg_events, follow_logs, event parsers
- core/lib/peers.py: peer_data, peer_transfer, peer_transfer_delta
- core/lib/activity.py: activity_aggregate
- json_helper.py: thin dispatcher importing from lib/
- events.py: --since, --filter-event, --filter-dest-ip/port query flags
- util.py: parse_since supporting relative (2h/7d) and EU/ISO date formats
609 lines
No EOL
22 KiB
Python
609 lines
No EOL
22 KiB
Python
"""
|
|
events.py — WireGuard and firewall event processing.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
from lib.util import (
|
|
DATETIME_FMT, PROTO_MAP,
|
|
build_ip_to_name, load_net_data, load_hosts_data,
|
|
reverse_lookup, hosts_lookup, resolve_display,
|
|
fmt_ts, fmt_ts_hour, ts_to_unix, parse_since,
|
|
make_dest_display,
|
|
)
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# fw_events
|
|
# ──────────────────────────────────────────
|
|
|
|
def fw_events(file, filter_ip, filter_type, clients_dir, net_file,
              limit, collapse='1', since='', filter_dest_ip='', filter_dest_port=''):
    """
    Format firewall drop events with dedup, counts, and service annotation.

    Reads a JSONL log, keeps the records that pass every filter, suppresses
    rapid repeats of the same (src, dst, port, protocol) flow, then prints
    either hourly buckets or consecutive-deduplicated rows.

    file: path to the JSONL firewall log; an unreadable file prints nothing
    filter_ip: keep only events whose src_ip equals this (optional)
    filter_type: keep only clients whose resolved name starts with
        '<filter_type>-' (optional) — the same rule wg_events and
        follow_logs apply
    clients_dir: handed to build_ip_to_name to map source IPs to names
    net_file: handed to load_net_data; feeds the service reverse lookup
    limit: number of most recent rows to print; a falsy value means 50
        (note: the string '0' slices as [-0:], i.e. prints every row)
    collapse='1' (default): hourly aggregation
    collapse='0': show all deduplicated events (--detailed mode)
    since: relative or absolute time string (e.g. '2h', '23/05', '2026-05-23')
    filter_dest_ip: filter by destination IP (optional)
    filter_dest_port: filter by destination port (optional)

    Output per line: ts|client|dest_ip|dest_port|proto|service_name|count
    """
    from datetime import timezone

    do_collapse = str(collapse) != '0'
    limit = int(limit) if limit else 50

    # Preload lookups once
    ip_to_name = build_ip_to_name(clients_dir)
    net_data = load_net_data(net_file)

    since_dt = parse_since(since) if since else None

    # Suppression window in seconds per IP protocol number
    # (1 = ICMP, 6 = TCP, 17 = UDP); anything else uses 10 seconds.
    windows = {1: 5, 6: 30, 17: 10}

    def _reverse(dest_ip, dest_port, proto):
        return reverse_lookup(net_data, dest_ip, dest_port, proto)

    def _as_utc(dt):
        # Naive timestamps are treated as UTC (same assumption as the
        # `since` filter) so naive and aware values can be ordered together.
        return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)

    # ── Parse and first-pass dedup (time-window per key) ──
    # Each row: (key, ts, ts_str, client, proto), key = (src, dst, port, proto_num)
    rows = []
    last_seen = {}

    try:
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line.strip())
                    src = e.get('src_ip', '')
                    if not src:
                        continue
                    if filter_ip and src != filter_ip:
                        continue

                    client = ip_to_name.get(src, src)
                    if filter_type and not client.startswith(filter_type + '-'):
                        continue

                    proto_num = int(e.get('ip.protocol', 0))
                    proto = PROTO_MAP.get(proto_num, str(proto_num))
                    dst = e.get('dest_ip', '')
                    port = str(e.get('dest_port', ''))

                    if filter_dest_ip and dst != filter_dest_ip:
                        continue
                    if filter_dest_port and port != filter_dest_port:
                        continue

                    ts_str = e.get('timestamp', '')
                    ts = ts_to_unix(ts_str)

                    if since_dt:
                        try:
                            ev_dt = _as_utc(datetime.fromisoformat(ts_str))
                            if ev_dt < since_dt:
                                continue
                        except (TypeError, ValueError):
                            # Timestamps that cannot be parsed or compared
                            # are kept rather than dropped.
                            pass

                    key = (src, dst, port, proto_num)
                    window = windows.get(proto_num, 10)
                    if key in last_seen and (ts - last_seen[key]) < window:
                        continue
                    last_seen[key] = ts
                    rows.append((key, ts, ts_str, client, proto))
                except Exception:
                    # Best effort: a malformed record is skipped, never fatal.
                    continue
    except (OSError, UnicodeError):
        # Missing or unreadable log: report whatever was collected so far.
        pass

    # ── Collapse or detailed output ──
    if do_collapse:
        hourly = defaultdict(int)
        hourly_ts = {}  # bucket -> datetime of the first event seen in it

        for key, _ts, ts_str, client, proto in rows:
            _src, dst, port, _proto_num = key
            svc_name = _reverse(dst, port, proto)
            try:
                dt = datetime.fromisoformat(ts_str)
                hour_key = (client, dst, port, proto, svc_name,
                            dt.strftime('%Y-%m-%d %H'))
                hourly[hour_key] += 1
                hourly_ts.setdefault(hour_key, dt)
            except (TypeError, ValueError):
                # No usable timestamp means no bucket for this event.
                continue

        sorted_buckets = sorted(hourly_ts.items(),
                                key=lambda item: _as_utc(item[1]))
        for hour_key, dt in sorted_buckets[-limit:]:
            client, dst, port, proto, svc_name, _ = hour_key
            count = hourly[hour_key]
            ts_fmt = fmt_ts_hour(dt.isoformat())
            print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}")

    else:
        # Detailed — consecutive dedup only: a row is folded into the
        # previous one when it has the same key and starts less than 300 s
        # after that row's first event.
        deduped = []  # [row, count] pairs
        for row in rows:
            if deduped:
                prev_row = deduped[-1][0]
                if row[0] == prev_row[0] and (row[1] - prev_row[1]) < 300:
                    deduped[-1][1] += 1
                    continue
            deduped.append([row, 1])

        for (key, _ts, ts_str, client, proto), count in deduped[-limit:]:
            _src, dst, port, _proto_num = key
            svc_name = _reverse(dst, port, proto)
            ts_fmt = fmt_ts(ts_str)
            print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}")
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# wg_events
|
|
# ──────────────────────────────────────────
|
|
|
|
def wg_events(file, filter_client, filter_type, limit, collapse='1',
              since='', filter_event=''):
    """
    Format WireGuard events with dedup and counts.

    file: path to the JSONL WireGuard event log; an unreadable file prints
        nothing
    filter_client: keep only events whose client equals this (optional)
    filter_type: keep only clients whose name starts with '<filter_type>-'
        (optional)
    limit: number of most recent rows to print; a falsy value means 50
    collapse='1' (default): hourly aggregation for attempt events
    collapse='0': show all deduplicated events (--detailed mode)
    since: relative or absolute time string
    filter_event: 'attempt' | 'handshake' | '' (all)

    Output per line: ts|client|endpoint|event|count
    """
    from datetime import timezone

    do_collapse = str(collapse) != '0'
    limit = int(limit) if limit else 50
    since_dt = parse_since(since) if since else None

    def _endpoint(ev):
        # A JSON null endpoint would otherwise break the [:15] slice below.
        return ev.get('endpoint') or ''

    def _merge_consecutive(records):
        """Fold runs of identical consecutive events into [event, ts, count].

        Two events belong to the same run when client, event and the first
        15 characters of the endpoint match (presumably to ignore the port
        suffix — confirm) and the later one starts less than 300 s after
        the run's first event.
        """
        merged = []
        prev_key = None
        for ev in records:
            key = (ev.get('client', ''), ev.get('event', ''), _endpoint(ev)[:15])
            ts = ts_to_unix(ev.get('timestamp', ''))
            if merged and key == prev_key and (ts - merged[-1][1]) < 300:
                merged[-1][2] += 1
                continue
            merged.append([ev, ts, 1])
            prev_key = key
        return merged

    events = []
    try:
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line.strip())
                    client = e.get('client', '')
                    if not client:
                        continue
                    if filter_client and client != filter_client:
                        continue
                    if filter_type and not client.startswith(filter_type + '-'):
                        continue
                    if filter_event and e.get('event', '') != filter_event:
                        continue
                except (AttributeError, TypeError, ValueError):
                    # Invalid JSON or a record of the wrong shape: skip it.
                    continue

                if since_dt:
                    try:
                        ev_dt = datetime.fromisoformat(e.get('timestamp', ''))
                        if ev_dt.tzinfo is None:
                            ev_dt = ev_dt.replace(tzinfo=timezone.utc)
                        if ev_dt < since_dt:
                            continue
                    except (TypeError, ValueError):
                        # Timestamps that cannot be parsed or compared are
                        # kept rather than dropped.
                        pass

                events.append(e)
    except (OSError, UnicodeError):
        # Missing or unreadable log: report whatever was collected so far.
        pass

    if do_collapse:
        hourly_attempts = defaultdict(int)
        hourly_first = {}  # bucket -> (sort timestamp, datetime) of first event
        others = []

        for e in events:
            if e.get('event', '') != 'attempt':
                others.append(e)
                continue
            ts_str = e.get('timestamp', '')
            try:
                dt = datetime.fromisoformat(ts_str)
            except (TypeError, ValueError):
                # An attempt without a usable timestamp cannot be bucketed.
                continue
            hour_key = (e.get('client', ''), _endpoint(e), 'attempt',
                        dt.strftime('%Y-%m-%d %H'))
            hourly_attempts[hour_key] += 1
            if hour_key not in hourly_first:
                # Use ts_to_unix for the sort key so attempt buckets and
                # handshake rows are ordered on the same clock;
                # dt.timestamp() reads naive values as local time.
                hourly_first[hour_key] = (ts_to_unix(ts_str), dt)

        output = []
        for hour_key, (sort_ts, dt) in hourly_first.items():
            client, endpoint, event, _ = hour_key
            count = hourly_attempts[hour_key]
            ts_fmt = fmt_ts_hour(dt.isoformat())
            output.append((sort_ts, f"{ts_fmt}|{client}|{endpoint}|{event}|{count}"))

        for e, ts, count in _merge_consecutive(others):
            ts_fmt = fmt_ts(e.get('timestamp', ''))
            client = e.get('client', '')
            endpoint = _endpoint(e)
            event = e.get('event', '')
            output.append((ts, f"{ts_fmt}|{client}|{endpoint}|{event}|{count}"))

        output.sort(key=lambda item: item[0])
        for _, row in output[-limit:]:
            print(row)

    else:
        for e, _ts, count in _merge_consecutive(events)[-limit:]:
            ts_fmt = fmt_ts(e.get('timestamp', ''))
            client = e.get('client', '')
            endpoint = _endpoint(e)
            event = e.get('event', '')
            print(f"{ts_fmt}|{client}|{endpoint}|{event}|{count}")
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# Single event parsers (used by watch)
|
|
# ──────────────────────────────────────────
|
|
|
|
def parse_event(line):
    """Print one JSON wg event line as timestamp|client|endpoint|event.

    Missing fields print as empty strings. Input that is not a JSON object
    produces no output at all.
    """
    columns = ('timestamp', 'client', 'endpoint', 'event')
    try:
        record = json.loads(line)
        values = [record.get(name, '') for name in columns]
        print('|'.join(f"{value}" for value in values))
    except Exception:
        # Best effort: unparseable lines are dropped silently.
        pass
|
|
|
|
|
|
def parse_fw_event(line):
    """Print one fw_events.log JSON line as ts|src_ip|dest_ip|dest_port|proto.

    The protocol number is looked up in PROTO_MAP; unknown numbers are
    printed as-is. Input that is not a JSON object produces no output.
    """
    try:
        e = json.loads(line)
        proto_num = e.get('ip.protocol', 0)
        try:
            # Coerce like fw_events does, so a protocol logged as the
            # string "6" still resolves through PROTO_MAP.
            proto_num = int(proto_num)
        except (TypeError, ValueError):
            pass  # keep the raw value; it is printed verbatim below
        proto = PROTO_MAP.get(proto_num, str(proto_num))
        print(f"{e.get('timestamp','')}|{e.get('src_ip','')}|"
              f"{e.get('dest_ip','')}|{e.get('dest_port','')}|{proto}")
    except Exception:
        # Best effort: unparseable lines are dropped silently.
        pass
|
|
|
|
|
|
def format_fw_event(line, clients_dir):
    """Format a single fw_event line for display.

    line: one JSON line from the firewall log
    clients_dir: handed to build_ip_to_name to resolve the source IP;
        the mapping is rebuilt on every call

    Returns 'ts|client|dest[:port]|proto', or None when the line has no
    src_ip or cannot be processed.
    """
    ip_to_name = build_ip_to_name(clients_dir)
    try:
        e = json.loads(line.strip())
        src = e.get('src_ip', '')
        if not src:
            return None
        dst = e.get('dest_ip', '—')
        port = e.get('dest_port', '')
        proto_num = e.get('ip.protocol', 0)
        try:
            # Coerce like fw_events does, so a protocol logged as the
            # string "6" still resolves through PROTO_MAP.
            proto_num = int(proto_num)
        except (TypeError, ValueError):
            pass  # keep the raw value; it is shown verbatim
        proto = PROTO_MAP.get(proto_num, str(proto_num))
        # The port is appended only when present and non-zero/non-empty.
        dst_str = f"{dst}:{port}" if port else dst
        client = ip_to_name.get(src, src)
        ts_fmt = fmt_ts(e.get('timestamp', ''))
        return f"{ts_fmt}|{client}|{dst_str}|{proto}"
    except Exception:
        # Best effort: a line that cannot be formatted yields None.
        return None
|
|
|
|
|
|
def format_wg_event(line):
    """Build the display row for one wg_event JSON line.

    Returns 'ts|client|endpoint|event|wg'. Missing endpoint/event fields
    show as '—'. Returns None when the record has no client or the line
    cannot be processed.
    """
    try:
        record = json.loads(line.strip())
        name = record.get('client', '')
        if name:
            stamp = fmt_ts(record.get('timestamp', ''))
            peer = record.get('endpoint', '—')
            kind = record.get('event', '—')
            return '|'.join((f"{stamp}", f"{name}", f"{peer}", f"{kind}", 'wg'))
        return None
    except Exception:
        return None
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# Event removal
|
|
# ──────────────────────────────────────────
|
|
|
|
def remove_events(file, identifier):
    """Remove all events for a client/ip from a JSONL file.

    A line is dropped when it is a JSON object whose 'client' or 'src_ip'
    equals `identifier`. Lines that are not valid JSON are kept untouched.
    The file is rewritten in place, and only when at least one line was
    dropped, so a no-op call never truncates a log that another process
    may be appending to.

    On any I/O failure the error is printed to stderr and the process
    exits with status 1.
    """
    try:
        kept = []
        dropped = 0
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line.strip())
                except (ValueError, RecursionError):
                    kept.append(line)
                    continue
                if isinstance(e, dict) and (e.get('client') == identifier or
                                            e.get('src_ip') == identifier):
                    dropped += 1
                    continue
                kept.append(line)
        if dropped:
            with open(file, 'w') as f:
                f.writelines(kept)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
def remove_events_filtered(wg_file, fw_file, filter_name, filter_ip,
                           filter_fw, filter_wg, before_days):
    """Remove events with filters: by name/ip, source, or age.

    wg_file / fw_file: JSONL logs to prune; a path that does not exist is
        skipped
    filter_name: only WireGuard events whose 'client' equals this are
        candidates (empty = every WireGuard event)
    filter_ip: only firewall events whose 'src_ip' equals this are
        candidates (empty = every firewall event)
    filter_fw: truthy = leave the WireGuard log alone
    filter_wg: truthy = leave the firewall log alone
    before_days: when set, candidates are removed only if ts_to_unix of
        their timestamp is older than now minus that many days

    NOTE(review): an empty filter matches every record, so a call with
    filter_name set but filter_ip empty (and filter_wg unset) removes every
    firewall event — confirm callers always pass both.

    Each log is rewritten in place, and only when something was removed.
    Prints '<removed_wg>|<removed_fw>'.
    """
    import time

    cutoff_ts = None
    if before_days:
        cutoff_ts = time.time() - (float(before_days) * 86400)

    def _should_remove(e, field, wanted):
        if wanted and e.get(field) != wanted:
            return False
        if cutoff_ts:
            try:
                return ts_to_unix(e.get('timestamp', '')) < cutoff_ts
            except Exception:
                # A record whose age cannot be determined is kept.
                return False
        return True

    def _prune(path, field, wanted):
        """Drop matching lines from `path` and return how many were removed."""
        kept = []
        removed = 0
        with open(path) as f:
            for line in f:
                try:
                    if _should_remove(json.loads(line.strip()), field, wanted):
                        removed += 1
                        continue
                except (AttributeError, ValueError, RecursionError):
                    # Invalid JSON or a non-object record: keep the line.
                    pass
                kept.append(line)
        if removed:
            with open(path, 'w') as f:
                f.writelines(kept)
        return removed

    removed_wg = removed_fw = 0

    if not filter_fw and os.path.exists(wg_file):
        removed_wg = _prune(wg_file, 'client', filter_name)

    if not filter_wg and os.path.exists(fw_file):
        removed_fw = _prune(fw_file, 'src_ip', filter_ip)

    print(f"{removed_wg}|{removed_fw}")
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# Log follower (used by old follow_logs)
|
|
# ──────────────────────────────────────────
|
|
|
|
def follow_logs(fw_file, wg_file, filter_ip, filter_type,
                clients_dir, filter_peers=''):
    """Follow both log files and output formatted events.

    Starts at the current end of each existing file and prints new records
    until interrupted with Ctrl-C.

    fw_file / wg_file: JSONL logs to follow; a missing path is ignored
    filter_ip: firewall events must have this src_ip; WireGuard events
        must have a client equal to this value or to the name it maps to
    filter_type: keep only clients whose name starts with '<filter_type>-'
    clients_dir: handed to build_ip_to_name to resolve source IPs
    filter_peers: comma-separated client names to keep (optional)

    Output per line:
        fw|ts|client|dest[:port]|proto
        wg|ts|client|endpoint|event
    """
    import time

    peer_filter = set(filter_peers.split(',')) if filter_peers else set()
    ip_to_name = build_ip_to_name(clients_dir)

    # Suppression window in seconds per IP protocol number
    # (1 = ICMP, 6 = TCP, 17 = UDP); anything else uses the default.
    windows = {1: 5, 6: 30, 17: 10}
    default_window = 10
    longest_window = max(max(windows.values()), default_window)

    files = {}
    pending = {}   # per-file text of a line whose newline has not arrived yet
    dedup = {}     # flow key -> time the flow was last printed
    last_prune = time.time()

    try:
        for label, path in (('fw', fw_file), ('wg', wg_file)):
            if path and os.path.exists(path):
                fh = open(path)
                files[label] = fh
                pending[label] = ''
                fh.seek(0, 2)  # start at end of file: only new records

        while True:
            idle = True
            for label, fh in files.items():
                chunk = fh.readline()
                if not chunk:
                    continue
                idle = False

                # The writer may be mid-line; wait for the newline instead
                # of discarding a half-written record.
                pending[label] += chunk
                if not pending[label].endswith('\n'):
                    continue
                line, pending[label] = pending[label], ''

                try:
                    e = json.loads(line.strip())
                except ValueError:
                    continue
                if not isinstance(e, dict):
                    continue

                if label == 'fw':
                    src = e.get('src_ip', '')
                    if not src:
                        continue
                    if filter_ip and src != filter_ip:
                        continue
                    if peer_filter:
                        client_name = ip_to_name.get(src, '')
                        if client_name not in peer_filter:
                            continue
                    client = ip_to_name.get(src, src)
                    if filter_type and not client.startswith(filter_type + '-'):
                        continue

                    dst = e.get('dest_ip', '—')
                    port = e.get('dest_port', '')
                    proto_num = e.get('ip.protocol', 0)
                    try:
                        # Coerce like fw_events does, so a protocol logged
                        # as the string "6" still resolves.
                        proto_num = int(proto_num)
                    except (TypeError, ValueError):
                        pass
                    proto = PROTO_MAP.get(proto_num, str(proto_num))

                    key = (src, dst, port, proto_num)
                    window = windows.get(proto_num, default_window)
                    now = time.time()
                    if key in dedup and (now - dedup[key]) < window:
                        continue
                    dedup[key] = now

                    # Entries older than the longest window can no longer
                    # suppress anything; drop them so the table stays small.
                    if now - last_prune >= longest_window:
                        dedup = {k: seen for k, seen in dedup.items()
                                 if now - seen < longest_window}
                        last_prune = now

                    dst_str = f"{dst}:{port}" if port else dst
                    ts = (e.get('timestamp') or '')[:16].replace('T', ' ')
                    print(f"fw|{ts}|{client}|{dst_str}|{proto}", flush=True)

                else:  # label == 'wg'
                    client = e.get('client', '')
                    if not client:
                        continue
                    if filter_ip:
                        mapped = ip_to_name.get(filter_ip, '')
                        if client != mapped and client != filter_ip:
                            continue
                    if peer_filter and client not in peer_filter:
                        continue
                    if filter_type and not client.startswith(filter_type + '-'):
                        continue
                    ts = (e.get('timestamp') or '')[:16].replace('T', ' ')
                    endpoint = e.get('endpoint', '—')
                    event = e.get('event', '—')
                    print(f"wg|{ts}|{client}|{endpoint}|{event}", flush=True)

            # Sleep only when no file produced data, so a burst of records
            # is drained at full speed instead of one line per 0.1 s.
            if idle:
                time.sleep(0.1)
    except KeyboardInterrupt:
        pass
    finally:
        for fh in files.values():
            fh.close()
|
|
|
|
|
|
# ──────────────────────────────────────────
|
|
# Misc
|
|
# ──────────────────────────────────────────
|
|
|
|
def last_event(file, key, field, client):
    """Print `field` of the last record in `file` whose `key` equals `client`.

    The file is read as JSONL; lines that cannot be parsed are ignored.
    A missing field prints an empty line. Nothing is printed when no
    record matches or the file cannot be read.
    """
    try:
        newest = None
        with open(file) as handle:
            for raw in handle:
                try:
                    record = json.loads(raw.strip())
                    matched = record.get(key) == client
                except Exception:
                    continue
                if matched:
                    newest = record
        if not newest:
            return
        print(newest.get(field, ''))
    except Exception:
        pass
|
|
|
|
|
|
def events_for(file, ip, limit):
    """Format events for a given IP.

    file: JSONL log to read; unparseable lines are ignored
    ip: records are selected when their 'ip' field equals this
    limit: number of most recent matches to print; a falsy value means 50,
        the same default fw_events and wg_events use

    Prints one aligned row per event: timestamp, client, endpoint, event.
    Missing fields show as '—'. Nothing is printed if the file cannot be
    read.
    """
    try:
        limit = int(limit) if limit else 50
        events = []
        with open(file) as f:
            for line in f:
                try:
                    e = json.loads(line.strip())
                    if e.get('ip') == ip:
                        events.append(e)
                except Exception:
                    pass
        for e in events[-limit:]:
            ts_fmt = fmt_ts(e.get('timestamp', ''))
            # str() keeps the width specs below from raising on null or
            # numeric field values, which would cut the listing short.
            endpoint = str(e.get('endpoint', '—'))
            client = str(e.get('client', '—'))
            event = e.get('event', '—')
            print(f' {ts_fmt} {client:<20} {endpoint:<20} {event}')
    except Exception:
        # Best effort: an unreadable file or bad limit prints nothing.
        pass
|
|
|
|
|
|
def iso_to_ts(iso_str):
    """Print the unix timestamp (whole seconds) for an ISO timestamp.

    A value without a UTC offset is taken to be UTC. Prints 0 when the
    input cannot be converted.
    """
    try:
        from datetime import timezone
        parsed = datetime.fromisoformat(iso_str)
        aware = parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
        epoch = int(aware.timestamp())
        print(epoch)
    except Exception:
        print(0)