diff --git a/commands/activity.command.sh b/commands/activity.command.sh index 54e9002..a0c3380 100644 --- a/commands/activity.command.sh +++ b/commands/activity.command.sh @@ -13,7 +13,9 @@ function cmd::activity::on_load() { flag::register --ip flag::register --hours flag::register --type - flag::register --dropped + flag::register --accept + flag::register --drop + flag::register --external command::mixin json_output } @@ -35,11 +37,12 @@ Options: --ip Filter by destination IP --hours Time window in hours (default: 24, 0 = all time) --type Filter by device type (combined with --peer) - --dropped Show only peers with at least one drop + --accept Show only accepted traffic (from conntrack) + --drop Show only firewall drops + --external Show only external traffic (full tunnel peers) Examples: wgctl activity - wgctl activity --dropped wgctl activity --peer phone-nuno wgctl activity --service truenas wgctl activity --hours 0 @@ -53,17 +56,20 @@ EOF function cmd::activity::run() { local filter_peer="" filter_service="" filter_ip="" filter_type="" - local hours=24 dropped_only=false + local hours=24 + local accept_only=false drop_only=false external_only=false while [[ $# -gt 0 ]]; do case "$1" in - --peer) filter_peer="$2"; shift 2 ;; - --service) filter_service="$2"; shift 2 ;; - --ip) filter_ip="$2"; shift 2 ;; - --type) filter_type="$2"; shift 2 ;; - --hours) hours="$2"; shift 2 ;; - --dropped) dropped_only=true; shift ;; - --help) cmd::activity::help; return ;; + --peer) filter_peer="$2"; shift 2 ;; + --service) filter_service="$2"; shift 2 ;; + --ip) filter_ip="$2"; shift 2 ;; + --type) filter_type="$2"; shift 2 ;; + --hours) hours="$2"; shift 2 ;; + --accept) accept_only=true; shift ;; + --drop) drop_only=true; shift ;; + --external) external_only=true; shift ;; + --help) cmd::activity::help; return ;; *) log::error "Unknown flag: $1" cmd::activity::help @@ -77,42 +83,67 @@ function cmd::activity::run() { return 0 fi - # Resolve peer name if type provided if [[ -n "$filter_peer" && -n "$filter_type" ]]; then filter_peer=$(peers::resolve_and_require "$filter_peer" "$filter_type") || return 1 fi - # Resolve --service to IP local service_ip="" if [[ -n "$filter_service" ]]; then service_ip=$(net::resolve "$filter_service" 2>/dev/null | head -1 | cut -d: -f1) || true - if [[ -z "$service_ip" ]]; then - log::error "Service not found: ${filter_service}" - return 1 - fi + [[ -z "$service_ip" ]] && log::error "Service not found: ${filter_service}" && return 1 fi [[ -n "$filter_ip" ]] && service_ip="$filter_ip" - # Fetch aggregated data - local data - data=$(json::activity_aggregate \ - "$(ctx::fw_events_log)" \ - "$(ctx::events_log)" \ - "$(config::interface)" \ - "$(ctx::net)" \ - "$(ctx::clients)" \ - "$(ctx::meta)" \ - "$hours" \ - "$filter_peer" \ - "$service_ip" 2>/dev/null) - - if [[ -z "$data" ]]; then - log::wg_warning "No activity data found" - return 0 + # ── Fetch data ── + local data="" + if ! $accept_only; then + data=$(json::activity_aggregate \ + "$(ctx::fw_events_log)" "$(ctx::events_log)" \ + "$(config::interface)" "$(ctx::net)" \ + "$(ctx::clients)" "$(ctx::meta)" \ + "$hours" "$filter_peer" "$service_ip" 2>/dev/null) fi - # Measure column widths - local w_peer=16 w_drops=1 + local accept_data="" + if ! $drop_only; then + local since_arg="" ext_flag="0" + [[ "$hours" -gt 0 ]] && since_arg="${hours}h" + $external_only && ext_flag="1" + [[ -f "$(ctx::accept_events_log)" ]] && \ + accept_data=$(json::accept_aggregate \ + "$(ctx::accept_events_log)" "$(ctx::net)" "$(ctx::clients)" \ + "$since_arg" "$filter_peer" "$ext_flag" 2>/dev/null) + fi + + [[ -z "$data" && -z "$accept_data" ]] && \ + log::wg_warning "No activity data found" && return 0 + + # ── Build accept maps ── + declare -gA _ACCEPT_PEER=() + declare -gA _ACCEPT_DEST_KEYS=() + declare -gA _ACCEPT_DEST=() + + while IFS='|' read -r type rest; do + [[ -z "$type" ]] && continue + case "$type" in + peer) + local a_name a_bi a_bo a_pi a_po a_conns + IFS='|' read -r a_name a_bi a_bo a_pi a_po a_conns <<< "$rest" + _ACCEPT_PEER["$a_name"]="${a_bi}|${a_bo}|${a_pi}|${a_po}|${a_conns}" + ;; + dest) + local d_peer d_ip d_port d_proto d_bytes_orig d_bytes_reply d_count + IFS='|' read -r d_peer d_ip d_port d_proto d_bytes_orig d_bytes_reply d_count <<< "$rest" + local d_key="${d_peer}:${d_ip}:${d_port}:${d_proto}" + _ACCEPT_DEST["$d_key"]="${d_bytes_orig}|${d_bytes_reply}|${d_count}" + _ACCEPT_DEST_KEYS["$d_peer"]+="${d_key} " + ;; + esac + done <<< "$accept_data" + + # ── Measure column widths ── + local w_peer=16 w_count=1 + while IFS='|' read -r type rest; do case "$type" in peer) @@ -120,83 +151,166 @@ function cmd::activity::run() { name=$(echo "$rest" | cut -d'|' -f1) drops=$(echo "$rest" | cut -d'|' -f4) (( ${#name} > w_peer )) && w_peer=${#name} - (( ${#drops} > w_drops )) && w_drops=${#drops} + (( ${#drops} > w_count )) && w_count=${#drops} ;; service) - local count - count=$(echo "$rest" | cut -d'|' -f3) - (( ${#count} > w_drops )) && w_drops=${#count} + local svc_count + svc_count=$(echo "$rest" | cut -d'|' -f3) + (( ${#svc_count} > w_count )) && w_count=${#svc_count} ;; esac done <<< "$data" - + + for a_name in "${!_ACCEPT_PEER[@]}"; do + (( ${#a_name} > w_peer )) && w_peer=${#a_name} + local a_conns_val="${_ACCEPT_PEER[$a_name]##*|}" + (( ${#a_conns_val} > w_count )) && w_count=${#a_conns_val} + done + for key in "${!_ACCEPT_DEST[@]}"; do + local d_val="${_ACCEPT_DEST[$key]}" + local d_count_val="${d_val##*|}" + (( ${#d_count_val} > w_count )) && w_count=${#d_count_val} + done + (( w_peer += 2 )) - - # Compute column where drop count starts on peer row: - # " " (2) + name (w_peer) + " ↓" (3) + rx (10) + " ↑" (3) + tx (10) + " " (2) - # ↓ and ↑ are multi-byte (3 bytes, 1 visible) — 2 extra bytes each - # Visible: 2 + w_peer + 2+1 + 10 + 2+1 + 10 + 2 = w_peer + 30 local drops_col=$(( w_peer + 30 )) - + local hours_display="${hours}h" [[ "$hours" == "0" ]] && hours_display="all time" - + log::section "Activity Monitor (last ${hours_display})" echo "" - + if display::is_table "activity"; then cmd::activity::_render_table "$data" return 0 fi - local first_peer=true skip_peer=false - + # ── Accept dest inline renderer ── + _render_peer_accept_dests() { + local peer_name="$1" + local keys="${_ACCEPT_DEST_KEYS[$peer_name]:-}" + [[ -z "$keys" ]] && return 0 + for d_key in $keys; do + local dest_stats="${_ACCEPT_DEST[$d_key]:-}" + [[ -z "$dest_stats" ]] && continue + local d_bytes_orig d_bytes_reply d_count + IFS='|' read -r d_bytes_orig d_bytes_reply d_count <<< "$dest_stats" + local rest_key="${d_key#${peer_name}:}" + local d_ip="${rest_key%%:*}" + local pp="${rest_key#*:}" + local d_port="${pp%%:*}" + local d_proto="${pp##*:}" + local dest_display + dest_display=$(resolve::dest "$d_ip" "$d_port" "$d_proto" 2>/dev/null \ + || echo "${d_ip}:${d_port}/${d_proto}") + ui::activity::accept_dest_row \ + "$dest_display" "$d_bytes_orig" "$d_bytes_reply" \ + "$d_count" "$drops_col" "$w_count" + done + } + + local first_peer=true skip_peer=false current_name="" + local -a rendered_peers=() + + # ── Main render loop (drop data) ── while IFS='|' read -r record_type rest; do case "$record_type" in peer) local name rx tx drops IFS='|' read -r name rx tx drops <<< "$rest" - + + # Flush previous peer's accept dests + [[ -n "$current_name" ]] && ! $drop_only && \ + _render_peer_accept_dests "$current_name" + skip_peer=false - if $dropped_only && [[ "$drops" -eq 0 ]]; then - skip_peer=true - continue - fi - + current_name="$name" + local has_accept="${_ACCEPT_PEER[$name]:-}" + $first_peer || echo "" first_peer=false - + rendered_peers+=("$name") + local rx_fmt tx_fmt rx_fmt=$(fmt::bytes "$rx") tx_fmt=$(fmt::bytes "$tx") - local name_pad rx_pad tx_pad name_pad=$(printf "%-${w_peer}s" "$name") rx_pad=$(printf "%-10s" "$rx_fmt") tx_pad=$(printf "%-10s" "$tx_fmt") - + local drop_word="drops" [[ "$drops" -eq 1 ]] && drop_word="drop" - - ui::activity::peer_row \ - "$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" "$w_drops" + + # Always show peer name — either full row or name-only for accept_only + if $accept_only; then + printf " \033[1m%s\033[0m\n" "$name_pad" + else + ui::activity::peer_row \ + "$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" "$w_count" + fi + + # Accept summary row + if [[ -n "$has_accept" ]] && ! $drop_only; then + local a_bi a_bo a_pi a_po a_conns + IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$has_accept" + ui::activity::accept_row \ + "$name_pad" \ + "$(printf '%-10s' "$(fmt::bytes "$a_bi")")" \ + "$(printf '%-10s' "$(fmt::bytes "$a_bo")")" \ + "$a_conns" "$w_count" + fi ;; - + service) $skip_peer && continue - + $accept_only && continue local peer dest_display drop_count IFS='|' read -r peer dest_display drop_count <<< "$rest" - local svc_drop_word="drops" [[ "$drop_count" -eq 1 ]] && svc_drop_word="drop" - ui::activity::service_row \ - "$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_drops" + "$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_count" ;; esac done <<< "$data" - + + # Flush last peer's accept dests + [[ -n "$current_name" ]] && ! $drop_only && \ + _render_peer_accept_dests "$current_name" + + # ── Accept-only peers (not in drop data) ── + if ! $drop_only; then + for a_name in $(echo "${!_ACCEPT_PEER[@]}" | tr ' ' '\n' | sort); do + # Skip already rendered + local already=false + for rp in "${rendered_peers[@]:-}"; do + [[ "$rp" == "$a_name" ]] && already=true && break + done + $already && continue + + $first_peer || echo "" + first_peer=false + + local a_stats="${_ACCEPT_PEER[$a_name]}" + local a_bi a_bo a_pi a_po a_conns + IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$a_stats" + local name_pad + name_pad=$(printf "%-${w_peer}s" "$a_name") + + # Always show peer name + printf " \033[1m%s\033[0m\n" "$name_pad" + + ui::activity::accept_row \ + "$name_pad" \ + "$(printf '%-10s' "$(fmt::bytes "$a_bi")")" \ + "$(printf '%-10s' "$(fmt::bytes "$a_bo")")" \ + "$a_conns" "$w_count" + _render_peer_accept_dests "$a_name" + done + fi + echo "" } @@ -276,3 +390,48 @@ function cmd::activity::_output_json() { array=$(printf '%s\n' "${peers[@]:-}" | paste -sd ',' -) printf '{"peers":[%s]}' "${array:-}" | json::envelope "activity" "$count" } + +function cmd::activity::_fetch_accept_data() { + local hours="${1:-24}" filter_peer="${2:-}" external_only="${3:-false}" + + [[ ! -f "$(ctx::accept_events_log)" ]] && return 0 + + local since_arg="" + [[ "$hours" -gt 0 ]] && since_arg="${hours}h" + + local ext_flag="0" + $external_only && ext_flag="1" + + json::accept_aggregate \ + "$(ctx::accept_events_log)" \ + "$(ctx::net)" \ + "$(ctx::clients)" \ + "$since_arg" \ + "$filter_peer" \ + 2>/dev/null +} + +function cmd::activity::_build_accept_maps() { + local accept_data="${1:-}" + # Outputs to stdout as bash declare statements — use eval + # Sets: _ACCEPT_PEER[name]="bytes_in|bytes_out|packets_in|packets_out|conn_count" + # _ACCEPT_DEST[name:ip:port:proto]="bytes|count" + declare -gA _ACCEPT_PEER=() + declare -gA _ACCEPT_DEST=() + + while IFS='|' read -r type rest; do + [[ -z "$type" ]] && continue + case "$type" in + peer) + local name bytes_in bytes_out packets_in packets_out conn_count + IFS='|' read -r name bytes_in bytes_out packets_in packets_out conn_count <<< "$rest" + _ACCEPT_PEER["$name"]="${bytes_in}|${bytes_out}|${packets_in}|${packets_out}|${conn_count}" + ;; + dest) + local peer dst_ip dst_port proto bytes count + IFS='|' read -r peer dst_ip dst_port proto bytes count <<< "$rest" + _ACCEPT_DEST["${peer}:${dst_ip}:${dst_port}:${proto}"]="${bytes}|${count}" + ;; + esac + done <<< "$accept_data" +} \ No newline at end of file diff --git a/core/context.sh b/core/context.sh index d3527dd..6a7afaf 100644 --- a/core/context.sh +++ b/core/context.sh @@ -44,49 +44,50 @@ _CTX_CONFIG_FILE="${_CTX_CONFIG}/wgctl.json" # Accessors # ============================================ -function ctx::root() { echo "$_CTX_ROOT"; } -function ctx::core() { echo "$_CTX_CORE"; } -function ctx::modules() { echo "$_CTX_MODULES"; } -function ctx::commands() { echo "$_CTX_COMMANDS"; } -function ctx::wg() { echo "$_CTX_WG"; } -function ctx::clients() { echo "$_CTX_CLIENTS"; } +function ctx::root() { echo "$_CTX_ROOT"; } +function ctx::core() { echo "$_CTX_CORE"; } +function ctx::modules() { echo "$_CTX_MODULES"; } +function ctx::commands() { echo "$_CTX_COMMANDS"; } +function ctx::wg() { echo "$_CTX_WG"; } +function ctx::clients() { echo "$_CTX_CLIENTS"; } # Top-level dirs -function ctx::wgctl() { echo "$_CTX_WGCTL"; } -function ctx::config() { echo "$_CTX_CONFIG"; } -function ctx::data() { echo "$_CTX_DATA"; } -function ctx::daemon() { echo "$_CTX_DAEMON"; } +function ctx::wgctl() { echo "$_CTX_WGCTL"; } +function ctx::config() { echo "$_CTX_CONFIG"; } +function ctx::data() { echo "$_CTX_DATA"; } +function ctx::daemon() { echo "$_CTX_DAEMON"; } # Data subdirs -function ctx::rules() { echo "$_CTX_RULES"; } -function ctx::rules::base() { echo "$_CTX_RULES_BASE"; } -function ctx::groups() { echo "$_CTX_GROUPS"; } -function ctx::blocks() { echo "$_CTX_BLOCKS"; } -function ctx::meta() { echo "$_CTX_META"; } -function ctx::identities() { echo "$_CTX_IDENTITY"; } -function ctx::peer_history() { echo "$_CTX_PEER_HISTORY"; } +function ctx::rules() { echo "$_CTX_RULES"; } +function ctx::rules::base() { echo "$_CTX_RULES_BASE"; } +function ctx::groups() { echo "$_CTX_GROUPS"; } +function ctx::blocks() { echo "$_CTX_BLOCKS"; } +function ctx::meta() { echo "$_CTX_META"; } +function ctx::identities() { echo "$_CTX_IDENTITY"; } +function ctx::peer_history() { echo "$_CTX_PEER_HISTORY"; } # Data files -function ctx::net() { echo "$_CTX_NET"; } -function ctx::hosts() { echo "$_CTX_HOSTS"; } -function ctx::subnets() { echo "$_CTX_SUBNETS"; } -function ctx::policies() { echo "$_CTX_POLICIES"; } +function ctx::net() { echo "$_CTX_NET"; } +function ctx::hosts() { echo "$_CTX_HOSTS"; } +function ctx::subnets() { echo "$_CTX_SUBNETS"; } +function ctx::policies() { echo "$_CTX_POLICIES"; } # Config files -function ctx::config_file() { echo "$_CTX_CONFIG_FILE"; } -function ctx::display() { echo "${_CTX_CONFIG}/display.json"; } +function ctx::config_file() { echo "$_CTX_CONFIG_FILE"; } +function ctx::display() { echo "${_CTX_CONFIG}/display.json"; } # Daemon files -function ctx::events_log() { echo "${_CTX_DAEMON}/events.log"; } -function ctx::fw_events_log() { echo "${_CTX_DAEMON}/fw_events.log"; } -function ctx::endpoint_cache() { echo "${_CTX_DAEMON}/endpoint_cache.json"; } +function ctx::events_log() { echo "${_CTX_DAEMON}/events.log"; } +function ctx::fw_events_log() { echo "${_CTX_DAEMON}/fw_events.log"; } +function ctx::endpoint_cache() { echo "${_CTX_DAEMON}/endpoint_cache.json"; } +function ctx::accept_events_log() { echo "${_CTX_DAEMON}/accept_events.log"; } # Tool paths -function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; } -function ctx::monitor_script() { echo "${_CTX_ROOT}/daemon/wgctl-monitor.py"; } -function ctx::lib() { echo "${_CTX_CORE}/lib"; } +function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; } +function ctx::monitor_script() { echo "${_CTX_ROOT}/daemon/wgctl-monitor.py"; } +function ctx::lib() { echo "${_CTX_CORE}/lib"; } -function ctx::block_history() { echo "${_CTX_DATA}/block-history"; } +function ctx::block_history() { echo "${_CTX_DATA}/block-history"; } # ============================================ # Path Helpers diff --git a/core/json.sh b/core/json.sh index 01bba77..c730605 100644 --- a/core/json.sh +++ b/core/json.sh @@ -154,6 +154,10 @@ function json::block_history_list_all() { python3 "$JSON_HELPER" block_history function json::endpoint_cache_get() { python3 "$JSON_HELPER" endpoint_cache_get "$@" 4 else '100', + args[5] if len(args) > 5 else '1', + args[6] if len(args) > 6 else '', + args[7] if len(args) > 7 else '0', + args[8] if len(args) > 8 else 'desc'), + 'accept_aggregate': lambda args: __import__('lib.accept_events', + fromlist=['accept_aggregate']).accept_aggregate( + args[0], args[1], args[2], + args[3] if len(args) > 3 else '', + args[4] if len(args) > 4 else '', + args[5] if len(args) > 5 else '0'), } # ── Main ───────────────────────────────────────────────────────────────────── diff --git a/core/lib/__pycache__/accept_events.cpython-311.pyc b/core/lib/__pycache__/accept_events.cpython-311.pyc new file mode 100644 index 0000000..12b234c Binary files /dev/null and b/core/lib/__pycache__/accept_events.cpython-311.pyc differ diff --git a/core/lib/__pycache__/block_history.cpython-311.pyc b/core/lib/__pycache__/block_history.cpython-311.pyc index 64bbfe7..14dcd3d 100644 Binary files a/core/lib/__pycache__/block_history.cpython-311.pyc and b/core/lib/__pycache__/block_history.cpython-311.pyc differ diff --git a/core/lib/accept_events.py b/core/lib/accept_events.py new file mode 100644 index 0000000..2e31e0a --- /dev/null +++ b/core/lib/accept_events.py @@ -0,0 +1,228 @@ +""" +accept_events.py — conntrack accept event processing. + +Reads accept_events.log written by wgctl-conntrack daemon. +Each line is a JSON object with fields: + ts, peer, src_ip, dst_ip, dst_port, proto, + bytes_orig, bytes_reply, packets_orig, packets_reply, + duration_sec, service, event, external +""" + +import os +import json +from collections import defaultdict +from datetime import datetime + +from lib.util import ( + DATETIME_FMT, + load_net_data, load_hosts_data, + reverse_lookup, hosts_lookup, + fmt_ts, fmt_ts_hour, ts_to_unix, parse_since, +) + + +def accept_events(file, filter_peer, filter_type, net_file, + limit, collapse='1', since='', filter_external='0', + sort_order='desc'): + """ + Format accept events with optional aggregation. + + Output per line (collapse=1): + ts|peer|dst_ip|dst_port|proto|bytes_total|packets_total|count|duration_avg + + Output per line (collapse=0): + ts|peer|dst_ip|dst_port|proto|bytes_orig|bytes_reply|packets_orig|packets_reply|duration_sec + """ + do_collapse = str(collapse) != '0' + external_only = str(filter_external) == '1' + limit = int(limit) if limit else 100 + since_dt = parse_since(since) if since else None + descending = sort_order != 'asc' + + events = [] + try: + with open(file) as f: + for line in f: + try: + e = json.loads(line.strip()) + if not e.get('peer'): + continue + if filter_peer and e.get('peer') != filter_peer: + continue + if filter_type and not e.get('peer', '').startswith(filter_type + '-'): + continue + if external_only and not e.get('external', False): + continue + if not external_only and e.get('external', False): + continue + if since_dt: + ts_str = e.get('ts', '') + try: + from datetime import timezone + ev_dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) + if ev_dt < since_dt: + continue + except Exception: + pass + events.append(e) + except Exception: + pass + except Exception: + pass + + if do_collapse: + # Aggregate by peer + dst_ip + dst_port + proto + hour + buckets = defaultdict(lambda: {'count': 0, 'bytes': 0, 'packets': 0, 'duration': 0.0}) + bucket_ts = {} + + for e in events: + ts_str = e.get('ts', '') + peer = e.get('peer', '') + dst_ip = e.get('dst_ip', '') + dst_port = str(e.get('dst_port', '')) + proto = e.get('proto', '') + try: + dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) + hour_key = (peer, dst_ip, dst_port, proto, dt.strftime('%Y-%m-%d %H')) + except Exception: + continue + + b = buckets[hour_key] + b['count'] += 1 + b['bytes'] += e.get('bytes_orig', 0) + e.get('bytes_reply', 0) + b['packets'] += e.get('packets_orig', 0) + e.get('packets_reply', 0) + b['duration'] += e.get('duration_sec', 0.0) + + if hour_key not in bucket_ts: + bucket_ts[hour_key] = dt + + # Sort and limit + sorted_buckets = sorted(bucket_ts.items(), key=lambda x: x[1]) + output = sorted_buckets[-limit:] + if descending: + output = list(reversed(output)) + + for hour_key, dt in output: + peer, dst_ip, dst_port, proto, _ = hour_key + b = buckets[hour_key] + ts_fmt = fmt_ts_hour(dt.isoformat()) + dur_avg = b['duration'] / b['count'] if b['count'] > 0 else 0.0 + print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b['bytes']}|{b['packets']}|{b['count']}|{dur_avg:.1f}") + + else: + # Detailed — one row per event + result = [(ts_to_unix(e.get('ts', '')), e) for e in events] + result = result[-limit:] + if descending: + result.reverse() + + for _, e in result: + ts_fmt = fmt_ts(e.get('ts', '')) + peer = e.get('peer', '') + dst_ip = e.get('dst_ip', '') + dst_port = str(e.get('dst_port', '')) + proto = e.get('proto', '') + b_orig = e.get('bytes_orig', 0) + b_reply = e.get('bytes_reply', 0) + p_orig = e.get('packets_orig', 0) + p_reply = e.get('packets_reply', 0) + dur = e.get('duration_sec', 0.0) + print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b_orig}|{b_reply}|{p_orig}|{p_reply}|{dur:.1f}") + + +def accept_aggregate(file, net_file, clients_dir, since='', + filter_peer='', external_only='0'): + """ + Aggregate accept events per peer — total bytes, packets, top destinations. + Used by wgctl activity to show accepted traffic alongside drops. + + external_only='1': only show traffic to external IPs (non-private) + external_only='0': only show traffic to internal IPs (default) + + Output: + peer|peer_name|bytes_in|bytes_out|packets_in|packets_out|conn_count + dest|peer_name|dst_ip|dst_port|proto|bytes_total|conn_count + """ + from collections import defaultdict + from itertools import groupby + + since_dt = parse_since(since) if since else None + show_external = str(external_only) == '1' + + peer_stats = defaultdict(lambda: { + 'bytes_in': 0, 'bytes_out': 0, + 'packets_in': 0, 'packets_out': 0, + 'conn_count': 0 + }) + # dest_stats = defaultdict(lambda: {'bytes': 0, 'count': 0}) + dest_stats = defaultdict(lambda: {'bytes_orig': 0, 'bytes_reply': 0, 'count': 0}) + + try: + with open(file) as f: + for line in f: + try: + e = json.loads(line.strip()) + peer = e.get('peer', '') + if not peer: + continue + if filter_peer and peer != filter_peer: + continue + + # Filter by external/internal + is_external = e.get('external', False) + if show_external and not is_external: + continue + if not show_external and is_external: + continue + + if since_dt: + ts_str = e.get('ts', '') + try: + from datetime import timezone + ev_dt = datetime.fromisoformat( + ts_str.replace('Z', '+00:00')) + if ev_dt < since_dt: + continue + except Exception: + pass + + dst_ip = e.get('dst_ip', '') + dst_port = str(e.get('dst_port', '')) + proto = e.get('proto', '') + b_orig = e.get('bytes_orig', 0) + b_reply = e.get('bytes_reply', 0) + p_orig = e.get('packets_orig', 0) + p_reply = e.get('packets_reply', 0) + + ps = peer_stats[peer] + ps['bytes_out'] += b_orig + ps['bytes_in'] += b_reply + ps['packets_out'] += p_orig + ps['packets_in'] += p_reply + ps['conn_count'] += 1 + + dest_key = (peer, dst_ip, dst_port, proto) + dest_stats[dest_key]['bytes_orig'] += b_orig + dest_stats[dest_key]['bytes_reply'] += b_reply + dest_stats[dest_key]['count'] += 1 + + except Exception: + pass + except Exception: + pass + + # Output peer summaries + for peer, ps in sorted(peer_stats.items()): + print(f"peer|{peer}|{ps['bytes_in']}|{ps['bytes_out']}|" + f"{ps['packets_in']}|{ps['packets_out']}|{ps['conn_count']}") + + # Output top 5 destinations per peer sorted by byte count + dest_items = sorted( + dest_stats.items(), + key=lambda x: (x[0][0], -(x[1]['bytes_orig'] + x[1]['bytes_reply'])) + ) + for peer, group in groupby(dest_items, key=lambda x: x[0][0]): + top = list(group)[:20] + for (p, dst_ip, dst_port, proto), stats in top: + print(f"dest|{p}|{dst_ip}|{dst_port}|{proto}|" + f"{stats['bytes_orig']}|{stats['bytes_reply']}|{stats['count']}") \ No newline at end of file diff --git a/daemon/wgctl-conntrack/cmd/root.go b/daemon/wgctl-conntrack/cmd/root.go new file mode 100644 index 0000000..a76140e --- /dev/null +++ b/daemon/wgctl-conntrack/cmd/root.go @@ -0,0 +1,33 @@ +package cmd + +import ( + "flag" + "fmt" + "os" +) + +// Flags holds CLI flags +type Flags struct { + WGDir string + Subnet string + LogFile string + Version bool +} + +const Version = "0.1.0" + +func Parse() *Flags { + f := &Flags{} + flag.StringVar(&f.WGDir, "wg-dir", "/etc/wireguard", "WireGuard base directory") + flag.StringVar(&f.Subnet, "subnet", "", "WireGuard subnet override") + flag.StringVar(&f.LogFile, "log-file", "", "Accept events log file override") + flag.BoolVar(&f.Version, "version", false, "Print version and exit") + flag.Parse() + + if f.Version { + fmt.Println(Version) + os.Exit(0) + } + + return f +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/config/config.go b/daemon/wgctl-conntrack/config/config.go new file mode 100644 index 0000000..978861f --- /dev/null +++ b/daemon/wgctl-conntrack/config/config.go @@ -0,0 +1,42 @@ +package config + +import ( + "encoding/json" + "os" +) + +// Config holds wgctl-conntrack runtime configuration +type Config struct { + WGSubnet string + DataDir string + ClientsDir string + AcceptLogFile string + ServicesFile string +} + +type wgctlJSON struct { + WireGuard struct { + Subnet string `json:"subnet"` + } `json:"wireguard"` +} + +// Load reads config from wgctl.json and applies defaults +func Load(wgDir string) (*Config, error) { + cfg := &Config{ + WGSubnet: "10.1.0.0/16", + DataDir: wgDir + "/.wgctl/data", + ClientsDir: wgDir + "/clients", + AcceptLogFile: wgDir + "/.wgctl/daemon/accept_events.log", + ServicesFile: wgDir + "/.wgctl/data/services.json", + } + + jsonFile := wgDir + "/.wgctl/config/wgctl.json" + if data, err := os.ReadFile(jsonFile); err == nil { + var wj wgctlJSON + if json.Unmarshal(data, &wj) == nil && wj.WireGuard.Subnet != "" { + cfg.WGSubnet = wj.WireGuard.Subnet + } + } + + return cfg, nil +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/conntrack/event.go b/daemon/wgctl-conntrack/conntrack/event.go new file mode 100644 index 0000000..18905c7 --- /dev/null +++ b/daemon/wgctl-conntrack/conntrack/event.go @@ -0,0 +1,29 @@ +package conntrack + +import "time" + +// EventType represents the type of traffic event +type EventType string + +const ( + EventAccept EventType = "accept" + EventExternal EventType = "external" +) + +// TrafficEvent is the normalized event written to the log +type TrafficEvent struct { + Timestamp time.Time `json:"ts"` + Peer string `json:"peer"` + SrcIP string `json:"src_ip"` + DstIP string `json:"dst_ip"` + DstPort uint16 `json:"dst_port"` + Proto string `json:"proto"` + BytesOrig uint64 `json:"bytes_orig"` + BytesReply uint64 `json:"bytes_reply"` + PacketsOrig uint64 `json:"packets_orig"` + PacketsReply uint64 `json:"packets_reply"` + DurationSec float64 `json:"duration_sec"` + Service string `json:"service,omitempty"` + Event EventType `json:"event"` + External bool `json:"external"` +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/conntrack/filter.go b/daemon/wgctl-conntrack/conntrack/filter.go new file mode 100644 index 0000000..c87e7a7 --- /dev/null +++ b/daemon/wgctl-conntrack/conntrack/filter.go @@ -0,0 +1,44 @@ +package conntrack + +import "net" + +var privateRanges = []string{ + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", +} + +var privateCIDRs []*net.IPNet + +func init() { + for _, cidr := range privateRanges { + _, ipnet, _ := net.ParseCIDR(cidr) + privateCIDRs = append(privateCIDRs, ipnet) + } +} + +func IsWGPeer(ip net.IP, wgSubnet *net.IPNet) bool { + return wgSubnet.Contains(ip) +} + +func IsExternal(ip net.IP) bool { + for _, cidr := range privateCIDRs { + if cidr.Contains(ip) { + return false + } + } + return true +} + +func ProtoName(proto uint8) string { + switch proto { + case 6: + return "tcp" + case 17: + return "udp" + case 1: + return "icmp" + default: + return "unknown" + } +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/conntrack/subscriber.go b/daemon/wgctl-conntrack/conntrack/subscriber.go new file mode 100644 index 0000000..d6e60ef --- /dev/null +++ b/daemon/wgctl-conntrack/conntrack/subscriber.go @@ -0,0 +1,117 @@ +package conntrack + +import ( + "log" + "net" + "time" + + ct "github.com/ti-mo/conntrack" + "github.com/ti-mo/netfilter" +) + +// Resolver maps IPs and ports to peer/service names +type Resolver interface { + PeerForIP(ip net.IP) string + ServiceForDst(ip net.IP, port uint16, proto string) string +} + +// Subscriber listens for conntrack DESTROY events +type Subscriber struct { + wgSubnet *net.IPNet + events chan<- TrafficEvent + resolver Resolver +} + +func NewSubscriber(wgSubnet *net.IPNet, events chan<- TrafficEvent, resolver Resolver) *Subscriber { + return &Subscriber{wgSubnet: wgSubnet, events: events, resolver: resolver} +} + +func (s *Subscriber) Run() error { + conn, err := ct.Dial(nil) + if err != nil { + return err + } + defer conn.Close() + + evCh := make(chan ct.Event, 256) + + errCh, err := conn.Listen(evCh, 1, []netfilter.NetlinkGroup{ + netfilter.GroupCTDestroy, + }) + if err != nil { + return err + } + + log.Println("conntrack subscriber started") + + for { + select { + case ev := <-evCh: + s.processEvent(ev) + case err := <-errCh: + return err + } + } +} + +func (s *Subscriber) processEvent(ev ct.Event) { + flow := ev.Flow + if flow == nil { + return + } + + tuple := flow.TupleOrig + + // Skip IPv6 + if !tuple.IP.SourceAddress.Is4() || !tuple.IP.DestinationAddress.Is4() { + return + } + + srcBytes := tuple.IP.SourceAddress.As4() + dstBytes := tuple.IP.DestinationAddress.As4() + srcIP := net.IP(srcBytes[:]) + dstIP := net.IP(dstBytes[:]) + + // Only process WireGuard peer traffic + if !IsWGPeer(srcIP, s.wgSubnet) { + return + } + + proto := ProtoName(tuple.Proto.Protocol) + dstPort := tuple.Proto.DestinationPort + external := IsExternal(dstIP) + + peer := s.resolver.PeerForIP(srcIP) + if peer == "" { + return + } + + service := s.resolver.ServiceForDst(dstIP, dstPort, proto) + + var durationSec float64 + if flow.Timestamp.Stop.After(flow.Timestamp.Start) { + durationSec = flow.Timestamp.Stop.Sub(flow.Timestamp.Start).Seconds() + } + + eventType := EventAccept + if external { + eventType = EventExternal + } + + s.events <- TrafficEvent{ + Timestamp: time.Now().UTC(), + Peer: peer, + SrcIP: srcIP.String(), + DstIP: dstIP.String(), + DstPort: dstPort, + Proto: proto, + BytesOrig: flow.CountersOrig.Bytes, + BytesReply: flow.CountersReply.Bytes, + PacketsOrig: flow.CountersOrig.Packets, + PacketsReply: flow.CountersReply.Packets, + DurationSec: durationSec, + Service: service, + Event: eventType, + External: external, + } +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/go.mod b/daemon/wgctl-conntrack/go.mod new file mode 100644 index 0000000..c07cf4d --- /dev/null +++ b/daemon/wgctl-conntrack/go.mod @@ -0,0 +1,16 @@ +module git.krilio.net/nuno/wgctl-conntrack + +go 1.23.0 + +require ( + github.com/google/go-cmp v0.7.0 // indirect + github.com/josharian/native v1.1.0 // indirect + github.com/mdlayher/netlink v1.7.2 // indirect + github.com/mdlayher/socket v0.5.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/ti-mo/conntrack v0.6.0 // indirect + github.com/ti-mo/netfilter v0.5.3 // indirect + golang.org/x/net v0.39.0 // indirect + golang.org/x/sync v0.14.0 // indirect + golang.org/x/sys v0.33.0 // indirect +) diff --git a/daemon/wgctl-conntrack/go.sum b/daemon/wgctl-conntrack/go.sum new file mode 100644 index 0000000..fb799f8 --- /dev/null +++ b/daemon/wgctl-conntrack/go.sum @@ -0,0 +1,20 @@ +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= +github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g= +github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw= +github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos= +github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/ti-mo/conntrack v0.6.0 h1:laiW2+dzKyS2u0aVr6FeRQs+v7cj4t7q+twolL/ZkjQ= +github.com/ti-mo/conntrack v0.6.0/go.mod h1:4HZrFQQLOSuBzgQNid3H/wYyyp1kfGXUYxueXjIGibo= +github.com/ti-mo/netfilter v0.5.3 h1:ikzduvnaUMwre5bhbNwWOd6bjqLMVb33vv0XXbK0xGQ= +github.com/ti-mo/netfilter v0.5.3/go.mod h1:08SyBCg6hu1qyQk4s3DjjJKNrm3RTb32nm6AzyT972E= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= diff --git a/daemon/wgctl-conntrack/go1.21.13.linux-amd64.tar.gz b/daemon/wgctl-conntrack/go1.21.13.linux-amd64.tar.gz new file mode 100644 index 0000000..1126dfa Binary files /dev/null and b/daemon/wgctl-conntrack/go1.21.13.linux-amd64.tar.gz differ diff --git a/daemon/wgctl-conntrack/main.go b/daemon/wgctl-conntrack/main.go new file mode 100644 index 0000000..bde9172 --- /dev/null +++ b/daemon/wgctl-conntrack/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "log" + "net" + "os" + "os/signal" + "syscall" + + "git.krilio.net/nuno/wgctl-conntrack/cmd" + "git.krilio.net/nuno/wgctl-conntrack/config" + ctconn "git.krilio.net/nuno/wgctl-conntrack/conntrack" + "git.krilio.net/nuno/wgctl-conntrack/resolver" + "git.krilio.net/nuno/wgctl-conntrack/writer" +) + +func main() { + flags := cmd.Parse() + + cfg, err := config.Load(flags.WGDir) + if err != nil { + log.Fatalf("failed to load config: %v", err) + } + if flags.Subnet != "" { + cfg.WGSubnet = flags.Subnet + } + if flags.LogFile != "" { + cfg.AcceptLogFile = flags.LogFile + } + + _, wgSubnet, err := net.ParseCIDR(cfg.WGSubnet) + if err != nil { + log.Fatalf("invalid WG subnet %q: %v", cfg.WGSubnet, err) + } + + log.Printf("wgctl-conntrack v%s starting (subnet: %s, log: %s)", + cmd.Version, cfg.WGSubnet, cfg.AcceptLogFile) + + peerResolver := resolver.NewPeerResolver(flags.WGDir) + svcResolver := resolver.NewServiceResolver(cfg.ServicesFile) + + res := &combinedResolver{peers: peerResolver, services: svcResolver} + events := make(chan ctconn.TrafficEvent, 512) + + go writer.NewLogWriter(cfg.AcceptLogFile).Run(events) + + sub := ctconn.NewSubscriber(wgSubnet, events, res) + go func() { + if err := sub.Run(); err != nil { + log.Fatalf("conntrack subscriber error: %v", err) + } + }() + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + log.Println("wgctl-conntrack shutting down") +} + +type combinedResolver struct { + peers *resolver.PeerResolver + services *resolver.ServiceResolver +} + +func (r *combinedResolver) PeerForIP(ip net.IP) string { + return r.peers.PeerForIP(ip) +} + +func (r *combinedResolver) ServiceForDst(ip net.IP, port uint16, proto string) string { + return r.services.ServiceForDst(ip, port, proto) +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/resolver/peers.go b/daemon/wgctl-conntrack/resolver/peers.go new file mode 100644 index 0000000..c9254d8 --- /dev/null +++ b/daemon/wgctl-conntrack/resolver/peers.go @@ -0,0 +1,93 @@ +package resolver + +import ( + "encoding/json" + "net" + "os" + "strings" + "sync" + "time" +) + +// PeerResolver maps WireGuard peer IPs to peer names +type PeerResolver struct { + mu sync.RWMutex + ipToName map[string]string + wgDir string +} + +func NewPeerResolver(wgDir string) *PeerResolver { + r := &PeerResolver{wgDir: wgDir, ipToName: make(map[string]string)} + r.reload() + go r.watchReload() + return r +} + +func (r *PeerResolver) PeerForIP(ip net.IP) string { + r.mu.RLock() + defer r.mu.RUnlock() + return r.ipToName[ip.String()] +} + +func (r *PeerResolver) reload() { + newMap := make(map[string]string) + + // WireGuard IPs from conf files (10.1.x.x → peer name) + clientsDir := r.wgDir + "/clients" + entries, err := os.ReadDir(clientsDir) + if err == nil { + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".conf") { + continue + } + name := strings.TrimSuffix(entry.Name(), ".conf") + if ip := parseAddressFromConf(clientsDir + "/" + entry.Name()); ip != "" { + newMap[ip] = name + } + } + } + + // External IPs from endpoint index (external IP → peer name) + indexFile := r.wgDir + "/.wgctl/data/peer-history/endpoint_index.json" + if data, err := os.ReadFile(indexFile); err == nil { + var index map[string]string + if json.Unmarshal(data, &index) == nil { + for ip, peer := range index { + newMap[ip] = peer + } + } + } + + r.mu.Lock() + r.ipToName = newMap + r.mu.Unlock() +} + +func (r *PeerResolver) watchReload() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for range ticker.C { + r.reload() + } +} + +func parseAddressFromConf(path string) string { + data, err := os.ReadFile(path) + if err != nil { + return "" + } + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "Address") { + parts := strings.SplitN(line, "=", 2) + if len(parts) == 2 { + ip := strings.TrimSpace(parts[1]) + if idx := strings.Index(ip, "/"); idx != -1 { + ip = ip[:idx] + } + return ip + } + } + } + return "" +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/resolver/services.go b/daemon/wgctl-conntrack/resolver/services.go new file mode 100644 index 0000000..a524d87 --- /dev/null +++ b/daemon/wgctl-conntrack/resolver/services.go @@ -0,0 +1,93 @@ +package resolver + +import ( + "encoding/json" + "fmt" + "net" + "os" + "sync" + "time" +) + +// ServiceResolver maps IP:port:proto to service names +type ServiceResolver struct { + mu sync.RWMutex + portToSvc map[string]string + servicesFile string +} + +func NewServiceResolver(servicesFile string) *ServiceResolver { + r := &ServiceResolver{servicesFile: servicesFile, portToSvc: make(map[string]string)} + r.reload() + go r.watchReload() + return r +} + +func (r *ServiceResolver) ServiceForDst(ip net.IP, port uint16, proto string) string { + r.mu.RLock() + defer r.mu.RUnlock() + + // Try IP:port:proto first + if svc, ok := r.portToSvc[fmt.Sprintf("%s:%d:%s", ip.String(), port, proto)]; ok { + return svc + } + // Fall back to IP only + if svc, ok := r.portToSvc[ip.String()]; ok { + return svc + } + return "" +} + +func (r *ServiceResolver) reload() { + data, err := os.ReadFile(r.servicesFile) + if err != nil { + return + } + + var services map[string]interface{} + if json.Unmarshal(data, &services) != nil { + return + } + + newMap := make(map[string]string) + for name, svcRaw := range services { + svc, ok := svcRaw.(map[string]interface{}) + if !ok { + continue + } + + hosts := map[string]bool{} + if hostsRaw, ok := svc["hosts"].(map[string]interface{}); ok { + for ip := range hostsRaw { + hosts[ip] = true + newMap[ip] = name + } + } + + if portsRaw, ok := svc["ports"].([]interface{}); ok { + for _, portRaw := range portsRaw { + port, ok := portRaw.(map[string]interface{}) + if !ok { + continue + } + portNum := fmt.Sprintf("%.0f", port["port"]) + proto, _ := port["proto"].(string) + for ip := range hosts { + newMap[fmt.Sprintf("%s:%s:%s", ip, portNum, proto)] = name + } + } + } + } + + r.mu.Lock() + r.portToSvc = newMap + r.mu.Unlock() +} + +func (r *ServiceResolver) watchReload() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for range ticker.C { + r.reload() + } +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/wgctl-conntrack b/daemon/wgctl-conntrack/wgctl-conntrack new file mode 100755 index 0000000..8ae3c7c Binary files /dev/null and b/daemon/wgctl-conntrack/wgctl-conntrack differ diff --git a/daemon/wgctl-conntrack/wgctl-conntrack.service b/daemon/wgctl-conntrack/wgctl-conntrack.service new file mode 100644 index 0000000..74dccaf --- /dev/null +++ b/daemon/wgctl-conntrack/wgctl-conntrack.service @@ -0,0 +1,21 @@ +[Unit] +Description=wgctl conntrack accept logging daemon +After=network.target wg-quick@wg0.service +Requires=wg-quick@wg0.service + +[Service] +Type=simple +ExecStart=/etc/wireguard/wgctl/daemon/wgctl-conntrack/wgctl-conntrack \ + --wg-dir /etc/wireguard +Restart=on-failure +RestartSec=5s +StandardOutput=journal +StandardError=journal +SyslogIdentifier=wgctl-conntrack + +# Needs CAP_NET_ADMIN for netlink conntrack +AmbientCapabilities=CAP_NET_ADMIN +CapabilityBoundingSet=CAP_NET_ADMIN + +[Install] +WantedBy=multi-user.target diff --git a/daemon/wgctl-conntrack/writer/log.go b/daemon/wgctl-conntrack/writer/log.go new file mode 100644 index 0000000..a0b1f1f --- /dev/null +++ b/daemon/wgctl-conntrack/writer/log.go @@ -0,0 +1,47 @@ +package writer + +import ( + "encoding/json" + "log" + "os" + "sync" + + "git.krilio.net/nuno/wgctl-conntrack/conntrack" +) + +// LogWriter writes TrafficEvents as JSON lines to a file +type LogWriter struct { + path string + mu sync.Mutex +} + +func NewLogWriter(path string) *LogWriter { + return &LogWriter{path: path} +} + +func (w *LogWriter) Write(ev conntrack.TrafficEvent) error { + w.mu.Lock() + defer w.mu.Unlock() + + f, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + data, err := json.Marshal(ev) + if err != nil { + return err + } + + _, err = f.Write(append(data, '\n')) + return err +} + +func (w *LogWriter) Run(events <-chan conntrack.TrafficEvent) { + for ev := range events { + if err := w.Write(ev); err != nil { + log.Printf("error writing event: %v", err) + } + } +} \ No newline at end of file diff --git a/modules/ui/activity.module.sh b/modules/ui/activity.module.sh index f83d000..5169ca8 100644 --- a/modules/ui/activity.module.sh +++ b/modules/ui/activity.module.sh @@ -13,18 +13,15 @@ function ui::activity::peer_row() { # ui::activity::service_row function ui::activity::service_row() { local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" \ - drops_col="${4:-30}" w_drops="${5:-1}" + drops_col="${4:-30}" w_count="${5:-1}" - # Align drop count with peer drop column - # Service row visible prefix: " → " (6 visible) + ${#dest_display} - # But "→" is 3 bytes, 1 visible — arrow_prefix bytes = 8, visible = 6 local arrow_prefix=" → " - local prefix_bytes=${#arrow_prefix} # 8 bytes due to → being 3 bytes + local prefix_bytes=${#arrow_prefix} local prefix_len=$(( prefix_bytes + ${#dest_display} )) local pad_n=$(( drops_col - prefix_len )) [[ $pad_n -lt 1 ]] && pad_n=1 - printf " \033[2m→\033[0m %s%*s %${w_drops}s %s\n" \ + printf " \033[0;31m→ %s%*s %${w_count}s %s\033[0m\n" \ "$dest_display" "$pad_n" "" "$drop_count" "$drop_word" } @@ -44,4 +41,45 @@ function ui::activity::peer_row_table() { function ui::activity::service_row_table() { local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" printf " → %-30s %s %s\n" "$dest_display" "$drop_count" "$drop_word" +} + +function ui::activity::accept_row() { + local name_pad="${1:-}" bytes_in="${2:-}" bytes_out="${3:-}" \ + conns="${4:-0}" w_count="${5:-4}" + + local conn_word="conns" + [[ "$conns" -eq 1 ]] && conn_word="conn" + + local spaces + spaces=$(printf '%*s' "${#name_pad}" '') + + printf " \033[0;32m%s ↓%-10s ↑%-10s %${w_count}s %s\033[0m\n" \ + "$spaces" "$bytes_in" "$bytes_out" "$conns" "$conn_word" +} + + +function ui::activity::accept_dest_row() { + local dest="${1:-}" bytes_orig="${2:-0}" bytes_reply="${3:-0}" \ + count="${4:-0}" drops_col="${5:-40}" w_count="${6:-4}" + + local conn_word="conns" + [[ "$count" -eq 1 ]] && conn_word="conn" + + local arrow_prefix=" → " + local prefix_bytes=${#arrow_prefix} + local prefix_len=$(( prefix_bytes + ${#dest} )) + local pad_n=$(( drops_col - prefix_len )) + [[ $pad_n -lt 1 ]] && pad_n=1 + + # Only show bytes if non-zero + local bytes_display="" + if [[ "$bytes_orig" -gt 0 || "$bytes_reply" -gt 0 ]]; then + local bytes_display=" " + [[ "$bytes_orig" -gt 0 ]] && bytes_display+="↓$(fmt::bytes "$bytes_orig") " + [[ "$bytes_reply" -gt 0 ]] && bytes_display+="↑$(fmt::bytes "$bytes_reply")" + bytes_display="${bytes_display% }" # trim trailing space + fi + + printf " \033[0;32m→\033[0m \033[0;32m%s%*s %${w_count}s %-5s%s\033[0m\n" \ + "$dest" "$pad_n" "" "$count" "$conn_word" "$bytes_display" } \ No newline at end of file