#!/usr/bin/env python3 """ wgctl JSON helper — called by shell functions to read/write JSON files. Usage: json_helper.py [key] [value] """ import sys import json import os DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M') def get(file, key): try: with open(file) as f: data = json.load(f) val = data.get(key, []) if isinstance(val, bool): print(str(val).lower()) # true/false not True/False elif isinstance(val, list): if val: print('\n'.join(str(v) for v in val)) else: if val: print(val) except: sys.exit(0) def set_key(file, key, value): try: data = {} if os.path.exists(file): with open(file) as f: data = json.load(f) # Try to parse as JSON value first (for arrays/bools) try: data[key] = json.loads(value) except: data[key] = value with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def delete_key(file, key): try: with open(file) as f: data = json.load(f) data.pop(key, None) with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def append(file, key, value): try: data = {} if os.path.exists(file): with open(file) as f: data = json.load(f) if key not in data: data[key] = [] if value not in data[key]: data[key].append(value) with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def remove_value(file, key, value): try: with open(file) as f: data = json.load(f) if key in data and value in data[key]: data[key].remove(value) with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def cat(file): try: with open(file) as f: data = json.load(f) print(json.dumps(data, indent=2)) except Exception as e: sys.exit(1) def has_key(file, key): try: with open(file) as f: data = json.load(f) sys.exit(0 if key in data else 1) except: sys.exit(1) def filter_values(file, key, value): """Remove all entries where value matches""" try: with open(file) as f: data = json.load(f) data = {k: v for k, v in data.items() if v != value} with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def last_event(file, key, field, client): """Get last event field for a client""" try: last = None with open(file) as f: for line in f: try: e = json.loads(line.strip()) if e.get(key) == client: last = e except: pass if last: print(last.get(field, '')) except: pass def events_for(file, ip, limit): """Format events for a given IP""" try: from datetime import datetime events = [] with open(file) as f: for line in f: try: e = json.loads(line.strip()) if e.get('ip') == ip: events.append(e) except: pass for e in events[-int(limit):]: ts = e.get('timestamp', '') try: dt = datetime.fromisoformat(ts) ts = dt.strftime(DATETIME_FMT) except: pass endpoint = e.get('endpoint', '—') client = e.get('client', '—') event = e.get('event', '—') print(f' {ts} {client:<20} {endpoint:<20} {event}') except: pass def fw_events(file, filter_ip, filter_type, clients_dir, net_file, limit): """ Format firewall drop events with dedup, counts, and service annotation. Output per line: ts|client|dest_ip|dest_port|proto|service_name|count """ import glob from datetime import datetime proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} # Build ip->name map ip_to_name = {} for conf in glob.glob(f"{clients_dir}/*.conf"): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] ip_to_name[ip] = name except Exception: pass # Load net services for reverse lookup — independent of rest of function net_data = {} if net_file and os.path.exists(net_file): try: with open(net_file) as f: net_data = json.load(f) except Exception: pass def reverse_lookup(dest_ip, dest_port, proto): for svc_name, svc in net_data.items(): if not isinstance(svc, dict): continue if svc.get('ip', '') != dest_ip: continue ports = svc.get('ports', {}) if dest_port: for port_name, port_def in ports.items(): if not isinstance(port_def, dict): continue if (str(port_def.get('port', '')) == str(dest_port) and port_def.get('proto', 'tcp') == proto): return f"{svc_name}:{port_name}" return svc_name return svc_name return '' # Parse and first-pass dedup (within time window per key) events = [] last_seen = {} try: with open(file) as f: for line in f: try: e = json.loads(line.strip()) src = e.get('src_ip', '') if not src: continue if filter_ip and src != filter_ip: continue proto_num = int(e.get('ip.protocol', 0)) proto = proto_map.get(proto_num, str(proto_num)) dst = e.get('dest_ip', '') port = str(e.get('dest_port', '')) key = (src, dst, port, proto_num) ts_str = e.get('timestamp', '') try: ts = datetime.fromisoformat(ts_str).timestamp() except Exception: ts = 0 windows = {1: 5, 6: 30, 17: 10} window = windows.get(proto_num, 10) if key in last_seen and (ts - last_seen[key]) < window: continue last_seen[key] = ts events.append(e) except Exception: pass except Exception: pass # Second-pass dedup consecutive same events with count deduped = [] counts = [] for e in events: ts_str = e.get('timestamp', '') try: ts = datetime.fromisoformat(ts_str).timestamp() except Exception: ts = 0 src = e.get('src_ip', '') dst = e.get('dest_ip', '') port = str(e.get('dest_port', '')) proto_num = int(e.get('ip.protocol', 0)) key = (src, dst, port, proto_num) if deduped: prev = deduped[-1] try: prev_ts = datetime.fromisoformat(prev.get('timestamp', '')).timestamp() except Exception: prev_ts = 0 prev_key = ( prev.get('src_ip', ''), prev.get('dest_ip', ''), str(prev.get('dest_port', '')), int(prev.get('ip.protocol', 0)) ) if key == prev_key and (ts - prev_ts) < 300: counts[-1] += 1 continue deduped.append(e) counts.append(1) limit = int(limit) if limit else 50 for e, count in list(zip(deduped, counts))[-limit:]: ts_str = e.get('timestamp', '') src = e.get('src_ip', '') dst = e.get('dest_ip', '') port = str(e.get('dest_port', '')) proto_num = int(e.get('ip.protocol', 0)) proto = proto_map.get(proto_num, str(proto_num)) client = ip_to_name.get(src, src) svc_name = reverse_lookup(dst, port, proto) try: dt = datetime.fromisoformat(ts_str) ts_fmt = dt.strftime(DATETIME_FMT) except Exception: ts_fmt = ts_str print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}") def wg_events(file, filter_client, filter_type, limit): """ Format WireGuard events with dedup and counts. Output per line: ts|client|endpoint|event|count """ from datetime import datetime events = [] try: with open(file) as f: for line in f: try: e = json.loads(line.strip()) client = e.get('client', '') if not client: continue if filter_client and client != filter_client: continue if filter_type and not client.startswith(filter_type + '-'): continue events.append(e) except Exception: pass except Exception: pass # Dedup consecutive same client+event+endpoint within 60s deduped = [] counts = [] for e in events: ts_str = e.get('timestamp', '') try: ts = datetime.fromisoformat(ts_str).timestamp() except Exception: ts = 0 client = e.get('client', '') event = e.get('event', '') endpoint = e.get('endpoint', '') key = (client, event, endpoint[:15]) if deduped: prev = deduped[-1] prev_ts_str = prev.get('timestamp', '') try: prev_ts = datetime.fromisoformat(prev_ts_str).timestamp() except Exception: prev_ts = 0 prev_key = ( prev.get('client', ''), prev.get('event', ''), prev.get('endpoint', '')[:15] ) if key == prev_key and (ts - prev_ts) < 300: counts[-1] += 1 continue deduped.append(e) counts.append(1) limit = int(limit) if limit else 50 for e, count in list(zip(deduped, counts))[-limit:]: ts_str = e.get('timestamp', '') client = e.get('client', '') endpoint = e.get('endpoint', '') event = e.get('event', '') try: dt = datetime.fromisoformat(ts_str) ts_fmt = dt.strftime('%d/%m %H:%M') except Exception: ts_fmt = ts_str print(f"{ts_fmt}|{client}|{endpoint}|{event}|{count}") def format_fw_event(line, clients_dir): """Format a single fw_event line""" import glob proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} # Build ip->name map ip_to_name = {} for conf in glob.glob(f"{clients_dir}/*.conf"): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: for l in f: if l.startswith('Address'): ip = l.split('=')[1].strip().split('/')[0] ip_to_name[ip] = name except: pass try: e = json.loads(line.strip()) src = e.get('src_ip', '') if not src: return None ts = e.get('timestamp', '') try: from datetime import datetime dt = datetime.fromisoformat(ts) ts = dt.strftime(DATETIME_FMT) except: pass dst = e.get('dest_ip', '—') port = e.get('dest_port', '') proto_num = e.get('ip.protocol', 0) proto = proto_map.get(proto_num, str(proto_num)) dst_str = f"{dst}:{port}" if port else dst client = ip_to_name.get(src, src) return f"{ts}|{client}|{dst_str}|{proto}" except: return None def format_wg_event(line): """Format a single wg_event line""" try: e = json.loads(line.strip()) client = e.get('client', '') if not client: return None ts = e.get('timestamp', '') try: from datetime import datetime dt = datetime.fromisoformat(ts) ts = dt.strftime(DATETIME_FMT) except: pass endpoint = e.get('endpoint', '—') event = e.get('event', '—') return f"{ts}|{client}|{endpoint}|{event}|wg" except: return None def remove_events(file, identifier): """Remove all events for a client/ip from a JSONL file""" try: lines = [] with open(file) as f: for line in f: try: e = json.loads(line.strip()) if e.get('client') == identifier or e.get('src_ip') == identifier: continue lines.append(line) except: lines.append(line) with open(file, 'w') as f: f.writelines(lines) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def follow_logs(fw_file, wg_file, filter_ip, filter_type, clients_dir, filter_peers=""): """Follow both log files and output formatted events""" import glob, time, select proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} peer_filter = set(filter_peers.split(',')) if filter_peers else set() # Build ip->name map ip_to_name = {} for conf in glob.glob(f"{clients_dir}/*.conf"): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: for l in f: if l.startswith('Address'): ip = l.split('=')[1].strip().split('/')[0] ip_to_name[ip] = name except: pass # Open files and seek to end files = {} for label, path in [('fw', fw_file), ('wg', wg_file)]: if path and os.path.exists(path): f = open(path) f.seek(0, 2) # seek to end files[label] = f dedup = {} try: while True: for label, f in files.items(): line = f.readline() if not line: continue try: e = json.loads(line.strip()) except: continue if label == 'fw': src = e.get('src_ip', '') if not src: continue if filter_ip and src != filter_ip: continue # Filter by peer names if specified if peer_filter: client_name = ip_to_name.get(src, '') if client_name not in peer_filter: continue dst = e.get('dest_ip', '—') port = e.get('dest_port', '') proto_num = e.get('ip.protocol', 0) proto = proto_map.get(proto_num, str(proto_num)) # Dedup key = (src, dst, port, proto_num) windows = {1: 5, 6: 30, 17: 10} window = windows.get(proto_num, 10) now = time.time() if key in dedup and (now - dedup[key]) < window: continue dedup[key] = now client = ip_to_name.get(src, src) if filter_type and not client.startswith(filter_type + '-'): continue dst_str = f"{dst}:{port}" if port else dst ts = e.get('timestamp', '')[:16].replace('T', ' ') print(f"fw|{ts}|{client}|{dst_str}|{proto}", flush=True) elif label == 'wg': client = e.get('client', '') if not client: continue if filter_ip: ip = ip_to_name.get(filter_ip, '') if client != ip and client != filter_ip: continue if peer_filter and client not in peer_filter: continue if filter_type and not client.startswith(filter_type + '-'): continue ts = e.get('timestamp', '')[:16].replace('T', ' ') endpoint = e.get('endpoint', '—') event = e.get('event', '—') print(f"wg|{ts}|{client}|{endpoint}|{event}", flush=True) time.sleep(0.1) except KeyboardInterrupt: pass def count(file, key): try: with open(file) as f: data = json.load(f) val = data.get(key, []) print(len(val) if isinstance(val, list) else 0) except: print(0) def audit_fw_counts(clients_dir): """Return peer_name:fw_count pairs from iptables""" import glob, subprocess, re try: result = subprocess.run( ['iptables', '-L', 'FORWARD', '-n', '-v'], capture_output=True, text=True ) fw_lines = result.stdout.splitlines() except Exception: fw_lines = [] # Filter to only data lines (skip headers and blanks) # In -v output, source IP is in column 8 (0-indexed) # Format: pkts bytes target prot opt in out source destination [options] rule_lines = [l for l in fw_lines if l.strip() and not l.startswith('Chain') and not l.startswith(' pkts')] for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: ip = '' for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] break if not ip: continue # Count lines where source column exactly matches the peer IP count = sum(1 for l in rule_lines if re.search(r'\s' + re.escape(ip) + r'\s', l)) print(f"{name}:{count}") except Exception: pass def peer_group_map(groups_dir): """Return peer:group pairs for all groups""" import glob try: for group_file in glob.glob(f"{groups_dir}/*.group"): try: with open(group_file) as f: g = json.load(f) name = g.get('name', '') for peer in g.get('peers', []): if peer: print(f"{peer}:{name}") except: pass except: pass def peer_groups(groups_dir, peer_name): """Find all groups containing a peer""" import glob try: for group_file in glob.glob(f"{groups_dir}/*.group"): try: with open(group_file) as f: g = json.load(f) if peer_name in g.get('peers', []): print(g.get('name', '')) except: pass except: pass def iso_to_ts(iso_str): """Convert ISO timestamp to unix timestamp""" try: from datetime import datetime, timezone dt = datetime.fromisoformat(iso_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) print(int(dt.timestamp())) except: print(0) def rule_list_data(rules_dir, meta_dir): """Return all rule data including base rules and extends""" import glob rule_peer_counts = {} for f in glob.glob(f"{meta_dir}/*.meta"): try: with open(f) as mf: meta = json.load(mf) rule = meta.get('rule', '') if rule: rule_peer_counts[rule] = rule_peer_counts.get(rule, 0) + 1 except: pass rule_files = ( sorted(glob.glob(f"{rules_dir}/*.rule")) + sorted(glob.glob(f"{rules_dir}/base/*.rule")) ) # Collect all data first rules_data = [] for rule_file in rule_files: is_base = '/base/' in rule_file try: with open(rule_file) as f: r = json.load(f) name = r.get('name', '') desc = r.get('desc', '') group = r.get('group', '') extends = ','.join(r.get('extends', [])) resolved = _rule_resolve_internal(rules_dir, name) n_allows = len(resolved.get('allow_ips', [])) + \ len(resolved.get('allow_ports', [])) n_blocks = len(resolved.get('block_ips', [])) + \ len(resolved.get('block_ports', [])) peer_count = rule_peer_counts.get(name, 0) rules_data.append({ 'name': name, 'desc': desc, 'n_allows': n_allows, 'n_blocks': n_blocks, 'peer_count': peer_count, 'extends': extends, 'is_base': is_base, 'group': group }) except: pass # Sort: non-base first, then by group (empty group last within non-base), # then by name within group rules_data.sort(key=lambda x: ( x['is_base'], x['group'] == '' and not x['is_base'], x['group'], x['name'] )) for r in rules_data: print(f"{r['name']}|{r['desc']}|{r['n_allows']}|{r['n_blocks']}|" f"{r['peer_count']}|{r['extends']}|{r['is_base']}|{r['group']}") def group_list_data(groups_dir, blocks_dir): """Return group summary data in one call""" import glob # Get all block files blocked_peers = set() for f in glob.glob(f"{blocks_dir}/*.block"): name = os.path.basename(f).replace('.block', '') blocked_peers.add(name) for group_file in sorted(glob.glob(f"{groups_dir}/*.group")): try: with open(group_file) as f: g = json.load(f) name = g.get('name', '') desc = g.get('desc', '') peers = [p for p in g.get('peers', []) if p] total = len(peers) blocked = sum(1 for p in peers if p in blocked_peers) print(f"{name}|{desc}|{total}|{blocked}") except: pass def fmt_datetime(iso_str, fmt): """Format ISO timestamp with given strftime format""" try: from datetime import datetime dt = datetime.fromisoformat(iso_str) print(dt.strftime(fmt)) except: print(iso_str) def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips, block_ports, allow_ports='', extends='', group=''): rule = { 'name': name, 'desc': desc, 'group': group, 'dns_redirect': dns_redirect == 'true', 'extends': [x for x in extends.split(',') if x] if extends else [], 'allow_ips': [x for x in allow_ips.split(',') if x] if allow_ips else [], 'allow_ports': [x for x in allow_ports.split(',') if x] if allow_ports else [], 'block_ips': [x for x in block_ips.split(',') if x] if block_ips else [], 'block_ports': [x for x in block_ports.split(',') if x] if block_ports else [], } with open(file, 'w') as f: json.dump(rule, f, indent=2) def cleanup_config(config_file): """Normalize blank lines in WireGuard config""" import re try: with open(config_file) as f: config = f.read() config = re.sub(r'\n{3,}', '\n\n', config) config = config.rstrip('\n') + '\n' with open(config_file, 'w') as f: f.write(config) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def remove_peer_block(config_file, name): """Remove a peer block from WireGuard config by name""" import re try: with open(config_file) as f: config = f.read() pattern = r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n' result = re.sub(pattern, '\n', config) with open(config_file, 'w') as f: f.write(result) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def create_group(file, name, desc): """Create a new group JSON file""" try: group = {'name': name, 'desc': desc, 'peers': []} with open(file, 'w') as f: json.dump(group, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def parse_event(line): """Parse a single JSON event line""" try: e = json.loads(line) print(f"{e.get('timestamp','')}|{e.get('client','')}|{e.get('endpoint','')}|{e.get('event','')}") except: pass def parse_fw_event(line): """Parse a single fw_events.log JSON line""" try: e = json.loads(line) ts = e.get('timestamp', '') src = e.get('src_ip', '') dst = e.get('dest_ip', '') port = e.get('dest_port', '') proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} proto_num = e.get('ip.protocol', 0) proto = proto_map.get(proto_num, str(proto_num)) print(f"{ts}|{src}|{dst}|{port}|{proto}") except: pass def peer_transfer(wg_interface): """Get total transfer bytes per peer""" import subprocess low = int(os.environ.get('ACTIVITY_TOTAL_LOW_BYTES', '1000000')) med = int(os.environ.get('ACTIVITY_TOTAL_MED_BYTES', '10000000')) high = int(os.environ.get('ACTIVITY_TOTAL_HIGH_BYTES', '100000000')) try: result = subprocess.run( ['wg', 'show', wg_interface, 'transfer'], capture_output=True, text=True ) for line in result.stdout.strip().split('\n'): if not line: continue parts = line.split('\t') if len(parts) == 3: pubkey, rx, tx = parts total = int(rx) + int(tx) if total == 0: level = 'none' elif total < low: level = 'low' elif total < med: level = 'medium' elif total < high: level = 'high' else: level = 'very high' print(f"{pubkey}|{rx}|{tx}|{level}") except: pass def peer_transfer_delta(wg_interface, cache_file): """Calculate current transfer rate using delta from previous sample""" import subprocess, time low = int(os.environ.get('ACTIVITY_CURRENT_LOW_BYTES', '10000')) # 10KB/s med = int(os.environ.get('ACTIVITY_CURRENT_MED_BYTES', '100000')) # 100KB/s high = int(os.environ.get('ACTIVITY_CURRENT_HIGH_BYTES', '1000000')) # 1MB/s current = {} now = time.time() try: result = subprocess.run( ['wg', 'show', wg_interface, 'transfer'], capture_output=True, text=True ) for line in result.stdout.strip().split('\n'): if not line: continue parts = line.split('\t') if len(parts) == 3: pubkey, rx, tx = parts current[pubkey] = {'rx': int(rx), 'tx': int(tx), 'ts': now} except: pass prev = {} if os.path.exists(cache_file): try: with open(cache_file) as f: prev = json.load(f) except: pass try: with open(cache_file, 'w') as f: json.dump(current, f) except: pass for pubkey, data in current.items(): if pubkey in prev: dt = data['ts'] - prev[pubkey].get('ts', data['ts']) if dt > 0: rx_rate = max(0, (data['rx'] - prev[pubkey]['rx']) / dt) tx_rate = max(0, (data['tx'] - prev[pubkey]['tx']) / dt) total = rx_rate + tx_rate if total <= 0: level = 'idle' elif total < low: level = 'low' elif total < med: level = 'medium' elif total < high: level = 'high' else: level = 'very high' print(f"{pubkey}|{int(rx_rate)}|{int(tx_rate)}|{level}") else: print(f"{pubkey}|0|0|idle") else: print(f"{pubkey}|0|0|unknown") def remove_events_filtered(wg_file, fw_file, filter_name, filter_ip, filter_fw, filter_wg, before_days): """Remove events with filters: by name/ip, source, or age""" import time from datetime import datetime, timezone cutoff_ts = None if before_days: cutoff_ts = time.time() - (float(before_days) * 86400) def should_remove_wg(e): if filter_name and e.get('client') != filter_name: return False if cutoff_ts: try: ts = datetime.fromisoformat(e.get('timestamp','')).timestamp() return ts < cutoff_ts except: return False return True def should_remove_fw(e): if filter_ip and e.get('src_ip') != filter_ip: return False if cutoff_ts: try: ts = datetime.fromisoformat(e.get('timestamp','')).timestamp() return ts < cutoff_ts except: return False return True removed_wg = removed_fw = 0 if not filter_fw and os.path.exists(wg_file): lines = [] with open(wg_file) as f: for line in f: try: e = json.loads(line.strip()) if should_remove_wg(e): removed_wg += 1 continue except: pass lines.append(line) with open(wg_file, 'w') as f: f.writelines(lines) if not filter_wg and os.path.exists(fw_file): lines = [] with open(fw_file) as f: for line in f: try: e = json.loads(line.strip()) if should_remove_fw(e): removed_fw += 1 continue except: pass lines.append(line) with open(fw_file, 'w') as f: f.writelines(lines) print(f"{removed_wg}|{removed_fw}") def _rule_resolve_internal(rules_dir, rule_name, visited=None): """Internal recursive resolver — returns dict, does not print""" if visited is None: visited = set() if rule_name in visited: raise ValueError(f"Circular dependency detected: {rule_name}") visited.add(rule_name) rule_file = find_rule_file(rules_dir, rule_name) with open(rule_file) as f: rule = json.load(f) merged = { 'allow_ips': [], 'allow_ports': [], 'block_ips': [], 'block_ports': [], 'dns_redirect': False } for base_name in rule.get('extends', []): base = _rule_resolve_internal(rules_dir, base_name, visited.copy()) merged['allow_ips'] += base.get('allow_ips', []) merged['allow_ports'] += base.get('allow_ports', []) merged['block_ips'] += base.get('block_ips', []) merged['block_ports'] += base.get('block_ports', []) if base.get('dns_redirect'): merged['dns_redirect'] = True # Merge own fields — use .get() with defaults for all fields merged['allow_ips'] = list(dict.fromkeys( merged['allow_ips'] + rule.get('allow_ips', []))) merged['allow_ports'] = list(dict.fromkeys( merged['allow_ports'] + rule.get('allow_ports', []))) merged['block_ips'] = list(dict.fromkeys( merged['block_ips'] + rule.get('block_ips', []))) merged['block_ports'] = list(dict.fromkeys( merged['block_ports'] + rule.get('block_ports', []))) if rule.get('dns_redirect', False): merged['dns_redirect'] = True merged['name'] = rule.get('name', rule_name) merged['desc'] = rule.get('desc', '') merged['group'] = rule.get('group', '') merged['extends'] = rule.get('extends', []) return merged def rule_resolve(rules_dir, rule_name): """Resolve a rule with inheritance — prints JSON""" try: resolved = _rule_resolve_internal(rules_dir, rule_name) print(json.dumps(resolved)) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def rule_resolve_field(rules_dir, rule_name, field): """Get a single field from resolved rule — prints values one per line""" try: resolved = _rule_resolve_internal(rules_dir, rule_name) val = resolved.get(field, []) if isinstance(val, list): for v in val: print(v) else: print(val) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def rule_inspect(rules_dir, rule_name): """Show inheritance tree for a rule""" try: rule_file = find_rule_file(rules_dir, rule_name) with open(rule_file) as f: rule = json.load(f) resolved = _rule_resolve_internal(rules_dir, rule_name) has_extends = bool(rule.get('extends', [])) # Own rules for ip in rule.get('allow_ips', []): print(f"own|allow_ip|{ip}") for p in rule.get('allow_ports', []): print(f"own|allow_port|{p}") for ip in rule.get('block_ips', []): print(f"own|block_ip|{ip}") for p in rule.get('block_ports', []): print(f"own|block_port|{p}") # DNS redirect — separate section if rule.get('dns_redirect'): print(f"dns|dns_redirect|true") if has_extends: # Inherited rules per base for base_name in rule.get('extends', []): base = _rule_resolve_internal(rules_dir, base_name) for ip in base.get('allow_ips', []): print(f"inherited:{base_name}|allow_ip|{ip}") for p in base.get('allow_ports', []): print(f"inherited:{base_name}|allow_port|{p}") for ip in base.get('block_ips', []): print(f"inherited:{base_name}|block_ip|{ip}") for p in base.get('block_ports', []): print(f"inherited:{base_name}|block_port|{p}") if base.get('dns_redirect'): print(f"inherited:{base_name}|dns_redirect|true") # Resolved summary only when inheritance exists has_resolved = ( resolved.get('allow_ips') or resolved.get('allow_ports') or resolved.get('block_ips') or resolved.get('block_ports') or resolved.get('dns_redirect') ) if has_resolved: for ip in resolved.get('allow_ips', []): print(f"resolved|allow_ip|{ip}") for p in resolved.get('allow_ports', []): print(f"resolved|allow_port|{p}") for ip in resolved.get('block_ips', []): print(f"resolved|block_ip|{ip}") for p in resolved.get('block_ports', []): print(f"resolved|block_port|{p}") if resolved.get('dns_redirect'): print(f"resolved|dns_redirect|true") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def find_rule_file(rules_dir, rule_name): """Find rule file in rules/ or rules/base/""" for path in [ os.path.join(rules_dir, f"{rule_name}.rule"), os.path.join(rules_dir, "base", f"{rule_name}.rule"), ]: if os.path.exists(path): return path return "" def get_raw(file, key): try: with open(file) as f: data = json.load(f) val = data.get(key) # returns None if missing if val is None: pass # print nothing elif isinstance(val, bool): print(str(val).lower()) elif isinstance(val, list): for v in val: print(v) else: print(val) except: pass def count_resolved(rules_dir, rule_name, key): """Count entries in resolved rule field""" try: resolved = _rule_resolve_internal(rules_dir, rule_name) print(len(resolved.get(key, []))) except: print(0) def _block_init(peer_ip): """Return empty block structure""" return { "peer_ip": peer_ip, "blocked_direct": False, "blocked_by_groups": [], "rules": [] } def _block_read(file): try: with open(file) as f: content = f.read().strip() if not content: return None # empty file = no block data try: return json.loads(content) except json.JSONDecodeError: # Old format — migrate lines = content.split('\n') peer_ip = lines[0].split()[0] if lines else '' new_data = { "peer_ip": peer_ip, "blocked_direct": True, "blocked_by_groups": [], "rules": [{"name": "full block", "type": "full"}] } with open(file, 'w') as f: json.dump(new_data, f, indent=2) return new_data except FileNotFoundError: return None except Exception: return None def _block_write(file, data): """Write block file""" with open(file, 'w') as f: json.dump(data, f, indent=2) def block_get(file): """Read and print block file as JSON""" data = _block_read(file) if data: print(json.dumps(data)) def block_is_blocked(file): """Return true if peer is effectively blocked""" data = _block_read(file) if not data: print("false") return blocked = data.get("blocked_direct", False) or \ bool(data.get("blocked_by_groups", [])) print("true" if blocked else "false") def block_set_direct(file, peer_ip, value): """Set blocked_direct""" try: data = _block_read(file) or _block_init(peer_ip) data["blocked_direct"] = value.lower() == "true" data["peer_ip"] = peer_ip _block_write(file, data) remaining = data["blocked_direct"] or bool(data.get("blocked_by_groups", [])) pass # print("true" if remaining else "false") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def block_add_group(file, peer_ip, group): """Add group to blocked_by_groups""" try: data = _block_read(file) or _block_init(peer_ip) data["peer_ip"] = peer_ip groups = data.get("blocked_by_groups", []) if group not in groups: groups.append(group) data["blocked_by_groups"] = groups _block_write(file, data) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def block_remove_group(file, peer_ip, group): """Remove group from blocked_by_groups, return whether still blocked""" try: data = _block_read(file) or _block_init(peer_ip) groups = data.get("blocked_by_groups", []) if group in groups: groups.remove(group) data["blocked_by_groups"] = groups _block_write(file, data) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) # print("true" if remaining else "false") pass except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def block_add_rule(file, peer_ip, rule_type, name="", target="", port="", proto=""): """Add a block rule entry""" try: data = _block_read(file) or _block_init(peer_ip) data["peer_ip"] = peer_ip rule = {"type": rule_type} if name: rule["name"] = name if target: rule["target"] = target if port: rule["port"] = port if proto: rule["proto"] = proto rules = data.get("rules", []) for existing in rules: if existing.get("type") == rule_type and \ existing.get("target","") == target and \ existing.get("port","") == port and \ existing.get("proto","") == proto: return # already exists, skip rules.append(rule) data["rules"] = rules _block_write(file, data) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def block_remove_rule(file, rule_type, target="", port="", proto=""): data = _block_read(file) if not data: return rules = data.get("rules", []) filtered = [r for r in rules if not ( r.get("type") == rule_type and r.get("target", "") == target and r.get("port", "") == port and r.get("proto", "") == proto )] data["rules"] = filtered _block_write(file, data) def block_get_rules(file): """Print rules as pipe-separated lines: name|type|target|port|proto""" data = _block_read(file) if not data: return for r in data.get("rules", []): print(f"{r.get('name','')}|{r.get('type','')}|" f"{r.get('target','')}|{r.get('port','')}|{r.get('proto','')}") def block_get_groups(file): data = _block_read(file) if not data: return print(','.join(data.get('blocked_by_groups', []))) def block_get_direct(file): data = _block_read(file) if not data: print('false') return print('true' if data.get('blocked_direct', False) else 'false') # ============================================ # Net / Services # ============================================ def _net_read(file): """Read services.json, return dict or empty dict""" try: if not os.path.exists(file): return {} with open(file) as f: content = f.read().strip() if not content: return {} return json.loads(content) except Exception: return {} def _net_write(file, data): """Write services.json""" os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w') as f: json.dump(data, f, indent=2) def net_list(file): """List all service names with IP and port count""" data = _net_read(file) for name, svc in sorted(data.items()): ip = svc.get('ip', '') desc = svc.get('desc', '') tags = ','.join(svc.get('tags', [])) ports = len(svc.get('ports', {})) print(f"{name}|{ip}|{desc}|{tags}|{ports}") def net_show(file, name): """Show full service details""" data = _net_read(file) if name not in data: print(f"Error: Service not found: {name}", file=sys.stderr) sys.exit(1) svc = data[name] print(f"name|{name}") print(f"ip|{svc.get('ip','')}") print(f"desc|{svc.get('desc','')}") print(f"tags|{','.join(svc.get('tags',[]))}") for port_name, port_def in svc.get('ports', {}).items(): port = port_def.get('port', '') proto = port_def.get('proto', 'tcp') desc = port_def.get('desc', '') print(f"port|{port_name}|{port}|{proto}|{desc}") def net_exists(file, name): """Check if service exists""" data = _net_read(file) # Handle service:port syntax if ':' in name: svc_name, port_name = name.split(':', 1) if port_name == 'ports': print('true' if svc_name in data else 'false') else: svc = data.get(svc_name, {}) print('true' if port_name in svc.get('ports', {}) else 'false') else: print('true' if name in data else 'false') def net_add_service(file, name, ip, desc='', tags=''): """Add or update a service""" data = _net_read(file) if name not in data: data[name] = {'ip': ip, 'ports': {}} else: data[name]['ip'] = ip if desc: data[name]['desc'] = desc if tags: data[name]['tags'] = [t.strip() for t in tags.split(',') if t.strip()] _net_write(file, data) def net_add_port(file, service, port_name, port, proto='tcp', desc=''): """Add or update a port on a service""" data = _net_read(file) if service not in data: print(f"Error: Service not found: {service}", file=sys.stderr) sys.exit(1) if 'ports' not in data[service]: data[service]['ports'] = {} entry = {'port': int(port), 'proto': proto} if desc: entry['desc'] = desc data[service]['ports'][port_name] = entry _net_write(file, data) def net_remove(file, name): """Remove service or port""" data = _net_read(file) if ':' in name: svc_name, port_name = name.split(':', 1) if svc_name not in data: print(f"Error: Service not found: {svc_name}", file=sys.stderr) sys.exit(1) if port_name == 'ports': # Remove all ports data[svc_name]['ports'] = {} else: if port_name not in data[svc_name].get('ports', {}): print(f"Error: Port not found: {port_name}", file=sys.stderr) sys.exit(1) del data[svc_name]['ports'][port_name] else: if name not in data: print(f"Error: Service not found: {name}", file=sys.stderr) sys.exit(1) del data[name] _net_write(file, data) def net_resolve(file, name): """Resolve service name to ip or ip:port:proto lines""" data = _net_read(file) if ':' in name: svc_name, port_name = name.split(':', 1) if svc_name not in data: print(f"Error: Service not found: {svc_name}", file=sys.stderr) sys.exit(1) svc = data[svc_name] ip = svc.get('ip', '') if port_name == 'ports': # All ports for pname, pdef in svc.get('ports', {}).items(): print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}") else: if port_name not in svc.get('ports', {}): print(f"Error: Port not found: {port_name}", file=sys.stderr) sys.exit(1) pdef = svc['ports'][port_name] print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}") else: if name not in data: print(f"Error: Service not found: {name}", file=sys.stderr) sys.exit(1) print(data[name].get('ip', '')) def net_reverse_lookup(file, ip, port='', proto=''): """Reverse lookup IP/port to service name""" data = _net_read(file) for svc_name, svc in data.items(): if svc.get('ip') != ip: continue if not port: print(svc_name) return for port_name, port_def in svc.get('ports', {}).items(): if (str(port_def.get('port','')) == str(port) and port_def.get('proto','tcp') == proto): print(f"{svc_name}:{port_name}") return # IP matched but no port match — return service name print(svc_name) return def block_is_empty(file): data = _block_read(file) if not data: print("true") return empty = ( not data.get("blocked_direct", False) and not data.get("blocked_by_groups", []) and not data.get("rules", []) and not data.get("services", []) ) print("true" if empty else "false") def group_has_peer(file, peer_name): try: with open(file) as f: data = json.load(f) peers = data.get('peers', []) print('true' if peer_name in peers else 'false') except Exception: print('false') # ============================================ # Subnet Map # ============================================ def _subnet_read(file): """Read subnets.json, return dict or empty dict""" try: if not os.path.exists(file): return {} with open(file) as f: content = f.read().strip() if not content: return {} return json.loads(content) except Exception: return {} def _subnet_write(file, data): """Write subnets.json""" os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w') as f: json.dump(data, f, indent=2) def _subnet_is_group(entry): """Return True if a subnet entry is a nested group (like 'guests')""" return isinstance(entry, dict) and 'subnet' not in entry def subnet_lookup(file, name, type_key=''): """ Resolve a subnet name (and optional type) to a CIDR string. For scalar entries: subnet_lookup(file, "desktop") -> "10.1.1.0/24" For group entries: subnet_lookup(file, "guests", "phone") -> "10.1.103.0/24" For group with no type: subnet_lookup(file, "guests") -> "10.1.100.0/24" (none slot) Prints the CIDR on success, nothing and exits 1 on failure. """ data = _subnet_read(file) if name not in data: sys.exit(1) entry = data[name] if _subnet_is_group(entry): key = type_key if type_key else 'none' if key not in entry: sys.exit(1) print(entry[key]['subnet']) else: print(entry['subnet']) def subnet_type(file, name, type_key=''): """ Return the type string for a subnet entry. For scalar: subnet_type(file, "desktop") -> "desktop" For group: subnet_type(file, "guests", "phone") -> "phone" subnet_type(file, "guests") -> "none" """ data = _subnet_read(file) if name not in data: sys.exit(1) entry = data[name] if _subnet_is_group(entry): key = type_key if type_key else 'none' if key not in entry: sys.exit(1) # For group entries the type IS the child key print(key if key != 'none' else 'none') else: print(entry.get('type', name)) def subnet_tunnel_mode(file, name, type_key=''): """Return tunnel_mode for a subnet entry""" data = _subnet_read(file) if name not in data: print('split') # safe default return entry = data[name] if _subnet_is_group(entry): key = type_key if type_key else 'none' child = entry.get(key, {}) print(child.get('tunnel_mode', 'split')) else: print(entry.get('tunnel_mode', 'split')) def subnet_for_ip(file, ip): """ Reverse lookup: given a peer IP, find which subnet name (and type) it belongs to. Output: name|type (e.g. "guests|phone" or "desktop|desktop") Returns nothing (exit 0) if not found — caller falls back to hardcoded map. """ import ipaddress try: peer_addr = ipaddress.ip_address(ip) except ValueError: return data = _subnet_read(file) for name, entry in data.items(): if _subnet_is_group(entry): for type_key, child in entry.items(): try: network = ipaddress.ip_network(child['subnet'], strict=False) if peer_addr in network: print(f"{name}|{type_key}") return except Exception: continue else: try: network = ipaddress.ip_network(entry['subnet'], strict=False) if peer_addr in network: peer_type = entry.get('type', name) print(f"{name}|{peer_type}") return except Exception: continue def subnet_list(file): """ List all subnets for display. Output per line: name|subnet|type|tunnel_mode|desc|is_group For group entries, outputs one line per child: name.type|subnet|type|tunnel_mode|desc|true """ data = _subnet_read(file) for name, entry in data.items(): if _subnet_is_group(entry): for type_key, child in entry.items(): display_name = f"{name}.{type_key}" if type_key != 'none' else name print(f"{display_name}|{child.get('subnet','')}|" f"{type_key}|{child.get('tunnel_mode','split')}|" f"{child.get('desc','')}|true|{name}") else: print(f"{name}|{entry.get('subnet','')}|" f"{entry.get('type',name)}|{entry.get('tunnel_mode','split')}|" f"{entry.get('desc','')}|false|{name}") def subnet_show(file, name): """Show a single subnet entry (scalar or group) in detail""" data = _subnet_read(file) if name not in data: print(f"Error: Subnet '{name}' not found", file=sys.stderr) sys.exit(1) entry = data[name] if _subnet_is_group(entry): print(f"name|{name}") print(f"is_group|true") for type_key, child in entry.items(): print(f"child|{type_key}|{child.get('subnet','')}|" f"{child.get('tunnel_mode','split')}|{child.get('desc','')}") else: print(f"name|{name}") print(f"is_group|false") print(f"subnet|{entry.get('subnet','')}") print(f"type|{entry.get('type',name)}") print(f"tunnel_mode|{entry.get('tunnel_mode','split')}") print(f"desc|{entry.get('desc','')}") def subnet_add(file, name, subnet, type_key, tunnel_mode, desc, group_parent=''): """ Add a new subnet entry. If group_parent is set, adds as a child under that group key. Otherwise adds as a scalar entry. """ data = _subnet_read(file) entry = { 'subnet': subnet, 'tunnel_mode': tunnel_mode or 'split', 'desc': desc or '' } if group_parent: # Adding a child to an existing group, or creating a new group if group_parent not in data: data[group_parent] = {} elif not _subnet_is_group(data[group_parent]): print(f"Error: '{group_parent}' exists but is not a group", file=sys.stderr) sys.exit(1) data[group_parent][type_key or 'none'] = entry else: # Scalar entry — type stored explicitly entry['type'] = type_key or name data[name] = entry _subnet_write(file, data) def subnet_remove(file, name, peers_using): """ Remove a subnet entry. peers_using is a comma-separated list of peer names currently using this subnet (passed in from bash after meta scan). If non-empty, refuses with error. """ if peers_using: peers = [p for p in peers_using.split(',') if p] if peers: print(f"Error: Subnet '{name}' is in use by: {', '.join(peers)}", file=sys.stderr) sys.exit(1) data = _subnet_read(file) if '.' in name: # Removing a group child: e.g. "guests.phone" parent, child_key = name.split('.', 1) if parent not in data or not _subnet_is_group(data[parent]): print(f"Error: Group '{parent}' not found", file=sys.stderr) sys.exit(1) if child_key not in data[parent]: print(f"Error: '{child_key}' not found in group '{parent}'", file=sys.stderr) sys.exit(1) del data[parent][child_key] if not data[parent]: del data[parent] # remove empty group else: if name not in data: print(f"Error: Subnet '{name}' not found", file=sys.stderr) sys.exit(1) del data[name] _subnet_write(file, data) def subnet_rename(file, old_name, new_name, peers_using): """ Rename a subnet entry. Hard refusal if any peers reference it. peers_using: comma-separated peer names from bash meta scan. """ if peers_using: peers = [p for p in peers_using.split(',') if p] if peers: print(f"Error: Cannot rename subnet '{old_name}' — in use by: {', '.join(peers)}", file=sys.stderr) sys.exit(1) data = _subnet_read(file) if old_name not in data: print(f"Error: Subnet '{old_name}' not found", file=sys.stderr) sys.exit(1) if new_name in data: print(f"Error: Subnet '{new_name}' already exists", file=sys.stderr) sys.exit(1) data[new_name] = data.pop(old_name) _subnet_write(file, data) def subnet_peers(meta_dir, clients_dir, subnet_name, subnets_file): """ Find all peers using a subnet. Two-pass check: 1. Meta field: peer has "subnet": subnet_name in their .meta file 2. IP fallback: peer's IP falls within the subnet's CIDR(s) (catches peers added before meta stored subnet explicitly) Output: one peer name per line. """ import glob import ipaddress # Resolve all CIDRs covered by this subnet name data = _subnet_read(subnets_file) cidrs = [] if subnet_name in data: entry = data[subnet_name] if _subnet_is_group(entry): for child in entry.values(): try: cidrs.append(ipaddress.ip_network(child['subnet'], strict=False)) except Exception: pass else: try: cidrs.append(ipaddress.ip_network(entry['subnet'], strict=False)) except Exception: pass printed = set() for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): peer_name = os.path.basename(conf).replace('.conf', '') # Pass 1: check meta field meta_file = os.path.join(meta_dir, f"{peer_name}.meta") try: with open(meta_file) as f: meta = json.load(f) if meta.get('subnet', '') == subnet_name: if peer_name not in printed: print(peer_name) printed.add(peer_name) continue except Exception: pass # Pass 2: IP reverse lookup against subnet CIDRs if not cidrs: continue peer_ip = '' try: with open(conf) as f: for line in f: if line.startswith('Address'): peer_ip = line.split('=')[1].strip().split('/')[0] break except Exception: continue if not peer_ip: continue try: addr = ipaddress.ip_address(peer_ip) if any(addr in cidr for cidr in cidrs): if peer_name not in printed: print(peer_name) printed.add(peer_name) except Exception: continue def subnet_exists(file, name): """Check if a subnet name exists (scalar or group). Exits 0/1.""" data = _subnet_read(file) if '.' in name: parent, child_key = name.split('.', 1) exists = parent in data and _subnet_is_group(data[parent]) and child_key in data[parent] else: exists = name in data sys.exit(0 if exists else 1) # ============================================ # Identity System # ============================================ def identity_rules(file): """ Return all rules assigned to an identity, one per line. Reads from 'rules' array (1:N). Falls back to 'rule' scalar for migration. """ data = _identity_read(file) if not data: return # Support legacy scalar 'rule' field rules = data.get('rules', []) if not rules and data.get('rule'): rules = [data['rule']] for r in rules: if r: print(r) def identity_add_rule(file, identity_name, rule_name): """ Add a rule to an identity's rules array. Warns if already present (prints warning to stderr, exits 2). Creates identity file if it doesn't exist. """ data = _identity_read(file) or _identity_init(identity_name) rules = data.get('rules', []) # Migrate legacy scalar field if 'rule' in data and data['rule']: if data['rule'] not in rules: rules.append(data['rule']) del data['rule'] if rule_name in rules: print(f"Warning: Rule '{rule_name}' is already assigned to identity '{identity_name}'", file=sys.stderr) sys.exit(2) rules.append(rule_name) data['rules'] = rules _identity_write(file, data) def identity_remove_rule(file, rule_name): """ Remove a specific rule from an identity's rules array. Exits 1 if rule not found. """ data = _identity_read(file) if not data: print(f"Error: Identity not found", file=sys.stderr) sys.exit(1) rules = data.get('rules', []) if rule_name not in rules: print(f"Error: Rule '{rule_name}' not assigned to this identity", file=sys.stderr) sys.exit(1) rules.remove(rule_name) data['rules'] = rules _identity_write(file, data) def identity_clear_rules(file): """Remove all rules from an identity.""" data = _identity_read(file) if not data: return data['rules'] = [] data.pop('rule', None) # remove legacy scalar too _identity_write(file, data) def identity_has_rule(file, rule_name): """Exit 0 if identity has this rule, 1 otherwise.""" data = _identity_read(file) if not data: sys.exit(1) rules = data.get('rules', []) if not rules and data.get('rule'): rules = [data['rule']] sys.exit(0 if rule_name in rules else 1) def _identity_read(file): """Read an identity file, return dict or None""" try: if not os.path.exists(file): return None with open(file) as f: content = f.read().strip() if not content: return None return json.loads(content) except Exception: return None def _identity_write(file, data): """Write an identity file""" os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w') as f: json.dump(data, f, indent=2) def _identity_init(name): """Return empty identity structure""" return { 'name': name, 'peers': [], 'devices': {} } def _parse_peer_name(peer_name): """ Parse a peer name into (type, identity, index). phone-nuno -> ('phone', 'nuno', 1) phone-nuno-2 -> ('phone', 'nuno', 2) desktop-zephyr -> ('desktop', 'zephyr', 1) laptop-nuno -> ('laptop', 'nuno', 1) Returns None if name doesn't match convention. Convention: {type}-{identity}[-{index}] Known types: desktop, laptop, phone, tablet, server, iot, none """ known_types = {'desktop', 'laptop', 'phone', 'tablet', 'server', 'iot', 'none'} parts = peer_name.split('-') if len(parts) < 2: return None peer_type = parts[0] if peer_type not in known_types: return None # Check if last part is a numeric index if len(parts) >= 3 and parts[-1].isdigit(): index = int(parts[-1]) identity = '-'.join(parts[1:-1]) else: index = 1 identity = '-'.join(parts[1:]) if not identity: return None return (peer_type, identity, index) def identity_list(identities_dir): """ List all identities with peer count, rules and policy. Output per line: name|peer_count|types|rules|policy """ import glob for id_file in sorted(glob.glob(f"{identities_dir}/*.identity")): try: with open(id_file) as f: data = json.load(f) name = data.get('name', '') peers = data.get('peers', []) devices = data.get('devices', {}) rules = data.get('rules', []) # Migrate legacy scalar rule field if not rules and data.get('rule'): rules = [data['rule']] policy = data.get('policy', 'default') types = sorted(set( d.get('type', '') for d in devices.values() if d.get('type') )) print(f"{name}|{len(peers)}|{','.join(types)}|{','.join(rules)}|{policy}") except Exception: continue def identity_show(file): """Show identity details""" data = _identity_read(file) if not data: print("Error: Identity not found", file=sys.stderr) sys.exit(1) print(f"name|{data.get('name','')}") print(f"peer_count|{len(data.get('peers',[]))}") for peer_name, dev in data.get('devices', {}).items(): print(f"device|{peer_name}|{dev.get('type','')}|{dev.get('index',1)}") def identity_add_peer(file, identity_name, peer_name, peer_type, index): """Add a peer to an identity file, creating it if needed""" data = _identity_read(file) or _identity_init(identity_name) if peer_name not in data['peers']: data['peers'].append(peer_name) data['devices'][peer_name] = { 'type': peer_type, 'index': int(index) } _identity_write(file, data) def identity_remove_peer(file, peer_name): """Remove a peer from an identity file""" data = _identity_read(file) if not data: return data['peers'] = [p for p in data['peers'] if p != peer_name] data['devices'].pop(peer_name, None) _identity_write(file, data) def identity_remove(file): """Delete an identity file — existence check done in bash""" try: os.remove(file) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def identity_next_index(file, peer_type): """ Return the next available index for a given type within an identity. phone-nuno exists (index 1) -> returns 2 No phones exist -> returns 1 """ data = _identity_read(file) if not data: print(1) return existing = [ d.get('index', 1) for d in data.get('devices', {}).values() if d.get('type') == peer_type ] if not existing: print(1) return # Find lowest unused index starting from 1 used = set(existing) i = 1 while i in used: i += 1 print(i) def identity_peers(file, filter_type=''): """ List peers belonging to an identity, optionally filtered by type. Output: one peer name per line. """ data = _identity_read(file) if not data: return for peer_name in data.get('peers', []): if filter_type: dev = data.get('devices', {}).get(peer_name, {}) if dev.get('type') != filter_type: continue print(peer_name) def identity_migrate(identities_dir, clients_dir, meta_dir, dry_run): """ Scan all peer configs and auto-create identity files from name convention. dry_run: 'true' -> print what would be done, no writes. Output per action: action|identity|peer|type|index """ import glob is_dry = dry_run == 'true' grouped = {} # identity_name -> [(peer_name, type, index)] for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): peer_name = os.path.basename(conf).replace('.conf', '') parsed = _parse_peer_name(peer_name) if not parsed: print(f"skip|{peer_name}") continue peer_type, identity_name, index = parsed if identity_name not in grouped: grouped[identity_name] = [] grouped[identity_name].append((peer_name, peer_type, index)) for identity_name, peers in sorted(grouped.items()): id_file = os.path.join(identities_dir, f"{identity_name}.identity") for peer_name, peer_type, index in peers: print(f"create|{identity_name}|{peer_name}|{peer_type}|{index}") if not is_dry: identity_add_peer(id_file, identity_name, peer_name, peer_type, index) def identity_infer(peer_name): """ Parse a peer name and print identity|type|index, or nothing if no match. Used by add.command.sh to auto-attach on vanilla wgctl add. """ parsed = _parse_peer_name(peer_name) if parsed: peer_type, identity_name, index = parsed print(f"{identity_name}|{peer_type}|{index}") def identity_exists(file): """Exit 0 if identity file exists and is valid, else exit 1""" data = _identity_read(file) sys.exit(0 if data is not None else 1) # ============================================ # peer_data update — adds type field from meta # ============================================ # NOTE: This replaces the existing peer_data function. # The new version reads 'type' from meta directly. # Output format: name|ip|rule|type|last_ts|last_evt|main_group def peer_data(clients_dir, meta_dir, events_log): """ Updated peer_data that reads 'type' from meta. Output: name|ip|rule|type|last_ts|last_evt|main_group """ import glob meta = {} for f in glob.glob(f"{meta_dir}/*.meta"): name = os.path.basename(f).replace('.meta', '') try: with open(f) as mf: meta[name] = json.load(mf) except Exception: meta[name] = {} last_events = {} try: with open(events_log) as f: for line in f: try: e = json.loads(line.strip()) client = e.get('client', '') if client: last_events[client] = e except Exception: pass except Exception: pass for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): name = os.path.basename(conf).replace('.conf', '') ip = '' try: with open(conf) as f: for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] break except Exception: pass m = meta.get(name, {}) rule = m.get('rule', '') peer_type = m.get('type', '') main_group = m.get('main_group', '') last_event = last_events.get(name, {}) last_ts = last_event.get('timestamp', '') last_evt = last_event.get('event', '') print(f"{name}|{ip}|{rule}|{peer_type}|{last_ts}|{last_evt}|{main_group}") def subnet_default_rule(file, name, type_key=''): """ Return the default_rule for a subnet entry, or empty string if none set. For scalar: subnet_default_rule(file, "desktop") -> "" For group: subnet_default_rule(file, "guests", "phone") -> "guest" """ data = _subnet_read(file) if name not in data: print('') return entry = data[name] if _subnet_is_group(entry): key = type_key if type_key else 'none' child = entry.get(key, {}) print(child.get('default_rule', '')) else: print(entry.get('default_rule', '')) def subnet_list_names(file): """ List all top-level subnet names, one per line. Used for dynamic flag registration in commands. Output: one name per line (e.g. desktop, laptop, guests, servers, iot) """ data = _subnet_read(file) for name in data.keys(): print(name) # ============================================ # Policy System # ============================================ _POLICY_DEFAULTS = { "default": { "tunnel_mode": "split", "default_rule": None, "strict_rule": False, "auto_apply": True, "desc": "Default policy" }, "guest": { "tunnel_mode": "split", "default_rule": "guest", "strict_rule": True, "auto_apply": True, "desc": "Guest access policy" }, "trusted": { "tunnel_mode": "split", "default_rule": None, "strict_rule": False, "auto_apply": True, "desc": "Trusted device policy" }, "server": { "tunnel_mode": "split", "default_rule": None, "strict_rule": False, "auto_apply": True, "desc": "Server policy" }, "iot": { "tunnel_mode": "split", "default_rule": None, "strict_rule": False, "auto_apply": True, "desc": "IoT device policy" } } def _policy_read(file): """Read policies.json, fall back to hardcoded defaults if missing.""" try: if not os.path.exists(file): return dict(_POLICY_DEFAULTS) with open(file) as f: content = f.read().strip() if not content: return dict(_POLICY_DEFAULTS) data = json.loads(content) # Merge with defaults so hardcoded policies always exist merged = dict(_POLICY_DEFAULTS) merged.update(data) return merged except Exception: return dict(_POLICY_DEFAULTS) def _policy_write(file, data): """Write policies.json.""" os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w') as f: json.dump(data, f, indent=2) def _policy_get_entry(data, name): """Get a policy entry, falling back to 'default' policy values for missing fields.""" entry = data.get(name, {}) default = data.get('default', _POLICY_DEFAULTS.get('default', {})) # Merge: entry fields override default resolved = dict(default) resolved.update(entry) return resolved def policy_get(file, name, field=''): """ Get a policy entry or a specific field from it. policy_get(file, "guest") -> prints all fields as key|value lines policy_get(file, "guest", "tunnel_mode") -> prints "split" policy_get(file, "guest", "strict_rule") -> prints "true" or "false" """ data = _policy_read(file) entry = _policy_get_entry(data, name) if field: val = entry.get(field) if val is None: print('') elif isinstance(val, bool): print('true' if val else 'false') else: print(val) else: for k, v in entry.items(): if isinstance(v, bool): print(f"{k}|{'true' if v else 'false'}") elif v is None: print(f"{k}|") else: print(f"{k}|{v}") def policy_list(file): """ List all policies. Output per line: name|tunnel_mode|default_rule|strict_rule|auto_apply|desc """ data = _policy_read(file) for name, raw_entry in data.items(): entry = _policy_get_entry(data, name) tunnel_mode = entry.get('tunnel_mode', 'split') default_rule = entry.get('default_rule') or '' strict_rule = 'true' if entry.get('strict_rule', False) else 'false' auto_apply = 'true' if entry.get('auto_apply', True) else 'false' desc = entry.get('desc', '') print(f"{name}|{tunnel_mode}|{default_rule}|{strict_rule}|{auto_apply}|{desc}") def policy_exists(file, name): """Exit 0 if policy exists, 1 otherwise.""" data = _policy_read(file) sys.exit(0 if name in data else 1) def policy_add(file, name, tunnel_mode, default_rule, strict_rule, auto_apply, desc): """Add or update a policy entry.""" data = _policy_read(file) data[name] = { 'tunnel_mode': tunnel_mode or 'split', 'default_rule': default_rule if default_rule else None, 'strict_rule': strict_rule == 'true', 'auto_apply': auto_apply != 'false', 'desc': desc or '' } _policy_write(file, data) def policy_remove(file, name): """Remove a policy. Refuses to remove hardcoded defaults.""" if name in _POLICY_DEFAULTS: print(f"Error: Cannot remove built-in policy '{name}'", file=sys.stderr) sys.exit(1) data = _policy_read(file) if name not in data: print(f"Error: Policy '{name}' not found", file=sys.stderr) sys.exit(1) del data[name] _policy_write(file, data) def policy_set_field(file, name, field, value): """Set a single field on an existing policy.""" data = _policy_read(file) if name not in data: print(f"Error: Policy '{name}' not found", file=sys.stderr) sys.exit(1) entry = data[name] if field in ('strict_rule', 'auto_apply'): entry[field] = value == 'true' elif field == 'default_rule': entry[field] = value if value else None else: entry[field] = value data[name] = entry _policy_write(file, data) def subnet_policy(subnets_file, subnet_name, type_key=''): """ Get the policy name for a subnet entry. Falls back to 'default' if no policy set. """ data = _subnet_read(subnets_file) if subnet_name not in data: print('default') return entry = data[subnet_name] if _subnet_is_group(entry): key = type_key if type_key else 'none' child = entry.get(key, {}) print(child.get('policy', 'default')) else: print(entry.get('policy', 'default')) def json_get_nested(file, *keys): """ Get a nested field from a JSON file. json_get_nested(file, "rule_flags", "strict_rule") Output: the value as a string, or empty string if not found. """ try: with open(file) as f: data = json.load(f) val = data for key in keys: if not isinstance(val, dict): print('') return val = val.get(key) if val is None: print('') return if isinstance(val, bool): print('true' if val else 'false') elif val is None: print('') else: print(val) except Exception: print('') def json_set_nested(file, *args): """ Set a nested field in a JSON file. Args: file, key1, key2, ..., value json_set_nested(file, "rule_flags", "strict_rule", "true") Creates intermediate dicts as needed. """ if len(args) < 2: return keys = args[:-1] value = args[-1] try: if os.path.exists(file): with open(file) as f: data = json.load(f) else: data = {} # Navigate/create nested structure target = data for key in keys[:-1]: if key not in target or not isinstance(target[key], dict): target[key] = {} target = target[key] # Coerce value types final_key = keys[-1] if value == 'true': target[final_key] = True elif value == 'false': target[final_key] = False else: target[final_key] = value with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def activity_aggregate(fw_file, wg_file, wg_interface, net_file, clients_dir, meta_dir, hours, filter_peer, filter_service_ip): """ Aggregate activity data for wgctl activity. Output: peer|name|rx_bytes|tx_bytes|drop_count service|peer_name|dest_display|drop_count """ import glob import subprocess from datetime import datetime, timezone, timedelta from collections import defaultdict hours = int(hours) if hours else 24 cutoff = None if hours > 0: cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) # Build ip -> peer name map ip_to_peer = {} for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] ip_to_peer[ip] = name break except Exception: continue # Build pubkey -> peer name map pubkey_to_peer = {} for kf in glob.glob(f"{clients_dir}/*_public.key"): name = os.path.basename(kf).replace('_public.key', '') try: with open(kf) as f: key = f.read().strip() if key: pubkey_to_peer[key] = name except Exception: continue # Get WireGuard transfer totals peer_rx = defaultdict(int) peer_tx = defaultdict(int) try: result = subprocess.run( ['wg', 'show', wg_interface, 'transfer'], capture_output=True, text=True ) for line in result.stdout.strip().splitlines(): parts = line.split() if len(parts) >= 3: pubkey, rx, tx = parts[0], int(parts[1]), int(parts[2]) peer = pubkey_to_peer.get(pubkey) if peer: peer_rx[peer] += rx peer_tx[peer] += tx except Exception: pass # Load net services for reverse lookup net_data = {} if os.path.exists(net_file): try: with open(net_file) as f: net_data = json.load(f) except Exception: pass def reverse_lookup(dest_ip, dest_port, proto): for svc_name, svc in net_data.items(): if not isinstance(svc, dict): continue if svc.get('ip', '') != dest_ip: continue ports = svc.get('ports', {}) if dest_port: for port_name, port_def in ports.items(): if not isinstance(port_def, dict): continue if (str(port_def.get('port', '')) == str(dest_port) and port_def.get('proto', 'tcp') == proto): return f"{svc_name}:{port_name}" return svc_name return svc_name return '' # Parse and first-pass dedup (within time window per key) events = [] last_seen = {} try: with open(file) as f: for line in f: try: e = json.loads(line.strip()) src = e.get('src_ip', '') if not src: continue if filter_ip and src != filter_ip: continue proto_num = int(e.get('ip.protocol', 0)) proto = proto_map.get(proto_num, str(proto_num)) dst = e.get('dest_ip', '') port = str(e.get('dest_port', '')) key = (src, dst, port, proto_num) ts_str = e.get('timestamp', '') try: ts = datetime.fromisoformat(ts_str).timestamp() except Exception: ts = 0 windows = {1: 5, 6: 30, 17: 10} window = windows.get(proto_num, 10) if key in last_seen and (ts - last_seen[key]) < window: continue last_seen[key] = ts events.append(e) except Exception: pass except Exception: pass # Second-pass dedup consecutive same events with count deduped = [] counts = [] for e in events: ts_str = e.get('timestamp', '') try: ts = datetime.fromisoformat(ts_str).timestamp() except Exception: ts = 0 src = e.get('src_ip', '') dst = e.get('dest_ip', '') port = str(e.get('dest_port', '')) proto_num = int(e.get('ip.protocol', 0)) key = (src, dst, port, proto_num) if deduped: prev = deduped[-1] try: prev_ts = datetime.fromisoformat(prev.get('timestamp', '')).timestamp() except Exception: prev_ts = 0 prev_key = ( prev.get('src_ip', ''), prev.get('dest_ip', ''), str(prev.get('dest_port', '')), int(prev.get('ip.protocol', 0)) ) if key == prev_key and (ts - prev_ts) < 300: counts[-1] += 1 continue deduped.append(e) counts.append(1) limit = int(limit) if limit else 50 for e, count in list(zip(deduped, counts))[-limit:]: ts_str = e.get('timestamp', '') src = e.get('src_ip', '') dst = e.get('dest_ip', '') port = str(e.get('dest_port', '')) proto_num = int(e.get('ip.protocol', 0)) proto = proto_map.get(proto_num, str(proto_num)) client = ip_to_name.get(src, src) svc_name = reverse_lookup(dst, port, proto) try: dt = datetime.fromisoformat(ts_str) ts_fmt = dt.strftime(DATETIME_FMT) except Exception: ts_fmt = ts_str print(f"{ts_fmt}|{client}|{dst}|{port}|{proto}|{svc_name}|{count}") def make_dest_display(dest_ip, dest_port, proto, svc_name): if svc_name: return svc_name if dest_port: display = f"{dest_ip}:{dest_port}" else: display = dest_ip if proto and proto not in ('tcp', '6'): display += f" ({proto})" return display proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} # Parse fw_events for drops peer_drops = defaultdict(int) service_drops = defaultdict(lambda: defaultdict(int)) if os.path.exists(fw_file): try: with open(fw_file) as f: for line in f: line = line.strip() if not line: continue try: ev = json.loads(line) if cutoff: ts_str = ev.get('timestamp', '') try: ts = datetime.fromisoformat(ts_str) if ts.tzinfo is None: ts = ts.replace(tzinfo=timezone.utc) if ts < cutoff: continue except Exception: pass src_ip = ev.get('src_ip', '') if not src_ip: continue dest_ip = ev.get('dest_ip', '') dest_port = str(ev.get('dest_port', '')) proto_num = ev.get('ip.protocol', 0) proto = proto_map.get(int(proto_num), str(proto_num)) peer = ip_to_peer.get(src_ip) if not peer: continue if filter_peer and peer != filter_peer: continue if filter_service_ip and dest_ip != filter_service_ip: continue svc_name = reverse_lookup(dest_ip, dest_port, proto) dest_display = make_dest_display(dest_ip, dest_port, proto, svc_name) peer_drops[peer] += 1 service_drops[peer][dest_display] += 1 except Exception: continue except Exception: pass # Collect peers with any activity all_peers = set() all_peers.update(k for k in peer_rx if peer_rx[k] > 0) all_peers.update(k for k in peer_tx if peer_tx[k] > 0) all_peers.update(peer_drops.keys()) if filter_peer: all_peers = {p for p in all_peers if p == filter_peer} for peer in sorted(all_peers): rx = peer_rx.get(peer, 0) tx = peer_tx.get(peer, 0) drops = peer_drops.get(peer, 0) print(f"peer|{peer}|{rx}|{tx}|{drops}") svc_map = service_drops.get(peer, {}) for dest_display, count in sorted(svc_map.items(), key=lambda x: -x[1]): print(f"service|{peer}|{dest_display}|{count}") commands = { 'get': lambda args: get(args[0], args[1]), 'set': lambda args: set_key(args[0], args[1], args[2]), 'delete': lambda args: delete_key(args[0], args[1]), 'append': lambda args: append(args[0], args[1], args[2]), 'remove': lambda args: remove_value(args[0], args[1], args[2]), 'cat': lambda args: cat(args[0]), 'has_key': lambda args: has_key(args[0], args[1]), 'filter_values': lambda args: filter_values(args[0], args[1], args[2]), 'last_event': lambda args: last_event(args[0], args[1], args[2], args[3]), 'events_for': lambda args: events_for(args[0], args[1], args[2]), 'fw_events': lambda args: fw_events(args[0], args[1], args[2], args[3], args[4], args[5] if len(args) > 5 else '50'), 'wg_events': lambda args: wg_events(args[0], args[1], args[2], args[3] if len(args) > 3 else '50'), 'format_fw_event': lambda args: format_fw_event(sys.stdin.read(), args[0]), 'format_wg_event': lambda args: format_wg_event(sys.stdin.read()), 'remove_events': lambda args: remove_events(args[0], args[1]), 'follow_logs': lambda args: follow_logs(args[0], args[1], args[2], args[3], args[4], args[5]), 'count': lambda args: count(args[0], args[1]), 'audit_fw_counts': lambda args: audit_fw_counts(args[0]), 'peer_group_map': lambda args: peer_group_map(args[0]), 'peer_groups': lambda args: peer_groups(args[0], args[1]), 'peer_data': lambda args: peer_data(args[0], args[1], args[2]), 'iso_to_ts': lambda args: iso_to_ts(args[0]), 'rule_list_data': lambda args: rule_list_data(args[0], args[1]), 'group_list_data': lambda args: group_list_data(args[0], args[1]), 'fmt_datetime': lambda args: fmt_datetime(args[0], args[1]), 'create_rule': lambda args: create_rule( args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7] if len(args) > 7 else '', args[8] if len(args) > 8 else '', args[9] if len(args) > 9 else '' ), 'cleanup_config': lambda args: cleanup_config(args[0]), 'remove_peer_block': lambda args: remove_peer_block(args[0], args[1]), 'create_group': lambda args: create_group(args[0], args[1], args[2]), 'parse_event': lambda args: parse_event(args[0]), 'parse_fw_event': lambda args: parse_fw_event(args[0]), 'remove_events_filtered': lambda args: remove_events_filtered( args[0], args[1], args[2], args[3], args[4]=='true', args[5]=='true', args[6] if len(args)>6 else ''), 'peer_transfer': lambda args: peer_transfer(args[0]), 'peer_transfer_delta': lambda args: peer_transfer_delta(args[0], args[1]), 'rule_resolve': lambda args: rule_resolve(args[0], args[1]), 'rule_resolve_field': lambda args: rule_resolve_field(args[0], args[1], args[2]), 'rule_inspect': lambda args: rule_inspect(args[0], args[1]), 'find_rule_file': lambda args: print(find_rule_file(args[0], args[1])), 'get_raw': lambda args: print(get_raw(args[0], args[1])), 'count_resolved': lambda args: count_resolved(args[0], args[1], args[2]), 'block_get': lambda args: block_get(args[0]), 'block_is_blocked': lambda args: block_is_blocked(args[0]), 'block_set_direct': lambda args: block_set_direct(args[0], args[1], args[2]), 'block_add_group': lambda args: block_add_group(args[0], args[1], args[2]), 'block_remove_group': lambda args: block_remove_group(args[0], args[1], args[2]), 'block_add_rule': lambda args: block_add_rule( args[0], args[1], args[2], args[3] if len(args) > 3 else '', args[4] if len(args) > 4 else '', args[5] if len(args) > 5 else '', args[6] if len(args) > 6 else '' ), 'block_remove_rule': lambda args: block_remove_rule( args[0], args[1], args[2] if len(args) > 2 else '', args[3] if len(args) > 3 else '', args[4] if len(args) > 4 else '' ), 'block_get_rules': lambda args: block_get_rules(args[0]), 'block_get_groups': lambda args: block_get_groups(args[0]), 'block_get_direct': lambda args: block_get_direct(args[0]), 'net_list': lambda args: net_list(args[0]), 'net_show': lambda args: net_show(args[0], args[1]), 'net_exists': lambda args: net_exists(args[0], args[1]), 'net_add_service': lambda args: net_add_service( args[0], args[1], args[2], args[3] if len(args) > 3 else '', args[4] if len(args) > 4 else '' ), 'net_add_port': lambda args: net_add_port( args[0], args[1], args[2], args[3], args[4] if len(args) > 4 else 'tcp', args[5] if len(args) > 5 else '' ), 'net_remove': lambda args: net_remove(args[0], args[1]), 'net_resolve': lambda args: net_resolve(args[0], args[1]), 'net_reverse_lookup': lambda args: net_reverse_lookup( args[0], args[1], args[2] if len(args) > 2 else '', args[3] if len(args) > 3 else '' ), 'block_is_empty': lambda args: block_is_empty(args[0]), 'group_has_peer': lambda args: group_has_peer(args[0], args[1]), # Subnet commands: 'subnet_lookup': lambda args: subnet_lookup(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_type': lambda args: subnet_type(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_tunnel_mode': lambda args: subnet_tunnel_mode(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_for_ip': lambda args: subnet_for_ip(args[0], args[1]), 'subnet_list': lambda args: subnet_list(args[0]), 'subnet_show': lambda args: subnet_show(args[0], args[1]), 'subnet_add': lambda args: subnet_add( args[0], args[1], args[2], args[3], args[4] if len(args) > 4 else 'split', args[5] if len(args) > 5 else '', args[6] if len(args) > 6 else '' ), 'subnet_remove': lambda args: subnet_remove(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_rename': lambda args: subnet_rename(args[0], args[1], args[2], args[3] if len(args) > 3 else ''), 'subnet_peers': lambda args: subnet_peers(args[0], args[1], args[2], args[3]), 'subnet_exists': lambda args: subnet_exists(args[0], args[1]), # Identity commands: 'identity_list': lambda args: identity_list(args[0]), 'identity_show': lambda args: identity_show(args[0]), 'identity_add_peer': lambda args: identity_add_peer(args[0], args[1], args[2], args[3], args[4]), 'identity_remove_peer':lambda args: identity_remove_peer(args[0], args[1]), 'identity_remove': lambda args: identity_remove(args[0]), 'identity_next_index': lambda args: identity_next_index(args[0], args[1]), 'identity_peers': lambda args: identity_peers(args[0], args[1] if len(args) > 1 else ''), 'identity_migrate': lambda args: identity_migrate(args[0], args[1], args[2], args[3]), 'identity_infer': lambda args: identity_infer(args[0]), 'identity_exists': lambda args: identity_exists(args[0]), 'subnet_default_rule': lambda args: subnet_default_rule(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_list_names': lambda args: subnet_list_names(args[0]), # Policy commands: 'policy_get': lambda args: policy_get(args[0], args[1], args[2] if len(args) > 2 else ''), 'policy_list': lambda args: policy_list(args[0]), 'policy_exists': lambda args: policy_exists(args[0], args[1]), 'policy_add': lambda args: policy_add(args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else ''), 'policy_remove': lambda args: policy_remove(args[0], args[1]), 'policy_set_field': lambda args: policy_set_field(args[0], args[1], args[2], args[3]), 'subnet_policy': lambda args: subnet_policy(args[0], args[1], args[2] if len(args) > 2 else ''), 'get_nested': lambda args: json_get_nested(args[0], *args[1:]), 'set_nested': lambda args: json_set_nested(args[0], *args[1:]), 'identity_rules': lambda args: identity_rules(args[0]), 'identity_add_rule': lambda args: identity_add_rule(args[0], args[1], args[2]), 'identity_remove_rule': lambda args: identity_remove_rule(args[0], args[1]), 'identity_clear_rules': lambda args: identity_clear_rules(args[0]), 'identity_has_rule': lambda args: identity_has_rule(args[0], args[1]), 'activity_aggregate': lambda args: activity_aggregate( args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else '24', args[7] if len(args) > 7 else '', args[8] if len(args) > 8 else '' ), } if __name__ == '__main__': if len(sys.argv) < 2: print("Usage: json_helper.py [key] [value]", file=sys.stderr) sys.exit(1) cmd = sys.argv[1] args = sys.argv[2:] if cmd not in commands: print(f"Unknown command: {cmd}", file=sys.stderr) sys.exit(1) commands[cmd](args)