#!/usr/bin/env python3 """ wgctl JSON helper — called by shell functions to read/write JSON files. Usage: json_helper.py [key] [value] """ import sys import json import os DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M') def get(file, key): try: with open(file) as f: data = json.load(f) val = data.get(key, []) if isinstance(val, bool): print(str(val).lower()) # true/false not True/False elif isinstance(val, list): if val: print('\n'.join(str(v) for v in val)) else: if val: print(val) except: sys.exit(0) def set_key(file, key, value): try: data = {} if os.path.exists(file): with open(file) as f: data = json.load(f) # Try to parse as JSON value first (for arrays/bools) try: data[key] = json.loads(value) except: data[key] = value with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def delete_key(file, key): try: with open(file) as f: data = json.load(f) data.pop(key, None) with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def append(file, key, value): try: data = {} if os.path.exists(file): with open(file) as f: data = json.load(f) if key not in data: data[key] = [] if value not in data[key]: data[key].append(value) with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def remove_value(file, key, value): try: with open(file) as f: data = json.load(f) if key in data and value in data[key]: data[key].remove(value) with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def cat(file): try: with open(file) as f: data = json.load(f) print(json.dumps(data, indent=2)) except Exception as e: sys.exit(1) def has_key(file, key): try: with open(file) as f: data = json.load(f) sys.exit(0 if key in data else 1) except: sys.exit(1) def filter_values(file, key, value): """Remove all entries where value matches""" try: with open(file) as f: data = json.load(f) data = {k: v for k, v in data.items() if v != value} with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def last_event(file, key, field, client): """Get last event field for a client""" try: last = None with open(file) as f: for line in f: try: e = json.loads(line.strip()) if e.get(key) == client: last = e except: pass if last: print(last.get(field, '')) except: pass def events_for(file, ip, limit): """Format events for a given IP""" try: from datetime import datetime events = [] with open(file) as f: for line in f: try: e = json.loads(line.strip()) if e.get('ip') == ip: events.append(e) except: pass for e in events[-int(limit):]: ts = e.get('timestamp', '') try: dt = datetime.fromisoformat(ts) ts = dt.strftime(DATETIME_FMT) except: pass endpoint = e.get('endpoint', '—') client = e.get('client', '—') event = e.get('event', '—') print(f' {ts} {client:<20} {endpoint:<20} {event}') except: pass def fw_events(file, filter_ip, filter_type, clients_dir, limit): """Format firewall drop events with dedup and counts""" import glob proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} ip_to_name = {} for conf in glob.glob(f"{clients_dir}/*.conf"): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] ip_to_name[ip] = name except: pass events = [] last_seen = {} try: with open(file) as f: for line in f: try: e = json.loads(line.strip()) src = e.get('src_ip', '') if not src: continue if filter_ip and src != filter_ip: continue dst = e.get('dest_ip', '') port = e.get('dest_port', '') proto = e.get('ip.protocol', 0) key = (src, dst, port, proto) ts_str = e.get('timestamp', '') try: from datetime import datetime ts = datetime.fromisoformat(ts_str).timestamp() except: ts = 0 windows = {1: 5, 6: 30, 17: 10} window = windows.get(proto, 10) if key in last_seen and (ts - last_seen[key]) < window: continue last_seen[key] = ts events.append(e) except: pass except: pass # Dedup consecutive same src+dst+port within 60s with count deduped = [] counts = [] for e in events: ts_str = e.get('timestamp', '') try: from datetime import datetime ts = datetime.fromisoformat(ts_str).timestamp() except: ts = 0 src = e.get('src_ip', '') dst = e.get('dest_ip', '') port = e.get('dest_port', '') proto = e.get('ip.protocol', 0) key = (src, dst, port, proto) if deduped and counts: prev = deduped[-1] try: prev_ts = datetime.fromisoformat(prev.get('timestamp','')).timestamp() except: prev_ts = 0 prev_key = (prev.get('src_ip',''), prev.get('dest_ip',''), prev.get('dest_port',''), prev.get('ip.protocol',0)) if key == prev_key and (ts - prev_ts) < 60: counts[-1] += 1 continue deduped.append(e) counts.append(1) grouped = [] group_counts = [] for e in deduped: ts_str = e.get('timestamp', '') try: from datetime import datetime dt = datetime.fromisoformat(ts_str) # Truncate to minute for grouping minute_key = dt.strftime('%Y-%m-%d %H:%M') except: minute_key = ts_str[:16] src = e.get('src_ip', '') dst = e.get('dest_ip', '') port = e.get('dest_port', '') proto = e.get('ip.protocol', 0) key = (minute_key, src, dst, proto) # group within same minute if grouped and group_counts: prev = grouped[-1] try: prev_dt = datetime.fromisoformat(prev.get('timestamp','')) prev_minute = prev_dt.strftime('%Y-%m-%d %H:%M') except: prev_minute = '' prev_key = (prev_minute, prev.get('src_ip',''), prev.get('dest_ip',''), prev.get('ip.protocol',0)) if key == prev_key: group_counts[-1] += 1 continue grouped.append(e) group_counts.append(1) for e, count in zip(deduped[-int(limit):], counts[-int(limit):]): ts = e.get('timestamp', '') try: from datetime import datetime dt = datetime.fromisoformat(ts) ts = dt.strftime(DATETIME_FMT) except: pass src = e.get('src_ip', '—') dst = e.get('dest_ip', '—') port = e.get('dest_port', '') proto_num = e.get('ip.protocol', 0) proto = proto_map.get(proto_num, str(proto_num)) dst_str = f"{dst}:{port}" if port else dst client = ip_to_name.get(src, src) if filter_type and not client.startswith(filter_type + '-'): continue count_str = f" (x{count})" if count > 1 else "" print(f"{ts}|{client}|{dst_str}|{proto}{count_str}") def wg_events(file, filter_client, filter_type, limit): """Format WireGuard events from events.log with dedup""" events = [] try: with open(file) as f: for line in f: try: e = json.loads(line.strip()) client = e.get('client', '') if not client: continue if filter_client and client != filter_client: continue if filter_type and not client.startswith(filter_type + '-'): continue events.append(e) except: pass except: pass # Dedup consecutive same client+event+endpoint within 60s deduped = [] counts = [] for e in events: ts_str = e.get('timestamp', '') try: from datetime import datetime ts = datetime.fromisoformat(ts_str).timestamp() except: ts = 0 client = e.get('client', '') event = e.get('event', '') endpoint = e.get('endpoint', '') key = (client, event, endpoint[:15]) if deduped and counts: prev = deduped[-1] prev_ts_str = prev.get('timestamp', '') try: prev_ts = datetime.fromisoformat(prev_ts_str).timestamp() except: prev_ts = 0 prev_key = (prev.get('client',''), prev.get('event',''), prev.get('endpoint','')[:15]) if key == prev_key and (ts - prev_ts) < 60: counts[-1] += 1 continue deduped.append(e) counts.append(1) for e, count in zip(deduped[-int(limit):], counts[-int(limit):]): ts = e.get('timestamp', '') try: from datetime import datetime dt = datetime.fromisoformat(ts) ts = dt.strftime(DATETIME_FMT) except: pass client = e.get('client', '—') endpoint = e.get('endpoint', '—') event = e.get('event', '—') count_str = f" (x{count})" if count > 1 else "" print(f"{ts}|{client}|{endpoint}|{event}{count_str}") def format_fw_event(line, clients_dir): """Format a single fw_event line""" import glob proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} # Build ip->name map ip_to_name = {} for conf in glob.glob(f"{clients_dir}/*.conf"): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: for l in f: if l.startswith('Address'): ip = l.split('=')[1].strip().split('/')[0] ip_to_name[ip] = name except: pass try: e = json.loads(line.strip()) src = e.get('src_ip', '') if not src: return None ts = e.get('timestamp', '') try: from datetime import datetime dt = datetime.fromisoformat(ts) ts = dt.strftime(DATETIME_FMT) except: pass dst = e.get('dest_ip', '—') port = e.get('dest_port', '') proto_num = e.get('ip.protocol', 0) proto = proto_map.get(proto_num, str(proto_num)) dst_str = f"{dst}:{port}" if port else dst client = ip_to_name.get(src, src) return f"{ts}|{client}|{dst_str}|{proto}" except: return None def format_wg_event(line): """Format a single wg_event line""" try: e = json.loads(line.strip()) client = e.get('client', '') if not client: return None ts = e.get('timestamp', '') try: from datetime import datetime dt = datetime.fromisoformat(ts) ts = dt.strftime(DATETIME_FMT) except: pass endpoint = e.get('endpoint', '—') event = e.get('event', '—') return f"{ts}|{client}|{endpoint}|{event}|wg" except: return None def remove_events(file, identifier): """Remove all events for a client/ip from a JSONL file""" try: lines = [] with open(file) as f: for line in f: try: e = json.loads(line.strip()) if e.get('client') == identifier or e.get('src_ip') == identifier: continue lines.append(line) except: lines.append(line) with open(file, 'w') as f: f.writelines(lines) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def follow_logs(fw_file, wg_file, filter_ip, filter_type, clients_dir, filter_peers=""): """Follow both log files and output formatted events""" import glob, time, select proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} peer_filter = set(filter_peers.split(',')) if filter_peers else set() # Build ip->name map ip_to_name = {} for conf in glob.glob(f"{clients_dir}/*.conf"): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: for l in f: if l.startswith('Address'): ip = l.split('=')[1].strip().split('/')[0] ip_to_name[ip] = name except: pass # Open files and seek to end files = {} for label, path in [('fw', fw_file), ('wg', wg_file)]: if path and os.path.exists(path): f = open(path) f.seek(0, 2) # seek to end files[label] = f dedup = {} try: while True: for label, f in files.items(): line = f.readline() if not line: continue try: e = json.loads(line.strip()) except: continue if label == 'fw': src = e.get('src_ip', '') if not src: continue if filter_ip and src != filter_ip: continue # Filter by peer names if specified if peer_filter: client_name = ip_to_name.get(src, '') if client_name not in peer_filter: continue dst = e.get('dest_ip', '—') port = e.get('dest_port', '') proto_num = e.get('ip.protocol', 0) proto = proto_map.get(proto_num, str(proto_num)) # Dedup key = (src, dst, port, proto_num) windows = {1: 5, 6: 30, 17: 10} window = windows.get(proto_num, 10) now = time.time() if key in dedup and (now - dedup[key]) < window: continue dedup[key] = now client = ip_to_name.get(src, src) if filter_type and not client.startswith(filter_type + '-'): continue dst_str = f"{dst}:{port}" if port else dst ts = e.get('timestamp', '')[:16].replace('T', ' ') print(f"fw|{ts}|{client}|{dst_str}|{proto}", flush=True) elif label == 'wg': client = e.get('client', '') if not client: continue if filter_ip: ip = ip_to_name.get(filter_ip, '') if client != ip and client != filter_ip: continue if peer_filter and client not in peer_filter: continue if filter_type and not client.startswith(filter_type + '-'): continue ts = e.get('timestamp', '')[:16].replace('T', ' ') endpoint = e.get('endpoint', '—') event = e.get('event', '—') print(f"wg|{ts}|{client}|{endpoint}|{event}", flush=True) time.sleep(0.1) except KeyboardInterrupt: pass def count(file, key): try: with open(file) as f: data = json.load(f) val = data.get(key, []) print(len(val) if isinstance(val, list) else 0) except: print(0) def audit_fw_counts(clients_dir): """Return peer_name:fw_count pairs from iptables and client configs""" import glob, subprocess # Get iptables output once try: result = subprocess.run( ['iptables', '-L', 'FORWARD', '-n'], capture_output=True, text=True ) fw_output = result.stdout except: fw_output = "" # Build ip->name and count rules for conf in glob.glob(f"{clients_dir}/*.conf"): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] count = fw_output.count(ip) print(f"{name}:{count}") break except: pass def peer_group_map(groups_dir): """Return peer:group pairs for all groups""" import glob try: for group_file in glob.glob(f"{groups_dir}/*.group"): try: with open(group_file) as f: g = json.load(f) name = g.get('name', '') for peer in g.get('peers', []): if peer: print(f"{peer}:{name}") except: pass except: pass def peer_groups(groups_dir, peer_name): """Find all groups containing a peer""" import glob try: for group_file in glob.glob(f"{groups_dir}/*.group"): try: with open(group_file) as f: g = json.load(f) if peer_name in g.get('peers', []): print(g.get('name', '')) except: pass except: pass def peer_data(clients_dir, meta_dir, events_log): import glob meta = {} for f in glob.glob(f"{meta_dir}/*.meta"): name = os.path.basename(f).replace('.meta', '') try: with open(f) as mf: meta[name] = json.load(mf) except: meta[name] = {} last_events = {} try: with open(events_log) as f: for line in f: try: e = json.loads(line.strip()) client = e.get('client', '') if client: last_events[client] = e except: pass except: pass for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): name = os.path.basename(conf).replace('.conf', '') ip = '' try: with open(conf) as f: for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] break except: pass m = meta.get(name, {}) rule = m.get('rule', '') subtype = m.get('subtype', '') last_event = last_events.get(name, {}) last_ts = last_event.get('timestamp', '') # raw ISO, no formatting last_evt = last_event.get('event', '') # fixed: was last_event print(f"{name}|{ip}|{rule}|{subtype}|{last_ts}|{last_evt}") def iso_to_ts(iso_str): """Convert ISO timestamp to unix timestamp""" try: from datetime import datetime, timezone dt = datetime.fromisoformat(iso_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) print(int(dt.timestamp())) except: print(0) def rule_list_data(rules_dir, meta_dir): """Return all rule data including base rules and extends""" import glob rule_peer_counts = {} for f in glob.glob(f"{meta_dir}/*.meta"): try: with open(f) as mf: meta = json.load(mf) rule = meta.get('rule', '') if rule: rule_peer_counts[rule] = rule_peer_counts.get(rule, 0) + 1 except: pass rule_files = ( sorted(glob.glob(f"{rules_dir}/*.rule")) + sorted(glob.glob(f"{rules_dir}/base/*.rule")) ) # Collect all data first rules_data = [] for rule_file in rule_files: is_base = '/base/' in rule_file try: with open(rule_file) as f: r = json.load(f) name = r.get('name', '') desc = r.get('desc', '') group = r.get('group', '') extends = ','.join(r.get('extends', [])) resolved = _rule_resolve_internal(rules_dir, name) n_allows = len(resolved.get('allow_ips', [])) + \ len(resolved.get('allow_ports', [])) n_blocks = len(resolved.get('block_ips', [])) + \ len(resolved.get('block_ports', [])) peer_count = rule_peer_counts.get(name, 0) rules_data.append({ 'name': name, 'desc': desc, 'n_allows': n_allows, 'n_blocks': n_blocks, 'peer_count': peer_count, 'extends': extends, 'is_base': is_base, 'group': group }) except: pass # Sort: non-base first, then by group (empty group last within non-base), # then by name within group rules_data.sort(key=lambda x: ( x['is_base'], x['group'] == '' and not x['is_base'], x['group'], x['name'] )) for r in rules_data: print(f"{r['name']}|{r['desc']}|{r['n_allows']}|{r['n_blocks']}|" f"{r['peer_count']}|{r['extends']}|{r['is_base']}|{r['group']}") def group_list_data(groups_dir, blocks_dir): """Return group summary data in one call""" import glob # Get all block files blocked_peers = set() for f in glob.glob(f"{blocks_dir}/*.block"): name = os.path.basename(f).replace('.block', '') blocked_peers.add(name) for group_file in sorted(glob.glob(f"{groups_dir}/*.group")): try: with open(group_file) as f: g = json.load(f) name = g.get('name', '') desc = g.get('desc', '') peers = [p for p in g.get('peers', []) if p] total = len(peers) blocked = sum(1 for p in peers if p in blocked_peers) print(f"{name}|{desc}|{total}|{blocked}") except: pass def fmt_datetime(iso_str, fmt): """Format ISO timestamp with given strftime format""" try: from datetime import datetime dt = datetime.fromisoformat(iso_str) print(dt.strftime(fmt)) except: print(iso_str) def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips, block_ports, allow_ports='', extends='', group=''): rule = { 'name': name, 'desc': desc, 'group': group, 'dns_redirect': dns_redirect == 'true', 'extends': [x for x in extends.split(',') if x] if extends else [], 'allow_ips': [x for x in allow_ips.split(',') if x] if allow_ips else [], 'allow_ports': [x for x in allow_ports.split(',') if x] if allow_ports else [], 'block_ips': [x for x in block_ips.split(',') if x] if block_ips else [], 'block_ports': [x for x in block_ports.split(',') if x] if block_ports else [], } with open(file, 'w') as f: json.dump(rule, f, indent=2) def cleanup_config(config_file): """Normalize blank lines in WireGuard config""" import re try: with open(config_file) as f: config = f.read() config = re.sub(r'\n{3,}', '\n\n', config) config = config.rstrip('\n') + '\n' with open(config_file, 'w') as f: f.write(config) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def remove_peer_block(config_file, name): """Remove a peer block from WireGuard config by name""" import re try: with open(config_file) as f: config = f.read() pattern = r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n' result = re.sub(pattern, '\n', config) with open(config_file, 'w') as f: f.write(result) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def create_group(file, name, desc): """Create a new group JSON file""" try: group = {'name': name, 'desc': desc, 'peers': []} with open(file, 'w') as f: json.dump(group, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def parse_event(line): """Parse a single JSON event line""" try: e = json.loads(line) print(f"{e.get('timestamp','')}|{e.get('client','')}|{e.get('endpoint','')}|{e.get('event','')}") except: pass def parse_fw_event(line): """Parse a single fw_events.log JSON line""" try: e = json.loads(line) ts = e.get('timestamp', '') src = e.get('src_ip', '') dst = e.get('dest_ip', '') port = e.get('dest_port', '') proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} proto_num = e.get('ip.protocol', 0) proto = proto_map.get(proto_num, str(proto_num)) print(f"{ts}|{src}|{dst}|{port}|{proto}") except: pass def peer_transfer(wg_interface): """Get total transfer bytes per peer""" import subprocess low = int(os.environ.get('ACTIVITY_TOTAL_LOW_BYTES', '1000000')) med = int(os.environ.get('ACTIVITY_TOTAL_MED_BYTES', '10000000')) high = int(os.environ.get('ACTIVITY_TOTAL_HIGH_BYTES', '100000000')) try: result = subprocess.run( ['wg', 'show', wg_interface, 'transfer'], capture_output=True, text=True ) for line in result.stdout.strip().split('\n'): if not line: continue parts = line.split('\t') if len(parts) == 3: pubkey, rx, tx = parts total = int(rx) + int(tx) if total == 0: level = 'none' elif total < low: level = 'low' elif total < med: level = 'medium' elif total < high: level = 'high' else: level = 'very high' print(f"{pubkey}|{rx}|{tx}|{level}") except: pass def peer_transfer_delta(wg_interface, cache_file): """Calculate current transfer rate using delta from previous sample""" import subprocess, time low = int(os.environ.get('ACTIVITY_CURRENT_LOW_BYTES', '10000')) # 10KB/s med = int(os.environ.get('ACTIVITY_CURRENT_MED_BYTES', '100000')) # 100KB/s high = int(os.environ.get('ACTIVITY_CURRENT_HIGH_BYTES', '1000000')) # 1MB/s current = {} now = time.time() try: result = subprocess.run( ['wg', 'show', wg_interface, 'transfer'], capture_output=True, text=True ) for line in result.stdout.strip().split('\n'): if not line: continue parts = line.split('\t') if len(parts) == 3: pubkey, rx, tx = parts current[pubkey] = {'rx': int(rx), 'tx': int(tx), 'ts': now} except: pass prev = {} if os.path.exists(cache_file): try: with open(cache_file) as f: prev = json.load(f) except: pass try: with open(cache_file, 'w') as f: json.dump(current, f) except: pass for pubkey, data in current.items(): if pubkey in prev: dt = data['ts'] - prev[pubkey].get('ts', data['ts']) if dt > 0: rx_rate = max(0, (data['rx'] - prev[pubkey]['rx']) / dt) tx_rate = max(0, (data['tx'] - prev[pubkey]['tx']) / dt) total = rx_rate + tx_rate if total <= 0: level = 'idle' elif total < low: level = 'low' elif total < med: level = 'medium' elif total < high: level = 'high' else: level = 'very high' print(f"{pubkey}|{int(rx_rate)}|{int(tx_rate)}|{level}") else: print(f"{pubkey}|0|0|idle") else: print(f"{pubkey}|0|0|unknown") def remove_events_filtered(wg_file, fw_file, filter_name, filter_ip, filter_fw, filter_wg, before_days): """Remove events with filters: by name/ip, source, or age""" import time from datetime import datetime, timezone cutoff_ts = None if before_days: cutoff_ts = time.time() - (float(before_days) * 86400) def should_remove_wg(e): if filter_name and e.get('client') != filter_name: return False if cutoff_ts: try: ts = datetime.fromisoformat(e.get('timestamp','')).timestamp() return ts < cutoff_ts except: return False return True def should_remove_fw(e): if filter_ip and e.get('src_ip') != filter_ip: return False if cutoff_ts: try: ts = datetime.fromisoformat(e.get('timestamp','')).timestamp() return ts < cutoff_ts except: return False return True removed_wg = removed_fw = 0 if not filter_fw and os.path.exists(wg_file): lines = [] with open(wg_file) as f: for line in f: try: e = json.loads(line.strip()) if should_remove_wg(e): removed_wg += 1 continue except: pass lines.append(line) with open(wg_file, 'w') as f: f.writelines(lines) if not filter_wg and os.path.exists(fw_file): lines = [] with open(fw_file) as f: for line in f: try: e = json.loads(line.strip()) if should_remove_fw(e): removed_fw += 1 continue except: pass lines.append(line) with open(fw_file, 'w') as f: f.writelines(lines) print(f"{removed_wg}|{removed_fw}") def _rule_resolve_internal(rules_dir, rule_name, visited=None): """Internal recursive resolver — returns dict, does not print""" if visited is None: visited = set() if rule_name in visited: raise ValueError(f"Circular dependency detected: {rule_name}") visited.add(rule_name) rule_file = find_rule_file(rules_dir, rule_name) with open(rule_file) as f: rule = json.load(f) merged = { 'allow_ips': [], 'allow_ports': [], 'block_ips': [], 'block_ports': [], 'dns_redirect': False } for base_name in rule.get('extends', []): base = _rule_resolve_internal(rules_dir, base_name, visited.copy()) merged['allow_ips'] += base.get('allow_ips', []) merged['allow_ports'] += base.get('allow_ports', []) merged['block_ips'] += base.get('block_ips', []) merged['block_ports'] += base.get('block_ports', []) if base.get('dns_redirect'): merged['dns_redirect'] = True # Merge own fields — use .get() with defaults for all fields merged['allow_ips'] = list(dict.fromkeys( merged['allow_ips'] + rule.get('allow_ips', []))) merged['allow_ports'] = list(dict.fromkeys( merged['allow_ports'] + rule.get('allow_ports', []))) merged['block_ips'] = list(dict.fromkeys( merged['block_ips'] + rule.get('block_ips', []))) merged['block_ports'] = list(dict.fromkeys( merged['block_ports'] + rule.get('block_ports', []))) if rule.get('dns_redirect', False): merged['dns_redirect'] = True merged['name'] = rule.get('name', rule_name) merged['desc'] = rule.get('desc', '') merged['group'] = rule.get('group', '') merged['extends'] = rule.get('extends', []) return merged def rule_resolve(rules_dir, rule_name): """Resolve a rule with inheritance — prints JSON""" try: resolved = _rule_resolve_internal(rules_dir, rule_name) print(json.dumps(resolved)) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def rule_resolve_field(rules_dir, rule_name, field): """Get a single field from resolved rule — prints values one per line""" try: resolved = _rule_resolve_internal(rules_dir, rule_name) val = resolved.get(field, []) if isinstance(val, list): for v in val: print(v) else: print(val) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def rule_inspect(rules_dir, rule_name): """Show inheritance tree for a rule""" try: rule_file = find_rule_file(rules_dir, rule_name) with open(rule_file) as f: rule = json.load(f) resolved = _rule_resolve_internal(rules_dir, rule_name) has_extends = bool(rule.get('extends', [])) # Own rules for ip in rule.get('allow_ips', []): print(f"own|allow_ip|{ip}") for p in rule.get('allow_ports', []): print(f"own|allow_port|{p}") for ip in rule.get('block_ips', []): print(f"own|block_ip|{ip}") for p in rule.get('block_ports', []): print(f"own|block_port|{p}") # DNS redirect — separate section if rule.get('dns_redirect'): print(f"dns|dns_redirect|true") if has_extends: # Inherited rules per base for base_name in rule.get('extends', []): base = _rule_resolve_internal(rules_dir, base_name) for ip in base.get('allow_ips', []): print(f"inherited:{base_name}|allow_ip|{ip}") for p in base.get('allow_ports', []): print(f"inherited:{base_name}|allow_port|{p}") for ip in base.get('block_ips', []): print(f"inherited:{base_name}|block_ip|{ip}") for p in base.get('block_ports', []): print(f"inherited:{base_name}|block_port|{p}") if base.get('dns_redirect'): print(f"inherited:{base_name}|dns_redirect|true") # Resolved summary only when inheritance exists has_resolved = ( resolved.get('allow_ips') or resolved.get('allow_ports') or resolved.get('block_ips') or resolved.get('block_ports') or resolved.get('dns_redirect') ) if has_resolved: for ip in resolved.get('allow_ips', []): print(f"resolved|allow_ip|{ip}") for p in resolved.get('allow_ports', []): print(f"resolved|allow_port|{p}") for ip in resolved.get('block_ips', []): print(f"resolved|block_ip|{ip}") for p in resolved.get('block_ports', []): print(f"resolved|block_port|{p}") if resolved.get('dns_redirect'): print(f"resolved|dns_redirect|true") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def find_rule_file(rules_dir, rule_name): """Find rule file in rules/ or rules/base/""" for path in [ os.path.join(rules_dir, f"{rule_name}.rule"), os.path.join(rules_dir, "base", f"{rule_name}.rule"), ]: if os.path.exists(path): return path return "" def get_raw(file, key): try: with open(file) as f: data = json.load(f) val = data.get(key) # returns None if missing if val is None: pass # print nothing elif isinstance(val, bool): print(str(val).lower()) elif isinstance(val, list): for v in val: print(v) else: print(val) except: pass commands = { 'get': lambda args: get(args[0], args[1]), 'set': lambda args: set_key(args[0], args[1], args[2]), 'delete': lambda args: delete_key(args[0], args[1]), 'append': lambda args: append(args[0], args[1], args[2]), 'remove': lambda args: remove_value(args[0], args[1], args[2]), 'cat': lambda args: cat(args[0]), 'has_key': lambda args: has_key(args[0], args[1]), 'filter_values': lambda args: filter_values(args[0], args[1], args[2]), 'last_event': lambda args: last_event(args[0], args[1], args[2], args[3]), 'events_for': lambda args: events_for(args[0], args[1], args[2]), 'fw_events': lambda args: fw_events(args[0], args[1], args[2], args[3], args[4]), 'wg_events': lambda args: wg_events(args[0], args[1], args[2], args[3]), 'format_fw_event': lambda args: format_fw_event(sys.stdin.read(), args[0]), 'format_wg_event': lambda args: format_wg_event(sys.stdin.read()), 'remove_events': lambda args: remove_events(args[0], args[1]), 'follow_logs': lambda args: follow_logs(args[0], args[1], args[2], args[3], args[4], args[5]), 'count': lambda args: count(args[0], args[1]), 'audit_fw_counts': lambda args: audit_fw_counts(args[0]), 'peer_group_map': lambda args: peer_group_map(args[0]), 'peer_groups': lambda args: peer_groups(args[0], args[1]), 'peer_data': lambda args: peer_data(args[0], args[1], args[2]), 'iso_to_ts': lambda args: iso_to_ts(args[0]), 'rule_list_data': lambda args: rule_list_data(args[0], args[1]), 'group_list_data': lambda args: group_list_data(args[0], args[1]), 'fmt_datetime': lambda args: fmt_datetime(args[0], args[1]), 'create_rule': lambda args: create_rule( args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7] if len(args) > 7 else '', args[8] if len(args) > 8 else '', args[9] if len(args) > 9 else '' ), 'cleanup_config': lambda args: cleanup_config(args[0]), 'remove_peer_block': lambda args: remove_peer_block(args[0], args[1]), 'create_group': lambda args: create_group(args[0], args[1], args[2]), 'parse_event': lambda args: parse_event(args[0]), 'parse_fw_event': lambda args: parse_fw_event(args[0]), 'peer_transfer': lambda args: peer_transfer(args[0]), 'remove_events_filtered': lambda args: remove_events_filtered( args[0], args[1], args[2], args[3], args[4]=='true', args[5]=='true', args[6] if len(args)>6 else ''), 'peer_transfer': lambda args: peer_transfer(args[0]), 'peer_transfer_delta': lambda args: peer_transfer_delta(args[0], args[1]), 'rule_resolve': lambda args: rule_resolve(args[0], args[1]), 'rule_resolve_field': lambda args: rule_resolve_field(args[0], args[1], args[2]), 'rule_inspect': lambda args: rule_inspect(args[0], args[1]), 'find_rule_file': lambda args: print(find_rule_file(args[0], args[1])), 'get_raw': lambda args: print(get_raw(args[0], args[1])), } if __name__ == '__main__': if len(sys.argv) < 2: print("Usage: json_helper.py [key] [value]", file=sys.stderr) sys.exit(1) cmd = sys.argv[1] args = sys.argv[2:] if cmd not in commands: print(f"Unknown command: {cmd}", file=sys.stderr) sys.exit(1) commands[cmd](args)