fix: handshake session detection, endpoint cache, watch ordering

- wgctl-monitor: update _hs_last_logged on ALL handshakes not just new sessions
- wgctl-monitor: fix endpoint_cache.json absolute path
- wgctl-monitor: move script to wgctl/daemon/ (correct location)
- watch: _poll_handshakes sorts by ts descending, endpoint cache fallback
- watch: empty endpoint uses - not em dash (alignment fix)
- logs: newline between fw and wg sections
- monitor::live extracted, cmd::logs::follow no longer calls cmd::run
- ui.sh: UTF-8 extra byte constants
This commit is contained in:
Nuno Duque Nunes 2026-05-25 16:19:13 +00:00
parent 3058750c3d
commit 86220850c1
6 changed files with 378 additions and 56 deletions

View file

@ -195,8 +195,14 @@ function cmd::logs::show() {
log::section "WireGuard Activity Log" log::section "WireGuard Activity Log"
printf "\n" printf "\n"
if [[ -n "$fw_output" ]]; then printf "%s\n" "$fw_output"; fi; if [[ -n "$fw_output" && -n "$wg_output" ]]; then
if [[ -n "$wg_output" ]]; then printf "%s\n" "$wg_output"; fi; printf "%s\n\n" "$fw_output"
printf "%s\n" "$wg_output"
elif [[ -n "$fw_output" ]]; then
printf "%s\n" "$fw_output"
else
printf "%s\n" "$wg_output"
fi
} }
function cmd::logs::show_fw_events() { function cmd::logs::show_fw_events() {
@ -361,13 +367,12 @@ function cmd::logs::follow() {
log::section "WireGuard Live Log (Ctrl+C to stop)" log::section "WireGuard Live Log (Ctrl+C to stop)"
printf "\n" printf "\n"
local watch_args=() local restricted_only=false blocked_only=false
[[ -n "$filter_name" ]] && watch_args+=(--name "$filter_name") $fw_only && restricted_only=true
[[ -n "$filter_type" ]] && watch_args+=(--type "$filter_type") $wg_only && blocked_only=true
$fw_only && watch_args+=(--restricted)
$wg_only && watch_args+=(--blocked)
cmd::watch::run "${watch_args[@]}" monitor::live "$filter_name" "$filter_type" "" \
"$blocked_only" "$restricted_only" "false" "false"
} }
function cmd::logs::remove() { function cmd::logs::remove() {

View file

@ -75,29 +75,8 @@ function cmd::watch::run() {
log::section "wgctl — Live Monitor (Ctrl+C to stop)" log::section "wgctl — Live Monitor (Ctrl+C to stop)"
printf "\n" printf "\n"
local w_client=20 w_dest=18 monitor::live "$filter_name" "$filter_type" "$filter_peers" \
"$blocked_only" "$restricted_only" "$allowed_only" "$raw"
if ! $blocked_only && ! $restricted_only; then
(
while true; do
cmd::watch::_poll_handshakes \
"$filter_name" "$filter_type" "$filter_peers" "$w_client" "$w_dest"
sleep 5
done
) &
local poller_pid=$!
fi
cmd::watch::_tail_events \
"$filter_name" "$filter_type" "$filter_peers" \
"$blocked_only" "$restricted_only" "$allowed_only" \
"$w_client" "$w_dest" &
local tailer_pid=$!
trap "kill $tailer_pid ${poller_pid:-} 2>/dev/null; \
rm -f /tmp/wgctl_hs_* /tmp/wgctl_attempt_*; printf '\n'; exit 0" INT TERM
wait
} }
# ============================================ # ============================================

View file

@ -27,29 +27,30 @@ _CTX_NET="${_CTX_DATA}/services.json"
# ============================================ # ============================================
function ctx::root() { echo "$_CTX_ROOT"; } function ctx::root() { echo "$_CTX_ROOT"; }
function ctx::core() { echo "$_CTX_CORE"; } function ctx::core() { echo "$_CTX_CORE"; }
function ctx::modules() { echo "$_CTX_MODULES"; } function ctx::modules() { echo "$_CTX_MODULES"; }
function ctx::commands() { echo "$_CTX_COMMANDS"; } function ctx::commands() { echo "$_CTX_COMMANDS"; }
function ctx::blocks() { echo "$_CTX_BLOCKS"; } function ctx::blocks() { echo "$_CTX_BLOCKS"; }
function ctx::groups() { echo "$_CTX_GROUPS"; } function ctx::groups() { echo "$_CTX_GROUPS"; }
function ctx::rules() { echo "$_CTX_RULES"; } function ctx::rules() { echo "$_CTX_RULES"; }
function ctx::rules::base() { echo "$_CTX_RULES_BASE"; } function ctx::rules::base() { echo "$_CTX_RULES_BASE"; }
function ctx::clients() { echo "$_CTX_CLIENTS"; } function ctx::clients() { echo "$_CTX_CLIENTS"; }
function ctx::wg() { echo "$_CTX_WG"; } function ctx::wg() { echo "$_CTX_WG"; }
function ctx::data() { echo "$_CTX_DATA"; } function ctx::data() { echo "$_CTX_DATA"; }
function ctx::rules() { echo "$_CTX_RULES"; } function ctx::rules() { echo "$_CTX_RULES"; }
function ctx::groups() { echo "$_CTX_GROUPS"; } function ctx::groups() { echo "$_CTX_GROUPS"; }
function ctx::blocks() { echo "$_CTX_BLOCKS"; } function ctx::blocks() { echo "$_CTX_BLOCKS"; }
function ctx::meta() { echo "$_CTX_META"; } function ctx::meta() { echo "$_CTX_META"; }
function ctx::daemon() { echo "$_CTX_DAEMON"; } function ctx::daemon() { echo "$_CTX_DAEMON"; }
function ctx::net() { echo "$_CTX_NET"; } function ctx::net() { echo "$_CTX_NET"; }
function ctx::identities() { echo "${_CTX_IDENTITY}"; } function ctx::identities() { echo "${_CTX_IDENTITY}"; }
function ctx::subnets() { echo "${_CTX_DATA}/subnets.json"; } function ctx::subnets() { echo "${_CTX_DATA}/subnets.json"; }
function ctx::hosts() { echo "${_CTX_DATA}/hosts.json"; } function ctx::hosts() { echo "${_CTX_DATA}/hosts.json"; }
function ctx::events_log() { echo "$(ctx::daemon)/events.log"; } function ctx::events_log() { echo "$(ctx::daemon)/events.log"; }
function ctx::fw_events_log() { echo "$(ctx::daemon)/fw_events.log"; } function ctx::fw_events_log() { echo "$(ctx::daemon)/fw_events.log"; }
function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; } function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; }
function ctx::monitor_script() { echo "${_CTX_ROOT}/daemon/wgctl-monitor.py"; }
# ============================================ # ============================================
# Path Helpers # Path Helpers

303
daemon/wgctl-monitor.py Executable file
View file

@ -0,0 +1,303 @@
#!/usr/bin/env python3
import subprocess
import threading
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from scapy.all import IP, UDP, sniff
# ============================================
# Config
# ============================================
WATCHLIST_FILE = Path("/etc/wireguard/.wgctl/daemon/watchlist.json")
EVENTS_LOG = Path("/etc/wireguard/.wgctl/daemon/events.log")
WG_INTERFACE = os.environ.get("WG_INTERFACE", "eth0")
WG_PORT = int(os.environ.get("WG_PORT", "51820"))
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
WG_HANDSHAKE_CHECK_SEC = int(os.environ.get("WG_HANDSHAKE_CHECK_TIME_SEC", "300"))
WG_WG_INTERFACE = os.environ.get("WG_WG_INTERFACE", "wg0") # WireGuard interface, not capture interface
HS_CACHE_FILE = Path("/etc/wireguard/.wgctl/daemon/hs_cache.json")
ENDPOINT_CACHE_FILE = Path("/etc/wireguard/.wgctl/daemon/endpoint_cache.json")
# ============================================
# Logging
# ============================================
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)]
)
log = logging.getLogger("wgctl-monitor")
# ============================================
# Watchlist
# ============================================
_watchlist: dict[str, str] = {}
_watchlist_mtime: float = 0.0
def load_watchlist() -> dict[str, str]:
global _watchlist, _watchlist_mtime
try:
mtime = WATCHLIST_FILE.stat().st_mtime
if mtime == _watchlist_mtime:
return _watchlist
with WATCHLIST_FILE.open() as f:
_watchlist = json.load(f)
_watchlist_mtime = mtime
log.debug(f"Watchlist reloaded: {len(_watchlist)} entries")
except Exception as e:
log.error(f"Failed to load watchlist: {e}")
return _watchlist
def is_watched(ip: str) -> str | None:
watchlist = load_watchlist()
return watchlist.get(ip)
# ============================================
# Endpoint Resolution
# ============================================
def get_endpoint(public_key: str) -> str | None:
try:
import subprocess
result = subprocess.run(
["wg", "show", WG_INTERFACE, "endpoints"],
capture_output=True, text=True
)
for line in result.stdout.splitlines():
parts = line.split()
if len(parts) == 2 and parts[0] == public_key:
# Return just the IP without port
return parts[1].rsplit(":", 1)[0]
except Exception as e:
log.debug(f"Failed to get endpoint: {e}")
return None
def get_client_public_key(client_name: str) -> str | None:
key_file = Path(f"/etc/wireguard/clients/{client_name}_public.key")
try:
return key_file.read_text().strip()
except Exception:
return None
# ============================================
# Event Logging
# ============================================
def log_event(ip: str, client: str, event: str, endpoint: str | None = None):
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"ip": ip,
"client": client,
"event": event,
}
# Update endpoint cache when we see a packet
cache_file = ENDPOINT_CACHE_FILE
try:
with open(cache_file) as f:
cache = json.load(f)
except:
cache = {}
cache[client] = ip
with open(cache_file, 'w') as f:
json.dump(cache, f, indent=2)
if endpoint:
entry["endpoint"] = endpoint
try:
with EVENTS_LOG.open("a") as f:
f.write(json.dumps(entry) + "\n")
log.debug(f"Event logged: {entry}")
except Exception as e:
log.error(f"Failed to write event: {e}")
# ============================================
# Handshake Poller
# ============================================
# Tracks last logged handshake ts per pubkey
_hs_last_logged: dict[str, int] = {}
def load_hs_cache():
try:
with HS_CACHE_FILE.open() as f:
return {k: int(v) for k, v in json.load(f).items()}
except Exception:
return {}
def save_hs_cache(cache):
try:
with HS_CACHE_FILE.open('w') as f:
json.dump(cache, f)
except Exception:
pass
def build_pubkey_to_name() -> dict[str, str]:
"""Build pubkey -> client name map from public key files."""
mapping = {}
clients_dir = Path("/etc/wireguard/clients")
for kf in clients_dir.glob("*_public.key"):
name = kf.stem.replace("_public", "")
try:
mapping[kf.read_text().strip()] = name
except Exception:
pass
return mapping
def poll_handshakes():
"""
Poll wg show latest-handshakes periodically.
Log a handshake event only when gap > WG_HANDSHAKE_CHECK_SEC (new session).
"""
global _hs_last_logged
_hs_last_logged = load_hs_cache()
pubkey_to_name = build_pubkey_to_name()
log.info(f"Handshake poller started — {len(pubkey_to_name)} peers, "
f"session threshold {WG_HANDSHAKE_CHECK_SEC}s")
while True:
try:
result = subprocess.run(
["wg", "show", WG_WG_INTERFACE, "latest-handshakes"],
capture_output=True, text=True
)
for line in result.stdout.strip().splitlines():
parts = line.split()
if len(parts) != 2:
continue
pubkey, ts_str = parts
try:
ts = int(ts_str)
except ValueError:
continue
if ts == 0:
continue
client = pubkey_to_name.get(pubkey)
if not client:
continue
last = _hs_last_logged.get(pubkey, 0)
gap = ts - last
# Always update last seen
_hs_last_logged[pubkey] = ts
if gap < WG_HANDSHAKE_CHECK_SEC:
continue # keepalive, skip
# Get endpoint
endpoint = get_endpoint(pubkey) or ''
# New session, log it
entry = {
"timestamp": datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(),
"ip": "",
"client": client,
"event": "handshake",
"endpoint": endpoint,
}
try:
with EVENTS_LOG.open("a") as f:
f.write(json.dumps(entry) + "\n")
log.info(f"New session: {client} from {endpoint}")
except Exception as e:
log.error(f"Failed to write handshake event: {e}")
log.debug(f"Gap for {client}: {gap}s (threshold: {WG_HANDSHAKE_CHECK_SEC}s)")
save_hs_cache(_hs_last_logged)
except Exception as e:
log.error(f"Handshake poll error: {e}")
time.sleep(WG_HANDSHAKE_CHECK_SEC // 2) # poll at half the threshold
# ============================================
# Packet Handler
# ============================================
def handle_packet(pkt):
if not (IP in pkt and UDP in pkt):
return
# Only care about packets targeting WireGuard port
if pkt[UDP].dport != WG_PORT:
return
src_ip = pkt[IP].src
client = is_watched(src_ip)
if not client:
return
# Resolve real endpoint IP
public_key = get_client_public_key(client)
endpoint = None
if public_key:
endpoint = get_endpoint(public_key)
# If no endpoint from wg show, use packet source IP
if not endpoint:
endpoint = src_ip
log_event(src_ip, client, "attempt", endpoint)
log.info(f"Blocked attempt: {client} ({src_ip}) from endpoint {endpoint}")
# ============================================
# Signal Handling
# ============================================
def handle_signal(signum, frame):
log.info("Shutting down wgctl-monitor")
sys.exit(0)
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
# ============================================
# Main
# ============================================
def main():
log.info(f"wgctl-monitor starting on interface {WG_INTERFACE} port {WG_PORT}")
if not WATCHLIST_FILE.exists():
log.error(f"Watchlist not found: {WATCHLIST_FILE}")
sys.exit(1)
load_watchlist()
log.info("Watchlist loaded, starting packet capture...")
# Start handshake poller in background thread
hs_thread = threading.Thread(target=poll_handshakes, daemon=True)
hs_thread.start()
sniff(
iface=WG_INTERFACE,
filter=f"udp port {WG_PORT}",
prn=handle_packet,
store=0
)
if __name__ == "__main__":
main()

View file

@ -4,7 +4,7 @@ After=network.target wg-quick@wg0.service
[Service] [Service]
Type=simple Type=simple
ExecStart=/usr/bin/python3 /etc/wireguard/.wgctl/daemon/wgctl-monitor.py ExecStart=/usr/bin/python3 /etc/wireguard/wgctl/daemon/wgctl-monitor.py
Restart=always Restart=always
RestartSec=5 RestartSec=5
Environment=WG_INTERFACE=eth0 Environment=WG_INTERFACE=eth0

View file

@ -138,3 +138,37 @@ function monitor::restart() {
function monitor::is_running() { function monitor::is_running() {
systemctl is-active --quiet "$MONITOR_SERVICE" systemctl is-active --quiet "$MONITOR_SERVICE"
} }
function monitor::live() {
local filter_name="${1:-}" filter_type="${2:-}" filter_peers="${3:-}"
local blocked_only="${4:-false}" restricted_only="${5:-false}" allowed_only="${6:-false}"
local raw="${7:-false}"
[[ "$raw" == "true" ]] && _WGCTL_RAW=true
rm -f /tmp/wgctl_hs_* /tmp/wgctl_attempt_* 2>/dev/null || true
local w_client=20 w_dest=18
if ! $blocked_only && ! $restricted_only; then
(
while true; do
cmd::watch::_poll_handshakes \
"$filter_name" "$filter_type" "$filter_peers" "$w_client" "$w_dest"
sleep 5
done
) &
local poller_pid=$!
fi
cmd::watch::_tail_events \
"$filter_name" "$filter_type" "$filter_peers" \
"$blocked_only" "$restricted_only" "$allowed_only" \
"$w_client" "$w_dest" &
local tailer_pid=$!
trap "kill $tailer_pid ${poller_pid:-} 2>/dev/null; \
rm -f /tmp/wgctl_hs_* /tmp/wgctl_attempt_*; printf '\n'; exit 0" INT TERM
wait
}