From b98b7633cc6f68d105112237c8f8853fbc7cd73e Mon Sep 17 00:00:00 2001 From: kvalliyurnatt <135071014+kvalliyurnatt@users.noreply.github.com> Date: Thu, 21 Mar 2024 17:27:14 -0400 Subject: [PATCH] Add an option to report deltas during a flow period instead of cumulative stats (#38223) * add an option to report delta flows * adding unit test and doc update * fix linting issues * fix imports * addressing review comments --- packetbeat/config/config.go | 2 + packetbeat/docs/packetbeat-options.asciidoc | 6 +++ packetbeat/flows/flows.go | 2 +- packetbeat/flows/worker.go | 38 +++++++++++-------- packetbeat/flows/worker_test.go | 41 ++++++++++++++++++--- 5 files changed, 68 insertions(+), 21 deletions(-) diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 7d579af635bf..427df6cd1176 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -144,6 +144,8 @@ type Flows struct { KeepNull bool `config:"keep_null"` // Index is used to overwrite the index where flows are published Index string `config:"index"` + // DeltaFlowReports when enabled will report flow network stats(bytes, packets) as delta values + EnableDeltaFlowReports bool `config:"enable_delta_flow_reports"` } type ProtocolCommon struct { diff --git a/packetbeat/docs/packetbeat-options.asciidoc b/packetbeat/docs/packetbeat-options.asciidoc index c48b4a1b01d0..aaa598b612c3 100644 --- a/packetbeat/docs/packetbeat-options.asciidoc +++ b/packetbeat/docs/packetbeat-options.asciidoc @@ -461,6 +461,12 @@ in time. Periodical reporting can be disabled by setting the value to -1. If disabled, flows are still reported once being timed out. The default value is 10s. +[float] +==== `enable_delta_flow_reports` + +Configure network.bytes and network.packets to be a delta +value instead of a cumlative sum for each flow period. The default value is false. + [float] [[packetbeat-configuration-flows-fields]] ==== `fields` diff --git a/packetbeat/flows/flows.go b/packetbeat/flows/flows.go index b7b522175299..9df019af2d03 100644 --- a/packetbeat/flows/flows.go +++ b/packetbeat/flows/flows.go @@ -71,7 +71,7 @@ func NewFlows(pub Reporter, watcher *procs.ProcessesWatcher, config *config.Flow counter := &counterReg{} - worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period) + worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period, config.EnableDeltaFlowReports) if err != nil { logp.Err("failed to configure flows processing intervals: %v", err) return nil, err diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index e3a2008a0599..46f7c0ca4187 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -127,7 +127,7 @@ func (w *worker) periodically(tick time.Duration, fn func() error) { // reporting will be done at flow lifetime end. // Flows are published via the pub Reporter after being enriched with process information // by watcher. -func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration) (*worker, error) { +func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration, enableDeltaFlowReports bool) (*worker, error) { if timeout < time.Second { return nil, ErrInvalidTimeout } @@ -161,10 +161,11 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe defaultBatchSize := 1024 processor := &flowsProcessor{ - table: table, - watcher: watcher, - counters: counters, - timeout: timeout, + table: table, + watcher: watcher, + counters: counters, + timeout: timeout, + enableDeltaFlowReporting: enableDeltaFlowReports, } processor.spool.init(pub, defaultBatchSize) @@ -221,11 +222,12 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i } type flowsProcessor struct { - spool spool - watcher *procs.ProcessesWatcher - table *flowMetaTable - counters *counterReg - timeout time.Duration + spool spool + watcher *procs.ProcessesWatcher + table *flowMetaTable + counters *counterReg + timeout time.Duration + enableDeltaFlowReporting bool } func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport bool) { @@ -281,13 +283,13 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe } func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string) { - event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames) + event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames, fw.enableDeltaFlowReporting) debugf("add event: %v", event) fw.spool.publish(event) } -func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string) beat.Event { +func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, enableDeltaFlowReporting bool) beat.Event { timestamp := ts event := mapstr.M{ @@ -418,7 +420,7 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve var totalBytes, totalPackets uint64 if f.stats[0] != nil { // Source stats. - stats := encodeStats(f.stats[0], intNames, uintNames, floatNames) + stats := encodeStats(f.stats[0], intNames, uintNames, floatNames, enableDeltaFlowReporting) for k, v := range stats { switch k { case "icmpV4TypeCode": @@ -449,7 +451,7 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve } if f.stats[1] != nil { // Destination stats. - stats := encodeStats(f.stats[1], intNames, uintNames, floatNames) + stats := encodeStats(f.stats[1], intNames, uintNames, floatNames, enableDeltaFlowReporting) for k, v := range stats { switch k { case "icmpV4TypeCode", "icmpV6TypeCode": @@ -533,7 +535,7 @@ func formatHardwareAddr(addr net.HardwareAddr) string { return string(buf) } -func encodeStats(stats *flowStats, ints, uints, floats []string) map[string]interface{} { +func encodeStats(stats *flowStats, ints, uints, floats []string, enableDeltaFlowReporting bool) map[string]interface{} { report := make(map[string]interface{}) i := 0 @@ -551,6 +553,12 @@ func encodeStats(stats *flowStats, ints, uints, floats []string) map[string]inte for m := mask; m != 0; m >>= 1 { if (m & 1) == 1 { report[uints[i]] = stats.uints[i] + if enableDeltaFlowReporting && (uints[i] == "bytes" || uints[i] == "packets") { + // If Delta Flow Reporting is enabled, reset bytes and packets at each period. + // Only the bytes and packets received during the flow period will be reported. + // This should be thread safe as it is called under the flowmetadatatable lock. + stats.uints[i] = 0 + } } i++ } diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index ef0104adc922..d6e371cad875 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -21,16 +21,17 @@ import ( "encoding/json" "flag" "os" + "reflect" "testing" "time" - "github.com/elastic/go-lookslike/isdef" - - "github.com/elastic/go-lookslike" + "gotest.tools/assert" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/isdef" ) // Use `go test -data` to update sample event files. @@ -65,7 +66,7 @@ func TestCreateEvent(t *testing.T) { } bif.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{10, 1}} bif.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{460, 2}} - event := createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil) + event := createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil, false) // Validate the contents of the event. validate := lookslike.MustCompile(map[string]interface{}{ @@ -116,7 +117,7 @@ func TestCreateEvent(t *testing.T) { // Write the event to disk if -data is used. if *dataFlag { - event.Fields.Put("@timestamp", common.Time(end)) //nolint:errcheck // Never fails. + event.Fields.Put("@timestamp", common.Time(end)) output, err := json.MarshalIndent(&event.Fields, "", " ") if err != nil { t.Fatal(err) @@ -126,4 +127,34 @@ func TestCreateEvent(t *testing.T) { t.Fatal(err) } } + + // when enableDeltaFlowReporting is true, the flow stats should be reset + expectbiFlow := &biFlow{ + id: id.rawFlowID, + killed: 1, + createTS: start, + ts: end, + dir: flowDirForward, + } + expectbiFlow.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{0, 0}} + expectbiFlow.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{0, 0}} + + // Assert the biflow is not 0 before the test + assert.Assert(t, !reflect.DeepEqual(expectbiFlow.stats[0].uints, bif.stats[0].uints)) + assert.Assert(t, !reflect.DeepEqual(expectbiFlow.stats[1].uints, bif.stats[1].uints)) + + event = createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil, true) + result = validate(event.Fields) + if errs := result.Errors(); len(errs) > 0 { + for _, err := range errs { + t.Error(err) + } + t.FailNow() + } + + // Assert the biflow is 0 after the test + assert.DeepEqual(t, expectbiFlow.stats[0].uintFlags, bif.stats[0].uintFlags) + assert.DeepEqual(t, expectbiFlow.stats[0].uints, bif.stats[0].uints) + assert.DeepEqual(t, expectbiFlow.stats[1].uintFlags, bif.stats[1].uintFlags) + assert.DeepEqual(t, expectbiFlow.stats[1].uints, bif.stats[1].uints) }