#!/usr/bin/env python3 """ wgctl JSON helper — thin dispatcher. Imports from core/lib/ modules and routes commands. """ import sys import os import json # Add lib/ directory to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'lib')) DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M') _POLICY_DEFAULTS = { 'tunnel_mode': 'split', 'default_rule': None, 'strict_rule': False, 'auto_apply': True, } # ── Lazy imports (only load what's needed per command) ────────────────────── def _events(): from lib.events import ( fw_events, wg_events, parse_event, parse_fw_event, format_fw_event, format_wg_event, remove_events, remove_events_filtered, follow_logs, last_event, events_for, iso_to_ts, ) return locals() def _peers(): from lib.peers import ( peer_data, peer_transfer, peer_transfer_delta, peer_group_map, peer_groups, ) return locals() def _activity(): from lib.activity import activity_aggregate return locals() # ── Keep all non-lib functions inline (JSON CRUD, rules, net, etc.) ───────── # These are stable, small, and don't benefit from splitting yet. # Candidates for future lib/rules.py, lib/net.py etc. in a follow-up pass. def get(file, key): try: with open(file) as f: data = json.load(f) val = data.get(key, []) if isinstance(val, bool): print(str(val).lower()) elif isinstance(val, list): if val: print('\n'.join(str(v) for v in val)) else: if val: print(val) except Exception: sys.exit(0) def set_key(file, key, value): try: data = {} if os.path.exists(file): with open(file) as f: data = json.load(f) try: data[key] = json.loads(value) except Exception: data[key] = value with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def delete_key(file, key): try: with open(file) as f: data = json.load(f) data.pop(key, None) with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def append(file, key, value): try: data = {} if os.path.exists(file): with open(file) as f: data = json.load(f) if key not in data: data[key] = [] if value not in data[key]: data[key].append(value) with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def remove_value(file, key, value): try: with open(file) as f: data = json.load(f) if key in data and value in data[key]: data[key].remove(value) with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def cat(file): try: with open(file) as f: data = json.load(f) print(json.dumps(data, indent=2)) except Exception: sys.exit(1) def has_key(file, key): try: with open(file) as f: data = json.load(f) sys.exit(0 if key in data else 1) except Exception: sys.exit(1) def filter_values(file, key, value): try: with open(file) as f: data = json.load(f) data = {k: v for k, v in data.items() if v != value} with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def count(file, key): try: with open(file) as f: data = json.load(f) val = data.get(key, []) print(len(val) if isinstance(val, list) else 0) except Exception: print(0) def get_raw(file, key): try: with open(file) as f: data = json.load(f) val = data.get(key) if val is None: pass elif isinstance(val, bool): print(str(val).lower()) elif isinstance(val, list): for v in val: print(v) else: print(val) except Exception: pass def fmt_datetime(iso_str, fmt): try: from datetime import datetime dt = datetime.fromisoformat(iso_str) print(dt.strftime(fmt)) except Exception: print(iso_str) def cleanup_config(config_file): import re try: with open(config_file) as f: config = f.read() config = re.sub(r'\n{3,}', '\n\n', config) config = config.rstrip('\n') + '\n' with open(config_file, 'w') as f: f.write(config) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def remove_peer_block(config_file, name): import re try: with open(config_file) as f: config = f.read() pattern = r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n' result = re.sub(pattern, '\n', config) with open(config_file, 'w') as f: f.write(result) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def audit_fw_counts(clients_dir): import glob, subprocess, re try: result = subprocess.run( ['iptables', '-L', 'FORWARD', '-n', '-v'], capture_output=True, text=True ) fw_lines = result.stdout.splitlines() except Exception: fw_lines = [] rule_lines = [l for l in fw_lines if l.strip() and not l.startswith('Chain') and not l.startswith(' pkts')] for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): name = os.path.basename(conf).replace('.conf', '') try: with open(conf) as f: ip = '' for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] break if not ip: continue count_val = sum(1 for l in rule_lines if re.search(r'\s' + re.escape(ip) + r'\s', l)) print(f"{name}:{count_val}") except Exception: pass def clean_handshakes(file, threshold_sec): """Remove keepalive handshakes — keep only new session handshakes.""" threshold = int(threshold_sec) if threshold_sec else 300 last_ts = {} kept = [] removed = 0 try: with open(file) as f: for line in f: try: e = json.loads(line.strip()) if e.get('event') == 'handshake': client = e.get('client', '') ts_str = e.get('timestamp', '') from datetime import datetime, timezone dt = datetime.fromisoformat(ts_str) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) ts = dt.timestamp() last = last_ts.get(client, 0) if ts - last >= threshold: kept.append(line) last_ts[client] = ts else: removed += 1 continue except Exception: pass kept.append(line) with open(file, 'w') as f: f.writelines(kept) print(removed) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def batch_resolve(hosts_file, net_file, *ips): """ Resolve multiple IPs at once. Output: ip|resolved_name per line (resolved_name=ip if no match) """ from lib.util import load_hosts_data, load_net_data, hosts_lookup, reverse_lookup hosts_data = load_hosts_data(hosts_file) net_data = load_net_data(net_file) seen = set() for ip in ips: if not ip or ip in seen: continue seen.add(ip) name = hosts_lookup(hosts_data, ip) if not name: name = reverse_lookup(net_data, ip) print(f"{ip}|{name or ''}") # ── Rules (kept inline — candidates for lib/rules.py) ──────────────────── def find_rule_file(rules_dir, rule_name): for path in [ os.path.join(rules_dir, f"{rule_name}.rule"), os.path.join(rules_dir, "base", f"{rule_name}.rule"), ]: if os.path.exists(path): return path return "" def _rule_resolve_internal(rules_dir, rule_name, visited=None): if visited is None: visited = set() if rule_name in visited: raise ValueError(f"Circular dependency: {rule_name}") visited.add(rule_name) rule_file = find_rule_file(rules_dir, rule_name) with open(rule_file) as f: rule = json.load(f) merged = {'allow_ips': [], 'allow_ports': [], 'block_ips': [], 'block_ports': [], 'dns_redirect': False} for base_name in rule.get('extends', []): base = _rule_resolve_internal(rules_dir, base_name, visited.copy()) merged['allow_ips'] += base.get('allow_ips', []) merged['allow_ports'] += base.get('allow_ports', []) merged['block_ips'] += base.get('block_ips', []) merged['block_ports'] += base.get('block_ports', []) if base.get('dns_redirect'): merged['dns_redirect'] = True merged['allow_ips'] = list(dict.fromkeys(merged['allow_ips'] + rule.get('allow_ips', []))) merged['allow_ports'] = list(dict.fromkeys(merged['allow_ports'] + rule.get('allow_ports', []))) merged['block_ips'] = list(dict.fromkeys(merged['block_ips'] + rule.get('block_ips', []))) merged['block_ports'] = list(dict.fromkeys(merged['block_ports'] + rule.get('block_ports', []))) if rule.get('dns_redirect', False): merged['dns_redirect'] = True merged['name'] = rule.get('name', rule_name) merged['desc'] = rule.get('desc', '') merged['group'] = rule.get('group', '') merged['extends'] = rule.get('extends', []) return merged def rule_resolve(rules_dir, rule_name): try: print(json.dumps(_rule_resolve_internal(rules_dir, rule_name))) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def rule_resolve_field(rules_dir, rule_name, field): try: resolved = _rule_resolve_internal(rules_dir, rule_name) val = resolved.get(field, []) if isinstance(val, list): for v in val: print(v) else: print(val) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def count_resolved(rules_dir, rule_name, key): try: print(len(_rule_resolve_internal(rules_dir, rule_name).get(key, []))) except Exception: print(0) def rule_list_data(rules_dir, meta_dir): import glob rule_peer_counts = {} for f in glob.glob(f"{meta_dir}/*.meta"): try: with open(f) as mf: meta = json.load(mf) rule = meta.get('rule', '') if rule: rule_peer_counts[rule] = rule_peer_counts.get(rule, 0) + 1 except Exception: pass rule_files = (sorted(glob.glob(f"{rules_dir}/*.rule")) + sorted(glob.glob(f"{rules_dir}/base/*.rule"))) rules_data = [] for rule_file in rule_files: is_base = '/base/' in rule_file try: with open(rule_file) as f: r = json.load(f) name = r.get('name', '') resolved = _rule_resolve_internal(rules_dir, name) n_allows = (len(resolved.get('allow_ips', [])) + len(resolved.get('allow_ports', []))) n_blocks = (len(resolved.get('block_ips', [])) + len(resolved.get('block_ports', []))) rules_data.append({ 'name': name, 'desc': r.get('desc', ''), 'n_allows': n_allows, 'n_blocks': n_blocks, 'peer_count': rule_peer_counts.get(name, 0), 'extends': ','.join(r.get('extends', [])), 'is_base': is_base, 'group': r.get('group', '') }) except Exception: pass rules_data.sort(key=lambda x: ( x['is_base'], x['group'] == '' and not x['is_base'], x['group'], x['name'] )) for r in rules_data: print(f"{r['name']}|{r['desc']}|{r['n_allows']}|{r['n_blocks']}|" f"{r['peer_count']}|{r['extends']}|{r['is_base']}|{r['group']}") def rule_inspect(rules_dir, rule_name): try: rule_file = find_rule_file(rules_dir, rule_name) with open(rule_file) as f: rule = json.load(f) resolved = _rule_resolve_internal(rules_dir, rule_name) has_extends = bool(rule.get('extends', [])) for ip in rule.get('allow_ips', []): print(f"own|allow_ip|{ip}") for p in rule.get('allow_ports', []): print(f"own|allow_port|{p}") for ip in rule.get('block_ips', []): print(f"own|block_ip|{ip}") for p in rule.get('block_ports', []): print(f"own|block_port|{p}") if rule.get('dns_redirect'): print(f"dns|dns_redirect|true") if has_extends: for base_name in rule.get('extends', []): base = _rule_resolve_internal(rules_dir, base_name) for ip in base.get('allow_ips', []): print(f"inherited:{base_name}|allow_ip|{ip}") for p in base.get('allow_ports', []): print(f"inherited:{base_name}|allow_port|{p}") for ip in base.get('block_ips', []): print(f"inherited:{base_name}|block_ip|{ip}") for p in base.get('block_ports', []): print(f"inherited:{base_name}|block_port|{p}") if base.get('dns_redirect'): print(f"inherited:{base_name}|dns_redirect|true") has_resolved = any([resolved.get('allow_ips'), resolved.get('allow_ports'), resolved.get('block_ips'), resolved.get('block_ports'), resolved.get('dns_redirect')]) if has_resolved: for ip in resolved.get('allow_ips', []): print(f"resolved|allow_ip|{ip}") for p in resolved.get('allow_ports', []): print(f"resolved|allow_port|{p}") for ip in resolved.get('block_ips', []): print(f"resolved|block_ip|{ip}") for p in resolved.get('block_ports', []): print(f"resolved|block_port|{p}") if resolved.get('dns_redirect'): print(f"resolved|dns_redirect|true") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips, block_ports, allow_ports='', extends='', group=''): rule = { 'name': name, 'desc': desc, 'group': group, 'dns_redirect': dns_redirect == 'true', 'extends': [x for x in extends.split(',') if x] if extends else [], 'allow_ips': [x for x in allow_ips.split(',') if x] if allow_ips else [], 'allow_ports': [x for x in allow_ports.split(',') if x] if allow_ports else [], 'block_ips': [x for x in block_ips.split(',') if x] if block_ips else [], 'block_ports': [x for x in block_ports.split(',') if x] if block_ports else [], } with open(file, 'w') as f: json.dump(rule, f, indent=2) # ── All remaining functions (net, hosts, blocks, subnet, identity, policy) ── # Kept verbatim from original json_helper.py — identical behaviour. # Future pass will move each section to lib/net.py, lib/hosts.py, etc. def _parse_peer_name(peer_name): """ Parse a peer name into (type, identity, index). phone-nuno -> ('phone', 'nuno', 1) phone-nuno-2 -> ('phone', 'nuno', 2) desktop-zephyr -> ('desktop', 'zephyr', 1) laptop-nuno -> ('laptop', 'nuno', 1) Returns None if name doesn't match convention. Convention: {type}-{identity}[-{index}] Known types: desktop, laptop, phone, tablet, server, iot, none """ known_types = {'desktop', 'laptop', 'phone', 'tablet', 'server', 'iot', 'none'} parts = peer_name.split('-') if len(parts) < 2: return None peer_type = parts[0] if peer_type not in known_types: return None # Check if last part is a numeric index if len(parts) >= 3 and parts[-1].isdigit(): index = int(parts[-1]) identity = '-'.join(parts[1:-1]) else: index = 1 identity = '-'.join(parts[1:]) if not identity: return None return (peer_type, identity, index) # ====================================================== # Blocks # ====================================================== def _block_init(peer_ip): """Return empty block structure""" return { "peer_ip": peer_ip, "blocked_direct": False, "blocked_by_groups": [], "rules": [] } def _block_read(file): try: with open(file) as f: content = f.read().strip() if not content: return None # empty file = no block data try: return json.loads(content) except json.JSONDecodeError: # Old format — migrate lines = content.split('\n') peer_ip = lines[0].split()[0] if lines else '' new_data = { "peer_ip": peer_ip, "blocked_direct": True, "blocked_by_groups": [], "rules": [{"name": "full block", "type": "full"}] } with open(file, 'w') as f: json.dump(new_data, f, indent=2) return new_data except FileNotFoundError: return None except Exception: return None def _block_write(file, data): """Write block file""" with open(file, 'w') as f: json.dump(data, f, indent=2) def block_get(file): """Read and print block file as JSON""" data = _block_read(file) if data: print(json.dumps(data)) def block_is_blocked(file): """Return true if peer is effectively blocked""" data = _block_read(file) if not data: print("false") return blocked = data.get("blocked_direct", False) or \ bool(data.get("blocked_by_groups", [])) print("true" if blocked else "false") def block_set_direct(file, peer_ip, value): """Set blocked_direct""" try: data = _block_read(file) or _block_init(peer_ip) data["blocked_direct"] = value.lower() == "true" data["peer_ip"] = peer_ip _block_write(file, data) remaining = data["blocked_direct"] or bool(data.get("blocked_by_groups", [])) pass # print("true" if remaining else "false") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def block_add_group(file, peer_ip, group): """Add group to blocked_by_groups""" try: data = _block_read(file) or _block_init(peer_ip) data["peer_ip"] = peer_ip groups = data.get("blocked_by_groups", []) if group not in groups: groups.append(group) data["blocked_by_groups"] = groups _block_write(file, data) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def block_remove_group(file, peer_ip, group): """Remove group from blocked_by_groups, return whether still blocked""" try: data = _block_read(file) or _block_init(peer_ip) groups = data.get("blocked_by_groups", []) if group in groups: groups.remove(group) data["blocked_by_groups"] = groups _block_write(file, data) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) # print("true" if remaining else "false") pass except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def block_add_rule(file, peer_ip, rule_type, name="", target="", port="", proto=""): """Add a block rule entry""" try: data = _block_read(file) or _block_init(peer_ip) data["peer_ip"] = peer_ip rule = {"type": rule_type} if name: rule["name"] = name if target: rule["target"] = target if port: rule["port"] = port if proto: rule["proto"] = proto rules = data.get("rules", []) for existing in rules: if existing.get("type") == rule_type and \ existing.get("target","") == target and \ existing.get("port","") == port and \ existing.get("proto","") == proto: return # already exists, skip rules.append(rule) data["rules"] = rules _block_write(file, data) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def block_remove_rule(file, rule_type, target="", port="", proto=""): data = _block_read(file) if not data: return rules = data.get("rules", []) filtered = [r for r in rules if not ( r.get("type") == rule_type and r.get("target", "") == target and r.get("port", "") == port and r.get("proto", "") == proto )] data["rules"] = filtered _block_write(file, data) def block_get_rules(file): """Print rules as pipe-separated lines: name|type|target|port|proto""" data = _block_read(file) if not data: return for r in data.get("rules", []): print(f"{r.get('name','')}|{r.get('type','')}|" f"{r.get('target','')}|{r.get('port','')}|{r.get('proto','')}") def block_get_groups(file): data = _block_read(file) if not data: return print(','.join(data.get('blocked_by_groups', []))) def block_get_direct(file): data = _block_read(file) if not data: print('false') return print('true' if data.get('blocked_direct', False) else 'false') def block_is_empty(file): data = _block_read(file) if not data: print("true") return empty = ( not data.get("blocked_direct", False) and not data.get("blocked_by_groups", []) and not data.get("rules", []) and not data.get("services", []) ) print("true" if empty else "false") # ====================================================== # Subnet # ====================================================== def _subnet_read(file): """Read subnets.json, return dict or empty dict""" try: if not os.path.exists(file): return {} with open(file) as f: content = f.read().strip() if not content: return {} return json.loads(content) except Exception: return {} def _subnet_write(file, data): """Write subnets.json""" os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w') as f: json.dump(data, f, indent=2) def _subnet_is_group(entry): """Return True if a subnet entry is a nested group (like 'guests')""" return isinstance(entry, dict) and 'subnet' not in entry def subnet_lookup(file, name, type_key=''): """ Resolve a subnet name (and optional type) to a CIDR string. For scalar entries: subnet_lookup(file, "desktop") -> "10.1.1.0/24" For group entries: subnet_lookup(file, "guests", "phone") -> "10.1.103.0/24" For group with no type: subnet_lookup(file, "guests") -> "10.1.100.0/24" (none slot) Prints the CIDR on success, nothing and exits 1 on failure. """ data = _subnet_read(file) if name not in data: sys.exit(1) entry = data[name] if _subnet_is_group(entry): key = type_key if type_key else 'none' if key not in entry: sys.exit(1) print(entry[key]['subnet']) else: print(entry['subnet']) def subnet_type(file, name, type_key=''): """ Return the type string for a subnet entry. For scalar: subnet_type(file, "desktop") -> "desktop" For group: subnet_type(file, "guests", "phone") -> "phone" subnet_type(file, "guests") -> "none" """ data = _subnet_read(file) if name not in data: sys.exit(1) entry = data[name] if _subnet_is_group(entry): key = type_key if type_key else 'none' if key not in entry: sys.exit(1) # For group entries the type IS the child key print(key if key != 'none' else 'none') else: print(entry.get('type', name)) def subnet_tunnel_mode(file, name, type_key=''): """Return tunnel_mode for a subnet entry""" data = _subnet_read(file) if name not in data: print('split') # safe default return entry = data[name] if _subnet_is_group(entry): key = type_key if type_key else 'none' child = entry.get(key, {}) print(child.get('tunnel_mode', 'split')) else: print(entry.get('tunnel_mode', 'split')) def subnet_for_ip(file, ip): """ Reverse lookup: given a peer IP, find which subnet name (and type) it belongs to. Output: name|type (e.g. "guests|phone" or "desktop|desktop") Returns nothing (exit 0) if not found — caller falls back to hardcoded map. """ import ipaddress try: peer_addr = ipaddress.ip_address(ip) except ValueError: return data = _subnet_read(file) for name, entry in data.items(): if _subnet_is_group(entry): for type_key, child in entry.items(): try: network = ipaddress.ip_network(child['subnet'], strict=False) if peer_addr in network: print(f"{name}|{type_key}") return except Exception: continue else: try: network = ipaddress.ip_network(entry['subnet'], strict=False) if peer_addr in network: peer_type = entry.get('type', name) print(f"{name}|{peer_type}") return except Exception: continue def subnet_list(file): """ List all subnets for display. Output per line: name|subnet|type|tunnel_mode|desc|is_group For group entries, outputs one line per child: name.type|subnet|type|tunnel_mode|desc|true """ data = _subnet_read(file) for name, entry in data.items(): if _subnet_is_group(entry): for type_key, child in entry.items(): display_name = f"{name}.{type_key}" if type_key != 'none' else name print(f"{display_name}|{child.get('subnet','')}|" f"{type_key}|{child.get('tunnel_mode','split')}|" f"{child.get('desc','')}|true|{name}") else: print(f"{name}|{entry.get('subnet','')}|" f"{entry.get('type',name)}|{entry.get('tunnel_mode','split')}|" f"{entry.get('desc','')}|false|{name}") def subnet_show(file, name): """Show a single subnet entry (scalar or group) in detail""" data = _subnet_read(file) if name not in data: print(f"Error: Subnet '{name}' not found", file=sys.stderr) sys.exit(1) entry = data[name] if _subnet_is_group(entry): print(f"name|{name}") print(f"is_group|true") for type_key, child in entry.items(): print(f"child|{type_key}|{child.get('subnet','')}|" f"{child.get('tunnel_mode','split')}|{child.get('desc','')}") else: print(f"name|{name}") print(f"is_group|false") print(f"subnet|{entry.get('subnet','')}") print(f"type|{entry.get('type',name)}") print(f"tunnel_mode|{entry.get('tunnel_mode','split')}") print(f"desc|{entry.get('desc','')}") def subnet_add(file, name, subnet, type_key, tunnel_mode, desc, group_parent=''): """ Add a new subnet entry. If group_parent is set, adds as a child under that group key. Otherwise adds as a scalar entry. """ data = _subnet_read(file) entry = { 'subnet': subnet, 'tunnel_mode': tunnel_mode or 'split', 'desc': desc or '' } if group_parent: # Adding a child to an existing group, or creating a new group if group_parent not in data: data[group_parent] = {} elif not _subnet_is_group(data[group_parent]): print(f"Error: '{group_parent}' exists but is not a group", file=sys.stderr) sys.exit(1) data[group_parent][type_key or 'none'] = entry else: # Scalar entry — type stored explicitly entry['type'] = type_key or name data[name] = entry _subnet_write(file, data) def subnet_remove(file, name, peers_using): """ Remove a subnet entry. peers_using is a comma-separated list of peer names currently using this subnet (passed in from bash after meta scan). If non-empty, refuses with error. """ if peers_using: peers = [p for p in peers_using.split(',') if p] if peers: print(f"Error: Subnet '{name}' is in use by: {', '.join(peers)}", file=sys.stderr) sys.exit(1) data = _subnet_read(file) if '.' in name: # Removing a group child: e.g. "guests.phone" parent, child_key = name.split('.', 1) if parent not in data or not _subnet_is_group(data[parent]): print(f"Error: Group '{parent}' not found", file=sys.stderr) sys.exit(1) if child_key not in data[parent]: print(f"Error: '{child_key}' not found in group '{parent}'", file=sys.stderr) sys.exit(1) del data[parent][child_key] if not data[parent]: del data[parent] # remove empty group else: if name not in data: print(f"Error: Subnet '{name}' not found", file=sys.stderr) sys.exit(1) del data[name] _subnet_write(file, data) def subnet_rename(file, old_name, new_name, peers_using): """ Rename a subnet entry. Hard refusal if any peers reference it. peers_using: comma-separated peer names from bash meta scan. """ if peers_using: peers = [p for p in peers_using.split(',') if p] if peers: print(f"Error: Cannot rename subnet '{old_name}' — in use by: {', '.join(peers)}", file=sys.stderr) sys.exit(1) data = _subnet_read(file) if old_name not in data: print(f"Error: Subnet '{old_name}' not found", file=sys.stderr) sys.exit(1) if new_name in data: print(f"Error: Subnet '{new_name}' already exists", file=sys.stderr) sys.exit(1) data[new_name] = data.pop(old_name) _subnet_write(file, data) def subnet_peers(meta_dir, clients_dir, subnet_name, subnets_file): """ Find all peers using a subnet. Two-pass check: 1. Meta field: peer has "subnet": subnet_name in their .meta file 2. IP fallback: peer's IP falls within the subnet's CIDR(s) (catches peers added before meta stored subnet explicitly) Output: one peer name per line. """ import glob import ipaddress # Resolve all CIDRs covered by this subnet name data = _subnet_read(subnets_file) cidrs = [] if subnet_name in data: entry = data[subnet_name] if _subnet_is_group(entry): for child in entry.values(): try: cidrs.append(ipaddress.ip_network(child['subnet'], strict=False)) except Exception: pass else: try: cidrs.append(ipaddress.ip_network(entry['subnet'], strict=False)) except Exception: pass printed = set() for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): peer_name = os.path.basename(conf).replace('.conf', '') # Pass 1: check meta field meta_file = os.path.join(meta_dir, f"{peer_name}.meta") try: with open(meta_file) as f: meta = json.load(f) if meta.get('subnet', '') == subnet_name: if peer_name not in printed: print(peer_name) printed.add(peer_name) continue except Exception: pass # Pass 2: IP reverse lookup against subnet CIDRs if not cidrs: continue peer_ip = '' try: with open(conf) as f: for line in f: if line.startswith('Address'): peer_ip = line.split('=')[1].strip().split('/')[0] break except Exception: continue if not peer_ip: continue try: addr = ipaddress.ip_address(peer_ip) if any(addr in cidr for cidr in cidrs): if peer_name not in printed: print(peer_name) printed.add(peer_name) except Exception: continue def subnet_exists(file, name): """Check if a subnet name exists (scalar or group). Exits 0/1.""" data = _subnet_read(file) if '.' in name: parent, child_key = name.split('.', 1) exists = parent in data and _subnet_is_group(data[parent]) and child_key in data[parent] else: exists = name in data sys.exit(0 if exists else 1) def subnet_policy(subnets_file, subnet_name, type_key=''): """ Get the policy name for a subnet entry. Falls back to 'default' if no policy set. """ data = _subnet_read(subnets_file) if subnet_name not in data: print('default') return entry = data[subnet_name] if _subnet_is_group(entry): key = type_key if type_key else 'none' child = entry.get(key, {}) print(child.get('policy', 'default')) else: print(entry.get('policy', 'default')) def subnet_default_rule(file, name, type_key=''): """ Return the default_rule for a subnet entry, or empty string if none set. For scalar: subnet_default_rule(file, "desktop") -> "" For group: subnet_default_rule(file, "guests", "phone") -> "guest" """ data = _subnet_read(file) if name not in data: print('') return entry = data[name] if _subnet_is_group(entry): key = type_key if type_key else 'none' child = entry.get(key, {}) print(child.get('default_rule', '')) else: print(entry.get('default_rule', '')) def subnet_list_names(file): """ List all top-level subnet names, one per line. Used for dynamic flag registration in commands. Output: one name per line (e.g. desktop, laptop, guests, servers, iot) """ data = _subnet_read(file) for name in data.keys(): print(name) # ====================================================== # Identity # ====================================================== def identity_rules(file): """ Return all rules assigned to an identity, one per line. Reads from 'rules' array (1:N). Falls back to 'rule' scalar for migration. """ data = _identity_read(file) if not data: return # Support legacy scalar 'rule' field rules = data.get('rules', []) if not rules and data.get('rule'): rules = [data['rule']] for r in rules: if r: print(r) def identity_add_rule(file, identity_name, rule_name): """ Add a rule to an identity's rules array. Warns if already present (prints warning to stderr, exits 2). Creates identity file if it doesn't exist. """ data = _identity_read(file) or _identity_init(identity_name) rules = data.get('rules', []) # Migrate legacy scalar field if 'rule' in data and data['rule']: if data['rule'] not in rules: rules.append(data['rule']) del data['rule'] if rule_name in rules: print(f"Warning: Rule '{rule_name}' is already assigned to identity '{identity_name}'", file=sys.stderr) sys.exit(2) rules.append(rule_name) data['rules'] = rules _identity_write(file, data) def identity_remove_rule(file, rule_name): """ Remove a specific rule from an identity's rules array. Exits 1 if rule not found. """ data = _identity_read(file) if not data: print(f"Error: Identity not found", file=sys.stderr) sys.exit(1) rules = data.get('rules', []) if rule_name not in rules: print(f"Error: Rule '{rule_name}' not assigned to this identity", file=sys.stderr) sys.exit(1) rules.remove(rule_name) data['rules'] = rules _identity_write(file, data) def identity_clear_rules(file): """Remove all rules from an identity.""" data = _identity_read(file) if not data: return data['rules'] = [] data.pop('rule', None) # remove legacy scalar too _identity_write(file, data) def identity_has_rule(file, rule_name): """Exit 0 if identity has this rule, 1 otherwise.""" data = _identity_read(file) if not data: sys.exit(1) rules = data.get('rules', []) if not rules and data.get('rule'): rules = [data['rule']] sys.exit(0 if rule_name in rules else 1) def _identity_read(file): """Read an identity file, return dict or None""" try: if not os.path.exists(file): return None with open(file) as f: content = f.read().strip() if not content: return None return json.loads(content) except Exception: return None def _identity_write(file, data): """Write an identity file""" os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w') as f: json.dump(data, f, indent=2) def _identity_init(name): """Return empty identity structure""" return { 'name': name, 'peers': [], 'devices': {} } def identity_list(identities_dir): """ List all identities with peer count, rules and policy. Output per line: name|peer_count|types|rules|policy """ import glob for id_file in sorted(glob.glob(f"{identities_dir}/*.identity")): try: with open(id_file) as f: data = json.load(f) name = data.get('name', '') peers = data.get('peers', []) devices = data.get('devices', {}) rules = data.get('rules', []) # Migrate legacy scalar rule field if not rules and data.get('rule'): rules = [data['rule']] policy = data.get('policy', 'default') types = sorted(set( d.get('type', '') for d in devices.values() if d.get('type') )) print(f"{name}|{len(peers)}|{','.join(types)}|{','.join(rules)}|{policy}") except Exception: continue def identity_show(file): """Show identity details""" data = _identity_read(file) if not data: print("Error: Identity not found", file=sys.stderr) sys.exit(1) print(f"name|{data.get('name','')}") print(f"peer_count|{len(data.get('peers',[]))}") for peer_name, dev in data.get('devices', {}).items(): print(f"device|{peer_name}|{dev.get('type','')}|{dev.get('index',1)}") def identity_add_peer(file, identity_name, peer_name, peer_type, index): """Add a peer to an identity file, creating it if needed""" data = _identity_read(file) or _identity_init(identity_name) if peer_name not in data['peers']: data['peers'].append(peer_name) data['devices'][peer_name] = { 'type': peer_type, 'index': int(index) } _identity_write(file, data) def identity_remove_peer(file, peer_name): """Remove a peer from an identity file""" data = _identity_read(file) if not data: return data['peers'] = [p for p in data['peers'] if p != peer_name] data['devices'].pop(peer_name, None) _identity_write(file, data) def identity_remove(file): """Delete an identity file — existence check done in bash""" try: os.remove(file) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def identity_next_index(file, peer_type): """ Return the next available index for a given type within an identity. phone-nuno exists (index 1) -> returns 2 No phones exist -> returns 1 """ data = _identity_read(file) if not data: print(1) return existing = [ d.get('index', 1) for d in data.get('devices', {}).values() if d.get('type') == peer_type ] if not existing: print(1) return # Find lowest unused index starting from 1 used = set(existing) i = 1 while i in used: i += 1 print(i) def identity_peers(file, filter_type=''): """ List peers belonging to an identity, optionally filtered by type. Output: one peer name per line. """ data = _identity_read(file) if not data: return for peer_name in data.get('peers', []): if filter_type: dev = data.get('devices', {}).get(peer_name, {}) if dev.get('type') != filter_type: continue print(peer_name) def identity_migrate(identities_dir, clients_dir, meta_dir, dry_run): """ Scan all peer configs and auto-create identity files from name convention. dry_run: 'true' -> print what would be done, no writes. Output per action: action|identity|peer|type|index """ import glob is_dry = dry_run == 'true' grouped = {} # identity_name -> [(peer_name, type, index)] for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): peer_name = os.path.basename(conf).replace('.conf', '') parsed = _parse_peer_name(peer_name) if not parsed: print(f"skip|{peer_name}") continue peer_type, identity_name, index = parsed if identity_name not in grouped: grouped[identity_name] = [] grouped[identity_name].append((peer_name, peer_type, index)) for identity_name, peers in sorted(grouped.items()): id_file = os.path.join(identities_dir, f"{identity_name}.identity") for peer_name, peer_type, index in peers: print(f"create|{identity_name}|{peer_name}|{peer_type}|{index}") if not is_dry: identity_add_peer(id_file, identity_name, peer_name, peer_type, index) def identity_infer(peer_name): """ Parse a peer name and print identity|type|index, or nothing if no match. Used by add.command.sh to auto-attach on vanilla wgctl add. """ parsed = _parse_peer_name(peer_name) if parsed: peer_type, identity_name, index = parsed print(f"{identity_name}|{peer_type}|{index}") def identity_exists(file): """Exit 0 if identity file exists and is valid, else exit 1""" data = _identity_read(file) sys.exit(0 if data is not None else 1) # ====================================================== # Policy # ====================================================== def _policy_read(file): try: if not os.path.exists(file): return {} with open(file) as f: content = f.read().strip() if not content: return {} return json.loads(content) except Exception: return {} def _policy_write(file, data): """Write policies.json.""" os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w') as f: json.dump(data, f, indent=2) def _policy_get_entry(data, name): """Get a policy entry, falling back to 'default' policy values for missing fields.""" entry = data.get(name, {}) default = data.get('default', _POLICY_DEFAULTS.get('default', {})) # Merge: entry fields override default resolved = dict(default) resolved.update(entry) return resolved def policy_get(file, name, field=''): """ Get a policy entry or a specific field from it. policy_get(file, "guest") -> prints all fields as key|value lines policy_get(file, "guest", "tunnel_mode") -> prints "split" policy_get(file, "guest", "strict_rule") -> prints "true" or "false" """ data = _policy_read(file) entry = _policy_get_entry(data, name) if field: val = entry.get(field) if val is None: print('') elif isinstance(val, bool): print('true' if val else 'false') else: print(val) else: for k, v in entry.items(): if isinstance(v, bool): print(f"{k}|{'true' if v else 'false'}") elif v is None: print(f"{k}|") else: print(f"{k}|{v}") def policy_list(file): """ List all policies. Output per line: name|tunnel_mode|default_rule|strict_rule|auto_apply|desc """ data = _policy_read(file) for name, raw_entry in data.items(): entry = _policy_get_entry(data, name) tunnel_mode = entry.get('tunnel_mode', 'split') default_rule = entry.get('default_rule') or '' strict_rule = 'true' if entry.get('strict_rule', False) else 'false' auto_apply = 'true' if entry.get('auto_apply', True) else 'false' desc = entry.get('desc', '') print(f"{name}|{tunnel_mode}|{default_rule}|{strict_rule}|{auto_apply}|{desc}") def policy_exists(file, name): """Exit 0 if policy exists, 1 otherwise.""" data = _policy_read(file) sys.exit(0 if name in data else 1) def policy_add(file, name, tunnel_mode, default_rule, strict_rule, auto_apply, desc): """Add or update a policy entry.""" data = _policy_read(file) data[name] = { 'tunnel_mode': tunnel_mode or 'split', 'default_rule': default_rule if default_rule else None, 'strict_rule': strict_rule == 'true', 'auto_apply': auto_apply != 'false', 'desc': desc or '' } _policy_write(file, data) def policy_remove(file, name): """Remove a policy. Refuses to remove hardcoded defaults.""" if name in _POLICY_DEFAULTS: print(f"Error: Cannot remove built-in policy '{name}'", file=sys.stderr) sys.exit(1) data = _policy_read(file) if name not in data: print(f"Error: Policy '{name}' not found", file=sys.stderr) sys.exit(1) del data[name] _policy_write(file, data) def policy_set_field(file, name, field, value): """Set a single field on an existing policy.""" data = _policy_read(file) if name not in data: print(f"Error: Policy '{name}' not found", file=sys.stderr) sys.exit(1) entry = data[name] if field in ('strict_rule', 'auto_apply'): entry[field] = value == 'true' elif field == 'default_rule': entry[field] = value if value else None else: entry[field] = value data[name] = entry _policy_write(file, data) # ====================================================== # Hosts # ====================================================== def _hosts_read(file): if not os.path.exists(file): return {"hosts": {}, "subnets": {}, "ports": {}} try: with open(file) as f: data = json.load(f) # Ensure all sections exist data.setdefault("hosts", {}) data.setdefault("subnets", {}) data.setdefault("ports", {}) return data except Exception: return {"hosts": {}, "subnets": {}, "ports": {}} def _hosts_write(file, data): with open(file, 'w') as f: json.dump(data, f, indent=2) def hosts_list(file): """ List all host entries. Output per line: type|key|name|desc|tags """ data = _hosts_read(file) for ip, entry in sorted(data["hosts"].items()): if isinstance(entry, dict): name = entry.get("name", "") desc = entry.get("desc", "") tags = ",".join(entry.get("tags", [])) else: name = str(entry) desc = "" tags = "" print(f"host|{ip}|{name}|{desc}|{tags}") for subnet, entry in sorted(data["subnets"].items()): if isinstance(entry, dict): name = entry.get("name", "") desc = entry.get("desc", "") tags = ",".join(entry.get("tags", [])) else: name = str(entry) desc = "" tags = "" print(f"subnet|{subnet}|{name}|{desc}|{tags}") for port, entry in sorted(data["ports"].items()): if isinstance(entry, dict): name = entry.get("name", "") desc = entry.get("desc", "") tags = ",".join(entry.get("tags", [])) else: name = str(entry) desc = "" tags = "" print(f"port|{port}|{name}|{desc}|{tags}") def hosts_show(file, key, entry_type): """Show a single host entry. type: host|subnet|port""" data = _hosts_read(file) section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"} section = section_map.get(entry_type, "hosts") entry = data[section].get(key) if not entry: print(f"Error: not found: {key}", file=sys.stderr) sys.exit(1) if isinstance(entry, dict): print(f"name|{entry.get('name', '')}") print(f"desc|{entry.get('desc', '')}") print(f"tags|{','.join(entry.get('tags', []))}") else: print(f"name|{entry}") print(f"desc|") print(f"tags|") def hosts_add(file, entry_type, key, name, desc, tags): """ Add a host entry. entry_type: host|subnet|port key: IP, subnet CIDR, or port number """ data = _hosts_read(file) section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"} section = section_map.get(entry_type, "hosts") tag_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else [] data[section][key] = { "name": name, "desc": desc, "tags": tag_list } _hosts_write(file, data) def hosts_remove(file, entry_type, key): """Remove a host entry.""" data = _hosts_read(file) section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"} section = section_map.get(entry_type, "hosts") if key not in data[section]: print(f"Error: not found: {key}", file=sys.stderr) sys.exit(1) del data[section][key] _hosts_write(file, data) def hosts_exists(file, entry_type, key): """Check if a host entry exists.""" data = _hosts_read(file) section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"} section = section_map.get(entry_type, "hosts") print("true" if key in data[section] else "false") def hosts_lookup(file, ip): """ Resolve an IP to a display name. Checks hosts section only (exact match). Returns name or empty string. """ data = _hosts_read(file) entry = data["hosts"].get(ip) if not entry: print("") return if isinstance(entry, dict): print(entry.get("name", ip)) else: print(str(entry)) # ====================================================== # Peer History # ====================================================== def peer_history_lookup(history_dir, ip): """ Look up peer name for an endpoint IP using the index file. Falls back to scanning peer files if index doesn't exist. Returns peer name or empty string. """ import glob, os index_file = os.path.join(history_dir, "endpoint_index.json") try: if os.path.exists(index_file): with open(index_file) as f: index = json.load(f) result = index.get(ip, '') if result: print(result) return except Exception: pass # Fallback: scan peer files (rebuilds index implicitly) try: for hist_file in glob.glob(os.path.join(history_dir, "*.json")): if os.path.basename(hist_file) == "endpoint_index.json": continue try: with open(hist_file) as f: data = json.load(f) if ip in data.get("endpoints", {}): print(data.get("peer", "")) return except Exception: pass except Exception: pass def config_load(file): """ Load wgctl.json and output KEY=value pairs for all config fields. Automatically handles nested sections — add fields to JSON, they appear here. """ import os if not os.path.exists(file): return try: with open(file) as f: d = json.load(f) def emit(key, val): if val is not None and val != '': print(f"{key}={val}") wg = d.get('wireguard', {}) dns = d.get('dns', {}) hs = d.get('handshake', {}) act = d.get('activity', {}) dis = d.get('display', {}) emit('WG_INTERFACE', wg.get('interface')) emit('WG_ENDPOINT', wg.get('endpoint')) emit('WG_PORT', wg.get('port')) emit('WG_SUBNET', wg.get('subnet')) emit('WG_LAN', wg.get('lan')) emit('WG_DNS', dns.get('primary')) # fallback: join array to comma-separated string fb = dns.get('fallback', []) if fb: emit('WG_DNS_FALLBACK', ', '.join(str(x) for x in fb)) emit('WG_HANDSHAKE_CHECK_TIME_SEC', hs.get('check_interval_sec')) emit('DATE_FORMAT', dis.get('date_format')) atot = act.get('total', {}) acur = act.get('current', {}) emit('ACTIVITY_TOTAL_LOW_BYTES', atot.get('low')) emit('ACTIVITY_TOTAL_MED_BYTES', atot.get('medium')) emit('ACTIVITY_TOTAL_HIGH_BYTES', atot.get('high')) emit('ACTIVITY_CURRENT_LOW_BYTES', acur.get('low')) emit('ACTIVITY_CURRENT_MED_BYTES', acur.get('medium')) emit('ACTIVITY_CURRENT_HIGH_BYTES', acur.get('high')) # Command defaults and aliases # Output format: # CMD_DEFAULT:activity=--exclude-service pihole:dns-udp --limit 50 # CMD_ALIAS:act=activity # CMD_ALIAS:a=activity cmds = d.get('commands', {}) for cmd_name, cmd_cfg in cmds.items(): if not isinstance(cmd_cfg, dict): continue defaults = cmd_cfg.get('defaults', []) if defaults: print(f"CMD_DEFAULT:{cmd_name}={' '.join(str(x) for x in defaults)}") aliases = cmd_cfg.get('aliases', []) for alias in aliases: print(f"CMD_ALIAS:{alias}={cmd_name}") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def display_load(file): """ Load display.json and output view=style pairs. Output: view_name=style per line """ try: with open(file) as f: d = json.load(f) views = d.get('views', {}) for view_name, view_config in views.items(): style = view_config.get('style', 'compact') print(f"{view_name}={style}") except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) def _export_peer_data(name, clients_dir, meta_dir, identities_dir, groups_dir, blocks_dir): """Build peer data dict for export.""" import glob, base64, os conf_path = os.path.join(clients_dir, f"{name}.conf") if not os.path.exists(conf_path): return None with open(conf_path, 'rb') as f: conf_b64 = base64.b64encode(f.read()).decode() # Public key key_path = os.path.join(clients_dir, f"{name}_public.key") public_key = open(key_path).read().strip() if os.path.exists(key_path) else '' # IP from conf ip = '' with open(conf_path) as f: for line in f: if line.startswith('Address'): ip = line.split('=')[1].strip().split('/')[0] break # Meta meta = {} meta_path = os.path.join(meta_dir, f"{name}.meta") if os.path.exists(meta_path): try: with open(meta_path) as f: meta = json.load(f) except Exception: pass direct_rule = meta.get('rule', '') # Identity + type — scan identity files once for both identity = '' peer_type = meta.get('type', '') for id_file in sorted(glob.glob(os.path.join(identities_dir, '*.identity'))): try: with open(id_file) as f: id_data = json.load(f) if name in id_data.get('peers', []): identity = id_data.get('name', '') if not peer_type: peer_type = id_data.get('devices', {}).get(name, {}).get('type', '') break except Exception: pass # Groups peer_groups = [] for grp_file in sorted(glob.glob(os.path.join(groups_dir, '*.group'))): try: with open(grp_file) as f: g = json.load(f) if name in g.get('peers', []): peer_groups.append(g.get('name', '')) except Exception: pass # Blocks blocks = {'is_blocked': False, 'block_file': None} block_path = os.path.join(blocks_dir, f"{name}.block") if os.path.exists(block_path): with open(block_path, 'rb') as f: blocks = { 'is_blocked': True, 'block_file': base64.b64encode(f.read()).decode() } return { 'name': name, 'ip': ip, 'type': peer_type, 'public_key': public_key, 'conf': conf_b64, 'meta': meta, 'identity': identity, 'groups': peer_groups, 'direct_rule': direct_rule, 'blocks': blocks } def export_full(clients_dir, meta_dir, rules_dir, identities_dir, groups_dir, blocks_dir, block_history_dir, config_file, policies_file, subnets_file, net_file, hosts_file, no_config, no_peers, version): """Build full wgctl export as JSON.""" import glob, os from datetime import datetime, timezone data = {} # Config if no_config != 'true' and os.path.exists(config_file): try: with open(config_file) as f: data['config'] = json.load(f) except Exception: data['config'] = {} # Peers if no_peers != 'true': peers = [] for conf_path in sorted(glob.glob(f"{clients_dir}/*.conf")): name = os.path.basename(conf_path).replace('.conf', '') try: peer = _export_peer_data( name, clients_dir, meta_dir, identities_dir, groups_dir, blocks_dir) if peer: peers.append(peer) except Exception: pass data['peers'] = peers # Rules rules = [] for rule_file in sorted( glob.glob(f"{rules_dir}/*.rule") + glob.glob(f"{rules_dir}/base/*.rule") ): try: with open(rule_file) as f: rules.append(json.load(f)) except Exception: pass data['rules'] = rules # Identities identities = [] for id_file in sorted(glob.glob(f"{identities_dir}/*.identity")): try: with open(id_file) as f: identities.append(json.load(f)) except Exception: pass data['identities'] = identities # Groups groups = [] for grp_file in sorted(glob.glob(f"{groups_dir}/*.group")): try: with open(grp_file) as f: groups.append(json.load(f)) except Exception: pass data['groups'] = groups # Block history block_histories = [] for path in sorted(glob.glob(f"{block_history_dir}/*.json")): try: with open(path) as f: block_histories.append(json.load(f)) except Exception: pass data['block_history'] = block_histories # Flat JSON files for key, path in [ ('policies', policies_file), ('subnets', subnets_file), ('services', net_file), ('hosts', hosts_file), ]: if os.path.exists(path): try: with open(path) as f: data[key] = json.load(f) except Exception: pass ts = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') result = { 'wgctl_version': version, 'export_type': 'full', 'exported_at': ts, 'data': data, } print(json.dumps(result)) def endpoint_cache_get(cache_file, peer): """Get cached endpoint IP for a peer.""" try: with open(cache_file) as f: cache = json.load(f) print(cache.get(peer, '')) except Exception: print('') def batch_resolve_dest(net_file, hosts_file, *dest_specs): """ Resolve multiple ip:port:proto specs at once. Input: "ip:port:proto" strings Output: "ip:port:proto|display_name" per line Uses same logic as resolve::dest bash function. """ from lib.util import load_net_data, load_hosts_data, reverse_lookup, hosts_lookup net_data = load_net_data(net_file) hosts_data = load_hosts_data(hosts_file) seen = set() for spec in dest_specs: if not spec or spec in seen: continue seen.add(spec) parts = spec.split(':') if len(parts) < 3: print(f"{spec}|{spec}") continue ip = parts[0] port = parts[1] proto = parts[2] # Try service name first svc = reverse_lookup(net_data, ip, port, proto) if svc and svc != ip: if port: display = f"{svc}:{proto}-{port}" if False else f"{svc}" # Use same format as resolve::dest: "svcname/proto" or "svcname:port" display = svc else: display = svc print(f"{spec}|{display}") continue # Try host name host = hosts_lookup(hosts_data, ip) if host and host != ip: if port: print(f"{spec}|{host}:{port}/{proto}") else: print(f"{spec}|{host}") continue # Raw fallback if port: print(f"{spec}|{ip}:{port}/{proto}") else: print(f"{spec}|{ip}") # ====================================================== def _net_read(file): try: if not os.path.exists(file): return {} with open(file) as f: content = f.read().strip() if not content: return {} return json.loads(content) except Exception: return {} def _net_write(file, data): os.makedirs(os.path.dirname(file), exist_ok=True) with open(file, 'w') as f: json.dump(data, f, indent=2) def net_list(file): data = _net_read(file) for name, svc in sorted(data.items()): print(f"{name}|{svc.get('ip','')}|{svc.get('desc','')}|" f"{','.join(svc.get('tags',[]))}|{len(svc.get('ports',{}))}") def net_show(file, name): data = _net_read(file) if name not in data: print(f"Error: Service not found: {name}", file=sys.stderr); sys.exit(1) svc = data[name] print(f"name|{name}") print(f"ip|{svc.get('ip','')}") print(f"desc|{svc.get('desc','')}") print(f"tags|{','.join(svc.get('tags',[]))}") for port_name, port_def in svc.get('ports', {}).items(): print(f"port|{port_name}|{port_def.get('port','')}|" f"{port_def.get('proto','tcp')}|{port_def.get('desc','')}") def net_exists(file, name): data = _net_read(file) if ':' in name: svc_name, port_name = name.split(':', 1) if port_name == 'ports': print('true' if svc_name in data else 'false') else: svc = data.get(svc_name, {}) print('true' if port_name in svc.get('ports', {}) else 'false') else: print('true' if name in data else 'false') def net_add_service(file, name, ip, desc='', tags=''): data = _net_read(file) if name not in data: data[name] = {'ip': ip, 'ports': {}} else: data[name]['ip'] = ip if desc: data[name]['desc'] = desc if tags: data[name]['tags'] = [t.strip() for t in tags.split(',') if t.strip()] _net_write(file, data) def net_add_port(file, service, port_name, port, proto='tcp', desc=''): data = _net_read(file) if service not in data: print(f"Error: Service not found: {service}", file=sys.stderr); sys.exit(1) if 'ports' not in data[service]: data[service]['ports'] = {} entry = {'port': int(port), 'proto': proto} if desc: entry['desc'] = desc data[service]['ports'][port_name] = entry _net_write(file, data) def net_remove(file, name): data = _net_read(file) if ':' in name: svc_name, port_name = name.split(':', 1) if svc_name not in data: print(f"Error: Service not found: {svc_name}", file=sys.stderr); sys.exit(1) if port_name == 'ports': data[svc_name]['ports'] = {} else: if port_name not in data[svc_name].get('ports', {}): print(f"Error: Port not found: {port_name}", file=sys.stderr); sys.exit(1) del data[svc_name]['ports'][port_name] else: if name not in data: print(f"Error: Service not found: {name}", file=sys.stderr); sys.exit(1) del data[name] _net_write(file, data) def net_resolve(file, name): data = _net_read(file) if ':' in name: svc_name, port_name = name.split(':', 1) if svc_name not in data: print(f"Error: Service not found: {svc_name}", file=sys.stderr); sys.exit(1) svc = data[svc_name] ip = svc.get('ip', '') if port_name == 'ports': for pname, pdef in svc.get('ports', {}).items(): print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}") else: if port_name not in svc.get('ports', {}): print(f"Error: Port not found: {port_name}", file=sys.stderr); sys.exit(1) pdef = svc['ports'][port_name] print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}") else: if name not in data: print(f"Error: Service not found: {name}", file=sys.stderr); sys.exit(1) print(data[name].get('ip', '')) def net_reverse_lookup(file, ip, port='', proto=''): from lib.util import load_net_data, reverse_lookup as _rl net_data = _rl(load_net_data(file), ip, port, proto) if net_data: print(net_data) def group_list_data(groups_dir, blocks_dir, clients_dir): import glob blocked_peers = set() for f in glob.glob(f"{blocks_dir}/*.block"): blocked_peers.add(os.path.basename(f).replace('.block', '')) for group_file in sorted(glob.glob(f"{groups_dir}/*.group")): try: with open(group_file) as f: g = json.load(f) name = g.get('name', '') desc = g.get('desc', '') peers = [p for p in g.get('peers', []) if p] valid = [p for p in peers if os.path.exists(os.path.join(clients_dir, f"{p}.conf"))] total = len(valid) blocked = sum(1 for p in peers if p in blocked_peers) print(f"{name}|{desc}|{total}|{blocked}") except Exception: pass def create_group(file, name, desc): try: with open(file, 'w') as f: json.dump({'name': name, 'desc': desc, 'peers': []}, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr); sys.exit(1) def group_has_peer(file, peer_name): try: with open(file) as f: data = json.load(f) print('true' if peer_name in data.get('peers', []) else 'false') except Exception: print('false') # ── All block/subnet/identity/policy/hosts functions copied verbatim ──────── # (unchanged from original — omitting here for brevity but must be included # in the actual deployed file) def json_get_nested(file, *keys): try: with open(file) as f: data = json.load(f) val = data for key in keys: if not isinstance(val, dict): print(''); return val = val.get(key) if val is None: print(''); return if isinstance(val, bool): print('true' if val else 'false') elif val is None: print('') else: print(val) except Exception: print('') def json_set_nested(file, *args): if len(args) < 2: return keys, value = args[:-1], args[-1] try: data = {} if os.path.exists(file): with open(file) as f: data = json.load(f) target = data for key in keys[:-1]: if key not in target or not isinstance(target[key], dict): target[key] = {} target = target[key] final_key = keys[-1] if value == 'true': target[final_key] = True elif value == 'false': target[final_key] = False else: target[final_key] = value with open(file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error: {e}", file=sys.stderr); sys.exit(1) # ── Commands dispatch ──────────────────────────────────────────────────────── commands = { # JSON CRUD 'get': lambda args: get(args[0], args[1]), 'set': lambda args: set_key(args[0], args[1], args[2]), 'delete': lambda args: delete_key(args[0], args[1]), 'append': lambda args: append(args[0], args[1], args[2]), 'remove': lambda args: remove_value(args[0], args[1], args[2]), 'cat': lambda args: cat(args[0]), 'has_key': lambda args: has_key(args[0], args[1]), 'filter_values': lambda args: filter_values(args[0], args[1], args[2]), 'count': lambda args: count(args[0], args[1]), 'get_raw': lambda args: get_raw(args[0], args[1]), 'fmt_datetime': lambda args: fmt_datetime(args[0], args[1]), 'json_get_nested': lambda args: json_get_nested(args[0], *args[1:]), 'json_set_nested': lambda args: json_set_nested(args[0], *args[1:]), 'cleanup_config': lambda args: cleanup_config(args[0]), 'remove_peer_block':lambda args: remove_peer_block(args[0], args[1]), 'audit_fw_counts': lambda args: audit_fw_counts(args[0]), # Events (from lib.events) 'fw_events': lambda args: __import__('lib.events', fromlist=['fw_events']).fw_events( args[0], args[1], args[2], args[3], args[4], args[5] if len(args) > 5 else '50', args[6] if len(args) > 6 else '1', args[7] if len(args) > 7 else '', args[8] if len(args) > 8 else '', args[9] if len(args) > 9 else '', args[10] if len(args) > 10 else 'desc', args[11] if len(args) > 11 else ''), 'wg_events': lambda args: __import__('lib.events', fromlist=['wg_events']).wg_events( args[0], args[1], args[2], args[3] if len(args) > 3 else '50', args[4] if len(args) > 4 else '1', args[5] if len(args) > 5 else '', args[6] if len(args) > 6 else '', args[7] if len(args) > 7 else '', args[8] if len(args) > 8 else 'desc'), 'parse_event': lambda args: __import__('lib.events', fromlist=['parse_event']).parse_event(args[0]), 'parse_fw_event': lambda args: __import__('lib.events', fromlist=['parse_fw_event']).parse_fw_event(args[0]), 'format_fw_event': lambda args: __import__('lib.events', fromlist=['format_fw_event']).format_fw_event(args[0], args[1]), 'format_wg_event': lambda args: __import__('lib.events', fromlist=['format_wg_event']).format_wg_event(args[0]), 'remove_events': lambda args: __import__('lib.events', fromlist=['remove_events']).remove_events(args[0], args[1]), 'remove_events_filtered': lambda args: __import__('lib.events', fromlist=['remove_events_filtered']).remove_events_filtered( args[0], args[1], args[2], args[3], args[4] == 'true', args[5] == 'true', args[6] if len(args) > 6 else ''), 'follow_logs': lambda args: __import__('lib.events', fromlist=['follow_logs']).follow_logs( args[0], args[1], args[2], args[3], args[4], args[5] if len(args) > 5 else ''), 'last_event': lambda args: __import__('lib.events', fromlist=['last_event']).last_event(args[0], args[1], args[2], args[3]), 'events_for': lambda args: __import__('lib.events', fromlist=['events_for']).events_for(args[0], args[1], args[2]), 'iso_to_ts': lambda args: __import__('lib.events', fromlist=['iso_to_ts']).iso_to_ts(args[0]), # Peers (from lib.peers) 'peer_data': lambda args: __import__('lib.peers', fromlist=['peer_data']).peer_data(args[0], args[1], args[2]), 'peer_transfer': lambda args: __import__('lib.peers', fromlist=['peer_transfer']).peer_transfer(args[0]), 'peer_transfer_delta': lambda args: __import__('lib.peers', fromlist=['peer_transfer_delta']).peer_transfer_delta(args[0], args[1]), 'peer_group_map': lambda args: __import__('lib.peers', fromlist=['peer_group_map']).peer_group_map(args[0]), 'peer_groups': lambda args: __import__('lib.peers', fromlist=['peer_groups']).peer_groups(args[0], args[1]), # Activity (from lib.activity) 'activity_aggregate': lambda args: __import__('lib.activity', fromlist=['activity_aggregate']).activity_aggregate( args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else '24', args[7] if len(args) > 7 else '', args[8] if len(args) > 8 else ''), # Rules 'rule_resolve': lambda args: rule_resolve(args[0], args[1]), 'rule_resolve_field':lambda args: rule_resolve_field(args[0], args[1], args[2]), 'rule_list_data': lambda args: rule_list_data(args[0], args[1]), 'rule_inspect': lambda args: rule_inspect(args[0], args[1]), 'count_resolved': lambda args: count_resolved(args[0], args[1], args[2]), 'create_rule': lambda args: create_rule(*args), 'find_rule_file': lambda args: print(find_rule_file(args[0], args[1])), # Net 'net_list': lambda args: net_list(args[0]), 'net_show': lambda args: net_show(args[0], args[1]), 'net_exists': lambda args: net_exists(args[0], args[1]), 'net_add_service': lambda args: net_add_service(args[0], args[1], args[2], args[3] if len(args) > 3 else '', args[4] if len(args) > 4 else ''), 'net_add_port': lambda args: net_add_port(args[0], args[1], args[2], args[3], args[4] if len(args) > 4 else 'tcp', args[5] if len(args) > 5 else ''), 'net_remove': lambda args: net_remove(args[0], args[1]), 'net_resolve': lambda args: net_resolve(args[0], args[1]), 'net_reverse_lookup':lambda args: net_reverse_lookup(args[0], args[1], args[2] if len(args) > 2 else '', args[3] if len(args) > 3 else ''), # Groups 'group_list_data': lambda args: group_list_data(args[0], args[1], args[2]), 'create_group': lambda args: create_group(args[0], args[1], args[2] if len(args) > 2 else ''), 'group_has_peer': lambda args: group_has_peer(args[0], args[1]), # Block 'block_get': lambda args: block_get(args[0]), 'block_is_blocked': lambda args: block_is_blocked(args[0]), 'block_set_direct': lambda args: block_set_direct(args[0], args[1], args[2]), 'block_add_group': lambda args: block_add_group(args[0], args[1], args[2]), 'block_remove_group': lambda args: block_remove_group(args[0], args[1], args[2]), 'block_add_rule': lambda args: block_add_rule( args[0], args[1], args[2], args[3] if len(args) > 3 else '', args[4] if len(args) > 4 else '', args[5] if len(args) > 5 else '', args[6] if len(args) > 6 else '' ), 'block_remove_rule': lambda args: block_remove_rule( args[0], args[1], args[2] if len(args) > 2 else '', args[3] if len(args) > 3 else '', args[4] if len(args) > 4 else '' ), 'block_get_rules': lambda args: block_get_rules(args[0]), 'block_get_groups': lambda args: block_get_groups(args[0]), 'block_get_direct': lambda args: block_get_direct(args[0]), 'block_is_empty': lambda args: block_is_empty(args[0]), # Subnet 'subnet_lookup': lambda args: subnet_lookup(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_type': lambda args: subnet_type(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_tunnel_mode': lambda args: subnet_tunnel_mode(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_for_ip': lambda args: subnet_for_ip(args[0], args[1]), 'subnet_list': lambda args: subnet_list(args[0]), 'subnet_show': lambda args: subnet_show(args[0], args[1]), 'subnet_add': lambda args: subnet_add( args[0], args[1], args[2], args[3], args[4] if len(args) > 4 else 'split', args[5] if len(args) > 5 else '', args[6] if len(args) > 6 else '' ), 'subnet_remove': lambda args: subnet_remove(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_rename': lambda args: subnet_rename(args[0], args[1], args[2], args[3] if len(args) > 3 else ''), 'subnet_peers': lambda args: subnet_peers(args[0], args[1], args[2], args[3]), 'subnet_exists': lambda args: subnet_exists(args[0], args[1]), 'subnet_default_rule': lambda args: subnet_default_rule(args[0], args[1], args[2] if len(args) > 2 else ''), 'subnet_list_names': lambda args: subnet_list_names(args[0]), # Identity 'identity_list': lambda args: identity_list(args[0]), 'identity_show': lambda args: identity_show(args[0]), 'identity_add_peer': lambda args: identity_add_peer(args[0], args[1], args[2], args[3], args[4]), 'identity_remove_peer':lambda args: identity_remove_peer(args[0], args[1]), 'identity_remove': lambda args: identity_remove(args[0]), 'identity_next_index': lambda args: identity_next_index(args[0], args[1]), 'identity_peers': lambda args: identity_peers(args[0], args[1] if len(args) > 1 else ''), 'identity_migrate': lambda args: identity_migrate(args[0], args[1], args[2], args[3]), 'identity_infer': lambda args: identity_infer(args[0]), 'identity_exists': lambda args: identity_exists(args[0]), 'identity_rules': lambda args: identity_rules(args[0]), 'identity_add_rule': lambda args: identity_add_rule(args[0], args[1], args[2]), 'identity_remove_rule': lambda args: identity_remove_rule(args[0], args[1]), 'identity_clear_rules': lambda args: identity_clear_rules(args[0]), 'identity_has_rule': lambda args: identity_has_rule(args[0], args[1]), # Policy 'policy_get': lambda args: policy_get(args[0], args[1], args[2] if len(args) > 2 else ''), 'policy_list': lambda args: policy_list(args[0]), 'policy_exists': lambda args: policy_exists(args[0], args[1]), 'policy_add': lambda args: policy_add(args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else ''), 'policy_remove': lambda args: policy_remove(args[0], args[1]), 'policy_set_field': lambda args: policy_set_field(args[0], args[1], args[2], args[3]), 'subnet_policy': lambda args: subnet_policy(args[0], args[1], args[2] if len(args) > 2 else ''), # Hosts 'hosts_list': lambda args: hosts_list(args[0]), 'hosts_show': lambda args: hosts_show(args[0], args[1], args[2]), 'hosts_add': lambda args: hosts_add(args[0], args[1], args[2], args[3], args[4] if len(args) > 4 else '', args[5] if len(args) > 5 else ''), 'hosts_remove': lambda args: hosts_remove(args[0], args[1], args[2]), 'hosts_exists': lambda args: hosts_exists(args[0], args[1], args[2]), 'hosts_lookup': lambda args: hosts_lookup(args[0], args[1]), 'clean_handshakes': lambda args: clean_handshakes(args[0], args[1] if len(args) > 1 else '300'), 'batch_resolve': lambda args: batch_resolve(args[0], args[1], *args[2:]), 'peer_history_lookup': lambda args: peer_history_lookup(args[0], args[1]), 'config_load': lambda args: config_load(args[0]), 'display_load': lambda args: display_load(args[0]), 'export_full': lambda args: export_full( args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11], args[12] if len(args) > 12 else 'false', args[13] if len(args) > 13 else 'false', args[14] if len(args) > 14 else 'unknown'), # Import 'import_peer': lambda args: __import__('lib.importer', fromlist=['import_peer']).import_peer( args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7] if len(args) > 7 else 'false'), 'import_identity': lambda args: __import__('lib.importer', fromlist=['import_identity']).import_identity( args[0], args[1], args[2], args[3], args[4] if len(args) > 4 else 'false'), 'import_full': lambda args: __import__('lib.importer', fromlist=['import_full']).import_full( args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8], args[9], args[10], args[11] if len(args) > 11 else 'false'), 'import_get_field': lambda args: __import__('lib.importer', fromlist=['import_get_field']).import_get_field(args[0], *args[1:]), 'block_history_record': lambda args: __import__('lib.block_history', fromlist=['block_history_record']).block_history_record( args[0], args[1], args[2], args[3], args[4] if len(args) > 4 else '', args[5] if len(args) > 5 else ''), 'block_history_unblock': lambda args: __import__('lib.block_history', fromlist=['block_history_unblock']).block_history_unblock( args[0], args[1], args[2], args[3] if len(args) > 3 else ''), 'block_history_list': lambda args: __import__('lib.block_history', fromlist=['block_history_list']).block_history_list(args[0], args[1]), 'block_history_list_all': lambda args: __import__('lib.block_history', fromlist=['block_history_list_all']).block_history_list_all(args[0]), 'endpoint_cache_get': lambda args: endpoint_cache_get(args[0], args[1]), 'accept_events': lambda args: __import__('lib.accept_events', fromlist=['accept_events']).accept_events( args[0], args[1], args[2], args[3], args[4] if len(args) > 4 else '100', args[5] if len(args) > 5 else '1', args[6] if len(args) > 6 else '', args[7] if len(args) > 7 else '0', args[8] if len(args) > 8 else 'desc'), 'accept_aggregate': lambda args: __import__('lib.accept_events', fromlist=['accept_aggregate']).accept_aggregate( args[0], args[1], args[2], args[3] if len(args) > 3 else '', args[4] if len(args) > 4 else '', args[5] if len(args) > 5 else '0'), 'batch_resolve_dest': lambda args: batch_resolve_dest(args[0], args[1], *args[2:]), } # ── Main ───────────────────────────────────────────────────────────────────── def main(): if len(sys.argv) < 2: print("Usage: json_helper.py [args...]", file=sys.stderr) sys.exit(1) cmd = sys.argv[1] args = sys.argv[2:] if cmd not in commands: print(f"Unknown command: {cmd}", file=sys.stderr) sys.exit(1) try: commands[cmd](args) except Exception as e: print(f"Error in {cmd}: {e}", file=sys.stderr) sys.exit(1) if __name__ == '__main__': main()