wgctl/core/lib/block_history.py
Nuno Duque Nunes 0b9f113453 feat: block history tracking
- core/lib/block_history.py: record/unblock/list functions
- ctx::block_history: .wgctl/data/block-history/ path
- block --reason: record block event with reason, endpoint, triggered_by
- unblock --reason: update block event with unblock timestamp
- json::block_history_record/unblock/list/list_all wrappers
- json::endpoint_cache_get: get cached endpoint for peer
- export --all: include block-history in full backup
- import --all: restore block-history files
- tests: section_block_unblock with fixture peer, history field validation
2026-05-28 01:51:37 +00:00

103 lines
No EOL
3 KiB
Python

# core/lib/block_history.py
import json
import os
from datetime import datetime, timezone
BLOCK_HISTORY_VERSION = 1
def _history_file(history_dir, peer):
return os.path.join(history_dir, f"{peer}.json")
def _load(history_dir, peer):
path = _history_file(history_dir, peer)
if os.path.exists(path):
try:
return json.load(open(path))
except Exception:
pass
return {"peer": peer, "version": BLOCK_HISTORY_VERSION, "history": []}
def _save(history_dir, peer, data):
os.makedirs(history_dir, exist_ok=True)
path = _history_file(history_dir, peer)
open(path, 'w').write(json.dumps(data, indent=2))
def _next_id(history):
if not history:
return 1
return max(int(e.get("id", 0)) for e in history) + 1
def _now():
return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
def _get_endpoint(clients_dir, peer):
"""Try to get current endpoint from endpoint cache."""
cache_file = os.path.join(
os.path.dirname(os.path.dirname(clients_dir)),
'.wgctl', 'daemon', 'endpoint_cache.json')
try:
cache = json.load(open(cache_file))
return cache.get(peer, '')
except Exception:
return ''
def block_history_record(history_dir, peer, block_type,
triggered_by, reason, endpoint_at_block):
"""Record a new block event."""
data = _load(history_dir, peer)
entry = {
"id": _next_id(data["history"]),
"blocked_at": _now(),
"unblocked_at": None,
"block_type": block_type,
"triggered_by": triggered_by,
"reason": reason or '',
"endpoint_at_block": endpoint_at_block or '',
"unblocked_by": None,
"unblock_reason": None,
}
data["history"].append(entry)
_save(history_dir, peer, data)
print(entry["id"])
def block_history_unblock(history_dir, peer, unblocked_by, unblock_reason):
"""Update the most recent open block event with unblock timestamp."""
data = _load(history_dir, peer)
# Find most recent entry without unblocked_at
for entry in reversed(data["history"]):
if entry.get("unblocked_at") is None:
entry["unblocked_at"] = _now()
entry["unblocked_by"] = unblocked_by
entry["unblock_reason"] = unblock_reason or ''
_save(history_dir, peer, data)
print(entry["id"])
return
# No open block found — not an error, peer may have been unblocked externally
def block_history_list(history_dir, peer):
"""Output block history for a peer as JSON."""
data = _load(history_dir, peer)
print(json.dumps(data))
def block_history_list_all(history_dir):
"""Output block history for all peers as JSON array."""
import glob
results = []
for path in sorted(glob.glob(os.path.join(history_dir, '*.json'))):
try:
results.append(json.load(open(path)))
except Exception:
pass
print(json.dumps(results))