#!/usr/bin/env python3
"""
wgctl JSON helper — thin dispatcher.
Imports from core/lib/ modules and routes commands.
"""
import sys
import os
import json

# Put the lib/ directory that sits next to this file at the front of the
# module search path.
sys.path.insert(
    0,
    os.path.join(os.path.dirname(__file__), 'lib'),
)

# strftime pattern; can be overridden through the WGCTL_DATETIME_FMT variable.
DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M')

# Built-in values for the policy fields.
_POLICY_DEFAULTS = dict(
    tunnel_mode='split',
    default_rule=None,
    strict_rule=False,
    auto_apply=True,
)


# ── Lazy imports (only load what's needed per command) ──────────────────────

def _events():
    """Import the lib.events helpers on demand and return them keyed by name."""
    from lib.events import (
        fw_events,
        wg_events,
        parse_event,
        parse_fw_event,
        format_fw_event,
        format_wg_event,
        remove_events,
        remove_events_filtered,
        follow_logs,
        last_event,
        events_for,
        iso_to_ts,
    )
    namespace = dict(locals())
    return namespace


def _peers():
    """Import the lib.peers helpers on demand and return them keyed by name."""
    from lib.peers import (
        peer_data,
        peer_transfer,
        peer_transfer_delta,
        peer_group_map,
        peer_groups,
    )
    namespace = dict(locals())
    return namespace


def _activity():
    """Import lib.activity on demand and return its helper keyed by name."""
    from lib.activity import activity_aggregate
    namespace = dict(locals())
    return namespace


# ── Keep all non-lib functions inline (JSON CRUD, rules, net, etc.) ─────────
# These are stable, small, and don't benefit from splitting yet.
# Candidates for future lib/rules.py, lib/net.py etc. in a follow-up pass.
def _load_json(path):
    """Return the parsed JSON document stored at *path*."""
    with open(path) as f:
        return json.load(f)


def _dump_json(path, data):
    """Overwrite *path* with *data* serialised as 2-space-indented JSON."""
    with open(path, 'w') as f:
        json.dump(data, f, indent=2)


def _die(exc):
    """Print ``Error: <exc>`` to stderr and exit with status 1."""
    print(f"Error: {exc}", file=sys.stderr)
    sys.exit(1)


def get(file, key):
    """Print the value stored under *key* in the JSON object in *file*.

    Booleans print as ``true``/``false``, lists print one item per line and
    any other truthy value prints as-is. Missing keys, empty lists and falsy
    scalars print nothing. Any failure exits quietly with status 0.
    """
    try:
        val = _load_json(file).get(key, [])
        if isinstance(val, bool):
            print(str(val).lower())
        elif isinstance(val, list):
            if val:
                print('\n'.join(str(v) for v in val))
        elif val:
            print(val)
    except Exception:
        sys.exit(0)


def set_key(file, key, value):
    """Store *value* under *key* in *file*, creating the file if needed.

    *value* is decoded as JSON when possible (so ``"true"`` becomes ``True``)
    and stored as the raw string otherwise. Exits 1 on any error.
    """
    try:
        data = _load_json(file) if os.path.exists(file) else {}
        try:
            parsed = json.loads(value)
        except (TypeError, ValueError):
            # Not valid JSON: keep the argument verbatim.
            parsed = value
        data[key] = parsed
        _dump_json(file, data)
    except Exception as e:
        _die(e)


def delete_key(file, key):
    """Remove *key* from the JSON object in *file* (no-op if absent)."""
    try:
        data = _load_json(file)
        data.pop(key, None)
        _dump_json(file, data)
    except Exception as e:
        _die(e)


def append(file, key, value):
    """Append *value* to the list under *key* unless it is already there.

    The file and the list are created when missing. Exits 1 on any error.
    """
    try:
        data = _load_json(file) if os.path.exists(file) else {}
        items = data.setdefault(key, [])
        if value not in items:
            items.append(value)
        _dump_json(file, data)
    except Exception as e:
        _die(e)


def remove_value(file, key, value):
    """Remove *value* from the list under *key*, if both exist."""
    try:
        data = _load_json(file)
        if key in data and value in data[key]:
            data[key].remove(value)
        _dump_json(file, data)
    except Exception as e:
        _die(e)


def cat(file):
    """Pretty-print the JSON document in *file*; exit 1 if it cannot be read."""
    try:
        print(json.dumps(_load_json(file), indent=2))
    except Exception:
        sys.exit(1)


def has_key(file, key):
    """Exit 0 when *key* is present in *file*, 1 otherwise (or on error)."""
    try:
        found = key in _load_json(file)
    except Exception:
        sys.exit(1)
    sys.exit(0 if found else 1)


def filter_values(file, key, value):
    """Drop every top-level entry of *file* whose value equals *value*.

    *key* is accepted for signature compatibility but is not used.
    """
    try:
        data = _load_json(file)
        kept = {k: v for k, v in data.items() if v != value}
        _dump_json(file, kept)
    except Exception as e:
        _die(e)


def count(file, key):
    """Print the length of the list under *key*; 0 if absent, not a list, or on error."""
    try:
        val = _load_json(file).get(key, [])
        print(len(val) if isinstance(val, list) else 0)
    except Exception:
        print(0)


def get_raw(file, key):
    """Print the value under *key* without filtering out falsy values.

    ``None``/missing prints nothing, booleans print lowercase, lists print
    one item per line. Errors are silently ignored.
    """
    try:
        val = _load_json(file).get(key)
        if val is None:
            return
        if isinstance(val, bool):
            print(str(val).lower())
        elif isinstance(val, list):
            for item in val:
                print(item)
        else:
            print(val)
    except Exception:
        pass


def fmt_datetime(iso_str, fmt):
    """Print *iso_str* reformatted with *fmt*; echo it unchanged if unparsable."""
    try:
        from datetime import datetime
        print(datetime.fromisoformat(iso_str).strftime(fmt))
    except Exception:
        print(iso_str)


def cleanup_config(config_file):
    """Collapse runs of blank lines in *config_file* and end it with one newline."""
    import re
    try:
        with open(config_file) as f:
            config = f.read()
        config = re.sub(r'\n{3,}', '\n\n', config)
        config = config.rstrip('\n') + '\n'
        with open(config_file, 'w') as f:
            f.write(config)
    except Exception as e:
        _die(e)


def remove_peer_block(config_file, name):
    """Delete the ``[Peer]`` block tagged ``# <name>`` from *config_file*.

    The block is matched as the ``[Peer]`` line, the ``# name`` comment and
    exactly two further non-empty lines.
    """
    import re
    try:
        with open(config_file) as f:
            config = f.read()
        pattern = r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n'
        result = re.sub(pattern, '\n', config)
        with open(config_file, 'w') as f:
            f.write(result)
    except Exception as e:
        _die(e)


def audit_fw_counts(clients_dir):
    """Print ``name:count`` for every ``*.conf`` in *clients_dir*.

    *count* is the number of ``iptables -L FORWARD -n -v`` rule lines that
    mention the peer's address (first ``Address`` line of its config).
    Configs without an address, or that cannot be read, are skipped. If
    iptables cannot be run every count is 0.
    """
    import glob
    import re
    import subprocess

    try:
        result = subprocess.run(
            ['iptables', '-L', 'FORWARD', '-n', '-v'],
            capture_output=True, text=True
        )
        fw_lines = result.stdout.splitlines()
    except Exception:
        fw_lines = []

    # Drop the chain banner, the column header and blank lines.
    rule_lines = [
        l for l in fw_lines
        if l.strip() and not l.startswith('Chain') and not l.startswith(' pkts')
    ]

    for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
        # splitext removes only the trailing extension; str.replace would
        # also eat a ".conf" occurring inside the peer name.
        name = os.path.splitext(os.path.basename(conf))[0]
        try:
            ip = ''
            with open(conf) as f:
                for line in f:
                    if line.startswith('Address'):
                        ip = line.split('=')[1].strip().split('/')[0]
                        break
            if not ip:
                continue
            # Whole-token match, so 10.0.0.1 does not match 10.0.0.10 and an
            # address at the very end of a line is still counted.
            token = re.compile(r'(?<!\S)' + re.escape(ip) + r'(?!\S)')
            count_val = sum(1 for l in rule_lines if token.search(l))
            print(f"{name}:{count_val}")
        except Exception:
            # Best effort: an unreadable or malformed config is skipped.
            pass


# ── Rules (kept inline — candidates for lib/rules.py) ────────────────────

# (rule-file list field, label used in rule_inspect output)
_RULE_FIELD_LABELS = (
    ('allow_ips', 'allow_ip'),
    ('allow_ports', 'allow_port'),
    ('block_ips', 'block_ip'),
    ('block_ports', 'block_port'),
)


def find_rule_file(rules_dir, rule_name):
    """Return the path of ``<rule_name>.rule`` in *rules_dir* or its ``base/``
    subdirectory, or ``""`` when neither exists."""
    candidates = (
        os.path.join(rules_dir, f"{rule_name}.rule"),
        os.path.join(rules_dir, "base", f"{rule_name}.rule"),
    )
    for path in candidates:
        if os.path.exists(path):
            return path
    return ""


def _rule_resolve_internal(rules_dir, rule_name, visited=None):
    """Load *rule_name* and merge in everything it ``extends``.

    Inherited entries come first, own entries are appended and duplicates
    are dropped preserving order. ``dns_redirect`` is true if the rule or
    any base sets it.

    Raises:
        ValueError: on a circular ``extends`` chain.
        FileNotFoundError: if the rule (or one of its bases) has no file.
    """
    if visited is None:
        visited = set()
    if rule_name in visited:
        raise ValueError(f"Circular dependency: {rule_name}")
    visited.add(rule_name)

    rule_file = find_rule_file(rules_dir, rule_name)
    if not rule_file:
        # Without this check open("") fails with a message that never
        # mentions which rule was missing.
        raise FileNotFoundError(f"Rule not found: {rule_name}")
    rule = _load_json(rule_file)

    merged = {field: [] for field, _ in _RULE_FIELD_LABELS}
    merged['dns_redirect'] = False

    for base_name in rule.get('extends', []):
        # Each branch gets its own copy of the visited set so that a base
        # shared by two siblings is not reported as a cycle.
        base = _rule_resolve_internal(rules_dir, base_name, visited.copy())
        for field, _ in _RULE_FIELD_LABELS:
            merged[field] += base.get(field, [])
        if base.get('dns_redirect'):
            merged['dns_redirect'] = True

    for field, _ in _RULE_FIELD_LABELS:
        merged[field] = list(dict.fromkeys(merged[field] + rule.get(field, [])))
    if rule.get('dns_redirect', False):
        merged['dns_redirect'] = True

    merged['name'] = rule.get('name', rule_name)
    merged['desc'] = rule.get('desc', '')
    merged['group'] = rule.get('group', '')
    merged['extends'] = rule.get('extends', [])
    return merged


def rule_resolve(rules_dir, rule_name):
    """Print the fully resolved rule as one JSON line; exit 1 on error."""
    try:
        print(json.dumps(_rule_resolve_internal(rules_dir, rule_name)))
    except Exception as e:
        _die(e)


def rule_resolve_field(rules_dir, rule_name, field):
    """Print one field of the resolved rule (lists one item per line)."""
    try:
        val = _rule_resolve_internal(rules_dir, rule_name).get(field, [])
        if isinstance(val, list):
            for item in val:
                print(item)
        else:
            print(val)
    except Exception as e:
        _die(e)


def count_resolved(rules_dir, rule_name, key):
    """Print the number of entries under *key* in the resolved rule (0 on error)."""
    try:
        print(len(_rule_resolve_internal(rules_dir, rule_name).get(key, [])))
    except Exception:
        print(0)


def rule_list_data(rules_dir, meta_dir):
    """Print one summary line per rule file.

    Output per line:
      name|desc|n_allows|n_blocks|peer_count|extends|is_base|group
    where ``peer_count`` is the number of ``*.meta`` files in *meta_dir*
    whose ``rule`` field names the rule. Non-base rules are listed before
    base rules. Files that cannot be read or resolved are skipped.
    """
    import glob

    peer_counts = {}
    for meta_path in glob.glob(f"{meta_dir}/*.meta"):
        try:
            rule = _load_json(meta_path).get('rule', '')
            if rule:
                peer_counts[rule] = peer_counts.get(rule, 0) + 1
        except Exception:
            pass  # unreadable meta files simply do not count

    # Tag each file by the glob that found it. Searching the path for
    # "/base/" mislabels every rule when rules_dir itself lives under a
    # directory called "base".
    candidates = [(p, False) for p in sorted(glob.glob(f"{rules_dir}/*.rule"))]
    candidates += [(p, True) for p in sorted(glob.glob(f"{rules_dir}/base/*.rule"))]

    rows = []
    for rule_file, is_base in candidates:
        try:
            raw = _load_json(rule_file)
            name = raw.get('name', '')
            resolved = _rule_resolve_internal(rules_dir, name)
            rows.append({
                'name': name,
                'desc': raw.get('desc', ''),
                'n_allows': (len(resolved.get('allow_ips', []))
                             + len(resolved.get('allow_ports', []))),
                'n_blocks': (len(resolved.get('block_ips', []))
                             + len(resolved.get('block_ports', []))),
                'peer_count': peer_counts.get(name, 0),
                'extends': ','.join(raw.get('extends', [])),
                'is_base': is_base,
                'group': raw.get('group', ''),
            })
        except Exception:
            pass  # a broken rule file must not hide the others

    rows.sort(key=lambda x: (
        x['is_base'],
        x['group'] == '' and not x['is_base'],
        x['group'],
        x['name'],
    ))
    for r in rows:
        print(f"{r['name']}|{r['desc']}|{r['n_allows']}|{r['n_blocks']}|"
              f"{r['peer_count']}|{r['extends']}|{r['is_base']}|{r['group']}")


def _print_rule_entries(prefix, rule):
    """Print ``prefix|label|value`` for every list entry of *rule*."""
    for field, label in _RULE_FIELD_LABELS:
        for item in rule.get(field, []):
            print(f"{prefix}|{label}|{item}")


def rule_inspect(rules_dir, rule_name):
    """Print a rule's own, inherited and resolved entries.

    Lines have the form ``source|kind|value`` where *source* is ``own``,
    ``dns``, ``inherited:<base>`` or ``resolved``. Exits 1 on error.
    """
    try:
        rule_file = find_rule_file(rules_dir, rule_name)
        if not rule_file:
            raise FileNotFoundError(f"Rule not found: {rule_name}")
        rule = _load_json(rule_file)
        # Resolve up front so nothing is printed when resolution fails.
        resolved = _rule_resolve_internal(rules_dir, rule_name)

        _print_rule_entries("own", rule)
        if rule.get('dns_redirect'):
            print("dns|dns_redirect|true")

        for base_name in rule.get('extends', []):
            base = _rule_resolve_internal(rules_dir, base_name)
            _print_rule_entries(f"inherited:{base_name}", base)
            if base.get('dns_redirect'):
                print(f"inherited:{base_name}|dns_redirect|true")

        _print_rule_entries("resolved", resolved)
        if resolved.get('dns_redirect'):
            print("resolved|dns_redirect|true")
    except Exception as e:
        _die(e)


def _split_csv(text):
    """Split a comma-separated string into its non-empty items."""
    return [item for item in text.split(',') if item] if text else []


def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips, block_ports,
                allow_ports='', extends='', group=''):
    """Write a new rule file.

    List arguments are comma-separated strings; *dns_redirect* is enabled
    only by the exact string ``'true'``.
    """
    rule = {
        'name': name,
        'desc': desc,
        'group': group,
        'dns_redirect': dns_redirect == 'true',
        'extends': _split_csv(extends),
        'allow_ips': _split_csv(allow_ips),
        'allow_ports': _split_csv(allow_ports),
        'block_ips': _split_csv(block_ips),
        'block_ports': _split_csv(block_ports),
    }
    _dump_json(file, rule)


# ── All remaining functions (net, hosts, blocks, subnet, identity, policy) ──
# Kept verbatim from original
# json_helper.py — identical behaviour.
# Future pass will move each section to lib/net.py, lib/hosts.py, etc.

def _write_json_file(file, data):
    """Write *data* to *file* as indented JSON, creating parent directories.

    A bare filename has an empty dirname, and ``os.makedirs('')`` raises, so
    the directory is only created when there is one to create.
    """
    parent = os.path.dirname(file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(file, 'w') as f:
        json.dump(data, f, indent=2)


def _read_json_object(file, empty):
    """Return the JSON content of *file*, or *empty* if missing, blank or invalid."""
    try:
        if not os.path.exists(file):
            return empty
        with open(file) as f:
            content = f.read().strip()
        if not content:
            return empty
        return json.loads(content)
    except Exception:
        return empty


def _exit_code(ok):
    """Terminate with status 0 when *ok* is true, 1 otherwise."""
    sys.exit(0 if ok else 1)


def _parse_peer_name(peer_name):
    """
    Parse a peer name into (type, identity, index).

      phone-nuno       -> ('phone', 'nuno', 1)
      phone-nuno-2     -> ('phone', 'nuno', 2)
      desktop-zephyr   -> ('desktop', 'zephyr', 1)
      laptop-nuno      -> ('laptop', 'nuno', 1)

    Returns None if name doesn't match convention.
    Convention: {type}-{identity}[-{index}]
    Known types: desktop, laptop, phone, tablet, server, iot, none
    """
    known_types = {'desktop', 'laptop', 'phone', 'tablet', 'server', 'iot', 'none'}
    parts = peer_name.split('-')
    if len(parts) < 2:
        return None
    peer_type = parts[0]
    if peer_type not in known_types:
        return None

    # A trailing numeric part is the index, but only when an identity part
    # remains in front of it.
    if len(parts) >= 3 and parts[-1].isdigit():
        index = int(parts[-1])
        identity = '-'.join(parts[1:-1])
    else:
        index = 1
        identity = '-'.join(parts[1:])
    if not identity:
        return None
    return (peer_type, identity, index)


# ======================================================
# Blocks
# ======================================================

def _block_init(peer_ip):
    """Return empty block structure"""
    return {
        "peer_ip": peer_ip,
        "blocked_direct": False,
        "blocked_by_groups": [],
        "rules": []
    }


def _block_read(file):
    """Return the block data stored in *file*, or None if there is none.

    A non-JSON file is treated as the old format: its first token is taken
    as the peer IP, the file is rewritten as a direct full block in the
    current format, and that new structure is returned.
    """
    try:
        with open(file) as f:
            content = f.read().strip()
        if not content:
            return None  # empty file = no block data
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            # Old format — migrate
            migrated = {
                "peer_ip": content.split()[0],
                "blocked_direct": True,
                "blocked_by_groups": [],
                "rules": [{"name": "full block", "type": "full"}]
            }
            _block_write(file, migrated)
            return migrated
    except Exception:
        # Missing or unreadable file: callers treat this as "no block".
        return None


def _block_write(file, data):
    """Write block file"""
    with open(file, 'w') as f:
        json.dump(data, f, indent=2)


def _block_fail(exc):
    """Print ``Error: <exc>`` to stderr and exit with status 1."""
    print(f"Error: {exc}", file=sys.stderr)
    sys.exit(1)


def block_get(file):
    """Read and print block file as JSON"""
    data = _block_read(file)
    if data:
        print(json.dumps(data))


def block_is_blocked(file):
    """Print "true" if the peer is blocked directly or through any group."""
    data = _block_read(file)
    blocked = bool(data) and (
        data.get("blocked_direct", False)
        or bool(data.get("blocked_by_groups", []))
    )
    print("true" if blocked else "false")


def block_set_direct(file, peer_ip, value):
    """Set blocked_direct (true only for the string "true", any case)."""
    try:
        data = _block_read(file) or _block_init(peer_ip)
        data["blocked_direct"] = value.lower() == "true"
        data["peer_ip"] = peer_ip
        _block_write(file, data)
    except Exception as e:
        _block_fail(e)


def block_add_group(file, peer_ip, group):
    """Add group to blocked_by_groups"""
    try:
        data = _block_read(file) or _block_init(peer_ip)
        data["peer_ip"] = peer_ip
        groups = data.get("blocked_by_groups", [])
        if group not in groups:
            groups.append(group)
        data["blocked_by_groups"] = groups
        _block_write(file, data)
    except Exception as e:
        _block_fail(e)


def block_remove_group(file, peer_ip, group):
    """Remove group from blocked_by_groups.

    Prints nothing; use block_is_blocked() to query the resulting state.
    """
    try:
        data = _block_read(file) or _block_init(peer_ip)
        groups = data.get("blocked_by_groups", [])
        if group in groups:
            groups.remove(group)
        data["blocked_by_groups"] = groups
        _block_write(file, data)
    except Exception as e:
        _block_fail(e)


def _block_rule_matches(rule, rule_type, target, port, proto):
    """Return True if *rule* has the given type/target/port/proto."""
    return (rule.get("type") == rule_type
            and rule.get("target", "") == target
            and rule.get("port", "") == port
            and rule.get("proto", "") == proto)


def block_add_rule(file, peer_ip, rule_type, name="", target="", port="", proto=""):
    """Add a block rule entry unless one with the same type/target/port/proto exists."""
    try:
        data = _block_read(file) or _block_init(peer_ip)
        data["peer_ip"] = peer_ip
        rules = data.get("rules", [])
        if any(_block_rule_matches(r, rule_type, target, port, proto) for r in rules):
            return  # already exists, skip

        rule = {"type": rule_type}
        for field, val in (("name", name), ("target", target),
                           ("port", port), ("proto", proto)):
            if val:
                rule[field] = val
        rules.append(rule)
        data["rules"] = rules
        _block_write(file, data)
    except Exception as e:
        _block_fail(e)


def block_remove_rule(file, rule_type, target="", port="", proto=""):
    """Remove every rule matching type/target/port/proto; no-op without block data."""
    try:
        data = _block_read(file)
        if not data:
            return
        data["rules"] = [
            r for r in data.get("rules", [])
            if not _block_rule_matches(r, rule_type, target, port, proto)
        ]
        _block_write(file, data)
    except Exception as e:
        _block_fail(e)


def block_get_rules(file):
    """Print rules as pipe-separated lines: name|type|target|port|proto"""
    data = _block_read(file)
    if not data:
        return
    for r in data.get("rules", []):
        print(f"{r.get('name','')}|{r.get('type','')}|"
              f"{r.get('target','')}|{r.get('port','')}|{r.get('proto','')}")


def block_get_groups(file):
    """Print the blocking groups as one comma-separated line."""
    data = _block_read(file)
    if not data:
        return
    print(','.join(data.get('blocked_by_groups', [])))


def block_get_direct(file):
    """Print "true" if blocked_direct is set, "false" otherwise."""
    data = _block_read(file)
    direct = bool(data) and data.get('blocked_direct', False)
    print('true' if direct else 'false')


def block_is_empty(file):
    """Print "true" when the file holds no direct block, groups, rules or services."""
    data = _block_read(file)
    if not data:
        print("true")
        return
    in_use = (
        data.get("blocked_direct", False)
        or data.get("blocked_by_groups", [])
        or data.get("rules", [])
        or data.get("services", [])
    )
    print("false" if in_use else "true")


# ======================================================
# Subnet
# ======================================================

def _subnet_read(file):
    """Read subnets.json, return dict or empty dict"""
    return _read_json_object(file, {})


def _subnet_write(file, data):
    """Write subnets.json"""
    _write_json_file(file, data)


def _subnet_is_group(entry):
    """Return True if a subnet entry is a nested group (like 'guests')"""
    return isinstance(entry, dict) and 'subnet' not in entry


def _subnet_settings(data, name, type_key=''):
    """Return the settings dict for a subnet or group child ({} when absent).

    For a group the child is selected by *type_key*, defaulting to 'none'.
    """
    if name not in data:
        return {}
    entry = data[name]
    if _subnet_is_group(entry):
        return entry.get(type_key or 'none', {})
    return entry


def _split_names(csv):
    """Split a comma-separated string into its non-empty items."""
    return [p for p in csv.split(',') if p] if csv else []


def _peer_conf_name(conf):
    """Return the peer name for a config path (basename without extension)."""
    return os.path.splitext(os.path.basename(conf))[0]


def subnet_lookup(file, name, type_key=''):
    """
    Resolve a subnet name (and optional type) to a CIDR string.

    For scalar entries:  subnet_lookup(file, "desktop")         -> "10.1.1.0/24"
    For group entries:   subnet_lookup(file, "guests", "phone") -> "10.1.103.0/24"
    For group with no type: subnet_lookup(file, "guests")       -> "10.1.100.0/24" (none slot)

    Prints the CIDR on success, nothing and exits 1 on failure.
    """
    data = _subnet_read(file)
    if name not in data:
        sys.exit(1)
    entry = data[name]
    if _subnet_is_group(entry):
        entry = entry.get(type_key or 'none')
    # An entry without a usable 'subnet' is a failure, not a traceback.
    cidr = entry.get('subnet') if isinstance(entry, dict) else None
    if not cidr:
        sys.exit(1)
    print(cidr)


def subnet_type(file, name, type_key=''):
    """
    Return the type string for a subnet entry.

    For scalar: subnet_type(file, "desktop") -> "desktop"
    For group:  subnet_type(file, "guests", "phone") -> "phone"
                subnet_type(file, "guests") -> "none"

    Exits 1 if the subnet or the group child does not exist.
    """
    data = _subnet_read(file)
    if name not in data:
        sys.exit(1)
    entry = data[name]
    if _subnet_is_group(entry):
        key = type_key or 'none'
        if key not in entry:
            sys.exit(1)
        # For group entries the type IS the child key
        print(key)
    else:
        print(entry.get('type', name))


def subnet_tunnel_mode(file, name, type_key=''):
    """Return tunnel_mode for a subnet entry ('split' when unknown)."""
    settings = _subnet_settings(_subnet_read(file), name, type_key)
    print(settings.get('tunnel_mode', 'split'))


def subnet_for_ip(file, ip):
    """
    Reverse lookup: given a peer IP, find which subnet name (and type) it belongs to.

    Output: name|type   (e.g. "guests|phone" or "desktop|desktop")
    Returns nothing (exit 0) if not found — caller falls back to hardcoded map.
    """
    import ipaddress
    try:
        peer_addr = ipaddress.ip_address(ip)
    except ValueError:
        return

    for name, entry in _subnet_read(file).items():
        if _subnet_is_group(entry):
            candidates = list(entry.items())
        else:
            candidates = [(None, entry)]
        for type_key, child in candidates:
            try:
                network = ipaddress.ip_network(child['subnet'], strict=False)
                if peer_addr not in network:
                    continue
                label = child.get('type', name) if type_key is None else type_key
                print(f"{name}|{label}")
                return
            except Exception:
                continue  # malformed entry: keep searching


def subnet_list(file):
    """
    List all subnets for display.

    Output per line: name|subnet|type|tunnel_mode|desc|is_group|parent
    For group entries, outputs one line per child:
      name.type|subnet|type|tunnel_mode|desc|true|name
    (the 'none' child is shown under the bare group name).
    """
    for name, entry in _subnet_read(file).items():
        if _subnet_is_group(entry):
            for type_key, child in entry.items():
                display_name = f"{name}.{type_key}" if type_key != 'none' else name
                print(f"{display_name}|{child.get('subnet','')}|"
                      f"{type_key}|{child.get('tunnel_mode','split')}|"
                      f"{child.get('desc','')}|true|{name}")
        else:
            print(f"{name}|{entry.get('subnet','')}|"
                  f"{entry.get('type',name)}|{entry.get('tunnel_mode','split')}|"
                  f"{entry.get('desc','')}|false|{name}")


def subnet_show(file, name):
    """Show a single subnet entry (scalar or group) in detail"""
    data = _subnet_read(file)
    if name not in data:
        print(f"Error: Subnet '{name}' not found", file=sys.stderr)
        sys.exit(1)
    entry = data[name]
    print(f"name|{name}")
    if _subnet_is_group(entry):
        print("is_group|true")
        for type_key, child in entry.items():
            print(f"child|{type_key}|{child.get('subnet','')}|"
                  f"{child.get('tunnel_mode','split')}|{child.get('desc','')}")
    else:
        print("is_group|false")
        print(f"subnet|{entry.get('subnet','')}")
        print(f"type|{entry.get('type',name)}")
        print(f"tunnel_mode|{entry.get('tunnel_mode','split')}")
        print(f"desc|{entry.get('desc','')}")


def subnet_add(file, name, subnet, type_key, tunnel_mode, desc, group_parent=''):
    """
    Add a new subnet entry.

    If group_parent is set, adds as a child under that group key
    (creating the group if needed; *name* is not used in that case).
    Otherwise adds as a scalar entry.
    """
    data = _subnet_read(file)
    entry = {
        'subnet': subnet,
        'tunnel_mode': tunnel_mode or 'split',
        'desc': desc or ''
    }
    if group_parent:
        if group_parent not in data:
            data[group_parent] = {}
        elif not _subnet_is_group(data[group_parent]):
            print(f"Error: '{group_parent}' exists but is not a group", file=sys.stderr)
            sys.exit(1)
        data[group_parent][type_key or 'none'] = entry
    else:
        # Scalar entry — type stored explicitly
        entry['type'] = type_key or name
        data[name] = entry
    _subnet_write(file, data)


def subnet_remove(file, name, peers_using):
    """
    Remove a subnet entry.

    peers_using is a comma-separated list of peer names currently using
    this subnet (passed in from bash after meta scan). If non-empty, refuses with error.
    A dotted name such as "guests.phone" removes one child of a group; the
    group itself is dropped once its last child is gone.
    """
    peers = _split_names(peers_using)
    if peers:
        print(f"Error: Subnet '{name}' is in use by: {', '.join(peers)}", file=sys.stderr)
        sys.exit(1)

    data = _subnet_read(file)
    if '.' in name:
        parent, child_key = name.split('.', 1)
        if parent not in data or not _subnet_is_group(data[parent]):
            print(f"Error: Group '{parent}' not found", file=sys.stderr)
            sys.exit(1)
        if child_key not in data[parent]:
            print(f"Error: '{child_key}' not found in group '{parent}'", file=sys.stderr)
            sys.exit(1)
        del data[parent][child_key]
        if not data[parent]:
            del data[parent]  # remove empty group
    else:
        if name not in data:
            print(f"Error: Subnet '{name}' not found", file=sys.stderr)
            sys.exit(1)
        del data[name]
    _subnet_write(file, data)


def subnet_rename(file, old_name, new_name, peers_using):
    """
    Rename a subnet entry. Hard refusal if any peers reference it.

    peers_using: comma-separated peer names from bash meta scan.
    """
    peers = _split_names(peers_using)
    if peers:
        print(f"Error: Cannot rename subnet '{old_name}' — in use by: {', '.join(peers)}",
              file=sys.stderr)
        sys.exit(1)

    data = _subnet_read(file)
    if old_name not in data:
        print(f"Error: Subnet '{old_name}' not found", file=sys.stderr)
        sys.exit(1)
    if new_name in data:
        print(f"Error: Subnet '{new_name}' already exists", file=sys.stderr)
        sys.exit(1)
    data[new_name] = data.pop(old_name)
    _subnet_write(file, data)


def _conf_address(conf):
    """Return the IP of the first ``Address`` line in *conf* ('' if none)."""
    with open(conf) as f:
        for line in f:
            if line.startswith('Address'):
                return line.split('=')[1].strip().split('/')[0]
    return ''


def subnet_peers(meta_dir, clients_dir, subnet_name, subnets_file):
    """
    Find all peers using a subnet.

    Two-pass check:
      1. Meta field: peer has "subnet": subnet_name in their .meta file
      2. IP fallback: peer's IP falls within the subnet's CIDR(s)
         (catches peers added before meta stored subnet explicitly)

    Output: one peer name per line.
    """
    import glob
    import ipaddress

    # Resolve all CIDRs covered by this subnet name
    data = _subnet_read(subnets_file)
    cidrs = []
    if subnet_name in data:
        entry = data[subnet_name]
        members = entry.values() if _subnet_is_group(entry) else [entry]
        for member in members:
            try:
                cidrs.append(ipaddress.ip_network(member['subnet'], strict=False))
            except Exception:
                pass  # entries without a valid CIDR cannot match by IP

    for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
        peer_name = _peer_conf_name(conf)

        # Pass 1: check meta field
        try:
            with open(os.path.join(meta_dir, f"{peer_name}.meta")) as f:
                meta = json.load(f)
            if meta.get('subnet', '') == subnet_name:
                print(peer_name)
                continue
        except Exception:
            pass  # no usable meta file: fall through to the IP check

        # Pass 2: IP reverse lookup against subnet CIDRs
        if not cidrs:
            continue
        try:
            peer_ip = _conf_address(conf)
            if peer_ip and any(ipaddress.ip_address(peer_ip) in c for c in cidrs):
                print(peer_name)
        except Exception:
            continue


def subnet_exists(file, name):
    """Check if a subnet name exists (scalar, group or "group.child"). Exits 0/1."""
    data = _subnet_read(file)
    if '.' in name:
        parent, child_key = name.split('.', 1)
        exists = (parent in data and _subnet_is_group(data[parent])
                  and child_key in data[parent])
    else:
        exists = name in data
    _exit_code(exists)


def subnet_policy(subnets_file, subnet_name, type_key=''):
    """
    Get the policy name for a subnet entry.
    Falls back to 'default' if no policy set.
    """
    settings = _subnet_settings(_subnet_read(subnets_file), subnet_name, type_key)
    print(settings.get('policy', 'default'))


def subnet_default_rule(file, name, type_key=''):
    """
    Return the default_rule for a subnet entry, or empty string if none set.

    For scalar: subnet_default_rule(file, "desktop") -> ""
    For group:  subnet_default_rule(file, "guests", "phone") -> "guest"
    """
    settings = _subnet_settings(_subnet_read(file), name, type_key)
    # "or ''" keeps a JSON null from being printed as the string "None".
    print(settings.get('default_rule') or '')


def subnet_list_names(file):
    """
    List all top-level subnet names, one per line.
    Used for dynamic flag registration in commands.
    Output: one name per line (e.g. desktop, laptop, guests, servers, iot)
    """
    for name in _subnet_read(file):
        print(name)


# ======================================================
# Identity
# ======================================================

def _identity_read(file):
    """Read an identity file, return dict or None"""
    return _read_json_object(file, None)


def _identity_write(file, data):
    """Write an identity file"""
    _write_json_file(file, data)


def _identity_init(name):
    """Return empty identity structure"""
    return {
        'name': name,
        'peers': [],
        'devices': {}
    }


def _identity_effective_rules(data):
    """Return the 'rules' list, or the legacy scalar 'rule' when the list is empty."""
    rules = data.get('rules', [])
    if not rules and data.get('rule'):
        rules = [data['rule']]
    return rules


def _identity_migrate_rules(data):
    """Fold the legacy scalar 'rule' into data['rules'] in place; return the list."""
    rules = data.get('rules', [])
    legacy = data.pop('rule', None)
    if legacy and legacy not in rules:
        rules.append(legacy)
    data['rules'] = rules
    return rules


def identity_rules(file):
    """
    Return all rules assigned to an identity, one per line.
    Reads from 'rules' array (1:N). Falls back to 'rule' scalar for migration.
    """
    data = _identity_read(file)
    if not data:
        return
    for r in _identity_effective_rules(data):
        if r:
            print(r)


def identity_add_rule(file, identity_name, rule_name):
    """
    Add a rule to an identity's rules array.
    Warns if already present (prints warning to stderr, exits 2).
    Creates identity file if it doesn't exist.
    """
    data = _identity_read(file) or _identity_init(identity_name)
    rules = _identity_migrate_rules(data)
    if rule_name in rules:
        print(f"Warning: Rule '{rule_name}' is already assigned to identity '{identity_name}'",
              file=sys.stderr)
        sys.exit(2)
    rules.append(rule_name)
    _identity_write(file, data)


def identity_remove_rule(file, rule_name):
    """
    Remove a specific rule from an identity's rules array.
    A rule held only in the legacy scalar 'rule' field is migrated first, so
    it can be removed like any other. Exits 1 if rule not found.
    """
    data = _identity_read(file)
    if not data:
        print(f"Error: Identity not found", file=sys.stderr)
        sys.exit(1)
    rules = _identity_migrate_rules(data)
    if rule_name not in rules:
        print(f"Error: Rule '{rule_name}' not assigned to this identity", file=sys.stderr)
        sys.exit(1)
    rules.remove(rule_name)
    _identity_write(file, data)


def identity_clear_rules(file):
    """Remove all rules from an identity."""
    data = _identity_read(file)
    if not data:
        return
    data['rules'] = []
    data.pop('rule', None)  # remove legacy scalar too
    _identity_write(file, data)


def identity_has_rule(file, rule_name):
    """Exit 0 if identity has this rule, 1 otherwise."""
    data = _identity_read(file)
    if not data:
        sys.exit(1)
    _exit_code(rule_name in _identity_effective_rules(data))


def identity_list(identities_dir):
    """
    List all identities with peer count, rules and policy.
    Output per line: name|peer_count|types|rules|policy
    Files that cannot be read are skipped.
    """
    import glob
    for id_file in sorted(glob.glob(f"{identities_dir}/*.identity")):
        try:
            with open(id_file) as f:
                data = json.load(f)
            name = data.get('name', '')
            peers = data.get('peers', [])
            rules = _identity_effective_rules(data)
            policy = data.get('policy', 'default')
            types = sorted({
                dev.get('type', '')
                for dev in data.get('devices', {}).values()
                if dev.get('type')
            })
            print(f"{name}|{len(peers)}|{','.join(types)}|{','.join(rules)}|{policy}")
        except Exception:
            continue


def identity_show(file):
    """Show identity details"""
    data = _identity_read(file)
    if not data:
        print("Error: Identity not found", file=sys.stderr)
        sys.exit(1)
    print(f"name|{data.get('name','')}")
    print(f"peer_count|{len(data.get('peers',[]))}")
    for peer_name, dev in data.get('devices', {}).items():
        print(f"device|{peer_name}|{dev.get('type','')}|{dev.get('index',1)}")


def identity_add_peer(file, identity_name, peer_name, peer_type, index):
    """Add a peer to an identity file, creating it if needed"""
    data = _identity_read(file) or _identity_init(identity_name)
    # setdefault tolerates identity files that lack these keys.
    peers = data.setdefault('peers', [])
    if peer_name not in peers:
        peers.append(peer_name)
    data.setdefault('devices', {})[peer_name] = {
        'type': peer_type,
        'index': int(index)
    }
    _identity_write(file, data)


def identity_remove_peer(file, peer_name):
    """Remove a peer from an identity file"""
    data = _identity_read(file)
    if not data:
        return
    data['peers'] = [p for p in data.get('peers', []) if p != peer_name]
    data.setdefault('devices', {}).pop(peer_name, None)
    _identity_write(file, data)


def identity_remove(file):
    """Delete an identity file — existence check done in bash"""
    try:
        os.remove(file)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


def identity_next_index(file, peer_type):
    """
    Return the next available index for a given type within an identity.

      phone-nuno exists (index 1) -> returns 2
      No phones exist -> returns 1
    """
    data = _identity_read(file) or {}
    used = {
        dev.get('index', 1)
        for dev in data.get('devices', {}).values()
        if dev.get('type') == peer_type
    }
    # Find lowest unused index starting from 1
    candidate = 1
    while candidate in used:
        candidate += 1
    print(candidate)


def identity_peers(file, filter_type=''):
    """
    List peers belonging to an identity, optionally filtered by type.
    Output: one peer name per line.
    """
    data = _identity_read(file)
    if not data:
        return
    devices = data.get('devices', {})
    for peer_name in data.get('peers', []):
        if filter_type and devices.get(peer_name, {}).get('type') != filter_type:
            continue
        print(peer_name)


def identity_migrate(identities_dir, clients_dir, meta_dir, dry_run):
    """
    Scan all peer configs and auto-create identity files from name convention.

    dry_run: 'true' -> print what would be done, no writes.
    meta_dir is accepted but not used.

    Output per action: create|identity|peer|type|index
    Peers whose name does not follow the convention print: skip|peer
    """
    import glob
    is_dry = dry_run == 'true'

    grouped = {}  # identity_name -> [(peer_name, type, index)]
    for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
        peer_name = _peer_conf_name(conf)
        parsed = _parse_peer_name(peer_name)
        if not parsed:
            print(f"skip|{peer_name}")
            continue
        peer_type, identity_name, index = parsed
        grouped.setdefault(identity_name, []).append((peer_name, peer_type, index))

    for identity_name, peers in sorted(grouped.items()):
        id_file = os.path.join(identities_dir, f"{identity_name}.identity")
        for peer_name, peer_type, index in peers:
            print(f"create|{identity_name}|{peer_name}|{peer_type}|{index}")
            if not is_dry:
                identity_add_peer(id_file, identity_name, peer_name, peer_type, index)


def identity_infer(peer_name):
    """
    Parse a peer name and print identity|type|index, or nothing if no match.
    Used by add.command.sh to auto-attach on vanilla wgctl add.
    """
    parsed = _parse_peer_name(peer_name)
    if parsed:
        peer_type, identity_name, index = parsed
        print(f"{identity_name}|{peer_type}|{index}")


def identity_exists(file):
    """Exit 0 if identity file exists and is valid, else exit 1"""
    _exit_code(_identity_read(file) is not None)


# ======================================================
# Policy
# ======================================================

def _policy_read(file):
    """Read policies.json, return dict or empty dict."""
    return _read_json_object(file, {})


def _policy_write(file, data):
    """Write policies.json."""
    _write_json_file(file, data)


def _policy_get_entry(data, name):
    """Return policy *name* with missing fields filled in.

    Layers, lowest priority first: the built-in _POLICY_DEFAULTS, the file's
    'default' policy, then the named entry. The original looked up a
    'default' key inside _POLICY_DEFAULTS, which has none, so the built-in
    values were never applied.
    """
    resolved = dict(_POLICY_DEFAULTS)
    resolved.update(data.get('default', {}))
    resolved.update(data.get(name, {}))
    return resolved


def _policy_bool(value):
    """Render a truthy/falsy value as 'true'/'false'."""
    return 'true' if value else 'false'


def policy_get(file, name, field=''):
    """
    Get a policy entry or a specific field from it.

    policy_get(file, "guest")                -> prints all fields as key|value lines
    policy_get(file, "guest", "tunnel_mode") -> prints "split"
    policy_get(file, "guest", "strict_rule") -> prints "true" or "false"

    None values print as an empty string.
    """
    entry = _policy_get_entry(_policy_read(file), name)
    if field:
        val = entry.get(field)
        if val is None:
            print('')
        elif isinstance(val, bool):
            print(_policy_bool(val))
        else:
            print(val)
        return
    for k, v in entry.items():
        if isinstance(v, bool):
            print(f"{k}|{_policy_bool(v)}")
        elif v is None:
            print(f"{k}|")
        else:
            print(f"{k}|{v}")


def policy_list(file):
    """
    List all policies.
    Output per line: name|tunnel_mode|default_rule|strict_rule|auto_apply|desc
    """
    data = _policy_read(file)
    for name in data:
        entry = _policy_get_entry(data, name)
        tunnel_mode = entry.get('tunnel_mode', 'split')
        default_rule = entry.get('default_rule') or ''
        strict_rule = _policy_bool(entry.get('strict_rule', False))
        auto_apply = _policy_bool(entry.get('auto_apply', True))
        desc = entry.get('desc', '')
        print(f"{name}|{tunnel_mode}|{default_rule}|{strict_rule}|{auto_apply}|{desc}")


def policy_exists(file, name):
    """Exit 0 if policy exists, 1 otherwise."""
    _exit_code(name in _policy_read(file))


def policy_add(file, name, tunnel_mode, default_rule, strict_rule, auto_apply, desc):
    """Add or update a policy entry.

    strict_rule is enabled only by 'true'; auto_apply is enabled unless it
    is 'false'; an empty default_rule is stored as null.
    """
    data = _policy_read(file)
    data[name] = {
        'tunnel_mode': tunnel_mode or 'split',
        'default_rule': default_rule if default_rule else None,
        'strict_rule': strict_rule == 'true',
        'auto_apply': auto_apply != 'false',
        'desc': desc or ''
    }
    _policy_write(file, data)
Refuses to remove hardcoded defaults.""" if name in _POLICY_DEFAULTS: print(f"Error: Cannot remove built-in policy '{name}'", file=sys.stderr) sys.exit(1) data = _policy_read(file) if name not in data: print(f"Error: Policy '{name}' not found", file=sys.stderr) sys.exit(1) del data[name] _policy_write(file, data) def policy_set_field(file, name, field, value): """Set a single field on an existing policy.""" data = _policy_read(file) if name not in data: print(f"Error: Policy '{name}' not found", file=sys.stderr) sys.exit(1) entry = data[name] if field in ('strict_rule', 'auto_apply'): entry[field] = value == 'true' elif field == 'default_rule': entry[field] = value if value else None else: entry[field] = value data[name] = entry _policy_write(file, data) # ====================================================== # Hosts # ====================================================== def _hosts_read(file): if not os.path.exists(file): return {"hosts": {}, "subnets": {}, "ports": {}} try: with open(file) as f: data = json.load(f) # Ensure all sections exist data.setdefault("hosts", {}) data.setdefault("subnets", {}) data.setdefault("ports", {}) return data except Exception: return {"hosts": {}, "subnets": {}, "ports": {}} def _hosts_write(file, data): with open(file, 'w') as f: json.dump(data, f, indent=2) def hosts_list(file): """ List all host entries. 
Output per line: type|key|name|desc|tags """ data = _hosts_read(file) for ip, entry in sorted(data["hosts"].items()): if isinstance(entry, dict): name = entry.get("name", "") desc = entry.get("desc", "") tags = ",".join(entry.get("tags", [])) else: name = str(entry) desc = "" tags = "" print(f"host|{ip}|{name}|{desc}|{tags}") for subnet, entry in sorted(data["subnets"].items()): if isinstance(entry, dict): name = entry.get("name", "") desc = entry.get("desc", "") tags = ",".join(entry.get("tags", [])) else: name = str(entry) desc = "" tags = "" print(f"subnet|{subnet}|{name}|{desc}|{tags}") for port, entry in sorted(data["ports"].items()): if isinstance(entry, dict): name = entry.get("name", "") desc = entry.get("desc", "") tags = ",".join(entry.get("tags", [])) else: name = str(entry) desc = "" tags = "" print(f"port|{port}|{name}|{desc}|{tags}") def hosts_show(file, key, entry_type): """Show a single host entry. type: host|subnet|port""" data = _hosts_read(file) section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"} section = section_map.get(entry_type, "hosts") entry = data[section].get(key) if not entry: print(f"Error: not found: {key}", file=sys.stderr) sys.exit(1) if isinstance(entry, dict): print(f"name|{entry.get('name', '')}") print(f"desc|{entry.get('desc', '')}") print(f"tags|{','.join(entry.get('tags', []))}") else: print(f"name|{entry}") print(f"desc|") print(f"tags|") def hosts_add(file, entry_type, key, name, desc, tags): """ Add a host entry. 
entry_type: host|subnet|port key: IP, subnet CIDR, or port number """ data = _hosts_read(file) section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"} section = section_map.get(entry_type, "hosts") tag_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else [] data[section][key] = { "name": name, "desc": desc, "tags": tag_list } _hosts_write(file, data) def hosts_remove(file, entry_type, key): """Remove a host entry.""" data = _hosts_read(file) section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"} section = section_map.get(entry_type, "hosts") if key not in data[section]: print(f"Error: not found: {key}", file=sys.stderr) sys.exit(1) del data[section][key] _hosts_write(file, data) def hosts_exists(file, entry_type, key): """Check if a host entry exists.""" data = _hosts_read(file) section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"} section = section_map.get(entry_type, "hosts") print("true" if key in data[section] else "false") def hosts_lookup(file, ip): """ Resolve an IP to a display name. Checks hosts section only (exact match). Returns name or empty string. 
""" data = _hosts_read(file) entry = data["hosts"].get(ip) if not entry: print("") return if isinstance(entry, dict): print(entry.get("name", ip)) else: print(str(entry)) # ====================================================== def _net_read(file): try: if not os.path.exists(file): return {} with open(file) as f: content = f.read().strip() if not content: return {} return json.loads(content) except Exception: return {} def _net_write(file, data): os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w') as f: json.dump(data, f, indent=2) def net_list(file): data = _net_read(file) for name, svc in sorted(data.items()): print(f"{name}|{svc.get('ip','')}|{svc.get('desc','')}|" f"{','.join(svc.get('tags',[]))}|{len(svc.get('ports',{}))}") def net_show(file, name): data = _net_read(file) if name not in data: print(f"Error: Service not found: {name}", file=sys.stderr); sys.exit(1) svc = data[name] print(f"name|{name}") print(f"ip|{svc.get('ip','')}") print(f"desc|{svc.get('desc','')}") print(f"tags|{','.join(svc.get('tags',[]))}") for port_name, port_def in svc.get('ports', {}).items(): print(f"port|{port_name}|{port_def.get('port','')}|" f"{port_def.get('proto','tcp')}|{port_def.get('desc','')}") def net_exists(file, name): data = _net_read(file) if ':' in name: svc_name, port_name = name.split(':', 1) if port_name == 'ports': print('true' if svc_name in data else 'false') else: svc = data.get(svc_name, {}) print('true' if port_name in svc.get('ports', {}) else 'false') else: print('true' if name in data else 'false') def net_add_service(file, name, ip, desc='', tags=''): data = _net_read(file) if name not in data: data[name] = {'ip': ip, 'ports': {}} else: data[name]['ip'] = ip if desc: data[name]['desc'] = desc if tags: data[name]['tags'] = [t.strip() for t in tags.split(',') if t.strip()] _net_write(file, data) def net_add_port(file, service, port_name, port, proto='tcp', desc=''): data = _net_read(file) if service not in data: print(f"Error: Service not 
found: {service}", file=sys.stderr); sys.exit(1) if 'ports' not in data[service]: data[service]['ports'] = {} entry = {'port': int(port), 'proto': proto} if desc: entry['desc'] = desc data[service]['ports'][port_name] = entry _net_write(file, data) def net_remove(file, name): data = _net_read(file) if ':' in name: svc_name, port_name = name.split(':', 1) if svc_name not in data: print(f"Error: Service not found: {svc_name}", file=sys.stderr); sys.exit(1) if port_name == 'ports': data[svc_name]['ports'] = {} else: if port_name not in data[svc_name].get('ports', {}): print(f"Error: Port not found: {port_name}", file=sys.stderr); sys.exit(1) del data[svc_name]['ports'][port_name] else: if name not in data: print(f"Error: Service not found: {name}", file=sys.stderr); sys.exit(1) del data[name] _net_write(file, data) def net_resolve(file, name): data = _net_read(file) if ':' in name: svc_name, port_name = name.split(':', 1) if svc_name not in data: print(f"Error: Service not found: {svc_name}", file=sys.stderr); sys.exit(1) svc = data[svc_name] ip = svc.get('ip', '') if port_name == 'ports': for pname, pdef in svc.get('ports', {}).items(): print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}") else: if port_name not in svc.get('ports', {}): print(f"Error: Port not found: {port_name}", file=sys.stderr); sys.exit(1) pdef = svc['ports'][port_name] print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}") else: if name not in data: print(f"Error: Service not found: {name}", file=sys.stderr); sys.exit(1) print(data[name].get('ip', '')) def net_reverse_lookup(file, ip, port='', proto=''): from lib.util import load_net_data, reverse_lookup as _rl net_data = _rl(load_net_data(file), ip, port, proto) if net_data: print(net_data) def group_list_data(groups_dir, blocks_dir, clients_dir): import glob blocked_peers = set() for f in glob.glob(f"{blocks_dir}/*.block"): blocked_peers.add(os.path.basename(f).replace('.block', '')) for group_file in 
sorted(glob.glob(f"{groups_dir}/*.group")): try: with open(group_file) as f: g = json.load(f) name = g.get('name', '') desc = g.get('desc', '') peers = [p for p in g.get('peers', []) if p] valid = [p for p in peers if os.path.exists(os.path.join(clients_dir, f"{p}.conf"))] total = len(valid) blocked = sum(1 for p in peers if p in blocked_peers) print(f"{name}|{desc}|{total}|{blocked}") except Exception: pass def create_group(file, name, desc): try: with open(file, 'w') as f: json.dump({'name': name, 'desc': desc, 'peers': []}, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr); sys.exit(1) def group_has_peer(file, peer_name): try: with open(file) as f: data = json.load(f) print('true' if peer_name in data.get('peers', []) else 'false') except Exception: print('false') # ── All block/subnet/identity/policy/hosts functions copied verbatim ──────── # (unchanged from original — omitting here for brevity but must be included # in the actual deployed file) def json_get_nested(file, *keys): try: with open(file) as f: data = json.load(f) val = data for key in keys: if not isinstance(val, dict): print(''); return val = val.get(key) if val is None: print(''); return if isinstance(val, bool): print('true' if val else 'false') elif val is None: print('') else: print(val) except Exception: print('') def json_set_nested(file, *args): if len(args) < 2: return keys, value = args[:-1], args[-1] try: data = {} if os.path.exists(file): with open(file) as f: data = json.load(f) target = data for key in keys[:-1]: if key not in target or not isinstance(target[key], dict): target[key] = {} target = target[key] final_key = keys[-1] if value == 'true': target[final_key] = True elif value == 'false': target[final_key] = False else: target[final_key] = value with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr); sys.exit(1) # ── Commands dispatch ──────────────────────────────────────────────────────── 
def _opt(args, index, default=''):
    """Return args[index] when that optional positional was given, else *default*."""
    return args[index] if len(args) > index else default


def _lib(module, name):
    """Import *module* on first use and return its attribute *name*."""
    return getattr(__import__(module, fromlist=[name]), name)


# Command name -> handler taking the list of CLI arguments after the command.
# Handlers are lambdas so the target functions (and the lib.* modules) are
# only looked up when the command is actually invoked.
commands = {
    # JSON CRUD
    'get':               lambda args: get(args[0], args[1]),
    'set':               lambda args: set_key(args[0], args[1], args[2]),
    'delete':            lambda args: delete_key(args[0], args[1]),
    'append':            lambda args: append(args[0], args[1], args[2]),
    'remove':            lambda args: remove_value(args[0], args[1], args[2]),
    'cat':               lambda args: cat(args[0]),
    'has_key':           lambda args: has_key(args[0], args[1]),
    'filter_values':     lambda args: filter_values(args[0], args[1], args[2]),
    'count':             lambda args: count(args[0], args[1]),
    'get_raw':           lambda args: get_raw(args[0], args[1]),
    'fmt_datetime':      lambda args: fmt_datetime(args[0], args[1]),
    'json_get_nested':   lambda args: json_get_nested(args[0], *args[1:]),
    'json_set_nested':   lambda args: json_set_nested(args[0], *args[1:]),
    'cleanup_config':    lambda args: cleanup_config(args[0]),
    'remove_peer_block': lambda args: remove_peer_block(args[0], args[1]),
    'audit_fw_counts':   lambda args: audit_fw_counts(args[0]),

    # Events (from lib.events)
    'fw_events': lambda args: _lib('lib.events', 'fw_events')(
        args[0], args[1], args[2], args[3], args[4],
        _opt(args, 5, '50'),
        _opt(args, 6, '1'),
        _opt(args, 7),
        _opt(args, 8),
        _opt(args, 9),
        _opt(args, 10, 'desc')),
    'wg_events': lambda args: _lib('lib.events', 'wg_events')(
        args[0], args[1], args[2],
        _opt(args, 3, '50'),
        _opt(args, 4, '1'),
        _opt(args, 5),
        _opt(args, 6),
        _opt(args, 7),
        _opt(args, 8, 'desc')),
    'parse_event':     lambda args: _lib('lib.events', 'parse_event')(args[0]),
    'parse_fw_event':  lambda args: _lib('lib.events', 'parse_fw_event')(args[0]),
    'format_fw_event': lambda args: _lib('lib.events', 'format_fw_event')(args[0], args[1]),
    'format_wg_event': lambda args: _lib('lib.events', 'format_wg_event')(args[0]),
    'remove_events':   lambda args: _lib('lib.events', 'remove_events')(args[0], args[1]),
    'remove_events_filtered': lambda args: _lib('lib.events', 'remove_events_filtered')(
        args[0], args[1], args[2], args[3],
        args[4] == 'true', args[5] == 'true',
        _opt(args, 6)),
    'follow_logs': lambda args: _lib('lib.events', 'follow_logs')(
        args[0], args[1], args[2], args[3], args[4],
        _opt(args, 5)),
    'last_event': lambda args: _lib('lib.events', 'last_event')(args[0], args[1], args[2], args[3]),
    'events_for': lambda args: _lib('lib.events', 'events_for')(args[0], args[1], args[2]),
    'iso_to_ts':  lambda args: _lib('lib.events', 'iso_to_ts')(args[0]),

    # Peers (from lib.peers)
    'peer_data':           lambda args: _lib('lib.peers', 'peer_data')(args[0], args[1], args[2]),
    'peer_transfer':       lambda args: _lib('lib.peers', 'peer_transfer')(args[0]),
    'peer_transfer_delta': lambda args: _lib('lib.peers', 'peer_transfer_delta')(args[0], args[1]),
    'peer_group_map':      lambda args: _lib('lib.peers', 'peer_group_map')(args[0]),
    'peer_groups':         lambda args: _lib('lib.peers', 'peer_groups')(args[0], args[1]),

    # Activity (from lib.activity)
    'activity_aggregate': lambda args: _lib('lib.activity', 'activity_aggregate')(
        args[0], args[1], args[2], args[3], args[4], args[5],
        _opt(args, 6, '24'),
        _opt(args, 7),
        _opt(args, 8)),

    # Rules
    'rule_resolve':       lambda args: rule_resolve(args[0], args[1]),
    'rule_resolve_field': lambda args: rule_resolve_field(args[0], args[1], args[2]),
    'rule_list_data':     lambda args: rule_list_data(args[0], args[1]),
    'rule_inspect':       lambda args: rule_inspect(args[0], args[1]),
    'count_resolved':     lambda args: count_resolved(args[0], args[1], args[2]),
    'create_rule':        lambda args: create_rule(*args),
    'find_rule_file':     lambda args: print(find_rule_file(args[0], args[1])),

    # Net
    'net_list':   lambda args: net_list(args[0]),
    'net_show':   lambda args: net_show(args[0], args[1]),
    'net_exists': lambda args: net_exists(args[0], args[1]),
    'net_add_service': lambda args: net_add_service(
        args[0], args[1], args[2],
        _opt(args, 3),
        _opt(args, 4)),
    'net_add_port': lambda args: net_add_port(
        args[0], args[1], args[2], args[3],
        _opt(args, 4, 'tcp'),
        _opt(args, 5)),
    'net_remove':  lambda args: net_remove(args[0], args[1]),
    'net_resolve': lambda args: net_resolve(args[0], args[1]),
    'net_reverse_lookup': lambda args: net_reverse_lookup(
        args[0], args[1],
        _opt(args, 2),
        _opt(args, 3)),

    # Groups
    'group_list_data': lambda args: group_list_data(args[0], args[1], args[2]),
    'create_group':    lambda args: create_group(args[0], args[1], _opt(args, 2)),
    'group_has_peer':  lambda args: group_has_peer(args[0], args[1]),

    # Block
    'block_get':          lambda args: block_get(args[0]),
    'block_is_blocked':   lambda args: block_is_blocked(args[0]),
    'block_set_direct':   lambda args: block_set_direct(args[0], args[1], args[2]),
    'block_add_group':    lambda args: block_add_group(args[0], args[1], args[2]),
    'block_remove_group': lambda args: block_remove_group(args[0], args[1], args[2]),
    'block_add_rule': lambda args: block_add_rule(
        args[0], args[1], args[2],
        _opt(args, 3),
        _opt(args, 4),
        _opt(args, 5),
        _opt(args, 6),
    ),
    'block_remove_rule': lambda args: block_remove_rule(
        args[0], args[1],
        _opt(args, 2),
        _opt(args, 3),
        _opt(args, 4),
    ),
    'block_get_rules':  lambda args: block_get_rules(args[0]),
    'block_get_groups': lambda args: block_get_groups(args[0]),
    'block_get_direct': lambda args: block_get_direct(args[0]),
    'block_is_empty':   lambda args: block_is_empty(args[0]),

    # Subnet
    'subnet_lookup':      lambda args: subnet_lookup(args[0], args[1], _opt(args, 2)),
    'subnet_type':        lambda args: subnet_type(args[0], args[1], _opt(args, 2)),
    'subnet_tunnel_mode': lambda args: subnet_tunnel_mode(args[0], args[1], _opt(args, 2)),
    'subnet_for_ip':      lambda args: subnet_for_ip(args[0], args[1]),
    'subnet_list':        lambda args: subnet_list(args[0]),
    'subnet_show':        lambda args: subnet_show(args[0], args[1]),
    'subnet_add': lambda args: subnet_add(
        args[0], args[1], args[2], args[3],
        _opt(args, 4, 'split'),
        _opt(args, 5),
        _opt(args, 6),
    ),
    'subnet_remove':       lambda args: subnet_remove(args[0], args[1], _opt(args, 2)),
    'subnet_rename':       lambda args: subnet_rename(args[0], args[1], args[2], _opt(args, 3)),
    'subnet_peers':        lambda args: subnet_peers(args[0], args[1], args[2], args[3]),
    'subnet_exists':       lambda args: subnet_exists(args[0], args[1]),
    'subnet_default_rule': lambda args: subnet_default_rule(args[0], args[1], _opt(args, 2)),
    'subnet_list_names':   lambda args: subnet_list_names(args[0]),

    # Identity
    'identity_list':        lambda args: identity_list(args[0]),
    'identity_show':        lambda args: identity_show(args[0]),
    'identity_add_peer':    lambda args: identity_add_peer(args[0], args[1], args[2], args[3], args[4]),
    'identity_remove_peer': lambda args: identity_remove_peer(args[0], args[1]),
    'identity_remove':      lambda args: identity_remove(args[0]),
    'identity_next_index':  lambda args: identity_next_index(args[0], args[1]),
    'identity_peers':       lambda args: identity_peers(args[0], _opt(args, 1)),
    'identity_migrate':     lambda args: identity_migrate(args[0], args[1], args[2], args[3]),
    'identity_infer':       lambda args: identity_infer(args[0]),
    'identity_exists':      lambda args: identity_exists(args[0]),
    'identity_rules':       lambda args: identity_rules(args[0]),
    'identity_add_rule':    lambda args: identity_add_rule(args[0], args[1], args[2]),
    'identity_remove_rule': lambda args: identity_remove_rule(args[0], args[1]),
    'identity_clear_rules': lambda args: identity_clear_rules(args[0]),
    'identity_has_rule':    lambda args: identity_has_rule(args[0], args[1]),

    # Policy
    'policy_get':    lambda args: policy_get(args[0], args[1], _opt(args, 2)),
    'policy_list':   lambda args: policy_list(args[0]),
    'policy_exists': lambda args: policy_exists(args[0], args[1]),
    'policy_add': lambda args: policy_add(
        args[0], args[1], args[2], args[3], args[4], args[5],
        _opt(args, 6)),
    'policy_remove':    lambda args: policy_remove(args[0], args[1]),
    'policy_set_field': lambda args: policy_set_field(args[0], args[1], args[2], args[3]),
    'subnet_policy':    lambda args: subnet_policy(args[0], args[1], _opt(args, 2)),

    # Hosts
    'hosts_list': lambda args: hosts_list(args[0]),
    'hosts_show': lambda args: hosts_show(args[0], args[1], args[2]),
    'hosts_add': lambda args: hosts_add(
        args[0], args[1], args[2], args[3],
        _opt(args, 4),
        _opt(args, 5)),
    'hosts_remove': lambda args: hosts_remove(args[0], args[1], args[2]),
    'hosts_exists': lambda args: hosts_exists(args[0], args[1], args[2]),
    'hosts_lookup': lambda args: hosts_lookup(args[0], args[1]),
}


# ── Main ─────────────────────────────────────────────────────────────────────

def main():
    """
    Dispatch sys.argv[1] to its handler in `commands` with the remaining args.

    Exits 1 with a message on stderr when no command is given, the command
    is unknown, or the handler raises. A handler's own sys.exit() passes
    through untouched because SystemExit is not an Exception subclass.
    """
    if len(sys.argv) < 2:
        print("Usage: json_helper.py <command> [args...]", file=sys.stderr)
        sys.exit(1)

    cmd, args = sys.argv[1], sys.argv[2:]
    handler = commands.get(cmd)
    if handler is None:
        print(f"Unknown command: {cmd}", file=sys.stderr)
        sys.exit(1)

    try:
        handler(args)
    except IndexError as e:
        # Handlers index args positionally, so this is most often a call
        # with too few arguments; say so instead of a bare index error.
        print(f"Error in {cmd}: {e} (missing argument?)", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error in {cmd}: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()