From 476c2e14e8f3adf4f7c64281d8a985742864c8d4 Mon Sep 17 00:00:00 2001 From: Hakan Sariman Date: Wed, 5 Mar 2025 19:42:02 +0300 Subject: [PATCH] Enhance netflow logger to include status recorder and update related tests --- .../uspfilter/conntrack/common_test.go | 2 +- client/firewall/uspfilter/uspfilter_test.go | 2 +- client/internal/acl/manager_test.go | 2 +- client/internal/dns/server_test.go | 2 +- client/internal/engine.go | 2 +- client/internal/netflow/logger/logger.go | 14 ++++-- client/internal/netflow/logger/logger_test.go | 2 +- client/internal/netflow/manager.go | 5 +- client/internal/netflow/types/types.go | 32 ++++++------ client/internal/peer/status.go | 50 +++++++++++++++++++ 10 files changed, 86 insertions(+), 27 deletions(-) diff --git a/client/firewall/uspfilter/conntrack/common_test.go b/client/firewall/uspfilter/conntrack/common_test.go index 6d1ed589061..898c9cb281d 100644 --- a/client/firewall/uspfilter/conntrack/common_test.go +++ b/client/firewall/uspfilter/conntrack/common_test.go @@ -12,7 +12,7 @@ import ( ) var logger = log.NewFromLogrus(logrus.StandardLogger()) -var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger() +var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger() // Memory pressure tests func BenchmarkMemoryPressure(b *testing.B) { diff --git a/client/firewall/uspfilter/uspfilter_test.go b/client/firewall/uspfilter/uspfilter_test.go index e0e2b86c76f..f90057587e4 100644 --- a/client/firewall/uspfilter/uspfilter_test.go +++ b/client/firewall/uspfilter/uspfilter_test.go @@ -24,7 +24,7 @@ import ( ) var logger = log.NewFromLogrus(logrus.StandardLogger()) -var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger() +var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger() type IFaceMock struct { SetFilterFunc func(device.PacketFilter) error diff --git a/client/internal/acl/manager_test.go b/client/internal/acl/manager_test.go index 82a136e9cfb..29a3df33630 100644 --- a/client/internal/acl/manager_test.go +++ b/client/internal/acl/manager_test.go @@ -15,7 +15,7 @@ import ( mgmProto "github.com/netbirdio/netbird/management/proto" ) -var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger() +var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger() func TestDefaultManager(t *testing.T) { networkMap := &mgmProto.NetworkMap{ diff --git a/client/internal/dns/server_test.go b/client/internal/dns/server_test.go index 7c75f6bed8b..86c455bc4fa 100644 --- a/client/internal/dns/server_test.go +++ b/client/internal/dns/server_test.go @@ -30,7 +30,7 @@ import ( "github.com/netbirdio/netbird/formatter" ) -var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger() +var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger() type mocWGIface struct { filter device.PacketFilter diff --git a/client/internal/engine.go b/client/internal/engine.go index 9e5d8a42d49..76ce919a3fc 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -353,7 +353,7 @@ func (e *Engine) Start() error { // start flow manager right after interface creation publicKey := e.config.WgPrivateKey.PublicKey() - e.flowManager = netflow.NewManager(e.ctx, e.wgInterface, publicKey[:]) + e.flowManager = netflow.NewManager(e.ctx, e.wgInterface, publicKey[:], e.statusRecorder) if e.config.RosenpassEnabled { log.Infof("rosenpass is enabled") diff --git a/client/internal/netflow/logger/logger.go b/client/internal/netflow/logger/logger.go index 1e23c1dceb6..c3fcf7eb7a5 100644 --- a/client/internal/netflow/logger/logger.go +++ b/client/internal/netflow/logger/logger.go @@ -11,6 +11,7 @@ import ( "github.com/netbirdio/netbird/client/internal/netflow/store" "github.com/netbirdio/netbird/client/internal/netflow/types" + "github.com/netbirdio/netbird/client/internal/peer" ) type rcvChan chan *types.EventFields @@ -21,15 +22,17 @@ type Logger struct { enabled atomic.Bool rcvChan atomic.Pointer[rcvChan] cancelReceiver context.CancelFunc + statusRecorder *peer.Status Store types.Store } -func New(ctx context.Context) *Logger { +func New(ctx context.Context, statusRecorder *peer.Status) *Logger { ctx, cancel := context.WithCancel(ctx) return &Logger{ - ctx: ctx, - cancel: cancel, - Store: store.NewMemoryStore(), + ctx: ctx, + cancel: cancel, + statusRecorder: statusRecorder, + Store: store.NewMemoryStore(), } } @@ -79,6 +82,9 @@ func (l *Logger) startReceiver() { EventFields: *eventFields, Timestamp: time.Now(), } + srcResId, dstResId := l.statusRecorder.CheckRoutes(event.SourceIP, event.DestIP) + event.SourceResourceID = []byte(srcResId) + event.DestResourceID = []byte(dstResId) l.Store.StoreEvent(&event) } } diff --git a/client/internal/netflow/logger/logger_test.go b/client/internal/netflow/logger/logger_test.go index e986118ec5a..94bacc0944f 100644 --- a/client/internal/netflow/logger/logger_test.go +++ b/client/internal/netflow/logger/logger_test.go @@ -12,7 +12,7 @@ import ( ) func TestStore(t *testing.T) { - logger := logger.New(context.Background()) + logger := logger.New(context.Background(), nil) logger.Enable() event := types.EventFields{ diff --git a/client/internal/netflow/manager.go b/client/internal/netflow/manager.go index 8ab81f4ff34..62abddbab94 100644 --- a/client/internal/netflow/manager.go +++ b/client/internal/netflow/manager.go @@ -13,6 +13,7 @@ import ( "github.com/netbirdio/netbird/client/internal/netflow/conntrack" "github.com/netbirdio/netbird/client/internal/netflow/logger" nftypes "github.com/netbirdio/netbird/client/internal/netflow/types" + "github.com/netbirdio/netbird/client/internal/peer" "github.com/netbirdio/netbird/flow/client" "github.com/netbirdio/netbird/flow/proto" ) @@ -29,8 +30,8 @@ type Manager struct { } // NewManager creates a new netflow manager -func NewManager(ctx context.Context, iface nftypes.IFaceMapper, publicKey []byte) *Manager { - flowLogger := logger.New(ctx) +func NewManager(ctx context.Context, iface nftypes.IFaceMapper, publicKey []byte, statusRecorder *peer.Status) *Manager { + flowLogger := logger.New(ctx, statusRecorder) var ct nftypes.ConnTracker if runtime.GOOS == "linux" && iface != nil && !iface.IsUserspaceBind() { diff --git a/client/internal/netflow/types/types.go b/client/internal/netflow/types/types.go index a0f2eb95e2d..112b82a55b5 100644 --- a/client/internal/netflow/types/types.go +++ b/client/internal/netflow/types/types.go @@ -67,21 +67,23 @@ type Event struct { } type EventFields struct { - FlowID uuid.UUID - Type Type - RuleID []byte - Direction Direction - Protocol Protocol - SourceIP netip.Addr - DestIP netip.Addr - SourcePort uint16 - DestPort uint16 - ICMPType uint8 - ICMPCode uint8 - RxPackets uint64 - TxPackets uint64 - RxBytes uint64 - TxBytes uint64 + FlowID uuid.UUID + Type Type + RuleID []byte + Direction Direction + Protocol Protocol + SourceIP netip.Addr + DestIP netip.Addr + SourceResourceID []byte + DestResourceID []byte + SourcePort uint16 + DestPort uint16 + ICMPType uint8 + ICMPCode uint8 + RxPackets uint64 + TxPackets uint64 + RxBytes uint64 + TxBytes uint64 } type FlowConfig struct { diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index ee884a76eaf..bd576287be9 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -345,6 +345,56 @@ func (d *Status) RemovePeerStateRoute(peer string, route string) error { return nil } +// CheckRoutes checks for both the source and destination IP addresses in the local peer routes first, +// and then in the remote peers' routes. It returns the IP address of the matching peer (as a string) +// for source and destination. If a match is found in local peer routes, the local peer IP is returned; +// otherwise, the remote peer IP is returned. If no match is found, an empty string is returned for that IP. +func (d *Status) CheckRoutes(src, dst netip.Addr) (srcMatchedPeerIP string, dstMatchedPeerIP string) { + // check local peer routes. + for route := range d.localPeer.Routes { + prefix, err := netip.ParsePrefix(route) + if err != nil { + log.Debugf("failed to parse route %s: %v", route, err) + continue + } + if srcMatchedPeerIP == "" && prefix.Contains(src) { + srcMatchedPeerIP = d.localPeer.IP + } + if dstMatchedPeerIP == "" && prefix.Contains(dst) { + dstMatchedPeerIP = d.localPeer.IP + } + // early return if both source and destination are matched. + if srcMatchedPeerIP != "" && dstMatchedPeerIP != "" { + return srcMatchedPeerIP, dstMatchedPeerIP + } + } + + // if one or both addresses were not found in local routes, check remote peers. + d.mux.Lock() + defer d.mux.Unlock() + for _, peer := range d.peers { + peerRoutes := peer.GetRoutes() + for route := range peerRoutes { + prefix, err := netip.ParsePrefix(route) + if err != nil { + log.Debugf("failed to parse route %s: %v", route, err) + continue + } + if srcMatchedPeerIP == "" && prefix.Contains(src) { + srcMatchedPeerIP = peer.IP + } + if dstMatchedPeerIP == "" && prefix.Contains(dst) { + dstMatchedPeerIP = peer.IP + } + // early return if both addresses are matched. + if srcMatchedPeerIP != "" && dstMatchedPeerIP != "" { + return srcMatchedPeerIP, dstMatchedPeerIP + } + } + } + return srcMatchedPeerIP, dstMatchedPeerIP +} + func (d *Status) UpdatePeerICEState(receivedState State) error { d.mux.Lock() defer d.mux.Unlock()