diff --git a/daemon/wgctl-conntrack/cmd/root.go b/daemon/wgctl-conntrack/cmd/root.go new file mode 100644 index 0000000..a76140e --- /dev/null +++ b/daemon/wgctl-conntrack/cmd/root.go @@ -0,0 +1,33 @@ +package cmd + +import ( + "flag" + "fmt" + "os" +) + +// Flags holds CLI flags +type Flags struct { + WGDir string + Subnet string + LogFile string + Version bool +} + +const Version = "0.1.0" + +func Parse() *Flags { + f := &Flags{} + flag.StringVar(&f.WGDir, "wg-dir", "/etc/wireguard", "WireGuard base directory") + flag.StringVar(&f.Subnet, "subnet", "", "WireGuard subnet override") + flag.StringVar(&f.LogFile, "log-file", "", "Accept events log file override") + flag.BoolVar(&f.Version, "version", false, "Print version and exit") + flag.Parse() + + if f.Version { + fmt.Println(Version) + os.Exit(0) + } + + return f +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/config/config.go b/daemon/wgctl-conntrack/config/config.go new file mode 100644 index 0000000..978861f --- /dev/null +++ b/daemon/wgctl-conntrack/config/config.go @@ -0,0 +1,42 @@ +package config + +import ( + "encoding/json" + "os" +) + +// Config holds wgctl-conntrack runtime configuration +type Config struct { + WGSubnet string + DataDir string + ClientsDir string + AcceptLogFile string + ServicesFile string +} + +type wgctlJSON struct { + WireGuard struct { + Subnet string `json:"subnet"` + } `json:"wireguard"` +} + +// Load reads config from wgctl.json and applies defaults +func Load(wgDir string) (*Config, error) { + cfg := &Config{ + WGSubnet: "10.1.0.0/16", + DataDir: wgDir + "/.wgctl/data", + ClientsDir: wgDir + "/clients", + AcceptLogFile: wgDir + "/.wgctl/daemon/accept_events.log", + ServicesFile: wgDir + "/.wgctl/data/services.json", + } + + jsonFile := wgDir + "/.wgctl/config/wgctl.json" + if data, err := os.ReadFile(jsonFile); err == nil { + var wj wgctlJSON + if json.Unmarshal(data, &wj) == nil && wj.WireGuard.Subnet != "" { + cfg.WGSubnet = wj.WireGuard.Subnet + } + } + + return cfg, nil +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/conntrack/event.go b/daemon/wgctl-conntrack/conntrack/event.go new file mode 100644 index 0000000..18905c7 --- /dev/null +++ b/daemon/wgctl-conntrack/conntrack/event.go @@ -0,0 +1,29 @@ +package conntrack + +import "time" + +// EventType represents the type of traffic event +type EventType string + +const ( + EventAccept EventType = "accept" + EventExternal EventType = "external" +) + +// TrafficEvent is the normalized event written to the log +type TrafficEvent struct { + Timestamp time.Time `json:"ts"` + Peer string `json:"peer"` + SrcIP string `json:"src_ip"` + DstIP string `json:"dst_ip"` + DstPort uint16 `json:"dst_port"` + Proto string `json:"proto"` + BytesOrig uint64 `json:"bytes_orig"` + BytesReply uint64 `json:"bytes_reply"` + PacketsOrig uint64 `json:"packets_orig"` + PacketsReply uint64 `json:"packets_reply"` + DurationSec float64 `json:"duration_sec"` + Service string `json:"service,omitempty"` + Event EventType `json:"event"` + External bool `json:"external"` +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/conntrack/filter.go b/daemon/wgctl-conntrack/conntrack/filter.go new file mode 100644 index 0000000..c87e7a7 --- /dev/null +++ b/daemon/wgctl-conntrack/conntrack/filter.go @@ -0,0 +1,44 @@ +package conntrack + +import "net" + +var privateRanges = []string{ + "10.0.0.0/8", + "172.16.0.0/12", + "192.168.0.0/16", +} + +var privateCIDRs []*net.IPNet + +func init() { + for _, cidr := range privateRanges { + _, ipnet, _ := net.ParseCIDR(cidr) + privateCIDRs = append(privateCIDRs, ipnet) + } +} + +func IsWGPeer(ip net.IP, wgSubnet *net.IPNet) bool { + return wgSubnet.Contains(ip) +} + +func IsExternal(ip net.IP) bool { + for _, cidr := range privateCIDRs { + if cidr.Contains(ip) { + return false + } + } + return true +} + +func ProtoName(proto uint8) string { + switch proto { + case 6: + return "tcp" + case 17: + return "udp" + case 1: + return "icmp" + default: + return "unknown" + } +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/conntrack/subscriber.go b/daemon/wgctl-conntrack/conntrack/subscriber.go new file mode 100644 index 0000000..d6e60ef --- /dev/null +++ b/daemon/wgctl-conntrack/conntrack/subscriber.go @@ -0,0 +1,117 @@ +package conntrack + +import ( + "log" + "net" + "time" + + ct "github.com/ti-mo/conntrack" + "github.com/ti-mo/netfilter" +) + +// Resolver maps IPs and ports to peer/service names +type Resolver interface { + PeerForIP(ip net.IP) string + ServiceForDst(ip net.IP, port uint16, proto string) string +} + +// Subscriber listens for conntrack DESTROY events +type Subscriber struct { + wgSubnet *net.IPNet + events chan<- TrafficEvent + resolver Resolver +} + +func NewSubscriber(wgSubnet *net.IPNet, events chan<- TrafficEvent, resolver Resolver) *Subscriber { + return &Subscriber{wgSubnet: wgSubnet, events: events, resolver: resolver} +} + +func (s *Subscriber) Run() error { + conn, err := ct.Dial(nil) + if err != nil { + return err + } + defer conn.Close() + + evCh := make(chan ct.Event, 256) + + errCh, err := conn.Listen(evCh, 1, []netfilter.NetlinkGroup{ + netfilter.GroupCTDestroy, + }) + if err != nil { + return err + } + + log.Println("conntrack subscriber started") + + for { + select { + case ev := <-evCh: + s.processEvent(ev) + case err := <-errCh: + return err + } + } +} + +func (s *Subscriber) processEvent(ev ct.Event) { + flow := ev.Flow + if flow == nil { + return + } + + tuple := flow.TupleOrig + + // Skip IPv6 + if !tuple.IP.SourceAddress.Is4() || !tuple.IP.DestinationAddress.Is4() { + return + } + + srcBytes := tuple.IP.SourceAddress.As4() + dstBytes := tuple.IP.DestinationAddress.As4() + srcIP := net.IP(srcBytes[:]) + dstIP := net.IP(dstBytes[:]) + + // Only process WireGuard peer traffic + if !IsWGPeer(srcIP, s.wgSubnet) { + return + } + + proto := ProtoName(tuple.Proto.Protocol) + dstPort := tuple.Proto.DestinationPort + external := IsExternal(dstIP) + + peer := s.resolver.PeerForIP(srcIP) + if peer == "" { + return + } + + service := s.resolver.ServiceForDst(dstIP, dstPort, proto) + + var durationSec float64 + if flow.Timestamp.Stop.After(flow.Timestamp.Start) { + durationSec = flow.Timestamp.Stop.Sub(flow.Timestamp.Start).Seconds() + } + + eventType := EventAccept + if external { + eventType = EventExternal + } + + s.events <- TrafficEvent{ + Timestamp: time.Now().UTC(), + Peer: peer, + SrcIP: srcIP.String(), + DstIP: dstIP.String(), + DstPort: dstPort, + Proto: proto, + BytesOrig: flow.CountersOrig.Bytes, + BytesReply: flow.CountersReply.Bytes, + PacketsOrig: flow.CountersOrig.Packets, + PacketsReply: flow.CountersReply.Packets, + DurationSec: durationSec, + Service: service, + Event: eventType, + External: external, + } +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/go.mod b/daemon/wgctl-conntrack/go.mod new file mode 100644 index 0000000..c07cf4d --- /dev/null +++ b/daemon/wgctl-conntrack/go.mod @@ -0,0 +1,16 @@ +module git.krilio.net/nuno/wgctl-conntrack + +go 1.23.0 + +require ( + github.com/google/go-cmp v0.7.0 // indirect + github.com/josharian/native v1.1.0 // indirect + github.com/mdlayher/netlink v1.7.2 // indirect + github.com/mdlayher/socket v0.5.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/ti-mo/conntrack v0.6.0 // indirect + github.com/ti-mo/netfilter v0.5.3 // indirect + golang.org/x/net v0.39.0 // indirect + golang.org/x/sync v0.14.0 // indirect + golang.org/x/sys v0.33.0 // indirect +) diff --git a/daemon/wgctl-conntrack/go.sum b/daemon/wgctl-conntrack/go.sum new file mode 100644 index 0000000..fb799f8 --- /dev/null +++ b/daemon/wgctl-conntrack/go.sum @@ -0,0 +1,20 @@ +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= +github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g= +github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw= +github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos= +github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/ti-mo/conntrack v0.6.0 h1:laiW2+dzKyS2u0aVr6FeRQs+v7cj4t7q+twolL/ZkjQ= +github.com/ti-mo/conntrack v0.6.0/go.mod h1:4HZrFQQLOSuBzgQNid3H/wYyyp1kfGXUYxueXjIGibo= +github.com/ti-mo/netfilter v0.5.3 h1:ikzduvnaUMwre5bhbNwWOd6bjqLMVb33vv0XXbK0xGQ= +github.com/ti-mo/netfilter v0.5.3/go.mod h1:08SyBCg6hu1qyQk4s3DjjJKNrm3RTb32nm6AzyT972E= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= diff --git a/daemon/wgctl-conntrack/go1.21.13.linux-amd64.tar.gz b/daemon/wgctl-conntrack/go1.21.13.linux-amd64.tar.gz new file mode 100644 index 0000000..1126dfa Binary files /dev/null and b/daemon/wgctl-conntrack/go1.21.13.linux-amd64.tar.gz differ diff --git a/daemon/wgctl-conntrack/main.go b/daemon/wgctl-conntrack/main.go new file mode 100644 index 0000000..bde9172 --- /dev/null +++ b/daemon/wgctl-conntrack/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "log" + "net" + "os" + "os/signal" + "syscall" + + "git.krilio.net/nuno/wgctl-conntrack/cmd" + "git.krilio.net/nuno/wgctl-conntrack/config" + ctconn "git.krilio.net/nuno/wgctl-conntrack/conntrack" + "git.krilio.net/nuno/wgctl-conntrack/resolver" + "git.krilio.net/nuno/wgctl-conntrack/writer" +) + +func main() { + flags := cmd.Parse() + + cfg, err := config.Load(flags.WGDir) + if err != nil { + log.Fatalf("failed to load config: %v", err) + } + if flags.Subnet != "" { + cfg.WGSubnet = flags.Subnet + } + if flags.LogFile != "" { + cfg.AcceptLogFile = flags.LogFile + } + + _, wgSubnet, err := net.ParseCIDR(cfg.WGSubnet) + if err != nil { + log.Fatalf("invalid WG subnet %q: %v", cfg.WGSubnet, err) + } + + log.Printf("wgctl-conntrack v%s starting (subnet: %s, log: %s)", + cmd.Version, cfg.WGSubnet, cfg.AcceptLogFile) + + peerResolver := resolver.NewPeerResolver(flags.WGDir) + svcResolver := resolver.NewServiceResolver(cfg.ServicesFile) + + res := &combinedResolver{peers: peerResolver, services: svcResolver} + events := make(chan ctconn.TrafficEvent, 512) + + go writer.NewLogWriter(cfg.AcceptLogFile).Run(events) + + sub := ctconn.NewSubscriber(wgSubnet, events, res) + go func() { + if err := sub.Run(); err != nil { + log.Fatalf("conntrack subscriber error: %v", err) + } + }() + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + log.Println("wgctl-conntrack shutting down") +} + +type combinedResolver struct { + peers *resolver.PeerResolver + services *resolver.ServiceResolver +} + +func (r *combinedResolver) PeerForIP(ip net.IP) string { + return r.peers.PeerForIP(ip) +} + +func (r *combinedResolver) ServiceForDst(ip net.IP, port uint16, proto string) string { + return r.services.ServiceForDst(ip, port, proto) +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/resolver/peers.go b/daemon/wgctl-conntrack/resolver/peers.go new file mode 100644 index 0000000..c9254d8 --- /dev/null +++ b/daemon/wgctl-conntrack/resolver/peers.go @@ -0,0 +1,93 @@ +package resolver + +import ( + "encoding/json" + "net" + "os" + "strings" + "sync" + "time" +) + +// PeerResolver maps WireGuard peer IPs to peer names +type PeerResolver struct { + mu sync.RWMutex + ipToName map[string]string + wgDir string +} + +func NewPeerResolver(wgDir string) *PeerResolver { + r := &PeerResolver{wgDir: wgDir, ipToName: make(map[string]string)} + r.reload() + go r.watchReload() + return r +} + +func (r *PeerResolver) PeerForIP(ip net.IP) string { + r.mu.RLock() + defer r.mu.RUnlock() + return r.ipToName[ip.String()] +} + +func (r *PeerResolver) reload() { + newMap := make(map[string]string) + + // WireGuard IPs from conf files (10.1.x.x → peer name) + clientsDir := r.wgDir + "/clients" + entries, err := os.ReadDir(clientsDir) + if err == nil { + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".conf") { + continue + } + name := strings.TrimSuffix(entry.Name(), ".conf") + if ip := parseAddressFromConf(clientsDir + "/" + entry.Name()); ip != "" { + newMap[ip] = name + } + } + } + + // External IPs from endpoint index (external IP → peer name) + indexFile := r.wgDir + "/.wgctl/data/peer-history/endpoint_index.json" + if data, err := os.ReadFile(indexFile); err == nil { + var index map[string]string + if json.Unmarshal(data, &index) == nil { + for ip, peer := range index { + newMap[ip] = peer + } + } + } + + r.mu.Lock() + r.ipToName = newMap + r.mu.Unlock() +} + +func (r *PeerResolver) watchReload() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for range ticker.C { + r.reload() + } +} + +func parseAddressFromConf(path string) string { + data, err := os.ReadFile(path) + if err != nil { + return "" + } + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "Address") { + parts := strings.SplitN(line, "=", 2) + if len(parts) == 2 { + ip := strings.TrimSpace(parts[1]) + if idx := strings.Index(ip, "/"); idx != -1 { + ip = ip[:idx] + } + return ip + } + } + } + return "" +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/resolver/services.go b/daemon/wgctl-conntrack/resolver/services.go new file mode 100644 index 0000000..a524d87 --- /dev/null +++ b/daemon/wgctl-conntrack/resolver/services.go @@ -0,0 +1,93 @@ +package resolver + +import ( + "encoding/json" + "fmt" + "net" + "os" + "sync" + "time" +) + +// ServiceResolver maps IP:port:proto to service names +type ServiceResolver struct { + mu sync.RWMutex + portToSvc map[string]string + servicesFile string +} + +func NewServiceResolver(servicesFile string) *ServiceResolver { + r := &ServiceResolver{servicesFile: servicesFile, portToSvc: make(map[string]string)} + r.reload() + go r.watchReload() + return r +} + +func (r *ServiceResolver) ServiceForDst(ip net.IP, port uint16, proto string) string { + r.mu.RLock() + defer r.mu.RUnlock() + + // Try IP:port:proto first + if svc, ok := r.portToSvc[fmt.Sprintf("%s:%d:%s", ip.String(), port, proto)]; ok { + return svc + } + // Fall back to IP only + if svc, ok := r.portToSvc[ip.String()]; ok { + return svc + } + return "" +} + +func (r *ServiceResolver) reload() { + data, err := os.ReadFile(r.servicesFile) + if err != nil { + return + } + + var services map[string]interface{} + if json.Unmarshal(data, &services) != nil { + return + } + + newMap := make(map[string]string) + for name, svcRaw := range services { + svc, ok := svcRaw.(map[string]interface{}) + if !ok { + continue + } + + hosts := map[string]bool{} + if hostsRaw, ok := svc["hosts"].(map[string]interface{}); ok { + for ip := range hostsRaw { + hosts[ip] = true + newMap[ip] = name + } + } + + if portsRaw, ok := svc["ports"].([]interface{}); ok { + for _, portRaw := range portsRaw { + port, ok := portRaw.(map[string]interface{}) + if !ok { + continue + } + portNum := fmt.Sprintf("%.0f", port["port"]) + proto, _ := port["proto"].(string) + for ip := range hosts { + newMap[fmt.Sprintf("%s:%s:%s", ip, portNum, proto)] = name + } + } + } + } + + r.mu.Lock() + r.portToSvc = newMap + r.mu.Unlock() +} + +func (r *ServiceResolver) watchReload() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for range ticker.C { + r.reload() + } +} \ No newline at end of file diff --git a/daemon/wgctl-conntrack/wgctl-conntrack b/daemon/wgctl-conntrack/wgctl-conntrack new file mode 100755 index 0000000..8ae3c7c Binary files /dev/null and b/daemon/wgctl-conntrack/wgctl-conntrack differ diff --git a/daemon/wgctl-conntrack/writer/log.go b/daemon/wgctl-conntrack/writer/log.go new file mode 100644 index 0000000..a0b1f1f --- /dev/null +++ b/daemon/wgctl-conntrack/writer/log.go @@ -0,0 +1,47 @@ +package writer + +import ( + "encoding/json" + "log" + "os" + "sync" + + "git.krilio.net/nuno/wgctl-conntrack/conntrack" +) + +// LogWriter writes TrafficEvents as JSON lines to a file +type LogWriter struct { + path string + mu sync.Mutex +} + +func NewLogWriter(path string) *LogWriter { + return &LogWriter{path: path} +} + +func (w *LogWriter) Write(ev conntrack.TrafficEvent) error { + w.mu.Lock() + defer w.mu.Unlock() + + f, err := os.OpenFile(w.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + data, err := json.Marshal(ev) + if err != nil { + return err + } + + _, err = f.Write(append(data, '\n')) + return err +} + +func (w *LogWriter) Run(events <-chan conntrack.TrafficEvent) { + for ev := range events { + if err := w.Write(ev); err != nil { + log.Printf("error writing event: %v", err) + } + } +} \ No newline at end of file