From 86220850c187f32ee7522132f240ebc43d1ac4e3 Mon Sep 17 00:00:00 2001 From: Nuno Duque Nunes Date: Mon, 25 May 2026 16:19:13 +0000 Subject: [PATCH] fix: handshake session detection, endpoint cache, watch ordering - wgctl-monitor: update _hs_last_logged on ALL handshakes not just new sessions - wgctl-monitor: fix endpoint_cache.json absolute path - wgctl-monitor: move script to wgctl/daemon/ (correct location) - watch: _poll_handshakes sorts by ts descending, endpoint cache fallback - watch: empty endpoint uses - not em dash (alignment fix) - logs: newline between fw and wg sections - monitor::live extracted, cmd::logs::follow no longer calls cmd::watch::run - ui.sh: UTF-8 extra byte constants --- commands/logs.command.sh | 21 ++- commands/watch.command.sh | 27 +--- core/context.sh | 47 +++--- daemon/wgctl-monitor.py | 303 +++++++++++++++++++++++++++++++++++ daemon/wgctl-monitor.service | 2 +- modules/monitor.module.sh | 34 ++++ 6 files changed, 378 insertions(+), 56 deletions(-) create mode 100755 daemon/wgctl-monitor.py diff --git a/commands/logs.command.sh b/commands/logs.command.sh index 4760760..002bd39 100644 --- a/commands/logs.command.sh +++ b/commands/logs.command.sh @@ -195,8 +195,14 @@ function cmd::logs::show() { log::section "WireGuard Activity Log" printf "\n" - if [[ -n "$fw_output" ]]; then printf "%s\n" "$fw_output"; fi; - if [[ -n "$wg_output" ]]; then printf "%s\n" "$wg_output"; fi; + if [[ -n "$fw_output" && -n "$wg_output" ]]; then + printf "%s\n\n" "$fw_output" + printf "%s\n" "$wg_output" + elif [[ -n "$fw_output" ]]; then + printf "%s\n" "$fw_output" + else + printf "%s\n" "$wg_output" + fi } function cmd::logs::show_fw_events() { @@ -361,13 +367,12 @@ function cmd::logs::follow() { log::section "WireGuard Live Log (Ctrl+C to stop)" printf "\n" - local watch_args=() - [[ -n "$filter_name" ]] && watch_args+=(--name "$filter_name") - [[ -n "$filter_type" ]] && watch_args+=(--type "$filter_type") - $fw_only && watch_args+=(--restricted) - $wg_only && watch_args+=(--blocked) + local restricted_only=false blocked_only=false + $fw_only && restricted_only=true + $wg_only && blocked_only=true - cmd::watch::run "${watch_args[@]}" + monitor::live "$filter_name" "$filter_type" "" \ + "$blocked_only" "$restricted_only" "false" "false" } function cmd::logs::remove() { diff --git a/commands/watch.command.sh b/commands/watch.command.sh index 8826a08..cc7c59c 100644 --- a/commands/watch.command.sh +++ b/commands/watch.command.sh @@ -75,29 +75,8 @@ function cmd::watch::run() { log::section "wgctl — Live Monitor (Ctrl+C to stop)" printf "\n" - local w_client=20 w_dest=18 - - if ! $blocked_only && ! $restricted_only; then - ( - while true; do - cmd::watch::_poll_handshakes \ - "$filter_name" "$filter_type" "$filter_peers" "$w_client" "$w_dest" - sleep 5 - done - ) & - local poller_pid=$! - fi - - cmd::watch::_tail_events \ - "$filter_name" "$filter_type" "$filter_peers" \ - "$blocked_only" "$restricted_only" "$allowed_only" \ - "$w_client" "$w_dest" & - local tailer_pid=$! - - trap "kill $tailer_pid ${poller_pid:-} 2>/dev/null; \ - rm -f /tmp/wgctl_hs_* /tmp/wgctl_attempt_*; printf '\n'; exit 0" INT TERM - - wait + monitor::live "$filter_name" "$filter_type" "$filter_peers" \ + "$blocked_only" "$restricted_only" "$allowed_only" "$raw" } # ============================================ @@ -149,7 +128,7 @@ function cmd::watch::_poll_handshakes() { # Resolve endpoint — try wg show first, fall back to endpoint cache local endpoint endpoint=$(monitor::endpoint_for_key "$public_key") - + if [[ -z "$endpoint" ]]; then endpoint=$(monitor::get_cached_endpoint "$client_name") fi diff --git a/core/context.sh b/core/context.sh index 81c8ee2..c1ebff9 100644 --- a/core/context.sh +++ b/core/context.sh @@ -27,29 +27,30 @@ _CTX_NET="${_CTX_DATA}/services.json" # ============================================ -function ctx::root() { echo "$_CTX_ROOT"; } -function ctx::core() { echo "$_CTX_CORE"; } -function ctx::modules() { echo "$_CTX_MODULES"; } -function ctx::commands() { echo "$_CTX_COMMANDS"; } -function ctx::blocks() { echo "$_CTX_BLOCKS"; } -function ctx::groups() { echo "$_CTX_GROUPS"; } -function ctx::rules() { echo "$_CTX_RULES"; } -function ctx::rules::base() { echo "$_CTX_RULES_BASE"; } -function ctx::clients() { echo "$_CTX_CLIENTS"; } -function ctx::wg() { echo "$_CTX_WG"; } -function ctx::data() { echo "$_CTX_DATA"; } -function ctx::rules() { echo "$_CTX_RULES"; } -function ctx::groups() { echo "$_CTX_GROUPS"; } -function ctx::blocks() { echo "$_CTX_BLOCKS"; } -function ctx::meta() { echo "$_CTX_META"; } -function ctx::daemon() { echo "$_CTX_DAEMON"; } -function ctx::net() { echo "$_CTX_NET"; } -function ctx::identities() { echo "${_CTX_IDENTITY}"; } -function ctx::subnets() { echo "${_CTX_DATA}/subnets.json"; } -function ctx::hosts() { echo "${_CTX_DATA}/hosts.json"; } -function ctx::events_log() { echo "$(ctx::daemon)/events.log"; } -function ctx::fw_events_log() { echo "$(ctx::daemon)/fw_events.log"; } -function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; } +function ctx::root() { echo "$_CTX_ROOT"; } +function ctx::core() { echo "$_CTX_CORE"; } +function ctx::modules() { echo "$_CTX_MODULES"; } +function ctx::commands() { echo "$_CTX_COMMANDS"; } +function ctx::blocks() { echo "$_CTX_BLOCKS"; } +function ctx::groups() { echo "$_CTX_GROUPS"; } +function ctx::rules() { echo "$_CTX_RULES"; } +function ctx::rules::base() { echo "$_CTX_RULES_BASE"; } +function ctx::clients() { echo "$_CTX_CLIENTS"; } +function ctx::wg() { echo "$_CTX_WG"; } +function ctx::data() { echo "$_CTX_DATA"; } +function ctx::rules() { echo "$_CTX_RULES"; } +function ctx::groups() { echo "$_CTX_GROUPS"; } +function ctx::blocks() { echo "$_CTX_BLOCKS"; } +function ctx::meta() { echo "$_CTX_META"; } +function ctx::daemon() { echo "$_CTX_DAEMON"; } +function ctx::net() { echo "$_CTX_NET"; } +function ctx::identities() { echo "${_CTX_IDENTITY}"; } +function ctx::subnets() { echo "${_CTX_DATA}/subnets.json"; } +function ctx::hosts() { echo "${_CTX_DATA}/hosts.json"; } +function ctx::events_log() { echo "$(ctx::daemon)/events.log"; } +function ctx::fw_events_log() { echo "$(ctx::daemon)/fw_events.log"; } +function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; } +function ctx::monitor_script() { echo "${_CTX_ROOT}/daemon/wgctl-monitor.py"; } # ============================================ # Path Helpers diff --git a/daemon/wgctl-monitor.py b/daemon/wgctl-monitor.py new file mode 100755 index 0000000..9fbb342 --- /dev/null +++ b/daemon/wgctl-monitor.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 + +import subprocess +import threading +import json +import logging +import os +import signal +import sys +import time +from datetime import datetime, timezone +from pathlib import Path + +from scapy.all import IP, UDP, sniff + +# ============================================ +# Config +# ============================================ + +WATCHLIST_FILE = Path("/etc/wireguard/.wgctl/daemon/watchlist.json") +EVENTS_LOG = Path("/etc/wireguard/.wgctl/daemon/events.log") +WG_INTERFACE = os.environ.get("WG_INTERFACE", "eth0") +WG_PORT = int(os.environ.get("WG_PORT", "51820")) +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO") +WG_HANDSHAKE_CHECK_SEC = int(os.environ.get("WG_HANDSHAKE_CHECK_TIME_SEC", "300")) +WG_WG_INTERFACE = os.environ.get("WG_WG_INTERFACE", "wg0") # WireGuard interface, not capture interface +HS_CACHE_FILE = Path("/etc/wireguard/.wgctl/daemon/hs_cache.json") +ENDPOINT_CACHE_FILE = Path("/etc/wireguard/.wgctl/daemon/endpoint_cache.json") + +# ============================================ +# Logging +# ============================================ + +logging.basicConfig( + level=getattr(logging, LOG_LEVEL), + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(sys.stdout)] +) +log = logging.getLogger("wgctl-monitor") + +# ============================================ +# Watchlist +# ============================================ + +_watchlist: dict[str, str] = {} +_watchlist_mtime: float = 0.0 + +def load_watchlist() -> dict[str, str]: + global _watchlist, _watchlist_mtime + + try: + mtime = WATCHLIST_FILE.stat().st_mtime + if mtime == _watchlist_mtime: + return _watchlist + + with WATCHLIST_FILE.open() as f: + _watchlist = json.load(f) + _watchlist_mtime = mtime + log.debug(f"Watchlist reloaded: {len(_watchlist)} entries") + + except Exception as e: + log.error(f"Failed to load watchlist: {e}") + + return _watchlist + +def is_watched(ip: str) -> str | None: + watchlist = load_watchlist() + return watchlist.get(ip) + +# ============================================ +# Endpoint Resolution +# ============================================ + +def get_endpoint(public_key: str) -> str | None: + try: + import subprocess + result = subprocess.run( + ["wg", "show", WG_INTERFACE, "endpoints"], + capture_output=True, text=True + ) + for line in result.stdout.splitlines(): + parts = line.split() + if len(parts) == 2 and parts[0] == public_key: + # Return just the IP without port + return parts[1].rsplit(":", 1)[0] + except Exception as e: + log.debug(f"Failed to get endpoint: {e}") + return None + +def get_client_public_key(client_name: str) -> str | None: + key_file = Path(f"/etc/wireguard/clients/{client_name}_public.key") + try: + return key_file.read_text().strip() + except Exception: + return None + +# ============================================ +# Event Logging +# ============================================ + +def log_event(ip: str, client: str, event: str, endpoint: str | None = None): + entry = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "ip": ip, + "client": client, + "event": event, + } + + # Update endpoint cache when we see a packet + + cache_file = ENDPOINT_CACHE_FILE + try: + with open(cache_file) as f: + cache = json.load(f) + except: + cache = {} + cache[client] = ip + with open(cache_file, 'w') as f: + json.dump(cache, f, indent=2) + + if endpoint: + entry["endpoint"] = endpoint + + try: + with EVENTS_LOG.open("a") as f: + f.write(json.dumps(entry) + "\n") + log.debug(f"Event logged: {entry}") + except Exception as e: + log.error(f"Failed to write event: {e}") + +# ============================================ +# Handshake Poller +# ============================================ + +# Tracks last logged handshake ts per pubkey +_hs_last_logged: dict[str, int] = {} + +def load_hs_cache(): + try: + with HS_CACHE_FILE.open() as f: + return {k: int(v) for k, v in json.load(f).items()} + except Exception: + return {} + +def save_hs_cache(cache): + try: + with HS_CACHE_FILE.open('w') as f: + json.dump(cache, f) + except Exception: + pass + +def build_pubkey_to_name() -> dict[str, str]: + """Build pubkey -> client name map from public key files.""" + mapping = {} + clients_dir = Path("/etc/wireguard/clients") + for kf in clients_dir.glob("*_public.key"): + name = kf.stem.replace("_public", "") + try: + mapping[kf.read_text().strip()] = name + except Exception: + pass + return mapping + + +def poll_handshakes(): + """ + Poll wg show latest-handshakes periodically. + Log a handshake event only when gap > WG_HANDSHAKE_CHECK_SEC (new session). + """ + global _hs_last_logged + + _hs_last_logged = load_hs_cache() + + pubkey_to_name = build_pubkey_to_name() + log.info(f"Handshake poller started — {len(pubkey_to_name)} peers, " + f"session threshold {WG_HANDSHAKE_CHECK_SEC}s") + + while True: + try: + result = subprocess.run( + ["wg", "show", WG_WG_INTERFACE, "latest-handshakes"], + capture_output=True, text=True + ) + for line in result.stdout.strip().splitlines(): + parts = line.split() + if len(parts) != 2: + continue + pubkey, ts_str = parts + try: + ts = int(ts_str) + except ValueError: + continue + if ts == 0: + continue + + client = pubkey_to_name.get(pubkey) + if not client: + continue + + last = _hs_last_logged.get(pubkey, 0) + gap = ts - last + + # Always update last seen + _hs_last_logged[pubkey] = ts + + if gap < WG_HANDSHAKE_CHECK_SEC: + continue # keepalive, skip + + # Get endpoint + endpoint = get_endpoint(pubkey) or '' + + # New session, log it + entry = { + "timestamp": datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(), + "ip": "", + "client": client, + "event": "handshake", + "endpoint": endpoint, + } + try: + with EVENTS_LOG.open("a") as f: + f.write(json.dumps(entry) + "\n") + log.info(f"New session: {client} from {endpoint}") + except Exception as e: + log.error(f"Failed to write handshake event: {e}") + + log.debug(f"Gap for {client}: {gap}s (threshold: {WG_HANDSHAKE_CHECK_SEC}s)") + save_hs_cache(_hs_last_logged) + + except Exception as e: + log.error(f"Handshake poll error: {e}") + + time.sleep(WG_HANDSHAKE_CHECK_SEC // 2) # poll at half the threshold + +# ============================================ +# Packet Handler +# ============================================ + +def handle_packet(pkt): + if not (IP in pkt and UDP in pkt): + return + + # Only care about packets targeting WireGuard port + if pkt[UDP].dport != WG_PORT: + return + + src_ip = pkt[IP].src + client = is_watched(src_ip) + + if not client: + return + + # Resolve real endpoint IP + public_key = get_client_public_key(client) + endpoint = None + if public_key: + endpoint = get_endpoint(public_key) + + # If no endpoint from wg show, use packet source IP + if not endpoint: + endpoint = src_ip + + log_event(src_ip, client, "attempt", endpoint) + log.info(f"Blocked attempt: {client} ({src_ip}) from endpoint {endpoint}") + +# ============================================ +# Signal Handling +# ============================================ + +def handle_signal(signum, frame): + log.info("Shutting down wgctl-monitor") + sys.exit(0) + +signal.signal(signal.SIGTERM, handle_signal) +signal.signal(signal.SIGINT, handle_signal) + +# ============================================ +# Main +# ============================================ + +def main(): + log.info(f"wgctl-monitor starting on interface {WG_INTERFACE} port {WG_PORT}") + + if not WATCHLIST_FILE.exists(): + log.error(f"Watchlist not found: {WATCHLIST_FILE}") + sys.exit(1) + + load_watchlist() + log.info("Watchlist loaded, starting packet capture...") + + # Start handshake poller in background thread + hs_thread = threading.Thread(target=poll_handshakes, daemon=True) + hs_thread.start() + + sniff( + iface=WG_INTERFACE, + filter=f"udp port {WG_PORT}", + prn=handle_packet, + store=0 + ) + +if __name__ == "__main__": + main() diff --git a/daemon/wgctl-monitor.service b/daemon/wgctl-monitor.service index e50e157..053f826 100644 --- a/daemon/wgctl-monitor.service +++ b/daemon/wgctl-monitor.service @@ -4,7 +4,7 @@ After=network.target wg-quick@wg0.service [Service] Type=simple -ExecStart=/usr/bin/python3 /etc/wireguard/.wgctl/daemon/wgctl-monitor.py +ExecStart=/usr/bin/python3 /etc/wireguard/wgctl/daemon/wgctl-monitor.py Restart=always RestartSec=5 Environment=WG_INTERFACE=eth0 diff --git a/modules/monitor.module.sh b/modules/monitor.module.sh index 9bcee2f..2335478 100644 --- a/modules/monitor.module.sh +++ b/modules/monitor.module.sh @@ -138,3 +138,37 @@ function monitor::restart() { function monitor::is_running() { systemctl is-active --quiet "$MONITOR_SERVICE" } + +function monitor::live() { + local filter_name="${1:-}" filter_type="${2:-}" filter_peers="${3:-}" + local blocked_only="${4:-false}" restricted_only="${5:-false}" allowed_only="${6:-false}" + local raw="${7:-false}" + + [[ "$raw" == "true" ]] && _WGCTL_RAW=true + + rm -f /tmp/wgctl_hs_* /tmp/wgctl_attempt_* 2>/dev/null || true + + local w_client=20 w_dest=18 + + if ! $blocked_only && ! $restricted_only; then + ( + while true; do + cmd::watch::_poll_handshakes \ + "$filter_name" "$filter_type" "$filter_peers" "$w_client" "$w_dest" + sleep 5 + done + ) & + local poller_pid=$! + fi + + cmd::watch::_tail_events \ + "$filter_name" "$filter_type" "$filter_peers" \ + "$blocked_only" "$restricted_only" "$allowed_only" \ + "$w_client" "$w_dest" & + local tailer_pid=$! + + trap "kill $tailer_pid ${poller_pid:-} 2>/dev/null; \ + rm -f /tmp/wgctl_hs_* /tmp/wgctl_attempt_*; printf '\n'; exit 0" INT TERM + + wait +}