wgctl/core/json_helper.py
Nuno Duque Nunes 8b47e55b4a feat: peer endpoint history tracking and resolution
- daemon: update_peer_history() tracks all endpoints per peer
- daemon: endpoint_index.json for O(1) IP -> peer name lookup
- daemon: poll_handshakes updates history on every cycle
- json_helper: peer_history_lookup() uses index, falls back to scan
- resolve::endpoint_parts: step 3 checks peer history index
- json.sh: json::peer_history_lookup wrapper
- resolve: mobile peer IPs now resolve to peer name via history
2026-05-26 15:51:53 +00:00

1960 lines
No EOL
72 KiB
Python

#!/usr/bin/env python3
"""
wgctl JSON helper — thin dispatcher.
Imports from core/lib/ modules and routes commands.
"""
import sys
import os
import json
# Put the lib/ directory that sits next to this file at the front of the
# import search path.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'lib'))
# strftime pattern used for timestamps; the WGCTL_DATETIME_FMT environment
# variable overrides the default.
DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M')
# Fallback policy values.
# NOTE(review): no consumer is visible in this part of the file — confirm
# where these defaults are applied.
_POLICY_DEFAULTS = {
    'tunnel_mode': 'split',
    'default_rule': None,
    'strict_rule': False,
    'auto_apply': True,
}
# ── Lazy imports (only load what's needed per command) ──────────────────────
def _events():
    """
    Import the helpers from lib.events on first use.
    Returns a dict mapping each helper's name to the imported object.
    """
    from lib.events import (
        fw_events, wg_events, parse_event, parse_fw_event,
        format_fw_event, format_wg_event, remove_events,
        remove_events_filtered, follow_logs, last_event,
        events_for, iso_to_ts,
    )
    return {
        'fw_events': fw_events,
        'wg_events': wg_events,
        'parse_event': parse_event,
        'parse_fw_event': parse_fw_event,
        'format_fw_event': format_fw_event,
        'format_wg_event': format_wg_event,
        'remove_events': remove_events,
        'remove_events_filtered': remove_events_filtered,
        'follow_logs': follow_logs,
        'last_event': last_event,
        'events_for': events_for,
        'iso_to_ts': iso_to_ts,
    }
def _peers():
    """
    Import the helpers from lib.peers on first use.
    Returns a dict mapping each helper's name to the imported object.
    """
    from lib.peers import (
        peer_data, peer_transfer, peer_transfer_delta,
        peer_group_map, peer_groups,
    )
    return {
        'peer_data': peer_data,
        'peer_transfer': peer_transfer,
        'peer_transfer_delta': peer_transfer_delta,
        'peer_group_map': peer_group_map,
        'peer_groups': peer_groups,
    }
def _activity():
    """
    Import activity_aggregate from lib.activity on first use.
    Returns a one-entry dict keyed by the helper's name.
    """
    from lib.activity import activity_aggregate
    return {'activity_aggregate': activity_aggregate}
# ── Keep all non-lib functions inline (JSON CRUD, rules, net, etc.) ─────────
# These are stable, small, and don't benefit from splitting yet.
# Candidates for future lib/rules.py, lib/net.py etc. in a follow-up pass.
def get(file, key):
    """
    Print the value stored under `key` in a JSON file.
    Booleans print as true/false, lists print one item per line, other
    values print as-is. Falsy non-boolean values and missing keys print
    nothing. Any error ends the process with exit status 0.
    """
    try:
        with open(file) as handle:
            value = json.load(handle).get(key, [])
        if isinstance(value, bool):
            print(str(value).lower())
            return
        if not value:
            return
        if isinstance(value, list):
            print('\n'.join(str(item) for item in value))
        else:
            print(value)
    except Exception:
        sys.exit(0)
def set_key(file, key, value):
    """
    Store `value` under `key` in a JSON file, creating the file if needed.
    `value` is parsed as JSON when possible; otherwise it is stored as given.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        if os.path.exists(file):
            with open(file) as src:
                doc = json.load(src)
        else:
            doc = {}
        try:
            doc[key] = json.loads(value)
        except Exception:
            # Not valid JSON: keep the raw value
            doc[key] = value
        with open(file, 'w') as dst:
            json.dump(doc, dst, indent=2)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def delete_key(file, key):
    """
    Drop `key` from a JSON file and rewrite it; a missing key is not an error.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        with open(file) as src:
            doc = json.load(src)
        doc.pop(key, None)
        with open(file, 'w') as dst:
            json.dump(doc, dst, indent=2)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def append(file, key, value):
    """
    Add `value` to the list stored under `key` unless it is already there.
    The file and the list are created when missing.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        doc = {}
        if os.path.exists(file):
            with open(file) as src:
                doc = json.load(src)
        if key not in doc:
            doc[key] = []
        members = doc[key]
        if value not in members:
            members.append(value)
        with open(file, 'w') as dst:
            json.dump(doc, dst, indent=2)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def remove_value(file, key, value):
    """
    Remove the first occurrence of `value` from the list under `key`.
    The file is rewritten even when nothing was removed.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        with open(file) as src:
            doc = json.load(src)
        if key in doc:
            members = doc[key]
            if value in members:
                members.remove(value)
        with open(file, 'w') as dst:
            json.dump(doc, dst, indent=2)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def cat(file):
    """Pretty-print a JSON file with 2-space indentation; exit 1 on any error."""
    try:
        with open(file) as src:
            doc = json.load(src)
        rendered = json.dumps(doc, indent=2)
        print(rendered)
    except Exception:
        sys.exit(1)
def has_key(file, key):
    """
    Exit 0 when `key` is present in the JSON file, 1 otherwise.
    An unreadable or invalid file also exits 1.
    """
    try:
        with open(file) as src:
            present = key in json.load(src)
    except Exception:
        sys.exit(1)
    sys.exit(0 if present else 1)
def filter_values(file, key, value):
    """
    Drop every top-level entry whose value equals `value` and rewrite the file.
    `key` is accepted but never used by this function.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        with open(file) as src:
            doc = json.load(src)
        survivors = {}
        for name, item in doc.items():
            if item != value:
                survivors[name] = item
        with open(file, 'w') as dst:
            json.dump(survivors, dst, indent=2)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def count(file, key):
    """
    Print the length of the list stored under `key`.
    Prints 0 when the key is missing, the value is not a list, or the
    file cannot be read.
    """
    try:
        with open(file) as src:
            value = json.load(src).get(key, [])
        size = len(value) if isinstance(value, list) else 0
        print(size)
    except Exception:
        print(0)
def get_raw(file, key):
    """
    Print the value stored under `key`, including falsy scalars such as 0.
    Booleans print as true/false, lists one item per line; a missing or
    null value prints nothing. Errors are ignored silently.
    """
    try:
        with open(file) as src:
            value = json.load(src).get(key)
        if value is None:
            return
        if isinstance(value, bool):
            print(str(value).lower())
        elif isinstance(value, list):
            for item in value:
                print(item)
        else:
            print(value)
    except Exception:
        pass
def fmt_datetime(iso_str, fmt):
    """
    Print an ISO-8601 timestamp rendered with the strftime pattern `fmt`.
    If the input cannot be parsed or formatted, it is printed unchanged.
    """
    from datetime import datetime
    try:
        rendered = datetime.fromisoformat(iso_str).strftime(fmt)
    except Exception:
        rendered = iso_str
    print(rendered)
def cleanup_config(config_file):
    """
    Normalise blank lines in a config file, in place.
    Runs of three or more newlines collapse to two, and the file ends
    with exactly one newline. Prints the error to stderr and exits 1 on failure.
    """
    import re
    try:
        with open(config_file) as src:
            text = src.read()
        collapsed = re.sub(r'\n{3,}', '\n\n', text)
        tidy = collapsed.rstrip('\n') + '\n'
        with open(config_file, 'w') as dst:
            dst.write(tidy)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def remove_peer_block(config_file, name):
    """
    Delete the [Peer] section labelled `# <name>` from a config file.
    The section is matched as the [Peer] header, the name comment, and
    exactly two following lines; it is replaced by a single newline.
    Prints the error to stderr and exits 1 on failure.
    """
    import re
    try:
        with open(config_file) as src:
            original = src.read()
        block_re = re.compile(
            r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n'
        )
        stripped = block_re.sub('\n', original)
        with open(config_file, 'w') as dst:
            dst.write(stripped)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def audit_fw_counts(clients_dir):
    """
    Print, for every client config, how many FORWARD rules mention its IP.
    The IP comes from the first line starting with 'Address' in each
    *.conf file; configs without one are skipped.
    Output: name:count per line, ordered by config path.
    If iptables cannot be run, every count is 0.
    """
    import glob, subprocess, re
    try:
        result = subprocess.run(
            ['iptables', '-L', 'FORWARD', '-n', '-v'],
            capture_output=True, text=True
        )
        fw_lines = result.stdout.splitlines()
    except Exception:
        # Best effort: iptables missing or not runnable
        fw_lines = []
    rule_lines = [l for l in fw_lines
                  if l.strip() and not l.startswith('Chain')
                  and not l.startswith(' pkts')]
    for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
        # Strip only the trailing extension; str.replace would also eat a
        # '.conf' inside the name (e.g. 'a.config.conf' -> 'aig').
        name = os.path.splitext(os.path.basename(conf))[0]
        ip = ''
        try:
            with open(conf) as f:
                for line in f:
                    if line.startswith('Address'):
                        ip = line.split('=')[1].strip().split('/')[0]
                        break
        except (OSError, ValueError, IndexError):
            # Unreadable or malformed config: skip this peer
            continue
        if not ip:
            continue
        # Whitespace on both sides keeps 10.0.0.1 from matching 10.0.0.10
        ip_re = re.compile(r'\s' + re.escape(ip) + r'\s')
        count_val = sum(1 for l in rule_lines if ip_re.search(l))
        print(f"{name}:{count_val}")
def clean_handshakes(file, threshold_sec):
    """
    Remove keepalive handshakes — keep only new session handshakes.
    A handshake event is kept when at least `threshold_sec` seconds
    (default 300) have passed since the last kept handshake of the same
    client. All other lines, including unparseable ones, are kept.
    Rewrites the file in place and prints the number of removed lines.
    """
    from datetime import datetime, timezone
    min_gap = int(threshold_sec) if threshold_sec else 300
    previous = {}
    survivors = []
    dropped = 0
    try:
        with open(file) as src:
            for raw in src:
                try:
                    entry = json.loads(raw.strip())
                    if entry.get('event') == 'handshake':
                        who = entry.get('client', '')
                        stamp = datetime.fromisoformat(entry.get('timestamp', ''))
                        # Timestamps without an offset are treated as UTC
                        if stamp.tzinfo is None:
                            stamp = stamp.replace(tzinfo=timezone.utc)
                        epoch = stamp.timestamp()
                        if epoch - previous.get(who, 0) < min_gap:
                            dropped += 1
                        else:
                            survivors.append(raw)
                            previous[who] = epoch
                        continue
                except Exception:
                    # Unparseable line: fall through and keep it
                    pass
                survivors.append(raw)
        with open(file, 'w') as dst:
            dst.writelines(survivors)
        print(dropped)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def batch_resolve(hosts_file, net_file, *ips):
    """
    Resolve multiple IPs at once.
    Each distinct non-empty IP is looked up in the hosts data first and,
    if that yields nothing, via reverse_lookup on the net data.
    Output: ip|resolved_name per line (resolved_name is empty if no match)
    """
    from lib.util import load_hosts_data, load_net_data, hosts_lookup, reverse_lookup
    hosts_data = load_hosts_data(hosts_file)
    net_data = load_net_data(net_file)
    handled = set()
    for address in ips:
        if not address or address in handled:
            continue
        handled.add(address)
        label = hosts_lookup(hosts_data, address)
        if not label:
            label = reverse_lookup(net_data, address)
        print(f"{address}|{label or ''}")
# ── Rules (kept inline — candidates for lib/rules.py) ────────────────────
def find_rule_file(rules_dir, rule_name):
    """
    Locate the .rule file for `rule_name`.
    Checks rules_dir first, then rules_dir/base.
    Returns the first existing path, or "" when neither exists.
    """
    candidates = (
        os.path.join(rules_dir, f"{rule_name}.rule"),
        os.path.join(rules_dir, "base", f"{rule_name}.rule"),
    )
    return next((c for c in candidates if os.path.exists(c)), "")
def _rule_resolve_internal(rules_dir, rule_name, visited=None):
    """
    Load a rule and merge in everything it inherits through 'extends'.
    Inherited list entries come first, then the rule's own; duplicates
    are dropped keeping first occurrence. dns_redirect is True when the
    rule or any base sets it. name/desc/group/extends come from the rule
    itself.
    `visited` holds the rule names on the current inheritance path and is
    used to detect cycles.
    Raises ValueError on a circular dependency and FileNotFoundError when
    a rule file cannot be found.
    """
    if visited is None:
        visited = set()
    if rule_name in visited:
        raise ValueError(f"Circular dependency: {rule_name}")
    visited.add(rule_name)
    rule_file = find_rule_file(rules_dir, rule_name)
    if not rule_file:
        # find_rule_file returns "" for unknown rules; fail with a message
        # that names the rule instead of letting open("") raise.
        raise FileNotFoundError(f"Rule not found: {rule_name}")
    with open(rule_file) as f:
        rule = json.load(f)
    list_fields = ('allow_ips', 'allow_ports', 'block_ips', 'block_ports')
    merged = {field: [] for field in list_fields}
    merged['dns_redirect'] = False
    for base_name in rule.get('extends', []):
        # Each branch gets its own copy so a base shared by two parents
        # is not mistaken for a cycle.
        base = _rule_resolve_internal(rules_dir, base_name, visited.copy())
        for field in list_fields:
            merged[field] += base.get(field, [])
        if base.get('dns_redirect'):
            merged['dns_redirect'] = True
    for field in list_fields:
        # dict.fromkeys de-duplicates while preserving order
        merged[field] = list(dict.fromkeys(merged[field] + rule.get(field, [])))
    if rule.get('dns_redirect', False):
        merged['dns_redirect'] = True
    merged['name'] = rule.get('name', rule_name)
    merged['desc'] = rule.get('desc', '')
    merged['group'] = rule.get('group', '')
    merged['extends'] = rule.get('extends', [])
    return merged
def rule_resolve(rules_dir, rule_name):
    """
    Print the fully resolved rule as a single-line JSON object.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        resolved = _rule_resolve_internal(rules_dir, rule_name)
        print(json.dumps(resolved))
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def rule_resolve_field(rules_dir, rule_name, field):
    """
    Print one field of the resolved rule.
    List fields print one item per line; other values print as-is.
    A missing field prints nothing.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        value = _rule_resolve_internal(rules_dir, rule_name).get(field, [])
        if not isinstance(value, list):
            print(value)
            return
        for item in value:
            print(item)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def count_resolved(rules_dir, rule_name, key):
    """Print how many items the resolved rule holds under `key`; 0 on any error."""
    try:
        resolved = _rule_resolve_internal(rules_dir, rule_name)
        print(len(resolved.get(key, [])))
    except Exception:
        print(0)
def rule_list_data(rules_dir, meta_dir):
    """
    Print one summary line per rule file.
    Output per line:
    name|desc|n_allows|n_blocks|peer_count|extends|is_base|group
    Counts come from the resolved rule; peer_count is the number of .meta
    files whose 'rule' field names the rule. Non-base rules are listed
    before base rules. Files that fail to load or resolve are skipped.
    """
    import glob
    usage = {}
    for meta_path in glob.glob(f"{meta_dir}/*.meta"):
        try:
            with open(meta_path) as handle:
                assigned = json.load(handle).get('rule', '')
            if assigned:
                usage[assigned] = usage.get(assigned, 0) + 1
        except Exception:
            pass
    top_level = sorted(glob.glob(f"{rules_dir}/*.rule"))
    base_level = sorted(glob.glob(f"{rules_dir}/base/*.rule"))
    rows = []
    for path in top_level + base_level:
        try:
            with open(path) as handle:
                raw = json.load(handle)
            rule_name = raw.get('name', '')
            merged = _rule_resolve_internal(rules_dir, rule_name)
            allow_total = (len(merged.get('allow_ips', [])) +
                           len(merged.get('allow_ports', [])))
            block_total = (len(merged.get('block_ips', [])) +
                           len(merged.get('block_ports', [])))
            rows.append({
                'name': rule_name,
                'desc': raw.get('desc', ''),
                'n_allows': allow_total,
                'n_blocks': block_total,
                'peer_count': usage.get(rule_name, 0),
                'extends': ','.join(raw.get('extends', [])),
                'is_base': '/base/' in path,
                'group': raw.get('group', ''),
            })
        except Exception:
            pass

    def sort_key(row):
        return (row['is_base'], row['group'] == '' and not row['is_base'],
                row['group'], row['name'])

    rows.sort(key=sort_key)
    for row in rows:
        print(f"{row['name']}|{row['desc']}|{row['n_allows']}|{row['n_blocks']}|"
              f"{row['peer_count']}|{row['extends']}|{row['is_base']}|{row['group']}")
def rule_inspect(rules_dir, rule_name):
    """
    Print a rule's entries as source|kind|value lines.
    Sources: 'own' for the rule's own lists ('dns' for its own
    dns_redirect), 'inherited:<base>' for each entry of 'extends'
    (resolved recursively), and 'resolved' for the merged result.
    Prints the error to stderr and exits 1 on failure.
    """
    # NOTE(review): the source's indentation was lost; the 'resolved'
    # section is assumed to run regardless of has_extends — confirm.
    try:
        rule_file = find_rule_file(rules_dir, rule_name)
        with open(rule_file) as f:
            rule = json.load(f)
        # Resolved up front, so a broken inheritance chain fails before
        # anything is printed.
        resolved = _rule_resolve_internal(rules_dir, rule_name)
        has_extends = bool(rule.get('extends', []))
        # Entries declared directly in this rule file
        for ip in rule.get('allow_ips', []): print(f"own|allow_ip|{ip}")
        for p in rule.get('allow_ports', []): print(f"own|allow_port|{p}")
        for ip in rule.get('block_ips', []): print(f"own|block_ip|{ip}")
        for p in rule.get('block_ports', []): print(f"own|block_port|{p}")
        if rule.get('dns_redirect'): print(f"dns|dns_redirect|true")
        if has_extends:
            # Entries contributed by each directly extended base
            for base_name in rule.get('extends', []):
                base = _rule_resolve_internal(rules_dir, base_name)
                for ip in base.get('allow_ips', []): print(f"inherited:{base_name}|allow_ip|{ip}")
                for p in base.get('allow_ports', []): print(f"inherited:{base_name}|allow_port|{p}")
                for ip in base.get('block_ips', []): print(f"inherited:{base_name}|block_ip|{ip}")
                for p in base.get('block_ports', []): print(f"inherited:{base_name}|block_port|{p}")
                if base.get('dns_redirect'): print(f"inherited:{base_name}|dns_redirect|true")
        # Merged view of own plus inherited entries
        has_resolved = any([resolved.get('allow_ips'), resolved.get('allow_ports'),
                            resolved.get('block_ips'), resolved.get('block_ports'),
                            resolved.get('dns_redirect')])
        if has_resolved:
            for ip in resolved.get('allow_ips', []): print(f"resolved|allow_ip|{ip}")
            for p in resolved.get('allow_ports', []): print(f"resolved|allow_port|{p}")
            for ip in resolved.get('block_ips', []): print(f"resolved|block_ip|{ip}")
            for p in resolved.get('block_ports', []): print(f"resolved|block_port|{p}")
            if resolved.get('dns_redirect'): print(f"resolved|dns_redirect|true")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips,
                block_ports, allow_ports='', extends='', group=''):
    """
    Write a new rule file as JSON.
    The list arguments are comma-separated strings; empty items are
    dropped and an empty argument becomes an empty list.
    dns_redirect is enabled only by the exact string 'true'.
    """
    def split_csv(raw):
        """Split a comma-separated string, dropping empty items."""
        if not raw:
            return []
        return [item for item in raw.split(',') if item]

    rule = {}
    rule['name'] = name
    rule['desc'] = desc
    rule['group'] = group
    rule['dns_redirect'] = dns_redirect == 'true'
    rule['extends'] = split_csv(extends)
    rule['allow_ips'] = split_csv(allow_ips)
    rule['allow_ports'] = split_csv(allow_ports)
    rule['block_ips'] = split_csv(block_ips)
    rule['block_ports'] = split_csv(block_ports)
    with open(file, 'w') as dst:
        json.dump(rule, dst, indent=2)
# ── All remaining functions (net, hosts, blocks, subnet, identity, policy) ──
# Kept verbatim from original json_helper.py — identical behaviour.
# Future pass will move each section to lib/net.py, lib/hosts.py, etc.
def _parse_peer_name(peer_name):
    """
    Split a peer name of the form {type}-{identity}[-{index}].
    phone-nuno -> ('phone', 'nuno', 1)
    phone-nuno-2 -> ('phone', 'nuno', 2)
    desktop-zephyr -> ('desktop', 'zephyr', 1)
    Returns (type, identity, index), or None when the name has no dash,
    the type is not a known one, or the identity part is empty.
    Known types: desktop, laptop, phone, tablet, server, iot, none
    """
    known_types = {'desktop', 'laptop', 'phone', 'tablet', 'server', 'iot', 'none'}
    peer_type, dash, remainder = peer_name.partition('-')
    if not dash or peer_type not in known_types:
        return None
    identity, index = remainder, 1
    # A trailing numeric segment is the index, but only when something
    # precedes it after the type
    if '-' in remainder:
        head, _, tail = remainder.rpartition('-')
        if tail.isdigit():
            identity, index = head, int(tail)
    if not identity:
        return None
    return (peer_type, identity, index)
# ======================================================
# Blocks
# ======================================================
def _block_init(peer_ip):
    """Build a fresh block record for `peer_ip`: not blocked, no groups, no rules."""
    record = {"peer_ip": peer_ip}
    record["blocked_direct"] = False
    record["blocked_by_groups"] = []
    record["rules"] = []
    return record
def _block_read(file):
    """
    Load a block file.
    Returns the parsed JSON, or None when the file is missing, empty or
    unreadable. Content that is not JSON is treated as the old format:
    the first token of the first line becomes peer_ip, the file is
    rewritten as a direct full block, and that new record is returned.
    """
    try:
        with open(file) as src:
            text = src.read().strip()
        if not text:
            return None  # empty file = no block data
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass
        # Old format — migrate
        rows = text.split('\n')
        migrated = {
            "peer_ip": rows[0].split()[0] if rows else '',
            "blocked_direct": True,
            "blocked_by_groups": [],
            "rules": [{"name": "full block", "type": "full"}]
        }
        with open(file, 'w') as dst:
            json.dump(migrated, dst, indent=2)
        return migrated
    except Exception:
        return None
def _block_write(file, data):
    """Serialise `data` to `file` as 2-space indented JSON, replacing any content."""
    with open(file, 'w') as dst:
        json.dump(data, dst, indent=2)
def block_get(file):
    """Print the block file as single-line JSON; print nothing when there is no data."""
    record = _block_read(file)
    if not record:
        return
    print(json.dumps(record))
def block_is_blocked(file):
    """
    Print "true" when the peer is blocked directly or through at least
    one group, "false" otherwise (including when there is no block data).
    """
    record = _block_read(file)
    verdict = "false"
    if record and (record.get("blocked_direct", False)
                   or record.get("blocked_by_groups", [])):
        verdict = "true"
    print(verdict)
def block_set_direct(file, peer_ip, value):
    """
    Set the blocked_direct flag in a block file.
    `value` is a string; "true" (any letter case) sets the flag, anything
    else clears it. The record is created when the file has no data, and
    peer_ip is always refreshed. Nothing is printed on success.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        data = _block_read(file) or _block_init(peer_ip)
        data["blocked_direct"] = value.lower() == "true"
        data["peer_ip"] = peer_ip
        _block_write(file, data)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def block_add_group(file, peer_ip, group):
    """
    Record that `group` blocks this peer.
    Creates the record when the file has no data, refreshes peer_ip, and
    leaves the list unchanged if the group is already present.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        record = _block_read(file) or _block_init(peer_ip)
        record["peer_ip"] = peer_ip
        members = record.get("blocked_by_groups", [])
        if group not in members:
            members.append(group)
        record["blocked_by_groups"] = members
        _block_write(file, record)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def block_remove_group(file, peer_ip, group):
    """
    Remove `group` from blocked_by_groups and rewrite the block file.
    `peer_ip` is only used to build a fresh record when the file has no
    data. Nothing is returned or printed on success; use
    block_is_blocked to query the resulting state.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        data = _block_read(file) or _block_init(peer_ip)
        groups = data.get("blocked_by_groups", [])
        if group in groups:
            groups.remove(group)
        data["blocked_by_groups"] = groups
        _block_write(file, data)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def block_add_rule(file, peer_ip, rule_type, name="", target="",
                   port="", proto=""):
    """
    Append a block rule entry unless an equivalent one already exists.
    Two entries are equivalent when type, target, port and proto match;
    the name is not compared. Empty optional fields are left out of the
    stored entry. When a duplicate is found the file is not rewritten.
    Prints the error to stderr and exits 1 on failure.
    """
    try:
        record = _block_read(file) or _block_init(peer_ip)
        record["peer_ip"] = peer_ip
        entry = {"type": rule_type}
        optional = (("name", name), ("target", target),
                    ("port", port), ("proto", proto))
        for field, supplied in optional:
            if supplied:
                entry[field] = supplied
        current = record.get("rules", [])
        duplicate = any(
            known.get("type") == rule_type
            and known.get("target", "") == target
            and known.get("port", "") == port
            and known.get("proto", "") == proto
            for known in current
        )
        if duplicate:
            return  # already exists, skip
        current.append(entry)
        record["rules"] = current
        _block_write(file, record)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def block_remove_rule(file, rule_type, target="", port="", proto=""):
    """
    Delete every rule entry matching type, target, port and proto.
    Does nothing when the file has no block data; otherwise the file is
    rewritten even if no entry matched.
    """
    record = _block_read(file)
    if not record:
        return

    def matches(entry):
        return (entry.get("type") == rule_type and
                entry.get("target", "") == target and
                entry.get("port", "") == port and
                entry.get("proto", "") == proto)

    survivors = []
    for entry in record.get("rules", []):
        if not matches(entry):
            survivors.append(entry)
    record["rules"] = survivors
    _block_write(file, record)
def block_get_rules(file):
    """
    Print rules as pipe-separated lines: name|type|target|port|proto
    Missing fields print as empty strings; no output without block data.
    """
    record = _block_read(file)
    if not record:
        return
    for entry in record.get("rules", []):
        label = entry.get('name', '')
        kind = entry.get('type', '')
        target = entry.get('target', '')
        port = entry.get('port', '')
        proto = entry.get('proto', '')
        print(f"{label}|{kind}|{target}|{port}|{proto}")
def block_get_groups(file):
    """Print the blocking groups as one comma-separated line; nothing without block data."""
    record = _block_read(file)
    if record:
        groups = record.get('blocked_by_groups', [])
        print(','.join(groups))
def block_get_direct(file):
    """Print 'true' when blocked_direct is set, 'false' otherwise or without block data."""
    record = _block_read(file)
    direct = bool(record) and bool(record.get('blocked_direct', False))
    print('true' if direct else 'false')
def block_is_empty(file):
    """
    Print "true" when the block file carries no state at all: no direct
    block, no groups, no rules and no services. Also "true" when there
    is no block data; "false" otherwise.
    """
    record = _block_read(file)
    if not record:
        print("true")
        return
    has_state = (
        record.get("blocked_direct", False) or
        record.get("blocked_by_groups", []) or
        record.get("rules", []) or
        record.get("services", [])
    )
    print("false" if has_state else "true")
# ======================================================
# Subnet
# ======================================================
def _subnet_read(file):
    """
    Load subnets.json.
    Returns the parsed content, or {} when the file is missing, empty,
    or cannot be read or parsed.
    """
    try:
        if not os.path.exists(file):
            return {}
        with open(file) as src:
            text = src.read().strip()
        return json.loads(text) if text else {}
    except Exception:
        return {}
def _subnet_write(file, data):
    """
    Write subnets.json as 2-space indented JSON.
    Missing parent directories are created first. A bare filename (no
    directory part) is written to the current directory.
    """
    parent = os.path.dirname(file)
    # os.makedirs('') raises FileNotFoundError, so skip it for bare filenames
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(file, 'w') as f:
        json.dump(data, f, indent=2)
def _subnet_is_group(entry):
    """
    Tell whether a subnet entry is a nested group (like 'guests').
    A group is a dict that has no 'subnet' key of its own.
    """
    if not isinstance(entry, dict):
        return False
    return 'subnet' not in entry
def subnet_lookup(file, name, type_key=''):
    """
    Print the CIDR for a subnet name and optional type.
    Scalar entry: subnet_lookup(file, "desktop") -> "10.1.1.0/24"
    Group entry: subnet_lookup(file, "guests", "phone") -> "10.1.103.0/24"
    Group without a type uses the 'none' slot.
    Exits 1 without output when the name, or the slot within a group,
    does not exist.
    """
    data = _subnet_read(file)
    if name not in data:
        sys.exit(1)
    entry = data[name]
    if not _subnet_is_group(entry):
        print(entry['subnet'])
        return
    slot = type_key or 'none'
    if slot not in entry:
        sys.exit(1)
    print(entry[slot]['subnet'])
def subnet_type(file, name, type_key=''):
    """
    Print the type string for a subnet entry.
    Scalar entry: its 'type' field, falling back to the subnet name.
    Group entry: the child key itself ('none' when no type is given).
    Exits 1 when the name, or the slot within a group, does not exist.
    """
    data = _subnet_read(file)
    if name not in data:
        sys.exit(1)
    entry = data[name]
    if not _subnet_is_group(entry):
        print(entry.get('type', name))
        return
    slot = type_key or 'none'
    if slot not in entry:
        sys.exit(1)
    # For group entries the type IS the child key
    print(slot)
def subnet_tunnel_mode(file, name, type_key=''):
    """
    Print the tunnel_mode of a subnet entry.
    Prints 'split' when the subnet, the group slot, or the field is missing.
    """
    data = _subnet_read(file)
    if name not in data:
        print('split')  # safe default
        return
    entry = data[name]
    if _subnet_is_group(entry):
        source = entry.get(type_key or 'none', {})
    else:
        source = entry
    print(source.get('tunnel_mode', 'split'))
def subnet_for_ip(file, ip):
    """
    Reverse lookup: find the subnet entry whose CIDR contains `ip`.
    Output: name|type (e.g. "guests|phone" or "desktop|desktop"); only
    the first match, in file order, is printed.
    Prints nothing when the IP is invalid or no subnet contains it.
    Entries with a missing or invalid CIDR are skipped.
    """
    import ipaddress
    try:
        peer_addr = ipaddress.ip_address(ip)
    except ValueError:
        return

    def contains(spec):
        """True when spec['subnet'] is a valid network holding peer_addr."""
        try:
            return peer_addr in ipaddress.ip_network(spec['subnet'], strict=False)
        except Exception:
            return False

    for name, entry in _subnet_read(file).items():
        if _subnet_is_group(entry):
            for type_key, child in entry.items():
                if contains(child):
                    print(f"{name}|{type_key}")
                    return
        elif contains(entry):
            peer_type = entry.get('type', name)
            print(f"{name}|{peer_type}")
            return
def subnet_list(file):
    """
    List all subnets for display.
    Output per line: name|subnet|type|tunnel_mode|desc|is_group|parent
    Scalar entries print is_group=false with parent equal to their name.
    Group entries print one line per child with is_group=true; the
    display name is parent.type, or just the parent for the 'none' slot.
    """
    for name, entry in _subnet_read(file).items():
        if not _subnet_is_group(entry):
            cidr = entry.get('subnet','')
            kind = entry.get('type',name)
            mode = entry.get('tunnel_mode','split')
            note = entry.get('desc','')
            print(f"{name}|{cidr}|{kind}|{mode}|{note}|false|{name}")
            continue
        for type_key, child in entry.items():
            shown = name if type_key == 'none' else f"{name}.{type_key}"
            cidr = child.get('subnet','')
            mode = child.get('tunnel_mode','split')
            note = child.get('desc','')
            print(f"{shown}|{cidr}|{type_key}|{mode}|{note}|true|{name}")
def subnet_show(file, name):
    """
    Print the details of one subnet entry as key|value lines.
    Groups print one 'child|type|subnet|tunnel_mode|desc' line per child;
    scalar entries print subnet, type, tunnel_mode and desc.
    Prints an error to stderr and exits 1 when the name is unknown.
    """
    data = _subnet_read(file)
    if name not in data:
        print(f"Error: Subnet '{name}' not found", file=sys.stderr)
        sys.exit(1)
    entry = data[name]
    print(f"name|{name}")
    if not _subnet_is_group(entry):
        print("is_group|false")
        print(f"subnet|{entry.get('subnet','')}")
        print(f"type|{entry.get('type',name)}")
        print(f"tunnel_mode|{entry.get('tunnel_mode','split')}")
        print(f"desc|{entry.get('desc','')}")
        return
    print("is_group|true")
    for type_key, child in entry.items():
        cidr = child.get('subnet','')
        mode = child.get('tunnel_mode','split')
        note = child.get('desc','')
        print(f"child|{type_key}|{cidr}|{mode}|{note}")
def subnet_add(file, name, subnet, type_key, tunnel_mode, desc, group_parent=''):
    """
    Add (or overwrite) a subnet entry.
    With group_parent, the entry is stored as a child of that group under
    type_key (or 'none'); the group is created when missing, and `name`
    is not used. Exits 1 if group_parent exists but is not a group.
    Without group_parent, a scalar entry named `name` is stored with an
    explicit type (type_key, or the name itself).
    """
    data = _subnet_read(file)
    entry = dict(subnet=subnet, tunnel_mode=tunnel_mode or 'split', desc=desc or '')
    if not group_parent:
        # Scalar entry — type stored explicitly
        entry['type'] = type_key or name
        data[name] = entry
    else:
        if group_parent not in data:
            data[group_parent] = {}
        elif not _subnet_is_group(data[group_parent]):
            print(f"Error: '{group_parent}' exists but is not a group", file=sys.stderr)
            sys.exit(1)
        data[group_parent][type_key or 'none'] = entry
    _subnet_write(file, data)
def subnet_remove(file, name, peers_using):
    """
    Remove a subnet entry.
    peers_using is a comma-separated list of peer names currently using
    the subnet (supplied by the caller); if it names any peer, the
    removal is refused with exit 1.
    A dotted name such as "guests.phone" removes one child of a group,
    and the group itself once it has no children left.
    Exits 1 when the entry, group or child does not exist.
    """
    blockers = [p for p in peers_using.split(',') if p] if peers_using else []
    if blockers:
        print(f"Error: Subnet '{name}' is in use by: {', '.join(blockers)}", file=sys.stderr)
        sys.exit(1)
    data = _subnet_read(file)
    parent, dot, child_key = name.partition('.')
    if not dot:
        if name not in data:
            print(f"Error: Subnet '{name}' not found", file=sys.stderr)
            sys.exit(1)
        del data[name]
    else:
        # Removing a group child: e.g. "guests.phone"
        if parent not in data or not _subnet_is_group(data[parent]):
            print(f"Error: Group '{parent}' not found", file=sys.stderr)
            sys.exit(1)
        siblings = data[parent]
        if child_key not in siblings:
            print(f"Error: '{child_key}' not found in group '{parent}'", file=sys.stderr)
            sys.exit(1)
        del siblings[child_key]
        if not siblings:
            del data[parent]  # remove empty group
    _subnet_write(file, data)
def subnet_rename(file, old_name, new_name, peers_using):
    """
    Rename a top-level subnet entry.
    peers_using is a comma-separated list of peer names supplied by the
    caller; if it names any peer the rename is refused.
    Exits 1 when refused, when old_name is unknown, or when new_name is
    already taken.
    """
    blockers = [p for p in peers_using.split(',') if p] if peers_using else []
    if blockers:
        print(f"Error: Cannot rename subnet '{old_name}' — in use by: {', '.join(blockers)}", file=sys.stderr)
        sys.exit(1)
    data = _subnet_read(file)
    if old_name not in data:
        print(f"Error: Subnet '{old_name}' not found", file=sys.stderr)
        sys.exit(1)
    if new_name in data:
        print(f"Error: Subnet '{new_name}' already exists", file=sys.stderr)
        sys.exit(1)
    moved = data.pop(old_name)
    data[new_name] = moved
    _subnet_write(file, data)
def subnet_peers(meta_dir, clients_dir, subnet_name, subnets_file):
    """
    Find all peers using a subnet.
    Peers are the *.conf files in clients_dir. For each peer:
    1. Meta field: its .meta file has "subnet": subnet_name, or
    2. IP fallback: the first 'Address' line in its config lies within
       one of the subnet's CIDRs (all children for a group).
    Output: one peer name per line, ordered by config path.
    """
    import glob
    import ipaddress
    # Resolve all CIDRs covered by this subnet name
    data = _subnet_read(subnets_file)
    cidrs = []
    if subnet_name in data:
        entry = data[subnet_name]
        specs = entry.values() if _subnet_is_group(entry) else [entry]
        for spec in specs:
            try:
                cidrs.append(ipaddress.ip_network(spec['subnet'], strict=False))
            except Exception:
                # Missing or invalid CIDR: this child cannot match by IP
                pass
    for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
        # Strip only the trailing extension; str.replace would also eat a
        # '.conf' inside the name (e.g. 'a.config.conf' -> 'aig').
        peer_name = os.path.splitext(os.path.basename(conf))[0]
        # Pass 1: check meta field
        meta_file = os.path.join(meta_dir, f"{peer_name}.meta")
        try:
            with open(meta_file) as f:
                meta = json.load(f)
            if meta.get('subnet', '') == subnet_name:
                print(peer_name)
                continue
        except Exception:
            # No usable meta file: fall back to the IP check
            pass
        # Pass 2: IP reverse lookup against subnet CIDRs
        if not cidrs:
            continue
        peer_ip = ''
        try:
            with open(conf) as f:
                for line in f:
                    if line.startswith('Address'):
                        peer_ip = line.split('=')[1].strip().split('/')[0]
                        break
        except Exception:
            continue
        if not peer_ip:
            continue
        try:
            addr = ipaddress.ip_address(peer_ip)
        except ValueError:
            continue
        if any(addr in cidr for cidr in cidrs):
            print(peer_name)
def subnet_exists(file, name):
    """
    Exit 0 when the subnet exists, 1 otherwise.
    A dotted name ("guests.phone") checks for a child inside a group;
    any other name checks the top-level entries.
    """
    data = _subnet_read(file)
    parent, dot, child_key = name.partition('.')
    if dot:
        found = (parent in data
                 and _subnet_is_group(data[parent])
                 and child_key in data[parent])
    else:
        found = name in data
    sys.exit(0 if found else 1)
def subnet_policy(subnets_file, subnet_name, type_key=''):
    """
    Print the policy name of a subnet entry.
    Prints 'default' when the subnet, the group slot, or the field is missing.
    """
    data = _subnet_read(subnets_file)
    if subnet_name not in data:
        print('default')
        return
    entry = data[subnet_name]
    if _subnet_is_group(entry):
        source = entry.get(type_key or 'none', {})
    else:
        source = entry
    print(source.get('policy', 'default'))
def subnet_default_rule(file, name, type_key=''):
    """
    Print the default_rule of a subnet entry, or an empty line if none is set.
    Scalar entry: subnet_default_rule(file, "desktop") -> ""
    Group entry: subnet_default_rule(file, "guests", "phone") -> "guest"
    An unknown subnet or group slot also prints an empty line.
    """
    data = _subnet_read(file)
    if name not in data:
        print('')
        return
    entry = data[name]
    if _subnet_is_group(entry):
        source = entry.get(type_key or 'none', {})
    else:
        source = entry
    print(source.get('default_rule', ''))
def subnet_list_names(file):
    """
    Print every top-level subnet name, one per line, in file order
    (e.g. desktop, laptop, guests, servers, iot).
    """
    for subnet_name in _subnet_read(file):
        print(subnet_name)
# ======================================================
# Identity
# ======================================================
def identity_rules(file):
    """
    Print the rules assigned to an identity, one per line.
    Uses the 'rules' array; when that is empty, a legacy scalar 'rule'
    field is used instead. Empty entries are skipped.
    Prints nothing when the identity cannot be read.
    """
    data = _identity_read(file)
    if not data:
        return
    assigned = data.get('rules', [])
    legacy = data.get('rule')
    # Support legacy scalar 'rule' field
    if legacy and not assigned:
        assigned = [legacy]
    for rule_name in assigned:
        if not rule_name:
            continue
        print(rule_name)
def identity_add_rule(file, identity_name, rule_name):
    """
    Append a rule to an identity's rules array.
    The identity record is created when the file has no data, and a
    legacy scalar 'rule' field is folded into the array.
    If the rule is already assigned, prints a warning to stderr and
    exits 2 without writing.
    """
    record = _identity_read(file) or _identity_init(identity_name)
    assigned = record.get('rules', [])
    legacy = record.get('rule')
    # Migrate legacy scalar field
    if legacy:
        if legacy not in assigned:
            assigned.append(legacy)
        del record['rule']
    if rule_name in assigned:
        print(f"Warning: Rule '{rule_name}' is already assigned to identity '{identity_name}'",
              file=sys.stderr)
        sys.exit(2)
    assigned.append(rule_name)
    record['rules'] = assigned
    _identity_write(file, record)
def identity_remove_rule(file, rule_name):
    """
    Remove a specific rule from an identity's rules array.
    A legacy scalar 'rule' field is folded into the array first, so a
    rule reported by identity_rules / identity_has_rule can also be
    removed here.
    Exits 1 if the identity cannot be read or the rule is not assigned;
    nothing is written in either case.
    """
    data = _identity_read(file)
    if not data:
        print(f"Error: Identity not found", file=sys.stderr)
        sys.exit(1)
    rules = data.get('rules', [])
    # Migrate legacy scalar field, mirroring identity_add_rule
    legacy = data.get('rule')
    if legacy:
        if legacy not in rules:
            rules.append(legacy)
        del data['rule']
    if rule_name not in rules:
        print(f"Error: Rule '{rule_name}' not assigned to this identity", file=sys.stderr)
        sys.exit(1)
    rules.remove(rule_name)
    data['rules'] = rules
    _identity_write(file, data)
def identity_clear_rules(file):
    """
    Drop every rule from an identity: empties the 'rules' array and
    deletes the legacy scalar 'rule' field. No-op when the identity
    cannot be read.
    """
    record = _identity_read(file)
    if record:
        record.pop('rule', None)  # remove legacy scalar too
        record['rules'] = []
        _identity_write(file, record)
def identity_has_rule(file, rule_name):
    """
    Exit 0 if the identity has this rule, 1 otherwise.
    The legacy scalar 'rule' field counts only when 'rules' is empty.
    An unreadable identity exits 1.
    """
    record = _identity_read(file)
    if not record:
        sys.exit(1)
    assigned = record.get('rules', [])
    legacy = record.get('rule')
    if legacy and not assigned:
        assigned = [legacy]
    status = 0 if rule_name in assigned else 1
    sys.exit(status)
def _identity_read(file):
    """
    Load an identity file.
    Returns the parsed content, or None when the file is missing, empty,
    or cannot be read or parsed.
    """
    try:
        if not os.path.exists(file):
            return None
        with open(file) as src:
            text = src.read().strip()
        return json.loads(text) if text else None
    except Exception:
        return None
def _identity_write(file, data):
    """
    Write an identity file as 2-space indented JSON.
    Missing parent directories are created first. A bare filename (no
    directory part) is written to the current directory.
    """
    parent = os.path.dirname(file)
    # os.makedirs('') raises FileNotFoundError, so skip it for bare filenames
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(file, 'w') as f:
        json.dump(data, f, indent=2)
def _identity_init(name):
    """Build a fresh identity record for `name` with no peers and no devices."""
    record = {'name': name}
    record['peers'] = []
    record['devices'] = {}
    return record
def identity_list(identities_dir):
    """
    List every *.identity file in a directory, ordered by path.
    Output per line: name|peer_count|types|rules|policy
    types is the sorted, comma-separated set of device types; rules falls
    back to the legacy scalar 'rule' when the array is empty; policy
    defaults to 'default'. Files that cannot be processed are skipped.
    """
    import glob
    for path in sorted(glob.glob(f"{identities_dir}/*.identity")):
        try:
            with open(path) as src:
                record = json.load(src)
            label = record.get('name', '')
            members = record.get('peers', [])
            gear = record.get('devices', {})
            assigned = record.get('rules', [])
            # Migrate legacy scalar rule field
            if not assigned and record.get('rule'):
                assigned = [record['rule']]
            policy = record.get('policy', 'default')
            kinds = sorted({
                dev.get('type', '') for dev in gear.values() if dev.get('type')
            })
            print(f"{label}|{len(members)}|{','.join(kinds)}|{','.join(assigned)}|{policy}")
        except Exception:
            continue
def identity_show(file):
    """
    Print the details of one identity as pipe-separated records.

    Output:
      name|<name>
      peer_count|<n>
      device|<peer_name>|<type>|<index>   (one per device)
    Prints an error to stderr and exits 1 when the identity cannot be read.
    """
    identity = _identity_read(file)
    if not identity:
        print("Error: Identity not found", file=sys.stderr)
        sys.exit(1)
    name = identity.get('name', '')
    print(f"name|{name}")
    peer_total = len(identity.get('peers', []))
    print(f"peer_count|{peer_total}")
    devices = identity.get('devices', {})
    for peer_name, device in devices.items():
        dev_type = device.get('type', '')
        dev_index = device.get('index', 1)
        print(f"device|{peer_name}|{dev_type}|{dev_index}")
def identity_add_peer(file, identity_name, peer_name, peer_type, index):
    """
    Attach a peer to an identity, creating the identity when needed.

    The peer is appended to 'peers' only if not already listed; its
    'devices' entry is always (re)written with the given type and the
    index converted to int.
    """
    identity = _identity_read(file)
    if not identity:
        identity = _identity_init(identity_name)
    members = identity['peers']
    if peer_name not in members:
        members.append(peer_name)
    device = {'type': peer_type, 'index': int(index)}
    identity['devices'][peer_name] = device
    _identity_write(file, identity)
def identity_remove_peer(file, peer_name):
    """
    Detach a peer from an identity.

    Removes the peer from 'peers' and drops its 'devices' entry, then
    rewrites the file. Does nothing when the identity cannot be read.
    """
    identity = _identity_read(file)
    if not identity:
        return
    remaining = [member for member in identity['peers'] if member != peer_name]
    identity['peers'] = remaining
    identity['devices'].pop(peer_name, None)
    _identity_write(file, identity)
def identity_remove(file):
    """
    Delete an identity file.

    The caller (bash) is expected to have checked existence; any failure
    to delete is printed to stderr and the process exits 1.
    """
    try:
        os.unlink(file)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def identity_next_index(file, peer_type):
    """
    Print the lowest free device index (starting at 1) for a peer type.

    Examples:
      a phone with index 1 exists -> prints 2
      no phones exist             -> prints 1
    Prints 1 when the identity cannot be read. Devices without an
    'index' count as index 1.
    """
    identity = _identity_read(file)
    if not identity:
        print(1)
        return
    taken = {
        device.get('index', 1)
        for device in identity.get('devices', {}).values()
        if device.get('type') == peer_type
    }
    # Gaps left by removed devices are reused before extending the range.
    candidate = 1
    while candidate in taken:
        candidate += 1
    print(candidate)
def identity_peers(file, filter_type=''):
    """
    Print the peers of an identity, one name per line.

    When filter_type is non-empty, only peers whose 'devices' entry has
    that type are printed. Prints nothing when the identity cannot be read.
    """
    identity = _identity_read(file)
    if not identity:
        return
    devices = identity.get('devices', {})
    for peer_name in identity.get('peers', []):
        if filter_type and devices.get(peer_name, {}).get('type') != filter_type:
            continue
        print(peer_name)
def identity_migrate(identities_dir, clients_dir, meta_dir, dry_run):
    """
    Scan peer configs and create identity files from the peer naming convention.

    Args:
      identities_dir: directory where <identity>.identity files are written.
      clients_dir:    directory scanned for *.conf peer configs.
      meta_dir:       accepted for interface compatibility; not used here.
      dry_run:        the string 'true' prints the plan without writing.

    Output:
      skip|<peer>                                 for names that do not parse
      create|<identity>|<peer>|<type>|<index>     for each planned/added peer
    """
    import glob
    is_dry = dry_run == 'true'
    grouped = {}  # identity_name -> [(peer_name, type, index)]
    for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
        # Strip only the trailing extension: str.replace('.conf', '') would
        # also delete ".conf" occurring inside the peer name itself.
        peer_name = os.path.splitext(os.path.basename(conf))[0]
        parsed = _parse_peer_name(peer_name)
        if not parsed:
            print(f"skip|{peer_name}")
            continue
        peer_type, identity_name, index = parsed
        grouped.setdefault(identity_name, []).append((peer_name, peer_type, index))
    for identity_name, peers in sorted(grouped.items()):
        id_file = os.path.join(identities_dir, f"{identity_name}.identity")
        for peer_name, peer_type, index in peers:
            print(f"create|{identity_name}|{peer_name}|{peer_type}|{index}")
            if not is_dry:
                identity_add_peer(id_file, identity_name, peer_name, peer_type, index)
def identity_infer(peer_name):
    """
    Derive identity information from a peer name.

    Prints identity|type|index when the name parses, nothing otherwise.
    Used by add.command.sh to auto-attach on vanilla wgctl add.
    """
    parsed = _parse_peer_name(peer_name)
    if not parsed:
        return
    peer_type, identity_name, index = parsed
    print(f"{identity_name}|{peer_type}|{index}")
def identity_exists(file):
    """Exit 0 when the identity file loads to a non-None value, else exit 1."""
    missing = _identity_read(file) is None
    sys.exit(1 if missing else 0)
# ======================================================
# Policy
# ======================================================
def _policy_read(file):
    """
    Load the policies file.

    Returns the parsed JSON value, or an empty dict when the file is
    missing, blank, or cannot be read or parsed.
    """
    policies = {}
    try:
        if os.path.exists(file):
            with open(file) as handle:
                raw = handle.read().strip()
            if raw:
                policies = json.loads(raw)
    except Exception:
        return {}
    return policies
def _policy_write(file, data):
    """
    Write the policies dict to `file` as indented JSON.

    The parent directory is created when missing. A bare file name is
    written in place: os.makedirs('') would raise FileNotFoundError.
    """
    parent = os.path.dirname(file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(file, 'w') as f:
        json.dump(data, f, indent=2)
def _policy_get_entry(data, name):
    """
    Resolve a policy entry with defaults filled in.

    Layers, lowest to highest precedence:
      1. the built-in field defaults in _POLICY_DEFAULTS
      2. the 'default' policy stored in `data`, if any
      3. the entry stored under `name`, if any

    _POLICY_DEFAULTS is a flat field->value mapping, so it is used as the
    base layer directly; looking up a 'default' key inside it always
    missed and left the built-in values unused.

    Returns a new dict; neither `data` nor _POLICY_DEFAULTS is modified.
    """
    resolved = dict(_POLICY_DEFAULTS)
    resolved.update(data.get('default', {}))
    resolved.update(data.get(name, {}))
    return resolved
def policy_get(file, name, field=''):
    """
    Print a resolved policy entry, or one field of it.

      policy_get(file, "guest")                 -> key|value line per field
      policy_get(file, "guest", "tunnel_mode")  -> the value, e.g. "split"
      policy_get(file, "guest", "strict_rule")  -> "true" or "false"

    Booleans are printed as true/false and None as an empty value.
    """
    def _render(value):
        # Shell-friendly rendering shared by both output modes.
        if value is None:
            return ''
        if isinstance(value, bool):
            return 'true' if value else 'false'
        return value

    entry = _policy_get_entry(_policy_read(file), name)
    if field:
        print(_render(entry.get(field)))
        return
    for key, value in entry.items():
        print(f"{key}|{_render(value)}")
def policy_list(file):
    """
    Print every policy in the file, one per line.

    Line format: name|tunnel_mode|default_rule|strict_rule|auto_apply|desc
    Each entry is resolved through _policy_get_entry first; missing
    fields print as split / empty / false / true / empty respectively.
    """
    policies = _policy_read(file)
    for name in policies:
        entry = _policy_get_entry(policies, name)
        columns = (
            name,
            entry.get('tunnel_mode', 'split'),
            entry.get('default_rule') or '',
            'true' if entry.get('strict_rule', False) else 'false',
            'true' if entry.get('auto_apply', True) else 'false',
            entry.get('desc', ''),
        )
        print(f"{columns[0]}|{columns[1]}|{columns[2]}|{columns[3]}|{columns[4]}|{columns[5]}")
def policy_exists(file, name):
    """Exit 0 when `name` is a key in the policies file, 1 otherwise."""
    known = _policy_read(file)
    status = 0 if name in known else 1
    sys.exit(status)
def policy_add(file, name, tunnel_mode, default_rule, strict_rule, auto_apply, desc):
    """
    Create a policy, or overwrite an existing one with the same name.

    String arguments are normalised before storing:
      tunnel_mode  - empty becomes 'split'
      default_rule - empty becomes None
      strict_rule  - True only for the string 'true'
      auto_apply   - True unless the string is 'false'
      desc         - empty becomes ''
    """
    entry = {
        'tunnel_mode': tunnel_mode or 'split',
        'default_rule': default_rule or None,
        'strict_rule': strict_rule == 'true',
        'auto_apply': auto_apply != 'false',
        'desc': desc or '',
    }
    policies = _policy_read(file)
    policies[name] = entry
    _policy_write(file, policies)
def policy_remove(file, name):
    """
    Delete a policy from the policies file.

    Exits 1 with an error on stderr when `name` is a key of
    _POLICY_DEFAULTS or when the policy is not present in the file.
    """
    # NOTE(review): _POLICY_DEFAULTS appears to be keyed by field name
    # (tunnel_mode, ...), not by policy name, so this guard may not protect
    # the policy it was meant to - confirm the intended built-in names.
    if name in _POLICY_DEFAULTS:
        print(f"Error: Cannot remove built-in policy '{name}'", file=sys.stderr)
        sys.exit(1)
    policies = _policy_read(file)
    if name not in policies:
        print(f"Error: Policy '{name}' not found", file=sys.stderr)
        sys.exit(1)
    policies.pop(name)
    _policy_write(file, policies)
def policy_set_field(file, name, field, value):
    """
    Update one field of an existing policy.

    'strict_rule' and 'auto_apply' are stored as booleans (True only for
    the string 'true'); an empty 'default_rule' is stored as None; any
    other field stores the value unchanged. Exits 1 when the policy does
    not exist.
    """
    policies = _policy_read(file)
    if name not in policies:
        print(f"Error: Policy '{name}' not found", file=sys.stderr)
        sys.exit(1)
    if field in ('strict_rule', 'auto_apply'):
        coerced = value == 'true'
    elif field == 'default_rule':
        coerced = value or None
    else:
        coerced = value
    policies[name][field] = coerced
    _policy_write(file, policies)
# ======================================================
# Hosts
# ======================================================
def _hosts_read(file):
    """
    Load the hosts file.

    Returns the parsed JSON with the 'hosts', 'subnets' and 'ports'
    sections guaranteed to exist. A missing, unreadable or malformed
    file yields a structure with three empty sections.
    """
    sections = ("hosts", "subnets", "ports")
    if os.path.exists(file):
        try:
            with open(file) as handle:
                loaded = json.load(handle)
            # Fill in any section the file omits.
            for section in sections:
                loaded.setdefault(section, {})
            return loaded
        except Exception:
            pass
    return {section: {} for section in sections}
def _hosts_write(file, data):
    """
    Write the hosts structure to `file` as indented JSON.

    The parent directory is created when missing, matching the other
    writers in this module, so the first write into a fresh directory
    succeeds. A bare file name is written in place.
    """
    parent = os.path.dirname(file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(file, 'w') as f:
        json.dump(data, f, indent=2)
def hosts_list(file):
    """
    Print every entry of the hosts file.

    Line format: type|key|name|desc|tags
    Sections are emitted in the order host, subnet, port, each sorted by
    key. An entry that is not a dict is printed as a bare name with empty
    desc and tags.
    """
    data = _hosts_read(file)
    layout = (("host", "hosts"), ("subnet", "subnets"), ("port", "ports"))
    for label, section in layout:
        for key, entry in sorted(data[section].items()):
            if isinstance(entry, dict):
                name = entry.get("name", "")
                desc = entry.get("desc", "")
                tags = ",".join(entry.get("tags", []))
            else:
                name, desc, tags = str(entry), "", ""
            print(f"{label}|{key}|{name}|{desc}|{tags}")
def hosts_show(file, key, entry_type):
    """
    Print one hosts entry as name|, desc| and tags| records.

    entry_type is host, subnet or port; any other value is treated as
    host. Exits 1 with an error on stderr when the entry is missing or
    falsy.
    """
    sections = {"host": "hosts", "subnet": "subnets", "port": "ports"}
    data = _hosts_read(file)
    entry = data[sections.get(entry_type, "hosts")].get(key)
    if not entry:
        print(f"Error: not found: {key}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(entry, dict):
        # Bare entries hold just the display name.
        print(f"name|{entry}")
        print("desc|")
        print("tags|")
        return
    name = entry.get('name', '')
    print(f"name|{name}")
    desc = entry.get('desc', '')
    print(f"desc|{desc}")
    tags = ','.join(entry.get('tags', []))
    print(f"tags|{tags}")
def hosts_add(file, entry_type, key, name, desc, tags):
    """
    Add or overwrite a hosts entry.

    entry_type: host|subnet|port (anything else is treated as host)
    key:        IP, subnet CIDR, or port number
    tags:       comma-separated string; blank items are dropped
    """
    sections = {"host": "hosts", "subnet": "subnets", "port": "ports"}
    data = _hosts_read(file)
    tag_list = []
    if tags:
        for raw in tags.split(","):
            tag = raw.strip()
            if tag:
                tag_list.append(tag)
    record = {"name": name, "desc": desc, "tags": tag_list}
    data[sections.get(entry_type, "hosts")][key] = record
    _hosts_write(file, data)
def hosts_remove(file, entry_type, key):
    """
    Delete a hosts entry.

    entry_type is host, subnet or port (anything else is treated as
    host). Exits 1 with an error on stderr when the key is absent.
    """
    sections = {"host": "hosts", "subnet": "subnets", "port": "ports"}
    data = _hosts_read(file)
    bucket = data[sections.get(entry_type, "hosts")]
    if key not in bucket:
        print(f"Error: not found: {key}", file=sys.stderr)
        sys.exit(1)
    del bucket[key]
    _hosts_write(file, data)
def hosts_exists(file, entry_type, key):
    """Print 'true' when the key exists in the selected section, else 'false'."""
    sections = {"host": "hosts", "subnet": "subnets", "port": "ports"}
    bucket = _hosts_read(file)[sections.get(entry_type, "hosts")]
    answer = "true" if key in bucket else "false"
    print(answer)
def hosts_lookup(file, ip):
    """
    Print the display name registered for an IP.

    Only the 'hosts' section is consulted, by exact key match. Prints an
    empty line when there is no (truthy) entry. A dict entry without a
    'name' prints the IP itself.
    """
    entry = _hosts_read(file)["hosts"].get(ip)
    if not entry:
        print("")
    elif isinstance(entry, dict):
        print(entry.get("name", ip))
    else:
        print(str(entry))
# ======================================================
# Peer History
# ======================================================
def peer_history_lookup(history_dir, ip):
    """
    Print the peer name recorded for an endpoint IP.

    First consults endpoint_index.json in history_dir. If the index is
    missing, unreadable, or has no truthy entry for the IP, every other
    *.json file in the directory is scanned and the 'peer' value of the
    first one listing the IP under 'endpoints' is printed.

    Prints nothing when no match is found. All read/parse errors are
    swallowed; the index file is only read here, never written.
    """
    import glob
    index_name = "endpoint_index.json"
    index_path = os.path.join(history_dir, index_name)
    try:
        if os.path.exists(index_path):
            with open(index_path) as handle:
                peer = json.load(handle).get(ip, '')
            if peer:
                print(peer)
                return
    except Exception:
        pass
    # Fallback: look through the per-peer history files.
    try:
        candidates = glob.glob(os.path.join(history_dir, "*.json"))
        for path in candidates:
            if os.path.basename(path) == index_name:
                continue
            try:
                with open(path) as handle:
                    history = json.load(handle)
                if ip in history.get("endpoints", {}):
                    print(history.get("peer", ""))
                    return
            except Exception:
                pass
    except Exception:
        pass
# ======================================================
# Net services
# ======================================================
def _net_read(file):
    """
    Load the net services file.

    Returns the parsed JSON value, or an empty dict when the file is
    missing, blank, or cannot be read or parsed.
    """
    try:
        if os.path.exists(file):
            with open(file) as handle:
                text = handle.read().strip()
            return json.loads(text) if text else {}
    except Exception:
        pass
    return {}
def _net_write(file, data):
    """
    Write the net services dict to `file` as indented JSON.

    The parent directory is created when missing. A bare file name is
    written in place: os.makedirs('') would raise FileNotFoundError.
    """
    parent = os.path.dirname(file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(file, 'w') as f:
        json.dump(data, f, indent=2)
def net_list(file):
    """
    Print every service, sorted by name.

    Line format: name|ip|desc|tags|port_count
    where tags is comma-joined and port_count is the number of named ports.
    """
    for name, svc in sorted(_net_read(file).items()):
        ip = svc.get('ip', '')
        desc = svc.get('desc', '')
        tags = ','.join(svc.get('tags', []))
        port_count = len(svc.get('ports', {}))
        print(f"{name}|{ip}|{desc}|{tags}|{port_count}")
def net_show(file, name):
    """
    Print the details of one service.

    Output:
      name|<name>, ip|<ip>, desc|<desc>, tags|<comma-joined tags>
      port|<port_name>|<port>|<proto>|<desc>   (one per port; proto
                                                defaults to tcp)
    Exits 1 with an error on stderr when the service does not exist.
    """
    services = _net_read(file)
    if name not in services:
        print(f"Error: Service not found: {name}", file=sys.stderr)
        sys.exit(1)
    svc = services[name]
    print(f"name|{name}")
    ip = svc.get('ip', '')
    print(f"ip|{ip}")
    desc = svc.get('desc', '')
    print(f"desc|{desc}")
    tags = ','.join(svc.get('tags', []))
    print(f"tags|{tags}")
    for port_name, port_def in svc.get('ports', {}).items():
        number = port_def.get('port', '')
        proto = port_def.get('proto', 'tcp')
        port_desc = port_def.get('desc', '')
        print(f"port|{port_name}|{number}|{proto}|{port_desc}")
def net_exists(file, name):
    """
    Print 'true' or 'false' for a service or service:port reference.

      "svc"        -> the service exists
      "svc:ports"  -> the service exists
      "svc:<port>" -> the service has a port with that name
    """
    services = _net_read(file)
    svc_name, colon, port_name = name.partition(':')
    if not colon:
        found = name in services
    elif port_name == 'ports':
        found = svc_name in services
    else:
        found = port_name in services.get(svc_name, {}).get('ports', {})
    print('true' if found else 'false')
def net_add_service(file, name, ip, desc='', tags=''):
    """
    Create a service or update an existing one.

    A new service starts with the given ip and no ports; an existing one
    only has its ip replaced. desc and tags are written only when
    non-empty; tags is a comma-separated string with blank items dropped.
    """
    services = _net_read(file)
    if name in services:
        services[name]['ip'] = ip
    else:
        services[name] = {'ip': ip, 'ports': {}}
    svc = services[name]
    if desc:
        svc['desc'] = desc
    if tags:
        svc['tags'] = [tag.strip() for tag in tags.split(',') if tag.strip()]
    _net_write(file, services)
def net_add_port(file, service, port_name, port, proto='tcp', desc=''):
    """
    Add or overwrite a named port on an existing service.

    The port number is stored as int; desc is stored only when
    non-empty. Exits 1 with an error on stderr when the service does
    not exist.
    """
    services = _net_read(file)
    if service not in services:
        print(f"Error: Service not found: {service}", file=sys.stderr)
        sys.exit(1)
    ports = services[service].setdefault('ports', {})
    definition = {'port': int(port), 'proto': proto}
    if desc:
        definition['desc'] = desc
    ports[port_name] = definition
    _net_write(file, services)
def net_remove(file, name):
    """
    Remove a service, all of its ports, or one named port.

      "svc"        -> delete the service
      "svc:ports"  -> reset the service's ports to empty
      "svc:<port>" -> delete that port
    Exits 1 with an error on stderr when the service or port is missing.
    """
    services = _net_read(file)
    svc_name, colon, port_name = name.partition(':')
    # Without a colon svc_name is the whole name, so one check covers both forms.
    if svc_name not in services:
        print(f"Error: Service not found: {svc_name}", file=sys.stderr)
        sys.exit(1)
    if not colon:
        del services[name]
    elif port_name == 'ports':
        services[svc_name]['ports'] = {}
    else:
        if port_name not in services[svc_name].get('ports', {}):
            print(f"Error: Port not found: {port_name}", file=sys.stderr)
            sys.exit(1)
        del services[svc_name]['ports'][port_name]
    _net_write(file, services)
def net_resolve(file, name):
    """
    Resolve a service reference to addresses.

      "svc"        -> prints the service ip
      "svc:ports"  -> prints ip:port:proto for every port
      "svc:<port>" -> prints ip:port:proto for that port
    proto defaults to tcp. Exits 1 with an error on stderr when the
    service or port is missing.
    """
    services = _net_read(file)
    svc_name, colon, port_name = name.partition(':')
    # Without a colon svc_name is the whole name, so one check covers both forms.
    if svc_name not in services:
        print(f"Error: Service not found: {svc_name}", file=sys.stderr)
        sys.exit(1)
    svc = services[svc_name]
    ip = svc.get('ip', '')
    if not colon:
        print(ip)
        return
    ports = svc.get('ports', {})
    if port_name == 'ports':
        for pdef in ports.values():
            print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
        return
    if port_name not in ports:
        print(f"Error: Port not found: {port_name}", file=sys.stderr)
        sys.exit(1)
    pdef = ports[port_name]
    print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
def net_reverse_lookup(file, ip, port='', proto=''):
    """
    Print the result of lib.util.reverse_lookup for ip/port/proto.

    The net data is loaded through lib.util.load_net_data; nothing is
    printed when the lookup result is falsy.
    """
    from lib.util import load_net_data, reverse_lookup
    match = reverse_lookup(load_net_data(file), ip, port, proto)
    if not match:
        return
    print(match)
def group_list_data(groups_dir, blocks_dir, clients_dir):
    """
    Print a summary line for every *.group file, in sorted path order.

    Line format: name|desc|total|blocked
      total   - group peers that have a <peer>.conf in clients_dir
      blocked - group peers that have a <peer>.block in blocks_dir
    Empty peer names are ignored. Group files that cannot be read or
    parsed are skipped.
    """
    import glob
    # Strip only the trailing ".block": str.replace would also remove
    # ".block" occurring inside a peer name and the peer would never match.
    blocked_peers = {
        os.path.splitext(os.path.basename(path))[0]
        for path in glob.glob(f"{blocks_dir}/*.block")
    }
    for group_file in sorted(glob.glob(f"{groups_dir}/*.group")):
        try:
            with open(group_file) as f:
                group = json.load(f)
            name = group.get('name', '')
            desc = group.get('desc', '')
            peers = [p for p in group.get('peers', []) if p]
            total = sum(
                1 for p in peers
                if os.path.exists(os.path.join(clients_dir, f"{p}.conf"))
            )
            # NOTE(review): blocked is counted over all listed peers, not only
            # those with a config, so it can exceed total - confirm intent.
            blocked = sum(1 for p in peers if p in blocked_peers)
            print(f"{name}|{desc}|{total}|{blocked}")
        except Exception:
            # Best-effort listing: one bad group file must not hide the rest.
            continue
def create_group(file, name, desc):
    """
    Write a new group file with the given name and description and an
    empty peer list, overwriting any existing file.

    On failure the error is printed to stderr and the process exits 1.
    """
    payload = {'name': name, 'desc': desc, 'peers': []}
    try:
        with open(file, 'w') as handle:
            json.dump(payload, handle, indent=2)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def group_has_peer(file, peer_name):
    """
    Print 'true' when the group file lists peer_name under 'peers',
    otherwise 'false'. Any read or parse error also prints 'false'.
    """
    try:
        with open(file) as handle:
            members = json.load(handle).get('peers', [])
        answer = 'true' if peer_name in members else 'false'
        print(answer)
    except Exception:
        print('false')
# ── All block/subnet/identity/policy/hosts functions copied verbatim ────────
# (unchanged from original — omitting here for brevity but must be included
# in the actual deployed file)
def json_get_nested(file, *keys):
    """
    Print the value found by walking `keys` through nested JSON objects.

    Booleans print as true/false. An empty line is printed when a key is
    missing, an intermediate value is not an object, the value is null,
    or the file cannot be read or parsed.
    """
    try:
        with open(file) as handle:
            node = json.load(handle)
        for key in keys:
            # A non-dict cannot be descended into; treat it like a miss.
            node = node.get(key) if isinstance(node, dict) else None
            if node is None:
                print('')
                return
        if node is None:
            rendered = ''
        elif isinstance(node, bool):
            rendered = 'true' if node else 'false'
        else:
            rendered = node
        print(rendered)
    except Exception:
        print('')
def json_set_nested(file, *args):
    """
    Set a nested value in a JSON file: json_set_nested(file, k1, ..., kN, value).

    Intermediate keys that are missing or not objects are replaced with
    empty objects. The strings 'true' and 'false' are stored as booleans;
    every other value is stored unchanged. A missing file starts from an
    empty object. With fewer than two extra arguments nothing happens.
    Any failure is printed to stderr and the process exits 1.
    """
    if len(args) < 2:
        return
    *path, value = args
    try:
        document = {}
        if os.path.exists(file):
            with open(file) as handle:
                document = json.load(handle)
        cursor = document
        for key in path[:-1]:
            if key not in cursor or not isinstance(cursor[key], dict):
                cursor[key] = {}
            cursor = cursor[key]
        booleans = {'true': True, 'false': False}
        cursor[path[-1]] = booleans.get(value, value)
        with open(file, 'w') as handle:
            json.dump(document, handle, indent=2)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
# ── Commands dispatch ────────────────────────────────────────────────────────
# Dispatch table: maps a CLI command name to a callable that receives the
# list of positional arguments following the command (sys.argv[2:]) and
# forwards them to the implementing function.
#
# Conventions used below:
#   * Required arguments are indexed directly (args[0], args[1], ...); a
#     missing one raises IndexError inside the lambda.
#   * Optional trailing arguments use `args[i] if len(args) > i else <default>`.
#   * Functions living in lib.* are reached through
#     __import__('lib.x', fromlist=[...]); a non-empty fromlist makes
#     __import__ return the submodule itself, and because the call sits in
#     the lambda body the module is only imported when the command runs.
commands = {
# JSON CRUD
'get': lambda args: get(args[0], args[1]),
'set': lambda args: set_key(args[0], args[1], args[2]),
'delete': lambda args: delete_key(args[0], args[1]),
'append': lambda args: append(args[0], args[1], args[2]),
'remove': lambda args: remove_value(args[0], args[1], args[2]),
'cat': lambda args: cat(args[0]),
'has_key': lambda args: has_key(args[0], args[1]),
'filter_values': lambda args: filter_values(args[0], args[1], args[2]),
'count': lambda args: count(args[0], args[1]),
'get_raw': lambda args: get_raw(args[0], args[1]),
'fmt_datetime': lambda args: fmt_datetime(args[0], args[1]),
'json_get_nested': lambda args: json_get_nested(args[0], *args[1:]),
'json_set_nested': lambda args: json_set_nested(args[0], *args[1:]),
'cleanup_config': lambda args: cleanup_config(args[0]),
'remove_peer_block':lambda args: remove_peer_block(args[0], args[1]),
'audit_fw_counts': lambda args: audit_fw_counts(args[0]),
# Events (from lib.events)
'fw_events': lambda args: __import__('lib.events', fromlist=['fw_events']).fw_events(
args[0], args[1], args[2], args[3], args[4],
args[5] if len(args) > 5 else '50',
args[6] if len(args) > 6 else '1',
args[7] if len(args) > 7 else '',
args[8] if len(args) > 8 else '',
args[9] if len(args) > 9 else '',
args[10] if len(args) > 10 else 'desc',
args[11] if len(args) > 11 else ''),
'wg_events': lambda args: __import__('lib.events', fromlist=['wg_events']).wg_events(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '50',
args[4] if len(args) > 4 else '1',
args[5] if len(args) > 5 else '',
args[6] if len(args) > 6 else '',
args[7] if len(args) > 7 else '',
args[8] if len(args) > 8 else 'desc'),
'parse_event': lambda args: __import__('lib.events', fromlist=['parse_event']).parse_event(args[0]),
'parse_fw_event': lambda args: __import__('lib.events', fromlist=['parse_fw_event']).parse_fw_event(args[0]),
'format_fw_event': lambda args: __import__('lib.events', fromlist=['format_fw_event']).format_fw_event(args[0], args[1]),
'format_wg_event': lambda args: __import__('lib.events', fromlist=['format_wg_event']).format_wg_event(args[0]),
'remove_events': lambda args: __import__('lib.events', fromlist=['remove_events']).remove_events(args[0], args[1]),
# args[4] and args[5] arrive as the strings 'true'/'false' and are passed on as booleans.
'remove_events_filtered': lambda args: __import__('lib.events', fromlist=['remove_events_filtered']).remove_events_filtered(
args[0], args[1], args[2], args[3],
args[4] == 'true', args[5] == 'true',
args[6] if len(args) > 6 else ''),
'follow_logs': lambda args: __import__('lib.events', fromlist=['follow_logs']).follow_logs(
args[0], args[1], args[2], args[3], args[4],
args[5] if len(args) > 5 else ''),
'last_event': lambda args: __import__('lib.events', fromlist=['last_event']).last_event(args[0], args[1], args[2], args[3]),
'events_for': lambda args: __import__('lib.events', fromlist=['events_for']).events_for(args[0], args[1], args[2]),
'iso_to_ts': lambda args: __import__('lib.events', fromlist=['iso_to_ts']).iso_to_ts(args[0]),
# Peers (from lib.peers)
'peer_data': lambda args: __import__('lib.peers', fromlist=['peer_data']).peer_data(args[0], args[1], args[2]),
'peer_transfer': lambda args: __import__('lib.peers', fromlist=['peer_transfer']).peer_transfer(args[0]),
'peer_transfer_delta': lambda args: __import__('lib.peers', fromlist=['peer_transfer_delta']).peer_transfer_delta(args[0], args[1]),
'peer_group_map': lambda args: __import__('lib.peers', fromlist=['peer_group_map']).peer_group_map(args[0]),
'peer_groups': lambda args: __import__('lib.peers', fromlist=['peer_groups']).peer_groups(args[0], args[1]),
# Activity (from lib.activity)
'activity_aggregate': lambda args: __import__('lib.activity', fromlist=['activity_aggregate']).activity_aggregate(
args[0], args[1], args[2], args[3], args[4],
args[5], args[6] if len(args) > 6 else '24',
args[7] if len(args) > 7 else '',
args[8] if len(args) > 8 else ''),
# Rules
'rule_resolve': lambda args: rule_resolve(args[0], args[1]),
'rule_resolve_field':lambda args: rule_resolve_field(args[0], args[1], args[2]),
'rule_list_data': lambda args: rule_list_data(args[0], args[1]),
'rule_inspect': lambda args: rule_inspect(args[0], args[1]),
'count_resolved': lambda args: count_resolved(args[0], args[1], args[2]),
'create_rule': lambda args: create_rule(*args),
# find_rule_file returns its result; the dispatcher prints it.
'find_rule_file': lambda args: print(find_rule_file(args[0], args[1])),
# Net
'net_list': lambda args: net_list(args[0]),
'net_show': lambda args: net_show(args[0], args[1]),
'net_exists': lambda args: net_exists(args[0], args[1]),
'net_add_service': lambda args: net_add_service(args[0], args[1], args[2],
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else ''),
'net_add_port': lambda args: net_add_port(args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else 'tcp',
args[5] if len(args) > 5 else ''),
'net_remove': lambda args: net_remove(args[0], args[1]),
'net_resolve': lambda args: net_resolve(args[0], args[1]),
'net_reverse_lookup':lambda args: net_reverse_lookup(args[0], args[1],
args[2] if len(args) > 2 else '',
args[3] if len(args) > 3 else ''),
# Groups
'group_list_data': lambda args: group_list_data(args[0], args[1], args[2]),
'create_group': lambda args: create_group(args[0], args[1],
args[2] if len(args) > 2 else ''),
'group_has_peer': lambda args: group_has_peer(args[0], args[1]),
# Block
'block_get': lambda args: block_get(args[0]),
'block_is_blocked': lambda args: block_is_blocked(args[0]),
'block_set_direct': lambda args: block_set_direct(args[0], args[1], args[2]),
'block_add_group': lambda args: block_add_group(args[0], args[1], args[2]),
'block_remove_group': lambda args: block_remove_group(args[0], args[1], args[2]),
'block_add_rule': lambda args: block_add_rule(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else '',
args[5] if len(args) > 5 else '',
args[6] if len(args) > 6 else ''
),
'block_remove_rule': lambda args: block_remove_rule(
args[0], args[1],
args[2] if len(args) > 2 else '',
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else ''
),
'block_get_rules': lambda args: block_get_rules(args[0]),
'block_get_groups': lambda args: block_get_groups(args[0]),
'block_get_direct': lambda args: block_get_direct(args[0]),
'block_is_empty': lambda args: block_is_empty(args[0]),
# Subnet
'subnet_lookup': lambda args: subnet_lookup(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_type': lambda args: subnet_type(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_tunnel_mode': lambda args: subnet_tunnel_mode(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_for_ip': lambda args: subnet_for_ip(args[0], args[1]),
'subnet_list': lambda args: subnet_list(args[0]),
'subnet_show': lambda args: subnet_show(args[0], args[1]),
'subnet_add': lambda args: subnet_add(
args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else 'split',
args[5] if len(args) > 5 else '',
args[6] if len(args) > 6 else ''
),
'subnet_remove': lambda args: subnet_remove(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_rename': lambda args: subnet_rename(args[0], args[1], args[2], args[3] if len(args) > 3 else ''),
'subnet_peers': lambda args: subnet_peers(args[0], args[1], args[2], args[3]),
'subnet_exists': lambda args: subnet_exists(args[0], args[1]),
'subnet_default_rule': lambda args: subnet_default_rule(args[0], args[1], args[2] if len(args) > 2 else ''),
'subnet_list_names': lambda args: subnet_list_names(args[0]),
# Identity
'identity_list': lambda args: identity_list(args[0]),
'identity_show': lambda args: identity_show(args[0]),
'identity_add_peer': lambda args: identity_add_peer(args[0], args[1], args[2], args[3], args[4]),
'identity_remove_peer':lambda args: identity_remove_peer(args[0], args[1]),
'identity_remove': lambda args: identity_remove(args[0]),
'identity_next_index': lambda args: identity_next_index(args[0], args[1]),
'identity_peers': lambda args: identity_peers(args[0], args[1] if len(args) > 1 else ''),
'identity_migrate': lambda args: identity_migrate(args[0], args[1], args[2], args[3]),
'identity_infer': lambda args: identity_infer(args[0]),
'identity_exists': lambda args: identity_exists(args[0]),
'identity_rules': lambda args: identity_rules(args[0]),
'identity_add_rule': lambda args: identity_add_rule(args[0], args[1], args[2]),
'identity_remove_rule': lambda args: identity_remove_rule(args[0], args[1]),
'identity_clear_rules': lambda args: identity_clear_rules(args[0]),
'identity_has_rule': lambda args: identity_has_rule(args[0], args[1]),
# Policy
'policy_get': lambda args: policy_get(args[0], args[1], args[2] if len(args) > 2 else ''),
'policy_list': lambda args: policy_list(args[0]),
'policy_exists': lambda args: policy_exists(args[0], args[1]),
'policy_add': lambda args: policy_add(args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else ''),
'policy_remove': lambda args: policy_remove(args[0], args[1]),
'policy_set_field': lambda args: policy_set_field(args[0], args[1], args[2], args[3]),
'subnet_policy': lambda args: subnet_policy(args[0], args[1], args[2] if len(args) > 2 else ''),
# Hosts
'hosts_list': lambda args: hosts_list(args[0]),
'hosts_show': lambda args: hosts_show(args[0], args[1], args[2]),
'hosts_add': lambda args: hosts_add(args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else '',
args[5] if len(args) > 5 else ''),
'hosts_remove': lambda args: hosts_remove(args[0], args[1], args[2]),
'hosts_exists': lambda args: hosts_exists(args[0], args[1], args[2]),
'hosts_lookup': lambda args: hosts_lookup(args[0], args[1]),
# Handshakes, batch resolution and peer history
'clean_handshakes': lambda args: clean_handshakes(args[0], args[1] if len(args) > 1 else '300'),
'batch_resolve': lambda args: batch_resolve(args[0], args[1], *args[2:]),
'peer_history_lookup': lambda args: peer_history_lookup(args[0], args[1]),
}
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
    """
    CLI entry point: json_helper.py <command> [args...].

    Looks the command up in `commands` and calls its handler with the
    remaining arguments. Exits 1 with a message on stderr when no command
    is given, the command is unknown, or the handler raises an Exception.
    """
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: json_helper.py <command> [args...]", file=sys.stderr)
        sys.exit(1)
    cmd, args = argv[1], argv[2:]
    if cmd not in commands:
        print(f"Unknown command: {cmd}", file=sys.stderr)
        sys.exit(1)
    try:
        handler = commands[cmd]
        handler(args)
    except Exception as err:
        # SystemExit raised by handlers is not an Exception and passes through.
        print(f"Error in {cmd}: {err}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()