diff --git a/commands/activity.command.sh b/commands/activity.command.sh new file mode 100644 index 0000000..c9537e1 --- /dev/null +++ b/commands/activity.command.sh @@ -0,0 +1,215 @@ +#!/usr/bin/env bash +# activity.command.sh — WireGuard activity snapshot + +# ============================================ +# Lifecycle +# ============================================ + +function cmd::activity::on_load() { + load_module net + + flag::register --peer + flag::register --service + flag::register --ip + flag::register --hours + flag::register --type + flag::register --dropped +} + +# ============================================ +# Help +# ============================================ + +function cmd::activity::help() { + cat < Filter by peer name + --service Filter by service (e.g. truenas, proxmox:web-ui) + --ip Filter by destination IP + --hours Time window in hours (default: 24, 0 = all time) + --type Filter by device type (combined with --peer) + --dropped Show only peers with at least one drop + +Examples: + wgctl activity + wgctl activity --dropped + wgctl activity --peer phone-nuno + wgctl activity --service truenas + wgctl activity --hours 0 + wgctl activity --ip 10.0.0.101 +EOF +} + +# ============================================ +# Run +# ============================================ + +function cmd::activity::run() { + local filter_peer="" filter_service="" filter_ip="" filter_type="" + local hours=24 dropped_only=false + + while [[ $# -gt 0 ]]; do + case "$1" in + --peer) filter_peer="$2"; shift 2 ;; + --service) filter_service="$2"; shift 2 ;; + --ip) filter_ip="$2"; shift 2 ;; + --type) filter_type="$2"; shift 2 ;; + --hours) hours="$2"; shift 2 ;; + --dropped) dropped_only=true; shift ;; + --help) cmd::activity::help; return ;; + *) + log::error "Unknown flag: $1" + cmd::activity::help + return 1 + ;; + esac + done + + # Resolve peer name if type provided + if [[ -n "$filter_peer" && -n "$filter_type" ]]; then + filter_peer=$(peers::resolve_and_require "$filter_peer" "$filter_type") || return 1 + fi + + # Resolve --service to IP + local service_ip="" + if [[ -n "$filter_service" ]]; then + service_ip=$(net::resolve "$filter_service" 2>/dev/null | head -1 | cut -d: -f1) || true + if [[ -z "$service_ip" ]]; then + log::error "Service not found: ${filter_service}" + return 1 + fi + fi + [[ -n "$filter_ip" ]] && service_ip="$filter_ip" + + # Fetch aggregated data + local data + data=$(json::activity_aggregate \ + "$(ctx::fw_events_log)" \ + "$(ctx::events_log)" \ + "$(config::interface)" \ + "$(ctx::net)" \ + "$(ctx::clients)" \ + "$(ctx::meta)" \ + "$hours" \ + "$filter_peer" \ + "$service_ip" 2>/dev/null) + + if [[ -z "$data" ]]; then + log::wg_warning "No activity data found" + return 0 + fi + + # Measure w_peer and w_drops from data + local w_peer=16 w_drops=1 + while IFS='|' read -r type rest; do + case "$type" in + peer) + local name drops + name=$(echo "$rest" | cut -d'|' -f1) + drops=$(echo "$rest" | cut -d'|' -f4) + (( ${#name} > w_peer )) && w_peer=${#name} + (( ${#drops} > w_drops )) && w_drops=${#drops} + ;; + service) + local count + count=$(echo "$rest" | cut -d'|' -f3) + (( ${#count} > w_drops )) && w_drops=${#count} + ;; + esac + done <<< "$data" + + (( w_peer += 2 )) + + # Compute exact column where drop count starts on peer row: + # " " (2) + name (w_peer) + " ↓" (3) + rx (10) + " ↑" (3) + tx (10) + " " (2) + # Note: ↓ and ↑ are multi-byte (3 bytes) but display as 1 char — account for 2 extra bytes each + # Visible: 2 + w_peer + 2+1 + 10 + 2+1 + 10 + 2 = w_peer + 30 + local drops_col=$(( w_peer + 30 )) + + local hours_display="${hours}h" + [[ "$hours" == "0" ]] && hours_display="all time" + + log::section "Activity Monitor (last ${hours_display})" + echo "" + + local first_peer=true skip_peer=false + + while IFS='|' read -r record_type rest; do + case "$record_type" in + peer) + local name rx tx drops + IFS='|' read -r name rx tx drops <<< "$rest" + + skip_peer=false + if $dropped_only && [[ "$drops" -eq 0 ]]; then + skip_peer=true + continue + fi + + $first_peer || echo "" + first_peer=false + + local rx_fmt tx_fmt + rx_fmt=$(cmd::activity::_fmt_bytes "$rx") + tx_fmt=$(cmd::activity::_fmt_bytes "$tx") + + local name_pad rx_pad tx_pad + name_pad=$(printf "%-${w_peer}s" "$name") + rx_pad=$(printf "%-10s" "$rx_fmt") + tx_pad=$(printf "%-10s" "$tx_fmt") + + local drop_word="drops" + [[ "$drops" -eq 1 ]] && drop_word="drop" + printf " \033[1m%s\033[0m \033[2m↓\033[0m%s \033[2m↑\033[0m%s %${w_drops}s %s\n" \ + "$name_pad" "$rx_pad" "$tx_pad" "$drops" "$drop_word" + ;; + + service) + $skip_peer && continue + + local peer dest_display drop_count + IFS='|' read -r peer dest_display drop_count <<< "$rest" + + # Compute padding to align drop count with peer drop column + # Service row visible prefix: " → " (6) + ${#dest_display} + local arrow_prefix=" → " + local prefix_bytes=${#arrow_prefix} # = 8 due to → being 3 bytes + local prefix_len=$(( prefix_bytes + ${#dest_display} )) + # local prefix_len=$(( 6 + ${#dest_display} )) + local pad_n=$(( drops_col - prefix_len )) + [[ $pad_n -lt 1 ]] && pad_n=1 + + local svc_drop_word="drops" + [[ "$drop_count" -eq 1 ]] && svc_drop_word="drop" + printf " \033[2m→\033[0m %s%*s %${w_drops}s %s\n" \ + "$dest_display" "$pad_n" "" "$drop_count" "$svc_drop_word" + ;; + esac + done <<< "$data" + + echo "" +} + +# ============================================ +# Helpers +# ============================================ + +function cmd::activity::_fmt_bytes() { + local bytes="${1:-0}" + if (( bytes == 0 )); then + printf "—" + elif (( bytes >= 1073741824 )); then + printf "%dGB" $(( bytes / 1073741824 )) + elif (( bytes >= 1048576 )); then + printf "%dMB" $(( bytes / 1048576 )) + elif (( bytes >= 1024 )); then + printf "%dKB" $(( bytes / 1024 )) + else + printf "%dB" "$bytes" + fi +} \ No newline at end of file diff --git a/core/context.sh b/core/context.sh index 911ff5c..e6f4d5c 100644 --- a/core/context.sh +++ b/core/context.sh @@ -48,6 +48,7 @@ function ctx::identities() { echo "${_CTX_IDENTITY}"; } function ctx::subnets() { echo "${_CTX_DATA}/subnets.json"; } function ctx::events_log() { echo "$(ctx::daemon)/events.log"; } function ctx::fw_events_log() { echo "$(ctx::daemon)/fw_events.log"; } +function ctx::json_helper() { echo "${_CTX_CORE}/json_helper.py"; } # ============================================ # Path Helpers diff --git a/core/json.sh b/core/json.sh index 3dc474c..b887a6a 100644 --- a/core/json.sh +++ b/core/json.sh @@ -97,7 +97,7 @@ function json::identity_remove_rule() { python3 "$JSON_HELPER" identity_remove function json::identity_clear_rules() { python3 "$JSON_HELPER" identity_clear_rules "$@" 0: + cutoff = datetime.now(timezone.utc) - timedelta(hours=hours) + + # Build ip -> peer name map + ip_to_peer = {} + for conf in sorted(glob.glob(f"{clients_dir}/*.conf")): + name = os.path.basename(conf).replace('.conf', '') + try: + with open(conf) as f: + for line in f: + if line.startswith('Address'): + ip = line.split('=')[1].strip().split('/')[0] + ip_to_peer[ip] = name + break + except Exception: + continue + + # Build pubkey -> peer name map + pubkey_to_peer = {} + for kf in glob.glob(f"{clients_dir}/*_public.key"): + name = os.path.basename(kf).replace('_public.key', '') + try: + with open(kf) as f: + key = f.read().strip() + if key: + pubkey_to_peer[key] = name + except Exception: + continue + + # Get WireGuard transfer totals + peer_rx = defaultdict(int) + peer_tx = defaultdict(int) + try: + result = subprocess.run( + ['wg', 'show', wg_interface, 'transfer'], + capture_output=True, text=True + ) + for line in result.stdout.strip().splitlines(): + parts = line.split() + if len(parts) >= 3: + pubkey, rx, tx = parts[0], int(parts[1]), int(parts[2]) + peer = pubkey_to_peer.get(pubkey) + if peer: + peer_rx[peer] += rx + peer_tx[peer] += tx + except Exception: + pass + + # Load net services for reverse lookup + net_data = {} + if os.path.exists(net_file): + try: + with open(net_file) as f: + net_data = json.load(f) + except Exception: + pass + + def reverse_lookup(dest_ip, dest_port, proto): + for svc_name, svc in net_data.items(): + if not isinstance(svc, dict): + continue + if svc.get('ip', '') != dest_ip: + continue + ports = svc.get('ports', {}) + if dest_port: + for port_name, port_def in ports.items(): + if not isinstance(port_def, dict): + continue + if (str(port_def.get('port', '')) == dest_port and + port_def.get('proto', 'tcp') == proto): + return f"{svc_name}:{port_name}" + return svc_name + else: + return svc_name + return '' + + def make_dest_display(dest_ip, dest_port, proto, svc_name): + if svc_name: + return svc_name + if dest_port: + display = f"{dest_ip}:{dest_port}" + else: + display = dest_ip + if proto and proto not in ('tcp', '6'): + display += f" ({proto})" + return display + + proto_map = {1: 'icmp', 6: 'tcp', 17: 'udp'} + + # Parse fw_events for drops + peer_drops = defaultdict(int) + service_drops = defaultdict(lambda: defaultdict(int)) + + if os.path.exists(fw_file): + try: + with open(fw_file) as f: + for line in f: + line = line.strip() + if not line: + continue + try: + ev = json.loads(line) + if cutoff: + ts_str = ev.get('timestamp', '') + try: + ts = datetime.fromisoformat(ts_str) + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + if ts < cutoff: + continue + except Exception: + pass + + src_ip = ev.get('src_ip', '') + if not src_ip: + continue + + dest_ip = ev.get('dest_ip', '') + dest_port = str(ev.get('dest_port', '')) + proto_num = ev.get('ip.protocol', 0) + proto = proto_map.get(int(proto_num), str(proto_num)) + + peer = ip_to_peer.get(src_ip) + if not peer: + continue + + if filter_peer and peer != filter_peer: + continue + if filter_service_ip and dest_ip != filter_service_ip: + continue + + svc_name = reverse_lookup(dest_ip, dest_port, proto) + dest_display = make_dest_display(dest_ip, dest_port, proto, svc_name) + + peer_drops[peer] += 1 + service_drops[peer][dest_display] += 1 + + except Exception: + continue + except Exception: + pass + + # Collect peers with any activity + all_peers = set() + all_peers.update(k for k in peer_rx if peer_rx[k] > 0) + all_peers.update(k for k in peer_tx if peer_tx[k] > 0) + all_peers.update(peer_drops.keys()) + if filter_peer: + all_peers = {p for p in all_peers if p == filter_peer} + + for peer in sorted(all_peers): + rx = peer_rx.get(peer, 0) + tx = peer_tx.get(peer, 0) + drops = peer_drops.get(peer, 0) + + print(f"peer|{peer}|{rx}|{tx}|{drops}") + + svc_map = service_drops.get(peer, {}) + for dest_display, count in sorted(svc_map.items(), key=lambda x: -x[1]): + print(f"service|{peer}|{dest_display}|{count}") + commands = { 'get': lambda args: get(args[0], args[1]), 'set': lambda args: set_key(args[0], args[1], args[2]), @@ -2514,6 +2690,12 @@ commands = { 'identity_remove_rule': lambda args: identity_remove_rule(args[0], args[1]), 'identity_clear_rules': lambda args: identity_clear_rules(args[0]), 'identity_has_rule': lambda args: identity_has_rule(args[0], args[1]), + 'activity_aggregate': lambda args: activity_aggregate( + args[0], args[1], args[2], args[3], args[4], args[5], + args[6] if len(args) > 6 else '24', + args[7] if len(args) > 7 else '', + args[8] if len(args) > 8 else '' + ), } if __name__ == '__main__':