736 lines
No EOL
24 KiB
Python
736 lines
No EOL
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
wgctl JSON helper — called by shell functions to read/write JSON files.
|
|
Usage: json_helper.py <command> <file> [key] [value]
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import os
|
|
|
|
DATETIME_FMT = os.environ.get('WGCTL_DATETIME_FMT', '%Y-%m-%d %H:%M')
|
|
|
|
def get(file, key):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
val = data.get(key, [])
|
|
if isinstance(val, bool):
|
|
print(str(val).lower()) # true/false not True/False
|
|
elif isinstance(val, list):
|
|
if val:
|
|
print('\n'.join(str(v) for v in val))
|
|
else:
|
|
if val:
|
|
print(val)
|
|
except:
|
|
sys.exit(0)
|
|
|
|
def set_key(file, key, value):
|
|
try:
|
|
data = {}
|
|
if os.path.exists(file):
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
# Try to parse as JSON value first (for arrays/bools)
|
|
try:
|
|
data[key] = json.loads(value)
|
|
except:
|
|
data[key] = value
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def delete_key(file, key):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
data.pop(key, None)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def append(file, key, value):
|
|
try:
|
|
data = {}
|
|
if os.path.exists(file):
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
if key not in data:
|
|
data[key] = []
|
|
if value not in data[key]:
|
|
data[key].append(value)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def remove_value(file, key, value):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
if key in data and value in data[key]:
|
|
data[key].remove(value)
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def cat(file):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
print(json.dumps(data, indent=2))
|
|
except Exception as e:
|
|
sys.exit(1)
|
|
|
|
def has_key(file, key):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
sys.exit(0 if key in data else 1)
|
|
except:
|
|
sys.exit(1)
|
|
|
|
def filter_values(file, key, value):
|
|
"""Remove all entries where value matches"""
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
data = {k: v for k, v in data.items() if v != value}
|
|
with open(file, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def last_event(file, key, field, client):
|
|
"""Get last event field for a client"""
|
|
try:
|
|
last = None
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
if e.get(key) == client:
|
|
last = e
|
|
except:
|
|
pass
|
|
if last:
|
|
print(last.get(field, ''))
|
|
except:
|
|
pass
|
|
|
|
def events_for(file, ip, limit):
|
|
"""Format events for a given IP"""
|
|
try:
|
|
from datetime import datetime
|
|
events = []
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
if e.get('ip') == ip:
|
|
events.append(e)
|
|
except:
|
|
pass
|
|
for e in events[-int(limit):]:
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
endpoint = e.get('endpoint', '—')
|
|
client = e.get('client', '—')
|
|
event = e.get('event', '—')
|
|
print(f' {ts} {client:<20} {endpoint:<20} {event}')
|
|
except:
|
|
pass
|
|
|
|
def fw_events(file, filter_ip, filter_type, clients_dir, limit):
|
|
"""Format firewall drop events"""
|
|
import glob
|
|
|
|
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
|
|
|
|
# Build ip->name map
|
|
ip_to_name = {}
|
|
for conf in glob.glob(f"{clients_dir}/*.conf"):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
try:
|
|
with open(conf) as f:
|
|
for line in f:
|
|
if line.startswith('Address'):
|
|
ip = line.split('=')[1].strip().split('/')[0]
|
|
ip_to_name[ip] = name
|
|
except:
|
|
pass
|
|
|
|
events = []
|
|
last_seen = {} # (src, dst, port, proto) -> last timestamp
|
|
|
|
try:
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
src = e.get('src_ip', '')
|
|
if not src:
|
|
continue
|
|
if filter_ip and src != filter_ip:
|
|
continue
|
|
|
|
# Dedup key
|
|
dst = e.get('dest_ip', '')
|
|
port = e.get('dest_port', '')
|
|
proto = e.get('ip.protocol', 0)
|
|
key = (src, dst, port, proto)
|
|
|
|
ts_str = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
ts = datetime.fromisoformat(ts_str).timestamp()
|
|
except:
|
|
ts = 0
|
|
|
|
# Protocol-aware dedup window
|
|
dedup_windows = {1: 5, 6: 30, 17: 10} # icmp=5s, tcp=30s, udp=10s
|
|
window = dedup_windows.get(proto, 10)
|
|
|
|
# Skip if same event within protocol window
|
|
if key in last_seen and (ts - last_seen[key]) < window:
|
|
continue
|
|
last_seen[key] = ts
|
|
|
|
events.append(e)
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
for e in events[-int(limit):]:
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
src = e.get('src_ip', '—')
|
|
dst = e.get('dest_ip', '—')
|
|
port = e.get('dest_port', '')
|
|
proto_num = e.get('ip.protocol', 0)
|
|
proto = proto_map.get(proto_num, str(proto_num))
|
|
dst_str = f"{dst}:{port}" if port else dst
|
|
client = ip_to_name.get(src, src)
|
|
|
|
if filter_type and not client.startswith(filter_type + '-'):
|
|
continue
|
|
|
|
print(f"{ts}|{client}|{dst_str}|{proto}")
|
|
|
|
def wg_events(file, filter_client, filter_type, limit):
|
|
"""Format WireGuard events from events.log"""
|
|
events = []
|
|
try:
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
client = e.get('client', '')
|
|
if not client:
|
|
continue
|
|
if filter_client and client != filter_client:
|
|
continue
|
|
if filter_type and not client.startswith(filter_type + '-'):
|
|
continue
|
|
events.append(e)
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
for e in events[-int(limit):]:
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
client = e.get('client', '—')
|
|
endpoint = e.get('endpoint', '—')
|
|
event = e.get('event', '—')
|
|
print(f"{ts}|{client}|{endpoint}|{event}")
|
|
|
|
def format_fw_event(line, clients_dir):
|
|
"""Format a single fw_event line"""
|
|
import glob
|
|
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
|
|
|
|
# Build ip->name map
|
|
ip_to_name = {}
|
|
for conf in glob.glob(f"{clients_dir}/*.conf"):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
try:
|
|
with open(conf) as f:
|
|
for l in f:
|
|
if l.startswith('Address'):
|
|
ip = l.split('=')[1].strip().split('/')[0]
|
|
ip_to_name[ip] = name
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
e = json.loads(line.strip())
|
|
src = e.get('src_ip', '')
|
|
if not src:
|
|
return None
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
dst = e.get('dest_ip', '—')
|
|
port = e.get('dest_port', '')
|
|
proto_num = e.get('ip.protocol', 0)
|
|
proto = proto_map.get(proto_num, str(proto_num))
|
|
dst_str = f"{dst}:{port}" if port else dst
|
|
client = ip_to_name.get(src, src)
|
|
return f"{ts}|{client}|{dst_str}|{proto}"
|
|
except:
|
|
return None
|
|
|
|
def format_wg_event(line):
|
|
"""Format a single wg_event line"""
|
|
try:
|
|
e = json.loads(line.strip())
|
|
client = e.get('client', '')
|
|
if not client:
|
|
return None
|
|
ts = e.get('timestamp', '')
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(ts)
|
|
ts = dt.strftime(DATETIME_FMT)
|
|
except:
|
|
pass
|
|
endpoint = e.get('endpoint', '—')
|
|
event = e.get('event', '—')
|
|
return f"{ts}|{client}|{endpoint}|{event}|wg"
|
|
except:
|
|
return None
|
|
|
|
def remove_events(file, identifier):
|
|
"""Remove all events for a client/ip from a JSONL file"""
|
|
try:
|
|
lines = []
|
|
with open(file) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
if e.get('client') == identifier or e.get('src_ip') == identifier:
|
|
continue
|
|
lines.append(line)
|
|
except:
|
|
lines.append(line)
|
|
with open(file, 'w') as f:
|
|
f.writelines(lines)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def follow_logs(fw_file, wg_file, filter_ip, filter_type, clients_dir, filter_peers=""):
|
|
"""Follow both log files and output formatted events"""
|
|
import glob, time, select
|
|
|
|
proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'}
|
|
peer_filter = set(filter_peers.split(',')) if filter_peers else set()
|
|
|
|
# Build ip->name map
|
|
ip_to_name = {}
|
|
for conf in glob.glob(f"{clients_dir}/*.conf"):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
try:
|
|
with open(conf) as f:
|
|
for l in f:
|
|
if l.startswith('Address'):
|
|
ip = l.split('=')[1].strip().split('/')[0]
|
|
ip_to_name[ip] = name
|
|
except:
|
|
pass
|
|
|
|
# Open files and seek to end
|
|
files = {}
|
|
for label, path in [('fw', fw_file), ('wg', wg_file)]:
|
|
if path and os.path.exists(path):
|
|
f = open(path)
|
|
f.seek(0, 2) # seek to end
|
|
files[label] = f
|
|
|
|
dedup = {}
|
|
|
|
try:
|
|
while True:
|
|
for label, f in files.items():
|
|
line = f.readline()
|
|
if not line:
|
|
continue
|
|
try:
|
|
e = json.loads(line.strip())
|
|
except:
|
|
continue
|
|
|
|
if label == 'fw':
|
|
src = e.get('src_ip', '')
|
|
if not src:
|
|
continue
|
|
if filter_ip and src != filter_ip:
|
|
continue
|
|
dst = e.get('dest_ip', '—')
|
|
port = e.get('dest_port', '')
|
|
proto_num = e.get('ip.protocol', 0)
|
|
proto = proto_map.get(proto_num, str(proto_num))
|
|
|
|
# Dedup
|
|
key = (src, dst, port, proto_num)
|
|
windows = {1: 5, 6: 30, 17: 10}
|
|
window = windows.get(proto_num, 10)
|
|
now = time.time()
|
|
if key in dedup and (now - dedup[key]) < window:
|
|
continue
|
|
dedup[key] = now
|
|
|
|
client = ip_to_name.get(src, src)
|
|
if filter_type and not client.startswith(filter_type + '-'):
|
|
continue
|
|
dst_str = f"{dst}:{port}" if port else dst
|
|
ts = e.get('timestamp', '')[:16].replace('T', ' ')
|
|
print(f"fw|{ts}|{client}|{dst_str}|{proto}", flush=True)
|
|
|
|
elif label == 'wg':
|
|
client = e.get('client', '')
|
|
if not client:
|
|
continue
|
|
if filter_ip:
|
|
ip = ip_to_name.get(filter_ip, '')
|
|
if client != ip and client != filter_ip:
|
|
continue
|
|
|
|
if peer_filter and client not in peer_filter:
|
|
continue
|
|
if filter_type and not client.startswith(filter_type + '-'):
|
|
continue
|
|
ts = e.get('timestamp', '')[:16].replace('T', ' ')
|
|
endpoint = e.get('endpoint', '—')
|
|
event = e.get('event', '—')
|
|
print(f"wg|{ts}|{client}|{endpoint}|{event}", flush=True)
|
|
|
|
time.sleep(0.1)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|
|
def count(file, key):
|
|
try:
|
|
with open(file) as f:
|
|
data = json.load(f)
|
|
val = data.get(key, [])
|
|
print(len(val) if isinstance(val, list) else 0)
|
|
except:
|
|
print(0)
|
|
|
|
def audit_fw_counts(clients_dir):
|
|
"""Return peer_name:fw_count pairs from iptables and client configs"""
|
|
import glob, subprocess
|
|
|
|
# Get iptables output once
|
|
try:
|
|
result = subprocess.run(
|
|
['iptables', '-L', 'FORWARD', '-n'],
|
|
capture_output=True, text=True
|
|
)
|
|
fw_output = result.stdout
|
|
except:
|
|
fw_output = ""
|
|
|
|
# Build ip->name and count rules
|
|
for conf in glob.glob(f"{clients_dir}/*.conf"):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
try:
|
|
with open(conf) as f:
|
|
for line in f:
|
|
if line.startswith('Address'):
|
|
ip = line.split('=')[1].strip().split('/')[0]
|
|
count = fw_output.count(ip)
|
|
print(f"{name}:{count}")
|
|
break
|
|
except:
|
|
pass
|
|
|
|
def peer_group_map(groups_dir):
|
|
"""Return peer:group pairs for all groups"""
|
|
import glob
|
|
try:
|
|
for group_file in glob.glob(f"{groups_dir}/*.group"):
|
|
try:
|
|
with open(group_file) as f:
|
|
g = json.load(f)
|
|
name = g.get('name', '')
|
|
for peer in g.get('peers', []):
|
|
if peer:
|
|
print(f"{peer}:{name}")
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
def peer_groups(groups_dir, peer_name):
|
|
"""Find all groups containing a peer"""
|
|
import glob
|
|
try:
|
|
for group_file in glob.glob(f"{groups_dir}/*.group"):
|
|
try:
|
|
with open(group_file) as f:
|
|
g = json.load(f)
|
|
if peer_name in g.get('peers', []):
|
|
print(g.get('name', ''))
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
def peer_data(clients_dir, meta_dir, events_log):
|
|
import glob
|
|
|
|
meta = {}
|
|
for f in glob.glob(f"{meta_dir}/*.meta"):
|
|
name = os.path.basename(f).replace('.meta', '')
|
|
try:
|
|
with open(f) as mf:
|
|
meta[name] = json.load(mf)
|
|
except:
|
|
meta[name] = {}
|
|
|
|
last_events = {}
|
|
try:
|
|
with open(events_log) as f:
|
|
for line in f:
|
|
try:
|
|
e = json.loads(line.strip())
|
|
client = e.get('client', '')
|
|
if client:
|
|
last_events[client] = e
|
|
except:
|
|
pass
|
|
except:
|
|
pass
|
|
|
|
for conf in sorted(glob.glob(f"{clients_dir}/*.conf")):
|
|
name = os.path.basename(conf).replace('.conf', '')
|
|
ip = ''
|
|
try:
|
|
with open(conf) as f:
|
|
for line in f:
|
|
if line.startswith('Address'):
|
|
ip = line.split('=')[1].strip().split('/')[0]
|
|
break
|
|
except:
|
|
pass
|
|
|
|
m = meta.get(name, {})
|
|
rule = m.get('rule', '')
|
|
subtype = m.get('subtype', '')
|
|
|
|
last_event = last_events.get(name, {})
|
|
last_ts = last_event.get('timestamp', '') # raw ISO, no formatting
|
|
last_evt = last_event.get('event', '') # fixed: was last_event
|
|
|
|
print(f"{name}|{ip}|{rule}|{subtype}|{last_ts}|{last_evt}")
|
|
|
|
def iso_to_ts(iso_str):
|
|
"""Convert ISO timestamp to unix timestamp"""
|
|
try:
|
|
from datetime import datetime, timezone
|
|
dt = datetime.fromisoformat(iso_str)
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
print(int(dt.timestamp()))
|
|
except:
|
|
print(0)
|
|
|
|
def rule_list_data(rules_dir, meta_dir):
|
|
"""Return all rule data for list display in one call"""
|
|
import glob
|
|
|
|
# Count peers per rule from meta files
|
|
rule_peer_counts = {}
|
|
for f in glob.glob(f"{meta_dir}/*.meta"):
|
|
try:
|
|
with open(f) as mf:
|
|
meta = json.load(mf)
|
|
rule = meta.get('rule', '')
|
|
if rule:
|
|
rule_peer_counts[rule] = rule_peer_counts.get(rule, 0) + 1
|
|
except:
|
|
pass
|
|
|
|
# Read each rule file
|
|
for rule_file in sorted(glob.glob(f"{rules_dir}/*.rule")):
|
|
try:
|
|
with open(rule_file) as f:
|
|
r = json.load(f)
|
|
name = r.get('name', '')
|
|
desc = r.get('desc', '')
|
|
n_allows = len(r.get('allow_ips', [])) + len(r.get('allow_ports', []))
|
|
n_blocks = len(r.get('block_ips', [])) + len(r.get('block_ports', []))
|
|
peer_count = rule_peer_counts.get(name, 0)
|
|
print(f"{name}|{desc}|{n_allows}|{n_blocks}|{peer_count}")
|
|
except:
|
|
pass
|
|
|
|
def group_list_data(groups_dir, blocks_dir):
|
|
"""Return group summary data in one call"""
|
|
import glob
|
|
|
|
# Get all block files
|
|
blocked_peers = set()
|
|
for f in glob.glob(f"{blocks_dir}/*.block"):
|
|
name = os.path.basename(f).replace('.block', '')
|
|
blocked_peers.add(name)
|
|
|
|
for group_file in sorted(glob.glob(f"{groups_dir}/*.group")):
|
|
try:
|
|
with open(group_file) as f:
|
|
g = json.load(f)
|
|
name = g.get('name', '')
|
|
desc = g.get('desc', '')
|
|
peers = [p for p in g.get('peers', []) if p]
|
|
total = len(peers)
|
|
blocked = sum(1 for p in peers if p in blocked_peers)
|
|
print(f"{name}|{desc}|{total}|{blocked}")
|
|
except:
|
|
pass
|
|
|
|
def fmt_datetime(iso_str, fmt):
|
|
"""Format ISO timestamp with given strftime format"""
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(iso_str)
|
|
print(dt.strftime(fmt))
|
|
except:
|
|
print(iso_str)
|
|
|
|
def create_rule(file, name, desc, dns_redirect, allow_ips, block_ips, block_ports):
|
|
rule = {
|
|
'name': name,
|
|
'desc': desc,
|
|
'dns_redirect': dns_redirect == 'true',
|
|
'allow_ips': [x for x in allow_ips.split(',') if x] if allow_ips else [],
|
|
'block_ips': [x for x in block_ips.split(',') if x] if block_ips else [],
|
|
'block_ports': [x for x in block_ports.split(',') if x] if block_ports else [],
|
|
'allow_ports': []
|
|
}
|
|
with open(file, 'w') as f:
|
|
json.dump(rule, f, indent=2)
|
|
|
|
def cleanup_config(config_file):
|
|
"""Normalize blank lines in WireGuard config"""
|
|
import re
|
|
try:
|
|
with open(config_file) as f:
|
|
config = f.read()
|
|
config = re.sub(r'\n{3,}', '\n\n', config)
|
|
config = config.rstrip('\n') + '\n'
|
|
with open(config_file, 'w') as f:
|
|
f.write(config)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def remove_peer_block(config_file, name):
|
|
"""Remove a peer block from WireGuard config by name"""
|
|
import re
|
|
try:
|
|
with open(config_file) as f:
|
|
config = f.read()
|
|
pattern = r'\n\[Peer\]\n# ' + re.escape(name) + r'\n[^\n]+\n[^\n]+\n'
|
|
result = re.sub(pattern, '\n', config)
|
|
with open(config_file, 'w') as f:
|
|
f.write(result)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def create_group(file, name, desc):
|
|
"""Create a new group JSON file"""
|
|
try:
|
|
group = {'name': name, 'desc': desc, 'peers': []}
|
|
with open(file, 'w') as f:
|
|
json.dump(group, f, indent=2)
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
def parse_event(line):
|
|
"""Parse a single JSON event line"""
|
|
try:
|
|
e = json.loads(line)
|
|
print(f"{e.get('timestamp','')}|{e.get('client','')}|{e.get('endpoint','')}|{e.get('event','')}")
|
|
except:
|
|
pass
|
|
|
|
commands = {
|
|
'get': lambda args: get(args[0], args[1]),
|
|
'set': lambda args: set_key(args[0], args[1], args[2]),
|
|
'delete': lambda args: delete_key(args[0], args[1]),
|
|
'append': lambda args: append(args[0], args[1], args[2]),
|
|
'remove': lambda args: remove_value(args[0], args[1], args[2]),
|
|
'cat': lambda args: cat(args[0]),
|
|
'has_key': lambda args: has_key(args[0], args[1]),
|
|
'filter_values': lambda args: filter_values(args[0], args[1], args[2]),
|
|
'last_event': lambda args: last_event(args[0], args[1], args[2], args[3]),
|
|
'events_for': lambda args: events_for(args[0], args[1], args[2]),
|
|
'fw_events': lambda args: fw_events(args[0], args[1], args[2], args[3], args[4]),
|
|
'wg_events': lambda args: wg_events(args[0], args[1], args[2], args[3]),
|
|
'format_fw_event': lambda args: format_fw_event(sys.stdin.read(), args[0]),
|
|
'format_wg_event': lambda args: format_wg_event(sys.stdin.read()),
|
|
'remove_events': lambda args: remove_events(args[0], args[1]),
|
|
'follow_logs': lambda args: follow_logs(args[0], args[1], args[2], args[3], args[4], args[5]),
|
|
'count': lambda args: count(args[0], args[1]),
|
|
'audit_fw_counts': lambda args: audit_fw_counts(args[0]),
|
|
'peer_group_map': lambda args: peer_group_map(args[0]),
|
|
'peer_groups': lambda args: peer_groups(args[0], args[1]),
|
|
'peer_data': lambda args: peer_data(args[0], args[1], args[2]),
|
|
'iso_to_ts': lambda args: iso_to_ts(args[0]),
|
|
'rule_list_data': lambda args: rule_list_data(args[0], args[1]),
|
|
'group_list_data': lambda args: group_list_data(args[0], args[1]),
|
|
'fmt_datetime': lambda args: fmt_datetime(args[0], args[1]),
|
|
'create_rule': lambda args: create_rule(args[0], args[1], args[2], args[3], args[4], args[5], args[6]),
|
|
'cleanup_config': lambda args: cleanup_config(args[0]),
|
|
'remove_peer_block': lambda args: remove_peer_block(args[0], args[1]),
|
|
'create_group': lambda args: create_group(args[0], args[1], args[2]),
|
|
'parse_event': lambda args: parse_event(args[0]),
|
|
}
|
|
|
|
if __name__ == '__main__':
|
|
if len(sys.argv) < 2:
|
|
print("Usage: json_helper.py <command> <file> [key] [value]", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
cmd = sys.argv[1]
|
|
args = sys.argv[2:]
|
|
|
|
if cmd not in commands:
|
|
print(f"Unknown command: {cmd}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
commands[cmd](args) |