Skip to content

Commit

Permalink
Enhance netflow logger to include status recorder and update related …
Browse files Browse the repository at this point in the history
…tests
  • Loading branch information
hakansa committed Mar 5, 2025
1 parent 9151f62 commit 476c2e1
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 27 deletions.
2 changes: 1 addition & 1 deletion client/firewall/uspfilter/conntrack/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

var logger = log.NewFromLogrus(logrus.StandardLogger())
var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger()
var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger()

// Memory pressure tests
func BenchmarkMemoryPressure(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion client/firewall/uspfilter/uspfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

var logger = log.NewFromLogrus(logrus.StandardLogger())
var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger()
var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger()

type IFaceMock struct {
SetFilterFunc func(device.PacketFilter) error
Expand Down
2 changes: 1 addition & 1 deletion client/internal/acl/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
mgmProto "github.com/netbirdio/netbird/management/proto"
)

var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger()
var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger()

func TestDefaultManager(t *testing.T) {
networkMap := &mgmProto.NetworkMap{
Expand Down
2 changes: 1 addition & 1 deletion client/internal/dns/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/netbirdio/netbird/formatter"
)

var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger()
var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger()

type mocWGIface struct {
filter device.PacketFilter
Expand Down
2 changes: 1 addition & 1 deletion client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (e *Engine) Start() error {

// start flow manager right after interface creation
publicKey := e.config.WgPrivateKey.PublicKey()
e.flowManager = netflow.NewManager(e.ctx, e.wgInterface, publicKey[:])
e.flowManager = netflow.NewManager(e.ctx, e.wgInterface, publicKey[:], e.statusRecorder)

if e.config.RosenpassEnabled {
log.Infof("rosenpass is enabled")
Expand Down
14 changes: 10 additions & 4 deletions client/internal/netflow/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/netbirdio/netbird/client/internal/netflow/store"
"github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/client/internal/peer"
)

type rcvChan chan *types.EventFields
Expand All @@ -21,15 +22,17 @@ type Logger struct {
enabled atomic.Bool
rcvChan atomic.Pointer[rcvChan]
cancelReceiver context.CancelFunc
statusRecorder *peer.Status
Store types.Store
}

func New(ctx context.Context) *Logger {
func New(ctx context.Context, statusRecorder *peer.Status) *Logger {
ctx, cancel := context.WithCancel(ctx)
return &Logger{
ctx: ctx,
cancel: cancel,
Store: store.NewMemoryStore(),
ctx: ctx,
cancel: cancel,
statusRecorder: statusRecorder,
Store: store.NewMemoryStore(),
}
}

Expand Down Expand Up @@ -79,6 +82,9 @@ func (l *Logger) startReceiver() {
EventFields: *eventFields,
Timestamp: time.Now(),
}
srcResId, dstResId := l.statusRecorder.CheckRoutes(event.SourceIP, event.DestIP)
event.SourceResourceID = []byte(srcResId)
event.DestResourceID = []byte(dstResId)
l.Store.StoreEvent(&event)
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/internal/netflow/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func TestStore(t *testing.T) {
logger := logger.New(context.Background())
logger := logger.New(context.Background(), nil)
logger.Enable()

event := types.EventFields{
Expand Down
5 changes: 3 additions & 2 deletions client/internal/netflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/netbirdio/netbird/client/internal/netflow/conntrack"
"github.com/netbirdio/netbird/client/internal/netflow/logger"
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/flow/client"
"github.com/netbirdio/netbird/flow/proto"
)
Expand All @@ -29,8 +30,8 @@ type Manager struct {
}

// NewManager creates a new netflow manager
func NewManager(ctx context.Context, iface nftypes.IFaceMapper, publicKey []byte) *Manager {
flowLogger := logger.New(ctx)
func NewManager(ctx context.Context, iface nftypes.IFaceMapper, publicKey []byte, statusRecorder *peer.Status) *Manager {
flowLogger := logger.New(ctx, statusRecorder)

var ct nftypes.ConnTracker
if runtime.GOOS == "linux" && iface != nil && !iface.IsUserspaceBind() {
Expand Down
32 changes: 17 additions & 15 deletions client/internal/netflow/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,23 @@ type Event struct {
}

type EventFields struct {
FlowID uuid.UUID
Type Type
RuleID []byte
Direction Direction
Protocol Protocol
SourceIP netip.Addr
DestIP netip.Addr
SourcePort uint16
DestPort uint16
ICMPType uint8
ICMPCode uint8
RxPackets uint64
TxPackets uint64
RxBytes uint64
TxBytes uint64
FlowID uuid.UUID
Type Type
RuleID []byte
Direction Direction
Protocol Protocol
SourceIP netip.Addr
DestIP netip.Addr
SourceResourceID []byte
DestResourceID []byte
SourcePort uint16
DestPort uint16
ICMPType uint8
ICMPCode uint8
RxPackets uint64
TxPackets uint64
RxBytes uint64
TxBytes uint64
}

type FlowConfig struct {
Expand Down
50 changes: 50 additions & 0 deletions client/internal/peer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,56 @@ func (d *Status) RemovePeerStateRoute(peer string, route string) error {
return nil
}

// CheckRoutes checks for both the source and destination IP addresses in the local peer routes first,
// and then in the remote peers' routes. It returns the IP address of the matching peer (as a string)
// for source and destination. If a match is found in local peer routes, the local peer IP is returned;
// otherwise, the remote peer IP is returned. If no match is found, an empty string is returned for that IP.
func (d *Status) CheckRoutes(src, dst netip.Addr) (srcMatchedPeerIP string, dstMatchedPeerIP string) {
// check local peer routes.
for route := range d.localPeer.Routes {
prefix, err := netip.ParsePrefix(route)
if err != nil {
log.Debugf("failed to parse route %s: %v", route, err)
continue
}
if srcMatchedPeerIP == "" && prefix.Contains(src) {
srcMatchedPeerIP = d.localPeer.IP
}
if dstMatchedPeerIP == "" && prefix.Contains(dst) {
dstMatchedPeerIP = d.localPeer.IP
}
// early return if both source and destination are matched.
if srcMatchedPeerIP != "" && dstMatchedPeerIP != "" {
return srcMatchedPeerIP, dstMatchedPeerIP
}
}

// if one or both addresses were not found in local routes, check remote peers.
d.mux.Lock()
defer d.mux.Unlock()
for _, peer := range d.peers {
peerRoutes := peer.GetRoutes()
for route := range peerRoutes {
prefix, err := netip.ParsePrefix(route)
if err != nil {
log.Debugf("failed to parse route %s: %v", route, err)
continue
}
if srcMatchedPeerIP == "" && prefix.Contains(src) {
srcMatchedPeerIP = peer.IP
}
if dstMatchedPeerIP == "" && prefix.Contains(dst) {
dstMatchedPeerIP = peer.IP
}
// early return if both addresses are matched.
if srcMatchedPeerIP != "" && dstMatchedPeerIP != "" {
return srcMatchedPeerIP, dstMatchedPeerIP
}
}
}
return srcMatchedPeerIP, dstMatchedPeerIP
}

func (d *Status) UpdatePeerICEState(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
Expand Down

0 comments on commit 476c2e1

Please sign in to comment.