wgctl/core/json_helper.py.bak
Nuno Duque Nunes 1308f9e07a refactor: split json_helper.py into lib/ modules
- core/lib/util.py: shared utilities, ip_to_name, reverse_lookup, parse_since
- core/lib/events.py: fw_events, wg_events, follow_logs, event parsers
- core/lib/peers.py: peer_data, peer_transfer, peer_transfer_delta
- core/lib/activity.py: activity_aggregate
- json_helper.py: thin dispatcher importing from lib/
- events.py: --since, --filter-event, --filter-dest-ip/port query flags
- util.py: parse_since supporting relative (2h/7d) and EU/ISO date formats
2026-05-24 22:02:50 +00:00

3073 lines
No EOL
106 KiB
Python

#!/usr/bin/env python3
"""
wgctl JSON helper — called by shell functions to read/write JSON files.
Usage: json_helper.py <command> <file> [key] [value]
"""
import sys
import json
import os
DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M')
def get(file, key):
try:
with open(file) as f:
data = json.load(f)
val = data.get(key, [])
if isinstance(val, bool):
print(str(val).lower()) # true/false not True/False
elif isinstance(val, list):
if val:
print('\n'.join(str(v) for v in val))
else:
if val:
print(val)
except:
sys.exit(0)
def set_key(file, key, value):
try:
data = {}
if os.path.exists(file):
with open(file) as f:
data = json.load(f)
# Try to parse as JSON value first (for arrays/bools)
try:
data[key] = json.loads(value)
except:
data[key] = value
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def delete_key(file, key):
try:
with open(file) as f:
data = json.load(f)
data.pop(key, None)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def append(file, key, value):
try:
data = {}
if os.path.exists(file):
with open(file) as f:
data = json.load(f)
if key not in data:
data[key] = []
if value not in data[key]:
data[key].append(value)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def remove_value(file, key, value):
try:
with open(file) as f:
data = json.load(f)
if key in data and value in data[key]:
data[key].remove(value)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def cat(file):
try:
with open(file) as f:
data = json.load(f)
print(json.dumps(data, indent=2))
except Exception as e:
sys.exit(1)
def has_key(file, key):
try:
with open(file) as f:
data = json.load(f)
sys.exit(0 if key in data else 1)
except:
sys.exit(1)
def filter_values(file, key, value):
"""Remove all entries where value matches"""
try:
with open(file) as f:
data = json.load(f)
data = {k: v for k, v in data.items() if v != value}
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def last_event(file, key, field, client):
"""Get last event field for a client"""
try:
last = None
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if e.get(key) == client:
last = e
except:
pass
if last:
print(last.get(field, ''))
except:
pass
def events_for(file, ip, limit):
"""Format events for a given IP"""
try:
from datetime import datetime
events = []
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if e.get('ip') == ip:
events.append(e)
except:
pass
for e in events[-int(limit):]:
ts = e.get('timestamp', '')
try:
dt = datetime.fromisoformat(ts)
ts = dt.strftime(DATETIME_FMT)
except:
pass
endpoint = e.get('endpoint', '')
client = e.get('client', '')
event = e.get('event', '')
print(f' {ts} {client:<20} {endpoint:<20} {event}')
except:
pass
def fw_events(file, filter_ip, filter_type, clients_dir, net_file, limit, collapse='1'):
"""
Format firewall drop events with dedup, counts, and service annotation.
collapse='1' (default): hourly aggregation
collapse='0': show all deduplicated events (--detailed mode)
Output per line: ts|client|dest_ip|dest_port|proto|service_name|count
"""
import glob
from datetime import datetime
from collections import defaultdict
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
do_collapse = str(collapse) != '0'
# Build ip->name map
ip_to_name = {}
for conf in glob.glob(f"{clients_dir}/*.conf"):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
for line in f:
if line.startswith('Address'):
ip = line.split('=')[1].strip().split('/')[0]
ip_to_name[ip] = name
except Exception:
pass
# Load net services for reverse lookup
net_data = {}
if net_file and os.path.exists(net_file):
try:
with open(net_file) as f:
net_data = json.load(f)
except Exception:
pass
def reverse_lookup(dest_ip, dest_port, proto):
for svc_name, svc in net_data.items():
if not isinstance(svc, dict):
continue
if svc.get('ip', '') != dest_ip:
continue
ports = svc.get('ports', {})
if dest_port:
for port_name, port_def in ports.items():
if not isinstance(port_def, dict):
continue
if (str(port_def.get('port', '')) == str(dest_port) and
port_def.get('proto', 'tcp') == proto):
return f"{svc_name}:{port_name}"
return svc_name
return svc_name
return ''
# Parse and first-pass dedup (within time window per key)
events = []
last_seen = {}
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
src = e.get('src_ip', '')
if not src:
continue
if filter_ip and src != filter_ip:
continue
proto_num = int(e.get('ip.protocol', 0))
proto = proto_map.get(proto_num, str(proto_num))
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
key = (src, dst, port, proto_num)
ts_str = e.get('timestamp', '')
try:
ts = datetime.fromisoformat(ts_str).timestamp()
except Exception:
ts = 0
windows = {1: 5, 6: 30, 17: 10}
window = windows.get(proto_num, 10)
if key in last_seen and (ts - last_seen[key]) < window:
continue
last_seen[key] = ts
events.append(e)
except Exception:
continue
except Exception:
pass
limit = int(limit) if limit else 50
if do_collapse:
# Hourly aggregation: group by (client, dst, port, proto, date, hour)
hourly = defaultdict(int)
hourly_ts = {} # store first ts per hour bucket for output ordering
for e in events:
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
proto_num = int(e.get('ip.protocol', 0))
proto = proto_map.get(proto_num, str(proto_num))
ts_str = e.get('timestamp', '')
client = ip_to_name.get(src, src)
svc_name = reverse_lookup(dst, port, proto)
try:
dt = datetime.fromisoformat(ts_str)
hour_key = (client, dst, port, proto, svc_name,
dt.strftime('%Y-%m-%d %H'))
hourly[hour_key] += 1
if hour_key not in hourly_ts:
hourly_ts[hour_key] = dt
except Exception:
continue
# Sort by timestamp and emit last N
sorted_buckets = sorted(hourly_ts.items(), key=lambda x: x[1])
for hour_key, dt in sorted_buckets[-limit:]:
client, dst, port, proto, svc_name, _ = hour_key
count = hourly[hour_key]
ts_fmt = dt.strftime(DATETIME_FMT.replace('%M', '00'))
print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}")
else:
# Detailed mode — consecutive dedup only
deduped = []
counts = []
for e in events:
ts_str = e.get('timestamp', '')
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
proto_num = int(e.get('ip.protocol', 0))
key = (src, dst, port, proto_num)
if deduped:
prev = deduped[-1]
try:
prev_ts = datetime.fromisoformat(
prev.get('timestamp', '')).timestamp()
cur_ts = datetime.fromisoformat(ts_str).timestamp()
except Exception:
prev_ts = cur_ts = 0
prev_key = (
prev.get('src_ip', ''),
prev.get('dest_ip', ''),
str(prev.get('dest_port', '')),
int(prev.get('ip.protocol', 0))
)
if key == prev_key and (cur_ts - prev_ts) < 300:
counts[-1] += 1
continue
deduped.append(e)
counts.append(1)
for e, count in list(zip(deduped, counts))[-limit:]:
ts_str = e.get('timestamp', '')
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
proto_num = int(e.get('ip.protocol', 0))
proto = proto_map.get(proto_num, str(proto_num))
client = ip_to_name.get(src, src)
svc_name = reverse_lookup(dst, port, proto)
try:
dt = datetime.fromisoformat(ts_str)
ts_fmt = dt.strftime(DATETIME_FMT)
except Exception:
ts_fmt = ts_str
print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}")
def wg_events(file, filter_client, filter_type, limit, collapse='1'):
"""
Format WireGuard events with dedup and counts.
collapse='1' (default): hourly aggregation for attempt events
collapse='0': show all deduplicated events (--detailed mode)
Output per line: ts|client|endpoint|event|count
"""
from datetime import datetime
from collections import defaultdict
do_collapse = str(collapse) != '0'
events = []
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
client = e.get('client', '')
if not client:
continue
if filter_client and client != filter_client:
continue
if filter_type and not client.startswith(filter_type + '-'):
continue
events.append(e)
except Exception:
pass
except Exception:
pass
limit = int(limit) if limit else 50
if do_collapse:
# Hourly aggregation for attempts; individual for handshakes
hourly_attempts = defaultdict(int)
hourly_ts = {}
handshakes = []
handshake_counts = []
for e in events:
ts_str = e.get('timestamp', '')
client = e.get('client', '')
endpoint = e.get('endpoint', '')
event = e.get('event', '')
try:
dt = datetime.fromisoformat(ts_str)
ts = dt.timestamp()
except Exception:
dt = None
ts = 0
if event == 'attempt':
if dt:
hour_key = (client, endpoint, event,
dt.strftime('%Y-%m-%d %H'))
hourly_attempts[hour_key] += 1
if hour_key not in hourly_ts:
hourly_ts[hour_key] = dt
else:
# Handshakes — consecutive dedup only
key = (client, event, endpoint[:15])
if handshakes:
prev = handshakes[-1]
try:
prev_ts = datetime.fromisoformat(
prev.get('timestamp', '')).timestamp()
except Exception:
prev_ts = 0
prev_key = (
prev.get('client', ''),
prev.get('event', ''),
prev.get('endpoint', '')[:15]
)
if key == prev_key and (ts - prev_ts) < 300:
handshake_counts[-1] += 1
continue
handshakes.append(e)
handshake_counts.append(1)
# Build output list: attempts (hourly) + handshakes, sorted by ts
output = []
for hour_key, dt in hourly_ts.items():
client, endpoint, event, _ = hour_key
count = hourly_attempts[hour_key]
ts_fmt = dt.strftime(DATETIME_FMT.replace('%M', '00'))
output.append((dt.timestamp(), f"{ts_fmt}|{client}|{endpoint}|{event}|{count}"))
for e, count in zip(handshakes, handshake_counts):
ts_str = e.get('timestamp', '')
client = e.get('client', '')
endpoint = e.get('endpoint', '')
event = e.get('event', '')
try:
dt = datetime.fromisoformat(ts_str)
ts_fmt = dt.strftime(DATETIME_FMT)
ts = dt.timestamp()
except Exception:
ts_fmt = ts_str
ts = 0
output.append((ts, f"{ts_fmt}|{client}|{endpoint}|{event}|{count}"))
output.sort(key=lambda x: x[0])
for _, line in output[-limit:]:
print(line)
else:
# Detailed mode — consecutive dedup only
deduped = []
counts = []
for e in events:
ts_str = e.get('timestamp', '')
client = e.get('client', '')
event = e.get('event', '')
endpoint = e.get('endpoint', '')
key = (client, event, endpoint[:15])
try:
ts = datetime.fromisoformat(ts_str).timestamp()
except Exception:
ts = 0
if deduped:
prev = deduped[-1]
try:
prev_ts = datetime.fromisoformat(
prev.get('timestamp', '')).timestamp()
except Exception:
prev_ts = 0
prev_key = (
prev.get('client', ''),
prev.get('event', ''),
prev.get('endpoint', '')[:15]
)
if key == prev_key and (ts - prev_ts) < 300:
counts[-1] += 1
continue
deduped.append(e)
counts.append(1)
for e, count in list(zip(deduped, counts))[-limit:]:
ts_str = e.get('timestamp', '')
client = e.get('client', '')
endpoint = e.get('endpoint', '')
event = e.get('event', '')
try:
dt = datetime.fromisoformat(ts_str)
ts_fmt = dt.strftime(DATETIME_FMT)
except Exception:
ts_fmt = ts_str
print(f"{ts_fmt}|{client}|{endpoint}|{event}|{count}")
def format_fw_event(line, clients_dir):
"""Format a single fw_event line"""
import glob
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
# Build ip->name map
ip_to_name = {}
for conf in glob.glob(f"{clients_dir}/*.conf"):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
for l in f:
if l.startswith('Address'):
ip = l.split('=')[1].strip().split('/')[0]
ip_to_name[ip] = name
except:
pass
try:
e = json.loads(line.strip())
src = e.get('src_ip', '')
if not src:
return None
ts = e.get('timestamp', '')
try:
from datetime import datetime
dt = datetime.fromisoformat(ts)
ts = dt.strftime(DATETIME_FMT)
except:
pass
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto_num = e.get('ip.protocol', 0)
proto = proto_map.get(proto_num, str(proto_num))
dst_str = f"{dst}:{port}" if port else dst
client = ip_to_name.get(src, src)
return f"{ts}|{client}|{dst_str}|{proto}"
except:
return None
def format_wg_event(line):
"""Format a single wg_event line"""
try:
e = json.loads(line.strip())
client = e.get('client', '')
if not client:
return None
ts = e.get('timestamp', '')
try:
from datetime import datetime
dt = datetime.fromisoformat(ts)
ts = dt.strftime(DATETIME_FMT)
except:
pass
endpoint = e.get('endpoint', '')
event = e.get('event', '')
return f"{ts}|{client}|{endpoint}|{event}|wg"
except:
return None
def remove_events(file, identifier):
"""Remove all events for a client/ip from a JSONL file"""
try:
lines = []
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if e.get('client') == identifier or e.get('src_ip') == identifier:
continue
lines.append(line)
except:
lines.append(line)
with open(file, 'w') as f:
f.writelines(lines)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def follow_logs(fw_file, wg_file, filter_ip, filter_type, clients_dir, filter_peers=""):
"""Follow both log files and output formatted events"""
import glob, time, select
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
peer_filter = set(filter_peers.split(',')) if filter_peers else set()
# Build ip->name map
ip_to_name = {}
for conf in glob.glob(f"{clients_dir}/*.conf"):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
for l in f:
if l.startswith('Address'):
ip = l.split('=')[1].strip().split('/')[0]
ip_to_name[ip] = name
except:
pass
# Open files and seek to end
files = {}
for label, path in [('fw', fw_file), ('wg', wg_file)]:
if path and os.path.exists(path):
f = open(path)
f.seek(0, 2) # seek to end
files[label] = f
dedup = {}
try:
while True:
for label, f in files.items():
line = f.readline()
if not line:
continue
try:
e = json.loads(line.strip())
except:
continue
if label == 'fw':
src = e.get('src_ip', '')
if not src:
continue
if filter_ip and src != filter_ip:
continue
# Filter by peer names if specified
if peer_filter:
client_name = ip_to_name.get(src, '')
if client_name not in peer_filter:
continue
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto_num = e.get('ip.protocol', 0)
proto = proto_map.get(proto_num, str(proto_num))
# Dedup
key = (src, dst, port, proto_num)
windows = {1: 5, 6: 30, 17: 10}
window = windows.get(proto_num, 10)
now = time.time()
if key in dedup and (now - dedup[key]) < window:
continue
dedup[key] = now
client = ip_to_name.get(src, src)
if filter_type and not client.startswith(filter_type + '-'):
continue
dst_str = f"{dst}:{port}" if port else dst
ts = e.get('timestamp', '')[:16].replace('T', ' ')
print(f"fw|{ts}|{client}|{dst_str}|{proto}", flush=True)
elif label == 'wg':
client = e.get('client', '')
if not client:
continue
if filter_ip:
ip = ip_to_name.get(filter_ip, '')
if client != ip and client != filter_ip:
continue
if peer_filter and client not in peer_filter:
continue
if filter_type and not client.startswith(filter_type + '-'):
continue
ts = e.get('timestamp', '')[:16].replace('T', ' ')
endpoint = e.get('endpoint', '')
event = e.get('event', '')
print(f"wg|{ts}|{client}|{endpoint}|{event}", flush=True)
time.sleep(0.1)
except KeyboardInterrupt:
pass
def count(file, key):
try:
with open(file) as f:
data = json.load(f)
val = data.get(key, [])
print(len(val) if isinstance(val, list) else 0)
except:
print(0)
def audit_fw_counts(clients_dir):
"""Return peer_name:fw_count pairs from iptables"""
import glob, subprocess, re
try:
result = subprocess.run(
['iptables', '-L', 'FORWARD', '-n', '-v'],
capture_output=True, text=True
)
fw_lines = result.stdout.splitlines()
except Exception:
fw_lines = []
# Filter to only data lines (skip headers and blanks)
# In -v output, source IP is in column 8 (0-indexed)
# Format: pkts bytes target prot opt in out source destination [options]
rule_lines = [l for l in fw_lines if l.strip() and not l.startswith('Chain') and not l.startswith(' pkts')]
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
ip = ''
for line in f:
if line.startswith('Address'):
ip = line.split('=')[1].strip().split('/')[0]
break
if not ip:
continue
# Count lines where source column exactly matches the peer IP
count = sum(1 for l in rule_lines if re.search(r'\s' + re.escape(ip) + r'\s', l))
print(f"{name}:{count}")
except Exception:
pass
def peer_group_map(groups_dir):
"""Return peer:group pairs for all groups"""
import glob
try:
for group_file in glob.glob(f"{groups_dir}/*.group"):
try:
with open(group_file) as f:
g = json.load(f)
name = g.get('name', '')
for peer in g.get('peers', []):
if peer:
print(f"{peer}:{name}")
except:
pass
except:
pass
def peer_groups(groups_dir, peer_name):
"""Find all groups containing a peer"""
import glob
try:
for group_file in glob.glob(f"{groups_dir}/*.group"):
try:
with open(group_file) as f:
g = json.load(f)
if peer_name in g.get('peers', []):
print(g.get('name', ''))
except:
pass
except:
pass
def iso_to_ts(iso_str):
"""Convert ISO timestamp to unix timestamp"""
try:
from datetime import datetime, timezone
dt = datetime.fromisoformat(iso_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
print(int(dt.timestamp()))
except:
print(0)
def rule_list_data(rules_dir, meta_dir):
"""Return all rule data including base rules and extends"""
import glob
rule_peer_counts = {}
for f in glob.glob(f"{meta_dir}/*.meta"):
try:
with open(f) as mf:
meta = json.load(mf)
rule = meta.get('rule', '')
if rule:
rule_peer_counts[rule] = rule_peer_counts.get(rule, 0) + 1
except:
pass
rule_files = (
sorted(glob.glob(f"{rules_dir}/*.rule")) +
sorted(glob.glob(f"{rules_dir}/base/*.rule"))
)
# Collect all data first
rules_data = []
for rule_file in rule_files:
is_base = '/base/' in rule_file
try:
with open(rule_file) as f:
r = json.load(f)
name = r.get('name', '')
desc = r.get('desc', '')
group = r.get('group', '')
extends = ','.join(r.get('extends', []))
resolved = _rule_resolve_internal(rules_dir, name)
n_allows = len(resolved.get('allow_ips', [])) + \
len(resolved.get('allow_ports', []))
n_blocks = len(resolved.get('block_ips', [])) + \
len(resolved.get('block_ports', []))
peer_count = rule_peer_counts.get(name, 0)
rules_data.append({
'name': name, 'desc': desc, 'n_allows': n_allows,
'n_blocks': n_blocks, 'peer_count': peer_count,
'extends': extends, 'is_base': is_base, 'group': group
})
except:
pass
# Sort: non-base first, then by group (empty group last within non-base),
# then by name within group
rules_data.sort(key=lambda x: (
x['is_base'],
x['group'] == '' and not x['is_base'],
x['group'],
x['name']
))
for r in rules_data:
print(f"{r['name']}|{r['desc']}|{r['n_allows']}|{r['n_blocks']}|"
f"{r['peer_count']}|{r['extends']}|{r['is_base']}|{r['group']}")
def group_list_data(groups_dir, blocks_dir, clients_dir):
"""Return group summary data in one call"""
import glob
# Get all block files
blocked_peers = set()
for f in glob.glob(f"{blocks_dir}/*.block"):
name = os.path.basename(f).replace('.block', '')
blocked_peers.add(name)
for group_file in sorted(glob.glob(f"{groups_dir}/*.group")):
try:
with open(group_file) as f:
g = json.load(f)
name = g.get('name', '')
desc = g.get('desc', '')
peers = [p for p in g.get('peers', []) if p]
valid_peers = [p for p in peers
if os.path.exists(os.path.join(clients_dir, f"{p}.conf"))]
total = len(valid_peers)
blocked = sum(1 for p in peers if p in blocked_peers)
print(f"{name}|{desc}|{total}|{blocked}")
except:
pass
def fmt_datetime(iso_str, fmt):
"""Format ISO timestamp with given strftime format"""
try:
from datetime import datetime
dt = datetime.fromisoformat(iso_str)
print(dt.strftime(fmt))
except:
print(iso_str)
def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips,
block_ports, allow_ports='', extends='', group=''):
rule = {
'name': name,
'desc': desc,
'group': group,
'dns_redirect': dns_redirect == 'true',
'extends': [x for x in extends.split(',') if x] if extends else [],
'allow_ips': [x for x in allow_ips.split(',') if x] if allow_ips else [],
'allow_ports': [x for x in allow_ports.split(',') if x] if allow_ports else [],
'block_ips': [x for x in block_ips.split(',') if x] if block_ips else [],
'block_ports': [x for x in block_ports.split(',') if x] if block_ports else [],
}
with open(file, 'w') as f:
json.dump(rule, f, indent=2)
def cleanup_config(config_file):
"""Normalize blank lines in WireGuard config"""
import re
try:
with open(config_file) as f:
config = f.read()
config = re.sub(r'\n{3,}', '\n\n', config)
config = config.rstrip('\n') + '\n'
with open(config_file, 'w') as f:
f.write(config)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def remove_peer_block(config_file, name):
"""Remove a peer block from WireGuard config by name"""
import re
try:
with open(config_file) as f:
config = f.read()
pattern = r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n'
result = re.sub(pattern, '\n', config)
with open(config_file, 'w') as f:
f.write(result)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def create_group(file, name, desc):
"""Create a new group JSON file"""
try:
group = {'name': name, 'desc': desc, 'peers': []}
with open(file, 'w') as f:
json.dump(group, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def parse_event(line):
"""Parse a single JSON event line"""
try:
e = json.loads(line)
print(f"{e.get('timestamp','')}|{e.get('client','')}|{e.get('endpoint','')}|{e.get('event','')}")
except:
pass
def parse_fw_event(line):
"""Parse a single fw_events.log JSON line"""
try:
e = json.loads(line)
ts = e.get('timestamp', '')
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
proto_num = e.get('ip.protocol', 0)
proto = proto_map.get(proto_num, str(proto_num))
print(f"{ts}|{src}|{dst}|{port}|{proto}")
except:
pass
def peer_transfer(wg_interface):
"""Get total transfer bytes per peer"""
import subprocess
low = int(os.environ.get('ACTIVITY_TOTAL_LOW_BYTES', '1000000'))
med = int(os.environ.get('ACTIVITY_TOTAL_MED_BYTES', '10000000'))
high = int(os.environ.get('ACTIVITY_TOTAL_HIGH_BYTES', '100000000'))
try:
result = subprocess.run(
['wg', 'show', wg_interface, 'transfer'],
capture_output=True, text=True
)
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split('\t')
if len(parts) == 3:
pubkey, rx, tx = parts
total = int(rx) + int(tx)
if total == 0: level = 'none'
elif total < low: level = 'low'
elif total < med: level = 'medium'
elif total < high: level = 'high'
else: level = 'very high'
print(f"{pubkey}|{rx}|{tx}|{level}")
except:
pass
def peer_transfer_delta(wg_interface, cache_file):
"""Calculate current transfer rate using delta from previous sample"""
import subprocess, time
low = int(os.environ.get('ACTIVITY_CURRENT_LOW_BYTES', '10000')) # 10KB/s
med = int(os.environ.get('ACTIVITY_CURRENT_MED_BYTES', '100000')) # 100KB/s
high = int(os.environ.get('ACTIVITY_CURRENT_HIGH_BYTES', '1000000')) # 1MB/s
current = {}
now = time.time()
try:
result = subprocess.run(
['wg', 'show', wg_interface, 'transfer'],
capture_output=True, text=True
)
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split('\t')
if len(parts) == 3:
pubkey, rx, tx = parts
current[pubkey] = {'rx': int(rx), 'tx': int(tx), 'ts': now}
except:
pass
prev = {}
if os.path.exists(cache_file):
try:
with open(cache_file) as f:
prev = json.load(f)
except:
pass
try:
with open(cache_file, 'w') as f:
json.dump(current, f)
except:
pass
for pubkey, data in current.items():
if pubkey in prev:
dt = data['ts'] - prev[pubkey].get('ts', data['ts'])
if dt > 0:
rx_rate = max(0, (data['rx'] - prev[pubkey]['rx']) / dt)
tx_rate = max(0, (data['tx'] - prev[pubkey]['tx']) / dt)
total = rx_rate + tx_rate
if total <= 0: level = 'idle'
elif total < low: level = 'low'
elif total < med: level = 'medium'
elif total < high: level = 'high'
else: level = 'very high'
print(f"{pubkey}|{int(rx_rate)}|{int(tx_rate)}|{level}")
else:
print(f"{pubkey}|0|0|idle")
else:
print(f"{pubkey}|0|0|unknown")
def remove_events_filtered(wg_file, fw_file, filter_name, filter_ip,
filter_fw, filter_wg, before_days):
"""Remove events with filters: by name/ip, source, or age"""
import time
from datetime import datetime, timezone
cutoff_ts = None
if before_days:
cutoff_ts = time.time() - (float(before_days) * 86400)
def should_remove_wg(e):
if filter_name and e.get('client') != filter_name:
return False
if cutoff_ts:
try:
ts = datetime.fromisoformat(e.get('timestamp','')).timestamp()
return ts < cutoff_ts
except:
return False
return True
def should_remove_fw(e):
if filter_ip and e.get('src_ip') != filter_ip:
return False
if cutoff_ts:
try:
ts = datetime.fromisoformat(e.get('timestamp','')).timestamp()
return ts < cutoff_ts
except:
return False
return True
removed_wg = removed_fw = 0
if not filter_fw and os.path.exists(wg_file):
lines = []
with open(wg_file) as f:
for line in f:
try:
e = json.loads(line.strip())
if should_remove_wg(e):
removed_wg += 1
continue
except:
pass
lines.append(line)
with open(wg_file, 'w') as f:
f.writelines(lines)
if not filter_wg and os.path.exists(fw_file):
lines = []
with open(fw_file) as f:
for line in f:
try:
e = json.loads(line.strip())
if should_remove_fw(e):
removed_fw += 1
continue
except:
pass
lines.append(line)
with open(fw_file, 'w') as f:
f.writelines(lines)
print(f"{removed_wg}|{removed_fw}")
def _rule_resolve_internal(rules_dir, rule_name, visited=None):
"""Internal recursive resolver — returns dict, does not print"""
if visited is None:
visited = set()
if rule_name in visited:
raise ValueError(f"Circular dependency detected: {rule_name}")
visited.add(rule_name)
rule_file = find_rule_file(rules_dir, rule_name)
with open(rule_file) as f:
rule = json.load(f)
merged = {
'allow_ips': [],
'allow_ports': [],
'block_ips': [],
'block_ports': [],
'dns_redirect': False
}
for base_name in rule.get('extends', []):
base = _rule_resolve_internal(rules_dir, base_name, visited.copy())
merged['allow_ips'] += base.get('allow_ips', [])
merged['allow_ports'] += base.get('allow_ports', [])
merged['block_ips'] += base.get('block_ips', [])
merged['block_ports'] += base.get('block_ports', [])
if base.get('dns_redirect'):
merged['dns_redirect'] = True
# Merge own fields — use .get() with defaults for all fields
merged['allow_ips'] = list(dict.fromkeys(
merged['allow_ips'] + rule.get('allow_ips', [])))
merged['allow_ports'] = list(dict.fromkeys(
merged['allow_ports'] + rule.get('allow_ports', [])))
merged['block_ips'] = list(dict.fromkeys(
merged['block_ips'] + rule.get('block_ips', [])))
merged['block_ports'] = list(dict.fromkeys(
merged['block_ports'] + rule.get('block_ports', [])))
if rule.get('dns_redirect', False):
merged['dns_redirect'] = True
merged['name'] = rule.get('name', rule_name)
merged['desc'] = rule.get('desc', '')
merged['group'] = rule.get('group', '')
merged['extends'] = rule.get('extends', [])
return merged
def rule_resolve(rules_dir, rule_name):
"""Resolve a rule with inheritance — prints JSON"""
try:
resolved = _rule_resolve_internal(rules_dir, rule_name)
print(json.dumps(resolved))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def rule_resolve_field(rules_dir, rule_name, field):
"""Get a single field from resolved rule — prints values one per line"""
try:
resolved = _rule_resolve_internal(rules_dir, rule_name)
val = resolved.get(field, [])
if isinstance(val, list):
for v in val:
print(v)
else:
print(val)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def rule_inspect(rules_dir, rule_name):
"""Show inheritance tree for a rule"""
try:
rule_file = find_rule_file(rules_dir, rule_name)
with open(rule_file) as f:
rule = json.load(f)
resolved = _rule_resolve_internal(rules_dir, rule_name)
has_extends = bool(rule.get('extends', []))
# Own rules
for ip in rule.get('allow_ips', []):
print(f"own|allow_ip|{ip}")
for p in rule.get('allow_ports', []):
print(f"own|allow_port|{p}")
for ip in rule.get('block_ips', []):
print(f"own|block_ip|{ip}")
for p in rule.get('block_ports', []):
print(f"own|block_port|{p}")
# DNS redirect — separate section
if rule.get('dns_redirect'):
print(f"dns|dns_redirect|true")
if has_extends:
# Inherited rules per base
for base_name in rule.get('extends', []):
base = _rule_resolve_internal(rules_dir, base_name)
for ip in base.get('allow_ips', []):
print(f"inherited:{base_name}|allow_ip|{ip}")
for p in base.get('allow_ports', []):
print(f"inherited:{base_name}|allow_port|{p}")
for ip in base.get('block_ips', []):
print(f"inherited:{base_name}|block_ip|{ip}")
for p in base.get('block_ports', []):
print(f"inherited:{base_name}|block_port|{p}")
if base.get('dns_redirect'):
print(f"inherited:{base_name}|dns_redirect|true")
# Resolved summary only when inheritance exists
has_resolved = (
resolved.get('allow_ips') or resolved.get('allow_ports') or
resolved.get('block_ips') or resolved.get('block_ports') or
resolved.get('dns_redirect')
)
if has_resolved:
for ip in resolved.get('allow_ips', []):
print(f"resolved|allow_ip|{ip}")
for p in resolved.get('allow_ports', []):
print(f"resolved|allow_port|{p}")
for ip in resolved.get('block_ips', []):
print(f"resolved|block_ip|{ip}")
for p in resolved.get('block_ports', []):
print(f"resolved|block_port|{p}")
if resolved.get('dns_redirect'):
print(f"resolved|dns_redirect|true")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def find_rule_file(rules_dir, rule_name):
"""Find rule file in rules/ or rules/base/"""
for path in [
os.path.join(rules_dir, f"{rule_name}.rule"),
os.path.join(rules_dir, "base", f"{rule_name}.rule"),
]:
if os.path.exists(path):
return path
return ""
def get_raw(file, key):
try:
with open(file) as f:
data = json.load(f)
val = data.get(key) # returns None if missing
if val is None:
pass # print nothing
elif isinstance(val, bool):
print(str(val).lower())
elif isinstance(val, list):
for v in val:
print(v)
else:
print(val)
except:
pass
def count_resolved(rules_dir, rule_name, key):
"""Count entries in resolved rule field"""
try:
resolved = _rule_resolve_internal(rules_dir, rule_name)
print(len(resolved.get(key, [])))
except:
print(0)
def _block_init(peer_ip):
"""Return empty block structure"""
return {
"peer_ip": peer_ip,
"blocked_direct": False,
"blocked_by_groups": [],
"rules": []
}
def _block_read(file):
try:
with open(file) as f:
content = f.read().strip()
if not content:
return None # empty file = no block data
try:
return json.loads(content)
except json.JSONDecodeError:
# Old format — migrate
lines = content.split('\n')
peer_ip = lines[0].split()[0] if lines else ''
new_data = {
"peer_ip": peer_ip,
"blocked_direct": True,
"blocked_by_groups": [],
"rules": [{"name": "full block", "type": "full"}]
}
with open(file, 'w') as f:
json.dump(new_data, f, indent=2)
return new_data
except FileNotFoundError:
return None
except Exception:
return None
def _block_write(file, data):
"""Write block file"""
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def block_get(file):
"""Read and print block file as JSON"""
data = _block_read(file)
if data:
print(json.dumps(data))
def block_is_blocked(file):
"""Return true if peer is effectively blocked"""
data = _block_read(file)
if not data:
print("false")
return
blocked = data.get("blocked_direct", False) or \
bool(data.get("blocked_by_groups", []))
print("true" if blocked else "false")
def block_set_direct(file, peer_ip, value):
"""Set blocked_direct"""
try:
data = _block_read(file) or _block_init(peer_ip)
data["blocked_direct"] = value.lower() == "true"
data["peer_ip"] = peer_ip
_block_write(file, data)
remaining = data["blocked_direct"] or bool(data.get("blocked_by_groups", []))
pass
# print("true" if remaining else "false")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_add_group(file, peer_ip, group):
"""Add group to blocked_by_groups"""
try:
data = _block_read(file) or _block_init(peer_ip)
data["peer_ip"] = peer_ip
groups = data.get("blocked_by_groups", [])
if group not in groups:
groups.append(group)
data["blocked_by_groups"] = groups
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_remove_group(file, peer_ip, group):
"""Remove group from blocked_by_groups, return whether still blocked"""
try:
data = _block_read(file) or _block_init(peer_ip)
groups = data.get("blocked_by_groups", [])
if group in groups:
groups.remove(group)
data["blocked_by_groups"] = groups
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# print("true" if remaining else "false")
pass
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_add_rule(file, peer_ip, rule_type, name="", target="",
port="", proto=""):
"""Add a block rule entry"""
try:
data = _block_read(file) or _block_init(peer_ip)
data["peer_ip"] = peer_ip
rule = {"type": rule_type}
if name: rule["name"] = name
if target: rule["target"] = target
if port: rule["port"] = port
if proto: rule["proto"] = proto
rules = data.get("rules", [])
for existing in rules:
if existing.get("type") == rule_type and \
existing.get("target","") == target and \
existing.get("port","") == port and \
existing.get("proto","") == proto:
return # already exists, skip
rules.append(rule)
data["rules"] = rules
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_remove_rule(file, rule_type, target="", port="", proto=""):
data = _block_read(file)
if not data:
return
rules = data.get("rules", [])
filtered = [r for r in rules if not (
r.get("type") == rule_type and
r.get("target", "") == target and
r.get("port", "") == port and
r.get("proto", "") == proto
)]
data["rules"] = filtered
_block_write(file, data)
def block_get_rules(file):
"""Print rules as pipe-separated lines: name|type|target|port|proto"""
data = _block_read(file)
if not data:
return
for r in data.get("rules", []):
print(f"{r.get('name','')}|{r.get('type','')}|"
f"{r.get('target','')}|{r.get('port','')}|{r.get('proto','')}")
def block_get_groups(file):
data = _block_read(file)
if not data:
return
print(','.join(data.get('blocked_by_groups', [])))
def block_get_direct(file):
data = _block_read(file)
if not data:
print('false')
return
print('true' if data.get('blocked_direct', False) else 'false')
# ============================================
# Net / Services
# ============================================
def _net_read(file):
"""Read services.json, return dict or empty dict"""
try:
if not os.path.exists(file):
return {}
with open(file) as f:
content = f.read().strip()
if not content:
return {}
return json.loads(content)
except Exception:
return {}
def _net_write(file, data):
"""Write services.json"""
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def net_list(file):
"""List all service names with IP and port count"""
data = _net_read(file)
for name, svc in sorted(data.items()):
ip = svc.get('ip', '')
desc = svc.get('desc', '')
tags = ','.join(svc.get('tags', []))
ports = len(svc.get('ports', {}))
print(f"{name}|{ip}|{desc}|{tags}|{ports}")
def net_show(file, name):
"""Show full service details"""
data = _net_read(file)
if name not in data:
print(f"Error: Service not found: {name}", file=sys.stderr)
sys.exit(1)
svc = data[name]
print(f"name|{name}")
print(f"ip|{svc.get('ip','')}")
print(f"desc|{svc.get('desc','')}")
print(f"tags|{','.join(svc.get('tags',[]))}")
for port_name, port_def in svc.get('ports', {}).items():
port = port_def.get('port', '')
proto = port_def.get('proto', 'tcp')
desc = port_def.get('desc', '')
print(f"port|{port_name}|{port}|{proto}|{desc}")
def net_exists(file, name):
"""Check if service exists"""
data = _net_read(file)
# Handle service:port syntax
if ':' in name:
svc_name, port_name = name.split(':', 1)
if port_name == 'ports':
print('true' if svc_name in data else 'false')
else:
svc = data.get(svc_name, {})
print('true' if port_name in svc.get('ports', {}) else 'false')
else:
print('true' if name in data else 'false')
def net_add_service(file, name, ip, desc='', tags=''):
"""Add or update a service"""
data = _net_read(file)
if name not in data:
data[name] = {'ip': ip, 'ports': {}}
else:
data[name]['ip'] = ip
if desc:
data[name]['desc'] = desc
if tags:
data[name]['tags'] = [t.strip() for t in tags.split(',') if t.strip()]
_net_write(file, data)
def net_add_port(file, service, port_name, port, proto='tcp', desc=''):
"""Add or update a port on a service"""
data = _net_read(file)
if service not in data:
print(f"Error: Service not found: {service}", file=sys.stderr)
sys.exit(1)
if 'ports' not in data[service]:
data[service]['ports'] = {}
entry = {'port': int(port), 'proto': proto}
if desc:
entry['desc'] = desc
data[service]['ports'][port_name] = entry
_net_write(file, data)
def net_remove(file, name):
"""Remove service or port"""
data = _net_read(file)
if ':' in name:
svc_name, port_name = name.split(':', 1)
if svc_name not in data:
print(f"Error: Service not found: {svc_name}", file=sys.stderr)
sys.exit(1)
if port_name == 'ports':
# Remove all ports
data[svc_name]['ports'] = {}
else:
if port_name not in data[svc_name].get('ports', {}):
print(f"Error: Port not found: {port_name}", file=sys.stderr)
sys.exit(1)
del data[svc_name]['ports'][port_name]
else:
if name not in data:
print(f"Error: Service not found: {name}", file=sys.stderr)
sys.exit(1)
del data[name]
_net_write(file, data)
def net_resolve(file, name):
"""Resolve service name to ip or ip:port:proto lines"""
data = _net_read(file)
if ':' in name:
svc_name, port_name = name.split(':', 1)
if svc_name not in data:
print(f"Error: Service not found: {svc_name}", file=sys.stderr)
sys.exit(1)
svc = data[svc_name]
ip = svc.get('ip', '')
if port_name == 'ports':
# All ports
for pname, pdef in svc.get('ports', {}).items():
print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
else:
if port_name not in svc.get('ports', {}):
print(f"Error: Port not found: {port_name}", file=sys.stderr)
sys.exit(1)
pdef = svc['ports'][port_name]
print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
else:
if name not in data:
print(f"Error: Service not found: {name}", file=sys.stderr)
sys.exit(1)
print(data[name].get('ip', ''))
def net_reverse_lookup(file, ip, port='', proto=''):
"""Reverse lookup IP/port to service name"""
data = _net_read(file)
for svc_name, svc in data.items():
if svc.get('ip') != ip:
continue
if not port:
print(svc_name)
return
for port_name, port_def in svc.get('ports', {}).items():
if (str(port_def.get('port','')) == str(port) and
port_def.get('proto','tcp') == proto):
print(f"{svc_name}:{port_name}")
return
# IP matched but no port match — return service name
print(svc_name)
return
def block_is_empty(file):
data = _block_read(file)
if not data:
print("true")
return
empty = (
not data.get("blocked_direct", False) and
not data.get("blocked_by_groups", []) and
not data.get("rules", []) and
not data.get("services", [])
)
print("true" if empty else "false")
def group_has_peer(file, peer_name):
try:
with open(file) as f:
data = json.load(f)
peers = data.get('peers', [])
print('true' if peer_name in peers else 'false')
except Exception:
print('false')
# ============================================
# Subnet Map
# ============================================
def _subnet_read(file):
"""Read subnets.json, return dict or empty dict"""
try:
if not os.path.exists(file):
return {}
with open(file) as f:
content = f.read().strip()
if not content:
return {}
return json.loads(content)
except Exception:
return {}
def _subnet_write(file, data):
"""Write subnets.json"""
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def _subnet_is_group(entry):
"""Return True if a subnet entry is a nested group (like 'guests')"""
return isinstance(entry, dict) and 'subnet' not in entry
def subnet_lookup(file, name, type_key=''):
"""
Resolve a subnet name (and optional type) to a CIDR string.
For scalar entries: subnet_lookup(file, "desktop") -> "10.1.1.0/24"
For group entries: subnet_lookup(file, "guests", "phone") -> "10.1.103.0/24"
For group with no type: subnet_lookup(file, "guests") -> "10.1.100.0/24" (none slot)
Prints the CIDR on success, nothing and exits 1 on failure.
"""
data = _subnet_read(file)
if name not in data:
sys.exit(1)
entry = data[name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
if key not in entry:
sys.exit(1)
print(entry[key]['subnet'])
else:
print(entry['subnet'])
def subnet_type(file, name, type_key=''):
"""
Return the type string for a subnet entry.
For scalar: subnet_type(file, "desktop") -> "desktop"
For group: subnet_type(file, "guests", "phone") -> "phone"
subnet_type(file, "guests") -> "none"
"""
data = _subnet_read(file)
if name not in data:
sys.exit(1)
entry = data[name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
if key not in entry:
sys.exit(1)
# For group entries the type IS the child key
print(key if key != 'none' else 'none')
else:
print(entry.get('type', name))
def subnet_tunnel_mode(file, name, type_key=''):
"""Return tunnel_mode for a subnet entry"""
data = _subnet_read(file)
if name not in data:
print('split') # safe default
return
entry = data[name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
child = entry.get(key, {})
print(child.get('tunnel_mode', 'split'))
else:
print(entry.get('tunnel_mode', 'split'))
def subnet_for_ip(file, ip):
"""
Reverse lookup: given a peer IP, find which subnet name (and type) it belongs to.
Output: name|type (e.g. "guests|phone" or "desktop|desktop")
Returns nothing (exit 0) if not found — caller falls back to hardcoded map.
"""
import ipaddress
try:
peer_addr = ipaddress.ip_address(ip)
except ValueError:
return
data = _subnet_read(file)
for name, entry in data.items():
if _subnet_is_group(entry):
for type_key, child in entry.items():
try:
network = ipaddress.ip_network(child['subnet'], strict=False)
if peer_addr in network:
print(f"{name}|{type_key}")
return
except Exception:
continue
else:
try:
network = ipaddress.ip_network(entry['subnet'], strict=False)
if peer_addr in network:
peer_type = entry.get('type', name)
print(f"{name}|{peer_type}")
return
except Exception:
continue
def subnet_list(file):
"""
List all subnets for display.
Output per line: name|subnet|type|tunnel_mode|desc|is_group
For group entries, outputs one line per child: name.type|subnet|type|tunnel_mode|desc|true
"""
data = _subnet_read(file)
for name, entry in data.items():
if _subnet_is_group(entry):
for type_key, child in entry.items():
display_name = f"{name}.{type_key}" if type_key != 'none' else name
print(f"{display_name}|{child.get('subnet','')}|"
f"{type_key}|{child.get('tunnel_mode','split')}|"
f"{child.get('desc','')}|true|{name}")
else:
print(f"{name}|{entry.get('subnet','')}|"
f"{entry.get('type',name)}|{entry.get('tunnel_mode','split')}|"
f"{entry.get('desc','')}|false|{name}")
def subnet_show(file, name):
"""Show a single subnet entry (scalar or group) in detail"""
data = _subnet_read(file)
if name not in data:
print(f"Error: Subnet '{name}' not found", file=sys.stderr)
sys.exit(1)
entry = data[name]
if _subnet_is_group(entry):
print(f"name|{name}")
print(f"is_group|true")
for type_key, child in entry.items():
print(f"child|{type_key}|{child.get('subnet','')}|"
f"{child.get('tunnel_mode','split')}|{child.get('desc','')}")
else:
print(f"name|{name}")
print(f"is_group|false")
print(f"subnet|{entry.get('subnet','')}")
print(f"type|{entry.get('type',name)}")
print(f"tunnel_mode|{entry.get('tunnel_mode','split')}")
print(f"desc|{entry.get('desc','')}")
def subnet_add(file, name, subnet, type_key, tunnel_mode, desc, group_parent=''):
"""
Add a new subnet entry.
If group_parent is set, adds as a child under that group key.
Otherwise adds as a scalar entry.
"""
data = _subnet_read(file)
entry = {
'subnet': subnet,
'tunnel_mode': tunnel_mode or 'split',
'desc': desc or ''
}
if group_parent:
# Adding a child to an existing group, or creating a new group
if group_parent not in data:
data[group_parent] = {}
elif not _subnet_is_group(data[group_parent]):
print(f"Error: '{group_parent}' exists but is not a group", file=sys.stderr)
sys.exit(1)
data[group_parent][type_key or 'none'] = entry
else:
# Scalar entry — type stored explicitly
entry['type'] = type_key or name
data[name] = entry
_subnet_write(file, data)
def subnet_remove(file, name, peers_using):
"""
Remove a subnet entry. peers_using is a comma-separated list of peer names
currently using this subnet (passed in from bash after meta scan).
If non-empty, refuses with error.
"""
if peers_using:
peers = [p for p in peers_using.split(',') if p]
if peers:
print(f"Error: Subnet '{name}' is in use by: {', '.join(peers)}", file=sys.stderr)
sys.exit(1)
data = _subnet_read(file)
if '.' in name:
# Removing a group child: e.g. "guests.phone"
parent, child_key = name.split('.', 1)
if parent not in data or not _subnet_is_group(data[parent]):
print(f"Error: Group '{parent}' not found", file=sys.stderr)
sys.exit(1)
if child_key not in data[parent]:
print(f"Error: '{child_key}' not found in group '{parent}'", file=sys.stderr)
sys.exit(1)
del data[parent][child_key]
if not data[parent]:
del data[parent] # remove empty group
else:
if name not in data:
print(f"Error: Subnet '{name}' not found", file=sys.stderr)
sys.exit(1)
del data[name]
_subnet_write(file, data)
def subnet_rename(file, old_name, new_name, peers_using):
"""
Rename a subnet entry. Hard refusal if any peers reference it.
peers_using: comma-separated peer names from bash meta scan.
"""
if peers_using:
peers = [p for p in peers_using.split(',') if p]
if peers:
print(f"Error: Cannot rename subnet '{old_name}' — in use by: {', '.join(peers)}", file=sys.stderr)
sys.exit(1)
data = _subnet_read(file)
if old_name not in data:
print(f"Error: Subnet '{old_name}' not found", file=sys.stderr)
sys.exit(1)
if new_name in data:
print(f"Error: Subnet '{new_name}' already exists", file=sys.stderr)
sys.exit(1)
data[new_name] = data.pop(old_name)
_subnet_write(file, data)
def subnet_peers(meta_dir, clients_dir, subnet_name, subnets_file):
"""
Find all peers using a subnet.
Two-pass check:
1. Meta field: peer has "subnet": subnet_name in their .meta file
2. IP fallback: peer's IP falls within the subnet's CIDR(s)
(catches peers added before meta stored subnet explicitly)
Output: one peer name per line.
"""
import glob
import ipaddress
# Resolve all CIDRs covered by this subnet name
data = _subnet_read(subnets_file)
cidrs = []
if subnet_name in data:
entry = data[subnet_name]
if _subnet_is_group(entry):
for child in entry.values():
try:
cidrs.append(ipaddress.ip_network(child['subnet'], strict=False))
except Exception:
pass
else:
try:
cidrs.append(ipaddress.ip_network(entry['subnet'], strict=False))
except Exception:
pass
printed = set()
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
peer_name = os.path.basename(conf).replace('.conf', '')
# Pass 1: check meta field
meta_file = os.path.join(meta_dir, f"{peer_name}.meta")
try:
with open(meta_file) as f:
meta = json.load(f)
if meta.get('subnet', '') == subnet_name:
if peer_name not in printed:
print(peer_name)
printed.add(peer_name)
continue
except Exception:
pass
# Pass 2: IP reverse lookup against subnet CIDRs
if not cidrs:
continue
peer_ip = ''
try:
with open(conf) as f:
for line in f:
if line.startswith('Address'):
peer_ip = line.split('=')[1].strip().split('/')[0]
break
except Exception:
continue
if not peer_ip:
continue
try:
addr = ipaddress.ip_address(peer_ip)
if any(addr in cidr for cidr in cidrs):
if peer_name not in printed:
print(peer_name)
printed.add(peer_name)
except Exception:
continue
def subnet_exists(file, name):
"""Check if a subnet name exists (scalar or group). Exits 0/1."""
data = _subnet_read(file)
if '.' in name:
parent, child_key = name.split('.', 1)
exists = parent in data and _subnet_is_group(data[parent]) and child_key in data[parent]
else:
exists = name in data
sys.exit(0 if exists else 1)
# ============================================
# Identity System
# ============================================
def identity_rules(file):
"""
Return all rules assigned to an identity, one per line.
Reads from 'rules' array (1:N). Falls back to 'rule' scalar for migration.
"""
data = _identity_read(file)
if not data:
return
# Support legacy scalar 'rule' field
rules = data.get('rules', [])
if not rules and data.get('rule'):
rules = [data['rule']]
for r in rules:
if r:
print(r)
def identity_add_rule(file, identity_name, rule_name):
"""
Add a rule to an identity's rules array.
Warns if already present (prints warning to stderr, exits 2).
Creates identity file if it doesn't exist.
"""
data = _identity_read(file) or _identity_init(identity_name)
rules = data.get('rules', [])
# Migrate legacy scalar field
if 'rule' in data and data['rule']:
if data['rule'] not in rules:
rules.append(data['rule'])
del data['rule']
if rule_name in rules:
print(f"Warning: Rule '{rule_name}' is already assigned to identity '{identity_name}'",
file=sys.stderr)
sys.exit(2)
rules.append(rule_name)
data['rules'] = rules
_identity_write(file, data)
def identity_remove_rule(file, rule_name):
"""
Remove a specific rule from an identity's rules array.
Exits 1 if rule not found.
"""
data = _identity_read(file)
if not data:
print(f"Error: Identity not found", file=sys.stderr)
sys.exit(1)
rules = data.get('rules', [])
if rule_name not in rules:
print(f"Error: Rule '{rule_name}' not assigned to this identity", file=sys.stderr)
sys.exit(1)
rules.remove(rule_name)
data['rules'] = rules
_identity_write(file, data)
def identity_clear_rules(file):
"""Remove all rules from an identity."""
data = _identity_read(file)
if not data:
return
data['rules'] = []
data.pop('rule', None) # remove legacy scalar too
_identity_write(file, data)
def identity_has_rule(file, rule_name):
"""Exit 0 if identity has this rule, 1 otherwise."""
data = _identity_read(file)
if not data:
sys.exit(1)
rules = data.get('rules', [])
if not rules and data.get('rule'):
rules = [data['rule']]
sys.exit(0 if rule_name in rules else 1)
def _identity_read(file):
"""Read an identity file, return dict or None"""
try:
if not os.path.exists(file):
return None
with open(file) as f:
content = f.read().strip()
if not content:
return None
return json.loads(content)
except Exception:
return None
def _identity_write(file, data):
"""Write an identity file"""
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def _identity_init(name):
"""Return empty identity structure"""
return {
'name': name,
'peers': [],
'devices': {}
}
def _parse_peer_name(peer_name):
"""
Parse a peer name into (type, identity, index).
phone-nuno -> ('phone', 'nuno', 1)
phone-nuno-2 -> ('phone', 'nuno', 2)
desktop-zephyr -> ('desktop', 'zephyr', 1)
laptop-nuno -> ('laptop', 'nuno', 1)
Returns None if name doesn't match convention.
Convention: {type}-{identity}[-{index}]
Known types: desktop, laptop, phone, tablet, server, iot, none
"""
known_types = {'desktop', 'laptop', 'phone', 'tablet', 'server', 'iot', 'none'}
parts = peer_name.split('-')
if len(parts) < 2:
return None
peer_type = parts[0]
if peer_type not in known_types:
return None
# Check if last part is a numeric index
if len(parts) >= 3 and parts[-1].isdigit():
index = int(parts[-1])
identity = '-'.join(parts[1:-1])
else:
index = 1
identity = '-'.join(parts[1:])
if not identity:
return None
return (peer_type, identity, index)
def identity_list(identities_dir):
"""
List all identities with peer count, rules and policy.
Output per line: name|peer_count|types|rules|policy
"""
import glob
for id_file in sorted(glob.glob(f"{identities_dir}/*.identity")):
try:
with open(id_file) as f:
data = json.load(f)
name = data.get('name', '')
peers = data.get('peers', [])
devices = data.get('devices', {})
rules = data.get('rules', [])
# Migrate legacy scalar rule field
if not rules and data.get('rule'):
rules = [data['rule']]
policy = data.get('policy', 'default')
types = sorted(set(
d.get('type', '') for d in devices.values() if d.get('type')
))
print(f"{name}|{len(peers)}|{','.join(types)}|{','.join(rules)}|{policy}")
except Exception:
continue
def identity_show(file):
"""Show identity details"""
data = _identity_read(file)
if not data:
print("Error: Identity not found", file=sys.stderr)
sys.exit(1)
print(f"name|{data.get('name','')}")
print(f"peer_count|{len(data.get('peers',[]))}")
for peer_name, dev in data.get('devices', {}).items():
print(f"device|{peer_name}|{dev.get('type','')}|{dev.get('index',1)}")
def identity_add_peer(file, identity_name, peer_name, peer_type, index):
"""Add a peer to an identity file, creating it if needed"""
data = _identity_read(file) or _identity_init(identity_name)
if peer_name not in data['peers']:
data['peers'].append(peer_name)
data['devices'][peer_name] = {
'type': peer_type,
'index': int(index)
}
_identity_write(file, data)
def identity_remove_peer(file, peer_name):
"""Remove a peer from an identity file"""
data = _identity_read(file)
if not data:
return
data['peers'] = [p for p in data['peers'] if p != peer_name]
data['devices'].pop(peer_name, None)
_identity_write(file, data)
def identity_remove(file):
"""Delete an identity file — existence check done in bash"""
try:
os.remove(file)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def identity_next_index(file, peer_type):
"""
Return the next available index for a given type within an identity.
phone-nuno exists (index 1) -> returns 2
No phones exist -> returns 1
"""
data = _identity_read(file)
if not data:
print(1)
return
existing = [
d.get('index', 1)
for d in data.get('devices', {}).values()
if d.get('type') == peer_type
]
if not existing:
print(1)
return
# Find lowest unused index starting from 1
used = set(existing)
i = 1
while i in used:
i += 1
print(i)
def identity_peers(file, filter_type=''):
"""
List peers belonging to an identity, optionally filtered by type.
Output: one peer name per line.
"""
data = _identity_read(file)
if not data:
return
for peer_name in data.get('peers', []):
if filter_type:
dev = data.get('devices', {}).get(peer_name, {})
if dev.get('type') != filter_type:
continue
print(peer_name)
def identity_migrate(identities_dir, clients_dir, meta_dir, dry_run):
"""
Scan all peer configs and auto-create identity files from name convention.
dry_run: 'true' -> print what would be done, no writes.
Output per action: action|identity|peer|type|index
"""
import glob
is_dry = dry_run == 'true'
grouped = {} # identity_name -> [(peer_name, type, index)]
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
peer_name = os.path.basename(conf).replace('.conf', '')
parsed = _parse_peer_name(peer_name)
if not parsed:
print(f"skip|{peer_name}")
continue
peer_type, identity_name, index = parsed
if identity_name not in grouped:
grouped[identity_name] = []
grouped[identity_name].append((peer_name, peer_type, index))
for identity_name, peers in sorted(grouped.items()):
id_file = os.path.join(identities_dir, f"{identity_name}.identity")
for peer_name, peer_type, index in peers:
print(f"create|{identity_name}|{peer_name}|{peer_type}|{index}")
if not is_dry:
identity_add_peer(id_file, identity_name, peer_name, peer_type, index)
def identity_infer(peer_name):
"""
Parse a peer name and print identity|type|index, or nothing if no match.
Used by add.command.sh to auto-attach on vanilla wgctl add.
"""
parsed = _parse_peer_name(peer_name)
if parsed:
peer_type, identity_name, index = parsed
print(f"{identity_name}|{peer_type}|{index}")
def identity_exists(file):
"""Exit 0 if identity file exists and is valid, else exit 1"""
data = _identity_read(file)
sys.exit(0 if data is not None else 1)
# ============================================
# peer_data update — adds type field from meta
# ============================================
# NOTE: This replaces the existing peer_data function.
# The new version reads 'type' from meta directly.
# Output format: name|ip|rule|type|last_ts|last_evt|main_group
def peer_data(clients_dir, meta_dir, events_log):
"""
Updated peer_data that reads 'type' from meta.
Output: name|ip|rule|type|last_ts|last_evt|main_group
"""
import glob
meta = {}
for f in glob.glob(f"{meta_dir}/*.meta"):
name = os.path.basename(f).replace('.meta', '')
try:
with open(f) as mf:
meta[name] = json.load(mf)
except Exception:
meta[name] = {}
last_events = {}
try:
with open(events_log) as f:
for line in f:
try:
e = json.loads(line.strip())
client = e.get('client', '')
if client:
last_events[client] = e
except Exception:
pass
except Exception:
pass
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
name = os.path.basename(conf).replace('.conf', '')
ip = ''
try:
with open(conf) as f:
for line in f:
if line.startswith('Address'):
ip = line.split('=')[1].strip().split('/')[0]
break
except Exception:
pass
m = meta.get(name, {})
rule = m.get('rule', '')
peer_type = m.get('type', '')
main_group = m.get('main_group', '')
last_event = last_events.get(name, {})
last_ts = last_event.get('timestamp', '')
last_evt = last_event.get('event', '')
print(f"{name}|{ip}|{rule}|{peer_type}|{last_ts}|{last_evt}|{main_group}")
def subnet_default_rule(file, name, type_key=''):
"""
Return the default_rule for a subnet entry, or empty string if none set.
For scalar: subnet_default_rule(file, "desktop") -> ""
For group: subnet_default_rule(file, "guests", "phone") -> "guest"
"""
data = _subnet_read(file)
if name not in data:
print('')
return
entry = data[name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
child = entry.get(key, {})
print(child.get('default_rule', ''))
else:
print(entry.get('default_rule', ''))
def subnet_list_names(file):
"""
List all top-level subnet names, one per line.
Used for dynamic flag registration in commands.
Output: one name per line (e.g. desktop, laptop, guests, servers, iot)
"""
data = _subnet_read(file)
for name in data.keys():
print(name)
# ============================================
# Policy System
# ============================================
_POLICY_DEFAULTS = {
"default": {
"tunnel_mode": "split",
"default_rule": None,
"strict_rule": False,
"auto_apply": True,
"desc": "Default policy"
},
"guest": {
"tunnel_mode": "split",
"default_rule": "guest",
"strict_rule": True,
"auto_apply": True,
"desc": "Guest access policy"
},
"trusted": {
"tunnel_mode": "split",
"default_rule": None,
"strict_rule": False,
"auto_apply": True,
"desc": "Trusted device policy"
},
"server": {
"tunnel_mode": "split",
"default_rule": None,
"strict_rule": False,
"auto_apply": True,
"desc": "Server policy"
},
"iot": {
"tunnel_mode": "split",
"default_rule": None,
"strict_rule": False,
"auto_apply": True,
"desc": "IoT device policy"
}
}
def _policy_read(file):
"""Read policies.json, fall back to hardcoded defaults if missing."""
try:
if not os.path.exists(file):
return dict(_POLICY_DEFAULTS)
with open(file) as f:
content = f.read().strip()
if not content:
return dict(_POLICY_DEFAULTS)
data = json.loads(content)
# Merge with defaults so hardcoded policies always exist
merged = dict(_POLICY_DEFAULTS)
merged.update(data)
return merged
except Exception:
return dict(_POLICY_DEFAULTS)
def _policy_write(file, data):
"""Write policies.json."""
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def _policy_get_entry(data, name):
"""Get a policy entry, falling back to 'default' policy values for missing fields."""
entry = data.get(name, {})
default = data.get('default', _POLICY_DEFAULTS.get('default', {}))
# Merge: entry fields override default
resolved = dict(default)
resolved.update(entry)
return resolved
def policy_get(file, name, field=''):
"""
Get a policy entry or a specific field from it.
policy_get(file, "guest") -> prints all fields as key|value lines
policy_get(file, "guest", "tunnel_mode") -> prints "split"
policy_get(file, "guest", "strict_rule") -> prints "true" or "false"
"""
data = _policy_read(file)
entry = _policy_get_entry(data, name)
if field:
val = entry.get(field)
if val is None:
print('')
elif isinstance(val, bool):
print('true' if val else 'false')
else:
print(val)
else:
for k, v in entry.items():
if isinstance(v, bool):
print(f"{k}|{'true' if v else 'false'}")
elif v is None:
print(f"{k}|")
else:
print(f"{k}|{v}")
def policy_list(file):
"""
List all policies.
Output per line: name|tunnel_mode|default_rule|strict_rule|auto_apply|desc
"""
data = _policy_read(file)
for name, raw_entry in data.items():
entry = _policy_get_entry(data, name)
tunnel_mode = entry.get('tunnel_mode', 'split')
default_rule = entry.get('default_rule') or ''
strict_rule = 'true' if entry.get('strict_rule', False) else 'false'
auto_apply = 'true' if entry.get('auto_apply', True) else 'false'
desc = entry.get('desc', '')
print(f"{name}|{tunnel_mode}|{default_rule}|{strict_rule}|{auto_apply}|{desc}")
def policy_exists(file, name):
"""Exit 0 if policy exists, 1 otherwise."""
data = _policy_read(file)
sys.exit(0 if name in data else 1)
def policy_add(file, name, tunnel_mode, default_rule, strict_rule, auto_apply, desc):
"""Add or update a policy entry."""
data = _policy_read(file)
data[name] = {
'tunnel_mode': tunnel_mode or 'split',
'default_rule': default_rule if default_rule else None,
'strict_rule': strict_rule == 'true',
'auto_apply': auto_apply != 'false',
'desc': desc or ''
}
_policy_write(file, data)
def policy_remove(file, name):
"""Remove a policy. Refuses to remove hardcoded defaults."""
if name in _POLICY_DEFAULTS:
print(f"Error: Cannot remove built-in policy '{name}'", file=sys.stderr)
sys.exit(1)
data = _policy_read(file)
if name not in data:
print(f"Error: Policy '{name}' not found", file=sys.stderr)
sys.exit(1)
del data[name]
_policy_write(file, data)
def policy_set_field(file, name, field, value):
"""Set a single field on an existing policy."""
data = _policy_read(file)
if name not in data:
print(f"Error: Policy '{name}' not found", file=sys.stderr)
sys.exit(1)
entry = data[name]
if field in ('strict_rule', 'auto_apply'):
entry[field] = value == 'true'
elif field == 'default_rule':
entry[field] = value if value else None
else:
entry[field] = value
data[name] = entry
_policy_write(file, data)
def subnet_policy(subnets_file, subnet_name, type_key=''):
"""
Get the policy name for a subnet entry.
Falls back to 'default' if no policy set.
"""
data = _subnet_read(subnets_file)
if subnet_name not in data:
print('default')
return
entry = data[subnet_name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
child = entry.get(key, {})
print(child.get('policy', 'default'))
else:
print(entry.get('policy', 'default'))
def json_get_nested(file, *keys):
"""
Get a nested field from a JSON file.
json_get_nested(file, "rule_flags", "strict_rule")
Output: the value as a string, or empty string if not found.
"""
try:
with open(file) as f:
data = json.load(f)
val = data
for key in keys:
if not isinstance(val, dict):
print('')
return
val = val.get(key)
if val is None:
print('')
return
if isinstance(val, bool):
print('true' if val else 'false')
elif val is None:
print('')
else:
print(val)
except Exception:
print('')
def json_set_nested(file, *args):
"""
Set a nested field in a JSON file.
Args: file, key1, key2, ..., value
json_set_nested(file, "rule_flags", "strict_rule", "true")
Creates intermediate dicts as needed.
"""
if len(args) < 2:
return
keys = args[:-1]
value = args[-1]
try:
if os.path.exists(file):
with open(file) as f:
data = json.load(f)
else:
data = {}
# Navigate/create nested structure
target = data
for key in keys[:-1]:
if key not in target or not isinstance(target[key], dict):
target[key] = {}
target = target[key]
# Coerce value types
final_key = keys[-1]
if value == 'true':
target[final_key] = True
elif value == 'false':
target[final_key] = False
else:
target[final_key] = value
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def activity_aggregate(fw_file, wg_file, wg_interface, net_file,
clients_dir, meta_dir, hours, filter_peer, filter_service_ip):
"""
Aggregate activity data for wgctl activity.
Output:
peer|name|rx_bytes|tx_bytes|drop_count
service|peer_name|dest_display|drop_count
"""
import glob
import subprocess
from datetime import datetime, timezone, timedelta
from collections import defaultdict
hours = int(hours) if hours else 24
cutoff = None
if hours > 0:
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
# Build ip -> peer name map
ip_to_peer = {}
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
for line in f:
if line.startswith('Address'):
ip = line.split('=')[1].strip().split('/')[0]
ip_to_peer[ip] = name
break
except Exception:
continue
# Build pubkey -> peer name map
pubkey_to_peer = {}
for kf in glob.glob(f"{clients_dir}/*_public.key"):
name = os.path.basename(kf).replace('_public.key', '')
try:
with open(kf) as f:
key = f.read().strip()
if key:
pubkey_to_peer[key] = name
except Exception:
continue
# Get WireGuard transfer totals
peer_rx = defaultdict(int)
peer_tx = defaultdict(int)
try:
result = subprocess.run(
['wg', 'show', wg_interface, 'transfer'],
capture_output=True, text=True
)
for line in result.stdout.strip().splitlines():
parts = line.split()
if len(parts) >= 3:
pubkey, rx, tx = parts[0], int(parts[1]), int(parts[2])
peer = pubkey_to_peer.get(pubkey)
if peer:
peer_rx[peer] += rx
peer_tx[peer] += tx
except Exception:
pass
# Load net services for reverse lookup
net_data = {}
if os.path.exists(net_file):
try:
with open(net_file) as f:
net_data = json.load(f)
except Exception:
pass
def reverse_lookup(dest_ip, dest_port, proto):
for svc_name, svc in net_data.items():
if not isinstance(svc, dict):
continue
if svc.get('ip', '') != dest_ip:
continue
ports = svc.get('ports', {})
if dest_port:
for port_name, port_def in ports.items():
if not isinstance(port_def, dict):
continue
if (str(port_def.get('port', '')) == str(dest_port) and
port_def.get('proto', 'tcp') == proto):
return f"{svc_name}:{port_name}"
return svc_name
return svc_name
return ''
# Parse and first-pass dedup (within time window per key)
events = []
last_seen = {}
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
src = e.get('src_ip', '')
if not src:
continue
if filter_ip and src != filter_ip:
continue
proto_num = int(e.get('ip.protocol', 0))
proto = proto_map.get(proto_num, str(proto_num))
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
key = (src, dst, port, proto_num)
ts_str = e.get('timestamp', '')
try:
ts = datetime.fromisoformat(ts_str).timestamp()
except Exception:
ts = 0
windows = {1: 5, 6: 30, 17: 10}
window = windows.get(proto_num, 10)
if key in last_seen and (ts - last_seen[key]) < window:
continue
last_seen[key] = ts
events.append(e)
except Exception:
pass
except Exception:
pass
# Second-pass dedup consecutive same events with count
deduped = []
counts = []
for e in events:
ts_str = e.get('timestamp', '')
try:
ts = datetime.fromisoformat(ts_str).timestamp()
except Exception:
ts = 0
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
proto_num = int(e.get('ip.protocol', 0))
key = (src, dst, port, proto_num)
if deduped:
prev = deduped[-1]
try:
prev_ts = datetime.fromisoformat(prev.get('timestamp', '')).timestamp()
except Exception:
prev_ts = 0
prev_key = (
prev.get('src_ip', ''),
prev.get('dest_ip', ''),
str(prev.get('dest_port', '')),
int(prev.get('ip.protocol', 0))
)
if key == prev_key and (ts - prev_ts) < 300:
counts[-1] += 1
continue
deduped.append(e)
counts.append(1)
limit = int(limit) if limit else 50
for e, count in list(zip(deduped, counts))[-limit:]:
ts_str = e.get('timestamp', '')
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = str(e.get('dest_port', ''))
proto_num = int(e.get('ip.protocol', 0))
proto = proto_map.get(proto_num, str(proto_num))
client = ip_to_name.get(src, src)
svc_name = reverse_lookup(dst, port, proto)
try:
dt = datetime.fromisoformat(ts_str)
ts_fmt = dt.strftime(DATETIME_FMT)
except Exception:
ts_fmt = ts_str
print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}")
def make_dest_display(dest_ip, dest_port, proto, svc_name):
if svc_name:
return svc_name
if dest_port:
display = f"{dest_ip}:{dest_port}"
else:
display = dest_ip
if proto and proto not in ('tcp', '6'):
display += f" ({proto})"
return display
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
# Parse fw_events for drops
peer_drops = defaultdict(int)
service_drops = defaultdict(lambda: defaultdict(int))
if os.path.exists(fw_file):
try:
with open(fw_file) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
ev = json.loads(line)
if cutoff:
ts_str = ev.get('timestamp', '')
try:
ts = datetime.fromisoformat(ts_str)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
if ts < cutoff:
continue
except Exception:
pass
src_ip = ev.get('src_ip', '')
if not src_ip:
continue
dest_ip = ev.get('dest_ip', '')
dest_port = str(ev.get('dest_port', ''))
proto_num = ev.get('ip.protocol', 0)
proto = proto_map.get(int(proto_num), str(proto_num))
peer = ip_to_peer.get(src_ip)
if not peer:
continue
if filter_peer and peer != filter_peer:
continue
if filter_service_ip and dest_ip != filter_service_ip:
continue
svc_name = reverse_lookup(dest_ip, dest_port, proto)
dest_display = make_dest_display(dest_ip, dest_port, proto, svc_name)
peer_drops[peer] += 1
service_drops[peer][dest_display] += 1
except Exception:
continue
except Exception:
pass
# Collect peers with any activity
all_peers = set()
all_peers.update(k for k in peer_rx if peer_rx[k] > 0)
all_peers.update(k for k in peer_tx if peer_tx[k] > 0)
all_peers.update(peer_drops.keys())
if filter_peer:
all_peers = {p for p in all_peers if p == filter_peer}
for peer in sorted(all_peers):
rx = peer_rx.get(peer, 0)
tx = peer_tx.get(peer, 0)
drops = peer_drops.get(peer, 0)
print(f"peer|{peer}|{rx}|{tx}|{drops}")
svc_map = service_drops.get(peer, {})
for dest_display, count in sorted(svc_map.items(), key=lambda x: -x[1]):
print(f"service|{peer}|{dest_display}|{count}")
def _hosts_read(file):
if not os.path.exists(file):
return {"hosts": {}, "subnets": {}, "ports": {}}
try:
with open(file) as f:
data = json.load(f)
# Ensure all sections exist
data.setdefault("hosts", {})
data.setdefault("subnets", {})
data.setdefault("ports", {})
return data
except Exception:
return {"hosts": {}, "subnets": {}, "ports": {}}
def _hosts_write(file, data):
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def hosts_list(file):
"""
List all host entries.
Output per line: type|key|name|desc|tags
"""
data = _hosts_read(file)
for ip, entry in sorted(data["hosts"].items()):
if isinstance(entry, dict):
name = entry.get("name", "")
desc = entry.get("desc", "")
tags = ",".join(entry.get("tags", []))
else:
name = str(entry)
desc = ""
tags = ""
print(f"host|{ip}|{name}|{desc}|{tags}")
for subnet, entry in sorted(data["subnets"].items()):
if isinstance(entry, dict):
name = entry.get("name", "")
desc = entry.get("desc", "")
tags = ",".join(entry.get("tags", []))
else:
name = str(entry)
desc = ""
tags = ""
print(f"subnet|{subnet}|{name}|{desc}|{tags}")
for port, entry in sorted(data["ports"].items()):
if isinstance(entry, dict):
name = entry.get("name", "")
desc = entry.get("desc", "")
tags = ",".join(entry.get("tags", []))
else:
name = str(entry)
desc = ""
tags = ""
print(f"port|{port}|{name}|{desc}|{tags}")
def hosts_show(file, key, entry_type):
"""Show a single host entry. type: host|subnet|port"""
data = _hosts_read(file)
section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"}
section = section_map.get(entry_type, "hosts")
entry = data[section].get(key)
if not entry:
print(f"Error: not found: {key}", file=sys.stderr)
sys.exit(1)
if isinstance(entry, dict):
print(f"name|{entry.get('name', '')}")
print(f"desc|{entry.get('desc', '')}")
print(f"tags|{','.join(entry.get('tags', []))}")
else:
print(f"name|{entry}")
print(f"desc|")
print(f"tags|")
def hosts_add(file, entry_type, key, name, desc, tags):
"""
Add a host entry.
entry_type: host|subnet|port
key: IP, subnet CIDR, or port number
"""
data = _hosts_read(file)
section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"}
section = section_map.get(entry_type, "hosts")
tag_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else []
data[section][key] = {
"name": name,
"desc": desc,
"tags": tag_list
}
_hosts_write(file, data)
def hosts_remove(file, entry_type, key):
"""Remove a host entry."""
data = _hosts_read(file)
section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"}
section = section_map.get(entry_type, "hosts")
if key not in data[section]:
print(f"Error: not found: {key}", file=sys.stderr)
sys.exit(1)
del data[section][key]
_hosts_write(file, data)
def hosts_exists(file, entry_type, key):
"""Check if a host entry exists."""
data = _hosts_read(file)
section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"}
section = section_map.get(entry_type, "hosts")
print("true" if key in data[section] else "false")
def hosts_lookup(file, ip):
"""
Resolve an IP to a display name.
Checks hosts section only (exact match).
Returns name or empty string.
"""
data = _hosts_read(file)
entry = data["hosts"].get(ip)
if not entry:
print("")
return
if isinstance(entry, dict):
print(entry.get("name", ip))
else:
print(str(entry))
commands = {
'get': lambda args: get(args[0], args[1]),
'set': lambda args: set_key(args[0], args[1], args[2]),
'delete': lambda args: delete_key(args[0], args[1]),
'append': lambda args: append(args[0], args[1], args[2]),
'remove': lambda args: remove_value(args[0], args[1], args[2]),
'cat': lambda args: cat(args[0]),
'has_key': lambda args: has_key(args[0], args[1]),
'filter_values': lambda args: filter_values(args[0], args[1], args[2]),
'last_event': lambda args: last_event(args[0], args[1], args[2], args[3]),
'events_for': lambda args: events_for(args[0], args[1], args[2]),
'fw_events': lambda args: fw_events(
args[0], args[1], args[2], args[3], args[4],
args[5] if len(args) > 5 else '50',
args[6] if len(args) > 6 else '1'),
'wg_events': lambda args: wg_events(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '50',
args[4] if len(args) > 4 else '1'),
'format_fw_event': lambda args: format_fw_event(sys.stdin.read(), args[0]),
'format_wg_event': lambda args: format_wg_event(sys.stdin.read()),
'remove_events': lambda args: remove_events(args[0], args[1]),
'follow_logs': lambda args: follow_logs(args[0], args[1], args[2], args[3], args[4], args[5]),
'count': lambda args: count(args[0], args[1]),
'audit_fw_counts': lambda args: audit_fw_counts(args[0]),
'peer_group_map': lambda args: peer_group_map(args[0]),
'peer_groups': lambda args: peer_groups(args[0], args[1]),
'peer_data': lambda args: peer_data(args[0], args[1], args[2]),
'iso_to_ts': lambda args: iso_to_ts(args[0]),
'rule_list_data': lambda args: rule_list_data(args[0], args[1]),
'group_list_data': lambda args: group_list_data(args[0], args[1], args[2]),
'fmt_datetime': lambda args: fmt_datetime(args[0], args[1]),
'create_rule': lambda args: create_rule(
args[0], args[1], args[2], args[3], args[4], args[5], args[6],
args[7] if len(args) > 7 else '',
args[8] if len(args) > 8 else '',
args[9] if len(args) > 9 else ''
),
'cleanup_config': lambda args: cleanup_config(args[0]),
'remove_peer_block': lambda args: remove_peer_block(args[0], args[1]),
'create_group': lambda args: create_group(args[0], args[1], args[2]),
'parse_event': lambda args: parse_event(args[0]),
'parse_fw_event': lambda args: parse_fw_event(args[0]),
'remove_events_filtered': lambda args: remove_events_filtered(
args[0], args[1], args[2], args[3], args[4]=='true', args[5]=='true', args[6] if len(args)>6 else ''),
'peer_transfer': lambda args: peer_transfer(args[0]),
'peer_transfer_delta': lambda args: peer_transfer_delta(args[0], args[1]),
'rule_resolve': lambda args: rule_resolve(args[0], args[1]),
'rule_resolve_field': lambda args: rule_resolve_field(args[0], args[1], args[2]),
'rule_inspect': lambda args: rule_inspect(args[0], args[1]),
'find_rule_file': lambda args: print(find_rule_file(args[0], args[1])),
'get_raw': lambda args: print(get_raw(args[0], args[1])),
'count_resolved': lambda args: count_resolved(args[0], args[1], args[2]),
'block_get': lambda args: block_get(args[0]),
'block_is_blocked': lambda args: block_is_blocked(args[0]),
'block_set_direct': lambda args: block_set_direct(args[0], args[1], args[2]),
'block_add_group': lambda args: block_add_group(args[0], args[1], args[2]),
'block_remove_group': lambda args: block_remove_group(args[0], args[1], args[2]),
'block_add_rule': lambda args: block_add_rule(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else '',
args[5] if len(args) > 5 else '',
args[6] if len(args) > 6 else ''
),
'block_remove_rule': lambda args: block_remove_rule(
args[0], args[1],
args[2] if len(args) > 2 else '',
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else ''
),
'block_get_rules': lambda args: block_get_rules(args[0]),
'block_get_groups': lambda args: block_get_groups(args[0]),
'block_get_direct': lambda args: block_get_direct(args[0]),
'net_list': lambda args: net_list(args[0]),
'net_show': lambda args: net_show(args[0], args[1]),
'net_exists': lambda args: net_exists(args[0], args[1]),
'net_add_service': lambda args: net_add_service(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else ''
),
'net_add_port': lambda args: net_add_port(
args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else 'tcp',
args[5] if len(args) > 5 else ''
),
'net_remove': lambda args: net_remove(args[0], args[1]),
'net_resolve': lambda args: net_resolve(args[0], args[1]),
'net_reverse_lookup': lambda args: net_reverse_lookup(
args[0], args[1],
args[2] if len(args) > 2 else '',
args[3] if len(args) > 3 else ''
),
'block_is_empty': lambda args: block_is_empty(args[0]),
'group_has_peer': lambda args: group_has_peer(args[0], args[1]),
# Subnet commands:
'subnet_lookup': lambda args: subnet_lookup(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_type': lambda args: subnet_type(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_tunnel_mode': lambda args: subnet_tunnel_mode(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_for_ip': lambda args: subnet_for_ip(args[0], args[1]),
'subnet_list': lambda args: subnet_list(args[0]),
'subnet_show': lambda args: subnet_show(args[0], args[1]),
'subnet_add': lambda args: subnet_add(
args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else 'split',
args[5] if len(args) > 5 else '',
args[6] if len(args) > 6 else ''
),
'subnet_remove': lambda args: subnet_remove(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_rename': lambda args: subnet_rename(args[0], args[1], args[2], args[3] if len(args) > 3 else ''),
'subnet_peers': lambda args: subnet_peers(args[0], args[1], args[2], args[3]),
'subnet_exists': lambda args: subnet_exists(args[0], args[1]),
# Identity commands:
'identity_list': lambda args: identity_list(args[0]),
'identity_show': lambda args: identity_show(args[0]),
'identity_add_peer': lambda args: identity_add_peer(args[0], args[1], args[2], args[3], args[4]),
'identity_remove_peer':lambda args: identity_remove_peer(args[0], args[1]),
'identity_remove': lambda args: identity_remove(args[0]),
'identity_next_index': lambda args: identity_next_index(args[0], args[1]),
'identity_peers': lambda args: identity_peers(args[0], args[1] if len(args) > 1 else ''),
'identity_migrate': lambda args: identity_migrate(args[0], args[1], args[2], args[3]),
'identity_infer': lambda args: identity_infer(args[0]),
'identity_exists': lambda args: identity_exists(args[0]),
'subnet_default_rule': lambda args: subnet_default_rule(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_list_names': lambda args: subnet_list_names(args[0]),
# Policy commands:
'policy_get': lambda args: policy_get(args[0], args[1], args[2] if len(args) > 2 else ''),
'policy_list': lambda args: policy_list(args[0]),
'policy_exists': lambda args: policy_exists(args[0], args[1]),
'policy_add': lambda args: policy_add(args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else ''),
'policy_remove': lambda args: policy_remove(args[0], args[1]),
'policy_set_field': lambda args: policy_set_field(args[0], args[1], args[2], args[3]),
'subnet_policy': lambda args: subnet_policy(args[0], args[1], args[2] if len(args) > 2 else ''),
'get_nested': lambda args: json_get_nested(args[0], *args[1:]),
'set_nested': lambda args: json_set_nested(args[0], *args[1:]),
'identity_rules': lambda args: identity_rules(args[0]),
'identity_add_rule': lambda args: identity_add_rule(args[0], args[1], args[2]),
'identity_remove_rule': lambda args: identity_remove_rule(args[0], args[1]),
'identity_clear_rules': lambda args: identity_clear_rules(args[0]),
'identity_has_rule': lambda args: identity_has_rule(args[0], args[1]),
'activity_aggregate': lambda args: activity_aggregate(
args[0], args[1], args[2], args[3], args[4], args[5],
args[6] if len(args) > 6 else '24',
args[7] if len(args) > 7 else '',
args[8] if len(args) > 8 else ''
),
'hosts_list': lambda args: hosts_list(args[0]),
'hosts_show': lambda args: hosts_show(args[0], args[1], args[2]),
'hosts_add': lambda args: hosts_add(args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else '',
args[5] if len(args) > 5 else ''),
'hosts_remove': lambda args: hosts_remove(args[0], args[1], args[2]),
'hosts_exists': lambda args: hosts_exists(args[0], args[1], args[2]),
'hosts_lookup': lambda args: hosts_lookup(args[0], args[1]),
}
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: json_helper.py <command> <file> [key] [value]", file=sys.stderr)
sys.exit(1)
cmd = sys.argv[1]
args = sys.argv[2:]
if cmd not in commands:
print(f"Unknown command: {cmd}", file=sys.stderr)
sys.exit(1)
commands[cmd](args)