diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 47587a3be0cf..939523e41009 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -341,6 +341,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
 - Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
 - Update CEL mito extensions to v1.16.0. {pull}41727[41727]
+- Add `unifiedlogs` input for macOS. {pull}41791[41791]
 - Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
 - Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
 - Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc
index 3b1d274d59bd..5e77999a00b3 100644
--- a/filebeat/docs/filebeat-options.asciidoc
+++ b/filebeat/docs/filebeat-options.asciidoc
@@ -95,6 +95,7 @@ You can configure {beatname_uc} to use the following inputs:
 * <<{beatname_lc}-input-syslog>>
 * <<{beatname_lc}-input-tcp>>
 * <<{beatname_lc}-input-udp>>
+* <<{beatname_lc}-input-unifiedlogs>>
 * <<{beatname_lc}-input-unix>>
 * <<{beatname_lc}-input-winlog>>
@@ -158,6 +159,8 @@ include::inputs/input-tcp.asciidoc[]
 include::inputs/input-udp.asciidoc[]
+include::../../x-pack/filebeat/docs/inputs/input-unifiedlogs.asciidoc[]
+
 include::inputs/input-unix.asciidoc[]
 include::inputs/input-winlog.asciidoc[]
diff --git a/x-pack/filebeat/docs/inputs/input-unifiedlogs.asciidoc b/x-pack/filebeat/docs/inputs/input-unifiedlogs.asciidoc
new file mode 100644
index 000000000000..37589f307fb3
--- /dev/null
+++ b/x-pack/filebeat/docs/inputs/input-unifiedlogs.asciidoc
@@ -0,0 +1,180 @@
+[role="xpack"]
+
+:type: unifiedlogs
+
+[id="{beatname_lc}-input-{type}"]
+=== Unified Logs input
+
+++++
+Unified Logs
+++++
+
+NOTE: Only available for macOS.
+
+The unified logging system provides a comprehensive and performant API to capture
+telemetry across all levels of the system. This system centralizes the storage of
+log data in memory and on disk, rather than writing that data to a text-based log file.
+
+The input interacts with the `log` command-line tool to provide access to the events.
+
+The input starts streaming events from the current point in time unless a start date or
+the `backfill` option is set. When restarted, it continues where it left off.
+
+Alternatively, it can perform one-off operations, such as:
+
+- Stream events contained in a `.logarchive` file.
+- Stream events contained in a `.tracev3` file.
+- Stream events in a specific time span, by providing a specific end date.
+
+After these one-off operations complete, the input stops.
+
+Additional configuration options can be specified to filter which events to process.
+
+NOTE: The input can produce duplicated events when backfilling and/or
+restarting. This is inherent to how the underlying fetching method works and
+should be taken into account when using the input.
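+
+If duplicated events are a concern for a downstream consumer, one possible
+mitigation (a sketch, not a feature of this input) is to derive a stable
+document ID from each raw message with the `fingerprint` processor, so that
+duplicates overwrite each other on ingestion:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: unifiedlogs
+  id: unifiedlogs-id
+  backfill: true
+
+processors:
+  # Assumes two events with an identical raw message are true duplicates.
+  - fingerprint:
+      fields: ["message"]
+      target_field: "@metadata._id"
+----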
+
+Example configuration:
+
+Process all old and new logs:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: unifiedlogs
+  id: unifiedlogs-id
+  enabled: true
+  backfill: true
+----
+
+Process logs with predicate filters:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: unifiedlogs
+  id: unifiedlogs-id
+  enabled: true
+  predicate:
+    # Captures keychain.db unlock events
+    - 'process == "loginwindow" && sender == "Security"'
+    # Captures user login events
+    - 'process == "logind"'
+    # Captures command line activity run with elevated privileges
+    - 'process == "sudo"'
+----
+
+==== Configuration options
+
+The `unifiedlogs` input supports the following configuration options plus the
+<<{beatname_lc}-input-{type}-common-options>> described later.
+
+[float]
+==== `archive_file`
+
+Display events stored in the given archive.
+The archive must be a valid log archive bundle with the suffix `.logarchive`.
+
+[float]
+==== `trace_file`
+
+Display events stored in the given `.tracev3` file.
+To be decoded, the file must be contained within a valid `.logarchive` bundle.
+
+[float]
+==== `start`
+
+Shows content starting from the provided date.
+The following date/time formats are accepted:
+`YYYY-MM-DD`, `YYYY-MM-DD HH:MM:SS`, `YYYY-MM-DD HH:MM:SSZZZZZ`.
+
+[float]
+==== `end`
+
+Shows content up to the provided date.
+The following date/time formats are accepted:
+`YYYY-MM-DD`, `YYYY-MM-DD HH:MM:SS`, `YYYY-MM-DD HH:MM:SSZZZZZ`.
+
+[float]
+==== `predicate`
+
+Filters messages using the provided predicate, based on `NSPredicate`.
+A compound predicate or multiple predicates can be provided as a list.
+
+For detailed information on the use of predicate-based filtering,
+please refer to the https://developer.apple.com/library/mac/documentation/Cocoa/Conceptual/Predicates/Articles/pSyntax.html[Predicate Programming Guide].
+
+[float]
+==== `process`
+
+A list of the processes on which to operate. Each entry accepts a PID or process name.
+
+[float]
+==== `source`
+
+Include symbol names and source line numbers for messages, if available.
+Default: `false`.
+
+[float]
+==== `info`
+
+Enable or disable info-level messages.
+Default: `false`.
+
+[float]
+==== `debug`
+
+Enable or disable debug-level messages.
+Default: `false`.
+
+[float]
+==== `backtrace`
+
+Enable or disable display of backtraces.
+Default: `false`.
+
+[float]
+==== `signpost`
+
+Enable or disable display of signposts.
+Default: `false`.
+
+[float]
+==== `unreliable`
+
+Annotate events with whether the log was emitted unreliably.
+Default: `false`.
+
+[float]
+==== `mach_continuous_time`
+
+Use mach continuous time timestamps rather than walltime.
+Default: `false`.
+
+[float]
+==== `backfill`
+
+If set to `true`, the input processes all available logs, starting from the
+oldest available entry, the first time it starts.
+Default: `false`.
+
+
+[id="{beatname_lc}-input-{type}-common-options"]
+include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]
+
+[float]
+=== Metrics
+
+This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
+These metrics are exposed under the `/inputs/` path. They can be used to
+observe the activity of the input.
+
+You must assign a unique `id` to the input to expose metrics.
+
+[options="header"]
+|=======
+| Metric | Description
+| `errors_total` | Total number of errors.
+|======= + +:type!: diff --git a/x-pack/filebeat/input/default-inputs/inputs_darwin.go b/x-pack/filebeat/input/default-inputs/inputs_darwin.go new file mode 100644 index 000000000000..b43d75258b35 --- /dev/null +++ b/x-pack/filebeat/input/default-inputs/inputs_darwin.go @@ -0,0 +1,54 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build darwin + +package inputs + +import ( + "github.com/elastic/beats/v7/filebeat/beater" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" + "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" + "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage" + "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" + "github.com/elastic/beats/v7/x-pack/filebeat/input/benchmark" + "github.com/elastic/beats/v7/x-pack/filebeat/input/cel" + "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics" + "github.com/elastic/beats/v7/x-pack/filebeat/input/gcs" + "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" + "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" + "github.com/elastic/beats/v7/x-pack/filebeat/input/salesforce" + "github.com/elastic/beats/v7/x-pack/filebeat/input/streaming" + "github.com/elastic/beats/v7/x-pack/filebeat/input/unifiedlogs" + "github.com/elastic/elastic-agent-libs/logp" +) + +func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { + return []v2.Plugin{ + azureblobstorage.Plugin(log, store), + azureeventhub.Plugin(log), + cel.Plugin(log, store), + cloudfoundry.Plugin(), + entityanalytics.Plugin(log), + gcs.Plugin(log, store), + http_endpoint.Plugin(), + httpjson.Plugin(log, store), + o365audit.Plugin(log, store), + awss3.Plugin(store), + awscloudwatch.Plugin(), + lumberjack.Plugin(), + salesforce.Plugin(log, store), + streaming.Plugin(log, store), + streaming.PluginWebsocketAlias(log, store), + netflow.Plugin(log), + benchmark.Plugin(), + unifiedlogs.Plugin(log, store), + } +} diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index 731f8979766e..a14145a5265f 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build !aix && !windows +//go:build !aix && !darwin && !windows package inputs diff --git a/x-pack/filebeat/input/unifiedlogs/config.go b/x-pack/filebeat/input/unifiedlogs/config.go new file mode 100644 index 000000000000..0999e7d768b8 --- /dev/null +++ b/x-pack/filebeat/input/unifiedlogs/config.go @@ -0,0 +1,75 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +//go:build darwin + +package unifiedlogs + +import ( + "fmt" + "strings" + "time" +) + +type config struct { + showConfig + commonConfig + Backfill bool `config:"backfill"` +} + +type showConfig struct { + ArchiveFile string `config:"archive_file"` + TraceFile string `config:"trace_file"` + Start string `config:"start"` + End string `config:"end"` +} + +type commonConfig struct { + Predicate []string `config:"predicate"` + Process []string `config:"process"` + Source bool `config:"source"` + Info bool `config:"info"` + Debug bool `config:"debug"` + Backtrace bool `config:"backtrace"` + Signpost bool `config:"signpost"` + Unreliable bool `config:"unreliable"` + MachContinuousTime bool `config:"mach_continuous_time"` +} + +func (c config) Validate() error { + if err := checkDateFormat(c.Start); err != nil { + return fmt.Errorf("start date is not valid: %w", err) + } + if err := checkDateFormat(c.End); err != nil { + return fmt.Errorf("end date is not valid: %w", err) + } + if c.ArchiveFile != "" && !strings.HasSuffix(c.ArchiveFile, ".logarchive") { + return fmt.Errorf("archive_file %v has the wrong extension", c.ArchiveFile) + } + if c.TraceFile != "" && !strings.HasSuffix(c.TraceFile, ".tracev3") { + return fmt.Errorf("trace_file %v has the wrong extension", c.TraceFile) + } + return nil +} + +func defaultConfig() config { + return config{} +} + +func checkDateFormat(date string) error { + if date == "" { + return nil + } + acceptedLayouts := []string{ + "2006-01-02", + "2006-01-02 15:04:05", + "2006-01-02 15:04:05-0700", + } + for _, layout := range acceptedLayouts { + if _, err := time.Parse(layout, date); err == nil { + return nil + } + } + return fmt.Errorf("not a valid date, accepted layouts are: %v", acceptedLayouts) +} diff --git a/x-pack/filebeat/input/unifiedlogs/input.go b/x-pack/filebeat/input/unifiedlogs/input.go new file mode 100644 index 000000000000..ca33c03c3bb6 --- /dev/null +++ b/x-pack/filebeat/input/unifiedlogs/input.go @@ -0,0 +1,385 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+
+//go:build darwin
+
+package unifiedlogs
+
+import (
+	"bufio"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"os/exec"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+
+	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
+	inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/feature"
+	"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/mapstr"
+	"github.com/elastic/go-concert/ctxtool"
+)
+
+const (
+	inputName        = "unifiedlogs"
+	srcArchiveName   = "log-cmd-archive"
+	srcPollName      = "log-cmd-poll"
+	logDateLayout    = "2006-01-02 15:04:05.999999-0700"
+	cursorDateLayout = "2006-01-02 15:04:05-0700"
+)
+
+var (
+	// override for testing
+	timeNow = time.Now
+)
+
+func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin {
+	return v2.Plugin{
+		Name:       inputName,
+		Stability:  feature.Stable,
+		Deprecated: false,
+		Manager: &inputcursor.InputManager{
+			Logger:     log,
+			StateStore: store,
+			Type:       inputName,
+			Configure:  cursorConfigure,
+		},
+	}
+}
+
+type logRecord struct {
+	Timestamp string `json:"timestamp"`
+}
+
+type source struct {
+	name string
+}
+
+func newSource(config config) source {
+	if config.ArchiveFile != "" || config.TraceFile != "" {
+		return source{name: srcArchiveName}
+	}
+	return source{name: srcPollName}
+}
+
+func (src source) Name() string { return src.name }
+
+type input struct {
+	config
+	metrics *inputMetrics
+}
+
+func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) {
+	// use a name that does not shadow the imported `conf` package
+	c := defaultConfig()
+	if err := cfg.Unpack(&c); err != nil {
+		return nil, nil, err
+	}
+	sources, inp := newCursorInput(c)
+	return sources, inp, nil
+}
+
+func newCursorInput(config config) ([]inputcursor.Source, inputcursor.Input) {
+	input := &input{config: config}
+	return []inputcursor.Source{newSource(config)}, input
+}
+
+func (input) Name() string { return inputName }
+
+func (input) Test(src inputcursor.Source, _ v2.TestContext) error {
+	if _, err := exec.LookPath("log"); err != nil {
+		return err
+	}
+	return nil
+}
+
+// Run starts the input and blocks until execution ends.
+func (input *input) Run(ctxt v2.Context, src inputcursor.Source, resumeCursor inputcursor.Cursor, pub inputcursor.Publisher) error {
+	reg, unreg := inputmon.NewInputRegistry(input.Name(), ctxt.ID, nil)
+	defer unreg()
+	input.metrics = newInputMetrics(reg)
+
+	stdCtx := ctxtool.FromCanceller(ctxt.Cancelation)
+	log := ctxt.Logger.With("source", src.Name())
+
+	startFrom, err := loadCursor(resumeCursor, log)
+	if err != nil {
+		return err
+	}
+	if startFrom != "" {
+		input.Start = startFrom
+	}
+
+	return input.runWithMetrics(stdCtx, pub, log)
+}
+
+func (input *input) runWithMetrics(ctx context.Context, pub inputcursor.Publisher, log *logp.Logger) error {
+	// We create a wrapped publisher for the streaming goroutine.
+	// It notifies the backfilling goroutine of the end date for the
+	// backfilling period, and holds off updating the stored resume date
+	// until backfilling is done.
+	wrappedPub := newWrappedPublisher(!input.mustBackfill(), pub)
+
+	var g errgroup.Group
+	// We start the streaming command in the background; it will use the
+	// wrapped publisher to set the end date for the backfilling process.
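+	//
+	// Rough timeline of the two goroutines (descriptive sketch only):
+	//
+	//   stream:   [now ...........................................>
+	//   backfill: [Start ......... End = first streamed event + 1s]
+	//
+	// Cursor updates from the stream are held back until backfilling
+	// completes, so an interrupted backfill resumes from the old cursor.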
+ if input.mustStream() { + g.Go(func() error { + logCmd := newLogStreamCmd(ctx, input.commonConfig) + return input.runLogCmd(ctx, logCmd, wrappedPub, log) + }) + } + + if input.mustBackfill() { + g.Go(func() error { + if input.mustStream() { + t := wrappedPub.getFirstProcessedTime() + // The time resolution of the log tool is microsecond, while it only + // accepts second resolution as an end parameter. + // To avoid potentially losing data we move the end forward one second, + // since it is preferable to have some duplicated events. + t = t.Add(time.Second) + input.End = t.Format(cursorDateLayout) + + // to avoid race conditions updating the cursor, and to be able to + // resume from the oldest point in time, we only update cursor + // from the streaming goroutine once backfilling is done. + defer wrappedPub.startUpdatingCursor() + } + logCmd := newLogShowCmd(ctx, input.config) + err := input.runLogCmd(ctx, logCmd, pub, log) + if !input.mustStream() { + log.Debugf("finished processing events, stopping") + } + return err + }) + } + + return g.Wait() +} + +// mustStream returns true in case a stream command is needed. +// This is the default case and the only exceptions are when an archive file or an end date are set. +func (input *input) mustStream() bool { + return !(input.ArchiveFile != "" || input.TraceFile != "" || input.End != "") +} + +// mustBackfill returns true in case a show command is needed. +// This happens when start or end dates are set (for example when resuming filebeat), when an archive file is used, +// or when user forces it via the backfill config. +func (input *input) mustBackfill() bool { + return input.Backfill || input.ArchiveFile != "" || input.TraceFile != "" || input.Start != "" || input.End != "" +} + +func (input *input) runLogCmd(ctx context.Context, logCmd *exec.Cmd, pub inputcursor.Publisher, log *logp.Logger) error { + outpipe, err := logCmd.StdoutPipe() + if err != nil { + return fmt.Errorf("get stdout pipe: %w", err) + } + errpipe, err := logCmd.StderrPipe() + if err != nil { + return fmt.Errorf("get stderr pipe: %w", err) + } + + log.Debugf("exec command start: %v", logCmd) + defer log.Debugf("exec command end: %v", logCmd) + + if err := logCmd.Start(); err != nil { + return fmt.Errorf("start log command: %w", err) + } + + if err := input.processLogs(outpipe, pub, log); err != nil { + log.Errorf("process logs: %v", err) + } + + stderrBytes, _ := io.ReadAll(errpipe) + if err := logCmd.Wait(); err != nil && ctx.Err() == nil { + return fmt.Errorf("%q exited with an error: %w, %q", logCmd, err, string(stderrBytes)) + } + + return nil +} + +func (input *input) processLogs(stdout io.Reader, pub inputcursor.Publisher, log *logp.Logger) error { + scanner := bufio.NewScanner(stdout) + + var ( + event beat.Event + line string + logRecordLine logRecord + timestamp time.Time + err error + ) + + for scanner.Scan() { + line = scanner.Text() + if err = json.Unmarshal([]byte(line), &logRecordLine); err != nil { + log.Errorf("invalid json log: %v", err) + input.metrics.errs.Add(1) + continue + } + + if logRecordLine == (logRecord{}) { + continue + } + + timestamp, err = time.Parse(logDateLayout, logRecordLine.Timestamp) + if err != nil { + input.metrics.errs.Add(1) + log.Errorf("invalid timestamp: %v", err) + continue + } + + event = makeEvent(timestamp, line) + if err = pub.Publish(event, timestamp); err != nil { + log.Errorf("publish event: %v", err) + input.metrics.errs.Add(1) + continue + } + } + if err = scanner.Err(); err != nil { + input.metrics.errs.Add(1) + 
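// Note: bufio.Scanner rejects lines longer than its buffer limit
+		// (64KiB by default), which surfaces here as bufio.ErrTooLong.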
+		return fmt.Errorf("scanning stdout: %w", err)
+	}
+
+	return nil
+}
+
+// wrappedPublisher wraps a publisher and stores the first published event date.
+// It is required to bound the backfill window when a streaming command is started.
+type wrappedPublisher struct {
+	firstTimeOnce      sync.Once
+	firstTimeC         chan struct{}
+	firstProcessedTime time.Time
+
+	updateCursor *atomic.Bool
+
+	inner inputcursor.Publisher
+}
+
+func newWrappedPublisher(updateCursor bool, inner inputcursor.Publisher) *wrappedPublisher {
+	var atomicUC atomic.Bool
+	atomicUC.Store(updateCursor)
+	return &wrappedPublisher{
+		firstTimeC:   make(chan struct{}),
+		updateCursor: &atomicUC,
+		inner:        inner,
+	}
+}
+
+func (pub *wrappedPublisher) Publish(event beat.Event, cursor interface{}) error {
+	pub.firstTimeOnce.Do(func() {
+		pub.firstProcessedTime = cursor.(time.Time)
+		close(pub.firstTimeC)
+	})
+	if !pub.updateCursor.Load() {
+		cursor = nil
+	}
+	return pub.inner.Publish(event, cursor)
+}
+
+// getFirstProcessedTime will block until there is a value set for firstProcessedTime.
+func (pub *wrappedPublisher) getFirstProcessedTime() time.Time {
+	<-pub.firstTimeC
+	return pub.firstProcessedTime
+}
+
+func (pub *wrappedPublisher) startUpdatingCursor() {
+	pub.updateCursor.Store(true)
+}
+
+func loadCursor(c inputcursor.Cursor, log *logp.Logger) (string, error) {
+	if c.IsNew() {
+		return "", nil
+	}
+	var cursor time.Time
+	if err := c.Unpack(&cursor); err != nil {
+		return "", fmt.Errorf("unpack cursor: %w", err)
+	}
+	startFrom := cursor.Format(cursorDateLayout)
+	log.Infof("cursor loaded, resuming from: %v", startFrom)
+	return startFrom, nil
+}
+
+func newLogShowCmd(ctx context.Context, cfg config) *exec.Cmd {
+	return exec.CommandContext(ctx, "log", newLogCmdArgs("show", cfg)...) // #nosec G204
+}
+
+func newLogStreamCmd(ctx context.Context, cfg commonConfig) *exec.Cmd {
+	return exec.CommandContext(ctx, "log", newLogCmdArgs("stream", config{commonConfig: cfg})...) // #nosec G204
+}
+
+func newLogCmdArgs(subcmd string, config config) []string {
+	args := []string{subcmd, "--style", "ndjson"}
+	if config.ArchiveFile != "" {
+		args = append(args, "--archive", config.ArchiveFile)
+	}
+	if config.TraceFile != "" {
+		args = append(args, "--file", config.TraceFile)
+	}
+	for _, p := range config.Predicate {
+		args = append(args, "--predicate", p)
+	}
+	for _, p := range config.Process {
+		args = append(args, "--process", p)
+	}
+	if config.Source {
+		args = append(args, "--source")
+	}
+	if config.Info {
+		args = append(args, "--info")
+	}
+	if config.Debug {
+		args = append(args, "--debug")
+	}
+	if config.Backtrace {
+		args = append(args, "--backtrace")
+	}
+	if config.Signpost {
+		args = append(args, "--signpost")
+	}
+	if config.Unreliable {
+		args = append(args, "--unreliable")
+	}
+	if config.MachContinuousTime {
+		args = append(args, "--mach-continuous-time")
+	}
+	if config.Start != "" {
+		args = append(args, "--start", config.Start)
+	}
+	if config.End != "" {
+		args = append(args, "--end", config.End)
+	}
+	return args
+}
+
+func makeEvent(timestamp time.Time, message string) beat.Event {
+	now := timeNow()
+	fields := mapstr.M{
+		"event": mapstr.M{
+			"created": now,
+		},
+		"message": message,
+	}
+
+	return beat.Event{
+		Timestamp: timestamp,
+		Fields:    fields,
+	}
+}
diff --git a/x-pack/filebeat/input/unifiedlogs/input_test.go b/x-pack/filebeat/input/unifiedlogs/input_test.go
new file mode 100644
index 000000000000..907c927254d3
--- /dev/null
+++ b/x-pack/filebeat/input/unifiedlogs/input_test.go
@@ -0,0 +1,391 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+ +//go:build darwin + +package unifiedlogs + +import ( + "bufio" + "bytes" + "context" + "fmt" + "os" + "os/exec" + "path" + "regexp" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" +) + +var _ inputcursor.Publisher = (*publisher)(nil) + +type publisher struct { + m sync.Mutex + + events []beat.Event + cursors []*time.Time +} + +func (p *publisher) Publish(e beat.Event, cursor interface{}) error { + p.m.Lock() + defer p.m.Unlock() + + p.events = append(p.events, e) + var c *time.Time + if cursor != nil { + cv := cursor.(time.Time) + c = &cv + } + p.cursors = append(p.cursors, c) + return nil +} + +func TestInput(t *testing.T) { + archivePath, err := openArchive() + require.NoError(t, err) + t.Cleanup(func() { os.RemoveAll(archivePath) }) + + testCases := []struct { + name string + cfg config + timeUntilClose time.Duration + assertFunc func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) + expectedLogStreamCmd string + expectedLogShowCmd string + expectedRunErrorMsg string + }{ + { + name: "Default stream", + cfg: config{}, + timeUntilClose: time.Second, + expectedLogStreamCmd: "/usr/bin/log stream --style ndjson", + assertFunc: func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) { + assert.NotEmpty(collect, events) + assert.NotEmpty(collect, cursors) + assert.Equal(collect, len(events), len(cursors)) + lastEvent := events[len(events)-1] + lastCursor := cursors[len(cursors)-1] + assert.EqualValues(collect, &lastEvent.Timestamp, lastCursor) + }, + }, + { + name: "Archive not found", + cfg: config{ + showConfig: showConfig{ + ArchiveFile: "notfound.logarchive", + }, + }, + timeUntilClose: time.Second, + expectedLogShowCmd: "/usr/bin/log show --style ndjson --archive notfound.logarchive", + expectedRunErrorMsg: "\"/usr/bin/log show --style ndjson --archive notfound.logarchive\" exited with an error: exit status 64", + }, + { + name: "Archived file", + cfg: config{ + showConfig: showConfig{ + ArchiveFile: archivePath, + }, + }, + timeUntilClose: time.Second, + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s", archivePath), + assertFunc: eventsAndCursorAssertN(462), + }, + { + name: "Trace file", + cfg: config{ + showConfig: showConfig{ + TraceFile: path.Join(archivePath, "logdata.LiveData.tracev3"), + }, + }, + timeUntilClose: time.Second, + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --file %s", path.Join(archivePath, "logdata.LiveData.tracev3")), + assertFunc: eventsAndCursorAssertN(7), + }, + { + name: "With start date", + cfg: config{ + showConfig: showConfig{ + ArchiveFile: archivePath, + Start: "2024-12-04 13:46:00+0200", + }, + }, + timeUntilClose: time.Second, + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --start 2024-12-04 13:46:00+0200", archivePath), + assertFunc: eventsAndCursorAssertN(314), + }, + { + name: "With start and end dates", + cfg: config{ + showConfig: showConfig{ + ArchiveFile: archivePath, + Start: "2024-12-04 13:45:00+0200", + End: "2024-12-04 13:46:00+0200", + }, + }, + timeUntilClose: time.Second, + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --start 2024-12-04 13:45:00+0200 --end 2024-12-04 13:46:00+0200", archivePath), + assertFunc: eventsAndCursorAssertN(149), 
+ }, + { + name: "With end date", + cfg: config{ + showConfig: showConfig{ + ArchiveFile: archivePath, + End: "2024-12-04 13:46:00+0200", + }, + }, + timeUntilClose: time.Second, + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --end 2024-12-04 13:46:00+0200", archivePath), + assertFunc: eventsAndCursorAssertN(462), + }, + { + name: "With predicate", + cfg: config{ + showConfig: showConfig{ + ArchiveFile: archivePath, + }, + commonConfig: commonConfig{ + Predicate: []string{ + `processImagePath == "/kernel"`, + }, + }, + }, + timeUntilClose: time.Second, + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --predicate processImagePath == \"/kernel\"", archivePath), + assertFunc: eventsAndCursorAssertN(460), + }, + { + name: "With process", + cfg: config{ + showConfig: showConfig{ + ArchiveFile: archivePath, + }, + commonConfig: commonConfig{ + Process: []string{ + "0", + }, + }, + }, + timeUntilClose: time.Second, + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --process 0", archivePath), + assertFunc: eventsAndCursorAssertN(462), + }, + { + name: "With optional flags", + cfg: config{ + showConfig: showConfig{ + ArchiveFile: archivePath, + }, + commonConfig: commonConfig{ + Info: true, + Debug: true, + Backtrace: true, + Signpost: true, + MachContinuousTime: true, + }, + }, + timeUntilClose: time.Second, + expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --info --debug --backtrace --signpost --mach-continuous-time", archivePath), + assertFunc: eventsAndCursorAssertN(462), + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + _, cursorInput := newCursorInput(tc.cfg) + input := cursorInput.(*input) + + ctx, cancel := context.WithCancel(context.Background()) + + pub := &publisher{} + log, buf := logp.NewInMemory("unifiedlogs_test", logp.JSONEncoderConfig()) + + var wg sync.WaitGroup + wg.Add(1) + go func(t *testing.T) { + defer wg.Done() + err := input.runWithMetrics(ctx, pub, log) + if tc.expectedRunErrorMsg == "" { + assert.NoError(t, err) + } else { + assert.ErrorContains(t, err, tc.expectedRunErrorMsg) + } + }(t) + + select { + case <-ctx.Done(): + case <-time.After(tc.timeUntilClose): + } + + cancel() + wg.Wait() + + assert.EventuallyWithT(t, + func(collect *assert.CollectT) { + assert.Equal(collect, tc.expectedLogStreamCmd, filterStartLogStreamLogline(buf.Bytes())) + assert.Equal(collect, tc.expectedLogShowCmd, filterStartLogShowLogline(buf.Bytes())) + if tc.assertFunc != nil { + tc.assertFunc(collect, pub.events, pub.cursors) + } + }, + 30*time.Second, time.Second, + ) + }) + } +} + +func TestBackfillAndStream(t *testing.T) { + archivePath, err := openArchive() + require.NoError(t, err) + t.Cleanup(func() { os.RemoveAll(archivePath) }) + + cfg := config{ + Backfill: true, + showConfig: showConfig{ + Start: time.Now().Add(-5 * time.Second).Format("2006-01-02 15:04:05"), + }, + commonConfig: commonConfig{ + Info: true, + Debug: true, + Backtrace: true, + Signpost: true, + MachContinuousTime: true, + }, + } + + expectedLogShowCmd := fmt.Sprintf("/usr/bin/log show --style ndjson --info --debug --backtrace --signpost --mach-continuous-time --start %v", time.Now().Format("2006-01-02")) + expectedLogStreamCmd := "/usr/bin/log stream --style ndjson --info --debug --backtrace --signpost --mach-continuous-time" + + _, cursorInput := newCursorInput(cfg) + input := cursorInput.(*input) + + ctx, cancel := 
context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	pub := &publisher{}
+	log, buf := logp.NewInMemory("unifiedlogs_test", logp.JSONEncoderConfig())
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func(t *testing.T) {
+		defer wg.Done()
+		err := input.runWithMetrics(ctx, pub, log)
+		assert.NoError(t, err)
+	}(t)
+
+	var firstStreamedEventTime *time.Time
+	assert.EventuallyWithT(t,
+		func(collect *assert.CollectT) {
+			showCmdLog := filterStartLogShowLogline(buf.Bytes())
+			assert.Equal(collect, expectedLogStreamCmd, filterStartLogStreamLogline(buf.Bytes()))
+			assert.True(collect, strings.HasPrefix(showCmdLog, expectedLogShowCmd))
+			assert.NotEmpty(collect, pub.events)
+			assert.NotEmpty(collect, pub.cursors)
+
+			var endTime time.Time
+			regex := regexp.MustCompile(`--end\s+(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}[+-]\d{4})`)
+			matches := regex.FindStringSubmatch(showCmdLog)
+			assert.Equal(collect, 2, len(matches))
+			endTime, _ = time.Parse("2006-01-02 15:04:05-0700", matches[1])
+			endTime = endTime.Truncate(time.Second)
+
+			if firstStreamedEventTime == nil {
+				for i := range pub.events {
+					if pub.cursors[i] == nil {
+						first := pub.events[i].Timestamp.Add(time.Second).Truncate(time.Second)
+						firstStreamedEventTime = &first
+						break
+					}
+				}
+			}
+			assert.NotNil(collect, firstStreamedEventTime)
+			assert.EqualValues(collect, endTime, *firstStreamedEventTime)
+			assert.True(collect, strings.HasPrefix(showCmdLog, filterEndLogShowLogline(buf.Bytes())))
+		},
+		30*time.Second, time.Second,
+	)
+
+	cancel()
+	wg.Wait()
+}
+
+const (
+	cmdStartPrefix = "exec command start: "
+	cmdEndPrefix   = "exec command end: "
+)
+
+func filterStartLogStreamLogline(buf []byte) string {
+	const cmd = "/usr/bin/log stream"
+	return filterLogCmdLine(buf, cmd, cmdStartPrefix)
+}
+
+func filterStartLogShowLogline(buf []byte) string {
+	const cmd = "/usr/bin/log show"
+	return filterLogCmdLine(buf, cmd, cmdStartPrefix)
+}
+
+func filterEndLogShowLogline(buf []byte) string {
+	const cmd = "/usr/bin/log show"
+	return filterLogCmdLine(buf, cmd, cmdEndPrefix)
+}
+
+func filterLogCmdLine(buf []byte, cmd, cmdPrefix string) string {
+	scanner := bufio.NewScanner(bytes.NewBuffer(buf))
+	for scanner.Scan() {
+		text := scanner.Text()
+		parts := strings.Split(text, "\t")
+		if len(parts) != 4 {
+			continue
+		}
+
+		// trim the prefix passed by the caller so that end-of-command
+		// lines are matched as well as start-of-command lines
+		trimmed := strings.TrimPrefix(parts[3], cmdPrefix)
+		if strings.HasPrefix(trimmed, cmd) {
+			return trimmed
+		}
+	}
+	return ""
+}
+
+func eventsAndCursorAssertN(n int) func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) {
+	return func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) {
+		assert.Equal(collect, n, len(events))
+		assert.Equal(collect, n, len(cursors))
+		lastEvent := events[len(events)-1]
+		lastCursor := cursors[len(cursors)-1]
+		assert.EqualValues(collect, &lastEvent.Timestamp, lastCursor)
+	}
+}
+
+func openArchive() (string, error) {
+	return extractTarGz(path.Join("testdata", "test.logarchive.tar.gz"))
+}
+
+func extractTarGz(tarGzPath string) (string, error) {
+	// Create a temporary directory to extract into.
+	tempDir, err := os.MkdirTemp("", "extracted-*")
+	if err != nil {
+		return "", fmt.Errorf("failed to create temporary directory: %v", err)
+	}
+
+	// Use the 'tar' command to extract the .tar.gz file.
+	cmd := exec.Command("tar", "-xzf", tarGzPath, "-C", tempDir)
+
+	// Run the command.
+	if err := cmd.Run(); err != nil {
+		return "", fmt.Errorf("failed to extract .tar.gz: %v", err)
+	}
+
+	return path.Join(tempDir, "test.logarchive"), nil
+}
diff --git 
a/x-pack/filebeat/input/unifiedlogs/metrics.go b/x-pack/filebeat/input/unifiedlogs/metrics.go new file mode 100644 index 000000000000..2e24c0b4121f --- /dev/null +++ b/x-pack/filebeat/input/unifiedlogs/metrics.go @@ -0,0 +1,27 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build darwin + +package unifiedlogs + +import ( + "github.com/elastic/elastic-agent-libs/monitoring" +) + +type inputMetrics struct { + errs *monitoring.Uint // total number of errors +} + +func newInputMetrics(reg *monitoring.Registry) *inputMetrics { + if reg == nil { + return nil + } + + out := &inputMetrics{ + errs: monitoring.NewUint(reg, "errors_total"), + } + + return out +} diff --git a/x-pack/filebeat/input/unifiedlogs/testdata/test.logarchive.tar.gz b/x-pack/filebeat/input/unifiedlogs/testdata/test.logarchive.tar.gz new file mode 100644 index 000000000000..fc507f9b1e9e Binary files /dev/null and b/x-pack/filebeat/input/unifiedlogs/testdata/test.logarchive.tar.gz differ