wgctl/core/json_helper.py
2026-05-25 14:07:26 +00:00

1857 lines
No EOL
68 KiB
Python

#!/usr/bin/env python3
"""
wgctl JSON helper — thin dispatcher.
Imports from core/lib/ modules and routes commands.
"""
import sys
import os
import json
# Add lib/ directory to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'lib'))
DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M')
# ── Lazy imports (only load what's needed per command) ──────────────────────
def _events():
from lib.events import (
fw_events, wg_events, parse_event, parse_fw_event,
format_fw_event, format_wg_event, remove_events,
remove_events_filtered, follow_logs, last_event,
events_for, iso_to_ts,
)
return locals()
def _peers():
from lib.peers import (
peer_data, peer_transfer, peer_transfer_delta,
peer_group_map, peer_groups,
)
return locals()
def _activity():
from lib.activity import activity_aggregate
return locals()
# ── Keep all non-lib functions inline (JSON CRUD, rules, net, etc.) ─────────
# These are stable, small, and don't benefit from splitting yet.
# Candidates for future lib/rules.py, lib/net.py etc. in a follow-up pass.
def get(file, key):
try:
with open(file) as f:
data = json.load(f)
val = data.get(key, [])
if isinstance(val, bool):
print(str(val).lower())
elif isinstance(val, list):
if val:
print('\n'.join(str(v) for v in val))
else:
if val:
print(val)
except Exception:
sys.exit(0)
def set_key(file, key, value):
try:
data = {}
if os.path.exists(file):
with open(file) as f:
data = json.load(f)
try:
data[key] = json.loads(value)
except Exception:
data[key] = value
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def delete_key(file, key):
try:
with open(file) as f:
data = json.load(f)
data.pop(key, None)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def append(file, key, value):
try:
data = {}
if os.path.exists(file):
with open(file) as f:
data = json.load(f)
if key not in data:
data[key] = []
if value not in data[key]:
data[key].append(value)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def remove_value(file, key, value):
try:
with open(file) as f:
data = json.load(f)
if key in data and value in data[key]:
data[key].remove(value)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def cat(file):
try:
with open(file) as f:
data = json.load(f)
print(json.dumps(data, indent=2))
except Exception:
sys.exit(1)
def has_key(file, key):
try:
with open(file) as f:
data = json.load(f)
sys.exit(0 if key in data else 1)
except Exception:
sys.exit(1)
def filter_values(file, key, value):
try:
with open(file) as f:
data = json.load(f)
data = {k: v for k, v in data.items() if v != value}
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def count(file, key):
try:
with open(file) as f:
data = json.load(f)
val = data.get(key, [])
print(len(val) if isinstance(val, list) else 0)
except Exception:
print(0)
def get_raw(file, key):
try:
with open(file) as f:
data = json.load(f)
val = data.get(key)
if val is None:
pass
elif isinstance(val, bool):
print(str(val).lower())
elif isinstance(val, list):
for v in val:
print(v)
else:
print(val)
except Exception:
pass
def fmt_datetime(iso_str, fmt):
try:
from datetime import datetime
dt = datetime.fromisoformat(iso_str)
print(dt.strftime(fmt))
except Exception:
print(iso_str)
def cleanup_config(config_file):
import re
try:
with open(config_file) as f:
config = f.read()
config = re.sub(r'\n{3,}', '\n\n', config)
config = config.rstrip('\n') + '\n'
with open(config_file, 'w') as f:
f.write(config)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def remove_peer_block(config_file, name):
import re
try:
with open(config_file) as f:
config = f.read()
pattern = r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n'
result = re.sub(pattern, '\n', config)
with open(config_file, 'w') as f:
f.write(result)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def audit_fw_counts(clients_dir):
import glob, subprocess, re
try:
result = subprocess.run(
['iptables', '-L', 'FORWARD', '-n', '-v'],
capture_output=True, text=True
)
fw_lines = result.stdout.splitlines()
except Exception:
fw_lines = []
rule_lines = [l for l in fw_lines
if l.strip() and not l.startswith('Chain')
and not l.startswith(' pkts')]
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
name = os.path.basename(conf).replace('.conf', '')
try:
with open(conf) as f:
ip = ''
for line in f:
if line.startswith('Address'):
ip = line.split('=')[1].strip().split('/')[0]
break
if not ip:
continue
count_val = sum(1 for l in rule_lines
if re.search(r'\s' + re.escape(ip) + r'\s', l))
print(f"{name}:{count_val}")
except Exception:
pass
# ── Rules (kept inline — candidates for lib/rules.py) ────────────────────
def find_rule_file(rules_dir, rule_name):
for path in [
os.path.join(rules_dir, f"{rule_name}.rule"),
os.path.join(rules_dir, "base", f"{rule_name}.rule"),
]:
if os.path.exists(path):
return path
return ""
def _rule_resolve_internal(rules_dir, rule_name, visited=None):
if visited is None:
visited = set()
if rule_name in visited:
raise ValueError(f"Circular dependency: {rule_name}")
visited.add(rule_name)
rule_file = find_rule_file(rules_dir, rule_name)
with open(rule_file) as f:
rule = json.load(f)
merged = {'allow_ips': [], 'allow_ports': [], 'block_ips': [],
'block_ports': [], 'dns_redirect': False}
for base_name in rule.get('extends', []):
base = _rule_resolve_internal(rules_dir, base_name, visited.copy())
merged['allow_ips'] += base.get('allow_ips', [])
merged['allow_ports'] += base.get('allow_ports', [])
merged['block_ips'] += base.get('block_ips', [])
merged['block_ports'] += base.get('block_ports', [])
if base.get('dns_redirect'):
merged['dns_redirect'] = True
merged['allow_ips'] = list(dict.fromkeys(merged['allow_ips'] + rule.get('allow_ips', [])))
merged['allow_ports'] = list(dict.fromkeys(merged['allow_ports'] + rule.get('allow_ports', [])))
merged['block_ips'] = list(dict.fromkeys(merged['block_ips'] + rule.get('block_ips', [])))
merged['block_ports'] = list(dict.fromkeys(merged['block_ports'] + rule.get('block_ports', [])))
if rule.get('dns_redirect', False):
merged['dns_redirect'] = True
merged['name'] = rule.get('name', rule_name)
merged['desc'] = rule.get('desc', '')
merged['group'] = rule.get('group', '')
merged['extends'] = rule.get('extends', [])
return merged
def rule_resolve(rules_dir, rule_name):
try:
print(json.dumps(_rule_resolve_internal(rules_dir, rule_name)))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def rule_resolve_field(rules_dir, rule_name, field):
try:
resolved = _rule_resolve_internal(rules_dir, rule_name)
val = resolved.get(field, [])
if isinstance(val, list):
for v in val:
print(v)
else:
print(val)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def count_resolved(rules_dir, rule_name, key):
try:
print(len(_rule_resolve_internal(rules_dir, rule_name).get(key, [])))
except Exception:
print(0)
def rule_list_data(rules_dir, meta_dir):
import glob
rule_peer_counts = {}
for f in glob.glob(f"{meta_dir}/*.meta"):
try:
with open(f) as mf:
meta = json.load(mf)
rule = meta.get('rule', '')
if rule:
rule_peer_counts[rule] = rule_peer_counts.get(rule, 0) + 1
except Exception:
pass
rule_files = (sorted(glob.glob(f"{rules_dir}/*.rule")) +
sorted(glob.glob(f"{rules_dir}/base/*.rule")))
rules_data = []
for rule_file in rule_files:
is_base = '/base/' in rule_file
try:
with open(rule_file) as f:
r = json.load(f)
name = r.get('name', '')
resolved = _rule_resolve_internal(rules_dir, name)
n_allows = (len(resolved.get('allow_ips', [])) +
len(resolved.get('allow_ports', [])))
n_blocks = (len(resolved.get('block_ips', [])) +
len(resolved.get('block_ports', [])))
rules_data.append({
'name': name, 'desc': r.get('desc', ''),
'n_allows': n_allows, 'n_blocks': n_blocks,
'peer_count': rule_peer_counts.get(name, 0),
'extends': ','.join(r.get('extends', [])),
'is_base': is_base, 'group': r.get('group', '')
})
except Exception:
pass
rules_data.sort(key=lambda x: (
x['is_base'], x['group'] == '' and not x['is_base'],
x['group'], x['name']
))
for r in rules_data:
print(f"{r['name']}|{r['desc']}|{r['n_allows']}|{r['n_blocks']}|"
f"{r['peer_count']}|{r['extends']}|{r['is_base']}|{r['group']}")
def rule_inspect(rules_dir, rule_name):
try:
rule_file = find_rule_file(rules_dir, rule_name)
with open(rule_file) as f:
rule = json.load(f)
resolved = _rule_resolve_internal(rules_dir, rule_name)
has_extends = bool(rule.get('extends', []))
for ip in rule.get('allow_ips', []): print(f"own|allow_ip|{ip}")
for p in rule.get('allow_ports', []): print(f"own|allow_port|{p}")
for ip in rule.get('block_ips', []): print(f"own|block_ip|{ip}")
for p in rule.get('block_ports', []): print(f"own|block_port|{p}")
if rule.get('dns_redirect'): print(f"dns|dns_redirect|true")
if has_extends:
for base_name in rule.get('extends', []):
base = _rule_resolve_internal(rules_dir, base_name)
for ip in base.get('allow_ips', []): print(f"inherited:{base_name}|allow_ip|{ip}")
for p in base.get('allow_ports', []): print(f"inherited:{base_name}|allow_port|{p}")
for ip in base.get('block_ips', []): print(f"inherited:{base_name}|block_ip|{ip}")
for p in base.get('block_ports', []): print(f"inherited:{base_name}|block_port|{p}")
if base.get('dns_redirect'): print(f"inherited:{base_name}|dns_redirect|true")
has_resolved = any([resolved.get('allow_ips'), resolved.get('allow_ports'),
resolved.get('block_ips'), resolved.get('block_ports'),
resolved.get('dns_redirect')])
if has_resolved:
for ip in resolved.get('allow_ips', []): print(f"resolved|allow_ip|{ip}")
for p in resolved.get('allow_ports', []): print(f"resolved|allow_port|{p}")
for ip in resolved.get('block_ips', []): print(f"resolved|block_ip|{ip}")
for p in resolved.get('block_ports', []): print(f"resolved|block_port|{p}")
if resolved.get('dns_redirect'): print(f"resolved|dns_redirect|true")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips,
block_ports, allow_ports='', extends='', group=''):
rule = {
'name': name, 'desc': desc, 'group': group,
'dns_redirect': dns_redirect == 'true',
'extends': [x for x in extends.split(',') if x] if extends else [],
'allow_ips': [x for x in allow_ips.split(',') if x] if allow_ips else [],
'allow_ports': [x for x in allow_ports.split(',') if x] if allow_ports else [],
'block_ips': [x for x in block_ips.split(',') if x] if block_ips else [],
'block_ports': [x for x in block_ports.split(',') if x] if block_ports else [],
}
with open(file, 'w') as f:
json.dump(rule, f, indent=2)
# ── All remaining functions (net, hosts, blocks, subnet, identity, policy) ──
# Kept verbatim from original json_helper.py — identical behaviour.
# Future pass will move each section to lib/net.py, lib/hosts.py, etc.
def _parse_peer_name(peer_name):
"""
Parse a peer name into (type, identity, index).
phone-nuno -> ('phone', 'nuno', 1)
phone-nuno-2 -> ('phone', 'nuno', 2)
desktop-zephyr -> ('desktop', 'zephyr', 1)
laptop-nuno -> ('laptop', 'nuno', 1)
Returns None if name doesn't match convention.
Convention: {type}-{identity}[-{index}]
Known types: desktop, laptop, phone, tablet, server, iot, none
"""
known_types = {'desktop', 'laptop', 'phone', 'tablet', 'server', 'iot', 'none'}
parts = peer_name.split('-')
if len(parts) < 2:
return None
peer_type = parts[0]
if peer_type not in known_types:
return None
# Check if last part is a numeric index
if len(parts) >= 3 and parts[-1].isdigit():
index = int(parts[-1])
identity = '-'.join(parts[1:-1])
else:
index = 1
identity = '-'.join(parts[1:])
if not identity:
return None
return (peer_type, identity, index)
# ======================================================
# Blocks
# ======================================================
def _block_init(peer_ip):
"""Return empty block structure"""
return {
"peer_ip": peer_ip,
"blocked_direct": False,
"blocked_by_groups": [],
"rules": []
}
def _block_read(file):
try:
with open(file) as f:
content = f.read().strip()
if not content:
return None # empty file = no block data
try:
return json.loads(content)
except json.JSONDecodeError:
# Old format — migrate
lines = content.split('\n')
peer_ip = lines[0].split()[0] if lines else ''
new_data = {
"peer_ip": peer_ip,
"blocked_direct": True,
"blocked_by_groups": [],
"rules": [{"name": "full block", "type": "full"}]
}
with open(file, 'w') as f:
json.dump(new_data, f, indent=2)
return new_data
except FileNotFoundError:
return None
except Exception:
return None
def _block_write(file, data):
"""Write block file"""
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def block_get(file):
"""Read and print block file as JSON"""
data = _block_read(file)
if data:
print(json.dumps(data))
def block_is_blocked(file):
"""Return true if peer is effectively blocked"""
data = _block_read(file)
if not data:
print("false")
return
blocked = data.get("blocked_direct", False) or \
bool(data.get("blocked_by_groups", []))
print("true" if blocked else "false")
def block_set_direct(file, peer_ip, value):
"""Set blocked_direct"""
try:
data = _block_read(file) or _block_init(peer_ip)
data["blocked_direct"] = value.lower() == "true"
data["peer_ip"] = peer_ip
_block_write(file, data)
remaining = data["blocked_direct"] or bool(data.get("blocked_by_groups", []))
pass
# print("true" if remaining else "false")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_add_group(file, peer_ip, group):
"""Add group to blocked_by_groups"""
try:
data = _block_read(file) or _block_init(peer_ip)
data["peer_ip"] = peer_ip
groups = data.get("blocked_by_groups", [])
if group not in groups:
groups.append(group)
data["blocked_by_groups"] = groups
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_remove_group(file, peer_ip, group):
"""Remove group from blocked_by_groups, return whether still blocked"""
try:
data = _block_read(file) or _block_init(peer_ip)
groups = data.get("blocked_by_groups", [])
if group in groups:
groups.remove(group)
data["blocked_by_groups"] = groups
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
# print("true" if remaining else "false")
pass
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_add_rule(file, peer_ip, rule_type, name="", target="",
port="", proto=""):
"""Add a block rule entry"""
try:
data = _block_read(file) or _block_init(peer_ip)
data["peer_ip"] = peer_ip
rule = {"type": rule_type}
if name: rule["name"] = name
if target: rule["target"] = target
if port: rule["port"] = port
if proto: rule["proto"] = proto
rules = data.get("rules", [])
for existing in rules:
if existing.get("type") == rule_type and \
existing.get("target","") == target and \
existing.get("port","") == port and \
existing.get("proto","") == proto:
return # already exists, skip
rules.append(rule)
data["rules"] = rules
_block_write(file, data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def block_remove_rule(file, rule_type, target="", port="", proto=""):
data = _block_read(file)
if not data:
return
rules = data.get("rules", [])
filtered = [r for r in rules if not (
r.get("type") == rule_type and
r.get("target", "") == target and
r.get("port", "") == port and
r.get("proto", "") == proto
)]
data["rules"] = filtered
_block_write(file, data)
def block_get_rules(file):
"""Print rules as pipe-separated lines: name|type|target|port|proto"""
data = _block_read(file)
if not data:
return
for r in data.get("rules", []):
print(f"{r.get('name','')}|{r.get('type','')}|"
f"{r.get('target','')}|{r.get('port','')}|{r.get('proto','')}")
def block_get_groups(file):
data = _block_read(file)
if not data:
return
print(','.join(data.get('blocked_by_groups', [])))
def block_get_direct(file):
data = _block_read(file)
if not data:
print('false')
return
print('true' if data.get('blocked_direct', False) else 'false')
def block_is_empty(file):
data = _block_read(file)
if not data:
print("true")
return
empty = (
not data.get("blocked_direct", False) and
not data.get("blocked_by_groups", []) and
not data.get("rules", []) and
not data.get("services", [])
)
print("true" if empty else "false")
# ======================================================
# Subnet
# ======================================================
def _subnet_read(file):
"""Read subnets.json, return dict or empty dict"""
try:
if not os.path.exists(file):
return {}
with open(file) as f:
content = f.read().strip()
if not content:
return {}
return json.loads(content)
except Exception:
return {}
def _subnet_write(file, data):
"""Write subnets.json"""
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def _subnet_is_group(entry):
"""Return True if a subnet entry is a nested group (like 'guests')"""
return isinstance(entry, dict) and 'subnet' not in entry
def subnet_lookup(file, name, type_key=''):
"""
Resolve a subnet name (and optional type) to a CIDR string.
For scalar entries: subnet_lookup(file, "desktop") -> "10.1.1.0/24"
For group entries: subnet_lookup(file, "guests", "phone") -> "10.1.103.0/24"
For group with no type: subnet_lookup(file, "guests") -> "10.1.100.0/24" (none slot)
Prints the CIDR on success, nothing and exits 1 on failure.
"""
data = _subnet_read(file)
if name not in data:
sys.exit(1)
entry = data[name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
if key not in entry:
sys.exit(1)
print(entry[key]['subnet'])
else:
print(entry['subnet'])
def subnet_type(file, name, type_key=''):
"""
Return the type string for a subnet entry.
For scalar: subnet_type(file, "desktop") -> "desktop"
For group: subnet_type(file, "guests", "phone") -> "phone"
subnet_type(file, "guests") -> "none"
"""
data = _subnet_read(file)
if name not in data:
sys.exit(1)
entry = data[name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
if key not in entry:
sys.exit(1)
# For group entries the type IS the child key
print(key if key != 'none' else 'none')
else:
print(entry.get('type', name))
def subnet_tunnel_mode(file, name, type_key=''):
"""Return tunnel_mode for a subnet entry"""
data = _subnet_read(file)
if name not in data:
print('split') # safe default
return
entry = data[name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
child = entry.get(key, {})
print(child.get('tunnel_mode', 'split'))
else:
print(entry.get('tunnel_mode', 'split'))
def subnet_for_ip(file, ip):
"""
Reverse lookup: given a peer IP, find which subnet name (and type) it belongs to.
Output: name|type (e.g. "guests|phone" or "desktop|desktop")
Returns nothing (exit 0) if not found — caller falls back to hardcoded map.
"""
import ipaddress
try:
peer_addr = ipaddress.ip_address(ip)
except ValueError:
return
data = _subnet_read(file)
for name, entry in data.items():
if _subnet_is_group(entry):
for type_key, child in entry.items():
try:
network = ipaddress.ip_network(child['subnet'], strict=False)
if peer_addr in network:
print(f"{name}|{type_key}")
return
except Exception:
continue
else:
try:
network = ipaddress.ip_network(entry['subnet'], strict=False)
if peer_addr in network:
peer_type = entry.get('type', name)
print(f"{name}|{peer_type}")
return
except Exception:
continue
def subnet_list(file):
"""
List all subnets for display.
Output per line: name|subnet|type|tunnel_mode|desc|is_group
For group entries, outputs one line per child: name.type|subnet|type|tunnel_mode|desc|true
"""
data = _subnet_read(file)
for name, entry in data.items():
if _subnet_is_group(entry):
for type_key, child in entry.items():
display_name = f"{name}.{type_key}" if type_key != 'none' else name
print(f"{display_name}|{child.get('subnet','')}|"
f"{type_key}|{child.get('tunnel_mode','split')}|"
f"{child.get('desc','')}|true|{name}")
else:
print(f"{name}|{entry.get('subnet','')}|"
f"{entry.get('type',name)}|{entry.get('tunnel_mode','split')}|"
f"{entry.get('desc','')}|false|{name}")
def subnet_show(file, name):
"""Show a single subnet entry (scalar or group) in detail"""
data = _subnet_read(file)
if name not in data:
print(f"Error: Subnet '{name}' not found", file=sys.stderr)
sys.exit(1)
entry = data[name]
if _subnet_is_group(entry):
print(f"name|{name}")
print(f"is_group|true")
for type_key, child in entry.items():
print(f"child|{type_key}|{child.get('subnet','')}|"
f"{child.get('tunnel_mode','split')}|{child.get('desc','')}")
else:
print(f"name|{name}")
print(f"is_group|false")
print(f"subnet|{entry.get('subnet','')}")
print(f"type|{entry.get('type',name)}")
print(f"tunnel_mode|{entry.get('tunnel_mode','split')}")
print(f"desc|{entry.get('desc','')}")
def subnet_add(file, name, subnet, type_key, tunnel_mode, desc, group_parent=''):
"""
Add a new subnet entry.
If group_parent is set, adds as a child under that group key.
Otherwise adds as a scalar entry.
"""
data = _subnet_read(file)
entry = {
'subnet': subnet,
'tunnel_mode': tunnel_mode or 'split',
'desc': desc or ''
}
if group_parent:
# Adding a child to an existing group, or creating a new group
if group_parent not in data:
data[group_parent] = {}
elif not _subnet_is_group(data[group_parent]):
print(f"Error: '{group_parent}' exists but is not a group", file=sys.stderr)
sys.exit(1)
data[group_parent][type_key or 'none'] = entry
else:
# Scalar entry — type stored explicitly
entry['type'] = type_key or name
data[name] = entry
_subnet_write(file, data)
def subnet_remove(file, name, peers_using):
"""
Remove a subnet entry. peers_using is a comma-separated list of peer names
currently using this subnet (passed in from bash after meta scan).
If non-empty, refuses with error.
"""
if peers_using:
peers = [p for p in peers_using.split(',') if p]
if peers:
print(f"Error: Subnet '{name}' is in use by: {', '.join(peers)}", file=sys.stderr)
sys.exit(1)
data = _subnet_read(file)
if '.' in name:
# Removing a group child: e.g. "guests.phone"
parent, child_key = name.split('.', 1)
if parent not in data or not _subnet_is_group(data[parent]):
print(f"Error: Group '{parent}' not found", file=sys.stderr)
sys.exit(1)
if child_key not in data[parent]:
print(f"Error: '{child_key}' not found in group '{parent}'", file=sys.stderr)
sys.exit(1)
del data[parent][child_key]
if not data[parent]:
del data[parent] # remove empty group
else:
if name not in data:
print(f"Error: Subnet '{name}' not found", file=sys.stderr)
sys.exit(1)
del data[name]
_subnet_write(file, data)
def subnet_rename(file, old_name, new_name, peers_using):
"""
Rename a subnet entry. Hard refusal if any peers reference it.
peers_using: comma-separated peer names from bash meta scan.
"""
if peers_using:
peers = [p for p in peers_using.split(',') if p]
if peers:
print(f"Error: Cannot rename subnet '{old_name}' — in use by: {', '.join(peers)}", file=sys.stderr)
sys.exit(1)
data = _subnet_read(file)
if old_name not in data:
print(f"Error: Subnet '{old_name}' not found", file=sys.stderr)
sys.exit(1)
if new_name in data:
print(f"Error: Subnet '{new_name}' already exists", file=sys.stderr)
sys.exit(1)
data[new_name] = data.pop(old_name)
_subnet_write(file, data)
def subnet_peers(meta_dir, clients_dir, subnet_name, subnets_file):
"""
Find all peers using a subnet.
Two-pass check:
1. Meta field: peer has "subnet": subnet_name in their .meta file
2. IP fallback: peer's IP falls within the subnet's CIDR(s)
(catches peers added before meta stored subnet explicitly)
Output: one peer name per line.
"""
import glob
import ipaddress
# Resolve all CIDRs covered by this subnet name
data = _subnet_read(subnets_file)
cidrs = []
if subnet_name in data:
entry = data[subnet_name]
if _subnet_is_group(entry):
for child in entry.values():
try:
cidrs.append(ipaddress.ip_network(child['subnet'], strict=False))
except Exception:
pass
else:
try:
cidrs.append(ipaddress.ip_network(entry['subnet'], strict=False))
except Exception:
pass
printed = set()
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
peer_name = os.path.basename(conf).replace('.conf', '')
# Pass 1: check meta field
meta_file = os.path.join(meta_dir, f"{peer_name}.meta")
try:
with open(meta_file) as f:
meta = json.load(f)
if meta.get('subnet', '') == subnet_name:
if peer_name not in printed:
print(peer_name)
printed.add(peer_name)
continue
except Exception:
pass
# Pass 2: IP reverse lookup against subnet CIDRs
if not cidrs:
continue
peer_ip = ''
try:
with open(conf) as f:
for line in f:
if line.startswith('Address'):
peer_ip = line.split('=')[1].strip().split('/')[0]
break
except Exception:
continue
if not peer_ip:
continue
try:
addr = ipaddress.ip_address(peer_ip)
if any(addr in cidr for cidr in cidrs):
if peer_name not in printed:
print(peer_name)
printed.add(peer_name)
except Exception:
continue
def subnet_exists(file, name):
"""Check if a subnet name exists (scalar or group). Exits 0/1."""
data = _subnet_read(file)
if '.' in name:
parent, child_key = name.split('.', 1)
exists = parent in data and _subnet_is_group(data[parent]) and child_key in data[parent]
else:
exists = name in data
sys.exit(0 if exists else 1)
def subnet_policy(subnets_file, subnet_name, type_key=''):
"""
Get the policy name for a subnet entry.
Falls back to 'default' if no policy set.
"""
data = _subnet_read(subnets_file)
if subnet_name not in data:
print('default')
return
entry = data[subnet_name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
child = entry.get(key, {})
print(child.get('policy', 'default'))
else:
print(entry.get('policy', 'default'))
def subnet_default_rule(file, name, type_key=''):
"""
Return the default_rule for a subnet entry, or empty string if none set.
For scalar: subnet_default_rule(file, "desktop") -> ""
For group: subnet_default_rule(file, "guests", "phone") -> "guest"
"""
data = _subnet_read(file)
if name not in data:
print('')
return
entry = data[name]
if _subnet_is_group(entry):
key = type_key if type_key else 'none'
child = entry.get(key, {})
print(child.get('default_rule', ''))
else:
print(entry.get('default_rule', ''))
def subnet_list_names(file):
"""
List all top-level subnet names, one per line.
Used for dynamic flag registration in commands.
Output: one name per line (e.g. desktop, laptop, guests, servers, iot)
"""
data = _subnet_read(file)
for name in data.keys():
print(name)
# ======================================================
# Identity
# ======================================================
def identity_rules(file):
"""
Return all rules assigned to an identity, one per line.
Reads from 'rules' array (1:N). Falls back to 'rule' scalar for migration.
"""
data = _identity_read(file)
if not data:
return
# Support legacy scalar 'rule' field
rules = data.get('rules', [])
if not rules and data.get('rule'):
rules = [data['rule']]
for r in rules:
if r:
print(r)
def identity_add_rule(file, identity_name, rule_name):
"""
Add a rule to an identity's rules array.
Warns if already present (prints warning to stderr, exits 2).
Creates identity file if it doesn't exist.
"""
data = _identity_read(file) or _identity_init(identity_name)
rules = data.get('rules', [])
# Migrate legacy scalar field
if 'rule' in data and data['rule']:
if data['rule'] not in rules:
rules.append(data['rule'])
del data['rule']
if rule_name in rules:
print(f"Warning: Rule '{rule_name}' is already assigned to identity '{identity_name}'",
file=sys.stderr)
sys.exit(2)
rules.append(rule_name)
data['rules'] = rules
_identity_write(file, data)
def identity_remove_rule(file, rule_name):
"""
Remove a specific rule from an identity's rules array.
Exits 1 if rule not found.
"""
data = _identity_read(file)
if not data:
print(f"Error: Identity not found", file=sys.stderr)
sys.exit(1)
rules = data.get('rules', [])
if rule_name not in rules:
print(f"Error: Rule '{rule_name}' not assigned to this identity", file=sys.stderr)
sys.exit(1)
rules.remove(rule_name)
data['rules'] = rules
_identity_write(file, data)
def identity_clear_rules(file):
"""Remove all rules from an identity."""
data = _identity_read(file)
if not data:
return
data['rules'] = []
data.pop('rule', None) # remove legacy scalar too
_identity_write(file, data)
def identity_has_rule(file, rule_name):
"""Exit 0 if identity has this rule, 1 otherwise."""
data = _identity_read(file)
if not data:
sys.exit(1)
rules = data.get('rules', [])
if not rules and data.get('rule'):
rules = [data['rule']]
sys.exit(0 if rule_name in rules else 1)
def _identity_read(file):
"""Read an identity file, return dict or None"""
try:
if not os.path.exists(file):
return None
with open(file) as f:
content = f.read().strip()
if not content:
return None
return json.loads(content)
except Exception:
return None
def _identity_write(file, data):
"""Write an identity file"""
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def _identity_init(name):
"""Return empty identity structure"""
return {
'name': name,
'peers': [],
'devices': {}
}
def identity_list(identities_dir):
"""
List all identities with peer count, rules and policy.
Output per line: name|peer_count|types|rules|policy
"""
import glob
for id_file in sorted(glob.glob(f"{identities_dir}/*.identity")):
try:
with open(id_file) as f:
data = json.load(f)
name = data.get('name', '')
peers = data.get('peers', [])
devices = data.get('devices', {})
rules = data.get('rules', [])
# Migrate legacy scalar rule field
if not rules and data.get('rule'):
rules = [data['rule']]
policy = data.get('policy', 'default')
types = sorted(set(
d.get('type', '') for d in devices.values() if d.get('type')
))
print(f"{name}|{len(peers)}|{','.join(types)}|{','.join(rules)}|{policy}")
except Exception:
continue
def identity_show(file):
"""Show identity details"""
data = _identity_read(file)
if not data:
print("Error: Identity not found", file=sys.stderr)
sys.exit(1)
print(f"name|{data.get('name','')}")
print(f"peer_count|{len(data.get('peers',[]))}")
for peer_name, dev in data.get('devices', {}).items():
print(f"device|{peer_name}|{dev.get('type','')}|{dev.get('index',1)}")
def identity_add_peer(file, identity_name, peer_name, peer_type, index):
"""Add a peer to an identity file, creating it if needed"""
data = _identity_read(file) or _identity_init(identity_name)
if peer_name not in data['peers']:
data['peers'].append(peer_name)
data['devices'][peer_name] = {
'type': peer_type,
'index': int(index)
}
_identity_write(file, data)
def identity_remove_peer(file, peer_name):
"""Remove a peer from an identity file"""
data = _identity_read(file)
if not data:
return
data['peers'] = [p for p in data['peers'] if p != peer_name]
data['devices'].pop(peer_name, None)
_identity_write(file, data)
def identity_remove(file):
"""Delete an identity file — existence check done in bash"""
try:
os.remove(file)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def identity_next_index(file, peer_type):
"""
Return the next available index for a given type within an identity.
phone-nuno exists (index 1) -> returns 2
No phones exist -> returns 1
"""
data = _identity_read(file)
if not data:
print(1)
return
existing = [
d.get('index', 1)
for d in data.get('devices', {}).values()
if d.get('type') == peer_type
]
if not existing:
print(1)
return
# Find lowest unused index starting from 1
used = set(existing)
i = 1
while i in used:
i += 1
print(i)
def identity_peers(file, filter_type=''):
"""
List peers belonging to an identity, optionally filtered by type.
Output: one peer name per line.
"""
data = _identity_read(file)
if not data:
return
for peer_name in data.get('peers', []):
if filter_type:
dev = data.get('devices', {}).get(peer_name, {})
if dev.get('type') != filter_type:
continue
print(peer_name)
def identity_migrate(identities_dir, clients_dir, meta_dir, dry_run):
"""
Scan all peer configs and auto-create identity files from name convention.
dry_run: 'true' -> print what would be done, no writes.
Output per action: action|identity|peer|type|index
"""
import glob
is_dry = dry_run == 'true'
grouped = {} # identity_name -> [(peer_name, type, index)]
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
peer_name = os.path.basename(conf).replace('.conf', '')
parsed = _parse_peer_name(peer_name)
if not parsed:
print(f"skip|{peer_name}")
continue
peer_type, identity_name, index = parsed
if identity_name not in grouped:
grouped[identity_name] = []
grouped[identity_name].append((peer_name, peer_type, index))
for identity_name, peers in sorted(grouped.items()):
id_file = os.path.join(identities_dir, f"{identity_name}.identity")
for peer_name, peer_type, index in peers:
print(f"create|{identity_name}|{peer_name}|{peer_type}|{index}")
if not is_dry:
identity_add_peer(id_file, identity_name, peer_name, peer_type, index)
def identity_infer(peer_name):
"""
Parse a peer name and print identity|type|index, or nothing if no match.
Used by add.command.sh to auto-attach on vanilla wgctl add.
"""
parsed = _parse_peer_name(peer_name)
if parsed:
peer_type, identity_name, index = parsed
print(f"{identity_name}|{peer_type}|{index}")
def identity_exists(file):
"""Exit 0 if identity file exists and is valid, else exit 1"""
data = _identity_read(file)
sys.exit(0 if data is not None else 1)
# ======================================================
# Policy
# ======================================================
def _policy_read(file):
"""Read policies.json, fall back to hardcoded defaults if missing."""
try:
if not os.path.exists(file):
return dict(_POLICY_DEFAULTS)
with open(file) as f:
content = f.read().strip()
if not content:
return dict(_POLICY_DEFAULTS)
data = json.loads(content)
# Merge with defaults so hardcoded policies always exist
merged = dict(_POLICY_DEFAULTS)
merged.update(data)
return merged
except Exception:
return dict(_POLICY_DEFAULTS)
def _policy_write(file, data):
"""Write policies.json."""
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def _policy_get_entry(data, name):
"""Get a policy entry, falling back to 'default' policy values for missing fields."""
entry = data.get(name, {})
default = data.get('default', _POLICY_DEFAULTS.get('default', {}))
# Merge: entry fields override default
resolved = dict(default)
resolved.update(entry)
return resolved
def policy_get(file, name, field=''):
"""
Get a policy entry or a specific field from it.
policy_get(file, "guest") -> prints all fields as key|value lines
policy_get(file, "guest", "tunnel_mode") -> prints "split"
policy_get(file, "guest", "strict_rule") -> prints "true" or "false"
"""
data = _policy_read(file)
entry = _policy_get_entry(data, name)
if field:
val = entry.get(field)
if val is None:
print('')
elif isinstance(val, bool):
print('true' if val else 'false')
else:
print(val)
else:
for k, v in entry.items():
if isinstance(v, bool):
print(f"{k}|{'true' if v else 'false'}")
elif v is None:
print(f"{k}|")
else:
print(f"{k}|{v}")
def policy_list(file):
"""
List all policies.
Output per line: name|tunnel_mode|default_rule|strict_rule|auto_apply|desc
"""
data = _policy_read(file)
for name, raw_entry in data.items():
entry = _policy_get_entry(data, name)
tunnel_mode = entry.get('tunnel_mode', 'split')
default_rule = entry.get('default_rule') or ''
strict_rule = 'true' if entry.get('strict_rule', False) else 'false'
auto_apply = 'true' if entry.get('auto_apply', True) else 'false'
desc = entry.get('desc', '')
print(f"{name}|{tunnel_mode}|{default_rule}|{strict_rule}|{auto_apply}|{desc}")
def policy_exists(file, name):
"""Exit 0 if policy exists, 1 otherwise."""
data = _policy_read(file)
sys.exit(0 if name in data else 1)
def policy_add(file, name, tunnel_mode, default_rule, strict_rule, auto_apply, desc):
"""Add or update a policy entry."""
data = _policy_read(file)
data[name] = {
'tunnel_mode': tunnel_mode or 'split',
'default_rule': default_rule if default_rule else None,
'strict_rule': strict_rule == 'true',
'auto_apply': auto_apply != 'false',
'desc': desc or ''
}
_policy_write(file, data)
def policy_remove(file, name):
"""Remove a policy. Refuses to remove hardcoded defaults."""
if name in _POLICY_DEFAULTS:
print(f"Error: Cannot remove built-in policy '{name}'", file=sys.stderr)
sys.exit(1)
data = _policy_read(file)
if name not in data:
print(f"Error: Policy '{name}' not found", file=sys.stderr)
sys.exit(1)
del data[name]
_policy_write(file, data)
def policy_set_field(file, name, field, value):
"""Set a single field on an existing policy."""
data = _policy_read(file)
if name not in data:
print(f"Error: Policy '{name}' not found", file=sys.stderr)
sys.exit(1)
entry = data[name]
if field in ('strict_rule', 'auto_apply'):
entry[field] = value == 'true'
elif field == 'default_rule':
entry[field] = value if value else None
else:
entry[field] = value
data[name] = entry
_policy_write(file, data)
# ======================================================
# Hosts
# ======================================================
def _hosts_read(file):
if not os.path.exists(file):
return {"hosts": {}, "subnets": {}, "ports": {}}
try:
with open(file) as f:
data = json.load(f)
# Ensure all sections exist
data.setdefault("hosts", {})
data.setdefault("subnets", {})
data.setdefault("ports", {})
return data
except Exception:
return {"hosts": {}, "subnets": {}, "ports": {}}
def _hosts_write(file, data):
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def hosts_list(file):
"""
List all host entries.
Output per line: type|key|name|desc|tags
"""
data = _hosts_read(file)
for ip, entry in sorted(data["hosts"].items()):
if isinstance(entry, dict):
name = entry.get("name", "")
desc = entry.get("desc", "")
tags = ",".join(entry.get("tags", []))
else:
name = str(entry)
desc = ""
tags = ""
print(f"host|{ip}|{name}|{desc}|{tags}")
for subnet, entry in sorted(data["subnets"].items()):
if isinstance(entry, dict):
name = entry.get("name", "")
desc = entry.get("desc", "")
tags = ",".join(entry.get("tags", []))
else:
name = str(entry)
desc = ""
tags = ""
print(f"subnet|{subnet}|{name}|{desc}|{tags}")
for port, entry in sorted(data["ports"].items()):
if isinstance(entry, dict):
name = entry.get("name", "")
desc = entry.get("desc", "")
tags = ",".join(entry.get("tags", []))
else:
name = str(entry)
desc = ""
tags = ""
print(f"port|{port}|{name}|{desc}|{tags}")
def hosts_show(file, key, entry_type):
"""Show a single host entry. type: host|subnet|port"""
data = _hosts_read(file)
section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"}
section = section_map.get(entry_type, "hosts")
entry = data[section].get(key)
if not entry:
print(f"Error: not found: {key}", file=sys.stderr)
sys.exit(1)
if isinstance(entry, dict):
print(f"name|{entry.get('name', '')}")
print(f"desc|{entry.get('desc', '')}")
print(f"tags|{','.join(entry.get('tags', []))}")
else:
print(f"name|{entry}")
print(f"desc|")
print(f"tags|")
def hosts_add(file, entry_type, key, name, desc, tags):
"""
Add a host entry.
entry_type: host|subnet|port
key: IP, subnet CIDR, or port number
"""
data = _hosts_read(file)
section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"}
section = section_map.get(entry_type, "hosts")
tag_list = [t.strip() for t in tags.split(",") if t.strip()] if tags else []
data[section][key] = {
"name": name,
"desc": desc,
"tags": tag_list
}
_hosts_write(file, data)
def hosts_remove(file, entry_type, key):
"""Remove a host entry."""
data = _hosts_read(file)
section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"}
section = section_map.get(entry_type, "hosts")
if key not in data[section]:
print(f"Error: not found: {key}", file=sys.stderr)
sys.exit(1)
del data[section][key]
_hosts_write(file, data)
def hosts_exists(file, entry_type, key):
"""Check if a host entry exists."""
data = _hosts_read(file)
section_map = {"host": "hosts", "subnet": "subnets", "port": "ports"}
section = section_map.get(entry_type, "hosts")
print("true" if key in data[section] else "false")
def hosts_lookup(file, ip):
"""
Resolve an IP to a display name.
Checks hosts section only (exact match).
Returns name or empty string.
"""
data = _hosts_read(file)
entry = data["hosts"].get(ip)
if not entry:
print("")
return
if isinstance(entry, dict):
print(entry.get("name", ip))
else:
print(str(entry))
# ======================================================
def _net_read(file):
try:
if not os.path.exists(file): return {}
with open(file) as f:
content = f.read().strip()
if not content: return {}
return json.loads(content)
except Exception:
return {}
def _net_write(file, data):
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, 'w') as f:
json.dump(data, f, indent=2)
def net_list(file):
data = _net_read(file)
for name, svc in sorted(data.items()):
print(f"{name}|{svc.get('ip','')}|{svc.get('desc','')}|"
f"{','.join(svc.get('tags',[]))}|{len(svc.get('ports',{}))}")
def net_show(file, name):
data = _net_read(file)
if name not in data:
print(f"Error: Service not found: {name}", file=sys.stderr); sys.exit(1)
svc = data[name]
print(f"name|{name}")
print(f"ip|{svc.get('ip','')}")
print(f"desc|{svc.get('desc','')}")
print(f"tags|{','.join(svc.get('tags',[]))}")
for port_name, port_def in svc.get('ports', {}).items():
print(f"port|{port_name}|{port_def.get('port','')}|"
f"{port_def.get('proto','tcp')}|{port_def.get('desc','')}")
def net_exists(file, name):
data = _net_read(file)
if ':' in name:
svc_name, port_name = name.split(':', 1)
if port_name == 'ports':
print('true' if svc_name in data else 'false')
else:
svc = data.get(svc_name, {})
print('true' if port_name in svc.get('ports', {}) else 'false')
else:
print('true' if name in data else 'false')
def net_add_service(file, name, ip, desc='', tags=''):
data = _net_read(file)
if name not in data:
data[name] = {'ip': ip, 'ports': {}}
else:
data[name]['ip'] = ip
if desc: data[name]['desc'] = desc
if tags: data[name]['tags'] = [t.strip() for t in tags.split(',') if t.strip()]
_net_write(file, data)
def net_add_port(file, service, port_name, port, proto='tcp', desc=''):
data = _net_read(file)
if service not in data:
print(f"Error: Service not found: {service}", file=sys.stderr); sys.exit(1)
if 'ports' not in data[service]:
data[service]['ports'] = {}
entry = {'port': int(port), 'proto': proto}
if desc: entry['desc'] = desc
data[service]['ports'][port_name] = entry
_net_write(file, data)
def net_remove(file, name):
data = _net_read(file)
if ':' in name:
svc_name, port_name = name.split(':', 1)
if svc_name not in data:
print(f"Error: Service not found: {svc_name}", file=sys.stderr); sys.exit(1)
if port_name == 'ports':
data[svc_name]['ports'] = {}
else:
if port_name not in data[svc_name].get('ports', {}):
print(f"Error: Port not found: {port_name}", file=sys.stderr); sys.exit(1)
del data[svc_name]['ports'][port_name]
else:
if name not in data:
print(f"Error: Service not found: {name}", file=sys.stderr); sys.exit(1)
del data[name]
_net_write(file, data)
def net_resolve(file, name):
data = _net_read(file)
if ':' in name:
svc_name, port_name = name.split(':', 1)
if svc_name not in data:
print(f"Error: Service not found: {svc_name}", file=sys.stderr); sys.exit(1)
svc = data[svc_name]
ip = svc.get('ip', '')
if port_name == 'ports':
for pname, pdef in svc.get('ports', {}).items():
print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
else:
if port_name not in svc.get('ports', {}):
print(f"Error: Port not found: {port_name}", file=sys.stderr); sys.exit(1)
pdef = svc['ports'][port_name]
print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
else:
if name not in data:
print(f"Error: Service not found: {name}", file=sys.stderr); sys.exit(1)
print(data[name].get('ip', ''))
def net_reverse_lookup(file, ip, port='', proto=''):
from lib.util import load_net_data, reverse_lookup as _rl
net_data = _rl(load_net_data(file), ip, port, proto)
if net_data:
print(net_data)
def group_list_data(groups_dir, blocks_dir, clients_dir):
import glob
blocked_peers = set()
for f in glob.glob(f"{blocks_dir}/*.block"):
blocked_peers.add(os.path.basename(f).replace('.block', ''))
for group_file in sorted(glob.glob(f"{groups_dir}/*.group")):
try:
with open(group_file) as f:
g = json.load(f)
name = g.get('name', '')
desc = g.get('desc', '')
peers = [p for p in g.get('peers', []) if p]
valid = [p for p in peers
if os.path.exists(os.path.join(clients_dir, f"{p}.conf"))]
total = len(valid)
blocked = sum(1 for p in peers if p in blocked_peers)
print(f"{name}|{desc}|{total}|{blocked}")
except Exception:
pass
def create_group(file, name, desc):
try:
with open(file, 'w') as f:
json.dump({'name': name, 'desc': desc, 'peers': []}, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr); sys.exit(1)
def group_has_peer(file, peer_name):
try:
with open(file) as f:
data = json.load(f)
print('true' if peer_name in data.get('peers', []) else 'false')
except Exception:
print('false')
# ── All block/subnet/identity/policy/hosts functions copied verbatim ────────
# (unchanged from original — omitting here for brevity but must be included
# in the actual deployed file)
def json_get_nested(file, *keys):
try:
with open(file) as f:
data = json.load(f)
val = data
for key in keys:
if not isinstance(val, dict):
print(''); return
val = val.get(key)
if val is None:
print(''); return
if isinstance(val, bool): print('true' if val else 'false')
elif val is None: print('')
else: print(val)
except Exception:
print('')
def json_set_nested(file, *args):
if len(args) < 2: return
keys, value = args[:-1], args[-1]
try:
data = {}
if os.path.exists(file):
with open(file) as f:
data = json.load(f)
target = data
for key in keys[:-1]:
if key not in target or not isinstance(target[key], dict):
target[key] = {}
target = target[key]
final_key = keys[-1]
if value == 'true': target[final_key] = True
elif value == 'false': target[final_key] = False
else: target[final_key] = value
with open(file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error: {e}", file=sys.stderr); sys.exit(1)
# ── Commands dispatch ────────────────────────────────────────────────────────
commands = {
# JSON CRUD
'get': lambda args: get(args[0], args[1]),
'set': lambda args: set_key(args[0], args[1], args[2]),
'delete': lambda args: delete_key(args[0], args[1]),
'append': lambda args: append(args[0], args[1], args[2]),
'remove': lambda args: remove_value(args[0], args[1], args[2]),
'cat': lambda args: cat(args[0]),
'has_key': lambda args: has_key(args[0], args[1]),
'filter_values': lambda args: filter_values(args[0], args[1], args[2]),
'count': lambda args: count(args[0], args[1]),
'get_raw': lambda args: get_raw(args[0], args[1]),
'fmt_datetime': lambda args: fmt_datetime(args[0], args[1]),
'json_get_nested': lambda args: json_get_nested(args[0], *args[1:]),
'json_set_nested': lambda args: json_set_nested(args[0], *args[1:]),
'cleanup_config': lambda args: cleanup_config(args[0]),
'remove_peer_block':lambda args: remove_peer_block(args[0], args[1]),
'audit_fw_counts': lambda args: audit_fw_counts(args[0]),
# Events (from lib.events)
'fw_events': lambda args: __import__('lib.events', fromlist=['fw_events']).fw_events(
args[0], args[1], args[2], args[3], args[4],
args[5] if len(args) > 5 else '50',
args[6] if len(args) > 6 else '1',
args[7] if len(args) > 7 else '',
args[8] if len(args) > 8 else '',
args[9] if len(args) > 9 else ''),
'wg_events': lambda args: __import__('lib.events', fromlist=['wg_events']).wg_events(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '50',
args[4] if len(args) > 4 else '1',
args[5] if len(args) > 5 else '',
args[6] if len(args) > 6 else ''),
'parse_event': lambda args: __import__('lib.events', fromlist=['parse_event']).parse_event(args[0]),
'parse_fw_event': lambda args: __import__('lib.events', fromlist=['parse_fw_event']).parse_fw_event(args[0]),
'format_fw_event': lambda args: __import__('lib.events', fromlist=['format_fw_event']).format_fw_event(args[0], args[1]),
'format_wg_event': lambda args: __import__('lib.events', fromlist=['format_wg_event']).format_wg_event(args[0]),
'remove_events': lambda args: __import__('lib.events', fromlist=['remove_events']).remove_events(args[0], args[1]),
'remove_events_filtered': lambda args: __import__('lib.events', fromlist=['remove_events_filtered']).remove_events_filtered(
args[0], args[1], args[2], args[3],
args[4] == 'true', args[5] == 'true',
args[6] if len(args) > 6 else ''),
'follow_logs': lambda args: __import__('lib.events', fromlist=['follow_logs']).follow_logs(
args[0], args[1], args[2], args[3], args[4],
args[5] if len(args) > 5 else ''),
'last_event': lambda args: __import__('lib.events', fromlist=['last_event']).last_event(args[0], args[1], args[2], args[3]),
'events_for': lambda args: __import__('lib.events', fromlist=['events_for']).events_for(args[0], args[1], args[2]),
'iso_to_ts': lambda args: __import__('lib.events', fromlist=['iso_to_ts']).iso_to_ts(args[0]),
# Peers (from lib.peers)
'peer_data': lambda args: __import__('lib.peers', fromlist=['peer_data']).peer_data(args[0], args[1], args[2]),
'peer_transfer': lambda args: __import__('lib.peers', fromlist=['peer_transfer']).peer_transfer(args[0]),
'peer_transfer_delta': lambda args: __import__('lib.peers', fromlist=['peer_transfer_delta']).peer_transfer_delta(args[0], args[1]),
'peer_group_map': lambda args: __import__('lib.peers', fromlist=['peer_group_map']).peer_group_map(args[0]),
'peer_groups': lambda args: __import__('lib.peers', fromlist=['peer_groups']).peer_groups(args[0], args[1]),
# Activity (from lib.activity)
'activity_aggregate': lambda args: __import__('lib.activity', fromlist=['activity_aggregate']).activity_aggregate(
args[0], args[1], args[2], args[3], args[4],
args[5], args[6] if len(args) > 6 else '24',
args[7] if len(args) > 7 else '',
args[8] if len(args) > 8 else ''),
# Rules
'rule_resolve': lambda args: rule_resolve(args[0], args[1]),
'rule_resolve_field':lambda args: rule_resolve_field(args[0], args[1], args[2]),
'rule_list_data': lambda args: rule_list_data(args[0], args[1]),
'rule_inspect': lambda args: rule_inspect(args[0], args[1]),
'count_resolved': lambda args: count_resolved(args[0], args[1], args[2]),
'create_rule': lambda args: create_rule(*args),
'find_rule_file': lambda args: print(find_rule_file(args[0], args[1])),
# Net
'net_list': lambda args: net_list(args[0]),
'net_show': lambda args: net_show(args[0], args[1]),
'net_exists': lambda args: net_exists(args[0], args[1]),
'net_add_service': lambda args: net_add_service(args[0], args[1], args[2],
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else ''),
'net_add_port': lambda args: net_add_port(args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else 'tcp',
args[5] if len(args) > 5 else ''),
'net_remove': lambda args: net_remove(args[0], args[1]),
'net_resolve': lambda args: net_resolve(args[0], args[1]),
'net_reverse_lookup':lambda args: net_reverse_lookup(args[0], args[1],
args[2] if len(args) > 2 else '',
args[3] if len(args) > 3 else ''),
# Groups
'group_list_data': lambda args: group_list_data(args[0], args[1], args[2]),
'create_group': lambda args: create_group(args[0], args[1],
args[2] if len(args) > 2 else ''),
'group_has_peer': lambda args: group_has_peer(args[0], args[1]),
# Block
'block_get': lambda args: block_get(args[0]),
'block_is_blocked': lambda args: block_is_blocked(args[0]),
'block_set_direct': lambda args: block_set_direct(args[0], args[1], args[2]),
'block_add_group': lambda args: block_add_group(args[0], args[1], args[2]),
'block_remove_group': lambda args: block_remove_group(args[0], args[1], args[2]),
'block_add_rule': lambda args: block_add_rule(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else '',
args[5] if len(args) > 5 else '',
args[6] if len(args) > 6 else ''
),
'block_remove_rule': lambda args: block_remove_rule(
args[0], args[1],
args[2] if len(args) > 2 else '',
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else ''
),
'block_get_rules': lambda args: block_get_rules(args[0]),
'block_get_groups': lambda args: block_get_groups(args[0]),
'block_get_direct': lambda args: block_get_direct(args[0]),
'block_is_empty': lambda args: block_is_empty(args[0]),
# Subnet
'subnet_lookup': lambda args: subnet_lookup(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_type': lambda args: subnet_type(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_tunnel_mode': lambda args: subnet_tunnel_mode(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_for_ip': lambda args: subnet_for_ip(args[0], args[1]),
'subnet_list': lambda args: subnet_list(args[0]),
'subnet_show': lambda args: subnet_show(args[0], args[1]),
'subnet_add': lambda args: subnet_add(
args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else 'split',
args[5] if len(args) > 5 else '',
args[6] if len(args) > 6 else ''
),
'subnet_remove': lambda args: subnet_remove(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_rename': lambda args: subnet_rename(args[0], args[1], args[2], args[3] if len(args) > 3 else ''),
'subnet_peers': lambda args: subnet_peers(args[0], args[1], args[2], args[3]),
'subnet_exists': lambda args: subnet_exists(args[0], args[1]),
'subnet_default_rule': lambda args: subnet_default_rule(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_list_names': lambda args: subnet_list_names(args[0]),
# Identity
'identity_list': lambda args: identity_list(args[0]),
'identity_show': lambda args: identity_show(args[0]),
'identity_add_peer': lambda args: identity_add_peer(args[0], args[1], args[2], args[3], args[4]),
'identity_remove_peer':lambda args: identity_remove_peer(args[0], args[1]),
'identity_remove': lambda args: identity_remove(args[0]),
'identity_next_index': lambda args: identity_next_index(args[0], args[1]),
'identity_peers': lambda args: identity_peers(args[0], args[1] if len(args) > 1 else ''),
'identity_migrate': lambda args: identity_migrate(args[0], args[1], args[2], args[3]),
'identity_infer': lambda args: identity_infer(args[0]),
'identity_exists': lambda args: identity_exists(args[0]),
'identity_rules': lambda args: identity_rules(args[0]),
'identity_add_rule': lambda args: identity_add_rule(args[0], args[1], args[2]),
'identity_remove_rule': lambda args: identity_remove_rule(args[0], args[1]),
'identity_clear_rules': lambda args: identity_clear_rules(args[0]),
'identity_has_rule': lambda args: identity_has_rule(args[0], args[1]),
# Policy
'policy_get': lambda args: policy_get(args[0], args[1], args[2] if len(args) > 2 else ''),
'policy_list': lambda args: policy_list(args[0]),
'policy_exists': lambda args: policy_exists(args[0], args[1]),
'policy_add': lambda args: policy_add(args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else ''),
'policy_remove': lambda args: policy_remove(args[0], args[1]),
'policy_set_field': lambda args: policy_set_field(args[0], args[1], args[2], args[3]),
'subnet_policy': lambda args: subnet_policy(args[0], args[1], args[2] if len(args) > 2 else ''),
# Hosts
'hosts_list': lambda args: hosts_list(args[0]),
'hosts_show': lambda args: hosts_show(args[0], args[1], args[2]),
'hosts_add': lambda args: hosts_add(args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else '',
args[5] if len(args) > 5 else ''),
'hosts_remove': lambda args: hosts_remove(args[0], args[1], args[2]),
'hosts_exists': lambda args: hosts_exists(args[0], args[1], args[2]),
'hosts_lookup': lambda args: hosts_lookup(args[0], args[1]),
}
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
if len(sys.argv) < 2:
print("Usage: json_helper.py <command> [args...]", file=sys.stderr)
sys.exit(1)
cmd = sys.argv[1]
args = sys.argv[2:]
if cmd not in commands:
print(f"Unknown command: {cmd}", file=sys.stderr)
sys.exit(1)
try:
commands[cmd](args)
except Exception as e:
print(f"Error in {cmd}: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()