Compare commits
3 commits
91593b2576
...
d26e67b940
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d26e67b940 | ||
|
|
b892298259 | ||
|
|
d314ba376e |
22 changed files with 1173 additions and 104 deletions
|
|
@ -13,7 +13,9 @@ function cmd::activity::on_load() {
|
|||
flag::register --ip
|
||||
flag::register --hours
|
||||
flag::register --type
|
||||
flag::register --dropped
|
||||
flag::register --accept
|
||||
flag::register --drop
|
||||
flag::register --external
|
||||
|
||||
command::mixin json_output
|
||||
}
|
||||
|
|
@ -35,11 +37,12 @@ Options:
|
|||
--ip <ip> Filter by destination IP
|
||||
--hours <n> Time window in hours (default: 24, 0 = all time)
|
||||
--type <type> Filter by device type (combined with --peer)
|
||||
--dropped Show only peers with at least one drop
|
||||
--accept Show only accepted traffic (from conntrack)
|
||||
--drop Show only firewall drops
|
||||
--external Show only external traffic (full tunnel peers)
|
||||
|
||||
Examples:
|
||||
wgctl activity
|
||||
wgctl activity --dropped
|
||||
wgctl activity --peer phone-nuno
|
||||
wgctl activity --service truenas
|
||||
wgctl activity --hours 0
|
||||
|
|
@ -53,7 +56,8 @@ EOF
|
|||
|
||||
function cmd::activity::run() {
|
||||
local filter_peer="" filter_service="" filter_ip="" filter_type=""
|
||||
local hours=24 dropped_only=false
|
||||
local hours=24
|
||||
local accept_only=false drop_only=false external_only=false
|
||||
|
||||
while [[ $# -gt 0 ]]; do
|
||||
case "$1" in
|
||||
|
|
@ -62,7 +66,9 @@ function cmd::activity::run() {
|
|||
--ip) filter_ip="$2"; shift 2 ;;
|
||||
--type) filter_type="$2"; shift 2 ;;
|
||||
--hours) hours="$2"; shift 2 ;;
|
||||
--dropped) dropped_only=true; shift ;;
|
||||
--accept) accept_only=true; shift ;;
|
||||
--drop) drop_only=true; shift ;;
|
||||
--external) external_only=true; shift ;;
|
||||
--help) cmd::activity::help; return ;;
|
||||
*)
|
||||
log::error "Unknown flag: $1"
|
||||
|
|
@ -77,42 +83,67 @@ function cmd::activity::run() {
|
|||
return 0
|
||||
fi
|
||||
|
||||
# Resolve peer name if type provided
|
||||
if [[ -n "$filter_peer" && -n "$filter_type" ]]; then
|
||||
filter_peer=$(peers::resolve_and_require "$filter_peer" "$filter_type") || return 1
|
||||
fi
|
||||
|
||||
# Resolve --service to IP
|
||||
local service_ip=""
|
||||
if [[ -n "$filter_service" ]]; then
|
||||
service_ip=$(net::resolve "$filter_service" 2>/dev/null | head -1 | cut -d: -f1) || true
|
||||
if [[ -z "$service_ip" ]]; then
|
||||
log::error "Service not found: ${filter_service}"
|
||||
return 1
|
||||
fi
|
||||
[[ -z "$service_ip" ]] && log::error "Service not found: ${filter_service}" && return 1
|
||||
fi
|
||||
[[ -n "$filter_ip" ]] && service_ip="$filter_ip"
|
||||
|
||||
# Fetch aggregated data
|
||||
local data
|
||||
# ── Fetch data ──
|
||||
local data=""
|
||||
if ! $accept_only; then
|
||||
data=$(json::activity_aggregate \
|
||||
"$(ctx::fw_events_log)" \
|
||||
"$(ctx::events_log)" \
|
||||
"$(config::interface)" \
|
||||
"$(ctx::net)" \
|
||||
"$(ctx::clients)" \
|
||||
"$(ctx::meta)" \
|
||||
"$hours" \
|
||||
"$filter_peer" \
|
||||
"$service_ip" 2>/dev/null)
|
||||
|
||||
if [[ -z "$data" ]]; then
|
||||
log::wg_warning "No activity data found"
|
||||
return 0
|
||||
"$(ctx::fw_events_log)" "$(ctx::events_log)" \
|
||||
"$(config::interface)" "$(ctx::net)" \
|
||||
"$(ctx::clients)" "$(ctx::meta)" \
|
||||
"$hours" "$filter_peer" "$service_ip" 2>/dev/null)
|
||||
fi
|
||||
|
||||
# Measure column widths
|
||||
local w_peer=16 w_drops=1
|
||||
local accept_data=""
|
||||
if ! $drop_only; then
|
||||
local since_arg="" ext_flag="0"
|
||||
[[ "$hours" -gt 0 ]] && since_arg="${hours}h"
|
||||
$external_only && ext_flag="1"
|
||||
[[ -f "$(ctx::accept_events_log)" ]] && \
|
||||
accept_data=$(json::accept_aggregate \
|
||||
"$(ctx::accept_events_log)" "$(ctx::net)" "$(ctx::clients)" \
|
||||
"$since_arg" "$filter_peer" "$ext_flag" 2>/dev/null)
|
||||
fi
|
||||
|
||||
[[ -z "$data" && -z "$accept_data" ]] && \
|
||||
log::wg_warning "No activity data found" && return 0
|
||||
|
||||
# ── Build accept maps ──
|
||||
declare -gA _ACCEPT_PEER=()
|
||||
declare -gA _ACCEPT_DEST_KEYS=()
|
||||
declare -gA _ACCEPT_DEST=()
|
||||
|
||||
while IFS='|' read -r type rest; do
|
||||
[[ -z "$type" ]] && continue
|
||||
case "$type" in
|
||||
peer)
|
||||
local a_name a_bi a_bo a_pi a_po a_conns
|
||||
IFS='|' read -r a_name a_bi a_bo a_pi a_po a_conns <<< "$rest"
|
||||
_ACCEPT_PEER["$a_name"]="${a_bi}|${a_bo}|${a_pi}|${a_po}|${a_conns}"
|
||||
;;
|
||||
dest)
|
||||
local d_peer d_ip d_port d_proto d_bytes_orig d_bytes_reply d_count
|
||||
IFS='|' read -r d_peer d_ip d_port d_proto d_bytes_orig d_bytes_reply d_count <<< "$rest"
|
||||
local d_key="${d_peer}:${d_ip}:${d_port}:${d_proto}"
|
||||
_ACCEPT_DEST["$d_key"]="${d_bytes_orig}|${d_bytes_reply}|${d_count}"
|
||||
_ACCEPT_DEST_KEYS["$d_peer"]+="${d_key} "
|
||||
;;
|
||||
esac
|
||||
done <<< "$accept_data"
|
||||
|
||||
# ── Measure column widths ──
|
||||
local w_peer=16 w_count=1
|
||||
|
||||
while IFS='|' read -r type rest; do
|
||||
case "$type" in
|
||||
peer)
|
||||
|
|
@ -120,22 +151,28 @@ function cmd::activity::run() {
|
|||
name=$(echo "$rest" | cut -d'|' -f1)
|
||||
drops=$(echo "$rest" | cut -d'|' -f4)
|
||||
(( ${#name} > w_peer )) && w_peer=${#name}
|
||||
(( ${#drops} > w_drops )) && w_drops=${#drops}
|
||||
(( ${#drops} > w_count )) && w_count=${#drops}
|
||||
;;
|
||||
service)
|
||||
local count
|
||||
count=$(echo "$rest" | cut -d'|' -f3)
|
||||
(( ${#count} > w_drops )) && w_drops=${#count}
|
||||
local svc_count
|
||||
svc_count=$(echo "$rest" | cut -d'|' -f3)
|
||||
(( ${#svc_count} > w_count )) && w_count=${#svc_count}
|
||||
;;
|
||||
esac
|
||||
done <<< "$data"
|
||||
|
||||
(( w_peer += 2 ))
|
||||
for a_name in "${!_ACCEPT_PEER[@]}"; do
|
||||
(( ${#a_name} > w_peer )) && w_peer=${#a_name}
|
||||
local a_conns_val="${_ACCEPT_PEER[$a_name]##*|}"
|
||||
(( ${#a_conns_val} > w_count )) && w_count=${#a_conns_val}
|
||||
done
|
||||
for key in "${!_ACCEPT_DEST[@]}"; do
|
||||
local d_val="${_ACCEPT_DEST[$key]}"
|
||||
local d_count_val="${d_val##*|}"
|
||||
(( ${#d_count_val} > w_count )) && w_count=${#d_count_val}
|
||||
done
|
||||
|
||||
# Compute column where drop count starts on peer row:
|
||||
# " " (2) + name (w_peer) + " ↓" (3) + rx (10) + " ↑" (3) + tx (10) + " " (2)
|
||||
# ↓ and ↑ are multi-byte (3 bytes, 1 visible) — 2 extra bytes each
|
||||
# Visible: 2 + w_peer + 2+1 + 10 + 2+1 + 10 + 2 = w_peer + 30
|
||||
(( w_peer += 2 ))
|
||||
local drops_col=$(( w_peer + 30 ))
|
||||
|
||||
local hours_display="${hours}h"
|
||||
|
|
@ -149,27 +186,55 @@ function cmd::activity::run() {
|
|||
return 0
|
||||
fi
|
||||
|
||||
local first_peer=true skip_peer=false
|
||||
# ── Accept dest inline renderer ──
|
||||
_render_peer_accept_dests() {
|
||||
local peer_name="$1"
|
||||
local keys="${_ACCEPT_DEST_KEYS[$peer_name]:-}"
|
||||
[[ -z "$keys" ]] && return 0
|
||||
for d_key in $keys; do
|
||||
local dest_stats="${_ACCEPT_DEST[$d_key]:-}"
|
||||
[[ -z "$dest_stats" ]] && continue
|
||||
local d_bytes_orig d_bytes_reply d_count
|
||||
IFS='|' read -r d_bytes_orig d_bytes_reply d_count <<< "$dest_stats"
|
||||
local rest_key="${d_key#${peer_name}:}"
|
||||
local d_ip="${rest_key%%:*}"
|
||||
local pp="${rest_key#*:}"
|
||||
local d_port="${pp%%:*}"
|
||||
local d_proto="${pp##*:}"
|
||||
local dest_display
|
||||
dest_display=$(resolve::dest "$d_ip" "$d_port" "$d_proto" 2>/dev/null \
|
||||
|| echo "${d_ip}:${d_port}/${d_proto}")
|
||||
ui::activity::accept_dest_row \
|
||||
"$dest_display" "$d_bytes_orig" "$d_bytes_reply" \
|
||||
"$d_count" "$drops_col" "$w_count"
|
||||
done
|
||||
}
|
||||
|
||||
local first_peer=true skip_peer=false current_name=""
|
||||
local -a rendered_peers=()
|
||||
|
||||
# ── Main render loop (drop data) ──
|
||||
while IFS='|' read -r record_type rest; do
|
||||
case "$record_type" in
|
||||
peer)
|
||||
local name rx tx drops
|
||||
IFS='|' read -r name rx tx drops <<< "$rest"
|
||||
|
||||
# Flush previous peer's accept dests
|
||||
[[ -n "$current_name" ]] && ! $drop_only && \
|
||||
_render_peer_accept_dests "$current_name"
|
||||
|
||||
skip_peer=false
|
||||
if $dropped_only && [[ "$drops" -eq 0 ]]; then
|
||||
skip_peer=true
|
||||
continue
|
||||
fi
|
||||
current_name="$name"
|
||||
local has_accept="${_ACCEPT_PEER[$name]:-}"
|
||||
|
||||
$first_peer || echo ""
|
||||
first_peer=false
|
||||
rendered_peers+=("$name")
|
||||
|
||||
local rx_fmt tx_fmt
|
||||
rx_fmt=$(fmt::bytes "$rx")
|
||||
tx_fmt=$(fmt::bytes "$tx")
|
||||
|
||||
local name_pad rx_pad tx_pad
|
||||
name_pad=$(printf "%-${w_peer}s" "$name")
|
||||
rx_pad=$(printf "%-10s" "$rx_fmt")
|
||||
|
|
@ -178,25 +243,74 @@ function cmd::activity::run() {
|
|||
local drop_word="drops"
|
||||
[[ "$drops" -eq 1 ]] && drop_word="drop"
|
||||
|
||||
# Always show peer name — either full row or name-only for accept_only
|
||||
if $accept_only; then
|
||||
printf " \033[1m%s\033[0m\n" "$name_pad"
|
||||
else
|
||||
ui::activity::peer_row \
|
||||
"$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" "$w_drops"
|
||||
"$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" "$w_count"
|
||||
fi
|
||||
|
||||
# Accept summary row
|
||||
if [[ -n "$has_accept" ]] && ! $drop_only; then
|
||||
local a_bi a_bo a_pi a_po a_conns
|
||||
IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$has_accept"
|
||||
ui::activity::accept_row \
|
||||
"$name_pad" \
|
||||
"$(printf '%-10s' "$(fmt::bytes "$a_bi")")" \
|
||||
"$(printf '%-10s' "$(fmt::bytes "$a_bo")")" \
|
||||
"$a_conns" "$w_count"
|
||||
fi
|
||||
;;
|
||||
|
||||
service)
|
||||
$skip_peer && continue
|
||||
|
||||
$accept_only && continue
|
||||
local peer dest_display drop_count
|
||||
IFS='|' read -r peer dest_display drop_count <<< "$rest"
|
||||
|
||||
local svc_drop_word="drops"
|
||||
[[ "$drop_count" -eq 1 ]] && svc_drop_word="drop"
|
||||
|
||||
ui::activity::service_row \
|
||||
"$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_drops"
|
||||
"$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_count"
|
||||
;;
|
||||
esac
|
||||
done <<< "$data"
|
||||
|
||||
# Flush last peer's accept dests
|
||||
[[ -n "$current_name" ]] && ! $drop_only && \
|
||||
_render_peer_accept_dests "$current_name"
|
||||
|
||||
# ── Accept-only peers (not in drop data) ──
|
||||
if ! $drop_only; then
|
||||
for a_name in $(echo "${!_ACCEPT_PEER[@]}" | tr ' ' '\n' | sort); do
|
||||
# Skip already rendered
|
||||
local already=false
|
||||
for rp in "${rendered_peers[@]:-}"; do
|
||||
[[ "$rp" == "$a_name" ]] && already=true && break
|
||||
done
|
||||
$already && continue
|
||||
|
||||
$first_peer || echo ""
|
||||
first_peer=false
|
||||
|
||||
local a_stats="${_ACCEPT_PEER[$a_name]}"
|
||||
local a_bi a_bo a_pi a_po a_conns
|
||||
IFS='|' read -r a_bi a_bo a_pi a_po a_conns <<< "$a_stats"
|
||||
local name_pad
|
||||
name_pad=$(printf "%-${w_peer}s" "$a_name")
|
||||
|
||||
# Always show peer name
|
||||
printf " \033[1m%s\033[0m\n" "$name_pad"
|
||||
|
||||
ui::activity::accept_row \
|
||||
"$name_pad" \
|
||||
"$(printf '%-10s' "$(fmt::bytes "$a_bi")")" \
|
||||
"$(printf '%-10s' "$(fmt::bytes "$a_bo")")" \
|
||||
"$a_conns" "$w_count"
|
||||
_render_peer_accept_dests "$a_name"
|
||||
done
|
||||
fi
|
||||
|
||||
echo ""
|
||||
}
|
||||
|
||||
|
|
@ -276,3 +390,48 @@ function cmd::activity::_output_json() {
|
|||
array=$(printf '%s\n' "${peers[@]:-}" | paste -sd ',' -)
|
||||
printf '{"peers":[%s]}' "${array:-}" | json::envelope "activity" "$count"
|
||||
}
|
||||
|
||||
function cmd::activity::_fetch_accept_data() {
|
||||
local hours="${1:-24}" filter_peer="${2:-}" external_only="${3:-false}"
|
||||
|
||||
[[ ! -f "$(ctx::accept_events_log)" ]] && return 0
|
||||
|
||||
local since_arg=""
|
||||
[[ "$hours" -gt 0 ]] && since_arg="${hours}h"
|
||||
|
||||
local ext_flag="0"
|
||||
$external_only && ext_flag="1"
|
||||
|
||||
json::accept_aggregate \
|
||||
"$(ctx::accept_events_log)" \
|
||||
"$(ctx::net)" \
|
||||
"$(ctx::clients)" \
|
||||
"$since_arg" \
|
||||
"$filter_peer" \
|
||||
2>/dev/null
|
||||
}
|
||||
|
||||
function cmd::activity::_build_accept_maps() {
|
||||
local accept_data="${1:-}"
|
||||
# Outputs to stdout as bash declare statements — use eval
|
||||
# Sets: _ACCEPT_PEER[name]="bytes_in|bytes_out|packets_in|packets_out|conn_count"
|
||||
# _ACCEPT_DEST[name:ip:port:proto]="bytes|count"
|
||||
declare -gA _ACCEPT_PEER=()
|
||||
declare -gA _ACCEPT_DEST=()
|
||||
|
||||
while IFS='|' read -r type rest; do
|
||||
[[ -z "$type" ]] && continue
|
||||
case "$type" in
|
||||
peer)
|
||||
local name bytes_in bytes_out packets_in packets_out conn_count
|
||||
IFS='|' read -r name bytes_in bytes_out packets_in packets_out conn_count <<< "$rest"
|
||||
_ACCEPT_PEER["$name"]="${bytes_in}|${bytes_out}|${packets_in}|${packets_out}|${conn_count}"
|
||||
;;
|
||||
dest)
|
||||
local peer dst_ip dst_port proto bytes count
|
||||
IFS='|' read -r peer dst_ip dst_port proto bytes count <<< "$rest"
|
||||
_ACCEPT_DEST["${peer}:${dst_ip}:${dst_port}:${proto}"]="${bytes}|${count}"
|
||||
;;
|
||||
esac
|
||||
done <<< "$accept_data"
|
||||
}
|
||||
|
|
@ -80,6 +80,7 @@ function ctx::display() { echo "${_CTX_CONFIG}/display.json"; }
|
|||
function ctx::events_log() { echo "${_CTX_DAEMON}/events.log"; }
|
||||
function ctx::fw_events_log() { echo "${_CTX_DAEMON}/fw_events.log"; }
|
||||
function ctx::endpoint_cache() { echo "${_CTX_DAEMON}/endpoint_cache.json"; }
|
||||
function ctx::accept_events_log() { echo "${_CTX_DAEMON}/accept_events.log"; }
|
||||
|
||||
# Tool paths
|
||||
function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; }
|
||||
|
|
|
|||
|
|
@ -154,6 +154,10 @@ function json::block_history_list_all() { python3 "$JSON_HELPER" block_history
|
|||
|
||||
function json::endpoint_cache_get() { python3 "$JSON_HELPER" endpoint_cache_get "$@" </dev/null; }
|
||||
|
||||
# Accept Events
|
||||
function json::accept_events() { python3 "$JSON_HELPER" accept_events "$@" </dev/null; }
|
||||
function json::accept_aggregate() { python3 "$JSON_HELPER" accept_aggregate "$@" </dev/null; }
|
||||
|
||||
function json::peer_transfer() {
|
||||
ACTIVITY_TOTAL_LOW="$(config::activity_total_low)" \
|
||||
ACTIVITY_TOTAL_MED="$(config::activity_total_med)" \
|
||||
|
|
|
|||
|
|
@ -2225,6 +2225,19 @@ commands = {
|
|||
'block_history_list_all': lambda args: __import__('lib.block_history',
|
||||
fromlist=['block_history_list_all']).block_history_list_all(args[0]),
|
||||
'endpoint_cache_get': lambda args: endpoint_cache_get(args[0], args[1]),
|
||||
'accept_events': lambda args: __import__('lib.accept_events', fromlist=['accept_events']).accept_events(
|
||||
args[0], args[1], args[2], args[3],
|
||||
args[4] if len(args) > 4 else '100',
|
||||
args[5] if len(args) > 5 else '1',
|
||||
args[6] if len(args) > 6 else '',
|
||||
args[7] if len(args) > 7 else '0',
|
||||
args[8] if len(args) > 8 else 'desc'),
|
||||
'accept_aggregate': lambda args: __import__('lib.accept_events',
|
||||
fromlist=['accept_aggregate']).accept_aggregate(
|
||||
args[0], args[1], args[2],
|
||||
args[3] if len(args) > 3 else '',
|
||||
args[4] if len(args) > 4 else '',
|
||||
args[5] if len(args) > 5 else '0'),
|
||||
}
|
||||
|
||||
# ── Main ─────────────────────────────────────────────────────────────────────
|
||||
|
|
|
|||
BIN
core/lib/__pycache__/accept_events.cpython-311.pyc
Normal file
BIN
core/lib/__pycache__/accept_events.cpython-311.pyc
Normal file
Binary file not shown.
Binary file not shown.
228
core/lib/accept_events.py
Normal file
228
core/lib/accept_events.py
Normal file
|
|
@ -0,0 +1,228 @@
|
|||
"""
|
||||
accept_events.py — conntrack accept event processing.
|
||||
|
||||
Reads accept_events.log written by wgctl-conntrack daemon.
|
||||
Each line is a JSON object with fields:
|
||||
ts, peer, src_ip, dst_ip, dst_port, proto,
|
||||
bytes_orig, bytes_reply, packets_orig, packets_reply,
|
||||
duration_sec, service, event, external
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
|
||||
from lib.util import (
|
||||
DATETIME_FMT,
|
||||
load_net_data, load_hosts_data,
|
||||
reverse_lookup, hosts_lookup,
|
||||
fmt_ts, fmt_ts_hour, ts_to_unix, parse_since,
|
||||
)
|
||||
|
||||
|
||||
def accept_events(file, filter_peer, filter_type, net_file,
|
||||
limit, collapse='1', since='', filter_external='0',
|
||||
sort_order='desc'):
|
||||
"""
|
||||
Format accept events with optional aggregation.
|
||||
|
||||
Output per line (collapse=1):
|
||||
ts|peer|dst_ip|dst_port|proto|bytes_total|packets_total|count|duration_avg
|
||||
|
||||
Output per line (collapse=0):
|
||||
ts|peer|dst_ip|dst_port|proto|bytes_orig|bytes_reply|packets_orig|packets_reply|duration_sec
|
||||
"""
|
||||
do_collapse = str(collapse) != '0'
|
||||
external_only = str(filter_external) == '1'
|
||||
limit = int(limit) if limit else 100
|
||||
since_dt = parse_since(since) if since else None
|
||||
descending = sort_order != 'asc'
|
||||
|
||||
events = []
|
||||
try:
|
||||
with open(file) as f:
|
||||
for line in f:
|
||||
try:
|
||||
e = json.loads(line.strip())
|
||||
if not e.get('peer'):
|
||||
continue
|
||||
if filter_peer and e.get('peer') != filter_peer:
|
||||
continue
|
||||
if filter_type and not e.get('peer', '').startswith(filter_type + '-'):
|
||||
continue
|
||||
if external_only and not e.get('external', False):
|
||||
continue
|
||||
if not external_only and e.get('external', False):
|
||||
continue
|
||||
if since_dt:
|
||||
ts_str = e.get('ts', '')
|
||||
try:
|
||||
from datetime import timezone
|
||||
ev_dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
|
||||
if ev_dt < since_dt:
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
events.append(e)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if do_collapse:
|
||||
# Aggregate by peer + dst_ip + dst_port + proto + hour
|
||||
buckets = defaultdict(lambda: {'count': 0, 'bytes': 0, 'packets': 0, 'duration': 0.0})
|
||||
bucket_ts = {}
|
||||
|
||||
for e in events:
|
||||
ts_str = e.get('ts', '')
|
||||
peer = e.get('peer', '')
|
||||
dst_ip = e.get('dst_ip', '')
|
||||
dst_port = str(e.get('dst_port', ''))
|
||||
proto = e.get('proto', '')
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
|
||||
hour_key = (peer, dst_ip, dst_port, proto, dt.strftime('%Y-%m-%d %H'))
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
b = buckets[hour_key]
|
||||
b['count'] += 1
|
||||
b['bytes'] += e.get('bytes_orig', 0) + e.get('bytes_reply', 0)
|
||||
b['packets'] += e.get('packets_orig', 0) + e.get('packets_reply', 0)
|
||||
b['duration'] += e.get('duration_sec', 0.0)
|
||||
|
||||
if hour_key not in bucket_ts:
|
||||
bucket_ts[hour_key] = dt
|
||||
|
||||
# Sort and limit
|
||||
sorted_buckets = sorted(bucket_ts.items(), key=lambda x: x[1])
|
||||
output = sorted_buckets[-limit:]
|
||||
if descending:
|
||||
output = list(reversed(output))
|
||||
|
||||
for hour_key, dt in output:
|
||||
peer, dst_ip, dst_port, proto, _ = hour_key
|
||||
b = buckets[hour_key]
|
||||
ts_fmt = fmt_ts_hour(dt.isoformat())
|
||||
dur_avg = b['duration'] / b['count'] if b['count'] > 0 else 0.0
|
||||
print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b['bytes']}|{b['packets']}|{b['count']}|{dur_avg:.1f}")
|
||||
|
||||
else:
|
||||
# Detailed — one row per event
|
||||
result = [(ts_to_unix(e.get('ts', '')), e) for e in events]
|
||||
result = result[-limit:]
|
||||
if descending:
|
||||
result.reverse()
|
||||
|
||||
for _, e in result:
|
||||
ts_fmt = fmt_ts(e.get('ts', ''))
|
||||
peer = e.get('peer', '')
|
||||
dst_ip = e.get('dst_ip', '')
|
||||
dst_port = str(e.get('dst_port', ''))
|
||||
proto = e.get('proto', '')
|
||||
b_orig = e.get('bytes_orig', 0)
|
||||
b_reply = e.get('bytes_reply', 0)
|
||||
p_orig = e.get('packets_orig', 0)
|
||||
p_reply = e.get('packets_reply', 0)
|
||||
dur = e.get('duration_sec', 0.0)
|
||||
print(f"{ts_fmt}|{peer}|{dst_ip}|{dst_port}|{proto}|{b_orig}|{b_reply}|{p_orig}|{p_reply}|{dur:.1f}")
|
||||
|
||||
|
||||
def accept_aggregate(file, net_file, clients_dir, since='',
|
||||
filter_peer='', external_only='0'):
|
||||
"""
|
||||
Aggregate accept events per peer — total bytes, packets, top destinations.
|
||||
Used by wgctl activity to show accepted traffic alongside drops.
|
||||
|
||||
external_only='1': only show traffic to external IPs (non-private)
|
||||
external_only='0': only show traffic to internal IPs (default)
|
||||
|
||||
Output:
|
||||
peer|peer_name|bytes_in|bytes_out|packets_in|packets_out|conn_count
|
||||
dest|peer_name|dst_ip|dst_port|proto|bytes_total|conn_count
|
||||
"""
|
||||
from collections import defaultdict
|
||||
from itertools import groupby
|
||||
|
||||
since_dt = parse_since(since) if since else None
|
||||
show_external = str(external_only) == '1'
|
||||
|
||||
peer_stats = defaultdict(lambda: {
|
||||
'bytes_in': 0, 'bytes_out': 0,
|
||||
'packets_in': 0, 'packets_out': 0,
|
||||
'conn_count': 0
|
||||
})
|
||||
# dest_stats = defaultdict(lambda: {'bytes': 0, 'count': 0})
|
||||
dest_stats = defaultdict(lambda: {'bytes_orig': 0, 'bytes_reply': 0, 'count': 0})
|
||||
|
||||
try:
|
||||
with open(file) as f:
|
||||
for line in f:
|
||||
try:
|
||||
e = json.loads(line.strip())
|
||||
peer = e.get('peer', '')
|
||||
if not peer:
|
||||
continue
|
||||
if filter_peer and peer != filter_peer:
|
||||
continue
|
||||
|
||||
# Filter by external/internal
|
||||
is_external = e.get('external', False)
|
||||
if show_external and not is_external:
|
||||
continue
|
||||
if not show_external and is_external:
|
||||
continue
|
||||
|
||||
if since_dt:
|
||||
ts_str = e.get('ts', '')
|
||||
try:
|
||||
from datetime import timezone
|
||||
ev_dt = datetime.fromisoformat(
|
||||
ts_str.replace('Z', '+00:00'))
|
||||
if ev_dt < since_dt:
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
dst_ip = e.get('dst_ip', '')
|
||||
dst_port = str(e.get('dst_port', ''))
|
||||
proto = e.get('proto', '')
|
||||
b_orig = e.get('bytes_orig', 0)
|
||||
b_reply = e.get('bytes_reply', 0)
|
||||
p_orig = e.get('packets_orig', 0)
|
||||
p_reply = e.get('packets_reply', 0)
|
||||
|
||||
ps = peer_stats[peer]
|
||||
ps['bytes_out'] += b_orig
|
||||
ps['bytes_in'] += b_reply
|
||||
ps['packets_out'] += p_orig
|
||||
ps['packets_in'] += p_reply
|
||||
ps['conn_count'] += 1
|
||||
|
||||
dest_key = (peer, dst_ip, dst_port, proto)
|
||||
dest_stats[dest_key]['bytes_orig'] += b_orig
|
||||
dest_stats[dest_key]['bytes_reply'] += b_reply
|
||||
dest_stats[dest_key]['count'] += 1
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Output peer summaries
|
||||
for peer, ps in sorted(peer_stats.items()):
|
||||
print(f"peer|{peer}|{ps['bytes_in']}|{ps['bytes_out']}|"
|
||||
f"{ps['packets_in']}|{ps['packets_out']}|{ps['conn_count']}")
|
||||
|
||||
# Output top 5 destinations per peer sorted by byte count
|
||||
dest_items = sorted(
|
||||
dest_stats.items(),
|
||||
key=lambda x: (x[0][0], -(x[1]['bytes_orig'] + x[1]['bytes_reply']))
|
||||
)
|
||||
for peer, group in groupby(dest_items, key=lambda x: x[0][0]):
|
||||
top = list(group)[:20]
|
||||
for (p, dst_ip, dst_port, proto), stats in top:
|
||||
print(f"dest|{p}|{dst_ip}|{dst_port}|{proto}|"
|
||||
f"{stats['bytes_orig']}|{stats['bytes_reply']}|{stats['count']}")
|
||||
33
daemon/wgctl-conntrack/cmd/root.go
Normal file
33
daemon/wgctl-conntrack/cmd/root.go
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// Flags holds CLI flags
|
||||
type Flags struct {
|
||||
WGDir string
|
||||
Subnet string
|
||||
LogFile string
|
||||
Version bool
|
||||
}
|
||||
|
||||
const Version = "0.1.0"
|
||||
|
||||
func Parse() *Flags {
|
||||
f := &Flags{}
|
||||
flag.StringVar(&f.WGDir, "wg-dir", "/etc/wireguard", "WireGuard base directory")
|
||||
flag.StringVar(&f.Subnet, "subnet", "", "WireGuard subnet override")
|
||||
flag.StringVar(&f.LogFile, "log-file", "", "Accept events log file override")
|
||||
flag.BoolVar(&f.Version, "version", false, "Print version and exit")
|
||||
flag.Parse()
|
||||
|
||||
if f.Version {
|
||||
fmt.Println(Version)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
||||
42
daemon/wgctl-conntrack/config/config.go
Normal file
42
daemon/wgctl-conntrack/config/config.go
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
)
|
||||
|
||||
// Config holds wgctl-conntrack runtime configuration
|
||||
type Config struct {
|
||||
WGSubnet string
|
||||
DataDir string
|
||||
ClientsDir string
|
||||
AcceptLogFile string
|
||||
ServicesFile string
|
||||
}
|
||||
|
||||
type wgctlJSON struct {
|
||||
WireGuard struct {
|
||||
Subnet string `json:"subnet"`
|
||||
} `json:"wireguard"`
|
||||
}
|
||||
|
||||
// Load reads config from wgctl.json and applies defaults
|
||||
func Load(wgDir string) (*Config, error) {
|
||||
cfg := &Config{
|
||||
WGSubnet: "10.1.0.0/16",
|
||||
DataDir: wgDir + "/.wgctl/data",
|
||||
ClientsDir: wgDir + "/clients",
|
||||
AcceptLogFile: wgDir + "/.wgctl/daemon/accept_events.log",
|
||||
ServicesFile: wgDir + "/.wgctl/data/services.json",
|
||||
}
|
||||
|
||||
jsonFile := wgDir + "/.wgctl/config/wgctl.json"
|
||||
if data, err := os.ReadFile(jsonFile); err == nil {
|
||||
var wj wgctlJSON
|
||||
if json.Unmarshal(data, &wj) == nil && wj.WireGuard.Subnet != "" {
|
||||
cfg.WGSubnet = wj.WireGuard.Subnet
|
||||
}
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
29
daemon/wgctl-conntrack/conntrack/event.go
Normal file
29
daemon/wgctl-conntrack/conntrack/event.go
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
package conntrack
|
||||
|
||||
import "time"
|
||||
|
||||
// EventType represents the type of traffic event
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
EventAccept EventType = "accept"
|
||||
EventExternal EventType = "external"
|
||||
)
|
||||
|
||||
// TrafficEvent is the normalized event written to the log
|
||||
type TrafficEvent struct {
|
||||
Timestamp time.Time `json:"ts"`
|
||||
Peer string `json:"peer"`
|
||||
SrcIP string `json:"src_ip"`
|
||||
DstIP string `json:"dst_ip"`
|
||||
DstPort uint16 `json:"dst_port"`
|
||||
Proto string `json:"proto"`
|
||||
BytesOrig uint64 `json:"bytes_orig"`
|
||||
BytesReply uint64 `json:"bytes_reply"`
|
||||
PacketsOrig uint64 `json:"packets_orig"`
|
||||
PacketsReply uint64 `json:"packets_reply"`
|
||||
DurationSec float64 `json:"duration_sec"`
|
||||
Service string `json:"service,omitempty"`
|
||||
Event EventType `json:"event"`
|
||||
External bool `json:"external"`
|
||||
}
|
||||
44
daemon/wgctl-conntrack/conntrack/filter.go
Normal file
44
daemon/wgctl-conntrack/conntrack/filter.go
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
package conntrack
|
||||
|
||||
import "net"
|
||||
|
||||
var privateRanges = []string{
|
||||
"10.0.0.0/8",
|
||||
"172.16.0.0/12",
|
||||
"192.168.0.0/16",
|
||||
}
|
||||
|
||||
var privateCIDRs []*net.IPNet
|
||||
|
||||
func init() {
|
||||
for _, cidr := range privateRanges {
|
||||
_, ipnet, _ := net.ParseCIDR(cidr)
|
||||
privateCIDRs = append(privateCIDRs, ipnet)
|
||||
}
|
||||
}
|
||||
|
||||
func IsWGPeer(ip net.IP, wgSubnet *net.IPNet) bool {
|
||||
return wgSubnet.Contains(ip)
|
||||
}
|
||||
|
||||
func IsExternal(ip net.IP) bool {
|
||||
for _, cidr := range privateCIDRs {
|
||||
if cidr.Contains(ip) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func ProtoName(proto uint8) string {
|
||||
switch proto {
|
||||
case 6:
|
||||
return "tcp"
|
||||
case 17:
|
||||
return "udp"
|
||||
case 1:
|
||||
return "icmp"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
117
daemon/wgctl-conntrack/conntrack/subscriber.go
Normal file
117
daemon/wgctl-conntrack/conntrack/subscriber.go
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
package conntrack
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
ct "github.com/ti-mo/conntrack"
|
||||
"github.com/ti-mo/netfilter"
|
||||
)
|
||||
|
||||
// Resolver maps IPs and ports to peer/service names
|
||||
type Resolver interface {
|
||||
PeerForIP(ip net.IP) string
|
||||
ServiceForDst(ip net.IP, port uint16, proto string) string
|
||||
}
|
||||
|
||||
// Subscriber listens for conntrack DESTROY events
|
||||
type Subscriber struct {
|
||||
wgSubnet *net.IPNet
|
||||
events chan<- TrafficEvent
|
||||
resolver Resolver
|
||||
}
|
||||
|
||||
func NewSubscriber(wgSubnet *net.IPNet, events chan<- TrafficEvent, resolver Resolver) *Subscriber {
|
||||
return &Subscriber{wgSubnet: wgSubnet, events: events, resolver: resolver}
|
||||
}
|
||||
|
||||
func (s *Subscriber) Run() error {
|
||||
conn, err := ct.Dial(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
evCh := make(chan ct.Event, 256)
|
||||
|
||||
errCh, err := conn.Listen(evCh, 1, []netfilter.NetlinkGroup{
|
||||
netfilter.GroupCTDestroy,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Println("conntrack subscriber started")
|
||||
|
||||
for {
|
||||
select {
|
||||
case ev := <-evCh:
|
||||
s.processEvent(ev)
|
||||
case err := <-errCh:
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Subscriber) processEvent(ev ct.Event) {
|
||||
flow := ev.Flow
|
||||
if flow == nil {
|
||||
return
|
||||
}
|
||||
|
||||
tuple := flow.TupleOrig
|
||||
|
||||
// Skip IPv6
|
||||
if !tuple.IP.SourceAddress.Is4() || !tuple.IP.DestinationAddress.Is4() {
|
||||
return
|
||||
}
|
||||
|
||||
srcBytes := tuple.IP.SourceAddress.As4()
|
||||
dstBytes := tuple.IP.DestinationAddress.As4()
|
||||
srcIP := net.IP(srcBytes[:])
|
||||
dstIP := net.IP(dstBytes[:])
|
||||
|
||||
// Only process WireGuard peer traffic
|
||||
if !IsWGPeer(srcIP, s.wgSubnet) {
|
||||
return
|
||||
}
|
||||
|
||||
proto := ProtoName(tuple.Proto.Protocol)
|
||||
dstPort := tuple.Proto.DestinationPort
|
||||
external := IsExternal(dstIP)
|
||||
|
||||
peer := s.resolver.PeerForIP(srcIP)
|
||||
if peer == "" {
|
||||
return
|
||||
}
|
||||
|
||||
service := s.resolver.ServiceForDst(dstIP, dstPort, proto)
|
||||
|
||||
var durationSec float64
|
||||
if flow.Timestamp.Stop.After(flow.Timestamp.Start) {
|
||||
durationSec = flow.Timestamp.Stop.Sub(flow.Timestamp.Start).Seconds()
|
||||
}
|
||||
|
||||
eventType := EventAccept
|
||||
if external {
|
||||
eventType = EventExternal
|
||||
}
|
||||
|
||||
s.events <- TrafficEvent{
|
||||
Timestamp: time.Now().UTC(),
|
||||
Peer: peer,
|
||||
SrcIP: srcIP.String(),
|
||||
DstIP: dstIP.String(),
|
||||
DstPort: dstPort,
|
||||
Proto: proto,
|
||||
BytesOrig: flow.CountersOrig.Bytes,
|
||||
BytesReply: flow.CountersReply.Bytes,
|
||||
PacketsOrig: flow.CountersOrig.Packets,
|
||||
PacketsReply: flow.CountersReply.Packets,
|
||||
DurationSec: durationSec,
|
||||
Service: service,
|
||||
Event: eventType,
|
||||
External: external,
|
||||
}
|
||||
}
|
||||
16
daemon/wgctl-conntrack/go.mod
Normal file
16
daemon/wgctl-conntrack/go.mod
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
module git.krilio.net/nuno/wgctl-conntrack
|
||||
|
||||
go 1.23.0
|
||||
|
||||
require (
|
||||
github.com/google/go-cmp v0.7.0 // indirect
|
||||
github.com/josharian/native v1.1.0 // indirect
|
||||
github.com/mdlayher/netlink v1.7.2 // indirect
|
||||
github.com/mdlayher/socket v0.5.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/ti-mo/conntrack v0.6.0 // indirect
|
||||
github.com/ti-mo/netfilter v0.5.3 // indirect
|
||||
golang.org/x/net v0.39.0 // indirect
|
||||
golang.org/x/sync v0.14.0 // indirect
|
||||
golang.org/x/sys v0.33.0 // indirect
|
||||
)
|
||||
20
daemon/wgctl-conntrack/go.sum
Normal file
20
daemon/wgctl-conntrack/go.sum
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
|
||||
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
|
||||
github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g=
|
||||
github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw=
|
||||
github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos=
|
||||
github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/ti-mo/conntrack v0.6.0 h1:laiW2+dzKyS2u0aVr6FeRQs+v7cj4t7q+twolL/ZkjQ=
|
||||
github.com/ti-mo/conntrack v0.6.0/go.mod h1:4HZrFQQLOSuBzgQNid3H/wYyyp1kfGXUYxueXjIGibo=
|
||||
github.com/ti-mo/netfilter v0.5.3 h1:ikzduvnaUMwre5bhbNwWOd6bjqLMVb33vv0XXbK0xGQ=
|
||||
github.com/ti-mo/netfilter v0.5.3/go.mod h1:08SyBCg6hu1qyQk4s3DjjJKNrm3RTb32nm6AzyT972E=
|
||||
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
|
||||
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
|
||||
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
|
||||
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
|
||||
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
BIN
daemon/wgctl-conntrack/go1.21.13.linux-amd64.tar.gz
Normal file
BIN
daemon/wgctl-conntrack/go1.21.13.linux-amd64.tar.gz
Normal file
Binary file not shown.
71
daemon/wgctl-conntrack/main.go
Normal file
71
daemon/wgctl-conntrack/main.go
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"git.krilio.net/nuno/wgctl-conntrack/cmd"
|
||||
"git.krilio.net/nuno/wgctl-conntrack/config"
|
||||
ctconn "git.krilio.net/nuno/wgctl-conntrack/conntrack"
|
||||
"git.krilio.net/nuno/wgctl-conntrack/resolver"
|
||||
"git.krilio.net/nuno/wgctl-conntrack/writer"
|
||||
)
|
||||
|
||||
func main() {
|
||||
flags := cmd.Parse()
|
||||
|
||||
cfg, err := config.Load(flags.WGDir)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to load config: %v", err)
|
||||
}
|
||||
if flags.Subnet != "" {
|
||||
cfg.WGSubnet = flags.Subnet
|
||||
}
|
||||
if flags.LogFile != "" {
|
||||
cfg.AcceptLogFile = flags.LogFile
|
||||
}
|
||||
|
||||
_, wgSubnet, err := net.ParseCIDR(cfg.WGSubnet)
|
||||
if err != nil {
|
||||
log.Fatalf("invalid WG subnet %q: %v", cfg.WGSubnet, err)
|
||||
}
|
||||
|
||||
log.Printf("wgctl-conntrack v%s starting (subnet: %s, log: %s)",
|
||||
cmd.Version, cfg.WGSubnet, cfg.AcceptLogFile)
|
||||
|
||||
peerResolver := resolver.NewPeerResolver(flags.WGDir)
|
||||
svcResolver := resolver.NewServiceResolver(cfg.ServicesFile)
|
||||
|
||||
res := &combinedResolver{peers: peerResolver, services: svcResolver}
|
||||
events := make(chan ctconn.TrafficEvent, 512)
|
||||
|
||||
go writer.NewLogWriter(cfg.AcceptLogFile).Run(events)
|
||||
|
||||
sub := ctconn.NewSubscriber(wgSubnet, events, res)
|
||||
go func() {
|
||||
if err := sub.Run(); err != nil {
|
||||
log.Fatalf("conntrack subscriber error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sig
|
||||
log.Println("wgctl-conntrack shutting down")
|
||||
}
|
||||
|
||||
type combinedResolver struct {
|
||||
peers *resolver.PeerResolver
|
||||
services *resolver.ServiceResolver
|
||||
}
|
||||
|
||||
func (r *combinedResolver) PeerForIP(ip net.IP) string {
|
||||
return r.peers.PeerForIP(ip)
|
||||
}
|
||||
|
||||
func (r *combinedResolver) ServiceForDst(ip net.IP, port uint16, proto string) string {
|
||||
return r.services.ServiceForDst(ip, port, proto)
|
||||
}
|
||||
93
daemon/wgctl-conntrack/resolver/peers.go
Normal file
93
daemon/wgctl-conntrack/resolver/peers.go
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
package resolver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// PeerResolver maps WireGuard peer IPs to peer names
|
||||
type PeerResolver struct {
|
||||
mu sync.RWMutex
|
||||
ipToName map[string]string
|
||||
wgDir string
|
||||
}
|
||||
|
||||
func NewPeerResolver(wgDir string) *PeerResolver {
|
||||
r := &PeerResolver{wgDir: wgDir, ipToName: make(map[string]string)}
|
||||
r.reload()
|
||||
go r.watchReload()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *PeerResolver) PeerForIP(ip net.IP) string {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
return r.ipToName[ip.String()]
|
||||
}
|
||||
|
||||
func (r *PeerResolver) reload() {
|
||||
newMap := make(map[string]string)
|
||||
|
||||
// WireGuard IPs from conf files (10.1.x.x → peer name)
|
||||
clientsDir := r.wgDir + "/clients"
|
||||
entries, err := os.ReadDir(clientsDir)
|
||||
if err == nil {
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".conf") {
|
||||
continue
|
||||
}
|
||||
name := strings.TrimSuffix(entry.Name(), ".conf")
|
||||
if ip := parseAddressFromConf(clientsDir + "/" + entry.Name()); ip != "" {
|
||||
newMap[ip] = name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// External IPs from endpoint index (external IP → peer name)
|
||||
indexFile := r.wgDir + "/.wgctl/data/peer-history/endpoint_index.json"
|
||||
if data, err := os.ReadFile(indexFile); err == nil {
|
||||
var index map[string]string
|
||||
if json.Unmarshal(data, &index) == nil {
|
||||
for ip, peer := range index {
|
||||
newMap[ip] = peer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
r.ipToName = newMap
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *PeerResolver) watchReload() {
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
r.reload()
|
||||
}
|
||||
}
|
||||
|
||||
func parseAddressFromConf(path string) string {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
for _, line := range strings.Split(string(data), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if strings.HasPrefix(line, "Address") {
|
||||
parts := strings.SplitN(line, "=", 2)
|
||||
if len(parts) == 2 {
|
||||
ip := strings.TrimSpace(parts[1])
|
||||
if idx := strings.Index(ip, "/"); idx != -1 {
|
||||
ip = ip[:idx]
|
||||
}
|
||||
return ip
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
93
daemon/wgctl-conntrack/resolver/services.go
Normal file
93
daemon/wgctl-conntrack/resolver/services.go
Normal file
|
|
@ -0,0 +1,93 @@
|
|||
package resolver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ServiceResolver maps IP:port:proto to service names
|
||||
type ServiceResolver struct {
|
||||
mu sync.RWMutex
|
||||
portToSvc map[string]string
|
||||
servicesFile string
|
||||
}
|
||||
|
||||
func NewServiceResolver(servicesFile string) *ServiceResolver {
|
||||
r := &ServiceResolver{servicesFile: servicesFile, portToSvc: make(map[string]string)}
|
||||
r.reload()
|
||||
go r.watchReload()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *ServiceResolver) ServiceForDst(ip net.IP, port uint16, proto string) string {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
// Try IP:port:proto first
|
||||
if svc, ok := r.portToSvc[fmt.Sprintf("%s:%d:%s", ip.String(), port, proto)]; ok {
|
||||
return svc
|
||||
}
|
||||
// Fall back to IP only
|
||||
if svc, ok := r.portToSvc[ip.String()]; ok {
|
||||
return svc
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (r *ServiceResolver) reload() {
|
||||
data, err := os.ReadFile(r.servicesFile)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var services map[string]interface{}
|
||||
if json.Unmarshal(data, &services) != nil {
|
||||
return
|
||||
}
|
||||
|
||||
newMap := make(map[string]string)
|
||||
for name, svcRaw := range services {
|
||||
svc, ok := svcRaw.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
hosts := map[string]bool{}
|
||||
if hostsRaw, ok := svc["hosts"].(map[string]interface{}); ok {
|
||||
for ip := range hostsRaw {
|
||||
hosts[ip] = true
|
||||
newMap[ip] = name
|
||||
}
|
||||
}
|
||||
|
||||
if portsRaw, ok := svc["ports"].([]interface{}); ok {
|
||||
for _, portRaw := range portsRaw {
|
||||
port, ok := portRaw.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
portNum := fmt.Sprintf("%.0f", port["port"])
|
||||
proto, _ := port["proto"].(string)
|
||||
for ip := range hosts {
|
||||
newMap[fmt.Sprintf("%s:%s:%s", ip, portNum, proto)] = name
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
r.portToSvc = newMap
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
func (r *ServiceResolver) watchReload() {
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
r.reload()
|
||||
}
|
||||
}
|
||||
BIN
daemon/wgctl-conntrack/wgctl-conntrack
Executable file
BIN
daemon/wgctl-conntrack/wgctl-conntrack
Executable file
Binary file not shown.
21
daemon/wgctl-conntrack/wgctl-conntrack.service
Normal file
21
daemon/wgctl-conntrack/wgctl-conntrack.service
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
[Unit]
|
||||
Description=wgctl conntrack accept logging daemon
|
||||
After=network.target wg-quick@wg0.service
|
||||
Requires=wg-quick@wg0.service
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart=/etc/wireguard/wgctl/daemon/wgctl-conntrack/wgctl-conntrack \
|
||||
--wg-dir /etc/wireguard
|
||||
Restart=on-failure
|
||||
RestartSec=5s
|
||||
StandardOutput=journal
|
||||
StandardError=journal
|
||||
SyslogIdentifier=wgctl-conntrack
|
||||
|
||||
# Needs CAP_NET_ADMIN for netlink conntrack
|
||||
AmbientCapabilities=CAP_NET_ADMIN
|
||||
CapabilityBoundingSet=CAP_NET_ADMIN
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
47
daemon/wgctl-conntrack/writer/log.go
Normal file
47
daemon/wgctl-conntrack/writer/log.go
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
package writer
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"git.krilio.net/nuno/wgctl-conntrack/conntrack"
|
||||
)
|
||||
|
||||
// LogWriter writes TrafficEvents as JSON lines to a file
|
||||
type LogWriter struct {
|
||||
path string
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewLogWriter(path string) *LogWriter {
|
||||
return &LogWriter{path: path}
|
||||
}
|
||||
|
||||
func (w *LogWriter) Write(ev conntrack.TrafficEvent) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
f, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
data, err := json.Marshal(ev)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = f.Write(append(data, '\n'))
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *LogWriter) Run(events <-chan conntrack.TrafficEvent) {
|
||||
for ev := range events {
|
||||
if err := w.Write(ev); err != nil {
|
||||
log.Printf("error writing event: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -13,18 +13,15 @@ function ui::activity::peer_row() {
|
|||
# ui::activity::service_row <dest_display> <drop_count> <drop_word> <drops_col> <w_drops>
|
||||
function ui::activity::service_row() {
|
||||
local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" \
|
||||
drops_col="${4:-30}" w_drops="${5:-1}"
|
||||
drops_col="${4:-30}" w_count="${5:-1}"
|
||||
|
||||
# Align drop count with peer drop column
|
||||
# Service row visible prefix: " → " (6 visible) + ${#dest_display}
|
||||
# But "→" is 3 bytes, 1 visible — arrow_prefix bytes = 8, visible = 6
|
||||
local arrow_prefix=" → "
|
||||
local prefix_bytes=${#arrow_prefix} # 8 bytes due to → being 3 bytes
|
||||
local prefix_bytes=${#arrow_prefix}
|
||||
local prefix_len=$(( prefix_bytes + ${#dest_display} ))
|
||||
local pad_n=$(( drops_col - prefix_len ))
|
||||
[[ $pad_n -lt 1 ]] && pad_n=1
|
||||
|
||||
printf " \033[2m→\033[0m %s%*s %${w_drops}s %s\n" \
|
||||
printf " \033[0;31m→ %s%*s %${w_count}s %s\033[0m\n" \
|
||||
"$dest_display" "$pad_n" "" "$drop_count" "$drop_word"
|
||||
}
|
||||
|
||||
|
|
@ -45,3 +42,44 @@ function ui::activity::service_row_table() {
|
|||
local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}"
|
||||
printf " → %-30s %s %s\n" "$dest_display" "$drop_count" "$drop_word"
|
||||
}
|
||||
|
||||
function ui::activity::accept_row() {
|
||||
local name_pad="${1:-}" bytes_in="${2:-}" bytes_out="${3:-}" \
|
||||
conns="${4:-0}" w_count="${5:-4}"
|
||||
|
||||
local conn_word="conns"
|
||||
[[ "$conns" -eq 1 ]] && conn_word="conn"
|
||||
|
||||
local spaces
|
||||
spaces=$(printf '%*s' "${#name_pad}" '')
|
||||
|
||||
printf " \033[0;32m%s ↓%-10s ↑%-10s %${w_count}s %s\033[0m\n" \
|
||||
"$spaces" "$bytes_in" "$bytes_out" "$conns" "$conn_word"
|
||||
}
|
||||
|
||||
|
||||
function ui::activity::accept_dest_row() {
|
||||
local dest="${1:-}" bytes_orig="${2:-0}" bytes_reply="${3:-0}" \
|
||||
count="${4:-0}" drops_col="${5:-40}" w_count="${6:-4}"
|
||||
|
||||
local conn_word="conns"
|
||||
[[ "$count" -eq 1 ]] && conn_word="conn"
|
||||
|
||||
local arrow_prefix=" → "
|
||||
local prefix_bytes=${#arrow_prefix}
|
||||
local prefix_len=$(( prefix_bytes + ${#dest} ))
|
||||
local pad_n=$(( drops_col - prefix_len ))
|
||||
[[ $pad_n -lt 1 ]] && pad_n=1
|
||||
|
||||
# Only show bytes if non-zero
|
||||
local bytes_display=""
|
||||
if [[ "$bytes_orig" -gt 0 || "$bytes_reply" -gt 0 ]]; then
|
||||
local bytes_display=" "
|
||||
[[ "$bytes_orig" -gt 0 ]] && bytes_display+="↓$(fmt::bytes "$bytes_orig") "
|
||||
[[ "$bytes_reply" -gt 0 ]] && bytes_display+="↑$(fmt::bytes "$bytes_reply")"
|
||||
bytes_display="${bytes_display% }" # trim trailing space
|
||||
fi
|
||||
|
||||
printf " \033[0;32m→\033[0m \033[0;32m%s%*s %${w_count}s %-5s%s\033[0m\n" \
|
||||
"$dest" "$pad_n" "" "$count" "$conn_word" "$bytes_display"
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue