2531 lines
No EOL
87 KiB
Python
2531 lines
No EOL
87 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
wgctl JSON helper — called by shell functions to read/write JSON files.
|
|
Usage: json_helper.py <command> <file> [key] [value]
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import os
|
|
|
|
DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M')
|
|
|
|
def get(file, key):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
val = data.get(key, [])
|
|
if isinstance(val, bool):
|
|
print(str(val).lower()) # true/false not True/False
|
|
elif isinstance(val, list):
|
|
if val:
|
|
print('\n'.join(str(v) for v in val))
|
|
else:
|
|
if val:
|
|
print(val)
|
|
except:
|
|
sys.exit(0)
|
|
|
|
def set_key(file, key, value):
|
|
try:
|
|
data = {}
|
|
if os.path.exists(file):
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
# Try to parse as JSON value first (for arrays/bools)
|
|
try:
|
|
data[key] = json.loads(value)
|
|
except:
|
|
data[key] = value
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def delete_key(file, key):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
data.pop(key, None)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def append(file, key, value):
|
|
try:
|
|
data = {}
|
|
if os.path.exists(file):
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
if key not in data:
|
|
data[key] = []
|
|
if value not in data[key]:
|
|
data[key].append(value)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def remove_value(file, key, value):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
if key in data and value in data[key]:
|
|
data[key].remove(value)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def cat(file):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
print(json.dumps(data, indent=2))
|
|
except Exception as e:
|
|
sys.exit(1)
|
|
|
|
def has_key(file, key):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
sys.exit(0 if key in data else 1)
|
|
except:
|
|
sys.exit(1)
|
|
|
|
def filter_values(file, key, value):
|
|
"""Remove all entries where value matches"""
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
data = {k: v for k, v in data.items() if v != value}
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def last_event(file, key, field, client):
|
|
"""Get last event field for a client"""
|
|
try:
|
|
last = None
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
if e.get(key) == client:
|
|
last = e
|
|
except:
|
|
pass
|
|
if last:
|
|
print(last.get(field, ''))
|
|
except:
|
|
pass
|
|
|
|
def events_for(file, ip, limit):
|
|
"""Format events for a given IP"""
|
|
try:
|
|
from datetime import datetime
|
|
events = []
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
if e.get('ip') == ip:
|
|
events.append(e)
|
|
except:
|
|
pass
|
|
for e in events[-int(limit):]:
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
endpoint = e.get('endpoint', '—')
|
|
client = e.get('client', '—')
|
|
event = e.get('event', '—')
|
|
print(f' {ts} {client:<20} {endpoint:<20} {event}')
|
|
except:
|
|
pass
|
|
|
|
def fw_events(file, filter_ip, filter_type, clients_dir, limit):
|
|
"""Format firewall drop events with dedup and counts"""
|
|
import glob
|
|
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
|
|
|
|
ip_to_name = {}
|
|
for conf in glob.glob(f"{clients_dir}/*.conf"):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
try:
|
|
with open(conf) as f:
|
|
for line in f:
|
|
if line.startswith('Address'):
|
|
ip = line.split('=')[1].strip().split('/')[0]
|
|
ip_to_name[ip] = name
|
|
except:
|
|
pass
|
|
|
|
events = []
|
|
last_seen = {}
|
|
|
|
try:
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
src = e.get('src_ip', '')
|
|
if not src:
|
|
continue
|
|
if filter_ip and src != filter_ip:
|
|
continue
|
|
dst = e.get('dest_ip', '')
|
|
port = e.get('dest_port', '')
|
|
proto = e.get('ip.protocol', 0)
|
|
key = (src, dst, port, proto)
|
|
ts_str = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
ts = datetime.fromisoformat(ts_str).timestamp()
|
|
except:
|
|
ts = 0
|
|
windows = {1: 5, 6: 30, 17: 10}
|
|
window = windows.get(proto, 10)
|
|
if key in last_seen and (ts - last_seen[key]) < window:
|
|
continue
|
|
last_seen[key] = ts
|
|
events.append(e)
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
# Dedup consecutive same src+dst+port within 60s with count
|
|
deduped = []
|
|
counts = []
|
|
for e in events:
|
|
ts_str = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
ts = datetime.fromisoformat(ts_str).timestamp()
|
|
except:
|
|
ts = 0
|
|
src = e.get('src_ip', '')
|
|
dst = e.get('dest_ip', '')
|
|
port = e.get('dest_port', '')
|
|
proto = e.get('ip.protocol', 0)
|
|
key = (src, dst, port, proto)
|
|
|
|
if deduped and counts:
|
|
prev = deduped[-1]
|
|
try:
|
|
prev_ts = datetime.fromisoformat(prev.get('timestamp','')).timestamp()
|
|
except:
|
|
prev_ts = 0
|
|
prev_key = (prev.get('src_ip',''), prev.get('dest_ip',''),
|
|
prev.get('dest_port',''), prev.get('ip.protocol',0))
|
|
if key == prev_key and (ts - prev_ts) < 60:
|
|
counts[-1] += 1
|
|
continue
|
|
|
|
deduped.append(e)
|
|
counts.append(1)
|
|
|
|
grouped = []
|
|
group_counts = []
|
|
for e in deduped:
|
|
ts_str = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(ts_str)
|
|
# Truncate to minute for grouping
|
|
minute_key = dt.strftime('%Y-%m-%d %H:%M')
|
|
except:
|
|
minute_key = ts_str[:16]
|
|
|
|
src = e.get('src_ip', '')
|
|
dst = e.get('dest_ip', '')
|
|
port = e.get('dest_port', '')
|
|
proto = e.get('ip.protocol', 0)
|
|
key = (minute_key, src, dst, proto) # group within same minute
|
|
|
|
if grouped and group_counts:
|
|
prev = grouped[-1]
|
|
try:
|
|
prev_dt = datetime.fromisoformat(prev.get('timestamp',''))
|
|
prev_minute = prev_dt.strftime('%Y-%m-%d %H:%M')
|
|
except:
|
|
prev_minute = ''
|
|
prev_key = (prev_minute, prev.get('src_ip',''),
|
|
prev.get('dest_ip',''), prev.get('ip.protocol',0))
|
|
if key == prev_key:
|
|
group_counts[-1] += 1
|
|
continue
|
|
|
|
grouped.append(e)
|
|
group_counts.append(1)
|
|
|
|
for e, count in zip(deduped[-int(limit):], counts[-int(limit):]):
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
src = e.get('src_ip', '—')
|
|
dst = e.get('dest_ip', '—')
|
|
port = e.get('dest_port', '')
|
|
proto_num = e.get('ip.protocol', 0)
|
|
proto = proto_map.get(proto_num, str(proto_num))
|
|
dst_str = f"{dst}:{port}" if port else dst
|
|
client = ip_to_name.get(src, src)
|
|
if filter_type and not client.startswith(filter_type + '-'):
|
|
continue
|
|
count_str = f" (x{count})" if count > 1 else ""
|
|
print(f"{ts}|{client}|{dst_str}|{proto}{count_str}")
|
|
|
|
def wg_events(file, filter_client, filter_type, limit):
|
|
"""Format WireGuard events from events.log with dedup"""
|
|
events = []
|
|
try:
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
client = e.get('client', '')
|
|
if not client:
|
|
continue
|
|
if filter_client and client != filter_client:
|
|
continue
|
|
if filter_type and not client.startswith(filter_type + '-'):
|
|
continue
|
|
events.append(e)
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
# Dedup consecutive same client+event+endpoint within 60s
|
|
deduped = []
|
|
counts = []
|
|
for e in events:
|
|
ts_str = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
ts = datetime.fromisoformat(ts_str).timestamp()
|
|
except:
|
|
ts = 0
|
|
client = e.get('client', '')
|
|
event = e.get('event', '')
|
|
endpoint = e.get('endpoint', '')
|
|
key = (client, event, endpoint[:15])
|
|
|
|
if deduped and counts:
|
|
prev = deduped[-1]
|
|
prev_ts_str = prev.get('timestamp', '')
|
|
try:
|
|
prev_ts = datetime.fromisoformat(prev_ts_str).timestamp()
|
|
except:
|
|
prev_ts = 0
|
|
prev_key = (prev.get('client',''), prev.get('event',''), prev.get('endpoint','')[:15])
|
|
if key == prev_key and (ts - prev_ts) < 60:
|
|
counts[-1] += 1
|
|
continue
|
|
|
|
deduped.append(e)
|
|
counts.append(1)
|
|
|
|
for e, count in zip(deduped[-int(limit):], counts[-int(limit):]):
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
client = e.get('client', '—')
|
|
endpoint = e.get('endpoint', '—')
|
|
event = e.get('event', '—')
|
|
count_str = f" (x{count})" if count > 1 else ""
|
|
print(f"{ts}|{client}|{endpoint}|{event}{count_str}")
|
|
|
|
def format_fw_event(line, clients_dir):
|
|
"""Format a single fw_event line"""
|
|
import glob
|
|
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
|
|
|
|
# Build ip->name map
|
|
ip_to_name = {}
|
|
for conf in glob.glob(f"{clients_dir}/*.conf"):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
try:
|
|
with open(conf) as f:
|
|
for l in f:
|
|
if l.startswith('Address'):
|
|
ip = l.split('=')[1].strip().split('/')[0]
|
|
ip_to_name[ip] = name
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
e = json.loads(line.strip())
|
|
src = e.get('src_ip', '')
|
|
if not src:
|
|
return None
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
dst = e.get('dest_ip', '—')
|
|
port = e.get('dest_port', '')
|
|
proto_num = e.get('ip.protocol', 0)
|
|
proto = proto_map.get(proto_num, str(proto_num))
|
|
dst_str = f"{dst}:{port}" if port else dst
|
|
client = ip_to_name.get(src, src)
|
|
return f"{ts}|{client}|{dst_str}|{proto}"
|
|
except:
|
|
return None
|
|
|
|
def format_wg_event(line):
|
|
"""Format a single wg_event line"""
|
|
try:
|
|
e = json.loads(line.strip())
|
|
client = e.get('client', '')
|
|
if not client:
|
|
return None
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
endpoint = e.get('endpoint', '—')
|
|
event = e.get('event', '—')
|
|
return f"{ts}|{client}|{endpoint}|{event}|wg"
|
|
except:
|
|
return None
|
|
|
|
def remove_events(file, identifier):
|
|
"""Remove all events for a client/ip from a JSONL file"""
|
|
try:
|
|
lines = []
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
if e.get('client') == identifier or e.get('src_ip') == identifier:
|
|
continue
|
|
lines.append(line)
|
|
except:
|
|
lines.append(line)
|
|
with open(file, 'w') as f:
|
|
f.writelines(lines)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def follow_logs(fw_file, wg_file, filter_ip, filter_type, clients_dir, filter_peers=""):
|
|
"""Follow both log files and output formatted events"""
|
|
import glob, time, select
|
|
|
|
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
|
|
peer_filter = set(filter_peers.split(',')) if filter_peers else set()
|
|
|
|
# Build ip->name map
|
|
ip_to_name = {}
|
|
for conf in glob.glob(f"{clients_dir}/*.conf"):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
try:
|
|
with open(conf) as f:
|
|
for l in f:
|
|
if l.startswith('Address'):
|
|
ip = l.split('=')[1].strip().split('/')[0]
|
|
ip_to_name[ip] = name
|
|
except:
|
|
pass
|
|
|
|
# Open files and seek to end
|
|
files = {}
|
|
for label, path in [('fw', fw_file), ('wg', wg_file)]:
|
|
if path and os.path.exists(path):
|
|
f = open(path)
|
|
f.seek(0, 2) # seek to end
|
|
files[label] = f
|
|
|
|
dedup = {}
|
|
|
|
try:
|
|
while True:
|
|
for label, f in files.items():
|
|
line = f.readline()
|
|
if not line:
|
|
continue
|
|
try:
|
|
e = json.loads(line.strip())
|
|
except:
|
|
continue
|
|
|
|
if label == 'fw':
|
|
src = e.get('src_ip', '')
|
|
if not src:
|
|
continue
|
|
if filter_ip and src != filter_ip:
|
|
continue
|
|
|
|
# Filter by peer names if specified
|
|
if peer_filter:
|
|
client_name = ip_to_name.get(src, '')
|
|
if client_name not in peer_filter:
|
|
continue
|
|
dst = e.get('dest_ip', '—')
|
|
port = e.get('dest_port', '')
|
|
proto_num = e.get('ip.protocol', 0)
|
|
proto = proto_map.get(proto_num, str(proto_num))
|
|
|
|
# Dedup
|
|
key = (src, dst, port, proto_num)
|
|
windows = {1: 5, 6: 30, 17: 10}
|
|
window = windows.get(proto_num, 10)
|
|
now = time.time()
|
|
if key in dedup and (now - dedup[key]) < window:
|
|
continue
|
|
dedup[key] = now
|
|
|
|
client = ip_to_name.get(src, src)
|
|
if filter_type and not client.startswith(filter_type + '-'):
|
|
continue
|
|
dst_str = f"{dst}:{port}" if port else dst
|
|
ts = e.get('timestamp', '')[:16].replace('T', ' ')
|
|
print(f"fw|{ts}|{client}|{dst_str}|{proto}", flush=True)
|
|
|
|
elif label == 'wg':
|
|
client = e.get('client', '')
|
|
if not client:
|
|
continue
|
|
if filter_ip:
|
|
ip = ip_to_name.get(filter_ip, '')
|
|
if client != ip and client != filter_ip:
|
|
continue
|
|
|
|
if peer_filter and client not in peer_filter:
|
|
continue
|
|
if filter_type and not client.startswith(filter_type + '-'):
|
|
continue
|
|
ts = e.get('timestamp', '')[:16].replace('T', ' ')
|
|
endpoint = e.get('endpoint', '—')
|
|
event = e.get('event', '—')
|
|
print(f"wg|{ts}|{client}|{endpoint}|{event}", flush=True)
|
|
|
|
time.sleep(0.1)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
def count(file, key):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
val = data.get(key, [])
|
|
print(len(val) if isinstance(val, list) else 0)
|
|
except:
|
|
print(0)
|
|
|
|
def audit_fw_counts(clients_dir):
|
|
"""Return peer_name:fw_count pairs from iptables"""
|
|
import glob, subprocess, re
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
['iptables', '-L', 'FORWARD', '-n', '-v'],
|
|
capture_output=True, text=True
|
|
)
|
|
fw_lines = result.stdout.splitlines()
|
|
except Exception:
|
|
fw_lines = []
|
|
|
|
# Filter to only data lines (skip headers and blanks)
|
|
# In -v output, source IP is in column 8 (0-indexed)
|
|
# Format: pkts bytes target prot opt in out source destination [options]
|
|
rule_lines = [l for l in fw_lines if l.strip() and not l.startswith('Chain') and not l.startswith(' pkts')]
|
|
|
|
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
try:
|
|
with open(conf) as f:
|
|
ip = ''
|
|
for line in f:
|
|
if line.startswith('Address'):
|
|
ip = line.split('=')[1].strip().split('/')[0]
|
|
break
|
|
if not ip:
|
|
continue
|
|
# Count lines where source column exactly matches the peer IP
|
|
count = sum(1 for l in rule_lines if re.search(r'\s' + re.escape(ip) + r'\s', l))
|
|
print(f"{name}:{count}")
|
|
except Exception:
|
|
pass
|
|
|
|
def peer_group_map(groups_dir):
|
|
"""Return peer:group pairs for all groups"""
|
|
import glob
|
|
try:
|
|
for group_file in glob.glob(f"{groups_dir}/*.group"):
|
|
try:
|
|
with open(group_file) as f:
|
|
g = json.load(f)
|
|
name = g.get('name', '')
|
|
for peer in g.get('peers', []):
|
|
if peer:
|
|
print(f"{peer}:{name}")
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
def peer_groups(groups_dir, peer_name):
|
|
"""Find all groups containing a peer"""
|
|
import glob
|
|
try:
|
|
for group_file in glob.glob(f"{groups_dir}/*.group"):
|
|
try:
|
|
with open(group_file) as f:
|
|
g = json.load(f)
|
|
if peer_name in g.get('peers', []):
|
|
print(g.get('name', ''))
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
def iso_to_ts(iso_str):
|
|
"""Convert ISO timestamp to unix timestamp"""
|
|
try:
|
|
from datetime import datetime, timezone
|
|
dt = datetime.fromisoformat(iso_str)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
print(int(dt.timestamp()))
|
|
except:
|
|
print(0)
|
|
|
|
def rule_list_data(rules_dir, meta_dir):
|
|
"""Return all rule data including base rules and extends"""
|
|
import glob
|
|
|
|
rule_peer_counts = {}
|
|
for f in glob.glob(f"{meta_dir}/*.meta"):
|
|
try:
|
|
with open(f) as mf:
|
|
meta = json.load(mf)
|
|
rule = meta.get('rule', '')
|
|
if rule:
|
|
rule_peer_counts[rule] = rule_peer_counts.get(rule, 0) + 1
|
|
except:
|
|
pass
|
|
|
|
rule_files = (
|
|
sorted(glob.glob(f"{rules_dir}/*.rule")) +
|
|
sorted(glob.glob(f"{rules_dir}/base/*.rule"))
|
|
)
|
|
|
|
# Collect all data first
|
|
rules_data = []
|
|
for rule_file in rule_files:
|
|
is_base = '/base/' in rule_file
|
|
try:
|
|
with open(rule_file) as f:
|
|
r = json.load(f)
|
|
name = r.get('name', '')
|
|
desc = r.get('desc', '')
|
|
group = r.get('group', '')
|
|
extends = ','.join(r.get('extends', []))
|
|
resolved = _rule_resolve_internal(rules_dir, name)
|
|
n_allows = len(resolved.get('allow_ips', [])) + \
|
|
len(resolved.get('allow_ports', []))
|
|
n_blocks = len(resolved.get('block_ips', [])) + \
|
|
len(resolved.get('block_ports', []))
|
|
peer_count = rule_peer_counts.get(name, 0)
|
|
rules_data.append({
|
|
'name': name, 'desc': desc, 'n_allows': n_allows,
|
|
'n_blocks': n_blocks, 'peer_count': peer_count,
|
|
'extends': extends, 'is_base': is_base, 'group': group
|
|
})
|
|
except:
|
|
pass
|
|
|
|
# Sort: non-base first, then by group (empty group last within non-base),
|
|
# then by name within group
|
|
rules_data.sort(key=lambda x: (
|
|
x['is_base'],
|
|
x['group'] == '' and not x['is_base'],
|
|
x['group'],
|
|
x['name']
|
|
))
|
|
|
|
for r in rules_data:
|
|
print(f"{r['name']}|{r['desc']}|{r['n_allows']}|{r['n_blocks']}|"
|
|
f"{r['peer_count']}|{r['extends']}|{r['is_base']}|{r['group']}")
|
|
|
|
def group_list_data(groups_dir, blocks_dir):
|
|
"""Return group summary data in one call"""
|
|
import glob
|
|
|
|
# Get all block files
|
|
blocked_peers = set()
|
|
for f in glob.glob(f"{blocks_dir}/*.block"):
|
|
name = os.path.basename(f).replace('.block', '')
|
|
blocked_peers.add(name)
|
|
|
|
for group_file in sorted(glob.glob(f"{groups_dir}/*.group")):
|
|
try:
|
|
with open(group_file) as f:
|
|
g = json.load(f)
|
|
name = g.get('name', '')
|
|
desc = g.get('desc', '')
|
|
peers = [p for p in g.get('peers', []) if p]
|
|
total = len(peers)
|
|
blocked = sum(1 for p in peers if p in blocked_peers)
|
|
print(f"{name}|{desc}|{total}|{blocked}")
|
|
except:
|
|
pass
|
|
|
|
def fmt_datetime(iso_str, fmt):
|
|
"""Format ISO timestamp with given strftime format"""
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(iso_str)
|
|
print(dt.strftime(fmt))
|
|
except:
|
|
print(iso_str)
|
|
|
|
def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips,
|
|
block_ports, allow_ports='', extends='', group=''):
|
|
rule = {
|
|
'name': name,
|
|
'desc': desc,
|
|
'group': group,
|
|
'dns_redirect': dns_redirect == 'true',
|
|
'extends': [x for x in extends.split(',') if x] if extends else [],
|
|
'allow_ips': [x for x in allow_ips.split(',') if x] if allow_ips else [],
|
|
'allow_ports': [x for x in allow_ports.split(',') if x] if allow_ports else [],
|
|
'block_ips': [x for x in block_ips.split(',') if x] if block_ips else [],
|
|
'block_ports': [x for x in block_ports.split(',') if x] if block_ports else [],
|
|
}
|
|
with open(file, 'w') as f:
|
|
json.dump(rule, f, indent=2)
|
|
|
|
def cleanup_config(config_file):
|
|
"""Normalize blank lines in WireGuard config"""
|
|
import re
|
|
try:
|
|
with open(config_file) as f:
|
|
config = f.read()
|
|
config = re.sub(r'\n{3,}', '\n\n', config)
|
|
config = config.rstrip('\n') + '\n'
|
|
with open(config_file, 'w') as f:
|
|
f.write(config)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def remove_peer_block(config_file, name):
|
|
"""Remove a peer block from WireGuard config by name"""
|
|
import re
|
|
try:
|
|
with open(config_file) as f:
|
|
config = f.read()
|
|
pattern = r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n'
|
|
result = re.sub(pattern, '\n', config)
|
|
with open(config_file, 'w') as f:
|
|
f.write(result)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def create_group(file, name, desc):
|
|
"""Create a new group JSON file"""
|
|
try:
|
|
group = {'name': name, 'desc': desc, 'peers': []}
|
|
with open(file, 'w') as f:
|
|
json.dump(group, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def parse_event(line):
|
|
"""Parse a single JSON event line"""
|
|
try:
|
|
e = json.loads(line)
|
|
print(f"{e.get('timestamp','')}|{e.get('client','')}|{e.get('endpoint','')}|{e.get('event','')}")
|
|
except:
|
|
pass
|
|
|
|
def parse_fw_event(line):
|
|
"""Parse a single fw_events.log JSON line"""
|
|
try:
|
|
e = json.loads(line)
|
|
ts = e.get('timestamp', '')
|
|
src = e.get('src_ip', '')
|
|
dst = e.get('dest_ip', '')
|
|
port = e.get('dest_port', '')
|
|
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
|
|
proto_num = e.get('ip.protocol', 0)
|
|
proto = proto_map.get(proto_num, str(proto_num))
|
|
print(f"{ts}|{src}|{dst}|{port}|{proto}")
|
|
except:
|
|
pass
|
|
|
|
def peer_transfer(wg_interface):
|
|
"""Get total transfer bytes per peer"""
|
|
import subprocess
|
|
low = int(os.environ.get('ACTIVITY_TOTAL_LOW_BYTES', '1000000'))
|
|
med = int(os.environ.get('ACTIVITY_TOTAL_MED_BYTES', '10000000'))
|
|
high = int(os.environ.get('ACTIVITY_TOTAL_HIGH_BYTES', '100000000'))
|
|
try:
|
|
result = subprocess.run(
|
|
['wg', 'show', wg_interface, 'transfer'],
|
|
capture_output=True, text=True
|
|
)
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split('\t')
|
|
if len(parts) == 3:
|
|
pubkey, rx, tx = parts
|
|
total = int(rx) + int(tx)
|
|
if total == 0: level = 'none'
|
|
elif total < low: level = 'low'
|
|
elif total < med: level = 'medium'
|
|
elif total < high: level = 'high'
|
|
else: level = 'very high'
|
|
print(f"{pubkey}|{rx}|{tx}|{level}")
|
|
except:
|
|
pass
|
|
|
|
def peer_transfer_delta(wg_interface, cache_file):
|
|
"""Calculate current transfer rate using delta from previous sample"""
|
|
import subprocess, time
|
|
|
|
low = int(os.environ.get('ACTIVITY_CURRENT_LOW_BYTES', '10000')) # 10KB/s
|
|
med = int(os.environ.get('ACTIVITY_CURRENT_MED_BYTES', '100000')) # 100KB/s
|
|
high = int(os.environ.get('ACTIVITY_CURRENT_HIGH_BYTES', '1000000')) # 1MB/s
|
|
|
|
current = {}
|
|
now = time.time()
|
|
try:
|
|
result = subprocess.run(
|
|
['wg', 'show', wg_interface, 'transfer'],
|
|
capture_output=True, text=True
|
|
)
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split('\t')
|
|
if len(parts) == 3:
|
|
pubkey, rx, tx = parts
|
|
current[pubkey] = {'rx': int(rx), 'tx': int(tx), 'ts': now}
|
|
except:
|
|
pass
|
|
|
|
prev = {}
|
|
if os.path.exists(cache_file):
|
|
try:
|
|
with open(cache_file) as f:
|
|
prev = json.load(f)
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
with open(cache_file, 'w') as f:
|
|
json.dump(current, f)
|
|
except:
|
|
pass
|
|
|
|
for pubkey, data in current.items():
|
|
if pubkey in prev:
|
|
dt = data['ts'] - prev[pubkey].get('ts', data['ts'])
|
|
if dt > 0:
|
|
rx_rate = max(0, (data['rx'] - prev[pubkey]['rx']) / dt)
|
|
tx_rate = max(0, (data['tx'] - prev[pubkey]['tx']) / dt)
|
|
total = rx_rate + tx_rate
|
|
if total <= 0: level = 'idle'
|
|
elif total < low: level = 'low'
|
|
elif total < med: level = 'medium'
|
|
elif total < high: level = 'high'
|
|
else: level = 'very high'
|
|
print(f"{pubkey}|{int(rx_rate)}|{int(tx_rate)}|{level}")
|
|
else:
|
|
print(f"{pubkey}|0|0|idle")
|
|
else:
|
|
print(f"{pubkey}|0|0|unknown")
|
|
|
|
def remove_events_filtered(wg_file, fw_file, filter_name, filter_ip,
|
|
filter_fw, filter_wg, before_days):
|
|
"""Remove events with filters: by name/ip, source, or age"""
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
cutoff_ts = None
|
|
if before_days:
|
|
cutoff_ts = time.time() - (float(before_days) * 86400)
|
|
|
|
def should_remove_wg(e):
|
|
if filter_name and e.get('client') != filter_name:
|
|
return False
|
|
if cutoff_ts:
|
|
try:
|
|
ts = datetime.fromisoformat(e.get('timestamp','')).timestamp()
|
|
return ts < cutoff_ts
|
|
except:
|
|
return False
|
|
return True
|
|
|
|
def should_remove_fw(e):
|
|
if filter_ip and e.get('src_ip') != filter_ip:
|
|
return False
|
|
if cutoff_ts:
|
|
try:
|
|
ts = datetime.fromisoformat(e.get('timestamp','')).timestamp()
|
|
return ts < cutoff_ts
|
|
except:
|
|
return False
|
|
return True
|
|
|
|
removed_wg = removed_fw = 0
|
|
|
|
if not filter_fw and os.path.exists(wg_file):
|
|
lines = []
|
|
with open(wg_file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
if should_remove_wg(e):
|
|
removed_wg += 1
|
|
continue
|
|
except:
|
|
pass
|
|
lines.append(line)
|
|
with open(wg_file, 'w') as f:
|
|
f.writelines(lines)
|
|
|
|
if not filter_wg and os.path.exists(fw_file):
|
|
lines = []
|
|
with open(fw_file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
if should_remove_fw(e):
|
|
removed_fw += 1
|
|
continue
|
|
except:
|
|
pass
|
|
lines.append(line)
|
|
with open(fw_file, 'w') as f:
|
|
f.writelines(lines)
|
|
|
|
print(f"{removed_wg}|{removed_fw}")
|
|
|
|
def _rule_resolve_internal(rules_dir, rule_name, visited=None):
|
|
"""Internal recursive resolver — returns dict, does not print"""
|
|
if visited is None:
|
|
visited = set()
|
|
if rule_name in visited:
|
|
raise ValueError(f"Circular dependency detected: {rule_name}")
|
|
visited.add(rule_name)
|
|
|
|
rule_file = find_rule_file(rules_dir, rule_name)
|
|
with open(rule_file) as f:
|
|
rule = json.load(f)
|
|
|
|
merged = {
|
|
'allow_ips': [],
|
|
'allow_ports': [],
|
|
'block_ips': [],
|
|
'block_ports': [],
|
|
'dns_redirect': False
|
|
}
|
|
|
|
for base_name in rule.get('extends', []):
|
|
base = _rule_resolve_internal(rules_dir, base_name, visited.copy())
|
|
merged['allow_ips'] += base.get('allow_ips', [])
|
|
merged['allow_ports'] += base.get('allow_ports', [])
|
|
merged['block_ips'] += base.get('block_ips', [])
|
|
merged['block_ports'] += base.get('block_ports', [])
|
|
if base.get('dns_redirect'):
|
|
merged['dns_redirect'] = True
|
|
|
|
# Merge own fields — use .get() with defaults for all fields
|
|
merged['allow_ips'] = list(dict.fromkeys(
|
|
merged['allow_ips'] + rule.get('allow_ips', [])))
|
|
merged['allow_ports'] = list(dict.fromkeys(
|
|
merged['allow_ports'] + rule.get('allow_ports', [])))
|
|
merged['block_ips'] = list(dict.fromkeys(
|
|
merged['block_ips'] + rule.get('block_ips', [])))
|
|
merged['block_ports'] = list(dict.fromkeys(
|
|
merged['block_ports'] + rule.get('block_ports', [])))
|
|
if rule.get('dns_redirect', False):
|
|
merged['dns_redirect'] = True
|
|
|
|
merged['name'] = rule.get('name', rule_name)
|
|
merged['desc'] = rule.get('desc', '')
|
|
merged['group'] = rule.get('group', '')
|
|
merged['extends'] = rule.get('extends', [])
|
|
return merged
|
|
|
|
def rule_resolve(rules_dir, rule_name):
|
|
"""Resolve a rule with inheritance — prints JSON"""
|
|
try:
|
|
resolved = _rule_resolve_internal(rules_dir, rule_name)
|
|
print(json.dumps(resolved))
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def rule_resolve_field(rules_dir, rule_name, field):
|
|
"""Get a single field from resolved rule — prints values one per line"""
|
|
try:
|
|
resolved = _rule_resolve_internal(rules_dir, rule_name)
|
|
val = resolved.get(field, [])
|
|
if isinstance(val, list):
|
|
for v in val:
|
|
print(v)
|
|
else:
|
|
print(val)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def rule_inspect(rules_dir, rule_name):
|
|
"""Show inheritance tree for a rule"""
|
|
try:
|
|
rule_file = find_rule_file(rules_dir, rule_name)
|
|
with open(rule_file) as f:
|
|
rule = json.load(f)
|
|
resolved = _rule_resolve_internal(rules_dir, rule_name)
|
|
has_extends = bool(rule.get('extends', []))
|
|
|
|
# Own rules
|
|
for ip in rule.get('allow_ips', []):
|
|
print(f"own|allow_ip|{ip}")
|
|
for p in rule.get('allow_ports', []):
|
|
print(f"own|allow_port|{p}")
|
|
for ip in rule.get('block_ips', []):
|
|
print(f"own|block_ip|{ip}")
|
|
for p in rule.get('block_ports', []):
|
|
print(f"own|block_port|{p}")
|
|
|
|
# DNS redirect — separate section
|
|
if rule.get('dns_redirect'):
|
|
print(f"dns|dns_redirect|true")
|
|
|
|
if has_extends:
|
|
# Inherited rules per base
|
|
for base_name in rule.get('extends', []):
|
|
base = _rule_resolve_internal(rules_dir, base_name)
|
|
for ip in base.get('allow_ips', []):
|
|
print(f"inherited:{base_name}|allow_ip|{ip}")
|
|
for p in base.get('allow_ports', []):
|
|
print(f"inherited:{base_name}|allow_port|{p}")
|
|
for ip in base.get('block_ips', []):
|
|
print(f"inherited:{base_name}|block_ip|{ip}")
|
|
for p in base.get('block_ports', []):
|
|
print(f"inherited:{base_name}|block_port|{p}")
|
|
if base.get('dns_redirect'):
|
|
print(f"inherited:{base_name}|dns_redirect|true")
|
|
|
|
# Resolved summary only when inheritance exists
|
|
has_resolved = (
|
|
resolved.get('allow_ips') or resolved.get('allow_ports') or
|
|
resolved.get('block_ips') or resolved.get('block_ports') or
|
|
resolved.get('dns_redirect')
|
|
)
|
|
if has_resolved:
|
|
for ip in resolved.get('allow_ips', []):
|
|
print(f"resolved|allow_ip|{ip}")
|
|
for p in resolved.get('allow_ports', []):
|
|
print(f"resolved|allow_port|{p}")
|
|
for ip in resolved.get('block_ips', []):
|
|
print(f"resolved|block_ip|{ip}")
|
|
for p in resolved.get('block_ports', []):
|
|
print(f"resolved|block_port|{p}")
|
|
if resolved.get('dns_redirect'):
|
|
print(f"resolved|dns_redirect|true")
|
|
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def find_rule_file(rules_dir, rule_name):
|
|
"""Find rule file in rules/ or rules/base/"""
|
|
for path in [
|
|
os.path.join(rules_dir, f"{rule_name}.rule"),
|
|
os.path.join(rules_dir, "base", f"{rule_name}.rule"),
|
|
]:
|
|
if os.path.exists(path):
|
|
return path
|
|
return ""
|
|
|
|
def get_raw(file, key):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
val = data.get(key) # returns None if missing
|
|
if val is None:
|
|
pass # print nothing
|
|
elif isinstance(val, bool):
|
|
print(str(val).lower())
|
|
elif isinstance(val, list):
|
|
for v in val:
|
|
print(v)
|
|
else:
|
|
print(val)
|
|
except:
|
|
pass
|
|
|
|
def count_resolved(rules_dir, rule_name, key):
|
|
"""Count entries in resolved rule field"""
|
|
try:
|
|
resolved = _rule_resolve_internal(rules_dir, rule_name)
|
|
print(len(resolved.get(key, [])))
|
|
except:
|
|
print(0)
|
|
|
|
def _block_init(peer_ip):
|
|
"""Return empty block structure"""
|
|
return {
|
|
"peer_ip": peer_ip,
|
|
"blocked_direct": False,
|
|
"blocked_by_groups": [],
|
|
"rules": []
|
|
}
|
|
|
|
def _block_read(file):
|
|
try:
|
|
with open(file) as f:
|
|
content = f.read().strip()
|
|
if not content:
|
|
return None # empty file = no block data
|
|
try:
|
|
return json.loads(content)
|
|
except json.JSONDecodeError:
|
|
# Old format — migrate
|
|
lines = content.split('\n')
|
|
peer_ip = lines[0].split()[0] if lines else ''
|
|
new_data = {
|
|
"peer_ip": peer_ip,
|
|
"blocked_direct": True,
|
|
"blocked_by_groups": [],
|
|
"rules": [{"name": "full block", "type": "full"}]
|
|
}
|
|
with open(file, 'w') as f:
|
|
json.dump(new_data, f, indent=2)
|
|
return new_data
|
|
except FileNotFoundError:
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
def _block_write(file, data):
|
|
"""Write block file"""
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
def block_get(file):
|
|
"""Read and print block file as JSON"""
|
|
data = _block_read(file)
|
|
if data:
|
|
print(json.dumps(data))
|
|
|
|
def block_is_blocked(file):
|
|
"""Return true if peer is effectively blocked"""
|
|
data = _block_read(file)
|
|
if not data:
|
|
print("false")
|
|
return
|
|
blocked = data.get("blocked_direct", False) or \
|
|
bool(data.get("blocked_by_groups", []))
|
|
print("true" if blocked else "false")
|
|
|
|
def block_set_direct(file, peer_ip, value):
|
|
"""Set blocked_direct"""
|
|
try:
|
|
data = _block_read(file) or _block_init(peer_ip)
|
|
data["blocked_direct"] = value.lower() == "true"
|
|
data["peer_ip"] = peer_ip
|
|
_block_write(file, data)
|
|
remaining = data["blocked_direct"] or bool(data.get("blocked_by_groups", []))
|
|
pass
|
|
# print("true" if remaining else "false")
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def block_add_group(file, peer_ip, group):
|
|
"""Add group to blocked_by_groups"""
|
|
try:
|
|
data = _block_read(file) or _block_init(peer_ip)
|
|
data["peer_ip"] = peer_ip
|
|
groups = data.get("blocked_by_groups", [])
|
|
if group not in groups:
|
|
groups.append(group)
|
|
data["blocked_by_groups"] = groups
|
|
_block_write(file, data)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def block_remove_group(file, peer_ip, group):
|
|
"""Remove group from blocked_by_groups, return whether still blocked"""
|
|
try:
|
|
data = _block_read(file) or _block_init(peer_ip)
|
|
groups = data.get("blocked_by_groups", [])
|
|
if group in groups:
|
|
groups.remove(group)
|
|
data["blocked_by_groups"] = groups
|
|
_block_write(file, data)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
# print("true" if remaining else "false")
|
|
pass
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def block_add_rule(file, peer_ip, rule_type, name="", target="",
|
|
port="", proto=""):
|
|
"""Add a block rule entry"""
|
|
try:
|
|
data = _block_read(file) or _block_init(peer_ip)
|
|
data["peer_ip"] = peer_ip
|
|
rule = {"type": rule_type}
|
|
if name: rule["name"] = name
|
|
if target: rule["target"] = target
|
|
if port: rule["port"] = port
|
|
if proto: rule["proto"] = proto
|
|
|
|
rules = data.get("rules", [])
|
|
for existing in rules:
|
|
if existing.get("type") == rule_type and \
|
|
existing.get("target","") == target and \
|
|
existing.get("port","") == port and \
|
|
existing.get("proto","") == proto:
|
|
return # already exists, skip
|
|
|
|
rules.append(rule)
|
|
data["rules"] = rules
|
|
_block_write(file, data)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def block_remove_rule(file, rule_type, target="", port="", proto=""):
|
|
data = _block_read(file)
|
|
if not data:
|
|
return
|
|
rules = data.get("rules", [])
|
|
filtered = [r for r in rules if not (
|
|
r.get("type") == rule_type and
|
|
r.get("target", "") == target and
|
|
r.get("port", "") == port and
|
|
r.get("proto", "") == proto
|
|
)]
|
|
data["rules"] = filtered
|
|
_block_write(file, data)
|
|
|
|
def block_get_rules(file):
|
|
"""Print rules as pipe-separated lines: name|type|target|port|proto"""
|
|
data = _block_read(file)
|
|
if not data:
|
|
return
|
|
for r in data.get("rules", []):
|
|
print(f"{r.get('name','')}|{r.get('type','')}|"
|
|
f"{r.get('target','')}|{r.get('port','')}|{r.get('proto','')}")
|
|
|
|
def block_get_groups(file):
|
|
data = _block_read(file)
|
|
if not data:
|
|
return
|
|
print(','.join(data.get('blocked_by_groups', [])))
|
|
|
|
def block_get_direct(file):
|
|
data = _block_read(file)
|
|
if not data:
|
|
print('false')
|
|
return
|
|
print('true' if data.get('blocked_direct', False) else 'false')
|
|
|
|
# ============================================
|
|
# Net / Services
|
|
# ============================================
|
|
|
|
def _net_read(file):
|
|
"""Read services.json, return dict or empty dict"""
|
|
try:
|
|
if not os.path.exists(file):
|
|
return {}
|
|
with open(file) as f:
|
|
content = f.read().strip()
|
|
if not content:
|
|
return {}
|
|
return json.loads(content)
|
|
except Exception:
|
|
return {}
|
|
|
|
def _net_write(file, data):
|
|
"""Write services.json"""
|
|
os.makedirs(os.path.dirname(file), exist_ok=True)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
def net_list(file):
|
|
"""List all service names with IP and port count"""
|
|
data = _net_read(file)
|
|
for name, svc in sorted(data.items()):
|
|
ip = svc.get('ip', '')
|
|
desc = svc.get('desc', '')
|
|
tags = ','.join(svc.get('tags', []))
|
|
ports = len(svc.get('ports', {}))
|
|
print(f"{name}|{ip}|{desc}|{tags}|{ports}")
|
|
|
|
def net_show(file, name):
|
|
"""Show full service details"""
|
|
data = _net_read(file)
|
|
if name not in data:
|
|
print(f"Error: Service not found: {name}", file=sys.stderr)
|
|
sys.exit(1)
|
|
svc = data[name]
|
|
print(f"name|{name}")
|
|
print(f"ip|{svc.get('ip','')}")
|
|
print(f"desc|{svc.get('desc','')}")
|
|
print(f"tags|{','.join(svc.get('tags',[]))}")
|
|
for port_name, port_def in svc.get('ports', {}).items():
|
|
port = port_def.get('port', '')
|
|
proto = port_def.get('proto', 'tcp')
|
|
desc = port_def.get('desc', '')
|
|
print(f"port|{port_name}|{port}|{proto}|{desc}")
|
|
|
|
def net_exists(file, name):
|
|
"""Check if service exists"""
|
|
data = _net_read(file)
|
|
# Handle service:port syntax
|
|
if ':' in name:
|
|
svc_name, port_name = name.split(':', 1)
|
|
if port_name == 'ports':
|
|
print('true' if svc_name in data else 'false')
|
|
else:
|
|
svc = data.get(svc_name, {})
|
|
print('true' if port_name in svc.get('ports', {}) else 'false')
|
|
else:
|
|
print('true' if name in data else 'false')
|
|
|
|
def net_add_service(file, name, ip, desc='', tags=''):
|
|
"""Add or update a service"""
|
|
data = _net_read(file)
|
|
if name not in data:
|
|
data[name] = {'ip': ip, 'ports': {}}
|
|
else:
|
|
data[name]['ip'] = ip
|
|
if desc:
|
|
data[name]['desc'] = desc
|
|
if tags:
|
|
data[name]['tags'] = [t.strip() for t in tags.split(',') if t.strip()]
|
|
_net_write(file, data)
|
|
|
|
def net_add_port(file, service, port_name, port, proto='tcp', desc=''):
|
|
"""Add or update a port on a service"""
|
|
data = _net_read(file)
|
|
if service not in data:
|
|
print(f"Error: Service not found: {service}", file=sys.stderr)
|
|
sys.exit(1)
|
|
if 'ports' not in data[service]:
|
|
data[service]['ports'] = {}
|
|
entry = {'port': int(port), 'proto': proto}
|
|
if desc:
|
|
entry['desc'] = desc
|
|
data[service]['ports'][port_name] = entry
|
|
_net_write(file, data)
|
|
|
|
def net_remove(file, name):
|
|
"""Remove service or port"""
|
|
data = _net_read(file)
|
|
if ':' in name:
|
|
svc_name, port_name = name.split(':', 1)
|
|
if svc_name not in data:
|
|
print(f"Error: Service not found: {svc_name}", file=sys.stderr)
|
|
sys.exit(1)
|
|
if port_name == 'ports':
|
|
# Remove all ports
|
|
data[svc_name]['ports'] = {}
|
|
else:
|
|
if port_name not in data[svc_name].get('ports', {}):
|
|
print(f"Error: Port not found: {port_name}", file=sys.stderr)
|
|
sys.exit(1)
|
|
del data[svc_name]['ports'][port_name]
|
|
else:
|
|
if name not in data:
|
|
print(f"Error: Service not found: {name}", file=sys.stderr)
|
|
sys.exit(1)
|
|
del data[name]
|
|
_net_write(file, data)
|
|
|
|
def net_resolve(file, name):
|
|
"""Resolve service name to ip or ip:port:proto lines"""
|
|
data = _net_read(file)
|
|
if ':' in name:
|
|
svc_name, port_name = name.split(':', 1)
|
|
if svc_name not in data:
|
|
print(f"Error: Service not found: {svc_name}", file=sys.stderr)
|
|
sys.exit(1)
|
|
svc = data[svc_name]
|
|
ip = svc.get('ip', '')
|
|
if port_name == 'ports':
|
|
# All ports
|
|
for pname, pdef in svc.get('ports', {}).items():
|
|
print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
|
|
else:
|
|
if port_name not in svc.get('ports', {}):
|
|
print(f"Error: Port not found: {port_name}", file=sys.stderr)
|
|
sys.exit(1)
|
|
pdef = svc['ports'][port_name]
|
|
print(f"{ip}:{pdef['port']}:{pdef.get('proto','tcp')}")
|
|
else:
|
|
if name not in data:
|
|
print(f"Error: Service not found: {name}", file=sys.stderr)
|
|
sys.exit(1)
|
|
print(data[name].get('ip', ''))
|
|
|
|
def net_reverse_lookup(file, ip, port='', proto=''):
|
|
"""Reverse lookup IP/port to service name"""
|
|
data = _net_read(file)
|
|
for svc_name, svc in data.items():
|
|
if svc.get('ip') != ip:
|
|
continue
|
|
if not port:
|
|
print(svc_name)
|
|
return
|
|
for port_name, port_def in svc.get('ports', {}).items():
|
|
if (str(port_def.get('port','')) == str(port) and
|
|
port_def.get('proto','tcp') == proto):
|
|
print(f"{svc_name}:{port_name}")
|
|
return
|
|
# IP matched but no port match — return service name
|
|
print(svc_name)
|
|
return
|
|
|
|
def block_is_empty(file):
|
|
data = _block_read(file)
|
|
if not data:
|
|
print("true")
|
|
return
|
|
empty = (
|
|
not data.get("blocked_direct", False) and
|
|
not data.get("blocked_by_groups", []) and
|
|
not data.get("rules", []) and
|
|
not data.get("services", [])
|
|
)
|
|
print("true" if empty else "false")
|
|
|
|
def group_has_peer(file, peer_name):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
peers = data.get('peers', [])
|
|
print('true' if peer_name in peers else 'false')
|
|
except Exception:
|
|
print('false')
|
|
|
|
# ============================================
|
|
# Subnet Map
|
|
# ============================================
|
|
|
|
def _subnet_read(file):
|
|
"""Read subnets.json, return dict or empty dict"""
|
|
try:
|
|
if not os.path.exists(file):
|
|
return {}
|
|
with open(file) as f:
|
|
content = f.read().strip()
|
|
if not content:
|
|
return {}
|
|
return json.loads(content)
|
|
except Exception:
|
|
return {}
|
|
|
|
def _subnet_write(file, data):
|
|
"""Write subnets.json"""
|
|
os.makedirs(os.path.dirname(file), exist_ok=True)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
def _subnet_is_group(entry):
|
|
"""Return True if a subnet entry is a nested group (like 'guests')"""
|
|
return isinstance(entry, dict) and 'subnet' not in entry
|
|
|
|
def subnet_lookup(file, name, type_key=''):
|
|
"""
|
|
Resolve a subnet name (and optional type) to a CIDR string.
|
|
For scalar entries: subnet_lookup(file, "desktop") -> "10.1.1.0/24"
|
|
For group entries: subnet_lookup(file, "guests", "phone") -> "10.1.103.0/24"
|
|
For group with no type: subnet_lookup(file, "guests") -> "10.1.100.0/24" (none slot)
|
|
Prints the CIDR on success, nothing and exits 1 on failure.
|
|
"""
|
|
data = _subnet_read(file)
|
|
if name not in data:
|
|
sys.exit(1)
|
|
entry = data[name]
|
|
if _subnet_is_group(entry):
|
|
key = type_key if type_key else 'none'
|
|
if key not in entry:
|
|
sys.exit(1)
|
|
print(entry[key]['subnet'])
|
|
else:
|
|
print(entry['subnet'])
|
|
|
|
def subnet_type(file, name, type_key=''):
|
|
"""
|
|
Return the type string for a subnet entry.
|
|
For scalar: subnet_type(file, "desktop") -> "desktop"
|
|
For group: subnet_type(file, "guests", "phone") -> "phone"
|
|
subnet_type(file, "guests") -> "none"
|
|
"""
|
|
data = _subnet_read(file)
|
|
if name not in data:
|
|
sys.exit(1)
|
|
entry = data[name]
|
|
if _subnet_is_group(entry):
|
|
key = type_key if type_key else 'none'
|
|
if key not in entry:
|
|
sys.exit(1)
|
|
# For group entries the type IS the child key
|
|
print(key if key != 'none' else 'none')
|
|
else:
|
|
print(entry.get('type', name))
|
|
|
|
def subnet_tunnel_mode(file, name, type_key=''):
|
|
"""Return tunnel_mode for a subnet entry"""
|
|
data = _subnet_read(file)
|
|
if name not in data:
|
|
print('split') # safe default
|
|
return
|
|
entry = data[name]
|
|
if _subnet_is_group(entry):
|
|
key = type_key if type_key else 'none'
|
|
child = entry.get(key, {})
|
|
print(child.get('tunnel_mode', 'split'))
|
|
else:
|
|
print(entry.get('tunnel_mode', 'split'))
|
|
|
|
def subnet_for_ip(file, ip):
|
|
"""
|
|
Reverse lookup: given a peer IP, find which subnet name (and type) it belongs to.
|
|
Output: name|type (e.g. "guests|phone" or "desktop|desktop")
|
|
Returns nothing (exit 0) if not found — caller falls back to hardcoded map.
|
|
"""
|
|
import ipaddress
|
|
try:
|
|
peer_addr = ipaddress.ip_address(ip)
|
|
except ValueError:
|
|
return
|
|
|
|
data = _subnet_read(file)
|
|
for name, entry in data.items():
|
|
if _subnet_is_group(entry):
|
|
for type_key, child in entry.items():
|
|
try:
|
|
network = ipaddress.ip_network(child['subnet'], strict=False)
|
|
if peer_addr in network:
|
|
print(f"{name}|{type_key}")
|
|
return
|
|
except Exception:
|
|
continue
|
|
else:
|
|
try:
|
|
network = ipaddress.ip_network(entry['subnet'], strict=False)
|
|
if peer_addr in network:
|
|
peer_type = entry.get('type', name)
|
|
print(f"{name}|{peer_type}")
|
|
return
|
|
except Exception:
|
|
continue
|
|
|
|
def subnet_list(file):
|
|
"""
|
|
List all subnets for display.
|
|
Output per line: name|subnet|type|tunnel_mode|desc|is_group
|
|
For group entries, outputs one line per child: name.type|subnet|type|tunnel_mode|desc|true
|
|
"""
|
|
data = _subnet_read(file)
|
|
for name, entry in data.items():
|
|
if _subnet_is_group(entry):
|
|
for type_key, child in entry.items():
|
|
display_name = f"{name}.{type_key}" if type_key != 'none' else name
|
|
print(f"{display_name}|{child.get('subnet','')}|"
|
|
f"{type_key}|{child.get('tunnel_mode','split')}|"
|
|
f"{child.get('desc','')}|true|{name}")
|
|
else:
|
|
print(f"{name}|{entry.get('subnet','')}|"
|
|
f"{entry.get('type',name)}|{entry.get('tunnel_mode','split')}|"
|
|
f"{entry.get('desc','')}|false|{name}")
|
|
|
|
def subnet_show(file, name):
|
|
"""Show a single subnet entry (scalar or group) in detail"""
|
|
data = _subnet_read(file)
|
|
if name not in data:
|
|
print(f"Error: Subnet '{name}' not found", file=sys.stderr)
|
|
sys.exit(1)
|
|
entry = data[name]
|
|
if _subnet_is_group(entry):
|
|
print(f"name|{name}")
|
|
print(f"is_group|true")
|
|
for type_key, child in entry.items():
|
|
print(f"child|{type_key}|{child.get('subnet','')}|"
|
|
f"{child.get('tunnel_mode','split')}|{child.get('desc','')}")
|
|
else:
|
|
print(f"name|{name}")
|
|
print(f"is_group|false")
|
|
print(f"subnet|{entry.get('subnet','')}")
|
|
print(f"type|{entry.get('type',name)}")
|
|
print(f"tunnel_mode|{entry.get('tunnel_mode','split')}")
|
|
print(f"desc|{entry.get('desc','')}")
|
|
|
|
def subnet_add(file, name, subnet, type_key, tunnel_mode, desc, group_parent=''):
|
|
"""
|
|
Add a new subnet entry.
|
|
If group_parent is set, adds as a child under that group key.
|
|
Otherwise adds as a scalar entry.
|
|
"""
|
|
data = _subnet_read(file)
|
|
entry = {
|
|
'subnet': subnet,
|
|
'tunnel_mode': tunnel_mode or 'split',
|
|
'desc': desc or ''
|
|
}
|
|
if group_parent:
|
|
# Adding a child to an existing group, or creating a new group
|
|
if group_parent not in data:
|
|
data[group_parent] = {}
|
|
elif not _subnet_is_group(data[group_parent]):
|
|
print(f"Error: '{group_parent}' exists but is not a group", file=sys.stderr)
|
|
sys.exit(1)
|
|
data[group_parent][type_key or 'none'] = entry
|
|
else:
|
|
# Scalar entry — type stored explicitly
|
|
entry['type'] = type_key or name
|
|
data[name] = entry
|
|
_subnet_write(file, data)
|
|
|
|
def subnet_remove(file, name, peers_using):
|
|
"""
|
|
Remove a subnet entry. peers_using is a comma-separated list of peer names
|
|
currently using this subnet (passed in from bash after meta scan).
|
|
If non-empty, refuses with error.
|
|
"""
|
|
if peers_using:
|
|
peers = [p for p in peers_using.split(',') if p]
|
|
if peers:
|
|
print(f"Error: Subnet '{name}' is in use by: {', '.join(peers)}", file=sys.stderr)
|
|
sys.exit(1)
|
|
data = _subnet_read(file)
|
|
if '.' in name:
|
|
# Removing a group child: e.g. "guests.phone"
|
|
parent, child_key = name.split('.', 1)
|
|
if parent not in data or not _subnet_is_group(data[parent]):
|
|
print(f"Error: Group '{parent}' not found", file=sys.stderr)
|
|
sys.exit(1)
|
|
if child_key not in data[parent]:
|
|
print(f"Error: '{child_key}' not found in group '{parent}'", file=sys.stderr)
|
|
sys.exit(1)
|
|
del data[parent][child_key]
|
|
if not data[parent]:
|
|
del data[parent] # remove empty group
|
|
else:
|
|
if name not in data:
|
|
print(f"Error: Subnet '{name}' not found", file=sys.stderr)
|
|
sys.exit(1)
|
|
del data[name]
|
|
_subnet_write(file, data)
|
|
|
|
def subnet_rename(file, old_name, new_name, peers_using):
|
|
"""
|
|
Rename a subnet entry. Hard refusal if any peers reference it.
|
|
peers_using: comma-separated peer names from bash meta scan.
|
|
"""
|
|
if peers_using:
|
|
peers = [p for p in peers_using.split(',') if p]
|
|
if peers:
|
|
print(f"Error: Cannot rename subnet '{old_name}' — in use by: {', '.join(peers)}", file=sys.stderr)
|
|
sys.exit(1)
|
|
data = _subnet_read(file)
|
|
if old_name not in data:
|
|
print(f"Error: Subnet '{old_name}' not found", file=sys.stderr)
|
|
sys.exit(1)
|
|
if new_name in data:
|
|
print(f"Error: Subnet '{new_name}' already exists", file=sys.stderr)
|
|
sys.exit(1)
|
|
data[new_name] = data.pop(old_name)
|
|
_subnet_write(file, data)
|
|
|
|
def subnet_peers(meta_dir, clients_dir, subnet_name, subnets_file):
|
|
"""
|
|
Find all peers using a subnet.
|
|
Two-pass check:
|
|
1. Meta field: peer has "subnet": subnet_name in their .meta file
|
|
2. IP fallback: peer's IP falls within the subnet's CIDR(s)
|
|
(catches peers added before meta stored subnet explicitly)
|
|
Output: one peer name per line.
|
|
"""
|
|
import glob
|
|
import ipaddress
|
|
|
|
# Resolve all CIDRs covered by this subnet name
|
|
data = _subnet_read(subnets_file)
|
|
cidrs = []
|
|
if subnet_name in data:
|
|
entry = data[subnet_name]
|
|
if _subnet_is_group(entry):
|
|
for child in entry.values():
|
|
try:
|
|
cidrs.append(ipaddress.ip_network(child['subnet'], strict=False))
|
|
except Exception:
|
|
pass
|
|
else:
|
|
try:
|
|
cidrs.append(ipaddress.ip_network(entry['subnet'], strict=False))
|
|
except Exception:
|
|
pass
|
|
|
|
printed = set()
|
|
|
|
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
|
|
peer_name = os.path.basename(conf).replace('.conf', '')
|
|
|
|
# Pass 1: check meta field
|
|
meta_file = os.path.join(meta_dir, f"{peer_name}.meta")
|
|
try:
|
|
with open(meta_file) as f:
|
|
meta = json.load(f)
|
|
if meta.get('subnet', '') == subnet_name:
|
|
if peer_name not in printed:
|
|
print(peer_name)
|
|
printed.add(peer_name)
|
|
continue
|
|
except Exception:
|
|
pass
|
|
|
|
# Pass 2: IP reverse lookup against subnet CIDRs
|
|
if not cidrs:
|
|
continue
|
|
|
|
peer_ip = ''
|
|
try:
|
|
with open(conf) as f:
|
|
for line in f:
|
|
if line.startswith('Address'):
|
|
peer_ip = line.split('=')[1].strip().split('/')[0]
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
if not peer_ip:
|
|
continue
|
|
|
|
try:
|
|
addr = ipaddress.ip_address(peer_ip)
|
|
if any(addr in cidr for cidr in cidrs):
|
|
if peer_name not in printed:
|
|
print(peer_name)
|
|
printed.add(peer_name)
|
|
except Exception:
|
|
continue
|
|
|
|
|
|
def subnet_exists(file, name):
|
|
"""Check if a subnet name exists (scalar or group). Exits 0/1."""
|
|
data = _subnet_read(file)
|
|
if '.' in name:
|
|
parent, child_key = name.split('.', 1)
|
|
exists = parent in data and _subnet_is_group(data[parent]) and child_key in data[parent]
|
|
else:
|
|
exists = name in data
|
|
sys.exit(0 if exists else 1)
|
|
|
|
|
|
# ============================================
|
|
# Identity System
|
|
# ============================================
|
|
|
|
def identity_rules(file):
|
|
"""
|
|
Return all rules assigned to an identity, one per line.
|
|
Reads from 'rules' array (1:N). Falls back to 'rule' scalar for migration.
|
|
"""
|
|
data = _identity_read(file)
|
|
if not data:
|
|
return
|
|
# Support legacy scalar 'rule' field
|
|
rules = data.get('rules', [])
|
|
if not rules and data.get('rule'):
|
|
rules = [data['rule']]
|
|
for r in rules:
|
|
if r:
|
|
print(r)
|
|
|
|
def identity_add_rule(file, identity_name, rule_name):
|
|
"""
|
|
Add a rule to an identity's rules array.
|
|
Warns if already present (prints warning to stderr, exits 2).
|
|
Creates identity file if it doesn't exist.
|
|
"""
|
|
data = _identity_read(file) or _identity_init(identity_name)
|
|
rules = data.get('rules', [])
|
|
# Migrate legacy scalar field
|
|
if 'rule' in data and data['rule']:
|
|
if data['rule'] not in rules:
|
|
rules.append(data['rule'])
|
|
del data['rule']
|
|
if rule_name in rules:
|
|
print(f"Warning: Rule '{rule_name}' is already assigned to identity '{identity_name}'",
|
|
file=sys.stderr)
|
|
sys.exit(2)
|
|
rules.append(rule_name)
|
|
data['rules'] = rules
|
|
_identity_write(file, data)
|
|
|
|
def identity_remove_rule(file, rule_name):
|
|
"""
|
|
Remove a specific rule from an identity's rules array.
|
|
Exits 1 if rule not found.
|
|
"""
|
|
data = _identity_read(file)
|
|
if not data:
|
|
print(f"Error: Identity not found", file=sys.stderr)
|
|
sys.exit(1)
|
|
rules = data.get('rules', [])
|
|
if rule_name not in rules:
|
|
print(f"Error: Rule '{rule_name}' not assigned to this identity", file=sys.stderr)
|
|
sys.exit(1)
|
|
rules.remove(rule_name)
|
|
data['rules'] = rules
|
|
_identity_write(file, data)
|
|
|
|
def identity_clear_rules(file):
|
|
"""Remove all rules from an identity."""
|
|
data = _identity_read(file)
|
|
if not data:
|
|
return
|
|
data['rules'] = []
|
|
data.pop('rule', None) # remove legacy scalar too
|
|
_identity_write(file, data)
|
|
|
|
def identity_has_rule(file, rule_name):
|
|
"""Exit 0 if identity has this rule, 1 otherwise."""
|
|
data = _identity_read(file)
|
|
if not data:
|
|
sys.exit(1)
|
|
rules = data.get('rules', [])
|
|
if not rules and data.get('rule'):
|
|
rules = [data['rule']]
|
|
sys.exit(0 if rule_name in rules else 1)
|
|
|
|
def _identity_read(file):
|
|
"""Read an identity file, return dict or None"""
|
|
try:
|
|
if not os.path.exists(file):
|
|
return None
|
|
with open(file) as f:
|
|
content = f.read().strip()
|
|
if not content:
|
|
return None
|
|
return json.loads(content)
|
|
except Exception:
|
|
return None
|
|
|
|
def _identity_write(file, data):
|
|
"""Write an identity file"""
|
|
os.makedirs(os.path.dirname(file), exist_ok=True)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
def _identity_init(name):
|
|
"""Return empty identity structure"""
|
|
return {
|
|
'name': name,
|
|
'peers': [],
|
|
'devices': {}
|
|
}
|
|
|
|
def _parse_peer_name(peer_name):
|
|
"""
|
|
Parse a peer name into (type, identity, index).
|
|
phone-nuno -> ('phone', 'nuno', 1)
|
|
phone-nuno-2 -> ('phone', 'nuno', 2)
|
|
desktop-zephyr -> ('desktop', 'zephyr', 1)
|
|
laptop-nuno -> ('laptop', 'nuno', 1)
|
|
Returns None if name doesn't match convention.
|
|
|
|
Convention: {type}-{identity}[-{index}]
|
|
Known types: desktop, laptop, phone, tablet, server, iot, none
|
|
"""
|
|
known_types = {'desktop', 'laptop', 'phone', 'tablet', 'server', 'iot', 'none'}
|
|
parts = peer_name.split('-')
|
|
if len(parts) < 2:
|
|
return None
|
|
peer_type = parts[0]
|
|
if peer_type not in known_types:
|
|
return None
|
|
# Check if last part is a numeric index
|
|
if len(parts) >= 3 and parts[-1].isdigit():
|
|
index = int(parts[-1])
|
|
identity = '-'.join(parts[1:-1])
|
|
else:
|
|
index = 1
|
|
identity = '-'.join(parts[1:])
|
|
if not identity:
|
|
return None
|
|
return (peer_type, identity, index)
|
|
|
|
def identity_list(identities_dir):
|
|
"""
|
|
List all identities with peer count, rules and policy.
|
|
Output per line: name|peer_count|types|rules|policy
|
|
"""
|
|
import glob
|
|
for id_file in sorted(glob.glob(f"{identities_dir}/*.identity")):
|
|
try:
|
|
with open(id_file) as f:
|
|
data = json.load(f)
|
|
name = data.get('name', '')
|
|
peers = data.get('peers', [])
|
|
devices = data.get('devices', {})
|
|
rules = data.get('rules', [])
|
|
# Migrate legacy scalar rule field
|
|
if not rules and data.get('rule'):
|
|
rules = [data['rule']]
|
|
policy = data.get('policy', 'default')
|
|
types = sorted(set(
|
|
d.get('type', '') for d in devices.values() if d.get('type')
|
|
))
|
|
print(f"{name}|{len(peers)}|{','.join(types)}|{','.join(rules)}|{policy}")
|
|
except Exception:
|
|
continue
|
|
|
|
def identity_show(file):
|
|
"""Show identity details"""
|
|
data = _identity_read(file)
|
|
if not data:
|
|
print("Error: Identity not found", file=sys.stderr)
|
|
sys.exit(1)
|
|
print(f"name|{data.get('name','')}")
|
|
print(f"peer_count|{len(data.get('peers',[]))}")
|
|
for peer_name, dev in data.get('devices', {}).items():
|
|
print(f"device|{peer_name}|{dev.get('type','')}|{dev.get('index',1)}")
|
|
|
|
def identity_add_peer(file, identity_name, peer_name, peer_type, index):
|
|
"""Add a peer to an identity file, creating it if needed"""
|
|
data = _identity_read(file) or _identity_init(identity_name)
|
|
if peer_name not in data['peers']:
|
|
data['peers'].append(peer_name)
|
|
data['devices'][peer_name] = {
|
|
'type': peer_type,
|
|
'index': int(index)
|
|
}
|
|
_identity_write(file, data)
|
|
|
|
def identity_remove_peer(file, peer_name):
|
|
"""Remove a peer from an identity file"""
|
|
data = _identity_read(file)
|
|
if not data:
|
|
return
|
|
data['peers'] = [p for p in data['peers'] if p != peer_name]
|
|
data['devices'].pop(peer_name, None)
|
|
_identity_write(file, data)
|
|
|
|
def identity_remove(file):
|
|
"""Delete an identity file — existence check done in bash"""
|
|
try:
|
|
os.remove(file)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def identity_next_index(file, peer_type):
|
|
"""
|
|
Return the next available index for a given type within an identity.
|
|
phone-nuno exists (index 1) -> returns 2
|
|
No phones exist -> returns 1
|
|
"""
|
|
data = _identity_read(file)
|
|
if not data:
|
|
print(1)
|
|
return
|
|
existing = [
|
|
d.get('index', 1)
|
|
for d in data.get('devices', {}).values()
|
|
if d.get('type') == peer_type
|
|
]
|
|
if not existing:
|
|
print(1)
|
|
return
|
|
# Find lowest unused index starting from 1
|
|
used = set(existing)
|
|
i = 1
|
|
while i in used:
|
|
i += 1
|
|
print(i)
|
|
|
|
def identity_peers(file, filter_type=''):
|
|
"""
|
|
List peers belonging to an identity, optionally filtered by type.
|
|
Output: one peer name per line.
|
|
"""
|
|
data = _identity_read(file)
|
|
if not data:
|
|
return
|
|
for peer_name in data.get('peers', []):
|
|
if filter_type:
|
|
dev = data.get('devices', {}).get(peer_name, {})
|
|
if dev.get('type') != filter_type:
|
|
continue
|
|
print(peer_name)
|
|
|
|
def identity_migrate(identities_dir, clients_dir, meta_dir, dry_run):
|
|
"""
|
|
Scan all peer configs and auto-create identity files from name convention.
|
|
dry_run: 'true' -> print what would be done, no writes.
|
|
Output per action: action|identity|peer|type|index
|
|
"""
|
|
import glob
|
|
|
|
is_dry = dry_run == 'true'
|
|
grouped = {} # identity_name -> [(peer_name, type, index)]
|
|
|
|
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
|
|
peer_name = os.path.basename(conf).replace('.conf', '')
|
|
parsed = _parse_peer_name(peer_name)
|
|
if not parsed:
|
|
print(f"skip|{peer_name}")
|
|
continue
|
|
peer_type, identity_name, index = parsed
|
|
if identity_name not in grouped:
|
|
grouped[identity_name] = []
|
|
grouped[identity_name].append((peer_name, peer_type, index))
|
|
|
|
for identity_name, peers in sorted(grouped.items()):
|
|
id_file = os.path.join(identities_dir, f"{identity_name}.identity")
|
|
for peer_name, peer_type, index in peers:
|
|
print(f"create|{identity_name}|{peer_name}|{peer_type}|{index}")
|
|
if not is_dry:
|
|
identity_add_peer(id_file, identity_name, peer_name, peer_type, index)
|
|
|
|
def identity_infer(peer_name):
|
|
"""
|
|
Parse a peer name and print identity|type|index, or nothing if no match.
|
|
Used by add.command.sh to auto-attach on vanilla wgctl add.
|
|
"""
|
|
parsed = _parse_peer_name(peer_name)
|
|
if parsed:
|
|
peer_type, identity_name, index = parsed
|
|
print(f"{identity_name}|{peer_type}|{index}")
|
|
|
|
def identity_exists(file):
|
|
"""Exit 0 if identity file exists and is valid, else exit 1"""
|
|
data = _identity_read(file)
|
|
sys.exit(0 if data is not None else 1)
|
|
|
|
# ============================================
|
|
# peer_data update — adds type field from meta
|
|
# ============================================
|
|
# NOTE: This replaces the existing peer_data function.
|
|
# The new version reads 'type' from meta directly.
|
|
# Output format: name|ip|rule|type|last_ts|last_evt|main_group
|
|
|
|
def peer_data(clients_dir, meta_dir, events_log):
|
|
"""
|
|
Updated peer_data that reads 'type' from meta.
|
|
Output: name|ip|rule|type|last_ts|last_evt|main_group
|
|
"""
|
|
import glob
|
|
|
|
meta = {}
|
|
for f in glob.glob(f"{meta_dir}/*.meta"):
|
|
name = os.path.basename(f).replace('.meta', '')
|
|
try:
|
|
with open(f) as mf:
|
|
meta[name] = json.load(mf)
|
|
except Exception:
|
|
meta[name] = {}
|
|
|
|
last_events = {}
|
|
try:
|
|
with open(events_log) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
client = e.get('client', '')
|
|
if client:
|
|
last_events[client] = e
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
ip = ''
|
|
try:
|
|
with open(conf) as f:
|
|
for line in f:
|
|
if line.startswith('Address'):
|
|
ip = line.split('=')[1].strip().split('/')[0]
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
m = meta.get(name, {})
|
|
rule = m.get('rule', '')
|
|
peer_type = m.get('type', '')
|
|
main_group = m.get('main_group', '')
|
|
|
|
last_event = last_events.get(name, {})
|
|
last_ts = last_event.get('timestamp', '')
|
|
last_evt = last_event.get('event', '')
|
|
|
|
print(f"{name}|{ip}|{rule}|{peer_type}|{last_ts}|{last_evt}|{main_group}")
|
|
|
|
def subnet_default_rule(file, name, type_key=''):
|
|
"""
|
|
Return the default_rule for a subnet entry, or empty string if none set.
|
|
For scalar: subnet_default_rule(file, "desktop") -> ""
|
|
For group: subnet_default_rule(file, "guests", "phone") -> "guest"
|
|
"""
|
|
data = _subnet_read(file)
|
|
if name not in data:
|
|
print('')
|
|
return
|
|
entry = data[name]
|
|
if _subnet_is_group(entry):
|
|
key = type_key if type_key else 'none'
|
|
child = entry.get(key, {})
|
|
print(child.get('default_rule', ''))
|
|
else:
|
|
print(entry.get('default_rule', ''))
|
|
|
|
def subnet_list_names(file):
|
|
"""
|
|
List all top-level subnet names, one per line.
|
|
Used for dynamic flag registration in commands.
|
|
Output: one name per line (e.g. desktop, laptop, guests, servers, iot)
|
|
"""
|
|
data = _subnet_read(file)
|
|
for name in data.keys():
|
|
print(name)
|
|
|
|
# ============================================
|
|
# Policy System
|
|
# ============================================
|
|
|
|
_POLICY_DEFAULTS = {
|
|
"default": {
|
|
"tunnel_mode": "split",
|
|
"default_rule": None,
|
|
"strict_rule": False,
|
|
"auto_apply": True,
|
|
"desc": "Default policy"
|
|
},
|
|
"guest": {
|
|
"tunnel_mode": "split",
|
|
"default_rule": "guest",
|
|
"strict_rule": True,
|
|
"auto_apply": True,
|
|
"desc": "Guest access policy"
|
|
},
|
|
"trusted": {
|
|
"tunnel_mode": "split",
|
|
"default_rule": None,
|
|
"strict_rule": False,
|
|
"auto_apply": True,
|
|
"desc": "Trusted device policy"
|
|
},
|
|
"server": {
|
|
"tunnel_mode": "split",
|
|
"default_rule": None,
|
|
"strict_rule": False,
|
|
"auto_apply": True,
|
|
"desc": "Server policy"
|
|
},
|
|
"iot": {
|
|
"tunnel_mode": "split",
|
|
"default_rule": None,
|
|
"strict_rule": False,
|
|
"auto_apply": True,
|
|
"desc": "IoT device policy"
|
|
}
|
|
}
|
|
|
|
def _policy_read(file):
|
|
"""Read policies.json, fall back to hardcoded defaults if missing."""
|
|
try:
|
|
if not os.path.exists(file):
|
|
return dict(_POLICY_DEFAULTS)
|
|
with open(file) as f:
|
|
content = f.read().strip()
|
|
if not content:
|
|
return dict(_POLICY_DEFAULTS)
|
|
data = json.loads(content)
|
|
# Merge with defaults so hardcoded policies always exist
|
|
merged = dict(_POLICY_DEFAULTS)
|
|
merged.update(data)
|
|
return merged
|
|
except Exception:
|
|
return dict(_POLICY_DEFAULTS)
|
|
|
|
def _policy_write(file, data):
|
|
"""Write policies.json."""
|
|
os.makedirs(os.path.dirname(file), exist_ok=True)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
def _policy_get_entry(data, name):
|
|
"""Get a policy entry, falling back to 'default' policy values for missing fields."""
|
|
entry = data.get(name, {})
|
|
default = data.get('default', _POLICY_DEFAULTS.get('default', {}))
|
|
# Merge: entry fields override default
|
|
resolved = dict(default)
|
|
resolved.update(entry)
|
|
return resolved
|
|
|
|
def policy_get(file, name, field=''):
|
|
"""
|
|
Get a policy entry or a specific field from it.
|
|
policy_get(file, "guest") -> prints all fields as key|value lines
|
|
policy_get(file, "guest", "tunnel_mode") -> prints "split"
|
|
policy_get(file, "guest", "strict_rule") -> prints "true" or "false"
|
|
"""
|
|
data = _policy_read(file)
|
|
entry = _policy_get_entry(data, name)
|
|
|
|
if field:
|
|
val = entry.get(field)
|
|
if val is None:
|
|
print('')
|
|
elif isinstance(val, bool):
|
|
print('true' if val else 'false')
|
|
else:
|
|
print(val)
|
|
else:
|
|
for k, v in entry.items():
|
|
if isinstance(v, bool):
|
|
print(f"{k}|{'true' if v else 'false'}")
|
|
elif v is None:
|
|
print(f"{k}|")
|
|
else:
|
|
print(f"{k}|{v}")
|
|
|
|
def policy_list(file):
|
|
"""
|
|
List all policies.
|
|
Output per line: name|tunnel_mode|default_rule|strict_rule|auto_apply|desc
|
|
"""
|
|
data = _policy_read(file)
|
|
for name, raw_entry in data.items():
|
|
entry = _policy_get_entry(data, name)
|
|
tunnel_mode = entry.get('tunnel_mode', 'split')
|
|
default_rule = entry.get('default_rule') or ''
|
|
strict_rule = 'true' if entry.get('strict_rule', False) else 'false'
|
|
auto_apply = 'true' if entry.get('auto_apply', True) else 'false'
|
|
desc = entry.get('desc', '')
|
|
print(f"{name}|{tunnel_mode}|{default_rule}|{strict_rule}|{auto_apply}|{desc}")
|
|
|
|
def policy_exists(file, name):
|
|
"""Exit 0 if policy exists, 1 otherwise."""
|
|
data = _policy_read(file)
|
|
sys.exit(0 if name in data else 1)
|
|
|
|
def policy_add(file, name, tunnel_mode, default_rule, strict_rule, auto_apply, desc):
|
|
"""Add or update a policy entry."""
|
|
data = _policy_read(file)
|
|
data[name] = {
|
|
'tunnel_mode': tunnel_mode or 'split',
|
|
'default_rule': default_rule if default_rule else None,
|
|
'strict_rule': strict_rule == 'true',
|
|
'auto_apply': auto_apply != 'false',
|
|
'desc': desc or ''
|
|
}
|
|
_policy_write(file, data)
|
|
|
|
def policy_remove(file, name):
|
|
"""Remove a policy. Refuses to remove hardcoded defaults."""
|
|
if name in _POLICY_DEFAULTS:
|
|
print(f"Error: Cannot remove built-in policy '{name}'", file=sys.stderr)
|
|
sys.exit(1)
|
|
data = _policy_read(file)
|
|
if name not in data:
|
|
print(f"Error: Policy '{name}' not found", file=sys.stderr)
|
|
sys.exit(1)
|
|
del data[name]
|
|
_policy_write(file, data)
|
|
|
|
def policy_set_field(file, name, field, value):
|
|
"""Set a single field on an existing policy."""
|
|
data = _policy_read(file)
|
|
if name not in data:
|
|
print(f"Error: Policy '{name}' not found", file=sys.stderr)
|
|
sys.exit(1)
|
|
entry = data[name]
|
|
if field in ('strict_rule', 'auto_apply'):
|
|
entry[field] = value == 'true'
|
|
elif field == 'default_rule':
|
|
entry[field] = value if value else None
|
|
else:
|
|
entry[field] = value
|
|
data[name] = entry
|
|
_policy_write(file, data)
|
|
|
|
def subnet_policy(subnets_file, subnet_name, type_key=''):
|
|
"""
|
|
Get the policy name for a subnet entry.
|
|
Falls back to 'default' if no policy set.
|
|
"""
|
|
data = _subnet_read(subnets_file)
|
|
if subnet_name not in data:
|
|
print('default')
|
|
return
|
|
entry = data[subnet_name]
|
|
if _subnet_is_group(entry):
|
|
key = type_key if type_key else 'none'
|
|
child = entry.get(key, {})
|
|
print(child.get('policy', 'default'))
|
|
else:
|
|
print(entry.get('policy', 'default'))
|
|
|
|
def json_get_nested(file, *keys):
|
|
"""
|
|
Get a nested field from a JSON file.
|
|
json_get_nested(file, "rule_flags", "strict_rule")
|
|
Output: the value as a string, or empty string if not found.
|
|
"""
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
val = data
|
|
for key in keys:
|
|
if not isinstance(val, dict):
|
|
print('')
|
|
return
|
|
val = val.get(key)
|
|
if val is None:
|
|
print('')
|
|
return
|
|
if isinstance(val, bool):
|
|
print('true' if val else 'false')
|
|
elif val is None:
|
|
print('')
|
|
else:
|
|
print(val)
|
|
except Exception:
|
|
print('')
|
|
|
|
def json_set_nested(file, *args):
|
|
"""
|
|
Set a nested field in a JSON file.
|
|
Args: file, key1, key2, ..., value
|
|
json_set_nested(file, "rule_flags", "strict_rule", "true")
|
|
Creates intermediate dicts as needed.
|
|
"""
|
|
if len(args) < 2:
|
|
return
|
|
keys = args[:-1]
|
|
value = args[-1]
|
|
|
|
try:
|
|
if os.path.exists(file):
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
else:
|
|
data = {}
|
|
|
|
# Navigate/create nested structure
|
|
target = data
|
|
for key in keys[:-1]:
|
|
if key not in target or not isinstance(target[key], dict):
|
|
target[key] = {}
|
|
target = target[key]
|
|
|
|
# Coerce value types
|
|
final_key = keys[-1]
|
|
if value == 'true':
|
|
target[final_key] = True
|
|
elif value == 'false':
|
|
target[final_key] = False
|
|
else:
|
|
target[final_key] = value
|
|
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
commands = {
|
|
'get': lambda args: get(args[0], args[1]),
|
|
'set': lambda args: set_key(args[0], args[1], args[2]),
|
|
'delete': lambda args: delete_key(args[0], args[1]),
|
|
'append': lambda args: append(args[0], args[1], args[2]),
|
|
'remove': lambda args: remove_value(args[0], args[1], args[2]),
|
|
'cat': lambda args: cat(args[0]),
|
|
'has_key': lambda args: has_key(args[0], args[1]),
|
|
'filter_values': lambda args: filter_values(args[0], args[1], args[2]),
|
|
'last_event': lambda args: last_event(args[0], args[1], args[2], args[3]),
|
|
'events_for': lambda args: events_for(args[0], args[1], args[2]),
|
|
'fw_events': lambda args: fw_events(args[0], args[1], args[2], args[3], args[4]),
|
|
'wg_events': lambda args: wg_events(args[0], args[1], args[2], args[3]),
|
|
'format_fw_event': lambda args: format_fw_event(sys.stdin.read(), args[0]),
|
|
'format_wg_event': lambda args: format_wg_event(sys.stdin.read()),
|
|
'remove_events': lambda args: remove_events(args[0], args[1]),
|
|
'follow_logs': lambda args: follow_logs(args[0], args[1], args[2], args[3], args[4], args[5]),
|
|
'count': lambda args: count(args[0], args[1]),
|
|
'audit_fw_counts': lambda args: audit_fw_counts(args[0]),
|
|
'peer_group_map': lambda args: peer_group_map(args[0]),
|
|
'peer_groups': lambda args: peer_groups(args[0], args[1]),
|
|
'peer_data': lambda args: peer_data(args[0], args[1], args[2]),
|
|
'iso_to_ts': lambda args: iso_to_ts(args[0]),
|
|
'rule_list_data': lambda args: rule_list_data(args[0], args[1]),
|
|
'group_list_data': lambda args: group_list_data(args[0], args[1]),
|
|
'fmt_datetime': lambda args: fmt_datetime(args[0], args[1]),
|
|
'create_rule': lambda args: create_rule(
|
|
args[0], args[1], args[2], args[3], args[4], args[5], args[6],
|
|
args[7] if len(args) > 7 else '',
|
|
args[8] if len(args) > 8 else '',
|
|
args[9] if len(args) > 9 else ''
|
|
),
|
|
'cleanup_config': lambda args: cleanup_config(args[0]),
|
|
'remove_peer_block': lambda args: remove_peer_block(args[0], args[1]),
|
|
'create_group': lambda args: create_group(args[0], args[1], args[2]),
|
|
'parse_event': lambda args: parse_event(args[0]),
|
|
'parse_fw_event': lambda args: parse_fw_event(args[0]),
|
|
'remove_events_filtered': lambda args: remove_events_filtered(
|
|
args[0], args[1], args[2], args[3], args[4]=='true', args[5]=='true', args[6] if len(args)>6 else ''),
|
|
'peer_transfer': lambda args: peer_transfer(args[0]),
|
|
'peer_transfer_delta': lambda args: peer_transfer_delta(args[0], args[1]),
|
|
'rule_resolve': lambda args: rule_resolve(args[0], args[1]),
|
|
'rule_resolve_field': lambda args: rule_resolve_field(args[0], args[1], args[2]),
|
|
'rule_inspect': lambda args: rule_inspect(args[0], args[1]),
|
|
'find_rule_file': lambda args: print(find_rule_file(args[0], args[1])),
|
|
'get_raw': lambda args: print(get_raw(args[0], args[1])),
|
|
'count_resolved': lambda args: count_resolved(args[0], args[1], args[2]),
|
|
'block_get': lambda args: block_get(args[0]),
|
|
'block_is_blocked': lambda args: block_is_blocked(args[0]),
|
|
'block_set_direct': lambda args: block_set_direct(args[0], args[1], args[2]),
|
|
'block_add_group': lambda args: block_add_group(args[0], args[1], args[2]),
|
|
'block_remove_group': lambda args: block_remove_group(args[0], args[1], args[2]),
|
|
'block_add_rule': lambda args: block_add_rule(
|
|
args[0], args[1], args[2],
|
|
args[3] if len(args) > 3 else '',
|
|
args[4] if len(args) > 4 else '',
|
|
args[5] if len(args) > 5 else '',
|
|
args[6] if len(args) > 6 else ''
|
|
),
|
|
'block_remove_rule': lambda args: block_remove_rule(
|
|
args[0], args[1],
|
|
args[2] if len(args) > 2 else '',
|
|
args[3] if len(args) > 3 else '',
|
|
args[4] if len(args) > 4 else ''
|
|
),
|
|
'block_get_rules': lambda args: block_get_rules(args[0]),
|
|
'block_get_groups': lambda args: block_get_groups(args[0]),
|
|
'block_get_direct': lambda args: block_get_direct(args[0]),
|
|
'net_list': lambda args: net_list(args[0]),
|
|
'net_show': lambda args: net_show(args[0], args[1]),
|
|
'net_exists': lambda args: net_exists(args[0], args[1]),
|
|
'net_add_service': lambda args: net_add_service(
|
|
args[0], args[1], args[2],
|
|
args[3] if len(args) > 3 else '',
|
|
args[4] if len(args) > 4 else ''
|
|
),
|
|
'net_add_port': lambda args: net_add_port(
|
|
args[0], args[1], args[2], args[3],
|
|
args[4] if len(args) > 4 else 'tcp',
|
|
args[5] if len(args) > 5 else ''
|
|
),
|
|
'net_remove': lambda args: net_remove(args[0], args[1]),
|
|
'net_resolve': lambda args: net_resolve(args[0], args[1]),
|
|
'net_reverse_lookup': lambda args: net_reverse_lookup(
|
|
args[0], args[1],
|
|
args[2] if len(args) > 2 else '',
|
|
args[3] if len(args) > 3 else ''
|
|
),
|
|
'block_is_empty': lambda args: block_is_empty(args[0]),
|
|
'group_has_peer': lambda args: group_has_peer(args[0], args[1]),
|
|
|
|
# Subnet commands:
|
|
'subnet_lookup': lambda args: subnet_lookup(args[0], args[1], args[2] if len(args) > 2 else ''),
|
|
'subnet_type': lambda args: subnet_type(args[0], args[1], args[2] if len(args) > 2 else ''),
|
|
'subnet_tunnel_mode': lambda args: subnet_tunnel_mode(args[0], args[1], args[2] if len(args) > 2 else ''),
|
|
'subnet_for_ip': lambda args: subnet_for_ip(args[0], args[1]),
|
|
'subnet_list': lambda args: subnet_list(args[0]),
|
|
'subnet_show': lambda args: subnet_show(args[0], args[1]),
|
|
'subnet_add': lambda args: subnet_add(
|
|
args[0], args[1], args[2], args[3],
|
|
args[4] if len(args) > 4 else 'split',
|
|
args[5] if len(args) > 5 else '',
|
|
args[6] if len(args) > 6 else ''
|
|
),
|
|
'subnet_remove': lambda args: subnet_remove(args[0], args[1], args[2] if len(args) > 2 else ''),
|
|
'subnet_rename': lambda args: subnet_rename(args[0], args[1], args[2], args[3] if len(args) > 3 else ''),
|
|
'subnet_peers': lambda args: subnet_peers(args[0], args[1], args[2], args[3]),
|
|
'subnet_exists': lambda args: subnet_exists(args[0], args[1]),
|
|
|
|
# Identity commands:
|
|
'identity_list': lambda args: identity_list(args[0]),
|
|
'identity_show': lambda args: identity_show(args[0]),
|
|
'identity_add_peer': lambda args: identity_add_peer(args[0], args[1], args[2], args[3], args[4]),
|
|
'identity_remove_peer':lambda args: identity_remove_peer(args[0], args[1]),
|
|
'identity_remove': lambda args: identity_remove(args[0]),
|
|
'identity_next_index': lambda args: identity_next_index(args[0], args[1]),
|
|
'identity_peers': lambda args: identity_peers(args[0], args[1] if len(args) > 1 else ''),
|
|
'identity_migrate': lambda args: identity_migrate(args[0], args[1], args[2], args[3]),
|
|
'identity_infer': lambda args: identity_infer(args[0]),
|
|
'identity_exists': lambda args: identity_exists(args[0]),
|
|
'subnet_default_rule': lambda args: subnet_default_rule(args[0], args[1], args[2] if len(args) > 2 else ''),
|
|
'subnet_list_names': lambda args: subnet_list_names(args[0]),
|
|
|
|
# Policy commands:
|
|
'policy_get': lambda args: policy_get(args[0], args[1], args[2] if len(args) > 2 else ''),
|
|
'policy_list': lambda args: policy_list(args[0]),
|
|
'policy_exists': lambda args: policy_exists(args[0], args[1]),
|
|
'policy_add': lambda args: policy_add(args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else ''),
|
|
'policy_remove': lambda args: policy_remove(args[0], args[1]),
|
|
'policy_set_field': lambda args: policy_set_field(args[0], args[1], args[2], args[3]),
|
|
'subnet_policy': lambda args: subnet_policy(args[0], args[1], args[2] if len(args) > 2 else ''),
|
|
'get_nested': lambda args: json_get_nested(args[0], *args[1:]),
|
|
'set_nested': lambda args: json_set_nested(args[0], *args[1:]),
|
|
'identity_rules': lambda args: identity_rules(args[0]),
|
|
'identity_add_rule': lambda args: identity_add_rule(args[0], args[1], args[2]),
|
|
'identity_remove_rule': lambda args: identity_remove_rule(args[0], args[1]),
|
|
'identity_clear_rules': lambda args: identity_clear_rules(args[0]),
|
|
'identity_has_rule': lambda args: identity_has_rule(args[0], args[1]),
|
|
}
|
|
|
|
if __name__ == '__main__':
|
|
if len(sys.argv) < 2:
|
|
print("Usage: json_helper.py <command> <file> [key] [value]", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
cmd = sys.argv[1]
|
|
args = sys.argv[2:]
|
|
|
|
if cmd not in commands:
|
|
print(f"Unknown command: {cmd}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
commands[cmd](args) |