wgctl/daemon/wgctl-conntrack/conntrack/subscriber.go
Nuno Duque Nunes d314ba376e feat: wgctl-conntrack Go daemon
- conntrack/event.go: TrafficEvent type
- conntrack/filter.go: WG subnet filter, IsExternal, ProtoName
- conntrack/subscriber.go: netlink conntrack DESTROY subscriber
- writer/log.go: JSON line writer with mutex
- resolver/peers.go: WG IP → peer name from conf files + endpoint index
- resolver/services.go: IP:port → service name from services.json
- config/config.go: reads wgctl.json, sensible defaults
- cmd/root.go: CLI flags
- main.go: wires everything together
- DESTROY events only: full byte/packet counts per connection
- filters to WireGuard subnet, marks external traffic
2026-05-28 02:51:27 +00:00

117 lines
No EOL
2.4 KiB
Go

package conntrack
import (
	"fmt"
	"log"
	"net"
	"time"

	ct "github.com/ti-mo/conntrack"
	"github.com/ti-mo/netfilter"
)
// Resolver translates the addresses seen in conntrack flows into
// human-readable peer and service names.
type Resolver interface {
	// PeerForIP returns the peer name associated with ip. The subscriber
	// treats an empty string as "no matching peer" and drops the event.
	PeerForIP(ip net.IP) string

	// ServiceForDst returns the service name for a flow's destination
	// address, destination port and protocol name.
	ServiceForDst(ip net.IP, port uint16, proto string) string
}
// Subscriber receives conntrack DESTROY events over netlink and turns the
// ones that pass its filters into TrafficEvent values.
type Subscriber struct {
	wgSubnet *net.IPNet          // subnet handed to IsWGPeer to select peer-originated flows
	events   chan<- TrafficEvent // send-only sink for the events built by processEvent
	resolver Resolver            // supplies peer and service names for each flow
}
// NewSubscriber returns a Subscriber that filters flows against wgSubnet,
// resolves names through resolver and publishes the resulting events on
// events. No connection is opened until Run is called.
func NewSubscriber(wgSubnet *net.IPNet, events chan<- TrafficEvent, resolver Resolver) *Subscriber {
	sub := new(Subscriber)
	sub.wgSubnet = wgSubnet
	sub.events = events
	sub.resolver = resolver
	return sub
}
// Run opens a conntrack netlink connection, subscribes to the DESTROY
// multicast group and passes every received event to processEvent.
//
// Run blocks until one of the following happens, and returns the
// corresponding error wrapped with the name of the failing step:
//   - the netlink connection cannot be opened,
//   - the subscription to the DESTROY group fails,
//   - the listener reports an error on its error channel.
//
// The underlying cause stays reachable through errors.Is / errors.As. The
// netlink connection is closed when Run returns.
func (s *Subscriber) Run() error {
	conn, err := ct.Dial(nil)
	if err != nil {
		return fmt.Errorf("conntrack: dial netlink: %w", err)
	}
	defer conn.Close()

	// 256-slot buffer between the listener and the processing loop below.
	evCh := make(chan ct.Event, 256)
	errCh, err := conn.Listen(evCh, 1, []netfilter.NetlinkGroup{
		netfilter.GroupCTDestroy,
	})
	if err != nil {
		return fmt.Errorf("conntrack: listen for destroy events: %w", err)
	}
	log.Println("conntrack subscriber started")

	for {
		select {
		case ev := <-evCh:
			s.processEvent(ev)
		case err := <-errCh:
			if err == nil {
				// Keep the previous contract: a nil receive ends Run
				// without an error instead of wrapping nil.
				return nil
			}
			return fmt.Errorf("conntrack: event listener: %w", err)
		}
	}
}
// processEvent converts one conntrack event into a TrafficEvent and sends it
// on s.events.
//
// The event is dropped without sending anything when:
//   - it carries no flow,
//   - either address of the original-direction tuple is not IPv4,
//   - IsWGPeer rejects the source address for s.wgSubnet, or
//   - the resolver returns an empty peer name for the source address.
//
// The send on s.events is a plain channel send, so it blocks until the
// receiving side accepts the value.
func (s *Subscriber) processEvent(ev ct.Event) {
	if ev.Flow == nil {
		return
	}
	f := ev.Flow
	orig := f.TupleOrig

	// IPv4 only: any flow with a non-IPv4 endpoint is ignored.
	srcAddr, dstAddr := orig.IP.SourceAddress, orig.IP.DestinationAddress
	if !(srcAddr.Is4() && dstAddr.Is4()) {
		return
	}
	src := net.IP(srcAddr.AsSlice())
	dst := net.IP(dstAddr.AsSlice())

	// Flows that were not originated by a WireGuard peer are out of scope.
	if !IsWGPeer(src, s.wgSubnet) {
		return
	}

	protoName := ProtoName(orig.Proto.Protocol)
	port := orig.Proto.DestinationPort
	isExternal := IsExternal(dst)

	peerName := s.resolver.PeerForIP(src)
	if peerName == "" {
		return
	}
	svc := s.resolver.ServiceForDst(dst, port, protoName)

	// Duration stays zero unless the stop timestamp is strictly after the
	// start timestamp.
	var elapsed float64
	if begin, end := f.Timestamp.Start, f.Timestamp.Stop; end.After(begin) {
		elapsed = end.Sub(begin).Seconds()
	}

	out := TrafficEvent{
		Timestamp:    time.Now().UTC(),
		Peer:         peerName,
		SrcIP:        src.String(),
		DstIP:        dst.String(),
		DstPort:      port,
		Proto:        protoName,
		BytesOrig:    f.CountersOrig.Bytes,
		BytesReply:   f.CountersReply.Bytes,
		PacketsOrig:  f.CountersOrig.Packets,
		PacketsReply: f.CountersReply.Packets,
		DurationSec:  elapsed,
		Service:      svc,
		Event:        EventAccept,
		External:     isExternal,
	}
	if isExternal {
		out.Event = EventExternal
	}
	s.events <- out
}