Skip to content

Commit

Permalink
Fix filestream integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Dec 12, 2024
1 parent c1693f2 commit 09002a1
Showing 1 changed file with 128 additions and 78 deletions.
206 changes: 128 additions & 78 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ func TestFilestreamCloseRenamed(t *testing.T) {
// the output to receive the event and then close the source file.
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "10ms",
"close.on_state_change.check_interval": "1ms",
"close.on_state_change.renamed": "true",
"id": id,
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "10ms",
"prospector.scanner.fingerprint.enabled": false,
"close.on_state_change.check_interval": "1ms",
"close.on_state_change.renamed": "true",
"file_identity.native": map[string]any{},
})

testlines := []byte("first log line\n")
Expand Down Expand Up @@ -94,9 +96,11 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "1ms",
"id": id,
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

testline := []byte("log line\n")
Expand Down Expand Up @@ -132,11 +136,13 @@ func TestFilestreamCloseRemoved(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "1ms",
"close.on_state_change.removed": "true",
"id": id,
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "1ms",
"close.on_state_change.removed": "true",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

testlines := []byte("first log line\n")
Expand Down Expand Up @@ -209,9 +215,11 @@ func TestFilestreamEmptyLine(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancelInput := context.WithCancel(context.Background())
Expand Down Expand Up @@ -248,9 +256,11 @@ func TestFilestreamEmptyLinesOnly(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancelInput := context.WithCancel(context.Background())
Expand All @@ -272,8 +282,10 @@ func TestFilestreamBOMUTF8(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

// BOM: 0xEF,0xBB,0xBF
Expand Down Expand Up @@ -315,9 +327,11 @@ func TestFilestreamUTF16BOMs(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"encoding": name,
"id": id,
"paths": []string{env.abspath(testlogName)},
"encoding": name,
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

line := []byte("first line\n")
Expand Down Expand Up @@ -348,11 +362,13 @@ func TestFilestreamCloseTimeout(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.reader.after_interval": "500ms",
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.reader.after_interval": "500ms",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

testlines := []byte("first line\n")
Expand Down Expand Up @@ -382,11 +398,13 @@ func TestFilestreamCloseAfterInterval(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.on_state_change.inactive": "2s",
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.on_state_change.inactive": "2s",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

testlines := []byte("first line\nsecond line\nthird line\n")
Expand Down Expand Up @@ -417,7 +435,9 @@ func TestFilestreamCloseAfterIntervalRemoved(t *testing.T) {
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
"close.on_state_change.removed": "false",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

testlines := []byte("first line\nsecond line\nthird line\n")
Expand Down Expand Up @@ -450,7 +470,9 @@ func TestFilestreamCloseAfterIntervalRenamed(t *testing.T) {
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
"close.on_state_change.removed": "false",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

testlines := []byte("first line\nsecond line\nthird line\n")
Expand Down Expand Up @@ -485,7 +507,9 @@ func TestFilestreamCloseAfterIntervalRotatedAndRemoved(t *testing.T) {
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
"close.on_state_change.removed": "false",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

testlines := []byte("first line\nsecond line\nthird line\n")
Expand Down Expand Up @@ -558,10 +582,12 @@ func TestFilestreamTruncatedFileOpen(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancelInput := context.WithCancel(context.Background())
Expand Down Expand Up @@ -592,11 +618,13 @@ func TestFilestreamTruncatedFileClosed(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"close.reader.on_eof": "true",
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"close.reader.on_eof": "true",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancelInput := context.WithCancel(context.Background())
Expand Down Expand Up @@ -635,9 +663,11 @@ func TestFilestreamTruncateWithSymlink(t *testing.T) {
env.abspath(testlogName),
env.abspath(symlinkName),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"prospector.scanner.symlinks": "true",
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"prospector.scanner.symlinks": "true",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

lines := []byte("first line\nsecond line\nthird line\n")
Expand Down Expand Up @@ -707,10 +737,12 @@ func TestFilestreamTruncateCheckOffset(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancelInput := context.WithCancel(context.Background())
Expand All @@ -737,9 +769,11 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
testlogName := "test.log"
id := "fake-ID-" + uuid.Must(uuid.NewV4()).String()
inp := env.mustCreateInput(map[string]interface{}{
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "200ms",
"id": id,
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "200ms",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

testlines := []byte("first line\nsecond line\n")
Expand Down Expand Up @@ -792,7 +826,9 @@ func TestFilestreamSymlinksEnabled(t *testing.T) {
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.symlinks": "true",
"prospector.scanner.symlinks": "true",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

testlines := []byte("first line\n")
Expand Down Expand Up @@ -824,10 +860,12 @@ func TestFilestreamSymlinkRotated(t *testing.T) {
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

commonLine := "first line in file "
Expand Down Expand Up @@ -874,10 +912,12 @@ func TestFilestreamSymlinkRemoved(t *testing.T) {
"paths": []string{
env.abspath(symlinkName),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.symlinks": "true",
"close.on_state_change.removed": "false",
"clean_removed": "false",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

line := []byte("first line\n")
Expand Down Expand Up @@ -918,9 +958,11 @@ func TestFilestreamTruncate(t *testing.T) {
"paths": []string{
env.abspath("*"),
},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"prospector.scanner.symlinks": "true",
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"prospector.scanner.symlinks": "true",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

lines := []byte("first line\nsecond line\nthird line\n")
Expand Down Expand Up @@ -978,6 +1020,8 @@ func TestFilestreamHarvestAllFilesWhenHarvesterLimitExceeded(t *testing.T) {
"paths": []string{
env.abspath(logFiles[0].path),
env.abspath(logFiles[1].path)},
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
Expand All @@ -994,8 +1038,10 @@ func TestGlobalIDCannotBeUsed(t *testing.T) {
env := newInputTestingEnvironment(t)
testlogName := "test.log"
_, err := env.createInput(map[string]interface{}{
"id": ".global",
"paths": []string{env.abspath(testlogName) + "*"},
"id": ".global",
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})
if err == nil {
t.Fatal("expecting an error because '.global' cannot be used as input ID")
Expand All @@ -1013,10 +1059,12 @@ func TestRotatingCloseInactiveLargerWriteRate(t *testing.T) {
"paths": []string{
env.abspath("*"),
},
"prospector.scanner.check_interval": "100ms",
"close.on_state_change.check_interval": "1s",
"close.on_state_change.inactive": "5s",
"ignore_older": "10s",
"prospector.scanner.check_interval": "100ms",
"close.on_state_change.check_interval": "1s",
"close.on_state_change.inactive": "5s",
"ignore_older": "10s",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancelInput := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1060,10 +1108,12 @@ func TestRotatingCloseInactiveLowWriteRate(t *testing.T) {
"paths": []string{
env.abspath("*"),
},
"prospector.scanner.check_interval": "1ms",
"close.on_state_change.check_interval": "1ms",
"close.on_state_change.inactive": "1s",
"ignore_older": "10s",
"prospector.scanner.check_interval": "1ms",
"close.on_state_change.check_interval": "1ms",
"close.on_state_change.inactive": "1s",
"ignore_older": "10s",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
})

ctx, cancelInput := context.WithCancel(context.Background())
Expand Down

0 comments on commit 09002a1

Please sign in to comment.