From 09002a16ae43c498c3834e250e3024aa8e3d51c7 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 12 Dec 2024 15:00:34 -0500 Subject: [PATCH] Fix filestream integration tests --- .../filestream/input_integration_test.go | 206 +++++++++++------- 1 file changed, 128 insertions(+), 78 deletions(-) diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 80327d8bcf2c..3d468bb23c04 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -52,11 +52,13 @@ func TestFilestreamCloseRenamed(t *testing.T) { // the output to receive the event and then close the source file. id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName) + "*"}, - "prospector.scanner.check_interval": "10ms", - "close.on_state_change.check_interval": "1ms", - "close.on_state_change.renamed": "true", + "id": id, + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.check_interval": "10ms", + "prospector.scanner.fingerprint.enabled": false, + "close.on_state_change.check_interval": "1ms", + "close.on_state_change.renamed": "true", + "file_identity.native": map[string]any{}, }) testlines := []byte("first log line\n") @@ -94,9 +96,11 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName) + "*"}, - "prospector.scanner.check_interval": "1ms", + "id": id, + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testline := []byte("log line\n") @@ -132,11 +136,13 @@ func TestFilestreamCloseRemoved(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName) + "*"}, - "prospector.scanner.check_interval": "24h", - "close.on_state_change.check_interval": "1ms", - "close.on_state_change.removed": "true", + "id": id, + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "1ms", + "close.on_state_change.removed": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first log line\n") @@ -209,9 +215,11 @@ func TestFilestreamEmptyLine(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -248,9 +256,11 @@ func TestFilestreamEmptyLinesOnly(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -272,8 +282,10 @@ func TestFilestreamBOMUTF8(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) // BOM: 0xEF,0xBB,0xBF @@ -315,9 +327,11 @@ func TestFilestreamUTF16BOMs(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "encoding": name, + "id": id, + "paths": []string{env.abspath(testlogName)}, + "encoding": name, + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) line := []byte("first line\n") @@ -348,11 +362,13 @@ func TestFilestreamCloseTimeout(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "24h", - "close.on_state_change.check_interval": "100ms", - "close.reader.after_interval": "500ms", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "100ms", + "close.reader.after_interval": "500ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\n") @@ -382,11 +398,13 @@ func TestFilestreamCloseAfterInterval(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "24h", - "close.on_state_change.check_interval": "100ms", - "close.on_state_change.inactive": "2s", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "100ms", + "close.on_state_change.inactive": "2s", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\nthird line\n") @@ -417,7 +435,9 @@ func TestFilestreamCloseAfterIntervalRemoved(t *testing.T) { "close.on_state_change.inactive": "100ms", // reader is not stopped when file is removed to see if the reader can still detect // if the file has been inactive even if it have been removed in the meantime - "close.on_state_change.removed": "false", + "close.on_state_change.removed": "false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\nthird line\n") @@ -450,7 +470,9 @@ func TestFilestreamCloseAfterIntervalRenamed(t *testing.T) { "close.on_state_change.inactive": "100ms", // reader is not stopped when file is removed to see if the reader can still detect // if the file has been inactive even if it have been removed in the meantime - "close.on_state_change.removed": "false", + "close.on_state_change.removed": "false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\nthird line\n") @@ -485,7 +507,9 @@ func TestFilestreamCloseAfterIntervalRotatedAndRemoved(t *testing.T) { "close.on_state_change.inactive": "100ms", // reader is not stopped when file is removed to see if the reader can still detect // if the file has been inactive even if it have been removed in the meantime - "close.on_state_change.removed": "false", + "close.on_state_change.removed": "false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\nthird line\n") @@ -558,10 +582,12 @@ func TestFilestreamTruncatedFileOpen(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -592,11 +618,13 @@ func TestFilestreamTruncatedFileClosed(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", - "close.reader.on_eof": "true", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "close.reader.on_eof": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -635,9 +663,11 @@ func TestFilestreamTruncateWithSymlink(t *testing.T) { env.abspath(testlogName), env.abspath(symlinkName), }, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", - "prospector.scanner.symlinks": "true", + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "prospector.scanner.symlinks": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) lines := []byte("first line\nsecond line\nthird line\n") @@ -707,10 +737,12 @@ func TestFilestreamTruncateCheckOffset(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -737,9 +769,11 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) { testlogName := "test.log" id := "fake-ID-" + uuid.Must(uuid.NewV4()).String() inp := env.mustCreateInput(map[string]interface{}{ - "id": id, - "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "200ms", + "id": id, + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "200ms", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\nsecond line\n") @@ -792,7 +826,9 @@ func TestFilestreamSymlinksEnabled(t *testing.T) { "paths": []string{ env.abspath(symlinkName), }, - "prospector.scanner.symlinks": "true", + "prospector.scanner.symlinks": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) testlines := []byte("first line\n") @@ -824,10 +860,12 @@ func TestFilestreamSymlinkRotated(t *testing.T) { "paths": []string{ env.abspath(symlinkName), }, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.symlinks": "true", - "close.on_state_change.removed": "false", - "clean_removed": "false", + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + "close.on_state_change.removed": "false", + "clean_removed": "false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) commonLine := "first line in file " @@ -874,10 +912,12 @@ func TestFilestreamSymlinkRemoved(t *testing.T) { "paths": []string{ env.abspath(symlinkName), }, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.symlinks": "true", - "close.on_state_change.removed": "false", - "clean_removed": "false", + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + "close.on_state_change.removed": "false", + "clean_removed": "false", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) line := []byte("first line\n") @@ -918,9 +958,11 @@ func TestFilestreamTruncate(t *testing.T) { "paths": []string{ env.abspath("*"), }, - "prospector.scanner.check_interval": "1ms", - "prospector.scanner.resend_on_touch": "true", - "prospector.scanner.symlinks": "true", + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + "prospector.scanner.symlinks": "true", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) lines := []byte("first line\nsecond line\nthird line\n") @@ -978,6 +1020,8 @@ func TestFilestreamHarvestAllFilesWhenHarvesterLimitExceeded(t *testing.T) { "paths": []string{ env.abspath(logFiles[0].path), env.abspath(logFiles[1].path)}, + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) @@ -994,8 +1038,10 @@ func TestGlobalIDCannotBeUsed(t *testing.T) { env := newInputTestingEnvironment(t) testlogName := "test.log" _, err := env.createInput(map[string]interface{}{ - "id": ".global", - "paths": []string{env.abspath(testlogName) + "*"}, + "id": ".global", + "paths": []string{env.abspath(testlogName) + "*"}, + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) if err == nil { t.Fatal("expecting an error because '.global' cannot be used as input ID") @@ -1013,10 +1059,12 @@ func TestRotatingCloseInactiveLargerWriteRate(t *testing.T) { "paths": []string{ env.abspath("*"), }, - "prospector.scanner.check_interval": "100ms", - "close.on_state_change.check_interval": "1s", - "close.on_state_change.inactive": "5s", - "ignore_older": "10s", + "prospector.scanner.check_interval": "100ms", + "close.on_state_change.check_interval": "1s", + "close.on_state_change.inactive": "5s", + "ignore_older": "10s", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background()) @@ -1060,10 +1108,12 @@ func TestRotatingCloseInactiveLowWriteRate(t *testing.T) { "paths": []string{ env.abspath("*"), }, - "prospector.scanner.check_interval": "1ms", - "close.on_state_change.check_interval": "1ms", - "close.on_state_change.inactive": "1s", - "ignore_older": "10s", + "prospector.scanner.check_interval": "1ms", + "close.on_state_change.check_interval": "1ms", + "close.on_state_change.inactive": "1s", + "ignore_older": "10s", + "prospector.scanner.fingerprint.enabled": false, + "file_identity.native": map[string]any{}, }) ctx, cancelInput := context.WithCancel(context.Background())