diff --git a/x-pack/filebeat/input/unifiedlogs/input.go b/x-pack/filebeat/input/unifiedlogs/input.go index 6e0187fe509..b7724b4caad 100644 --- a/x-pack/filebeat/input/unifiedlogs/input.go +++ b/x-pack/filebeat/input/unifiedlogs/input.go @@ -214,22 +214,26 @@ func (input *input) processLogs(stdout io.Reader, pub inputcursor.Publisher, log scanner := bufio.NewScanner(stdout) var ( - event beat.Event - line string - logRecord logRecord - timestamp time.Time - err error + event beat.Event + line string + logRecordLine logRecord + timestamp time.Time + err error ) for scanner.Scan() { line = scanner.Text() - if err = json.Unmarshal([]byte(line), &logRecord); err != nil { + if err = json.Unmarshal([]byte(line), &logRecordLine); err != nil { log.Errorf("invalid json log: %v", err) input.metrics.errs.Add(1) continue } - timestamp, err = time.Parse(logDateLayout, logRecord.Timestamp) + if logRecordLine == (logRecord{}) { + continue + } + + timestamp, err = time.Parse(logDateLayout, logRecordLine.Timestamp) if err != nil { input.metrics.errs.Add(1) log.Errorf("invalid timestamp: %v", err) diff --git a/x-pack/filebeat/input/unifiedlogs/input_test.go b/x-pack/filebeat/input/unifiedlogs/input_test.go index 51283c4cebf..049ec3856d4 100644 --- a/x-pack/filebeat/input/unifiedlogs/input_test.go +++ b/x-pack/filebeat/input/unifiedlogs/input_test.go @@ -14,6 +14,7 @@ import ( "os" "os/exec" "path" + "regexp" "strings" "sync" "testing" @@ -59,7 +60,7 @@ func TestInput(t *testing.T) { name string cfg config timeUntilClose time.Duration - assertFunc func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) + assertFunc func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string) expectedLogStreamCmd string expectedLogShowCmd string expectedRunErrorMsg string @@ -69,7 +70,7 @@ func TestInput(t *testing.T) { cfg: config{}, timeUntilClose: time.Second, expectedLogStreamCmd: "/usr/bin/log stream --style ndjson", - assertFunc: func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) { + assertFunc: func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string) { assert.NotEmpty(collect, events) assert.NotEmpty(collect, cursors) assert.Equal(collect, len(events), len(cursors)) @@ -156,12 +157,12 @@ func TestInput(t *testing.T) { }, commonConfig: commonConfig{ Predicate: []string{ - `processImagePath == "\/kernel"`, + `processImagePath == "/kernel"`, }, }, }, timeUntilClose: time.Second, - expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --predicate sender = 'Security'", archivePath), + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --predicate processImagePath == \"/kernel\"", archivePath), assertFunc: eventsAndCursorAssertN(460), }, { @@ -171,11 +172,13 @@ func TestInput(t *testing.T) { ArchiveFile: archivePath, }, commonConfig: commonConfig{ - Process: []string{""}, + Process: []string{ + "0", + }, }, }, timeUntilClose: time.Second, - expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --process 617", archivePath), + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --process 0", archivePath), assertFunc: eventsAndCursorAssertN(462), }, { @@ -194,7 +197,49 @@ func TestInput(t *testing.T) { }, timeUntilClose: time.Second, expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --info --debug --backtrace --signpost --mach-continuous-time", archivePath), - assertFunc: eventsAndCursorAssertN(4081), + assertFunc: eventsAndCursorAssertN(462), + }, + { + name: "Stream and Backfill", + cfg: config{ + Backfill: true, + showConfig: showConfig{ + Start: time.Now().Add(-5 * time.Second).Format("2006-01-02 15:04:05"), + }, + commonConfig: commonConfig{ + Info: true, + Debug: true, + Backtrace: true, + Signpost: true, + MachContinuousTime: true, + }, + }, + timeUntilClose: 2 * time.Second, + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --info --debug --backtrace --signpost --mach-continuous-time --start %v", time.Now().Format("2006-01-02")), + expectedLogStreamCmd: "/usr/bin/log stream --style ndjson --info --debug --backtrace --signpost --mach-continuous-time", + assertFunc: func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string) { + assert.Less(collect, 0, len(events)) + assert.Less(collect, 0, len(cursors)) + + var endTime time.Time + regex := regexp.MustCompile(`--end\s+(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}[+-]\d{4})`) + if len(cmd) > 0 { + matches := regex.FindStringSubmatch(cmd[0]) + assert.Equal(collect, 2, len(matches)) + endTime, _ = time.Parse("2006-01-02 15:04:05-0700", matches[1]) + } + endTime = endTime.Truncate(time.Second) + + for i := range events { + if cursors[i] == nil { + firstStreamedEventTime := events[i].Timestamp + firstStreamedEventTime = firstStreamedEventTime.Add(time.Second).Truncate(time.Second) + assert.Equal(collect, endTime, firstStreamedEventTime) + break + } + } + + }, }, } @@ -232,9 +277,9 @@ func TestInput(t *testing.T) { assert.EventuallyWithT(t, func(collect *assert.CollectT) { assert.Equal(collect, tc.expectedLogStreamCmd, filterLogStreamLogline(buf.Bytes())) - assert.Equal(collect, tc.expectedLogShowCmd, filterLogShowLogline(buf.Bytes())) + assert.Equal(collect, true, strings.HasPrefix(filterLogShowLogline(buf.Bytes()), tc.expectedLogShowCmd)) if tc.assertFunc != nil { - tc.assertFunc(collect, pub.events, pub.cursors) + tc.assertFunc(collect, pub.events, pub.cursors, filterLogShowLogline(buf.Bytes())) } }, 30*time.Second, time.Second, @@ -272,8 +317,8 @@ func filterLogCmdLine(buf []byte, cmdPrefix string) string { return "" } -func eventsAndCursorAssertN(n int) func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) { - return func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) { +func eventsAndCursorAssertN(n int) func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string) { + return func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string) { assert.Equal(collect, n, len(events)) assert.Equal(collect, n, len(cursors)) lastEvent := events[len(events)-1]