diff --git a/commands/activity.command.sh b/commands/activity.command.sh index 54e9002..a0c3380 100644 --- a/commands/activity.command.sh +++ b/commands/activity.command.sh @@ -13,7 +13,9 @@ function cmd::activity::on_load() { flag::register --ip flag::register --hours flag::register --type - flag::register --dropped + flag::register --accept + flag::register --drop + flag::register --external command::mixin json_output } @@ -35,11 +37,12 @@ Options: --ip Filter by destination IP --hours Time window in hours (default: 24, 0 = all time) --type Filter by device type (combined with --peer) - --dropped Show only peers with at least one drop + --accept Show only accepted traffic (from conntrack) + --drop Show only firewall drops + --external Show only external traffic (full tunnel peers) Examples: wgctl activity - wgctl activity --dropped wgctl activity --peer phone-nuno wgctl activity --service truenas wgctl activity --hours 0 @@ -53,17 +56,20 @@ EOF function cmd::activity::run() { local filter_peer="" filter_service="" filter_ip="" filter_type="" - local hours=24 dropped_only=false + local hours=24 + local accept_only=false drop_only=false external_only=false while [[ $# -gt 0 ]]; do case "$1" in - --peer) filter_peer="$2"; shift 2 ;; - --service) filter_service="$2"; shift 2 ;; - --ip) filter_ip="$2"; shift 2 ;; - --type) filter_type="$2"; shift 2 ;; - --hours) hours="$2"; shift 2 ;; - --dropped) dropped_only=true; shift ;; - --help) cmd::activity::help; return ;; + --peer) filter_peer="$2"; shift 2 ;; + --service) filter_service="$2"; shift 2 ;; + --ip) filter_ip="$2"; shift 2 ;; + --type) filter_type="$2"; shift 2 ;; + --hours) hours="$2"; shift 2 ;; + --accept) accept_only=true; shift ;; + --drop) drop_only=true; shift ;; + --external) external_only=true; shift ;; + --help) cmd::activity::help; return ;; *) log::error "Unknown flag: $1" cmd::activity::help @@ -77,42 +83,67 @@ function cmd::activity::run() { return 0 fi - # Resolve peer name if type provided if [[ -n "$filter_peer" && -n "$filter_type" ]]; then filter_peer=$(peers::resolve_and_require "$filter_peer" "$filter_type") || return 1 fi - # Resolve --service to IP local service_ip="" if [[ -n "$filter_service" ]]; then service_ip=$(net::resolve "$filter_service" 2>/dev/null | head -1 | cut -d: -f1) || true - if [[ -z "$service_ip" ]]; then - log::error "Service not found: ${filter_service}" - return 1 - fi + [[ -z "$service_ip" ]] && log::error "Service not found: ${filter_service}" && return 1 fi [[ -n "$filter_ip" ]] && service_ip="$filter_ip" - # Fetch aggregated data - local data - data=$(json::activity_aggregate \ - "$(ctx::fw_events_log)" \ - "$(ctx::events_log)" \ - "$(config::interface)" \ - "$(ctx::net)" \ - "$(ctx::clients)" \ - "$(ctx::meta)" \ - "$hours" \ - "$filter_peer" \ - "$service_ip" 2>/dev/null) - - if [[ -z "$data" ]]; then - log::wg_warning "No activity data found" - return 0 + # ── Fetch data ── + local data="" + if ! $accept_only; then + data=$(json::activity_aggregate \ + "$(ctx::fw_events_log)" "$(ctx::events_log)" \ + "$(config::interface)" "$(ctx::net)" \ + "$(ctx::clients)" "$(ctx::meta)" \ + "$hours" "$filter_peer" "$service_ip" 2>/dev/null) fi - # Measure column widths - local w_peer=16 w_drops=1 + local accept_data="" + if ! $drop_only; then + local since_arg="" ext_flag="0" + [[ "$hours" -gt 0 ]] && since_arg="${hours}h" + $external_only && ext_flag="1" + [[ -f "$(ctx::accept_events_log)" ]] && \ + accept_data=$(json::accept_aggregate \ + "$(ctx::accept_events_log)" "$(ctx::net)" "$(ctx::clients)" \ + "$since_arg" "$filter_peer" "$ext_flag" 2>/dev/null) + fi + + [[ -z "$data" && -z "$accept_data" ]] && \ + log::wg_warning "No activity data found" && return 0 + + # ── Build accept maps ── + declare -gA _ACCEPT_PEER=() + declare -gA _ACCEPT_DEST_KEYS=() + declare -gA _ACCEPT_DEST=() + + while IFS='|' read -r type rest; do + [[ -z "$type" ]] && continue + case "$type" in + peer) + local a_name a_bi a_bo a_pi a_po a_conns + IFS='|' read -r a_name a_bi a_bo a_pi a_po a_conns <<< "$rest" + _ACCEPT_PEER["$a_name"]="${a_bi}|${a_bo}|${a_pi}|${a_po}|${a_conns}" + ;; + dest) + local d_peer d_ip d_port d_proto d_bytes_orig d_bytes_reply d_count + IFS='|' read -r d_peer d_ip d_port d_proto d_bytes_orig d_bytes_reply d_count <<< "$rest" + local d_key="${d_peer}:${d_ip}:${d_port}:${d_proto}" + _ACCEPT_DEST["$d_key"]="${d_bytes_orig}|${d_bytes_reply}|${d_count}" + _ACCEPT_DEST_KEYS["$d_peer"]+="${d_key} " + ;; + esac + done <<< "$accept_data" + + # ── Measure column widths ── + local w_peer=16 w_count=1 + while IFS='|' read -r type rest; do case "$type" in peer) @@ -120,83 +151,166 @@ function cmd::activity::run() { name=$(echo "$rest" | cut -d'|' -f1) drops=$(echo "$rest" | cut -d'|' -f4) (( ${#name} > w_peer )) && w_peer=${#name} - (( ${#drops} > w_drops )) && w_drops=${#drops} + (( ${#drops} > w_count )) && w_count=${#drops} ;; service) - local count - count=$(echo "$rest" | cut -d'|' -f3) - (( ${#count} > w_drops )) && w_drops=${#count} + local svc_count + svc_count=$(echo "$rest" | cut -d'|' -f3) + (( ${#svc_count} > w_count )) && w_count=${#svc_count} ;; esac done <<< "$data" - + + for a_name in "${!_ACCEPT_PEER[@]}"; do + (( ${#a_name} > w_peer )) && w_peer=${#a_name} + local a_conns_val="${_ACCEPT_PEER[$a_name]##*|}" + (( ${#a_conns_val} > w_count )) && w_count=${#a_conns_val} + done + for key in "${!_ACCEPT_DEST[@]}"; do + local d_val="${_ACCEPT_DEST[$key]}" + local d_count_val="${d_val##*|}" + (( ${#d_count_val} > w_count )) && w_count=${#d_count_val} + done + (( w_peer += 2 )) - - # Compute column where drop count starts on peer row: - # " " (2) + name (w_peer) + " ↓" (3) + rx (10) + " ↑" (3) + tx (10) + " " (2) - # ↓ and ↑ are multi-byte (3 bytes, 1 visible) — 2 extra bytes each - # Visible: 2 + w_peer + 2+1 + 10 + 2+1 + 10 + 2 = w_peer + 30 local drops_col=$(( w_peer + 30 )) - + local hours_display="${hours}h" [[ "$hours" == "0" ]] && hours_display="all time" - + log::section "Activity Monitor (last ${hours_display})" echo "" - + if display::is_table "activity"; then cmd::activity::_render_table "$data" return 0 fi - local first_peer=true skip_peer=false - + # ── Accept dest inline renderer ── + _render_peer_accept_dests() { + local peer_name="$1" + local keys="${_ACCEPT_DEST_KEYS[$peer_name]:-}" + [[ -z "$keys" ]] && return 0 + for d_key in $keys; do + local dest_stats="${_ACCEPT_DEST[$d_key]:-}" + [[ -z "$dest_stats" ]] && continue + local d_bytes_orig d_bytes_reply d_count + IFS='|' read -r d_bytes_orig d_bytes_reply d_count <<< "$dest_stats" + local rest_key="${d_key#${peer_name}:}" + local d_ip="${rest_key%%:*}" + local pp="${rest_key#*:}" + local d_port="${pp%%:*}" + local d_proto="${pp##*:}" + local dest_display + dest_display=$(resolve::dest "$d_ip" "$d_port" "$d_proto" 2>/dev/null \ + || echo "${d_ip}:${d_port}/${d_proto}") + ui::activity::accept_dest_row \ + "$dest_display" "$d_bytes_orig" "$d_bytes_reply" \ + "$d_count" "$drops_col" "$w_count" + done + } + + local first_peer=true skip_peer=false current_name="" + local -a rendered_peers=() + + # ── Main render loop (drop data) ── while IFS='|' read -r record_type rest; do case "$record_type" in peer) local name rx tx drops IFS='|' read -r name rx tx drops <<< "$rest" - + + # Flush previous peer's accept dests + [[ -n "$current_name" ]] && ! $drop_only && \ + _render_peer_accept_dests "$current_name" + skip_peer=false - if $dropped_only && [[ "$drops" -eq 0 ]]; then - skip_peer=true - continue - fi - + current_name="$name" + local has_accept="${_ACCEPT_PEER[$name]:-}" + $first_peer || echo "" first_peer=false - + rendered_peers+=("$name") + local rx_fmt tx_fmt rx_fmt=$(fmt::bytes "$rx") tx_fmt=$(fmt::bytes "$tx") - local name_pad rx_pad tx_pad name_pad=$(printf "%-${w_peer}s" "$name") rx_pad=$(printf "%-10s" "$rx_fmt") tx_pad=$(printf "%-10s" "$tx_fmt") - + local drop_word="drops" [[ "$drops" -eq 1 ]] && drop_word="drop" - - ui::activity::peer_row \ - "$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" "$w_drops" + + # Always show peer name — either full row or name-only for accept_only + if $accept_only; then + printf " \033[1m%s\033[0m\n" "$name_pad" + else + ui::activity::peer_row \ + "$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" "$w_count" + fi + + # Accept summary row + if [[ -n "$has_accept" ]] && ! $drop_only; then + local a_bi a_bo a_pi a_po a_conns + IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$has_accept" + ui::activity::accept_row \ + "$name_pad" \ + "$(printf '%-10s' "$(fmt::bytes "$a_bi")")" \ + "$(printf '%-10s' "$(fmt::bytes "$a_bo")")" \ + "$a_conns" "$w_count" + fi ;; - + service) $skip_peer && continue - + $accept_only && continue local peer dest_display drop_count IFS='|' read -r peer dest_display drop_count <<< "$rest" - local svc_drop_word="drops" [[ "$drop_count" -eq 1 ]] && svc_drop_word="drop" - ui::activity::service_row \ - "$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_drops" + "$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_count" ;; esac done <<< "$data" - + + # Flush last peer's accept dests + [[ -n "$current_name" ]] && ! $drop_only && \ + _render_peer_accept_dests "$current_name" + + # ── Accept-only peers (not in drop data) ── + if ! $drop_only; then + for a_name in $(echo "${!_ACCEPT_PEER[@]}" | tr ' ' '\n' | sort); do + # Skip already rendered + local already=false + for rp in "${rendered_peers[@]:-}"; do + [[ "$rp" == "$a_name" ]] && already=true && break + done + $already && continue + + $first_peer || echo "" + first_peer=false + + local a_stats="${_ACCEPT_PEER[$a_name]}" + local a_bi a_bo a_pi a_po a_conns + IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$a_stats" + local name_pad + name_pad=$(printf "%-${w_peer}s" "$a_name") + + # Always show peer name + printf " \033[1m%s\033[0m\n" "$name_pad" + + ui::activity::accept_row \ + "$name_pad" \ + "$(printf '%-10s' "$(fmt::bytes "$a_bi")")" \ + "$(printf '%-10s' "$(fmt::bytes "$a_bo")")" \ + "$a_conns" "$w_count" + _render_peer_accept_dests "$a_name" + done + fi + echo "" } @@ -276,3 +390,48 @@ function cmd::activity::_output_json() { array=$(printf '%s\n' "${peers[@]:-}" | paste -sd ',' -) printf '{"peers":[%s]}' "${array:-}" | json::envelope "activity" "$count" } + +function cmd::activity::_fetch_accept_data() { + local hours="${1:-24}" filter_peer="${2:-}" external_only="${3:-false}" + + [[ ! -f "$(ctx::accept_events_log)" ]] && return 0 + + local since_arg="" + [[ "$hours" -gt 0 ]] && since_arg="${hours}h" + + local ext_flag="0" + $external_only && ext_flag="1" + + json::accept_aggregate \ + "$(ctx::accept_events_log)" \ + "$(ctx::net)" \ + "$(ctx::clients)" \ + "$since_arg" \ + "$filter_peer" \ + 2>/dev/null +} + +function cmd::activity::_build_accept_maps() { + local accept_data="${1:-}" + # Outputs to stdout as bash declare statements — use eval + # Sets: _ACCEPT_PEER[name]="bytes_in|bytes_out|packets_in|packets_out|conn_count" + # _ACCEPT_DEST[name:ip:port:proto]="bytes|count" + declare -gA _ACCEPT_PEER=() + declare -gA _ACCEPT_DEST=() + + while IFS='|' read -r type rest; do + [[ -z "$type" ]] && continue + case "$type" in + peer) + local name bytes_in bytes_out packets_in packets_out conn_count + IFS='|' read -r name bytes_in bytes_out packets_in packets_out conn_count <<< "$rest" + _ACCEPT_PEER["$name"]="${bytes_in}|${bytes_out}|${packets_in}|${packets_out}|${conn_count}" + ;; + dest) + local peer dst_ip dst_port proto bytes count + IFS='|' read -r peer dst_ip dst_port proto bytes count <<< "$rest" + _ACCEPT_DEST["${peer}:${dst_ip}:${dst_port}:${proto}"]="${bytes}|${count}" + ;; + esac + done <<< "$accept_data" +} \ No newline at end of file diff --git a/core/context.sh b/core/context.sh index d3527dd..6a7afaf 100644 --- a/core/context.sh +++ b/core/context.sh @@ -44,49 +44,50 @@ _CTX_CONFIG_FILE="${_CTX_CONFIG}/wgctl.json" # Accessors # ============================================ -function ctx::root() { echo "$_CTX_ROOT"; } -function ctx::core() { echo "$_CTX_CORE"; } -function ctx::modules() { echo "$_CTX_MODULES"; } -function ctx::commands() { echo "$_CTX_COMMANDS"; } -function ctx::wg() { echo "$_CTX_WG"; } -function ctx::clients() { echo "$_CTX_CLIENTS"; } +function ctx::root() { echo "$_CTX_ROOT"; } +function ctx::core() { echo "$_CTX_CORE"; } +function ctx::modules() { echo "$_CTX_MODULES"; } +function ctx::commands() { echo "$_CTX_COMMANDS"; } +function ctx::wg() { echo "$_CTX_WG"; } +function ctx::clients() { echo "$_CTX_CLIENTS"; } # Top-level dirs -function ctx::wgctl() { echo "$_CTX_WGCTL"; } -function ctx::config() { echo "$_CTX_CONFIG"; } -function ctx::data() { echo "$_CTX_DATA"; } -function ctx::daemon() { echo "$_CTX_DAEMON"; } +function ctx::wgctl() { echo "$_CTX_WGCTL"; } +function ctx::config() { echo "$_CTX_CONFIG"; } +function ctx::data() { echo "$_CTX_DATA"; } +function ctx::daemon() { echo "$_CTX_DAEMON"; } # Data subdirs -function ctx::rules() { echo "$_CTX_RULES"; } -function ctx::rules::base() { echo "$_CTX_RULES_BASE"; } -function ctx::groups() { echo "$_CTX_GROUPS"; } -function ctx::blocks() { echo "$_CTX_BLOCKS"; } -function ctx::meta() { echo "$_CTX_META"; } -function ctx::identities() { echo "$_CTX_IDENTITY"; } -function ctx::peer_history() { echo "$_CTX_PEER_HISTORY"; } +function ctx::rules() { echo "$_CTX_RULES"; } +function ctx::rules::base() { echo "$_CTX_RULES_BASE"; } +function ctx::groups() { echo "$_CTX_GROUPS"; } +function ctx::blocks() { echo "$_CTX_BLOCKS"; } +function ctx::meta() { echo "$_CTX_META"; } +function ctx::identities() { echo "$_CTX_IDENTITY"; } +function ctx::peer_history() { echo "$_CTX_PEER_HISTORY"; } # Data files -function ctx::net() { echo "$_CTX_NET"; } -function ctx::hosts() { echo "$_CTX_HOSTS"; } -function ctx::subnets() { echo "$_CTX_SUBNETS"; } -function ctx::policies() { echo "$_CTX_POLICIES"; } +function ctx::net() { echo "$_CTX_NET"; } +function ctx::hosts() { echo "$_CTX_HOSTS"; } +function ctx::subnets() { echo "$_CTX_SUBNETS"; } +function ctx::policies() { echo "$_CTX_POLICIES"; } # Config files -function ctx::config_file() { echo "$_CTX_CONFIG_FILE"; } -function ctx::display() { echo "${_CTX_CONFIG}/display.json"; } +function ctx::config_file() { echo "$_CTX_CONFIG_FILE"; } +function ctx::display() { echo "${_CTX_CONFIG}/display.json"; } # Daemon files -function ctx::events_log() { echo "${_CTX_DAEMON}/events.log"; } -function ctx::fw_events_log() { echo "${_CTX_DAEMON}/fw_events.log"; } -function ctx::endpoint_cache() { echo "${_CTX_DAEMON}/endpoint_cache.json"; } +function ctx::events_log() { echo "${_CTX_DAEMON}/events.log"; } +function ctx::fw_events_log() { echo "${_CTX_DAEMON}/fw_events.log"; } +function ctx::endpoint_cache() { echo "${_CTX_DAEMON}/endpoint_cache.json"; } +function ctx::accept_events_log() { echo "${_CTX_DAEMON}/accept_events.log"; } # Tool paths -function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; } -function ctx::monitor_script() { echo "${_CTX_ROOT}/daemon/wgctl-monitor.py"; } -function ctx::lib() { echo "${_CTX_CORE}/lib"; } +function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; } +function ctx::monitor_script() { echo "${_CTX_ROOT}/daemon/wgctl-monitor.py"; } +function ctx::lib() { echo "${_CTX_CORE}/lib"; } -function ctx::block_history() { echo "${_CTX_DATA}/block-history"; } +function ctx::block_history() { echo "${_CTX_DATA}/block-history"; } # ============================================ # Path Helpers diff --git a/core/json.sh b/core/json.sh index 01bba77..c730605 100644 --- a/core/json.sh +++ b/core/json.sh @@ -154,6 +154,10 @@ function json::block_history_list_all() { python3 "$JSON_HELPER" block_history function json::endpoint_cache_get() { python3 "$JSON_HELPER" endpoint_cache_get "$@" 4 else '100', + args[5] if len(args) > 5 else '1', + args[6] if len(args) > 6 else '', + args[7] if len(args) > 7 else '0', + args[8] if len(args) > 8 else 'desc'), + 'accept_aggregate': lambda args: __import__('lib.accept_events', + fromlist=['accept_aggregate']).accept_aggregate( + args[0], args[1], args[2], + args[3] if len(args) > 3 else '', + args[4] if len(args) > 4 else '', + args[5] if len(args) > 5 else '0'), } # ── Main ───────────────────────────────────────────────────────────────────── diff --git a/core/lib/__pycache__/accept_events.cpython-311.pyc b/core/lib/__pycache__/accept_events.cpython-311.pyc new file mode 100644 index 0000000..12b234c Binary files /dev/null and b/core/lib/__pycache__/accept_events.cpython-311.pyc differ diff --git a/core/lib/__pycache__/block_history.cpython-311.pyc b/core/lib/__pycache__/block_history.cpython-311.pyc index 64bbfe7..14dcd3d 100644 Binary files a/core/lib/__pycache__/block_history.cpython-311.pyc and b/core/lib/__pycache__/block_history.cpython-311.pyc differ diff --git a/core/lib/accept_events.py b/core/lib/accept_events.py new file mode 100644 index 0000000..2e31e0a --- /dev/null +++ b/core/lib/accept_events.py @@ -0,0 +1,228 @@ +""" +accept_events.py — conntrack accept event processing. + +Reads accept_events.log written by wgctl-conntrack daemon. +Each line is a JSON object with fields: + ts, peer, src_ip, dst_ip, dst_port, proto, + bytes_orig, bytes_reply, packets_orig, packets_reply, + duration_sec, service, event, external +""" + +import os +import json +from collections import defaultdict +from datetime import datetime + +from lib.util import ( + DATETIME_FMT, + load_net_data, load_hosts_data, + reverse_lookup, hosts_lookup, + fmt_ts, fmt_ts_hour, ts_to_unix, parse_since, +) + + +def accept_events(file, filter_peer, filter_type, net_file, + limit, collapse='1', since='', filter_external='0', + sort_order='desc'): + """ + Format accept events with optional aggregation. + + Output per line (collapse=1): + ts|peer|dst_ip|dst_port|proto|bytes_total|packets_total|count|duration_avg + + Output per line (collapse=0): + ts|peer|dst_ip|dst_port|proto|bytes_orig|bytes_reply|packets_orig|packets_reply|duration_sec + """ + do_collapse = str(collapse) != '0' + external_only = str(filter_external) == '1' + limit = int(limit) if limit else 100 + since_dt = parse_since(since) if since else None + descending = sort_order != 'asc' + + events = [] + try: + with open(file) as f: + for line in f: + try: + e = json.loads(line.strip()) + if not e.get('peer'): + continue + if filter_peer and e.get('peer') != filter_peer: + continue + if filter_type and not e.get('peer', '').startswith(filter_type + '-'): + continue + if external_only and not e.get('external', False): + continue + if not external_only and e.get('external', False): + continue + if since_dt: + ts_str = e.get('ts', '') + try: + from datetime import timezone + ev_dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) + if ev_dt < since_dt: + continue + except Exception: + pass + events.append(e) + except Exception: + pass + except Exception: + pass + + if do_collapse: + # Aggregate by peer + dst_ip + dst_port + proto + hour + buckets = defaultdict(lambda: {'count': 0, 'bytes': 0, 'packets': 0, 'duration': 0.0}) + bucket_ts = {} + + for e in events: + ts_str = e.get('ts', '') + peer = e.get('peer', '') + dst_ip = e.get('dst_ip', '') + dst_port = str(e.get('dst_port', '')) + proto = e.get('proto', '') + try: + dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) + hour_key = (peer, dst_ip, dst_port, proto, dt.strftime('%Y-%m-%d %H')) + except Exception: + continue + + b = buckets[hour_key] + b['count'] += 1 + b['bytes'] += e.get('bytes_orig', 0) + e.get('bytes_reply', 0) + b['packets'] += e.get('packets_orig', 0) + e.get('packets_reply', 0) + b['duration'] += e.get('duration_sec', 0.0) + + if hour_key not in bucket_ts: + bucket_ts[hour_key] = dt + + # Sort and limit + sorted_buckets = sorted(bucket_ts.items(), key=lambda x: x[1]) + output = sorted_buckets[-limit:] + if descending: + output = list(reversed(output)) + + for hour_key, dt in output: + peer, dst_ip, dst_port, proto, _ = hour_key + b = buckets[hour_key] + ts_fmt = fmt_ts_hour(dt.isoformat()) + dur_avg = b['duration'] / b['count'] if b['count'] > 0 else 0.0 + print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b['bytes']}|{b['packets']}|{b['count']}|{dur_avg:.1f}") + + else: + # Detailed — one row per event + result = [(ts_to_unix(e.get('ts', '')), e) for e in events] + result = result[-limit:] + if descending: + result.reverse() + + for _, e in result: + ts_fmt = fmt_ts(e.get('ts', '')) + peer = e.get('peer', '') + dst_ip = e.get('dst_ip', '') + dst_port = str(e.get('dst_port', '')) + proto = e.get('proto', '') + b_orig = e.get('bytes_orig', 0) + b_reply = e.get('bytes_reply', 0) + p_orig = e.get('packets_orig', 0) + p_reply = e.get('packets_reply', 0) + dur = e.get('duration_sec', 0.0) + print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b_orig}|{b_reply}|{p_orig}|{p_reply}|{dur:.1f}") + + +def accept_aggregate(file, net_file, clients_dir, since='', + filter_peer='', external_only='0'): + """ + Aggregate accept events per peer — total bytes, packets, top destinations. + Used by wgctl activity to show accepted traffic alongside drops. + + external_only='1': only show traffic to external IPs (non-private) + external_only='0': only show traffic to internal IPs (default) + + Output: + peer|peer_name|bytes_in|bytes_out|packets_in|packets_out|conn_count + dest|peer_name|dst_ip|dst_port|proto|bytes_total|conn_count + """ + from collections import defaultdict + from itertools import groupby + + since_dt = parse_since(since) if since else None + show_external = str(external_only) == '1' + + peer_stats = defaultdict(lambda: { + 'bytes_in': 0, 'bytes_out': 0, + 'packets_in': 0, 'packets_out': 0, + 'conn_count': 0 + }) + # dest_stats = defaultdict(lambda: {'bytes': 0, 'count': 0}) + dest_stats = defaultdict(lambda: {'bytes_orig': 0, 'bytes_reply': 0, 'count': 0}) + + try: + with open(file) as f: + for line in f: + try: + e = json.loads(line.strip()) + peer = e.get('peer', '') + if not peer: + continue + if filter_peer and peer != filter_peer: + continue + + # Filter by external/internal + is_external = e.get('external', False) + if show_external and not is_external: + continue + if not show_external and is_external: + continue + + if since_dt: + ts_str = e.get('ts', '') + try: + from datetime import timezone + ev_dt = datetime.fromisoformat( + ts_str.replace('Z', '+00:00')) + if ev_dt < since_dt: + continue + except Exception: + pass + + dst_ip = e.get('dst_ip', '') + dst_port = str(e.get('dst_port', '')) + proto = e.get('proto', '') + b_orig = e.get('bytes_orig', 0) + b_reply = e.get('bytes_reply', 0) + p_orig = e.get('packets_orig', 0) + p_reply = e.get('packets_reply', 0) + + ps = peer_stats[peer] + ps['bytes_out'] += b_orig + ps['bytes_in'] += b_reply + ps['packets_out'] += p_orig + ps['packets_in'] += p_reply + ps['conn_count'] += 1 + + dest_key = (peer, dst_ip, dst_port, proto) + dest_stats[dest_key]['bytes_orig'] += b_orig + dest_stats[dest_key]['bytes_reply'] += b_reply + dest_stats[dest_key]['count'] += 1 + + except Exception: + pass + except Exception: + pass + + # Output peer summaries + for peer, ps in sorted(peer_stats.items()): + print(f"peer|{peer}|{ps['bytes_in']}|{ps['bytes_out']}|" + f"{ps['packets_in']}|{ps['packets_out']}|{ps['conn_count']}") + + # Output top 5 destinations per peer sorted by byte count + dest_items = sorted( + dest_stats.items(), + key=lambda x: (x[0][0], -(x[1]['bytes_orig'] + x[1]['bytes_reply'])) + ) + for peer, group in groupby(dest_items, key=lambda x: x[0][0]): + top = list(group)[:20] + for (p, dst_ip, dst_port, proto), stats in top: + print(f"dest|{p}|{dst_ip}|{dst_port}|{proto}|" + f"{stats['bytes_orig']}|{stats['bytes_reply']}|{stats['count']}") \ No newline at end of file diff --git a/daemon/wgctl-conntrack/wgctl-conntrack.service b/daemon/wgctl-conntrack/wgctl-conntrack.service new file mode 100644 index 0000000..74dccaf --- /dev/null +++ b/daemon/wgctl-conntrack/wgctl-conntrack.service @@ -0,0 +1,21 @@ +[Unit] +Description=wgctl conntrack accept logging daemon +After=network.target wg-quick@wg0.service +Requires=wg-quick@wg0.service + +[Service] +Type=simple +ExecStart=/etc/wireguard/wgctl/daemon/wgctl-conntrack/wgctl-conntrack \ + --wg-dir /etc/wireguard +Restart=on-failure +RestartSec=5s +StandardOutput=journal +StandardError=journal +SyslogIdentifier=wgctl-conntrack + +# Needs CAP_NET_ADMIN for netlink conntrack +AmbientCapabilities=CAP_NET_ADMIN +CapabilityBoundingSet=CAP_NET_ADMIN + +[Install] +WantedBy=multi-user.target diff --git a/modules/ui/activity.module.sh b/modules/ui/activity.module.sh index f83d000..5169ca8 100644 --- a/modules/ui/activity.module.sh +++ b/modules/ui/activity.module.sh @@ -13,18 +13,15 @@ function ui::activity::peer_row() { # ui::activity::service_row function ui::activity::service_row() { local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" \ - drops_col="${4:-30}" w_drops="${5:-1}" + drops_col="${4:-30}" w_count="${5:-1}" - # Align drop count with peer drop column - # Service row visible prefix: " → " (6 visible) + ${#dest_display} - # But "→" is 3 bytes, 1 visible — arrow_prefix bytes = 8, visible = 6 local arrow_prefix=" → " - local prefix_bytes=${#arrow_prefix} # 8 bytes due to → being 3 bytes + local prefix_bytes=${#arrow_prefix} local prefix_len=$(( prefix_bytes + ${#dest_display} )) local pad_n=$(( drops_col - prefix_len )) [[ $pad_n -lt 1 ]] && pad_n=1 - printf " \033[2m→\033[0m %s%*s %${w_drops}s %s\n" \ + printf " \033[0;31m→ %s%*s %${w_count}s %s\033[0m\n" \ "$dest_display" "$pad_n" "" "$drop_count" "$drop_word" } @@ -44,4 +41,45 @@ function ui::activity::peer_row_table() { function ui::activity::service_row_table() { local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" printf " → %-30s %s %s\n" "$dest_display" "$drop_count" "$drop_word" +} + +function ui::activity::accept_row() { + local name_pad="${1:-}" bytes_in="${2:-}" bytes_out="${3:-}" \ + conns="${4:-0}" w_count="${5:-4}" + + local conn_word="conns" + [[ "$conns" -eq 1 ]] && conn_word="conn" + + local spaces + spaces=$(printf '%*s' "${#name_pad}" '') + + printf " \033[0;32m%s ↓%-10s ↑%-10s %${w_count}s %s\033[0m\n" \ + "$spaces" "$bytes_in" "$bytes_out" "$conns" "$conn_word" +} + + +function ui::activity::accept_dest_row() { + local dest="${1:-}" bytes_orig="${2:-0}" bytes_reply="${3:-0}" \ + count="${4:-0}" drops_col="${5:-40}" w_count="${6:-4}" + + local conn_word="conns" + [[ "$count" -eq 1 ]] && conn_word="conn" + + local arrow_prefix=" → " + local prefix_bytes=${#arrow_prefix} + local prefix_len=$(( prefix_bytes + ${#dest} )) + local pad_n=$(( drops_col - prefix_len )) + [[ $pad_n -lt 1 ]] && pad_n=1 + + # Only show bytes if non-zero + local bytes_display="" + if [[ "$bytes_orig" -gt 0 || "$bytes_reply" -gt 0 ]]; then + local bytes_display=" " + [[ "$bytes_orig" -gt 0 ]] && bytes_display+="↓$(fmt::bytes "$bytes_orig") " + [[ "$bytes_reply" -gt 0 ]] && bytes_display+="↑$(fmt::bytes "$bytes_reply")" + bytes_display="${bytes_display% }" # trim trailing space + fi + + printf " \033[0;32m→\033[0m \033[0;32m%s%*s %${w_count}s %-5s%s\033[0m\n" \ + "$dest" "$pad_n" "" "$count" "$conn_word" "$bytes_display" } \ No newline at end of file