From 328670be543c281a3d3bc6906f9a11b9ad446c22 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Wed, 12 Jun 2024 12:59:28 +0530 Subject: [PATCH] feature: tag events that come from a filestream with `take_over: true` (#39828) * filestream: tag events with `take_over: true` * filestream: modify test cases * add comments * update documentation * Update filebeat/input/filestream/input_test.go Co-authored-by: Tiago Queiroz * add changelog --------- Co-authored-by: Tiago Queiroz --- CHANGELOG.next.asciidoc | 1 + .../docs/howto/migrate-to-filestream.asciidoc | 5 +++ filebeat/input/filestream/input.go | 8 ++++ filebeat/input/filestream/input_test.go | 43 +++++++++++++++++++ 4 files changed, 57 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e1214aa0e27a..8e5cd3497e1b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Removed deprecated Sophos UTM from Beats. Use the https://docs.elastic.co/integrations/sophos[Sophos] Elastic integration instead. {pull}38037[38037] - Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055] - Update Salesforce module to use new Salesforce input. {pull}37509[37509] +- Tag events that come from a filestream in "take over" mode. {pull}39828[39828] - Fix high IO and handling of a corrupted registry log file. {pull}35893[35893] *Heartbeat* diff --git a/filebeat/docs/howto/migrate-to-filestream.asciidoc b/filebeat/docs/howto/migrate-to-filestream.asciidoc index a57105adb3e3..30057fab7251 100644 --- a/filebeat/docs/howto/migrate-to-filestream.asciidoc +++ b/filebeat/docs/howto/migrate-to-filestream.asciidoc @@ -247,3 +247,8 @@ and return to old `log` inputs the files that were taken by `filestream` inputs, 6. Run Filebeat with the old configuration (no `filestream` inputs with `take_over: true`). NOTE: Reverting to backups might cause some events to repeat, depends on the amount of time the new configuration was running. + +=== Debugging on Kibana + +Events produced by `filestream` with `take_over: true` contains `take_over` tag. +You can filter on this tag in Kibana and see the events which came from a filestream in the "take over" mode. \ No newline at end of file diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 0136b062b485..7da25654a258 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -41,6 +41,7 @@ import ( "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" ) const pluginName = "filestream" @@ -61,6 +62,7 @@ type filestream struct { encodingFactory encoding.EncodingFactory closerConfig closerConfig parsers parser.Config + takeOver bool } // Plugin creates a new filestream input plugin for creating a stateful input. @@ -101,6 +103,7 @@ func configure(cfg *conf.C) (loginp.Prospector, loginp.Harvester, error) { encodingFactory: encodingFactory, closerConfig: config.Close, parsers: config.Reader.Parsers, + takeOver: config.TakeOver, } return prospector, filestream, nil @@ -378,6 +381,11 @@ func (inp *filestream) readFromSource( metrics.BytesProcessed.Add(uint64(message.Bytes)) + // add "take_over" tag if `take_over` is set to true + if inp.takeOver { + _ = mapstr.AddTags(message.Fields, []string{"take_over"}) + } + if err := p.Publish(message.ToEvent(), s); err != nil { metrics.ProcessingErrors.Inc() return err diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index a1d9729c5aad..3dfe176ac017 100644 --- a/filebeat/input/filestream/input_test.go +++ b/filebeat/input/filestream/input_test.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/beats/v7/libbeat/statestore/storetest" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" ) func BenchmarkFilestream(b *testing.B) { @@ -115,6 +116,48 @@ paths: }) } +func TestTakeOverTags(t *testing.T) { + testCases := []struct { + name string + takeOver bool + testFunc func(t *testing.T, event beat.Event) + }{ + { + name: "test-take_over-true", + takeOver: true, + testFunc: func(t *testing.T, event beat.Event) { + tags, err := event.GetValue("tags") + require.NoError(t, err) + require.Contains(t, tags, "take_over") + }, + }, + { + name: "test-take_over-false", + takeOver: false, + testFunc: func(t *testing.T, event beat.Event) { + _, err := event.GetValue("tags") + require.ErrorIs(t, err, mapstr.ErrKeyNotFound) + }, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + filename := generateFile(t, t.TempDir(), 5) + cfg := fmt.Sprintf(` +type: filestream +prospector.scanner.check_interval: 1s +take_over: %t +paths: + - %s`, testCase.takeOver, filename) + runner := createFilestreamTestRunner(context.Background(), t, testCase.name, cfg, 5, true) + events := runner(t) + for _, event := range events { + testCase.testFunc(t, event) + } + }) + } +} + // runFilestreamBenchmark runs the entire filestream input with the in-memory registry and the test pipeline. // `testID` must be unique for each test run // `cfg` must be a valid YAML string containing valid filestream configuration