- core/lib/block_history.py: record/unblock/list functions - ctx::block_history: .wgctl/data/block-history/ path - block --reason: record block event with reason, endpoint, triggered_by - unblock --reason: update block event with unblock timestamp - json::block_history_record/unblock/list/list_all wrappers - json::endpoint_cache_get: get cached endpoint for peer - export --all: include block-history in full backup - import --all: restore block-history files - tests: section_block_unblock with fixture peer, history field validation
103 lines
No EOL
3 KiB
Python
103 lines
No EOL
3 KiB
Python
# core/lib/block_history.py
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
BLOCK_HISTORY_VERSION = 1
|
|
|
|
|
|
def _history_file(history_dir, peer):
|
|
return os.path.join(history_dir, f"{peer}.json")
|
|
|
|
|
|
def _load(history_dir, peer):
|
|
path = _history_file(history_dir, peer)
|
|
if os.path.exists(path):
|
|
try:
|
|
return json.load(open(path))
|
|
except Exception:
|
|
pass
|
|
return {"peer": peer, "version": BLOCK_HISTORY_VERSION, "history": []}
|
|
|
|
|
|
def _save(history_dir, peer, data):
|
|
os.makedirs(history_dir, exist_ok=True)
|
|
path = _history_file(history_dir, peer)
|
|
open(path, 'w').write(json.dumps(data, indent=2))
|
|
|
|
|
|
def _next_id(history):
|
|
if not history:
|
|
return 1
|
|
return max(int(e.get("id", 0)) for e in history) + 1
|
|
|
|
|
|
def _now():
|
|
return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
|
|
|
|
def _get_endpoint(clients_dir, peer):
|
|
"""Try to get current endpoint from endpoint cache."""
|
|
cache_file = os.path.join(
|
|
os.path.dirname(os.path.dirname(clients_dir)),
|
|
'.wgctl', 'daemon', 'endpoint_cache.json')
|
|
try:
|
|
cache = json.load(open(cache_file))
|
|
return cache.get(peer, '')
|
|
except Exception:
|
|
return ''
|
|
|
|
|
|
def block_history_record(history_dir, peer, block_type,
|
|
triggered_by, reason, endpoint_at_block):
|
|
"""Record a new block event."""
|
|
data = _load(history_dir, peer)
|
|
entry = {
|
|
"id": _next_id(data["history"]),
|
|
"blocked_at": _now(),
|
|
"unblocked_at": None,
|
|
"block_type": block_type,
|
|
"triggered_by": triggered_by,
|
|
"reason": reason or '',
|
|
"endpoint_at_block": endpoint_at_block or '',
|
|
"unblocked_by": None,
|
|
"unblock_reason": None,
|
|
}
|
|
data["history"].append(entry)
|
|
_save(history_dir, peer, data)
|
|
print(entry["id"])
|
|
|
|
|
|
def block_history_unblock(history_dir, peer, unblocked_by, unblock_reason):
|
|
"""Update the most recent open block event with unblock timestamp."""
|
|
data = _load(history_dir, peer)
|
|
# Find most recent entry without unblocked_at
|
|
for entry in reversed(data["history"]):
|
|
if entry.get("unblocked_at") is None:
|
|
entry["unblocked_at"] = _now()
|
|
entry["unblocked_by"] = unblocked_by
|
|
entry["unblock_reason"] = unblock_reason or ''
|
|
_save(history_dir, peer, data)
|
|
print(entry["id"])
|
|
return
|
|
# No open block found — not an error, peer may have been unblocked externally
|
|
|
|
|
|
def block_history_list(history_dir, peer):
|
|
"""Output block history for a peer as JSON."""
|
|
data = _load(history_dir, peer)
|
|
print(json.dumps(data))
|
|
|
|
|
|
def block_history_list_all(history_dir):
|
|
"""Output block history for all peers as JSON array."""
|
|
import glob
|
|
results = []
|
|
for path in sorted(glob.glob(os.path.join(history_dir, '*.json'))):
|
|
try:
|
|
results.append(json.load(open(path)))
|
|
except Exception:
|
|
pass
|
|
print(json.dumps(results)) |