feat: accept logging, conntrack daemon, activity integration

- daemon/wgctl-conntrack: Go daemon for conntrack DESTROY events
- wgctl-conntrack.service: systemd service
- core/lib/accept_events.py: accept_events(), accept_aggregate()
- ctx::accept_events_log: .wgctl/daemon/accept_events.log
- activity: ACCEPT row with bytes in/out and conn count
- activity: accept dest rows with ↓/↑ bytes at end
- activity: --accept, --drop, --external flags
- activity: unified w_count for drop/accept alignment
- activity: drop service rows in red
- activity: accept dest rows in green
- sysctl: nf_conntrack_acct=1 for byte counting
- note: --exclude-service/--include-service deferred
This commit is contained in:
Nuno Duque Nunes 2026-05-28 23:31:10 +00:00
parent d314ba376e
commit b892298259
9 changed files with 568 additions and 104 deletions

View file

@ -13,7 +13,9 @@ function cmd::activity::on_load() {
flag::register --ip flag::register --ip
flag::register --hours flag::register --hours
flag::register --type flag::register --type
flag::register --dropped flag::register --accept
flag::register --drop
flag::register --external
command::mixin json_output command::mixin json_output
} }
@ -35,11 +37,12 @@ Options:
--ip <ip> Filter by destination IP --ip <ip> Filter by destination IP
--hours <n> Time window in hours (default: 24, 0 = all time) --hours <n> Time window in hours (default: 24, 0 = all time)
--type <type> Filter by device type (combined with --peer) --type <type> Filter by device type (combined with --peer)
--dropped Show only peers with at least one drop --accept Show only accepted traffic (from conntrack)
--drop Show only firewall drops
--external Show only external traffic (full tunnel peers)
Examples: Examples:
wgctl activity wgctl activity
wgctl activity --dropped
wgctl activity --peer phone-nuno wgctl activity --peer phone-nuno
wgctl activity --service truenas wgctl activity --service truenas
wgctl activity --hours 0 wgctl activity --hours 0
@ -53,17 +56,20 @@ EOF
function cmd::activity::run() { function cmd::activity::run() {
local filter_peer="" filter_service="" filter_ip="" filter_type="" local filter_peer="" filter_service="" filter_ip="" filter_type=""
local hours=24 dropped_only=false local hours=24
local accept_only=false drop_only=false external_only=false
while [[ $# -gt 0 ]]; do while [[ $# -gt 0 ]]; do
case "$1" in case "$1" in
--peer) filter_peer="$2"; shift 2 ;; --peer) filter_peer="$2"; shift 2 ;;
--service) filter_service="$2"; shift 2 ;; --service) filter_service="$2"; shift 2 ;;
--ip) filter_ip="$2"; shift 2 ;; --ip) filter_ip="$2"; shift 2 ;;
--type) filter_type="$2"; shift 2 ;; --type) filter_type="$2"; shift 2 ;;
--hours) hours="$2"; shift 2 ;; --hours) hours="$2"; shift 2 ;;
--dropped) dropped_only=true; shift ;; --accept) accept_only=true; shift ;;
--help) cmd::activity::help; return ;; --drop) drop_only=true; shift ;;
--external) external_only=true; shift ;;
--help) cmd::activity::help; return ;;
*) *)
log::error "Unknown flag: $1" log::error "Unknown flag: $1"
cmd::activity::help cmd::activity::help
@ -77,42 +83,67 @@ function cmd::activity::run() {
return 0 return 0
fi fi
# Resolve peer name if type provided
if [[ -n "$filter_peer" && -n "$filter_type" ]]; then if [[ -n "$filter_peer" && -n "$filter_type" ]]; then
filter_peer=$(peers::resolve_and_require "$filter_peer" "$filter_type") || return 1 filter_peer=$(peers::resolve_and_require "$filter_peer" "$filter_type") || return 1
fi fi
# Resolve --service to IP
local service_ip="" local service_ip=""
if [[ -n "$filter_service" ]]; then if [[ -n "$filter_service" ]]; then
service_ip=$(net::resolve "$filter_service" 2>/dev/null | head -1 | cut -d: -f1) || true service_ip=$(net::resolve "$filter_service" 2>/dev/null | head -1 | cut -d: -f1) || true
if [[ -z "$service_ip" ]]; then [[ -z "$service_ip" ]] && log::error "Service not found: ${filter_service}" && return 1
log::error "Service not found: ${filter_service}"
return 1
fi
fi fi
[[ -n "$filter_ip" ]] && service_ip="$filter_ip" [[ -n "$filter_ip" ]] && service_ip="$filter_ip"
# Fetch aggregated data # ── Fetch data ──
local data local data=""
data=$(json::activity_aggregate \ if ! $accept_only; then
"$(ctx::fw_events_log)" \ data=$(json::activity_aggregate \
"$(ctx::events_log)" \ "$(ctx::fw_events_log)" "$(ctx::events_log)" \
"$(config::interface)" \ "$(config::interface)" "$(ctx::net)" \
"$(ctx::net)" \ "$(ctx::clients)" "$(ctx::meta)" \
"$(ctx::clients)" \ "$hours" "$filter_peer" "$service_ip" 2>/dev/null)
"$(ctx::meta)" \
"$hours" \
"$filter_peer" \
"$service_ip" 2>/dev/null)
if [[ -z "$data" ]]; then
log::wg_warning "No activity data found"
return 0
fi fi
# Measure column widths local accept_data=""
local w_peer=16 w_drops=1 if ! $drop_only; then
local since_arg="" ext_flag="0"
[[ "$hours" -gt 0 ]] && since_arg="${hours}h"
$external_only && ext_flag="1"
[[ -f "$(ctx::accept_events_log)" ]] && \
accept_data=$(json::accept_aggregate \
"$(ctx::accept_events_log)" "$(ctx::net)" "$(ctx::clients)" \
"$since_arg" "$filter_peer" "$ext_flag" 2>/dev/null)
fi
[[ -z "$data" && -z "$accept_data" ]] && \
log::wg_warning "No activity data found" && return 0
# ── Build accept maps ──
declare -gA _ACCEPT_PEER=()
declare -gA _ACCEPT_DEST_KEYS=()
declare -gA _ACCEPT_DEST=()
while IFS='|' read -r type rest; do
[[ -z "$type" ]] && continue
case "$type" in
peer)
local a_name a_bi a_bo a_pi a_po a_conns
IFS='|' read -r a_name a_bi a_bo a_pi a_po a_conns <<< "$rest"
_ACCEPT_PEER["$a_name"]="${a_bi}|${a_bo}|${a_pi}|${a_po}|${a_conns}"
;;
dest)
local d_peer d_ip d_port d_proto d_bytes_orig d_bytes_reply d_count
IFS='|' read -r d_peer d_ip d_port d_proto d_bytes_orig d_bytes_reply d_count <<< "$rest"
local d_key="${d_peer}:${d_ip}:${d_port}:${d_proto}"
_ACCEPT_DEST["$d_key"]="${d_bytes_orig}|${d_bytes_reply}|${d_count}"
_ACCEPT_DEST_KEYS["$d_peer"]+="${d_key} "
;;
esac
done <<< "$accept_data"
# ── Measure column widths ──
local w_peer=16 w_count=1
while IFS='|' read -r type rest; do while IFS='|' read -r type rest; do
case "$type" in case "$type" in
peer) peer)
@ -120,83 +151,166 @@ function cmd::activity::run() {
name=$(echo "$rest" | cut -d'|' -f1) name=$(echo "$rest" | cut -d'|' -f1)
drops=$(echo "$rest" | cut -d'|' -f4) drops=$(echo "$rest" | cut -d'|' -f4)
(( ${#name} > w_peer )) && w_peer=${#name} (( ${#name} > w_peer )) && w_peer=${#name}
(( ${#drops} > w_drops )) && w_drops=${#drops} (( ${#drops} > w_count )) && w_count=${#drops}
;; ;;
service) service)
local count local svc_count
count=$(echo "$rest" | cut -d'|' -f3) svc_count=$(echo "$rest" | cut -d'|' -f3)
(( ${#count} > w_drops )) && w_drops=${#count} (( ${#svc_count} > w_count )) && w_count=${#svc_count}
;; ;;
esac esac
done <<< "$data" done <<< "$data"
for a_name in "${!_ACCEPT_PEER[@]}"; do
(( ${#a_name} > w_peer )) && w_peer=${#a_name}
local a_conns_val="${_ACCEPT_PEER[$a_name]##*|}"
(( ${#a_conns_val} > w_count )) && w_count=${#a_conns_val}
done
for key in "${!_ACCEPT_DEST[@]}"; do
local d_val="${_ACCEPT_DEST[$key]}"
local d_count_val="${d_val##*|}"
(( ${#d_count_val} > w_count )) && w_count=${#d_count_val}
done
(( w_peer += 2 )) (( w_peer += 2 ))
# Compute column where drop count starts on peer row:
# " " (2) + name (w_peer) + " ↓" (3) + rx (10) + " ↑" (3) + tx (10) + " " (2)
# ↓ and ↑ are multi-byte (3 bytes, 1 visible) — 2 extra bytes each
# Visible: 2 + w_peer + 2+1 + 10 + 2+1 + 10 + 2 = w_peer + 30
local drops_col=$(( w_peer + 30 )) local drops_col=$(( w_peer + 30 ))
local hours_display="${hours}h" local hours_display="${hours}h"
[[ "$hours" == "0" ]] && hours_display="all time" [[ "$hours" == "0" ]] && hours_display="all time"
log::section "Activity Monitor (last ${hours_display})" log::section "Activity Monitor (last ${hours_display})"
echo "" echo ""
if display::is_table "activity"; then if display::is_table "activity"; then
cmd::activity::_render_table "$data" cmd::activity::_render_table "$data"
return 0 return 0
fi fi
local first_peer=true skip_peer=false # ── Accept dest inline renderer ──
_render_peer_accept_dests() {
local peer_name="$1"
local keys="${_ACCEPT_DEST_KEYS[$peer_name]:-}"
[[ -z "$keys" ]] && return 0
for d_key in $keys; do
local dest_stats="${_ACCEPT_DEST[$d_key]:-}"
[[ -z "$dest_stats" ]] && continue
local d_bytes_orig d_bytes_reply d_count
IFS='|' read -r d_bytes_orig d_bytes_reply d_count <<< "$dest_stats"
local rest_key="${d_key#${peer_name}:}"
local d_ip="${rest_key%%:*}"
local pp="${rest_key#*:}"
local d_port="${pp%%:*}"
local d_proto="${pp##*:}"
local dest_display
dest_display=$(resolve::dest "$d_ip" "$d_port" "$d_proto" 2>/dev/null \
|| echo "${d_ip}:${d_port}/${d_proto}")
ui::activity::accept_dest_row \
"$dest_display" "$d_bytes_orig" "$d_bytes_reply" \
"$d_count" "$drops_col" "$w_count"
done
}
local first_peer=true skip_peer=false current_name=""
local -a rendered_peers=()
# ── Main render loop (drop data) ──
while IFS='|' read -r record_type rest; do while IFS='|' read -r record_type rest; do
case "$record_type" in case "$record_type" in
peer) peer)
local name rx tx drops local name rx tx drops
IFS='|' read -r name rx tx drops <<< "$rest" IFS='|' read -r name rx tx drops <<< "$rest"
# Flush previous peer's accept dests
[[ -n "$current_name" ]] && ! $drop_only && \
_render_peer_accept_dests "$current_name"
skip_peer=false skip_peer=false
if $dropped_only && [[ "$drops" -eq 0 ]]; then current_name="$name"
skip_peer=true local has_accept="${_ACCEPT_PEER[$name]:-}"
continue
fi
$first_peer || echo "" $first_peer || echo ""
first_peer=false first_peer=false
rendered_peers+=("$name")
local rx_fmt tx_fmt local rx_fmt tx_fmt
rx_fmt=$(fmt::bytes "$rx") rx_fmt=$(fmt::bytes "$rx")
tx_fmt=$(fmt::bytes "$tx") tx_fmt=$(fmt::bytes "$tx")
local name_pad rx_pad tx_pad local name_pad rx_pad tx_pad
name_pad=$(printf "%-${w_peer}s" "$name") name_pad=$(printf "%-${w_peer}s" "$name")
rx_pad=$(printf "%-10s" "$rx_fmt") rx_pad=$(printf "%-10s" "$rx_fmt")
tx_pad=$(printf "%-10s" "$tx_fmt") tx_pad=$(printf "%-10s" "$tx_fmt")
local drop_word="drops" local drop_word="drops"
[[ "$drops" -eq 1 ]] && drop_word="drop" [[ "$drops" -eq 1 ]] && drop_word="drop"
ui::activity::peer_row \ # Always show peer name — either full row or name-only for accept_only
"$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" "$w_drops" if $accept_only; then
printf " \033[1m%s\033[0m\n" "$name_pad"
else
ui::activity::peer_row \
"$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" "$w_count"
fi
# Accept summary row
if [[ -n "$has_accept" ]] && ! $drop_only; then
local a_bi a_bo a_pi a_po a_conns
IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$has_accept"
ui::activity::accept_row \
"$name_pad" \
"$(printf '%-10s' "$(fmt::bytes "$a_bi")")" \
"$(printf '%-10s' "$(fmt::bytes "$a_bo")")" \
"$a_conns" "$w_count"
fi
;; ;;
service) service)
$skip_peer && continue $skip_peer && continue
$accept_only && continue
local peer dest_display drop_count local peer dest_display drop_count
IFS='|' read -r peer dest_display drop_count <<< "$rest" IFS='|' read -r peer dest_display drop_count <<< "$rest"
local svc_drop_word="drops" local svc_drop_word="drops"
[[ "$drop_count" -eq 1 ]] && svc_drop_word="drop" [[ "$drop_count" -eq 1 ]] && svc_drop_word="drop"
ui::activity::service_row \ ui::activity::service_row \
"$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_drops" "$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_count"
;; ;;
esac esac
done <<< "$data" done <<< "$data"
# Flush last peer's accept dests
[[ -n "$current_name" ]] && ! $drop_only && \
_render_peer_accept_dests "$current_name"
# ── Accept-only peers (not in drop data) ──
if ! $drop_only; then
for a_name in $(echo "${!_ACCEPT_PEER[@]}" | tr ' ' '\n' | sort); do
# Skip already rendered
local already=false
for rp in "${rendered_peers[@]:-}"; do
[[ "$rp" == "$a_name" ]] && already=true && break
done
$already && continue
$first_peer || echo ""
first_peer=false
local a_stats="${_ACCEPT_PEER[$a_name]}"
local a_bi a_bo a_pi a_po a_conns
IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$a_stats"
local name_pad
name_pad=$(printf "%-${w_peer}s" "$a_name")
# Always show peer name
printf " \033[1m%s\033[0m\n" "$name_pad"
ui::activity::accept_row \
"$name_pad" \
"$(printf '%-10s' "$(fmt::bytes "$a_bi")")" \
"$(printf '%-10s' "$(fmt::bytes "$a_bo")")" \
"$a_conns" "$w_count"
_render_peer_accept_dests "$a_name"
done
fi
echo "" echo ""
} }
@ -276,3 +390,48 @@ function cmd::activity::_output_json() {
array=$(printf '%s\n' "${peers[@]:-}" | paste -sd ',' -) array=$(printf '%s\n' "${peers[@]:-}" | paste -sd ',' -)
printf '{"peers":[%s]}' "${array:-}" | json::envelope "activity" "$count" printf '{"peers":[%s]}' "${array:-}" | json::envelope "activity" "$count"
} }
function cmd::activity::_fetch_accept_data() {
local hours="${1:-24}" filter_peer="${2:-}" external_only="${3:-false}"
[[ ! -f "$(ctx::accept_events_log)" ]] && return 0
local since_arg=""
[[ "$hours" -gt 0 ]] && since_arg="${hours}h"
local ext_flag="0"
$external_only && ext_flag="1"
json::accept_aggregate \
"$(ctx::accept_events_log)" \
"$(ctx::net)" \
"$(ctx::clients)" \
"$since_arg" \
"$filter_peer" \
2>/dev/null
}
function cmd::activity::_build_accept_maps() {
local accept_data="${1:-}"
# Outputs to stdout as bash declare statements — use eval
# Sets: _ACCEPT_PEER[name]="bytes_in|bytes_out|packets_in|packets_out|conn_count"
# _ACCEPT_DEST[name:ip:port:proto]="bytes|count"
declare -gA _ACCEPT_PEER=()
declare -gA _ACCEPT_DEST=()
while IFS='|' read -r type rest; do
[[ -z "$type" ]] && continue
case "$type" in
peer)
local name bytes_in bytes_out packets_in packets_out conn_count
IFS='|' read -r name bytes_in bytes_out packets_in packets_out conn_count <<< "$rest"
_ACCEPT_PEER["$name"]="${bytes_in}|${bytes_out}|${packets_in}|${packets_out}|${conn_count}"
;;
dest)
local peer dst_ip dst_port proto bytes count
IFS='|' read -r peer dst_ip dst_port proto bytes count <<< "$rest"
_ACCEPT_DEST["${peer}:${dst_ip}:${dst_port}:${proto}"]="${bytes}|${count}"
;;
esac
done <<< "$accept_data"
}

View file

@ -44,49 +44,50 @@ _CTX_CONFIG_FILE="${_CTX_CONFIG}/wgctl.json"
# Accessors # Accessors
# ============================================ # ============================================
function ctx::root() { echo "$_CTX_ROOT"; } function ctx::root() { echo "$_CTX_ROOT"; }
function ctx::core() { echo "$_CTX_CORE"; } function ctx::core() { echo "$_CTX_CORE"; }
function ctx::modules() { echo "$_CTX_MODULES"; } function ctx::modules() { echo "$_CTX_MODULES"; }
function ctx::commands() { echo "$_CTX_COMMANDS"; } function ctx::commands() { echo "$_CTX_COMMANDS"; }
function ctx::wg() { echo "$_CTX_WG"; } function ctx::wg() { echo "$_CTX_WG"; }
function ctx::clients() { echo "$_CTX_CLIENTS"; } function ctx::clients() { echo "$_CTX_CLIENTS"; }
# Top-level dirs # Top-level dirs
function ctx::wgctl() { echo "$_CTX_WGCTL"; } function ctx::wgctl() { echo "$_CTX_WGCTL"; }
function ctx::config() { echo "$_CTX_CONFIG"; } function ctx::config() { echo "$_CTX_CONFIG"; }
function ctx::data() { echo "$_CTX_DATA"; } function ctx::data() { echo "$_CTX_DATA"; }
function ctx::daemon() { echo "$_CTX_DAEMON"; } function ctx::daemon() { echo "$_CTX_DAEMON"; }
# Data subdirs # Data subdirs
function ctx::rules() { echo "$_CTX_RULES"; } function ctx::rules() { echo "$_CTX_RULES"; }
function ctx::rules::base() { echo "$_CTX_RULES_BASE"; } function ctx::rules::base() { echo "$_CTX_RULES_BASE"; }
function ctx::groups() { echo "$_CTX_GROUPS"; } function ctx::groups() { echo "$_CTX_GROUPS"; }
function ctx::blocks() { echo "$_CTX_BLOCKS"; } function ctx::blocks() { echo "$_CTX_BLOCKS"; }
function ctx::meta() { echo "$_CTX_META"; } function ctx::meta() { echo "$_CTX_META"; }
function ctx::identities() { echo "$_CTX_IDENTITY"; } function ctx::identities() { echo "$_CTX_IDENTITY"; }
function ctx::peer_history() { echo "$_CTX_PEER_HISTORY"; } function ctx::peer_history() { echo "$_CTX_PEER_HISTORY"; }
# Data files # Data files
function ctx::net() { echo "$_CTX_NET"; } function ctx::net() { echo "$_CTX_NET"; }
function ctx::hosts() { echo "$_CTX_HOSTS"; } function ctx::hosts() { echo "$_CTX_HOSTS"; }
function ctx::subnets() { echo "$_CTX_SUBNETS"; } function ctx::subnets() { echo "$_CTX_SUBNETS"; }
function ctx::policies() { echo "$_CTX_POLICIES"; } function ctx::policies() { echo "$_CTX_POLICIES"; }
# Config files # Config files
function ctx::config_file() { echo "$_CTX_CONFIG_FILE"; } function ctx::config_file() { echo "$_CTX_CONFIG_FILE"; }
function ctx::display() { echo "${_CTX_CONFIG}/display.json"; } function ctx::display() { echo "${_CTX_CONFIG}/display.json"; }
# Daemon files # Daemon files
function ctx::events_log() { echo "${_CTX_DAEMON}/events.log"; } function ctx::events_log() { echo "${_CTX_DAEMON}/events.log"; }
function ctx::fw_events_log() { echo "${_CTX_DAEMON}/fw_events.log"; } function ctx::fw_events_log() { echo "${_CTX_DAEMON}/fw_events.log"; }
function ctx::endpoint_cache() { echo "${_CTX_DAEMON}/endpoint_cache.json"; } function ctx::endpoint_cache() { echo "${_CTX_DAEMON}/endpoint_cache.json"; }
function ctx::accept_events_log() { echo "${_CTX_DAEMON}/accept_events.log"; }
# Tool paths # Tool paths
function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; } function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; }
function ctx::monitor_script() { echo "${_CTX_ROOT}/daemon/wgctl-monitor.py"; } function ctx::monitor_script() { echo "${_CTX_ROOT}/daemon/wgctl-monitor.py"; }
function ctx::lib() { echo "${_CTX_CORE}/lib"; } function ctx::lib() { echo "${_CTX_CORE}/lib"; }
function ctx::block_history() { echo "${_CTX_DATA}/block-history"; } function ctx::block_history() { echo "${_CTX_DATA}/block-history"; }
# ============================================ # ============================================
# Path Helpers # Path Helpers

View file

@ -154,6 +154,10 @@ function json::block_history_list_all() { python3 "$JSON_HELPER" block_history
function json::endpoint_cache_get() { python3 "$JSON_HELPER" endpoint_cache_get "$@" </dev/null; } function json::endpoint_cache_get() { python3 "$JSON_HELPER" endpoint_cache_get "$@" </dev/null; }
# Accept Events
function json::accept_events() { python3 "$JSON_HELPER" accept_events "$@" </dev/null; }
function json::accept_aggregate() { python3 "$JSON_HELPER" accept_aggregate "$@" </dev/null; }
function json::peer_transfer() { function json::peer_transfer() {
ACTIVITY_TOTAL_LOW="$(config::activity_total_low)" \ ACTIVITY_TOTAL_LOW="$(config::activity_total_low)" \
ACTIVITY_TOTAL_MED="$(config::activity_total_med)" \ ACTIVITY_TOTAL_MED="$(config::activity_total_med)" \

View file

@ -2225,6 +2225,19 @@ commands = {
'block_history_list_all': lambda args: __import__('lib.block_history', 'block_history_list_all': lambda args: __import__('lib.block_history',
fromlist=['block_history_list_all']).block_history_list_all(args[0]), fromlist=['block_history_list_all']).block_history_list_all(args[0]),
'endpoint_cache_get': lambda args: endpoint_cache_get(args[0], args[1]), 'endpoint_cache_get': lambda args: endpoint_cache_get(args[0], args[1]),
'accept_events': lambda args: __import__('lib.accept_events', fromlist=['accept_events']).accept_events(
args[0], args[1], args[2], args[3],
args[4] if len(args) > 4 else '100',
args[5] if len(args) > 5 else '1',
args[6] if len(args) > 6 else '',
args[7] if len(args) > 7 else '0',
args[8] if len(args) > 8 else 'desc'),
'accept_aggregate': lambda args: __import__('lib.accept_events',
fromlist=['accept_aggregate']).accept_aggregate(
args[0], args[1], args[2],
args[3] if len(args) > 3 else '',
args[4] if len(args) > 4 else '',
args[5] if len(args) > 5 else '0'),
} }
# ── Main ───────────────────────────────────────────────────────────────────── # ── Main ─────────────────────────────────────────────────────────────────────

Binary file not shown.

228
core/lib/accept_events.py Normal file
View file

@ -0,0 +1,228 @@
"""
accept_events.py conntrack accept event processing.
Reads accept_events.log written by wgctl-conntrack daemon.
Each line is a JSON object with fields:
ts, peer, src_ip, dst_ip, dst_port, proto,
bytes_orig, bytes_reply, packets_orig, packets_reply,
duration_sec, service, event, external
"""
import os
import json
from collections import defaultdict
from datetime import datetime
from lib.util import (
DATETIME_FMT,
load_net_data, load_hosts_data,
reverse_lookup, hosts_lookup,
fmt_ts, fmt_ts_hour, ts_to_unix, parse_since,
)
def accept_events(file, filter_peer, filter_type, net_file,
limit, collapse='1', since='', filter_external='0',
sort_order='desc'):
"""
Format accept events with optional aggregation.
Output per line (collapse=1):
ts|peer|dst_ip|dst_port|proto|bytes_total|packets_total|count|duration_avg
Output per line (collapse=0):
ts|peer|dst_ip|dst_port|proto|bytes_orig|bytes_reply|packets_orig|packets_reply|duration_sec
"""
do_collapse = str(collapse) != '0'
external_only = str(filter_external) == '1'
limit = int(limit) if limit else 100
since_dt = parse_since(since) if since else None
descending = sort_order != 'asc'
events = []
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
if not e.get('peer'):
continue
if filter_peer and e.get('peer') != filter_peer:
continue
if filter_type and not e.get('peer', '').startswith(filter_type + '-'):
continue
if external_only and not e.get('external', False):
continue
if not external_only and e.get('external', False):
continue
if since_dt:
ts_str = e.get('ts', '')
try:
from datetime import timezone
ev_dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
if ev_dt < since_dt:
continue
except Exception:
pass
events.append(e)
except Exception:
pass
except Exception:
pass
if do_collapse:
# Aggregate by peer + dst_ip + dst_port + proto + hour
buckets = defaultdict(lambda: {'count': 0, 'bytes': 0, 'packets': 0, 'duration': 0.0})
bucket_ts = {}
for e in events:
ts_str = e.get('ts', '')
peer = e.get('peer', '')
dst_ip = e.get('dst_ip', '')
dst_port = str(e.get('dst_port', ''))
proto = e.get('proto', '')
try:
dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
hour_key = (peer, dst_ip, dst_port, proto, dt.strftime('%Y-%m-%d %H'))
except Exception:
continue
b = buckets[hour_key]
b['count'] += 1
b['bytes'] += e.get('bytes_orig', 0) + e.get('bytes_reply', 0)
b['packets'] += e.get('packets_orig', 0) + e.get('packets_reply', 0)
b['duration'] += e.get('duration_sec', 0.0)
if hour_key not in bucket_ts:
bucket_ts[hour_key] = dt
# Sort and limit
sorted_buckets = sorted(bucket_ts.items(), key=lambda x: x[1])
output = sorted_buckets[-limit:]
if descending:
output = list(reversed(output))
for hour_key, dt in output:
peer, dst_ip, dst_port, proto, _ = hour_key
b = buckets[hour_key]
ts_fmt = fmt_ts_hour(dt.isoformat())
dur_avg = b['duration'] / b['count'] if b['count'] > 0 else 0.0
print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b['bytes']}|{b['packets']}|{b['count']}|{dur_avg:.1f}")
else:
# Detailed — one row per event
result = [(ts_to_unix(e.get('ts', '')), e) for e in events]
result = result[-limit:]
if descending:
result.reverse()
for _, e in result:
ts_fmt = fmt_ts(e.get('ts', ''))
peer = e.get('peer', '')
dst_ip = e.get('dst_ip', '')
dst_port = str(e.get('dst_port', ''))
proto = e.get('proto', '')
b_orig = e.get('bytes_orig', 0)
b_reply = e.get('bytes_reply', 0)
p_orig = e.get('packets_orig', 0)
p_reply = e.get('packets_reply', 0)
dur = e.get('duration_sec', 0.0)
print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b_orig}|{b_reply}|{p_orig}|{p_reply}|{dur:.1f}")
def accept_aggregate(file, net_file, clients_dir, since='',
filter_peer='', external_only='0'):
"""
Aggregate accept events per peer total bytes, packets, top destinations.
Used by wgctl activity to show accepted traffic alongside drops.
external_only='1': only show traffic to external IPs (non-private)
external_only='0': only show traffic to internal IPs (default)
Output:
peer|peer_name|bytes_in|bytes_out|packets_in|packets_out|conn_count
dest|peer_name|dst_ip|dst_port|proto|bytes_total|conn_count
"""
from collections import defaultdict
from itertools import groupby
since_dt = parse_since(since) if since else None
show_external = str(external_only) == '1'
peer_stats = defaultdict(lambda: {
'bytes_in': 0, 'bytes_out': 0,
'packets_in': 0, 'packets_out': 0,
'conn_count': 0
})
# dest_stats = defaultdict(lambda: {'bytes': 0, 'count': 0})
dest_stats = defaultdict(lambda: {'bytes_orig': 0, 'bytes_reply': 0, 'count': 0})
try:
with open(file) as f:
for line in f:
try:
e = json.loads(line.strip())
peer = e.get('peer', '')
if not peer:
continue
if filter_peer and peer != filter_peer:
continue
# Filter by external/internal
is_external = e.get('external', False)
if show_external and not is_external:
continue
if not show_external and is_external:
continue
if since_dt:
ts_str = e.get('ts', '')
try:
from datetime import timezone
ev_dt = datetime.fromisoformat(
ts_str.replace('Z', '+00:00'))
if ev_dt < since_dt:
continue
except Exception:
pass
dst_ip = e.get('dst_ip', '')
dst_port = str(e.get('dst_port', ''))
proto = e.get('proto', '')
b_orig = e.get('bytes_orig', 0)
b_reply = e.get('bytes_reply', 0)
p_orig = e.get('packets_orig', 0)
p_reply = e.get('packets_reply', 0)
ps = peer_stats[peer]
ps['bytes_out'] += b_orig
ps['bytes_in'] += b_reply
ps['packets_out'] += p_orig
ps['packets_in'] += p_reply
ps['conn_count'] += 1
dest_key = (peer, dst_ip, dst_port, proto)
dest_stats[dest_key]['bytes_orig'] += b_orig
dest_stats[dest_key]['bytes_reply'] += b_reply
dest_stats[dest_key]['count'] += 1
except Exception:
pass
except Exception:
pass
# Output peer summaries
for peer, ps in sorted(peer_stats.items()):
print(f"peer|{peer}|{ps['bytes_in']}|{ps['bytes_out']}|"
f"{ps['packets_in']}|{ps['packets_out']}|{ps['conn_count']}")
# Output top 5 destinations per peer sorted by byte count
dest_items = sorted(
dest_stats.items(),
key=lambda x: (x[0][0], -(x[1]['bytes_orig'] + x[1]['bytes_reply']))
)
for peer, group in groupby(dest_items, key=lambda x: x[0][0]):
top = list(group)[:20]
for (p, dst_ip, dst_port, proto), stats in top:
print(f"dest|{p}|{dst_ip}|{dst_port}|{proto}|"
f"{stats['bytes_orig']}|{stats['bytes_reply']}|{stats['count']}")

View file

@ -0,0 +1,21 @@
[Unit]
Description=wgctl conntrack accept logging daemon
After=network.target wg-quick@wg0.service
Requires=wg-quick@wg0.service
[Service]
Type=simple
ExecStart=/etc/wireguard/wgctl/daemon/wgctl-conntrack/wgctl-conntrack \
--wg-dir /etc/wireguard
Restart=on-failure
RestartSec=5s
StandardOutput=journal
StandardError=journal
SyslogIdentifier=wgctl-conntrack
# Needs CAP_NET_ADMIN for netlink conntrack
AmbientCapabilities=CAP_NET_ADMIN
CapabilityBoundingSet=CAP_NET_ADMIN
[Install]
WantedBy=multi-user.target

View file

@ -13,18 +13,15 @@ function ui::activity::peer_row() {
# ui::activity::service_row <dest_display> <drop_count> <drop_word> <drops_col> <w_drops> # ui::activity::service_row <dest_display> <drop_count> <drop_word> <drops_col> <w_drops>
function ui::activity::service_row() { function ui::activity::service_row() {
local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" \ local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" \
drops_col="${4:-30}" w_drops="${5:-1}" drops_col="${4:-30}" w_count="${5:-1}"
# Align drop count with peer drop column
# Service row visible prefix: " → " (6 visible) + ${#dest_display}
# But "→" is 3 bytes, 1 visible — arrow_prefix bytes = 8, visible = 6
local arrow_prefix=" → " local arrow_prefix=" → "
local prefix_bytes=${#arrow_prefix} # 8 bytes due to → being 3 bytes local prefix_bytes=${#arrow_prefix}
local prefix_len=$(( prefix_bytes + ${#dest_display} )) local prefix_len=$(( prefix_bytes + ${#dest_display} ))
local pad_n=$(( drops_col - prefix_len )) local pad_n=$(( drops_col - prefix_len ))
[[ $pad_n -lt 1 ]] && pad_n=1 [[ $pad_n -lt 1 ]] && pad_n=1
printf " \033[2m→\033[0m %s%*s %${w_drops}s %s\n" \ printf " \033[0;31m→ %s%*s %${w_count}s %s\033[0m\n" \
"$dest_display" "$pad_n" "" "$drop_count" "$drop_word" "$dest_display" "$pad_n" "" "$drop_count" "$drop_word"
} }
@ -44,4 +41,45 @@ function ui::activity::peer_row_table() {
function ui::activity::service_row_table() { function ui::activity::service_row_table() {
local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}"
printf " → %-30s %s %s\n" "$dest_display" "$drop_count" "$drop_word" printf " → %-30s %s %s\n" "$dest_display" "$drop_count" "$drop_word"
}
function ui::activity::accept_row() {
local name_pad="${1:-}" bytes_in="${2:-}" bytes_out="${3:-}" \
conns="${4:-0}" w_count="${5:-4}"
local conn_word="conns"
[[ "$conns" -eq 1 ]] && conn_word="conn"
local spaces
spaces=$(printf '%*s' "${#name_pad}" '')
printf " \033[0;32m%s ↓%-10s ↑%-10s %${w_count}s %s\033[0m\n" \
"$spaces" "$bytes_in" "$bytes_out" "$conns" "$conn_word"
}
function ui::activity::accept_dest_row() {
local dest="${1:-}" bytes_orig="${2:-0}" bytes_reply="${3:-0}" \
count="${4:-0}" drops_col="${5:-40}" w_count="${6:-4}"
local conn_word="conns"
[[ "$count" -eq 1 ]] && conn_word="conn"
local arrow_prefix=" → "
local prefix_bytes=${#arrow_prefix}
local prefix_len=$(( prefix_bytes + ${#dest} ))
local pad_n=$(( drops_col - prefix_len ))
[[ $pad_n -lt 1 ]] && pad_n=1
# Only show bytes if non-zero
local bytes_display=""
if [[ "$bytes_orig" -gt 0 || "$bytes_reply" -gt 0 ]]; then
local bytes_display=" "
[[ "$bytes_orig" -gt 0 ]] && bytes_display+="$(fmt::bytes "$bytes_orig") "
[[ "$bytes_reply" -gt 0 ]] && bytes_display+="$(fmt::bytes "$bytes_reply")"
bytes_display="${bytes_display% }" # trim trailing space
fi
printf " \033[0;32m→\033[0m \033[0;32m%s%*s %${w_count}s %-5s%s\033[0m\n" \
"$dest" "$pad_n" "" "$count" "$conn_word" "$bytes_display"
} }