- conntrack/event.go: TrafficEvent type - conntrack/filter.go: WG subnet filter, IsExternal, ProtoName - conntrack/subscriber.go: netlink conntrack DESTROY subscriber - writer/log.go: JSON line writer with mutex - resolver/peers.go: WG IP → peer name from conf files + endpoint index - resolver/services.go: IP:port → service name from services.json - config/config.go: reads wgctl.json, sensible defaults - cmd/root.go: CLI flags - main.go: wires everything together - DESTROY events only: full byte/packet counts per connection - filters to WireGuard subnet, marks external traffic
117 lines
No EOL
2.4 KiB
Go
117 lines
No EOL
2.4 KiB
Go
package conntrack
|
|
|
|
import (
|
|
"log"
|
|
"net"
|
|
"time"
|
|
|
|
ct "github.com/ti-mo/conntrack"
|
|
"github.com/ti-mo/netfilter"
|
|
)
|
|
|
|
// Resolver maps IPs and ports to peer/service names
|
|
type Resolver interface {
|
|
PeerForIP(ip net.IP) string
|
|
ServiceForDst(ip net.IP, port uint16, proto string) string
|
|
}
|
|
|
|
// Subscriber listens for conntrack DESTROY events
|
|
type Subscriber struct {
|
|
wgSubnet *net.IPNet
|
|
events chan<- TrafficEvent
|
|
resolver Resolver
|
|
}
|
|
|
|
func NewSubscriber(wgSubnet *net.IPNet, events chan<- TrafficEvent, resolver Resolver) *Subscriber {
|
|
return &Subscriber{wgSubnet: wgSubnet, events: events, resolver: resolver}
|
|
}
|
|
|
|
func (s *Subscriber) Run() error {
|
|
conn, err := ct.Dial(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
|
|
evCh := make(chan ct.Event, 256)
|
|
|
|
errCh, err := conn.Listen(evCh, 1, []netfilter.NetlinkGroup{
|
|
netfilter.GroupCTDestroy,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Println("conntrack subscriber started")
|
|
|
|
for {
|
|
select {
|
|
case ev := <-evCh:
|
|
s.processEvent(ev)
|
|
case err := <-errCh:
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Subscriber) processEvent(ev ct.Event) {
|
|
flow := ev.Flow
|
|
if flow == nil {
|
|
return
|
|
}
|
|
|
|
tuple := flow.TupleOrig
|
|
|
|
// Skip IPv6
|
|
if !tuple.IP.SourceAddress.Is4() || !tuple.IP.DestinationAddress.Is4() {
|
|
return
|
|
}
|
|
|
|
srcBytes := tuple.IP.SourceAddress.As4()
|
|
dstBytes := tuple.IP.DestinationAddress.As4()
|
|
srcIP := net.IP(srcBytes[:])
|
|
dstIP := net.IP(dstBytes[:])
|
|
|
|
// Only process WireGuard peer traffic
|
|
if !IsWGPeer(srcIP, s.wgSubnet) {
|
|
return
|
|
}
|
|
|
|
proto := ProtoName(tuple.Proto.Protocol)
|
|
dstPort := tuple.Proto.DestinationPort
|
|
external := IsExternal(dstIP)
|
|
|
|
peer := s.resolver.PeerForIP(srcIP)
|
|
if peer == "" {
|
|
return
|
|
}
|
|
|
|
service := s.resolver.ServiceForDst(dstIP, dstPort, proto)
|
|
|
|
var durationSec float64
|
|
if flow.Timestamp.Stop.After(flow.Timestamp.Start) {
|
|
durationSec = flow.Timestamp.Stop.Sub(flow.Timestamp.Start).Seconds()
|
|
}
|
|
|
|
eventType := EventAccept
|
|
if external {
|
|
eventType = EventExternal
|
|
}
|
|
|
|
s.events <- TrafficEvent{
|
|
Timestamp: time.Now().UTC(),
|
|
Peer: peer,
|
|
SrcIP: srcIP.String(),
|
|
DstIP: dstIP.String(),
|
|
DstPort: dstPort,
|
|
Proto: proto,
|
|
BytesOrig: flow.CountersOrig.Bytes,
|
|
BytesReply: flow.CountersReply.Bytes,
|
|
PacketsOrig: flow.CountersOrig.Packets,
|
|
PacketsReply: flow.CountersReply.Packets,
|
|
DurationSec: durationSec,
|
|
Service: service,
|
|
Event: eventType,
|
|
External: external,
|
|
}
|
|
} |