wgctl/core/json_helper.py

1577 lines
No EOL
54 KiB
Python

#!/usr/bin/env python3
"""
wgctl JSON helper — called by shell functions to read/write JSON files.
Usage: json_helper.py <command> <file> [key] [value]
"""
import sys
import json
import os
DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M')
def get(file, key):
try:
with open(file) as f:
data = json.load(f)
val = data.get(key, [])
if isinstance(val, bool):
print(str(val).lower()) # true/false not True/False
elif isinstance(val, list):
if val:
print('\n'.join(str(v) for v in val))
else:
if val:
print(val)
except:
sys.exit(0)
def set_key(file, key, value):
try:
data = {}
if os.path.exists(file):
with open(file) as f:
data = json.load(f)
# Try to parse as JSON value first (for arrays/bools)
try:
data[key] = json.loads(value)
except:
data[key] = value
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def delete_key(file, key):
try:
with open(file) as f:
data = json.load(f)
data.pop(key, None)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def append(file, key, value):
try:
data = {}
if os.path.exists(file):
with open(file) as f:
data = json.load(f)
if key not in data:
data[key] = []
if value not in data[key]:
data[key].append(value)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def remove_value(file, key, value):
try:
with open(file) as f:
data = json.load(f)
if key in data and value in data[key]:
data[key].remove(value)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def cat(file):
try:
with open(file) as f:
data = json.load(f)
print(json.dumps(data, indent=2))
except Exception as e:
sys.exit(1)
def has_key(file, key):
try:
with open(file) as f:
data = json.load(f)
sys.exit(0 if key in data else 1)
except:
sys.exit(1)
def filter_values(file, key, value):
"""Remove all entries where value matches"""
try:
with open(file) as f:
data = json.load(f)
data = {k: v for k, v in data.items() if v != value}
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def last_event(file, key, field, client):
"""Get last event field for a client"""
try:
last = None
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if e.get(key) == client:
last = e
except:
pass
if last:
print(last.get(field, ''))
except:
pass
def events_for(file, ip, limit):
"""Format events for a given IP"""
try:
from datetime import datetime
events = []
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if e.get('ip') == ip:
events.append(e)
except:
pass
for e in events[-int(limit):]:
ts = e.get('timestamp', '')
try:
dt = datetime.fromisoformat(ts)
ts = dt.strftime(DATETIME_FMT)
except:
pass
endpoint = e.get('endpoint', '')
client = e.get('client', '')
event = e.get('event', '')
print(f' {ts} {client:<20} {endpoint:<20} {event}')
except:
pass
def fw_events(file, filter_ip, filter_type, clients_dir, limit):
"""Format firewall drop events with dedup and counts"""
import glob
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
ip_to_name = {}
for conf in glob.glob(f"{clients_dir}/*.conf"):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
for line in f:
if line.startswith('Address'):
ip = line.split('=')[1].strip().split('/')[0]
ip_to_name[ip] = name
except:
pass
events = []
last_seen = {}
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
src = e.get('src_ip', '')
if not src:
continue
if filter_ip and src != filter_ip:
continue
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto = e.get('ip.protocol', 0)
key = (src, dst, port, proto)
ts_str = e.get('timestamp', '')
try:
from datetime import datetime
ts = datetime.fromisoformat(ts_str).timestamp()
except:
ts = 0
windows = {1: 5, 6: 30, 17: 10}
window = windows.get(proto, 10)
if key in last_seen and (ts - last_seen[key]) < window:
continue
last_seen[key] = ts
events.append(e)
except:
pass
except:
pass
# Dedup consecutive same src+dst+port within 60s with count
deduped = []
counts = []
for e in events:
ts_str = e.get('timestamp', '')
try:
from datetime import datetime
ts = datetime.fromisoformat(ts_str).timestamp()
except:
ts = 0
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto = e.get('ip.protocol', 0)
key = (src, dst, port, proto)
if deduped and counts:
prev = deduped[-1]
try:
prev_ts = datetime.fromisoformat(prev.get('timestamp','')).timestamp()
except:
prev_ts = 0
prev_key = (prev.get('src_ip',''), prev.get('dest_ip',''),
prev.get('dest_port',''), prev.get('ip.protocol',0))
if key == prev_key and (ts - prev_ts) < 60:
counts[-1] += 1
continue
deduped.append(e)
counts.append(1)
grouped = []
group_counts = []
for e in deduped:
ts_str = e.get('timestamp', '')
try:
from datetime import datetime
dt = datetime.fromisoformat(ts_str)
# Truncate to minute for grouping
minute_key = dt.strftime('%Y-%m-%d %H:%M')
except:
minute_key = ts_str[:16]
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto = e.get('ip.protocol', 0)
key = (minute_key, src, dst, proto) # group within same minute
if grouped and group_counts:
prev = grouped[-1]
try:
prev_dt = datetime.fromisoformat(prev.get('timestamp',''))
prev_minute = prev_dt.strftime('%Y-%m-%d %H:%M')
except:
prev_minute = ''
prev_key = (prev_minute, prev.get('src_ip',''),
prev.get('dest_ip',''), prev.get('ip.protocol',0))
if key == prev_key:
group_counts[-1] += 1
continue
grouped.append(e)
group_counts.append(1)
for e, count in zip(deduped[-int(limit):], counts[-int(limit):]):
ts = e.get('timestamp', '')
try:
from datetime import datetime
dt = datetime.fromisoformat(ts)
ts = dt.strftime(DATETIME_FMT)
except:
pass
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto_num = e.get('ip.protocol', 0)
proto = proto_map.get(proto_num, str(proto_num))
dst_str = f"{dst}:{port}" if port else dst
client = ip_to_name.get(src, src)
if filter_type and not client.startswith(filter_type + '-'):
continue
count_str = f" (x{count})" if count > 1 else ""
print(f"{ts}|{client}|{dst_str}|{proto}{count_str}")
def wg_events(file, filter_client, filter_type, limit):
"""Format WireGuard events from events.log with dedup"""
events = []
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
client = e.get('client', '')
if not client:
continue
if filter_client and client != filter_client:
continue
if filter_type and not client.startswith(filter_type + '-'):
continue
events.append(e)
except:
pass
except:
pass
# Dedup consecutive same client+event+endpoint within 60s
deduped = []
counts = []
for e in events:
ts_str = e.get('timestamp', '')
try:
from datetime import datetime
ts = datetime.fromisoformat(ts_str).timestamp()
except:
ts = 0
client = e.get('client', '')
event = e.get('event', '')
endpoint = e.get('endpoint', '')
key = (client, event, endpoint[:15])
if deduped and counts:
prev = deduped[-1]
prev_ts_str = prev.get('timestamp', '')
try:
prev_ts = datetime.fromisoformat(prev_ts_str).timestamp()
except:
prev_ts = 0
prev_key = (prev.get('client',''), prev.get('event',''), prev.get('endpoint','')[:15])
if key == prev_key and (ts - prev_ts) < 60:
counts[-1] += 1
continue
deduped.append(e)
counts.append(1)
for e, count in zip(deduped[-int(limit):], counts[-int(limit):]):
ts = e.get('timestamp', '')
try:
from datetime import datetime
dt = datetime.fromisoformat(ts)
ts = dt.strftime(DATETIME_FMT)
except:
pass
client = e.get('client', '')
endpoint = e.get('endpoint', '')
event = e.get('event', '')
count_str = f" (x{count})" if count > 1 else ""
print(f"{ts}|{client}|{endpoint}|{event}{count_str}")
def format_fw_event(line, clients_dir):
"""Format a single fw_event line"""
import glob
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
# Build ip->name map
ip_to_name = {}
for conf in glob.glob(f"{clients_dir}/*.conf"):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
for l in f:
if l.startswith('Address'):
ip = l.split('=')[1].strip().split('/')[0]
ip_to_name[ip] = name
except:
pass
try:
e = json.loads(line.strip())
src = e.get('src_ip', '')
if not src:
return None
ts = e.get('timestamp', '')
try:
from datetime import datetime
dt = datetime.fromisoformat(ts)
ts = dt.strftime(DATETIME_FMT)
except:
pass
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto_num = e.get('ip.protocol', 0)
proto = proto_map.get(proto_num, str(proto_num))
dst_str = f"{dst}:{port}" if port else dst
client = ip_to_name.get(src, src)
return f"{ts}|{client}|{dst_str}|{proto}"
except:
return None
def format_wg_event(line):
"""Format a single wg_event line"""
try:
e = json.loads(line.strip())
client = e.get('client', '')
if not client:
return None
ts = e.get('timestamp', '')
try:
from datetime import datetime
dt = datetime.fromisoformat(ts)
ts = dt.strftime(DATETIME_FMT)
except:
pass
endpoint = e.get('endpoint', '')
event = e.get('event', '')
return f"{ts}|{client}|{endpoint}|{event}|wg"
except:
return None
def remove_events(file, identifier):
"""Remove all events for a client/ip from a JSONL file"""
try:
lines = []
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if e.get('client') == identifier or e.get('src_ip') == identifier:
continue
lines.append(line)
except:
lines.append(line)
with open(file, 'w') as f:
f.writelines(lines)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def follow_logs(fw_file, wg_file, filter_ip, filter_type, clients_dir, filter_peers=""):
"""Follow both log files and output formatted events"""
import glob, time, select
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
peer_filter = set(filter_peers.split(',')) if filter_peers else set()
# Build ip->name map
ip_to_name = {}
for conf in glob.glob(f"{clients_dir}/*.conf"):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
for l in f:
if l.startswith('Address'):
ip = l.split('=')[1].strip().split('/')[0]
ip_to_name[ip] = name
except:
pass
# Open files and seek to end
files = {}
for label, path in [('fw', fw_file), ('wg', wg_file)]:
if path and os.path.exists(path):
f = open(path)
f.seek(0, 2) # seek to end
files[label] = f
dedup = {}
try:
while True:
for label, f in files.items():
line = f.readline()
if not line:
continue
try:
e = json.loads(line.strip())
except:
continue
if label == 'fw':
src = e.get('src_ip', '')
if not src:
continue
if filter_ip and src != filter_ip:
continue
# Filter by peer names if specified
if peer_filter:
client_name = ip_to_name.get(src, '')
if client_name not in peer_filter:
continue
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto_num = e.get('ip.protocol', 0)
proto = proto_map.get(proto_num, str(proto_num))
# Dedup
key = (src, dst, port, proto_num)
windows = {1: 5, 6: 30, 17: 10}
window = windows.get(proto_num, 10)
now = time.time()
if key in dedup and (now - dedup[key]) < window:
continue
dedup[key] = now
client = ip_to_name.get(src, src)
if filter_type and not client.startswith(filter_type + '-'):
continue
dst_str = f"{dst}:{port}" if port else dst
ts = e.get('timestamp', '')[:16].replace('T', ' ')
print(f"fw|{ts}|{client}|{dst_str}|{proto}", flush=True)
elif label == 'wg':
client = e.get('client', '')
if not client:
continue
if filter_ip:
ip = ip_to_name.get(filter_ip, '')
if client != ip and client != filter_ip:
continue
if peer_filter and client not in peer_filter:
continue
if filter_type and not client.startswith(filter_type + '-'):
continue
ts = e.get('timestamp', '')[:16].replace('T', ' ')
endpoint = e.get('endpoint', '')
event = e.get('event', '')
print(f"wg|{ts}|{client}|{endpoint}|{event}", flush=True)
time.sleep(0.1)
except KeyboardInterrupt:
pass
def count(file, key):
try:
with open(file) as f:
data = json.load(f)
val = data.get(key, [])
print(len(val) if isinstance(val, list) else 0)
except:
print(0)
def audit_fw_counts(clients_dir):
"""Return peer_name:fw_count pairs from iptables and client configs"""
import glob, subprocess
# Get iptables output once
try:
result = subprocess.run(
['iptables', '-L', 'FORWARD', '-n'],
capture_output=True, text=True
)
fw_output = result.stdout
except:
fw_output = ""
# Build ip->name and count rules
for conf in glob.glob(f"{clients_dir}/*.conf"):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
for line in f:
if line.startswith('Address'):
ip = line.split('=')[1].strip().split('/')[0]
count = fw_output.count(ip)
print(f"{name}:{count}")
break
except:
pass
def peer_group_map(groups_dir):
"""Return peer:group pairs for all groups"""
import glob
try:
for group_file in glob.glob(f"{groups_dir}/*.group"):
try:
with open(group_file) as f:
g = json.load(f)
name = g.get('name', '')
for peer in g.get('peers', []):
if peer:
print(f"{peer}:{name}")
except:
pass
except:
pass
def peer_groups(groups_dir, peer_name):
"""Find all groups containing a peer"""
import glob
try:
for group_file in glob.glob(f"{groups_dir}/*.group"):
try:
with open(group_file) as f:
g = json.load(f)
if peer_name in g.get('peers', []):
print(g.get('name', ''))
except:
pass
except:
pass
def peer_data(clients_dir, meta_dir, events_log):
import glob
meta = {}
for f in glob.glob(f"{meta_dir}/*.meta"):
name = os.path.basename(f).replace('.meta', '')
try:
with open(f) as mf:
meta[name] = json.load(mf)
except:
meta[name] = {}
last_events = {}
try:
with open(events_log) as f:
for line in f:
try:
e = json.loads(line.strip())
client = e.get('client', '')
if client:
last_events[client] = e
except:
pass
except:
pass
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
name = os.path.basename(conf).replace('.conf', '')
ip = ''
try:
with open(conf) as f:
for line in f:
if line.startswith('Address'):
ip = line.split('=')[1].strip().split('/')[0]
break
except:
pass
m = meta.get(name, {})
rule = m.get('rule', '')
subtype = m.get('subtype', '')
last_event = last_events.get(name, {})
last_ts = last_event.get('timestamp', '') # raw ISO, no formatting
last_evt = last_event.get('event', '') # fixed: was last_event
print(f"{name}|{ip}|{rule}|{subtype}|{last_ts}|{last_evt}")
def iso_to_ts(iso_str):
"""Convert ISO timestamp to unix timestamp"""
try:
from datetime import datetime, timezone
dt = datetime.fromisoformat(iso_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
print(int(dt.timestamp()))
except:
print(0)
def rule_list_data(rules_dir, meta_dir):
"""Return all rule data including base rules and extends"""
import glob
rule_peer_counts = {}
for f in glob.glob(f"{meta_dir}/*.meta"):
try:
with open(f) as mf:
meta = json.load(mf)
rule = meta.get('rule', '')
if rule:
rule_peer_counts[rule] = rule_peer_counts.get(rule, 0) + 1
except:
pass
rule_files = (
sorted(glob.glob(f"{rules_dir}/*.rule")) +
sorted(glob.glob(f"{rules_dir}/base/*.rule"))
)
# Collect all data first
rules_data = []
for rule_file in rule_files:
is_base = '/base/' in rule_file
try:
with open(rule_file) as f:
r = json.load(f)
name = r.get('name', '')
desc = r.get('desc', '')
group = r.get('group', '')
extends = ','.join(r.get('extends', []))
resolved = _rule_resolve_internal(rules_dir, name)
n_allows = len(resolved.get('allow_ips', [])) + \
len(resolved.get('allow_ports', []))
n_blocks = len(resolved.get('block_ips', [])) + \
len(resolved.get('block_ports', []))
peer_count = rule_peer_counts.get(name, 0)
rules_data.append({
'name': name, 'desc': desc, 'n_allows': n_allows,
'n_blocks': n_blocks, 'peer_count': peer_count,
'extends': extends, 'is_base': is_base, 'group': group
})
except:
pass
# Sort: non-base first, then by group (empty group last within non-base),
# then by name within group
rules_data.sort(key=lambda x: (
x['is_base'],
x['group'] == '' and not x['is_base'],
x['group'],
x['name']
))
for r in rules_data:
print(f"{r['name']}|{r['desc']}|{r['n_allows']}|{r['n_blocks']}|"
f"{r['peer_count']}|{r['extends']}|{r['is_base']}|{r['group']}")
def group_list_data(groups_dir, blocks_dir):
"""Return group summary data in one call"""
import glob
# Get all block files
blocked_peers = set()
for f in glob.glob(f"{blocks_dir}/*.block"):
name = os.path.basename(f).replace('.block', '')
blocked_peers.add(name)
for group_file in sorted(glob.glob(f"{groups_dir}/*.group")):
try:
with open(group_file) as f:
g = json.load(f)
name = g.get('name', '')
desc = g.get('desc', '')
peers = [p for p in g.get('peers', []) if p]
total = len(peers)
blocked = sum(1 for p in peers if p in blocked_peers)
print(f"{name}|{desc}|{total}|{blocked}")
except:
pass
def fmt_datetime(iso_str, fmt):
"""Format ISO timestamp with given strftime format"""
try:
from datetime import datetime
dt = datetime.fromisoformat(iso_str)
print(dt.strftime(fmt))
except:
print(iso_str)
def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips,
block_ports, allow_ports='', extends='', group=''):
rule = {
'name': name,
'desc': desc,
'group': group,
'dns_redirect': dns_redirect == 'true',
'extends': [x for x in extends.split(',') if x] if extends else [],
'allow_ips': [x for x in allow_ips.split(',') if x] if allow_ips else [],
'allow_ports': [x for x in allow_ports.split(',') if x] if allow_ports else [],
'block_ips': [x for x in block_ips.split(',') if x] if block_ips else [],
'block_ports': [x for x in block_ports.split(',') if x] if block_ports else [],
}
with open(file, 'w') as f:
json.dump(rule, f, indent=2)
def cleanup_config(config_file):
"""Normalize blank lines in WireGuard config"""
import re
try:
with open(config_file) as f:
config = f.read()
config = re.sub(r'\n{3,}', '\n\n', config)
config = config.rstrip('\n') + '\n'
with open(config_file, 'w') as f:
f.write(config)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def remove_peer_block(config_file, name):
"""Remove a peer block from WireGuard config by name"""
import re
try:
with open(config_file) as f:
config = f.read()
pattern = r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n'
result = re.sub(pattern, '\n', config)
with open(config_file, 'w') as f:
f.write(result)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def create_group(file, name, desc):
"""Create a new group JSON file"""
try:
group = {'name': name, 'desc': desc, 'peers': []}
with open(file, 'w') as f:
json.dump(group, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def parse_event(line):
"""Parse a single JSON event line"""
try:
e = json.loads(line)
print(f"{e.get('timestamp','')}|{e.get('client','')}|{e.get('endpoint','')}|{e.get('event','')}")
except:
pass
def parse_fw_event(line):
"""Parse a single fw_events.log JSON line"""
try:
e = json.loads(line)
ts = e.get('timestamp', '')
src = e.get('src_ip', '')
dst = e.get('dest_ip', '')
port = e.get('dest_port', '')
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
proto_num = e.get('ip.protocol', 0)
proto = proto_map.get(proto_num, str(proto_num))
print(f"{ts}|{src}|{dst}|{port}|{proto}")
except:
pass
def peer_transfer(wg_interface):
"""Get total transfer bytes per peer"""
import subprocess
low = int(os.environ.get('ACTIVITY_TOTAL_LOW_BYTES', '1000000'))
med = int(os.environ.get('ACTIVITY_TOTAL_MED_BYTES', '10000000'))
high = int(os.environ.get('ACTIVITY_TOTAL_HIGH_BYTES', '100000000'))
try:
result = subprocess.run(
['wg', 'show', wg_interface, 'transfer'],
capture_output=True, text=True
)
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split('\t')
if len(parts) == 3:
pubkey, rx, tx = parts
total = int(rx) + int(tx)
if total == 0: level = 'none'
elif total < low: level = 'low'
elif total < med: level = 'medium'
elif total < high: level = 'high'
else: level = 'very high'
print(f"{pubkey}|{rx}|{tx}|{level}")
except:
pass
def peer_transfer_delta(wg_interface, cache_file):
"""Calculate current transfer rate using delta from previous sample"""
import subprocess, time
low = int(os.environ.get('ACTIVITY_CURRENT_LOW_BYTES', '10000')) # 10KB/s
med = int(os.environ.get('ACTIVITY_CURRENT_MED_BYTES', '100000')) # 100KB/s
high = int(os.environ.get('ACTIVITY_CURRENT_HIGH_BYTES', '1000000')) # 1MB/s
current = {}
now = time.time()
try:
result = subprocess.run(
['wg', 'show', wg_interface, 'transfer'],
capture_output=True, text=True
)
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split('\t')
if len(parts) == 3:
pubkey, rx, tx = parts
current[pubkey] = {'rx': int(rx), 'tx': int(tx), 'ts': now}
except:
pass
prev = {}
if os.path.exists(cache_file):
try:
with open(cache_file) as f:
prev = json.load(f)
except:
pass
try:
with open(cache_file, 'w') as f:
json.dump(current, f)
except:
pass
for pubkey, data in current.items():
if pubkey in prev:
dt = data['ts'] - prev[pubkey].get('ts', data['ts'])
if dt > 0:
rx_rate = max(0, (data['rx'] - prev[pubkey]['rx']) / dt)
tx_rate = max(0, (data['tx'] - prev[pubkey]['tx']) / dt)
total = rx_rate + tx_rate
if total <= 0: level = 'idle'
elif total < low: level = 'low'
elif total < med: level = 'medium'
elif total < high: level = 'high'
else: level = 'very high'
print(f"{pubkey}|{int(rx_rate)}|{int(tx_rate)}|{level}")
else:
print(f"{pubkey}|0|0|idle")
else:
print(f"{pubkey}|0|0|unknown")
def remove_events_filtered(wg_file, fw_file, filter_name, filter_ip,
filter_fw, filter_wg, before_days):
"""Remove events with filters: by name/ip, source, or age"""
import time
from datetime import datetime, timezone
cutoff_ts = None
if before_days:
cutoff_ts = time.time() - (float(before_days) * 86400)
def should_remove_wg(e):
if filter_name and e.get('client') != filter_name:
return False
if cutoff_ts:
try:
ts = datetime.fromisoformat(e.get('timestamp','')).timestamp()
return ts < cutoff_ts
except:
return False
return True
def should_remove_fw(e):
if filter_ip and e.get('src_ip') != filter_ip:
return False
if cutoff_ts:
try:
ts = datetime.fromisoformat(e.get('timestamp','')).timestamp()
return ts < cutoff_ts
except:
return False
return True
removed_wg = removed_fw = 0
if not filter_fw and os.path.exists(wg_file):
lines = []
with open(wg_file) as f:
for line in f:
try:
e = json.loads(line.strip())
if should_remove_wg(e):
removed_wg += 1
continue
except:
pass
lines.append(line)
with open(wg_file, 'w') as f:
f.writelines(lines)
if not filter_wg and os.path.exists(fw_file):
lines = []
with open(fw_file) as f:
for line in f:
try:
e = json.loads(line.strip())
if should_remove_fw(e):
removed_fw += 1
continue
except:
pass
lines.append(line)
with open(fw_file, 'w') as f:
f.writelines(lines)
print(f"{removed_wg}|{removed_fw}")
def _rule_resolve_internal(rules_dir, rule_name, visited=None):
"""Internal recursive resolver — returns dict, does not print"""
if visited is None:
visited = set()
if rule_name in visited:
raise ValueError(f"Circular dependency detected: {rule_name}")
visited.add(rule_name)
rule_file = find_rule_file(rules_dir, rule_name)
with open(rule_file) as f:
rule = json.load(f)
merged = {
'allow_ips': [],
'allow_ports': [],
'block_ips': [],
'block_ports': [],
'dns_redirect': False
}
for base_name in rule.get('extends', []):
base = _rule_resolve_internal(rules_dir, base_name, visited.copy())
merged['allow_ips'] += base.get('allow_ips', [])
merged['allow_ports'] += base.get('allow_ports', [])
merged['block_ips'] += base.get('block_ips', [])
merged['block_ports'] += base.get('block_ports', [])
if base.get('dns_redirect'):
merged['dns_redirect'] = True
# Merge own fields — use .get() with defaults for all fields
merged['allow_ips'] = list(dict.fromkeys(
merged['allow_ips'] + rule.get('allow_ips', [])))
merged['allow_ports'] = list(dict.fromkeys(
merged['allow_ports'] + rule.get('allow_ports', [])))
merged['block_ips'] = list(dict.fromkeys(
merged['block_ips'] + rule.get('block_ips', [])))
merged['block_ports'] = list(dict.fromkeys(
merged['block_ports'] + rule.get('block_ports', [])))
if rule.get('dns_redirect', False):
merged['dns_redirect'] = True
merged['name'] = rule.get('name', rule_name)
merged['desc'] = rule.get('desc', '')
merged['group'] = rule.get('group', '')
merged['extends'] = rule.get('extends', [])
return merged
def rule_resolve(rules_dir, rule_name):
"""Resolve a rule with inheritance — prints JSON"""
try:
resolved = _rule_resolve_internal(rules_dir, rule_name)
print(json.dumps(resolved))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def rule_resolve_field(rules_dir, rule_name, field):
"""Get a single field from resolved rule — prints values one per line"""
try:
resolved = _rule_resolve_internal(rules_dir, rule_name)
val = resolved.get(field, [])
if isinstance(val, list):
for v in val:
print(v)
else:
print(val)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def rule_inspect(rules_dir, rule_name):
"""Show inheritance tree for a rule"""
try:
rule_file = find_rule_file(rules_dir, rule_name)
with open(rule_file) as f:
rule = json.load(f)
resolved = _rule_resolve_internal(rules_dir, rule_name)
has_extends = bool(rule.get('extends', []))
# Own rules
for ip in rule.get('allow_ips', []):
print(f"own|allow_ip|{ip}")
for p in rule.get('allow_ports', []):
print(f"own|allow_port|{p}")
for ip in rule.get('block_ips', []):
print(f"own|block_ip|{ip}")
for p in rule.get('block_ports', []):
print(f"own|block_port|{p}")
# DNS redirect — separate section
if rule.get('dns_redirect'):
print(f"dns|dns_redirect|true")
if has_extends:
# Inherited rules per base
for base_name in rule.get('extends', []):
base = _rule_resolve_internal(rules_dir, base_name)
for ip in base.get('allow_ips', []):
print(f"inherited:{base_name}|allow_ip|{ip}")
for p in base.get('allow_ports', []):
print(f"inherited:{base_name}|allow_port|{p}")
for ip in base.get('block_ips', []):
print(f"inherited:{base_name}|block_ip|{ip}")
for p in base.get('block_ports', []):
print(f"inherited:{base_name}|block_port|{p}")
if base.get('dns_redirect'):
print(f"inherited:{base_name}|dns_redirect|true")
# Resolved summary only when inheritance exists
has_resolved = (
resolved.get('allow_ips') or resolved.get('allow_ports') or
resolved.get('block_ips') or resolved.get('block_ports') or
resolved.get('dns_redirect')
)
if has_resolved:
for ip in resolved.get('allow_ips', []):
print(f"resolved|allow_ip|{ip}")
for p in resolved.get('allow_ports', []):
print(f"resolved|allow_port|{p}")
for ip in resolved.get('block_ips', []):
print(f"resolved|block_ip|{ip}")
for p in resolved.get('block_ports', []):
print(f"resolved|block_port|{p}")
if resolved.get('dns_redirect'):
print(f"resolved|dns_redirect|true")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def find_rule_file(rules_dir, rule_name):
"""Find rule file in rules/ or rules/base/"""
for path in [
os.path.join(rules_dir, f"{rule_name}.rule"),
os.path.join(rules_dir, "base", f"{rule_name}.rule"),
]:
if os.path.exists(path):
return path
return ""
def get_raw(file, key):
try:
with open(file) as f:
data = json.load(f)
val = data.get(key) # returns None if missing
if val is None:
pass # print nothing
elif isinstance(val, bool):
print(str(val).lower())
elif isinstance(val, list):
for v in val:
print(v)
else:
print(val)
except:
pass
def count_resolved(rules_dir, rule_name, key):
"""Count entries in resolved rule field"""
try:
resolved = _rule_resolve_internal(rules_dir, rule_name)
print(len(resolved.get(key, [])))
except:
print(0)
def _block_init(peer_ip):
"""Return empty block structure"""
return {
"peer_ip": peer_ip,
"blocked_direct": False,
"blocked_by_groups": [],
"rules": []
}
def _block_read(file):
try:
with open(file) as f:
content = f.read().strip()
if not content:
return None # empty file = no block data
try:
return json.loads(content)
except json.JSONDecodeError:
# Old format — migrate
lines = content.split('\n')
peer_ip = lines[0].split()[0] if lines else ''
new_data = {
"peer_ip": peer_ip,
"blocked_direct": True,
"blocked_by_groups": [],
"rules": [{"name": "full block", "type": "full"}]
}
with open(file, 'w') as f:
json.dump(new_data, f, indent=2)
return new_data
except FileNotFoundError:
return None
except Exception:
return None
def _block_write(file, data):
"""Write block file"""
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def block_get(file):
"""Read and print block file as JSON"""
data = _block_read(file)
if data:
print(json.dumps(data))
def block_is_blocked(file):
"""Return true if peer is effectively blocked"""
data = _block_read(file)
if not data:
print("false")
return
blocked = data.get("blocked_direct", False) or \
bool(data.get("blocked_by_groups", []))
print("true" if blocked else "false")
def block_set_direct(file, peer_ip, value):
"""Set blocked_direct"""
try:
data = _block_read(file) or _block_init(peer_ip)
data["blocked_direct"] = value.lower() == "true"
data["peer_ip"] = peer_ip
_block_write(file, data)
remaining = data["blocked_direct"] or bool(data.get("blocked_by_groups", []))
pass
# print("true" if remaining else "false")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_add_group(file, peer_ip, group):
"""Add group to blocked_by_groups"""
try:
data = _block_read(file) or _block_init(peer_ip)
data["peer_ip"] = peer_ip
groups = data.get("blocked_by_groups", [])
if group not in groups:
groups.append(group)
data["blocked_by_groups"] = groups
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_remove_group(file, peer_ip, group):
"""Remove group from blocked_by_groups, return whether still blocked"""
try:
data = _block_read(file) or _block_init(peer_ip)
groups = data.get("blocked_by_groups", [])
if group in groups:
groups.remove(group)
data["blocked_by_groups"] = groups
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# print("true" if remaining else "false")
pass
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_add_rule(file, peer_ip, rule_type, name="", target="",
port="", proto=""):
"""Add a block rule entry"""
try:
data = _block_read(file) or _block_init(peer_ip)
data["peer_ip"] = peer_ip
rule = {"type": rule_type}
if name: rule["name"] = name
if target: rule["target"] = target
if port: rule["port"] = port
if proto: rule["proto"] = proto
rules = data.get("rules", [])
for existing in rules:
if existing.get("type") == rule_type and \
existing.get("target","") == target and \
existing.get("port","") == port and \
existing.get("proto","") == proto:
return # already exists, skip
rules.append(rule)
data["rules"] = rules
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_remove_rule(file, rule_type, target="", port="", proto=""):
"""Remove matching block rule entry"""
try:
data = _block_read(file)
if not data:
return
rules = data.get("rules", [])
filtered = []
for r in rules:
if r.get("type") == rule_type and \
r.get("target", "") == target and \
r.get("port", "") == port and \
r.get("proto", "") == proto:
continue # remove this one
filtered.append(r)
data["rules"] = filtered
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_get_rules(file):
"""Print rules as pipe-separated lines: name|type|target|port|proto"""
data = _block_read(file)
if not data:
return
for r in data.get("rules", []):
print(f"{r.get('name','')}|{r.get('type','')}|"
f"{r.get('target','')}|{r.get('port','')}|{r.get('proto','')}")
def block_get_groups(file):
data = _block_read(file)
if not data:
return
print(','.join(data.get('blocked_by_groups', [])))
def block_get_direct(file):
data = _block_read(file)
if not data:
print('false')
return
print('true' if data.get('blocked_direct', False) else 'false')
# ============================================
# Net / Services
# ============================================
def _net_read(file):
"""Read services.json, return dict or empty dict"""
try:
if not os.path.exists(file):
return {}
with open(file) as f:
content = f.read().strip()
if not content:
return {}
return json.loads(content)
except Exception:
return {}
def _net_write(file, data):
"""Write services.json"""
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def net_list(file):
"""List all service names with IP and port count"""
data = _net_read(file)
for name, svc in sorted(data.items()):
ip = svc.get('ip', '')
desc = svc.get('desc', '')
tags = ','.join(svc.get('tags', []))
ports = len(svc.get('ports', {}))
print(f"{name}|{ip}|{desc}|{tags}|{ports}")
def net_show(file, name):
"""Show full service details"""
data = _net_read(file)
if name not in data:
print(f"Error: Service not found: {name}", file=sys.stderr)
sys.exit(1)
svc = data[name]
print(f"name|{name}")
print(f"ip|{svc.get('ip','')}")
print(f"desc|{svc.get('desc','')}")
print(f"tags|{','.join(svc.get('tags',[]))}")
for port_name, port_def in svc.get('ports', {}).items():
port = port_def.get('port', '')
proto = port_def.get('proto', 'tcp')
desc = port_def.get('desc', '')
print(f"port|{port_name}|{port}|{proto}|{desc}")
def net_exists(file, name):
"""Check if service exists"""
data = _net_read(file)
# Handle service:port syntax
if ':' in name:
svc_name, port_name = name.split(':', 1)
if port_name == 'ports':
print('true' if svc_name in data else 'false')
else:
svc = data.get(svc_name, {})
print('true' if port_name in svc.get('ports', {}) else 'false')
else:
print('true' if name in data else 'false')
def net_add_service(file, name, ip, desc='', tags=''):
"""Add or update a service"""
data = _net_read(file)
if name not in data:
data[name] = {'ip': ip, 'ports': {}}
else:
data[name]['ip'] = ip
if desc:
data[name]['desc'] = desc
if tags:
data[name]['tags'] = [t.strip() for t in tags.split(',') if t.strip()]
_net_write(file, data)
def net_add_port(file, service, port_name, port, proto='tcp', desc=''):
"""Add or update a port on a service"""
data = _net_read(file)
if service not in data:
print(f"Error: Service not found: {service}", file=sys.stderr)
sys.exit(1)
if 'ports' not in data[service]:
data[service]['ports'] = {}
entry = {'port': int(port), 'proto': proto}
if desc:
entry['desc'] = desc
data[service]['ports'][port_name] = entry
_net_write(file, data)
def net_remove(file, name):
"""Remove service or port"""
data = _net_read(file)
if ':' in name:
svc_name, port_name = name.split(':', 1)
if svc_name not in data:
print(f"Error: Service not found: {svc_name}", file=sys.stderr)
sys.exit(1)
if port_name == 'ports':
# Remove all ports
data[svc_name]['ports'] = {}
else:
if port_name not in data[svc_name].get('ports', {}):
print(f"Error: Port not found: {port_name}", file=sys.stderr)
sys.exit(1)
del data[svc_name]['ports'][port_name]
else:
if name not in data:
print(f"Error: Service not found: {name}", file=sys.stderr)
sys.exit(1)
del data[name]
_net_write(file, data)
def net_resolve(file, name):
"""Resolve service name to ip or ip:port:proto lines"""
data = _net_read(file)
if ':' in name:
svc_name, port_name = name.split(':', 1)
if svc_name not in data:
print(f"Error: Service not found: {svc_name}", file=sys.stderr)
sys.exit(1)
svc = data[svc_name]
ip = svc.get('ip', '')
if port_name == 'ports':
# All ports
for pname, pdef in svc.get('ports', {}).items():
print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
else:
if port_name not in svc.get('ports', {}):
print(f"Error: Port not found: {port_name}", file=sys.stderr)
sys.exit(1)
pdef = svc['ports'][port_name]
print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
else:
if name not in data:
print(f"Error: Service not found: {name}", file=sys.stderr)
sys.exit(1)
print(data[name].get('ip', ''))
def net_reverse_lookup(file, ip, port='', proto=''):
"""Reverse lookup IP/port to service name"""
data = _net_read(file)
for svc_name, svc in data.items():
if svc.get('ip') != ip:
continue
if not port:
print(svc_name)
return
for port_name, port_def in svc.get('ports', {}).items():
if (str(port_def.get('port','')) == str(port) and
port_def.get('proto','tcp') == proto):
print(f"{svc_name}:{port_name}")
return
# IP matched but no port match — return service name
print(svc_name)
return
commands = {
'get': lambda args: get(args[0], args[1]),
'set': lambda args: set_key(args[0], args[1], args[2]),
'delete': lambda args: delete_key(args[0], args[1]),
'append': lambda args: append(args[0], args[1], args[2]),
'remove': lambda args: remove_value(args[0], args[1], args[2]),
'cat': lambda args: cat(args[0]),
'has_key': lambda args: has_key(args[0], args[1]),
'filter_values': lambda args: filter_values(args[0], args[1], args[2]),
'last_event': lambda args: last_event(args[0], args[1], args[2], args[3]),
'events_for': lambda args: events_for(args[0], args[1], args[2]),
'fw_events': lambda args: fw_events(args[0], args[1], args[2], args[3], args[4]),
'wg_events': lambda args: wg_events(args[0], args[1], args[2], args[3]),
'format_fw_event': lambda args: format_fw_event(sys.stdin.read(), args[0]),
'format_wg_event': lambda args: format_wg_event(sys.stdin.read()),
'remove_events': lambda args: remove_events(args[0], args[1]),
'follow_logs': lambda args: follow_logs(args[0], args[1], args[2], args[3], args[4], args[5]),
'count': lambda args: count(args[0], args[1]),
'audit_fw_counts': lambda args: audit_fw_counts(args[0]),
'peer_group_map': lambda args: peer_group_map(args[0]),
'peer_groups': lambda args: peer_groups(args[0], args[1]),
'peer_data': lambda args: peer_data(args[0], args[1], args[2]),
'iso_to_ts': lambda args: iso_to_ts(args[0]),
'rule_list_data': lambda args: rule_list_data(args[0], args[1]),
'group_list_data': lambda args: group_list_data(args[0], args[1]),
'fmt_datetime': lambda args: fmt_datetime(args[0], args[1]),
'create_rule': lambda args: create_rule(
args[0], args[1], args[2], args[3], args[4], args[5], args[6],
args[7] if len(args) > 7 else '',
args[8] if len(args) > 8 else '',
args[9] if len(args) > 9 else ''
),
'cleanup_config': lambda args: cleanup_config(args[0]),
'remove_peer_block': lambda args: remove_peer_block(args[0], args[1]),
'create_group': lambda args: create_group(args[0], args[1], args[2]),
'parse_event': lambda args: parse_event(args[0]),
'parse_fw_event': lambda args: parse_fw_event(args[0]),
'peer_transfer': lambda args: peer_transfer(args[0]),
'remove_events_filtered': lambda args: remove_events_filtered(
args[0], args[1], args[2], args[3], args[4]=='true', args[5]=='true', args[6] if len(args)>6 else ''),
'peer_transfer': lambda args: peer_transfer(args[0]),
'peer_transfer_delta': lambda args: peer_transfer_delta(args[0], args[1]),
'rule_resolve': lambda args: rule_resolve(args[0], args[1]),
'rule_resolve_field': lambda args: rule_resolve_field(args[0], args[1], args[2]),
'rule_inspect': lambda args: rule_inspect(args[0], args[1]),
'find_rule_file': lambda args: print(find_rule_file(args[0], args[1])),
'get_raw': lambda args: print(get_raw(args[0], args[1])),
'count_resolved': lambda args: count_resolved(args[0], args[1], args[2]),
'block_get': lambda args: block_get(args[0]),
'block_is_blocked': lambda args: block_is_blocked(args[0]),
'block_set_direct': lambda args: block_set_direct(args[0], args[1], args[2]),
'block_add_group': lambda args: block_add_group(args[0], args[1], args[2]),
'block_remove_group': lambda args: block_remove_group(args[0], args[1], args[2]),
'block_add_rule': lambda args: block_add_rule(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else '',
args[5] if len(args) > 5 else '',
args[6] if len(args) > 6 else ''
),
'block_remove_rule': lambda args: block_remove_rule(
args[0], args[1],
args[2] if len(args) > 2 else '',
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else ''
),
'block_get_rules': lambda args: block_get_rules(args[0]),
'block_get_groups': lambda args: block_get_groups(args[0]),
'block_get_direct': lambda args: block_get_direct(args[0]),
'net_list': lambda args: net_list(args[0]),
'net_show': lambda args: net_show(args[0], args[1]),
'net_exists': lambda args: net_exists(args[0], args[1]),
'net_add_service': lambda args: net_add_service(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else ''
),
'net_add_port': lambda args: net_add_port(
args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else 'tcp',
args[5] if len(args) > 5 else ''
),
'net_remove': lambda args: net_remove(args[0], args[1]),
'net_resolve': lambda args: net_resolve(args[0], args[1]),
'net_reverse_lookup': lambda args: net_reverse_lookup(
args[0], args[1],
args[2] if len(args) > 2 else '',
args[3] if len(args) > 3 else ''
),
'block_is_empty': lambda args: block_is_empty(args[0]),
}
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: json_helper.py <command> <file> [key] [value]", file=sys.stderr)
sys.exit(1)
cmd = sys.argv[1]
args = sys.argv[2:]
if cmd not in commands:
print(f"Unknown command: {cmd}", file=sys.stderr)
sys.exit(1)
commands[cmd](args)