Skip to content

Commit

Permalink
Add stream test
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardoungureanu committed Dec 5, 2024
1 parent 0f73a09 commit 3be1040
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
18 changes: 11 additions & 7 deletions x-pack/filebeat/input/unifiedlogs/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,26 @@ func (input *input) processLogs(stdout io.Reader, pub inputcursor.Publisher, log
scanner := bufio.NewScanner(stdout)

var (
event beat.Event
line string
logRecord logRecord
timestamp time.Time
err error
event beat.Event
line string
logRecordLine logRecord
timestamp time.Time
err error
)

for scanner.Scan() {
line = scanner.Text()
if err = json.Unmarshal([]byte(line), &logRecord); err != nil {
if err = json.Unmarshal([]byte(line), &logRecordLine); err != nil {
log.Errorf("invalid json log: %v", err)
input.metrics.errs.Add(1)
continue
}

timestamp, err = time.Parse(logDateLayout, logRecord.Timestamp)
if logRecordLine == (logRecord{}) {
continue
}

timestamp, err = time.Parse(logDateLayout, logRecordLine.Timestamp)
if err != nil {
input.metrics.errs.Add(1)
log.Errorf("invalid timestamp: %v", err)
Expand Down
67 changes: 56 additions & 11 deletions x-pack/filebeat/input/unifiedlogs/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"os/exec"
"path"
"regexp"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -59,7 +60,7 @@ func TestInput(t *testing.T) {
name string
cfg config
timeUntilClose time.Duration
assertFunc func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time)
assertFunc func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string)
expectedLogStreamCmd string
expectedLogShowCmd string
expectedRunErrorMsg string
Expand All @@ -69,7 +70,7 @@ func TestInput(t *testing.T) {
cfg: config{},
timeUntilClose: time.Second,
expectedLogStreamCmd: "/usr/bin/log stream --style ndjson",
assertFunc: func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) {
assertFunc: func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string) {
assert.NotEmpty(collect, events)
assert.NotEmpty(collect, cursors)
assert.Equal(collect, len(events), len(cursors))
Expand Down Expand Up @@ -156,12 +157,12 @@ func TestInput(t *testing.T) {
},
commonConfig: commonConfig{
Predicate: []string{
`processImagePath == "\/kernel"`,
`processImagePath == "/kernel"`,
},
},
},
timeUntilClose: time.Second,
expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --predicate sender = 'Security'", archivePath),
expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --predicate processImagePath == \"/kernel\"", archivePath),
assertFunc: eventsAndCursorAssertN(460),
},
{
Expand All @@ -171,11 +172,13 @@ func TestInput(t *testing.T) {
ArchiveFile: archivePath,
},
commonConfig: commonConfig{
Process: []string{""},
Process: []string{
"0",
},
},
},
timeUntilClose: time.Second,
expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --process 617", archivePath),
expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --process 0", archivePath),
assertFunc: eventsAndCursorAssertN(462),
},
{
Expand All @@ -194,7 +197,49 @@ func TestInput(t *testing.T) {
},
timeUntilClose: time.Second,
expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --info --debug --backtrace --signpost --mach-continuous-time", archivePath),
assertFunc: eventsAndCursorAssertN(4081),
assertFunc: eventsAndCursorAssertN(462),
},
{
name: "Stream and Backfill",
cfg: config{
Backfill: true,
showConfig: showConfig{
Start: time.Now().Add(-5 * time.Second).Format("2006-01-02 15:04:05"),
},
commonConfig: commonConfig{
Info: true,
Debug: true,
Backtrace: true,
Signpost: true,
MachContinuousTime: true,
},
},
timeUntilClose: 2 * time.Second,
expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --info --debug --backtrace --signpost --mach-continuous-time --start %v", time.Now().Format("2006-01-02")),
expectedLogStreamCmd: "/usr/bin/log stream --style ndjson --info --debug --backtrace --signpost --mach-continuous-time",
assertFunc: func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string) {
assert.Less(collect, 0, len(events))
assert.Less(collect, 0, len(cursors))

var endTime time.Time
regex := regexp.MustCompile(`--end\s+(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}[+-]\d{4})`)
if len(cmd) > 0 {
matches := regex.FindStringSubmatch(cmd[0])
assert.Equal(collect, 2, len(matches))
endTime, _ = time.Parse("2006-01-02 15:04:05-0700", matches[1])
}
endTime = endTime.Truncate(time.Second)

for i := range events {
if cursors[i] == nil {
firstStreamedEventTime := events[i].Timestamp
firstStreamedEventTime = firstStreamedEventTime.Add(time.Second).Truncate(time.Second)
assert.Equal(collect, endTime, firstStreamedEventTime)
break
}
}

},
},
}

Expand Down Expand Up @@ -232,9 +277,9 @@ func TestInput(t *testing.T) {
assert.EventuallyWithT(t,
func(collect *assert.CollectT) {
assert.Equal(collect, tc.expectedLogStreamCmd, filterLogStreamLogline(buf.Bytes()))
assert.Equal(collect, tc.expectedLogShowCmd, filterLogShowLogline(buf.Bytes()))
assert.Equal(collect, true, strings.HasPrefix(filterLogShowLogline(buf.Bytes()), tc.expectedLogShowCmd))
if tc.assertFunc != nil {
tc.assertFunc(collect, pub.events, pub.cursors)
tc.assertFunc(collect, pub.events, pub.cursors, filterLogShowLogline(buf.Bytes()))
}
},
30*time.Second, time.Second,
Expand Down Expand Up @@ -272,8 +317,8 @@ func filterLogCmdLine(buf []byte, cmdPrefix string) string {
return ""
}

func eventsAndCursorAssertN(n int) func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) {
return func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) {
func eventsAndCursorAssertN(n int) func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string) {
return func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time, cmd ...string) {
assert.Equal(collect, n, len(events))
assert.Equal(collect, n, len(cursors))
lastEvent := events[len(events)-1]
Expand Down

0 comments on commit 3be1040

Please sign in to comment.