wgctl/core/lib/events.py
Nuno Duque Nunes fb33aa1b6d feat: logs endpoint annotation, alignment, descending sort
- fw/wg events: raw_ip → resolved_name annotation (dim)
- fw events: endpoint column with pre-resolved names (two-pass render)
- fw events: raw IP:port dim suffix after service name
- wg events: endpoint annotation in logs (same as watch)
- fw/wg: descending sort default, --ascending/--descending flags
- wg events: gap/offline indicator, threshold * 2 for offline label
- fw_row: no-endpoint rows show dim — placeholder for alignment
- section headers: dynamic width via tput cols
2026-05-26 03:07:57 +00:00

674 lines
No EOL
24 KiB
Python

"""
events.py — WireGuard and firewall event processing.
"""
import os
import json
import sys
from collections import defaultdict
from datetime import datetime
from lib.util import (
DATETIME_FMT, PROTO_MAP,
build_ip_to_name, load_net_data, load_hosts_data,
reverse_lookup, hosts_lookup, resolve_display,
fmt_ts, fmt_ts_hour, ts_to_unix, parse_since,
make_dest_display,
)
# ──────────────────────────────────────────
# fw_events
# ──────────────────────────────────────────
def fw_events(file, filter_ip, filter_type, clients_dir, net_file,
limit, collapse='1', since='', filter_dest_ip='',
filter_dest_port='', sort_order='desc', endpoint_cache_file=''):
"""
Format firewall drop events with dedup, counts, and service annotation.
collapse='1' (default): hourly aggregation
collapse='0': show all deduplicated events (--detailed mode)
since: relative or absolute time string (e.g. '2h', '23/05', '2026-05-23')
filter_dest_ip: filter by destination IP (optional)
filter_dest_port: filter by destination port (optional)
Output per line: ts|client|dest_ip|dest_port|proto|service_name|count
"""
do_collapse = str(collapse) != '0'
limit = int(limit) if limit else 50
# Preload lookups once
ip_to_name = build_ip_to_name(clients_dir)
net_data = load_net_data(net_file)
hosts_data = load_hosts_data(None) # hosts lookup done in bash for now
endpoint_cache = {}
if endpoint_cache_file and os.path.exists(endpoint_cache_file):
try:
with open(endpoint_cache_file) as f:
endpoint_cache = json.load(f)
except Exception:
pass
since_dt = parse_since(since) if since else None
def _reverse(dest_ip, dest_port, proto):
return reverse_lookup(net_data, dest_ip, dest_port, proto)
# ── Parse and first-pass dedup (time-window per key) ──
events = []
last_seen = {}
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
src = e.get('src_ip', '')
if not src:
continue
if filter_ip and src != filter_ip:
continue
proto_num = int(e.get('ip.protocol', 0))
proto = PROTO_MAP.get(proto_num, str(proto_num))
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
if filter_dest_ip and dst != filter_dest_ip:
continue
if filter_dest_port and port != filter_dest_port:
continue
ts_str = e.get('timestamp', '')
ts = ts_to_unix(ts_str)
if since_dt:
try:
ev_dt = datetime.fromisoformat(ts_str)
if ev_dt.tzinfo is None:
from datetime import timezone
ev_dt = ev_dt.replace(tzinfo=timezone.utc)
if ev_dt < since_dt:
continue
except Exception:
pass
key = (src, dst, port, proto_num)
windows = {1: 5, 6: 30, 17: 10}
window = windows.get(proto_num, 10)
if key in last_seen and (ts - last_seen[key]) < window:
continue
last_seen[key] = ts
events.append(e)
except Exception:
continue
except Exception:
pass
# ── Collapse or detailed output ──
if do_collapse:
hourly = defaultdict(int)
hourly_ts = {}
for e in events:
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
proto_num = int(e.get('ip.protocol', 0))
proto = PROTO_MAP.get(proto_num, str(proto_num))
ts_str = e.get('timestamp', '')
client = ip_to_name.get(src, src)
svc_name = _reverse(dst, port, proto)
try:
dt = datetime.fromisoformat(ts_str)
hour_key = (client, dst, port, proto, svc_name,
dt.strftime('%Y-%m-%d %H'))
hourly[hour_key] += 1
if hour_key not in hourly_ts:
hourly_ts[hour_key] = dt
except Exception:
continue
sorted_buckets = sorted(hourly_ts.items(), key=lambda x: x[1])
output = sorted_buckets[-limit:]
if sort_order != 'asc':
output = list(reversed(output))
for hour_key, dt in output:
client, dst, port, proto, svc_name, _ = hour_key
count = hourly[hour_key]
ts_fmt = fmt_ts_hour(hourly_ts[hour_key].isoformat())
src_endpoint = endpoint_cache.get(client, '')
print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}|{src_endpoint}")
else:
# Detailed — consecutive dedup only
deduped = []
counts = []
for e in events:
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
proto_num = int(e.get('ip.protocol', 0))
key = (src, dst, port, proto_num)
ts = ts_to_unix(e.get('timestamp', ''))
if deduped:
prev = deduped[-1]
prev_ts = ts_to_unix(prev.get('timestamp', ''))
prev_key = (
prev.get('src_ip', ''),
prev.get('dest_ip', ''),
str(prev.get('dest_port', '')),
int(prev.get('ip.protocol', 0))
)
if key == prev_key and (ts - prev_ts) < 300:
counts[-1] += 1
continue
deduped.append(e)
counts.append(1)
pairs = list(zip(deduped, counts))[-limit:]
if sort_order != 'asc':
pairs = list(reversed(pairs))
for e, count in pairs:
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
proto_num = int(e.get('ip.protocol', 0))
proto = PROTO_MAP.get(proto_num, str(proto_num))
client = ip_to_name.get(src, src)
svc_name = reverse_lookup(net_data, dst, port, proto)
src_endpoint = endpoint_cache.get(client, '')
try:
dt = datetime.fromisoformat(e.get('timestamp', ''))
ts_fmt = dt.strftime(DATETIME_FMT)
except Exception:
ts_fmt = e.get('timestamp', '')
print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}|{src_endpoint}")
# ──────────────────────────────────────────
# wg_events
# ──────────────────────────────────────────
def wg_events(file, filter_client, filter_type, limit, collapse='1',
since='', filter_event='', endpoint_cache_file='', sort_order='desc'):
"""
Format WireGuard events with dedup, counts, gap and endpoint resolution.
sort_order: 'desc' (default, newest first) | 'asc' (oldest first)
Output per line: ts|client|endpoint|event|count|gap_seconds
"""
from datetime import datetime
from collections import defaultdict
do_collapse = str(collapse) != '0'
limit = int(limit) if limit else 50
since_dt = parse_since(since) if since else None
descending = sort_order != 'asc'
# Load endpoint cache once
endpoint_cache = {}
if endpoint_cache_file and os.path.exists(endpoint_cache_file):
try:
with open(endpoint_cache_file) as f:
endpoint_cache = json.load(f)
except Exception:
pass
events = []
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
client = e.get('client', '')
if not client:
continue
if filter_client and client != filter_client:
continue
if filter_type and not client.startswith(filter_type + '-'):
continue
if filter_event and e.get('event', '') != filter_event:
continue
if since_dt:
ts_str = e.get('timestamp', '')
try:
from datetime import timezone
ev_dt = datetime.fromisoformat(ts_str)
if ev_dt.tzinfo is None:
ev_dt = ev_dt.replace(tzinfo=timezone.utc)
if ev_dt < since_dt:
continue
except Exception:
pass
events.append(e)
except Exception:
pass
except Exception:
pass
def _endpoint(e):
ep = e.get('endpoint', '')
return ep or endpoint_cache.get(e.get('client', ''), '')
if do_collapse:
hourly_attempts = defaultdict(int)
hourly_ts = {}
handshakes = []
handshake_counts = []
for e in events:
ts_str = e.get('timestamp', '')
client = e.get('client', '')
endpoint = _endpoint(e)
event = e.get('event', '')
ts = ts_to_unix(ts_str)
try:
dt = datetime.fromisoformat(ts_str)
except Exception:
dt = None
if event == 'attempt':
if dt:
hour_key = (client, endpoint, event,
dt.strftime('%Y-%m-%d %H'))
hourly_attempts[hour_key] += 1
if hour_key not in hourly_ts:
hourly_ts[hour_key] = dt
else:
key = (client, event, endpoint[:15])
if handshakes:
prev = handshakes[-1]
prev_ts = ts_to_unix(prev.get('timestamp', ''))
prev_key = (prev.get('client', ''), prev.get('event', ''),
_endpoint(prev)[:15])
if key == prev_key and (ts - prev_ts) < 300:
handshake_counts[-1] += 1
continue
handshakes.append(e)
handshake_counts.append(1)
# Build output list — always ascending first for correct gap calculation
output = []
for hour_key, dt in hourly_ts.items():
client, endpoint, event, _ = hour_key
count = hourly_attempts[hour_key]
ts_fmt = fmt_ts_hour(dt.isoformat())
output.append((dt.timestamp(),
f"{ts_fmt}|{client}|{endpoint}|{event}|{count}|"))
# Compute gaps ascending
last_handshake_ts = {}
hs_output = []
for e, count in zip(handshakes, handshake_counts):
ts_str = e.get('timestamp', '')
client = e.get('client', '')
endpoint = _endpoint(e)
event = e.get('event', '')
ts = ts_to_unix(ts_str)
ts_fmt = fmt_ts(ts_str)
gap_seconds = ''
if event == 'handshake':
prev_ts = last_handshake_ts.get(client, 0)
if prev_ts > 0:
gap = int(ts - prev_ts)
if gap > 0:
gap_seconds = str(gap)
last_handshake_ts[client] = ts
hs_output.append((ts, f"{ts_fmt}|{client}|{endpoint}|{event}|{count}|{gap_seconds}"))
output.extend(hs_output)
# Sort ascending first to get correct limit slice, then reverse if needed
output.sort(key=lambda x: x[0])
output = output[-limit:]
if descending:
output.reverse()
for _, line in output:
print(line)
else:
deduped = []
counts = []
for e in events:
client = e.get('client', '')
event = e.get('event', '')
endpoint = _endpoint(e)
key = (client, event, endpoint[:15])
ts = ts_to_unix(e.get('timestamp', ''))
if deduped:
prev = deduped[-1]
prev_ts = ts_to_unix(prev.get('timestamp', ''))
prev_key = (prev.get('client', ''), prev.get('event', ''),
_endpoint(prev)[:15])
if key == prev_key and (ts - prev_ts) < 300:
counts[-1] += 1
continue
deduped.append(e)
counts.append(1)
# Compute gaps ascending, then slice, then reverse if needed
last_handshake_ts = {}
result = []
for e, count in zip(deduped, counts):
ts_str = e.get('timestamp', '')
client = e.get('client', '')
endpoint = _endpoint(e)
event = e.get('event', '')
ts = ts_to_unix(ts_str)
ts_fmt = fmt_ts(ts_str)
gap_seconds = ''
if event == 'handshake':
prev_ts = last_handshake_ts.get(client, 0)
if prev_ts > 0:
gap = int(ts - prev_ts)
if gap > 0:
gap_seconds = str(gap)
last_handshake_ts[client] = ts
result.append((ts, f"{ts_fmt}|{client}|{endpoint}|{event}|{count}|{gap_seconds}"))
result = result[-limit:]
if descending:
result.reverse()
for _, line in result:
print(line)
# ──────────────────────────────────────────
# Single event parsers (used by watch)
# ──────────────────────────────────────────
def parse_event(line):
"""Parse a single JSON wg event line."""
try:
e = json.loads(line)
print(f"{e.get('timestamp','')}|{e.get('client','')}|"
f"{e.get('endpoint','')}|{e.get('event','')}")
except Exception:
pass
def parse_fw_event(line):
"""Parse a single fw_events.log JSON line."""
try:
e = json.loads(line)
proto_num = e.get('ip.protocol', 0)
proto = PROTO_MAP.get(proto_num, str(proto_num))
print(f"{e.get('timestamp','')}|{e.get('src_ip','')}|"
f"{e.get('dest_ip','')}|{e.get('dest_port','')}|{proto}")
except Exception:
pass
def format_fw_event(line, clients_dir):
"""Format a single fw_event line for display."""
ip_to_name = build_ip_to_name(clients_dir)
try:
e = json.loads(line.strip())
src = e.get('src_ip', '')
if not src:
return None
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto_num = e.get('ip.protocol', 0)
proto = PROTO_MAP.get(proto_num, str(proto_num))
dst_str = f"{dst}:{port}" if port else dst
client = ip_to_name.get(src, src)
ts_fmt = fmt_ts(e.get('timestamp', ''))
return f"{ts_fmt}|{client}|{dst_str}|{proto}"
except Exception:
return None
def format_wg_event(line):
"""Format a single wg_event line for display."""
try:
e = json.loads(line.strip())
client = e.get('client', '')
if not client:
return None
ts_fmt = fmt_ts(e.get('timestamp', ''))
endpoint = e.get('endpoint', '')
event = e.get('event', '')
return f"{ts_fmt}|{client}|{endpoint}|{event}|wg"
except Exception:
return None
# ──────────────────────────────────────────
# Event removal
# ──────────────────────────────────────────
def remove_events(file, identifier):
"""Remove all events for a client/ip from a JSONL file."""
try:
lines = []
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if (e.get('client') == identifier or
e.get('src_ip') == identifier):
continue
lines.append(line)
except Exception:
lines.append(line)
with open(file, 'w') as f:
f.writelines(lines)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def remove_events_filtered(wg_file, fw_file, filter_name, filter_ip,
filter_fw, filter_wg, before_days):
"""Remove events with filters: by name/ip, source, or age."""
import time
cutoff_ts = None
if before_days:
cutoff_ts = time.time() - (float(before_days) * 86400)
def should_remove_wg(e):
if filter_name and e.get('client') != filter_name:
return False
if cutoff_ts:
try:
return ts_to_unix(e.get('timestamp', '')) < cutoff_ts
except Exception:
return False
return True
def should_remove_fw(e):
if filter_ip and e.get('src_ip') != filter_ip:
return False
if cutoff_ts:
try:
return ts_to_unix(e.get('timestamp', '')) < cutoff_ts
except Exception:
return False
return True
removed_wg = removed_fw = 0
if not filter_fw and os.path.exists(wg_file):
lines = []
with open(wg_file) as f:
for line in f:
try:
e = json.loads(line.strip())
if should_remove_wg(e):
removed_wg += 1
continue
except Exception:
pass
lines.append(line)
with open(wg_file, 'w') as f:
f.writelines(lines)
if not filter_wg and os.path.exists(fw_file):
lines = []
with open(fw_file) as f:
for line in f:
try:
e = json.loads(line.strip())
if should_remove_fw(e):
removed_fw += 1
continue
except Exception:
pass
lines.append(line)
with open(fw_file, 'w') as f:
f.writelines(lines)
print(f"{removed_wg}|{removed_fw}")
# ──────────────────────────────────────────
# Log follower (used by old follow_logs)
# ──────────────────────────────────────────
def follow_logs(fw_file, wg_file, filter_ip, filter_type,
clients_dir, filter_peers=''):
"""Follow both log files and output formatted events."""
import time
peer_filter = set(filter_peers.split(',')) if filter_peers else set()
ip_to_name = build_ip_to_name(clients_dir)
files = {}
for label, path in [('fw', fw_file), ('wg', wg_file)]:
if path and os.path.exists(path):
f = open(path)
f.seek(0, 2)
files[label] = f
dedup = {}
try:
while True:
for label, f in files.items():
line = f.readline()
if not line:
continue
try:
e = json.loads(line.strip())
except Exception:
continue
if label == 'fw':
src = e.get('src_ip', '')
if not src:
continue
if filter_ip and src != filter_ip:
continue
if peer_filter:
client_name = ip_to_name.get(src, '')
if client_name not in peer_filter:
continue
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto_num = e.get('ip.protocol', 0)
proto = PROTO_MAP.get(proto_num, str(proto_num))
key = (src, dst, port, proto_num)
windows = {1: 5, 6: 30, 17: 10}
window = windows.get(proto_num, 10)
now = time.time()
if key in dedup and (now - dedup[key]) < window:
continue
dedup[key] = now
client = ip_to_name.get(src, src)
if filter_type and not client.startswith(filter_type + '-'):
continue
dst_str = f"{dst}:{port}" if port else dst
ts = e.get('timestamp', '')[:16].replace('T', ' ')
print(f"fw|{ts}|{client}|{dst_str}|{proto}", flush=True)
elif label == 'wg':
client = e.get('client', '')
if not client:
continue
if filter_ip:
ip = ip_to_name.get(filter_ip, '')
if client != ip and client != filter_ip:
continue
if peer_filter and client not in peer_filter:
continue
if filter_type and not client.startswith(filter_type + '-'):
continue
ts = e.get('timestamp', '')[:16].replace('T', ' ')
endpoint = e.get('endpoint', '')
event = e.get('event', '')
print(f"wg|{ts}|{client}|{endpoint}|{event}", flush=True)
time.sleep(0.1)
except KeyboardInterrupt:
pass
# ──────────────────────────────────────────
# Misc
# ──────────────────────────────────────────
def last_event(file, key, field, client):
"""Get last event field for a client."""
try:
last = None
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if e.get(key) == client:
last = e
except Exception:
pass
if last:
print(last.get(field, ''))
except Exception:
pass
def events_for(file, ip, limit):
"""Format events for a given IP."""
try:
events = []
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if e.get('ip') == ip:
events.append(e)
except Exception:
pass
for e in events[-int(limit):]:
ts_fmt = fmt_ts(e.get('timestamp', ''))
endpoint = e.get('endpoint', '')
client = e.get('client', '')
event = e.get('event', '')
print(f' {ts_fmt} {client:<20} {endpoint:<20} {event}')
except Exception:
pass
def iso_to_ts(iso_str):
"""Convert ISO timestamp to unix timestamp."""
try:
from datetime import timezone
dt = datetime.fromisoformat(iso_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
print(int(dt.timestamp()))
except Exception:
print(0)