From ac6a091eac60b15b037a5b3f46e583dc2e5a7438 Mon Sep 17 00:00:00 2001 From: Sascha Steinbiss Date: Wed, 3 Jul 2024 11:05:45 +0200 Subject: [PATCH] implement full flow aggregation --- CHANGELOG.md | 6 ++ cmd/fever/cmds/run.go | 4 +- fever.yaml | 4 +- processing/unicorn_aggregator.go | 6 +- processing/unicorn_aggregator_test.go | 89 +++++++++++++++++++++++---- 5 files changed, 94 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 279db79..00f3cba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to FEVER will be documented in this file. +## [1.3.6] - 2024-07-03 + +### Added +- Add support for sending aggregations from all flows, not just TCP + bidirectional ones. + ## [1.3.5] - 2023-03-27 ### Fixed diff --git a/cmd/fever/cmds/run.go b/cmd/fever/cmds/run.go index b86838b..80632ca 100644 --- a/cmd/fever/cmds/run.go +++ b/cmd/fever/cmds/run.go @@ -409,7 +409,9 @@ func mainfunc(cmd *cobra.Command, args []string) { "state": "disabled", }).Info("compression of flow stats") } - ua := processing.MakeUnicornAggregator(submitter, unicornSleep, dummyMode) + allFlows := viper.GetBool("flowreport.all") + ua := processing.MakeUnicornAggregator(submitter, unicornSleep, dummyMode, + allFlows) testSrcIP := viper.GetString("flowreport.testdata-srcip") testDestIP := viper.GetString("flowreport.testdata-destip") testDestPort := viper.GetInt64("flowreport.testdata-destport") diff --git a/fever.yaml b/fever.yaml index 5e9fba2..2295267 100644 --- a/fever.yaml +++ b/fever.yaml @@ -85,6 +85,8 @@ flowreport: #testdata-srcip: 0.0.0.1 #testdata-destip: 0.0.0.2 #testdata-destport: 99999 + # Set to true to count _all_ flows, not just TCP bidirectional ones. + all: false # Configuration for metrics (i.e. InfluxDB) submission. metrics: @@ -165,4 +167,4 @@ mgmt: socket: /tmp/fever-mgmt.sock # Use network server for gRPC commmunication. #network: tcp - #host: localhost:9999 \ No newline at end of file + #host: localhost:9999 diff --git a/processing/unicorn_aggregator.go b/processing/unicorn_aggregator.go index 066abaf..fb29d90 100644 --- a/processing/unicorn_aggregator.go +++ b/processing/unicorn_aggregator.go @@ -45,6 +45,7 @@ type UnicornAggregator struct { TestFlowSrcIP string TestFlowDestIP string TestFlowDestPort int64 + AllFlows bool } // MakeUnicornAggregate creates a new empty UnicornAggregate object. @@ -58,7 +59,7 @@ func MakeUnicornAggregate() *UnicornAggregate { // MakeUnicornAggregator creates a new empty UnicornAggregator object. func MakeUnicornAggregator(statsSubmitter util.StatsSubmitter, - submitPeriod time.Duration, dummyMode bool) *UnicornAggregator { + submitPeriod time.Duration, dummyMode bool, allFlows bool) *UnicornAggregator { a := &UnicornAggregator{ Logger: log.WithFields(log.Fields{ "domain": "aggregate", @@ -70,6 +71,7 @@ func MakeUnicornAggregator(statsSubmitter util.StatsSubmitter, ClosedChan: make(chan bool), Aggregate: *MakeUnicornAggregate(), TestFlowDestPort: 99999, + AllFlows: allFlows, } return a } @@ -197,7 +199,7 @@ func (a *UnicornAggregator) Stop(stopChan chan bool) { // aggregated state func (a *UnicornAggregator) Consume(e *types.Entry) error { // Unicorn flow aggregation update - if e.EventType == "flow" && e.Proto == "TCP" && e.BytesToClient > 0 { + if e.EventType == "flow" && (a.AllFlows || (e.Proto == "TCP" && e.BytesToClient > 0)) { a.StringBuf.Write([]byte(e.SrcIP)) a.StringBuf.Write([]byte("_")) a.StringBuf.Write([]byte(e.DestIP)) diff --git a/processing/unicorn_aggregator_test.go b/processing/unicorn_aggregator_test.go index 1ce30db..f4781dd 100644 --- a/processing/unicorn_aggregator_test.go +++ b/processing/unicorn_aggregator_test.go @@ -15,7 +15,7 @@ import ( log "github.com/sirupsen/logrus" ) -func makeUnicornFlowEvent() types.Entry { +func makeUnicornFlowEvent(proto string) types.Entry { e := types.Entry{ SrcIP: fmt.Sprintf("10.%d.%d.%d", rand.Intn(250), rand.Intn(250), rand.Intn(250)), SrcPort: []int64{1, 2, 3, 4, 5}[rand.Intn(5)], @@ -23,7 +23,7 @@ func makeUnicornFlowEvent() types.Entry { DestPort: []int64{11, 12, 13, 14, 15}[rand.Intn(5)], Timestamp: time.Now().Format(types.SuricataTimestampFormat), EventType: "flow", - Proto: "TCP", + Proto: proto, BytesToClient: int64(rand.Intn(10000)), BytesToServer: int64(rand.Intn(10000)), PktsToClient: int64(rand.Intn(100)), @@ -101,7 +101,7 @@ func TestUnicornAggregatorNoSubmission(t *testing.T) { dsub := &testSubmitter{ Data: make([]string, 0), } - f := MakeUnicornAggregator(dsub, 100*time.Millisecond, false) + f := MakeUnicornAggregator(dsub, 100*time.Millisecond, false, false) f.Run() time.Sleep(1 * time.Second) @@ -128,12 +128,12 @@ func TestUnicornAggregator(t *testing.T) { dsub := &testSubmitter{ Data: make([]string, 0), } - f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false) + f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false, false) f.Run() createdFlows := make(map[string]int) for i := 0; i < 200000; i++ { - ev := makeUnicornFlowEvent() + ev := makeUnicornFlowEvent("TCP") if ev.BytesToClient > 0 { key := fmt.Sprintf("%s_%s_%d", ev.SrcIP, ev.DestIP, ev.DestPort) createdFlows[key]++ @@ -189,7 +189,7 @@ func TestUnicornAggregatorWithTestdata(t *testing.T) { dsub := &testSubmitter{ Data: make([]string, 0), } - f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false) + f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false, false) f.EnableTestFlow("1.2.3.4", "5.6.7.8", 33333) f.Run() @@ -239,7 +239,7 @@ func TestUnicornAggregatorWithDispatch(t *testing.T) { dsub := &testSubmitter{ Data: make([]string, 0), } - f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false) + f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false, false) feedWaitChan := make(chan bool) outChan := make(chan types.Entry) @@ -256,9 +256,13 @@ func TestUnicornAggregatorWithDispatch(t *testing.T) { f.Run() createdFlows := make(map[string]int) - for i := 0; i < 200000; i++ { - ev := makeUnicornFlowEvent() - if ev.BytesToClient > 0 { + for i := 0; i < 400000; i++ { + proto := "TCP" + if i%2 == 0 { + proto = "UDP" + } + ev := makeUnicornFlowEvent(proto) + if proto == "TCP" && ev.BytesToClient > 0 { key := fmt.Sprintf("%s_%s_%d", ev.SrcIP, ev.DestIP, ev.DestPort) createdFlows[key]++ } @@ -266,7 +270,7 @@ func TestUnicornAggregatorWithDispatch(t *testing.T) { } for { - if dsub.GetTotalAggs() < len(createdFlows) { + if dsub.GetTotalAggs() < (len(createdFlows) / 2) { log.Debug(dsub.GetTotalAggs()) time.Sleep(100 * time.Millisecond) } else { @@ -309,3 +313,66 @@ func TestUnicornAggregatorWithDispatch(t *testing.T) { } } } + +func TestUnicornMixedUDPTCP(t *testing.T) { + rand.Seed(time.Now().UTC().UnixNano()) + dsub := &testSubmitter{ + Data: make([]string, 0), + } + f := MakeUnicornAggregator(dsub, 500*time.Millisecond, false, true) + f.Run() + + createdFlows := make(map[string]int) + for i := 0; i < 200000; i++ { + proto := "TCP" + if i%2 == 0 { + proto = "UDP" + } + ev := makeUnicornFlowEvent(proto) + key := fmt.Sprintf("%s_%s_%d", ev.SrcIP, ev.DestIP, ev.DestPort) + createdFlows[key]++ + f.Consume(&ev) + } + + for { + if dsub.GetTotalAggs() < len(createdFlows) { + log.Debug(dsub.GetTotalAggs()) + time.Sleep(100 * time.Millisecond) + } else { + break + } + } + + consumeWaitChan := make(chan bool) + f.Stop(consumeWaitChan) + <-consumeWaitChan + + if len(dsub.Data) == 0 { + t.Fatalf("collected aggregations are empty") + } + + log.Info(dsub.GetTotalAggs(), len(createdFlows), len(dsub.Data)) + + var totallen int + for _, v := range dsub.Data { + totallen += len(v) + } + if totallen == 0 { + t.Fatalf("length of collected aggregations is zero") + } + + if dsub.GetTotalAggs() != len(createdFlows) { + t.Fatalf("unexpected number of flow aggregates: %d/%d", dsub.GetTotalAggs(), + len(createdFlows)) + } + + for k, v := range dsub.GetFlowTuples() { + if _, ok := createdFlows[k]; !ok { + t.Fatalf("missing flow aggregate: %s", k) + } + if v["count"] != int64(createdFlows[k]) { + t.Fatalf("unexpected number of flows for %s: %d/%d", + k, v["count"], createdFlows[k]) + } + } +}