diff --git a/commands/activity.command.sh b/commands/activity.command.sh index 2ff0c1d..dcf90a9 100644 --- a/commands/activity.command.sh +++ b/commands/activity.command.sh @@ -18,6 +18,9 @@ function cmd::activity::on_load() { flag::register --accept flag::register --drop flag::register --external + flag::register --ports + flag::register --exclude-service + flag::register --include-service flag::exclusive --accept --drop } @@ -59,7 +62,8 @@ EOF function cmd::activity::run() { local filter_peer="" filter_service="" filter_ip="" filter_type="" local hours=24 - local accept_only=false drop_only=false external_only=false + local accept_only=false drop_only=false external_only=false show_ports=false + local -a exclude_services=() include_services=() while [[ $# -gt 0 ]]; do case "$1" in @@ -71,6 +75,9 @@ function cmd::activity::run() { --accept) accept_only=true; shift ;; --drop) drop_only=true; shift ;; --external) external_only=true; shift ;; + --ports) show_ports=true; shift ;; + --exclude-service) exclude_services+=("$2"); shift 2 ;; + --include-service) include_services+=("$2"); shift 2 ;; --help) cmd::activity::help; return ;; *) log::error "Unknown flag: $1" @@ -96,6 +103,21 @@ function cmd::activity::run() { fi [[ -n "$filter_ip" ]] && service_ip="$filter_ip" + # Build final exclusion list — remove any --include-service entries + local -a final_excludes=() + for svc in "${exclude_services[@]:-}"; do + local included=false + for inc in "${include_services[@]:-}"; do + [[ "$svc" == "$inc" ]] && included=true && break + done + $included || final_excludes+=("$svc") + done + + # Build exclude string for Python (space-separated) + local exclude_str="" + [[ ${#final_excludes[@]} -gt 0 ]] && \ + exclude_str=$(IFS=' '; echo "${final_excludes[*]}") + # ── Fetch data ── local data="" if ! $accept_only; then @@ -103,7 +125,7 @@ function cmd::activity::run() { "$(ctx::fw_events_log)" "$(ctx::events_log)" \ "$(config::interface)" "$(ctx::net)" \ "$(ctx::clients)" "$(ctx::meta)" \ - "$hours" "$filter_peer" "$service_ip" 2>/dev/null) + "$hours" "$filter_peer" "$service_ip" "$exclude_str" 2>/dev/null) fi local accept_data="" @@ -114,7 +136,7 @@ function cmd::activity::run() { [[ -f "$(ctx::accept_events_log)" ]] && \ accept_data=$(json::accept_aggregate \ "$(ctx::accept_events_log)" "$(ctx::net)" "$(ctx::clients)" \ - "$since_arg" "$filter_peer" "$ext_flag" 2>/dev/null) + "$since_arg" "$filter_peer" "$ext_flag" "$exclude_str" 2>/dev/null) fi [[ -z "$data" && -z "$accept_data" ]] && \ @@ -204,7 +226,17 @@ function cmd::activity::run() { local d_port="${pp%%:*}" local d_proto="${pp##*:}" local spec="${d_ip}:${d_port}:${d_proto}" - local dest_display="${_DEST_RESOLVE_CACHE[$spec]:-${d_ip}:${d_port}/${d_proto}}" + local dest_display + local raw_suffix="" + local resolved="${_DEST_RESOLVE_CACHE[$spec]:-${d_ip}:${d_port}/${d_proto}}" + local dest_display="$resolved" + if [[ "$show_ports" == "true" && "$resolved" != "${d_ip}:"* && "$resolved" != "${d_ip} "* ]]; then + if [[ -n "$d_port" && "$d_port" != "0" ]]; then + dest_display=$(printf "%s \033[2m(%s:%s)\033[0m" "$resolved" "$d_ip" "$d_port") + else + dest_display=$(printf "%s \033[2m(%s)\033[0m" "$resolved" "$d_ip") + fi + fi ui::activity::accept_dest_row \ "$dest_display" "$d_bytes_orig" "$d_bytes_reply" \ "$d_count" "$drops_col" "$w_count" @@ -289,14 +321,23 @@ function cmd::activity::run() { ;; service) - $skip_peer && continue - $accept_only && continue - local peer dest_display drop_count - IFS='|' read -r peer dest_display drop_count <<< "$rest" + local peer dest_display dst_ip dst_port proto drop_count + IFS='|' read -r peer dest_display dst_ip dst_port proto drop_count <<< "$rest" + # Build dim suffix if --ports + local svc_display="$dest_display" + if [[ "$show_ports" == "true" && -n "$dst_ip" ]]; then + if [[ -n "$dst_port" ]]; then + svc_display=$(printf "%s \033[2m(%s:%s)\033[0m" \ + "$dest_display" "$dst_ip" "$dst_port") + else + svc_display=$(printf "%s \033[2m(%s)\033[0m" \ + "$dest_display" "$dst_ip") + fi + fi local svc_drop_word="drops" [[ "$drop_count" -eq 1 ]] && svc_drop_word="drop" - ui::activity::service_row \ - "$dest_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_count" + $accept_only || ui::activity::service_row \ + "$svc_display" "$drop_count" "$svc_drop_word" "$drops_col" "$w_count" ;; esac done <<< "$data" diff --git a/core/command.sh b/core/command.sh index 4d3c182..fd422aa 100644 --- a/core/command.sh +++ b/core/command.sh @@ -89,7 +89,7 @@ function command::run() { # Preprocess mixin flags (--json, --no-color etc) command::_preprocess_flags args - echo "DEBUG cmd=$cmd groups='$groups' defs='${default_args[*]}' user='${user_args[*]:-}'" >&2 + local fn fn=$(command::fn "$cmd" run) core::call_function "$fn" ${args[@]+"${args[@]}"} diff --git a/core/json_helper.py b/core/json_helper.py index 7381c49..3884483 100644 --- a/core/json_helper.py +++ b/core/json_helper.py @@ -2148,7 +2148,8 @@ commands = { args[0], args[1], args[2], args[3], args[4], args[5], args[6] if len(args) > 6 else '24', args[7] if len(args) > 7 else '', - args[8] if len(args) > 8 else ''), + args[8] if len(args) > 8 else '', + args[9] if len(args) > 9 else ''), # Rules 'rule_resolve': lambda args: rule_resolve(args[0], args[1]), 'rule_resolve_field':lambda args: rule_resolve_field(args[0], args[1], args[2]), @@ -2302,13 +2303,14 @@ commands = { args[5] if len(args) > 5 else '1', args[6] if len(args) > 6 else '', args[7] if len(args) > 7 else '0', - args[8] if len(args) > 8 else 'desc'), + args[8] if len(args) > 8 else 'desc'), 'accept_aggregate': lambda args: __import__('lib.accept_events', fromlist=['accept_aggregate']).accept_aggregate( args[0], args[1], args[2], args[3] if len(args) > 3 else '', args[4] if len(args) > 4 else '', - args[5] if len(args) > 5 else '0'), + args[5] if len(args) > 5 else '0', + args[6] if len(args) > 6 else ''), 'batch_resolve_dest': lambda args: batch_resolve_dest(args[0], args[1], *args[2:]), } diff --git a/core/lib/__pycache__/accept_events.cpython-311.pyc b/core/lib/__pycache__/accept_events.cpython-311.pyc index fcd4f3d..5a26ab6 100644 Binary files a/core/lib/__pycache__/accept_events.cpython-311.pyc and b/core/lib/__pycache__/accept_events.cpython-311.pyc differ diff --git a/core/lib/__pycache__/activity.cpython-311.pyc b/core/lib/__pycache__/activity.cpython-311.pyc index 4187611..daf78b4 100644 Binary files a/core/lib/__pycache__/activity.cpython-311.pyc and b/core/lib/__pycache__/activity.cpython-311.pyc differ diff --git a/core/lib/accept_events.py b/core/lib/accept_events.py index 2e31e0a..9da9dc0 100644 --- a/core/lib/accept_events.py +++ b/core/lib/accept_events.py @@ -131,7 +131,7 @@ def accept_events(file, filter_peer, filter_type, net_file, def accept_aggregate(file, net_file, clients_dir, since='', - filter_peer='', external_only='0'): + filter_peer='', external_only='0', exclude_services=''): """ Aggregate accept events per peer — total bytes, packets, top destinations. Used by wgctl activity to show accepted traffic alongside drops. @@ -145,6 +145,7 @@ def accept_aggregate(file, net_file, clients_dir, since='', """ from collections import defaultdict from itertools import groupby + from lib.util import load_net_data, hosts_lookup, reverse_lookup since_dt = parse_since(since) if since else None show_external = str(external_only) == '1' @@ -157,6 +158,14 @@ def accept_aggregate(file, net_file, clients_dir, since='', # dest_stats = defaultdict(lambda: {'bytes': 0, 'count': 0}) dest_stats = defaultdict(lambda: {'bytes_orig': 0, 'bytes_reply': 0, 'count': 0}) + # Build exclusion set — supports service names and ip:port:proto + exclude_set = set() + if exclude_services: + for svc in exclude_services.split(): + exclude_set.add(svc.strip()) + + net_data = load_net_data(net_file) if (net_file and exclude_set) else {} + try: with open(file) as f: for line in f: @@ -200,7 +209,10 @@ def accept_aggregate(file, net_file, clients_dir, since='', ps['packets_out'] += p_orig ps['packets_in'] += p_reply ps['conn_count'] += 1 - + + if _is_excluded(dst_ip, dst_port, proto, exclude_set, net_data): + continue + dest_key = (peer, dst_ip, dst_port, proto) dest_stats[dest_key]['bytes_orig'] += b_orig dest_stats[dest_key]['bytes_reply'] += b_reply @@ -225,4 +237,24 @@ def accept_aggregate(file, net_file, clients_dir, since='', top = list(group)[:20] for (p, dst_ip, dst_port, proto), stats in top: print(f"dest|{p}|{dst_ip}|{dst_port}|{proto}|" - f"{stats['bytes_orig']}|{stats['bytes_reply']}|{stats['count']}") \ No newline at end of file + f"{stats['bytes_orig']}|{stats['bytes_reply']}|{stats['count']}") + + +def _is_excluded(ip, port, proto, exclude_set, net_data): + if not exclude_set: + return False + # Check raw ip:port:proto + if f"{ip}:{port}:{proto}" in exclude_set: + return True + # Check service name + svc = reverse_lookup(net_data, ip, str(port), proto) if net_data else '' + if svc and svc in exclude_set: + return True + # Check service:proto format (e.g. "pihole:dns-udp" -> "pihole" + "dns-udp") + if svc: + for excl in exclude_set: + if ':' in excl: + excl_svc, excl_port = excl.rsplit(':', 1) + if excl_svc == svc and excl_port in (f"{proto}-{port}", f"dns-{proto}"): + return True + return False \ No newline at end of file diff --git a/core/lib/activity.py b/core/lib/activity.py index a0d2a9c..24daf36 100644 --- a/core/lib/activity.py +++ b/core/lib/activity.py @@ -19,26 +19,49 @@ from lib.util import ( def activity_aggregate(fw_file, wg_file, wg_interface, net_file, clients_dir, meta_dir, hours, filter_peer, - filter_service_ip): + filter_service_ip, exclude_services=''): """ Aggregate activity data for wgctl activity. Output: peer|name|rx_bytes|tx_bytes|drop_count - service|peer_name|dest_display|drop_count + service|peer_name|dest_display|dst_ip|dst_port|proto|drop_count """ hours = int(hours) if hours else 24 cutoff = None if hours > 0: cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) - + + # Build exclusion set + exclude_set = set() + if exclude_services: + for svc in exclude_services.split(): + exclude_set.add(svc.strip()) + # Preload lookups once ip_to_peer = build_ip_to_name(clients_dir) pubkey_to_peer = build_pubkey_to_name(clients_dir) net_data = load_net_data(net_file) - + def _reverse(dest_ip, dest_port, proto): return reverse_lookup(net_data, dest_ip, dest_port, proto) - + + def _is_excluded(ip, port, proto, svc_name): + if not exclude_set: + return False + if f"{ip}:{port}:{proto}" in exclude_set: + return True + if svc_name and svc_name in exclude_set: + return True + if svc_name: + for excl in exclude_set: + if ':' in excl: + excl_svc, excl_port = excl.rsplit(':', 1) + if excl_svc == svc_name and excl_port in ( + f"{proto}-{port}", f"dns-{proto}", f"dns-udp", f"dns-tcp" + ): + return True + return False + # WireGuard transfer totals peer_rx = defaultdict(int) peer_tx = defaultdict(int) @@ -57,11 +80,12 @@ def activity_aggregate(fw_file, wg_file, wg_interface, net_file, peer_tx[peer] += tx except Exception: pass - + # Parse fw_events for drops + # service_drops[peer][(dest_display, dst_ip, dst_port, proto)] = count peer_drops = defaultdict(int) service_drops = defaultdict(lambda: defaultdict(int)) - + if os.path.exists(fw_file): try: with open(fw_file) as f: @@ -81,16 +105,16 @@ def activity_aggregate(fw_file, wg_file, wg_interface, net_file, continue except Exception: pass - + src_ip = ev.get('src_ip', '') if not src_ip: continue - + dest_ip = ev.get('dest_ip', '') dest_port = str(ev.get('dest_port', '')) proto_num = ev.get('ip.protocol', 0) proto = PROTO_MAP.get(int(proto_num), str(proto_num)) - + peer = ip_to_peer.get(src_ip) if not peer: continue @@ -98,18 +122,23 @@ def activity_aggregate(fw_file, wg_file, wg_interface, net_file, continue if filter_service_ip and dest_ip != filter_service_ip: continue - + svc_name = _reverse(dest_ip, dest_port, proto) dest_display = make_dest_display(dest_ip, dest_port, proto, svc_name) - + + if _is_excluded(dest_ip, dest_port, proto, svc_name): + continue + peer_drops[peer] += 1 - service_drops[peer][dest_display] += 1 - + # Key includes raw ip:port:proto for --ports support + svc_key = (dest_display, dest_ip, dest_port, proto) + service_drops[peer][svc_key] += 1 + except Exception: continue except Exception: pass - + # Collect peers with any activity all_peers = set() all_peers.update(k for k in peer_rx if peer_rx[k] > 0) @@ -117,13 +146,14 @@ def activity_aggregate(fw_file, wg_file, wg_interface, net_file, all_peers.update(peer_drops.keys()) if filter_peer: all_peers = {p for p in all_peers if p == filter_peer} - + for peer in sorted(all_peers): rx = peer_rx.get(peer, 0) tx = peer_tx.get(peer, 0) drops = peer_drops.get(peer, 0) print(f"peer|{peer}|{rx}|{tx}|{drops}") - + svc_map = service_drops.get(peer, {}) - for dest_display, count in sorted(svc_map.items(), key=lambda x: -x[1]): - print(f"service|{peer}|{dest_display}|{count}") \ No newline at end of file + for (dest_display, dst_ip, dst_port, proto), count in \ + sorted(svc_map.items(), key=lambda x: -x[1]): + print(f"service|{peer}|{dest_display}|{dst_ip}|{dst_port}|{proto}|{count}") \ No newline at end of file diff --git a/modules/ui/activity.module.sh b/modules/ui/activity.module.sh index 5169ca8..32785b3 100644 --- a/modules/ui/activity.module.sh +++ b/modules/ui/activity.module.sh @@ -1,7 +1,6 @@ #!/usr/bin/env bash # ui/activity.module.sh — rendering for wgctl activity -# ui::activity::peer_row function ui::activity::peer_row() { local name_pad="${1:-}" rx_pad="${2:-}" tx_pad="${3:-}" \ drops="${4:-0}" drop_word="${5:-drops}" w_drops="${6:-1}" @@ -10,22 +9,78 @@ function ui::activity::peer_row() { "$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" } -# ui::activity::service_row +# ── _strip_ansi → visible string +# Used for measuring visible length of strings that may contain ANSI codes +function ui::activity::_visible_len() { + local s="$1" + printf "%b" "$s" | sed 's/\x1b\[[0-9;]*m//g' | wc -m | tr -d ' ' +} + +# ui::activity::service_row +# dest_display may contain ANSI (when --ports passes dim suffix) function ui::activity::service_row() { local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" \ drops_col="${4:-30}" w_count="${5:-1}" local arrow_prefix=" → " local prefix_bytes=${#arrow_prefix} - local prefix_len=$(( prefix_bytes + ${#dest_display} )) + # Measure visible length of dest (strip ANSI for correct padding) + local dest_visible_len + dest_visible_len=$(ui::activity::_visible_len "$dest_display") + local prefix_len=$(( prefix_bytes + dest_visible_len )) local pad_n=$(( drops_col - prefix_len )) [[ $pad_n -lt 1 ]] && pad_n=1 - printf " \033[0;31m→ %s%*s %${w_count}s %s\033[0m\n" \ + printf " \033[0;31m→\033[0m \033[0;31m%b\033[0m%*s \033[0;31m%${w_count}s %s\033[0m\n" \ "$dest_display" "$pad_n" "" "$drop_count" "$drop_word" } -# Table versions (kept for future display config) +function ui::activity::accept_row() { + local name_pad="${1:-}" bytes_in="${2:-}" bytes_out="${3:-}" \ + conns="${4:-0}" w_count="${5:-4}" + + local conn_word="conns" + [[ "$conns" -eq 1 ]] && conn_word="conn" + + local spaces + spaces=$(printf '%*s' "${#name_pad}" '') + + printf " \033[0;32m%s ↓%-10s ↑%-10s %${w_count}s %s\033[0m\n" \ + "$spaces" "$bytes_in" "$bytes_out" "$conns" "$conn_word" +} + +function ui::activity::accept_dest_row() { + local dest="${1:-}" bytes_orig="${2:-0}" bytes_reply="${3:-0}" \ + count="${4:-0}" drops_col="${5:-40}" w_count="${6:-4}" + + local conn_word="conns" + [[ "$count" -eq 1 ]] && conn_word="conn" + + local arrow_prefix=" → " + local prefix_bytes=${#arrow_prefix} + # Measure visible length of dest (strip ANSI for correct padding) + local dest_visible_len + dest_visible_len=$(ui::activity::_visible_len "$dest") + local prefix_len=$(( prefix_bytes + dest_visible_len )) + local pad_n=$(( drops_col - prefix_len )) + [[ $pad_n -lt 1 ]] && pad_n=1 + + # Build bytes display + local bytes_display="" + if [[ "$bytes_orig" -gt 0 || "$bytes_reply" -gt 0 ]]; then + bytes_display=" " + [[ "$bytes_reply" -gt 0 ]] && bytes_display+="↓$(fmt::bytes "$bytes_reply") " + [[ "$bytes_orig" -gt 0 ]] && bytes_display+="↑$(fmt::bytes "$bytes_orig")" + bytes_display="${bytes_display% }" + fi + + # Use %b for dest to interpret ANSI, keep rest as %s/%d + printf " \033[0;32m→\033[0m \033[0;32m%b\033[0m%*s \033[0;32m%${w_count}s %-5s\033[0m%s\n" \ + "$dest" "$pad_n" "" "$count" "$conn_word" "$bytes_display" +} + +# ── Table versions ────────────────────────────────────── + function ui::activity::header_table() { printf "\n %-24s %-14s %-14s %s\n" "PEER" "↓ RX" "↑ TX" "DROPS" printf " %s\n" "$(printf '─%.0s' {1..65})" @@ -41,45 +96,4 @@ function ui::activity::peer_row_table() { function ui::activity::service_row_table() { local dest_display="${1:-}" drop_count="${2:-0}" drop_word="${3:-drops}" printf " → %-30s %s %s\n" "$dest_display" "$drop_count" "$drop_word" -} - -function ui::activity::accept_row() { - local name_pad="${1:-}" bytes_in="${2:-}" bytes_out="${3:-}" \ - conns="${4:-0}" w_count="${5:-4}" - - local conn_word="conns" - [[ "$conns" -eq 1 ]] && conn_word="conn" - - local spaces - spaces=$(printf '%*s' "${#name_pad}" '') - - printf " \033[0;32m%s ↓%-10s ↑%-10s %${w_count}s %s\033[0m\n" \ - "$spaces" "$bytes_in" "$bytes_out" "$conns" "$conn_word" -} - - -function ui::activity::accept_dest_row() { - local dest="${1:-}" bytes_orig="${2:-0}" bytes_reply="${3:-0}" \ - count="${4:-0}" drops_col="${5:-40}" w_count="${6:-4}" - - local conn_word="conns" - [[ "$count" -eq 1 ]] && conn_word="conn" - - local arrow_prefix=" → " - local prefix_bytes=${#arrow_prefix} - local prefix_len=$(( prefix_bytes + ${#dest} )) - local pad_n=$(( drops_col - prefix_len )) - [[ $pad_n -lt 1 ]] && pad_n=1 - - # Only show bytes if non-zero - local bytes_display="" - if [[ "$bytes_orig" -gt 0 || "$bytes_reply" -gt 0 ]]; then - local bytes_display=" " - [[ "$bytes_orig" -gt 0 ]] && bytes_display+="↓$(fmt::bytes "$bytes_orig") " - [[ "$bytes_reply" -gt 0 ]] && bytes_display+="↑$(fmt::bytes "$bytes_reply")" - bytes_display="${bytes_display% }" # trim trailing space - fi - - printf " \033[0;32m→\033[0m \033[0;32m%s%*s %${w_count}s %-5s%s\033[0m\n" \ - "$dest" "$pad_n" "" "$count" "$conn_word" "$bytes_display" } \ No newline at end of file