diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 47587a3be0cf..939523e41009 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -341,6 +341,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
+- Add `unifiedlogs` input for macOS. {pull}41791[41791]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc
index 3b1d274d59bd..5e77999a00b3 100644
--- a/filebeat/docs/filebeat-options.asciidoc
+++ b/filebeat/docs/filebeat-options.asciidoc
@@ -95,6 +95,7 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-syslog>>
* <<{beatname_lc}-input-tcp>>
* <<{beatname_lc}-input-udp>>
+* <<{beatname_lc}-input-unifiedlogs>>
* <<{beatname_lc}-input-unix>>
* <<{beatname_lc}-input-winlog>>
@@ -158,6 +159,8 @@ include::inputs/input-tcp.asciidoc[]
include::inputs/input-udp.asciidoc[]
+include::../../x-pack/filebeat/docs/inputs/input-unifiedlogs.asciidoc[]
+
include::inputs/input-unix.asciidoc[]
include::inputs/input-winlog.asciidoc[]
diff --git a/x-pack/filebeat/docs/inputs/input-unifiedlogs.asciidoc b/x-pack/filebeat/docs/inputs/input-unifiedlogs.asciidoc
new file mode 100644
index 000000000000..37589f307fb3
--- /dev/null
+++ b/x-pack/filebeat/docs/inputs/input-unifiedlogs.asciidoc
@@ -0,0 +1,180 @@
+[role="xpack"]
+
+:type: unifiedlogs
+
+[id="{beatname_lc}-input-{type}"]
+=== Unified Logs input
+
+++++
+Unified Logs
+++++
+
+NOTE: This input is only available on macOS.
+
+The unified logging system provides a comprehensive and performant API to capture
+telemetry across all levels of the system. It centralizes the storage of log data
+in memory and on disk, rather than writing that data to a text-based log file.
+
+The input interacts with the `log` command-line tool (running `log stream` and
+`log show` under the hood) to provide access to the events.
+
+The input starts streaming events from the current point in time unless a start
+date or the `backfill` option is set. When restarted, it continues where it left off.
+
+Alternatively, it can perform one-off operations, such as:
+
+- Stream events contained in a `.logarchive` file.
+- Stream events contained in a `.tracev3` file.
+- Stream events in a specific time span, by providing a specific end date.
+
+After these one-off operations complete, the input stops.
+
+Other configuration options can be specified to filter which events to process.
+
+NOTE: The input can produce duplicate events when backfilling and/or
+restarting. This is caused by how the underlying fetching method works and
+should be taken into account when using the input.
+
+Example configuration:
+
+Process all old and new logs:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: unifiedlogs
+ id: unifiedlogs-id
+ enabled: true
+ backfill: true
+----
+
+Process logs with predicate filters:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: unifiedlogs
+ id: unifiedlogs-id
+ enabled: true
+ predicate:
+ # Captures keychain.db unlock events
+ - 'process == "loginwindow" && sender == "Security"'
+ # Captures user login events
+ - 'process == "logind"'
+ # Captures command line activity run with elevated privileges
+ - 'process == "sudo"'
+----
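+
+Process events from a log archive (a one-off operation; the input stops when
+done; the archive path below is illustrative):
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: unifiedlogs
+ id: unifiedlogs-id
+ enabled: true
+ archive_file: /path/to/test.logarchive
+----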
+
+==== Configuration options
+
+The `unifiedlogs` input supports the following configuration options plus the
+<<{beatname_lc}-input-{type}-common-options>> described later.
+
+[float]
+==== `archive_file`
+
+Display events stored in the given archive.
+The archive must be a valid log archive bundle with the suffix `.logarchive`.
+
+[float]
+==== `trace_file`
+
+Display events stored in the given `.tracev3` file.
+In order to be decoded, the file must be contained within a valid `.logarchive` bundle.
+
+[float]
+==== `start`
+
+Shows content starting from the provided date.
+The following date/time formats are accepted:
+`YYYY-MM-DD`, `YYYY-MM-DD HH:MM:SS`, `YYYY-MM-DD HH:MM:SSZZZZZ`.
+
+[float]
+==== `end`
+
+Shows content up to the provided date.
+The following date/time formats are accepted:
+`YYYY-MM-DD`, `YYYY-MM-DD HH:MM:SS`, `YYYY-MM-DD HH:MM:SSZZZZZ`.
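+
+For example, a minimal sketch that replays a bounded time span from an archive;
+the archive path and dates are illustrative:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: unifiedlogs
+ id: unifiedlogs-id
+ enabled: true
+ archive_file: /path/to/test.logarchive
+ start: "2024-12-04 13:45:00+0200"
+ end: "2024-12-04 13:46:00+0200"
+----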
+
+[float]
+==== `predicate`
+
+Filters messages using the provided predicate, based on NSPredicate syntax.
+A compound predicate or multiple predicates can be provided as a list.
+
+For detailed information on the use of predicate based filtering,
+please refer to the https://developer.apple.com/library/mac/documentation/Cocoa/Conceptual/Predicates/Articles/pSyntax.html[Predicate Programming Guide].
+
+[float]
+==== `process`
+
+A list of the processes on which to operate. Each entry accepts a PID or a
+process name (for example, `0` or `loginwindow`).
+
+[float]
+==== `source`
+
+Include symbol names and source line numbers for messages, if available.
+Default: `false`.
+
+[float]
+==== `info`
+
+Enable or disable info-level messages.
+Default: `false`.
+
+[float]
+==== `debug`
+
+Enable or disable debug-level messages.
+Default: `false`.
+
+[float]
+==== `backtrace`
+
+Enable or disable the display of backtraces.
+Default: `false`.
+
+[float]
+==== `signpost`
+
+Enable or disable the display of signposts.
+Default: `false`.
+
+[float]
+==== `unreliable`
+
+Annotate events with whether the log was emitted unreliably.
+Default: `false`.
+
+[float]
+==== `mach_continuous_time`
+
+Use mach continuous time timestamps rather than walltime.
+Default: `false`.
+
+[float]
+==== `backfill`
+
+If set to `true`, the input processes all available logs from the beginning
+of time the first time it starts (see the first example configuration above).
+Default: `false`.
+
+
+[id="{beatname_lc}-input-{type}-common-options"]
+include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]
+
+[float]
+=== Metrics
+
+This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
+These metrics are exposed under the `/inputs/` path. They can be used to
+observe the activity of the input.
+
+You must assign a unique `id` to the input to expose metrics.
+
+[options="header"]
+|=======
+| Metric | Description
+| `errors_total` | Total number of errors.
+|=======
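+
+Metrics are only served while the HTTP monitoring endpoint is enabled. A minimal
+sketch, using the standard monitoring settings (`http.enabled`, `http.port`):
+
+["source","yaml",subs="attributes"]
+----
+http.enabled: true
+http.port: 5066
+----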
+
+:type!:
diff --git a/x-pack/filebeat/input/default-inputs/inputs_darwin.go b/x-pack/filebeat/input/default-inputs/inputs_darwin.go
new file mode 100644
index 000000000000..b43d75258b35
--- /dev/null
+++ b/x-pack/filebeat/input/default-inputs/inputs_darwin.go
@@ -0,0 +1,54 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+//go:build darwin
+
+package inputs
+
+import (
+ "github.com/elastic/beats/v7/filebeat/beater"
+ v2 "github.com/elastic/beats/v7/filebeat/input/v2"
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/benchmark"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/cel"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/gcs"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/salesforce"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/streaming"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/unifiedlogs"
+ "github.com/elastic/elastic-agent-libs/logp"
+)
+
+func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin {
+ return []v2.Plugin{
+ azureblobstorage.Plugin(log, store),
+ azureeventhub.Plugin(log),
+ cel.Plugin(log, store),
+ cloudfoundry.Plugin(),
+ entityanalytics.Plugin(log),
+ gcs.Plugin(log, store),
+ http_endpoint.Plugin(),
+ httpjson.Plugin(log, store),
+ o365audit.Plugin(log, store),
+ awss3.Plugin(store),
+ awscloudwatch.Plugin(),
+ lumberjack.Plugin(),
+ salesforce.Plugin(log, store),
+ streaming.Plugin(log, store),
+ streaming.PluginWebsocketAlias(log, store),
+ netflow.Plugin(log),
+ benchmark.Plugin(),
+ unifiedlogs.Plugin(log, store),
+ }
+}
diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go
index 731f8979766e..a14145a5265f 100644
--- a/x-pack/filebeat/input/default-inputs/inputs_other.go
+++ b/x-pack/filebeat/input/default-inputs/inputs_other.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-//go:build !aix && !windows
+//go:build !aix && !darwin && !windows
package inputs
diff --git a/x-pack/filebeat/input/unifiedlogs/config.go b/x-pack/filebeat/input/unifiedlogs/config.go
new file mode 100644
index 000000000000..0999e7d768b8
--- /dev/null
+++ b/x-pack/filebeat/input/unifiedlogs/config.go
@@ -0,0 +1,75 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+//go:build darwin
+
+package unifiedlogs
+
+import (
+ "fmt"
+ "strings"
+ "time"
+)
+
+type config struct {
+ showConfig
+ commonConfig
+ Backfill bool `config:"backfill"`
+}
+
+type showConfig struct {
+ ArchiveFile string `config:"archive_file"`
+ TraceFile string `config:"trace_file"`
+ Start string `config:"start"`
+ End string `config:"end"`
+}
+
+type commonConfig struct {
+ Predicate []string `config:"predicate"`
+ Process []string `config:"process"`
+ Source bool `config:"source"`
+ Info bool `config:"info"`
+ Debug bool `config:"debug"`
+ Backtrace bool `config:"backtrace"`
+ Signpost bool `config:"signpost"`
+ Unreliable bool `config:"unreliable"`
+ MachContinuousTime bool `config:"mach_continuous_time"`
+}
+
+func (c config) Validate() error {
+ if err := checkDateFormat(c.Start); err != nil {
+ return fmt.Errorf("start date is not valid: %w", err)
+ }
+ if err := checkDateFormat(c.End); err != nil {
+ return fmt.Errorf("end date is not valid: %w", err)
+ }
+ if c.ArchiveFile != "" && !strings.HasSuffix(c.ArchiveFile, ".logarchive") {
+ return fmt.Errorf("archive_file %v has the wrong extension", c.ArchiveFile)
+ }
+ if c.TraceFile != "" && !strings.HasSuffix(c.TraceFile, ".tracev3") {
+ return fmt.Errorf("trace_file %v has the wrong extension", c.TraceFile)
+ }
+ return nil
+}
+
+func defaultConfig() config {
+ return config{}
+}
+
+func checkDateFormat(date string) error {
+ if date == "" {
+ return nil
+ }
+ acceptedLayouts := []string{
+ "2006-01-02",
+ "2006-01-02 15:04:05",
+ "2006-01-02 15:04:05-0700",
+ }
+ for _, layout := range acceptedLayouts {
+ if _, err := time.Parse(layout, date); err == nil {
+ return nil
+ }
+ }
+ return fmt.Errorf("not a valid date, accepted layouts are: %v", acceptedLayouts)
+}
diff --git a/x-pack/filebeat/input/unifiedlogs/input.go b/x-pack/filebeat/input/unifiedlogs/input.go
new file mode 100644
index 000000000000..ca33c03c3bb6
--- /dev/null
+++ b/x-pack/filebeat/input/unifiedlogs/input.go
@@ -0,0 +1,385 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+//go:build darwin
+
+package unifiedlogs
+
+import (
+ "bufio"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "os/exec"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "golang.org/x/sync/errgroup"
+
+ v2 "github.com/elastic/beats/v7/filebeat/input/v2"
+ inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/feature"
+ "github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
+ conf "github.com/elastic/elastic-agent-libs/config"
+ "github.com/elastic/elastic-agent-libs/logp"
+ "github.com/elastic/elastic-agent-libs/mapstr"
+ "github.com/elastic/go-concert/ctxtool"
+)
+
+const (
+ inputName = "unifiedlogs"
+ srcArchiveName = "log-cmd-archive"
+ srcPollName = "log-cmd-poll"
+ logDateLayout = "2006-01-02 15:04:05.999999-0700"
+ cursorDateLayout = "2006-01-02 15:04:05-0700"
+)
+
+var (
+ // override for testing
+ timeNow = time.Now
+)
+
+func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin {
+ return v2.Plugin{
+ Name: inputName,
+ Stability: feature.Stable,
+ Deprecated: false,
+ Manager: &inputcursor.InputManager{
+ Logger: log,
+ StateStore: store,
+ Type: inputName,
+ Configure: cursorConfigure,
+ },
+ }
+}
+
+type logRecord struct {
+ Timestamp string `json:"timestamp"`
+}
+
+type source struct {
+ name string
+}
+
+func newSource(config config) source {
+ if config.ArchiveFile != "" || config.TraceFile != "" {
+ return source{name: srcArchiveName}
+ }
+ return source{name: srcPollName}
+}
+
+func (src source) Name() string { return src.name }
+
+type input struct {
+ config
+ metrics *inputMetrics
+}
+
+func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) {
+ c := defaultConfig()
+ if err := cfg.Unpack(&c); err != nil {
+ return nil, nil, err
+ }
+ sources, inp := newCursorInput(c)
+ return sources, inp, nil
+}
+
+func newCursorInput(config config) ([]inputcursor.Source, inputcursor.Input) {
+ input := &input{config: config}
+ return []inputcursor.Source{newSource(config)}, input
+}
+
+func (input) Name() string { return inputName }
+
+func (input input) Test(src inputcursor.Source, _ v2.TestContext) error {
+ if _, err := exec.LookPath("log"); err != nil {
+ return err
+ }
+ return nil
+}
+
+// Run starts the input and blocks until execution ends.
+func (input *input) Run(ctxt v2.Context, src inputcursor.Source, resumeCursor inputcursor.Cursor, pub inputcursor.Publisher) error {
+ reg, unreg := inputmon.NewInputRegistry(input.Name(), ctxt.ID, nil)
+ defer unreg()
+ input.metrics = newInputMetrics(reg)
+
+ stdCtx := ctxtool.FromCanceller(ctxt.Cancelation)
+ log := ctxt.Logger.With("source", src.Name())
+
+ startFrom, err := loadCursor(resumeCursor, log)
+ if err != nil {
+ return err
+ }
+ if startFrom != "" {
+ input.Start = startFrom
+ }
+
+ return input.runWithMetrics(stdCtx, pub, log)
+}
+
+func (input *input) runWithMetrics(ctx context.Context, pub inputcursor.Publisher, log *logp.Logger) error {
+ // We create a wrapped publisher for the streaming goroutine.
+ // It notifies the backfilling goroutine of the end date of the
+ // backfilling period, and avoids updating the stored resume date
+ // until backfilling is done.
+ wrappedPub := newWrappedPublisher(!input.mustBackfill(), pub)
+
+ var g errgroup.Group
+ // We start the streaming command in the background; it will use the
+ // wrapped publisher to set the end date for the backfilling process.
+ if input.mustStream() {
+ g.Go(func() error {
+ logCmd := newLogStreamCmd(ctx, input.commonConfig)
+ return input.runLogCmd(ctx, logCmd, wrappedPub, log)
+ })
+ }
+
+ if input.mustBackfill() {
+ g.Go(func() error {
+ if input.mustStream() {
+ t := wrappedPub.getFirstProcessedTime()
+ // The time resolution of the log tool is microsecond, while it only
+ // accepts second resolution as an end parameter.
+ // To avoid potentially losing data we move the end forward one second,
+ // since it is preferable to have some duplicated events.
+ t = t.Add(time.Second)
+ input.End = t.Format(cursorDateLayout)
+
+ // to avoid race conditions updating the cursor, and to be able to
+ // resume from the oldest point in time, we only update cursor
+ // from the streaming goroutine once backfilling is done.
+ defer wrappedPub.startUpdatingCursor()
+ }
+ logCmd := newLogShowCmd(ctx, input.config)
+ err := input.runLogCmd(ctx, logCmd, pub, log)
+ if !input.mustStream() {
+ log.Debugf("finished processing events, stopping")
+ }
+ return err
+ })
+ }
+
+ return g.Wait()
+}
+
+// mustStream returns true in case a stream command is needed.
+// This is the default case; the only exceptions are when an archive file,
+// a trace file, or an end date is set.
+func (input *input) mustStream() bool {
+ return !(input.ArchiveFile != "" || input.TraceFile != "" || input.End != "")
+}
+
+// mustBackfill returns true in case a show command is needed.
+// This happens when start or end dates are set (for example when resuming filebeat),
+// when an archive or trace file is used, or when the user forces it via the backfill config.
+func (input *input) mustBackfill() bool {
+ return input.Backfill || input.ArchiveFile != "" || input.TraceFile != "" || input.Start != "" || input.End != ""
+}
+
+func (input *input) runLogCmd(ctx context.Context, logCmd *exec.Cmd, pub inputcursor.Publisher, log *logp.Logger) error {
+ outpipe, err := logCmd.StdoutPipe()
+ if err != nil {
+ return fmt.Errorf("get stdout pipe: %w", err)
+ }
+ errpipe, err := logCmd.StderrPipe()
+ if err != nil {
+ return fmt.Errorf("get stderr pipe: %w", err)
+ }
+
+ log.Debugf("exec command start: %v", logCmd)
+ defer log.Debugf("exec command end: %v", logCmd)
+
+ if err := logCmd.Start(); err != nil {
+ return fmt.Errorf("start log command: %w", err)
+ }
+
+ if err := input.processLogs(outpipe, pub, log); err != nil {
+ log.Errorf("process logs: %v", err)
+ }
+
+ stderrBytes, _ := io.ReadAll(errpipe)
+ if err := logCmd.Wait(); err != nil && ctx.Err() == nil {
+ return fmt.Errorf("%q exited with an error: %w, %q", logCmd, err, string(stderrBytes))
+ }
+
+ return nil
+}
+
+func (input *input) processLogs(stdout io.Reader, pub inputcursor.Publisher, log *logp.Logger) error {
+ scanner := bufio.NewScanner(stdout)
+
+ var (
+ event beat.Event
+ line string
+ logRecordLine logRecord
+ timestamp time.Time
+ err error
+ )
+
+ for scanner.Scan() {
+ line = scanner.Text()
+ if err = json.Unmarshal([]byte(line), &logRecordLine); err != nil {
+ log.Errorf("invalid json log: %v", err)
+ input.metrics.errs.Add(1)
+ continue
+ }
+
+ if logRecordLine == (logRecord{}) {
+ continue
+ }
+
+ timestamp, err = time.Parse(logDateLayout, logRecordLine.Timestamp)
+ if err != nil {
+ input.metrics.errs.Add(1)
+ log.Errorf("invalid timestamp: %v", err)
+ continue
+ }
+
+ event = makeEvent(timestamp, line)
+ if err = pub.Publish(event, timestamp); err != nil {
+ log.Errorf("publish event: %v", err)
+ input.metrics.errs.Add(1)
+ continue
+ }
+ }
+ if err = scanner.Err(); err != nil {
+ input.metrics.errs.Add(1)
+ return fmt.Errorf("scanning stdout: %w", err)
+ }
+
+ return nil
+}
+
+// wrappedPublisher wraps a publisher and stores the date of the first published event.
+// This is required in order to backfill events when a streaming command is started.
+type wrappedPublisher struct {
+ firstTimeOnce sync.Once
+ firstTimeC chan struct{}
+ firstProcessedTime time.Time
+
+ updateCursor *atomic.Bool
+
+ inner inputcursor.Publisher
+}
+
+func newWrappedPublisher(updateCursor bool, inner inputcursor.Publisher) *wrappedPublisher {
+ var atomicUC atomic.Bool
+ atomicUC.Store(updateCursor)
+ return &wrappedPublisher{
+ firstTimeC: make(chan struct{}),
+ updateCursor: &atomicUC,
+ inner: inner,
+ }
+}
+
+func (pub *wrappedPublisher) Publish(event beat.Event, cursor interface{}) error {
+ pub.firstTimeOnce.Do(func() {
+ pub.firstProcessedTime = cursor.(time.Time)
+ close(pub.firstTimeC)
+ })
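+ // While backfilling is in progress, drop the cursor so the stored resume
+ // point is not advanced past events the backfill has yet to process.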
+ if !pub.updateCursor.Load() {
+ cursor = nil
+ }
+ return pub.inner.Publish(event, cursor)
+}
+
+// getFirstProcessedTime will block until there is a value set for firstProcessedTime.
+func (pub *wrappedPublisher) getFirstProcessedTime() time.Time {
+ <-pub.firstTimeC
+ return pub.firstProcessedTime
+}
+
+func (pub *wrappedPublisher) startUpdatingCursor() {
+ pub.updateCursor.Store(true)
+}
+
+func loadCursor(c inputcursor.Cursor, log *logp.Logger) (string, error) {
+ if c.IsNew() {
+ return "", nil
+ }
+ var cursor time.Time
+ if err := c.Unpack(&cursor); err != nil {
+ return "", fmt.Errorf("unpack cursor: %w", err)
+ }
+ log.Infof("cursor loaded, resuming from: %v", cursor)
+ return cursor.Format(cursorDateLayout), nil
+}
+
+func newLogShowCmd(ctx context.Context, cfg config) *exec.Cmd {
+ return exec.CommandContext(ctx, "log", newLogCmdArgs("show", cfg)...) // #nosec G204
+}
+
+func newLogStreamCmd(ctx context.Context, cfg commonConfig) *exec.Cmd {
+ return exec.CommandContext(ctx, "log", newLogCmdArgs("stream", config{commonConfig: cfg})...) // #nosec G204
+}
+
+func newLogCmdArgs(subcmd string, config config) []string {
+ args := []string{subcmd, "--style", "ndjson"}
+ if config.ArchiveFile != "" {
+ args = append(args, "--archive", config.ArchiveFile)
+ }
+ if config.TraceFile != "" {
+ args = append(args, "--file", config.TraceFile)
+ }
+ for _, p := range config.Predicate {
+ args = append(args, "--predicate", p)
+ }
+ for _, p := range config.Process {
+ args = append(args, "--process", p)
+ }
+ if config.Source {
+ args = append(args, "--source")
+ }
+ if config.Info {
+ args = append(args, "--info")
+ }
+ if config.Debug {
+ args = append(args, "--debug")
+ }
+ if config.Backtrace {
+ args = append(args, "--backtrace")
+ }
+ if config.Signpost {
+ args = append(args, "--signpost")
+ }
+ if config.Unreliable {
+ args = append(args, "--unreliable")
+ }
+ if config.MachContinuousTime {
+ args = append(args, "--mach-continuous-time")
+ }
+ if config.Start != "" {
+ args = append(args, "--start", config.Start)
+ }
+ if config.End != "" {
+ args = append(args, "--end", config.End)
+ }
+ return args
+}
+
+func makeEvent(timestamp time.Time, message string) beat.Event {
+ now := timeNow()
+ fields := mapstr.M{
+ "event": mapstr.M{
+ "created": now,
+ },
+ "message": message,
+ }
+
+ return beat.Event{
+ Timestamp: timestamp,
+ Fields: fields,
+ }
+}
diff --git a/x-pack/filebeat/input/unifiedlogs/input_test.go b/x-pack/filebeat/input/unifiedlogs/input_test.go
new file mode 100644
index 000000000000..907c927254d3
--- /dev/null
+++ b/x-pack/filebeat/input/unifiedlogs/input_test.go
@@ -0,0 +1,391 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+//go:build darwin
+
+package unifiedlogs
+
+import (
+ "bufio"
+ "bytes"
+ "context"
+ "fmt"
+ "os"
+ "os/exec"
+ "path"
+ "regexp"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
+ "github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/elastic-agent-libs/logp"
+)
+
+var _ inputcursor.Publisher = (*publisher)(nil)
+
+type publisher struct {
+ m sync.Mutex
+
+ events []beat.Event
+ cursors []*time.Time
+}
+
+func (p *publisher) Publish(e beat.Event, cursor interface{}) error {
+ p.m.Lock()
+ defer p.m.Unlock()
+
+ p.events = append(p.events, e)
+ var c *time.Time
+ if cursor != nil {
+ cv := cursor.(time.Time)
+ c = &cv
+ }
+ p.cursors = append(p.cursors, c)
+ return nil
+}
+
+func TestInput(t *testing.T) {
+ archivePath, err := openArchive()
+ require.NoError(t, err)
+ t.Cleanup(func() { os.RemoveAll(archivePath) })
+
+ testCases := []struct {
+ name string
+ cfg config
+ timeUntilClose time.Duration
+ assertFunc func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time)
+ expectedLogStreamCmd string
+ expectedLogShowCmd string
+ expectedRunErrorMsg string
+ }{
+ {
+ name: "Default stream",
+ cfg: config{},
+ timeUntilClose: time.Second,
+ expectedLogStreamCmd: "/usr/bin/log stream --style ndjson",
+ assertFunc: func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) {
+ assert.NotEmpty(collect, events)
+ assert.NotEmpty(collect, cursors)
+ assert.Equal(collect, len(events), len(cursors))
+ lastEvent := events[len(events)-1]
+ lastCursor := cursors[len(cursors)-1]
+ assert.EqualValues(collect, &lastEvent.Timestamp, lastCursor)
+ },
+ },
+ {
+ name: "Archive not found",
+ cfg: config{
+ showConfig: showConfig{
+ ArchiveFile: "notfound.logarchive",
+ },
+ },
+ timeUntilClose: time.Second,
+ expectedLogShowCmd: "/usr/bin/log show --style ndjson --archive notfound.logarchive",
+ expectedRunErrorMsg: "\"/usr/bin/log show --style ndjson --archive notfound.logarchive\" exited with an error: exit status 64",
+ },
+ {
+ name: "Archived file",
+ cfg: config{
+ showConfig: showConfig{
+ ArchiveFile: archivePath,
+ },
+ },
+ timeUntilClose: time.Second,
+ expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s", archivePath),
+ assertFunc: eventsAndCursorAssertN(462),
+ },
+ {
+ name: "Trace file",
+ cfg: config{
+ showConfig: showConfig{
+ TraceFile: path.Join(archivePath, "logdata.LiveData.tracev3"),
+ },
+ },
+ timeUntilClose: time.Second,
+ expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --file %s", path.Join(archivePath, "logdata.LiveData.tracev3")),
+ assertFunc: eventsAndCursorAssertN(7),
+ },
+ {
+ name: "With start date",
+ cfg: config{
+ showConfig: showConfig{
+ ArchiveFile: archivePath,
+ Start: "2024-12-04 13:46:00+0200",
+ },
+ },
+ timeUntilClose: time.Second,
+ expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --start 2024-12-04 13:46:00+0200", archivePath),
+ assertFunc: eventsAndCursorAssertN(314),
+ },
+ {
+ name: "With start and end dates",
+ cfg: config{
+ showConfig: showConfig{
+ ArchiveFile: archivePath,
+ Start: "2024-12-04 13:45:00+0200",
+ End: "2024-12-04 13:46:00+0200",
+ },
+ },
+ timeUntilClose: time.Second,
+ expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --start 2024-12-04 13:45:00+0200 --end 2024-12-04 13:46:00+0200", archivePath),
+ assertFunc: eventsAndCursorAssertN(149),
+ },
+ {
+ name: "With end date",
+ cfg: config{
+ showConfig: showConfig{
+ ArchiveFile: archivePath,
+ End: "2024-12-04 13:46:00+0200",
+ },
+ },
+ timeUntilClose: time.Second,
+ expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --end 2024-12-04 13:46:00+0200", archivePath),
+ assertFunc: eventsAndCursorAssertN(462),
+ },
+ {
+ name: "With predicate",
+ cfg: config{
+ showConfig: showConfig{
+ ArchiveFile: archivePath,
+ },
+ commonConfig: commonConfig{
+ Predicate: []string{
+ `processImagePath == "/kernel"`,
+ },
+ },
+ },
+ timeUntilClose: time.Second,
+ expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --predicate processImagePath == \"/kernel\"", archivePath),
+ assertFunc: eventsAndCursorAssertN(460),
+ },
+ {
+ name: "With process",
+ cfg: config{
+ showConfig: showConfig{
+ ArchiveFile: archivePath,
+ },
+ commonConfig: commonConfig{
+ Process: []string{
+ "0",
+ },
+ },
+ },
+ timeUntilClose: time.Second,
+ expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --process 0", archivePath),
+ assertFunc: eventsAndCursorAssertN(462),
+ },
+ {
+ name: "With optional flags",
+ cfg: config{
+ showConfig: showConfig{
+ ArchiveFile: archivePath,
+ },
+ commonConfig: commonConfig{
+ Info: true,
+ Debug: true,
+ Backtrace: true,
+ Signpost: true,
+ MachContinuousTime: true,
+ },
+ },
+ timeUntilClose: time.Second,
+ expectedLogShowCmd: fmt.Sprintf("/usr/bin/log show --style ndjson --archive %s --info --debug --backtrace --signpost --mach-continuous-time", archivePath),
+ assertFunc: eventsAndCursorAssertN(462),
+ },
+ }
+
+ for _, tc := range testCases {
+ tc := tc
+ t.Run(tc.name, func(t *testing.T) {
+ _, cursorInput := newCursorInput(tc.cfg)
+ input := cursorInput.(*input)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ pub := &publisher{}
+ log, buf := logp.NewInMemory("unifiedlogs_test", logp.JSONEncoderConfig())
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func(t *testing.T) {
+ defer wg.Done()
+ err := input.runWithMetrics(ctx, pub, log)
+ if tc.expectedRunErrorMsg == "" {
+ assert.NoError(t, err)
+ } else {
+ assert.ErrorContains(t, err, tc.expectedRunErrorMsg)
+ }
+ }(t)
+
+ select {
+ case <-ctx.Done():
+ case <-time.After(tc.timeUntilClose):
+ }
+
+ cancel()
+ wg.Wait()
+
+ assert.EventuallyWithT(t,
+ func(collect *assert.CollectT) {
+ assert.Equal(collect, tc.expectedLogStreamCmd, filterStartLogStreamLogline(buf.Bytes()))
+ assert.Equal(collect, tc.expectedLogShowCmd, filterStartLogShowLogline(buf.Bytes()))
+ if tc.assertFunc != nil {
+ tc.assertFunc(collect, pub.events, pub.cursors)
+ }
+ },
+ 30*time.Second, time.Second,
+ )
+ })
+ }
+}
+
+func TestBackfillAndStream(t *testing.T) {
+ archivePath, err := openArchive()
+ require.NoError(t, err)
+ t.Cleanup(func() { os.RemoveAll(archivePath) })
+
+ cfg := config{
+ Backfill: true,
+ showConfig: showConfig{
+ Start: time.Now().Add(-5 * time.Second).Format("2006-01-02 15:04:05"),
+ },
+ commonConfig: commonConfig{
+ Info: true,
+ Debug: true,
+ Backtrace: true,
+ Signpost: true,
+ MachContinuousTime: true,
+ },
+ }
+
+ expectedLogShowCmd := fmt.Sprintf("/usr/bin/log show --style ndjson --info --debug --backtrace --signpost --mach-continuous-time --start %v", time.Now().Format("2006-01-02"))
+ expectedLogStreamCmd := "/usr/bin/log stream --style ndjson --info --debug --backtrace --signpost --mach-continuous-time"
+
+ _, cursorInput := newCursorInput(cfg)
+ input := cursorInput.(*input)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+
+ pub := &publisher{}
+ log, buf := logp.NewInMemory("unifiedlogs_test", logp.JSONEncoderConfig())
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func(t *testing.T) {
+ defer wg.Done()
+ err := input.runWithMetrics(ctx, pub, log)
+ assert.NoError(t, err)
+ }(t)
+
+ var firstStreamedEventTime *time.Time
+ assert.EventuallyWithT(t,
+ func(collect *assert.CollectT) {
+ showCmdLog := filterStartLogShowLogline(buf.Bytes())
+ assert.Equal(collect, expectedLogStreamCmd, filterStartLogStreamLogline(buf.Bytes()))
+ assert.True(collect, strings.HasPrefix(showCmdLog, expectedLogShowCmd))
+ assert.NotEmpty(collect, pub.events)
+ assert.NotEmpty(collect, pub.cursors)
+
+ var endTime time.Time
+ regex := regexp.MustCompile(`--end\s+(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}[+-]\d{4})`)
+ matches := regex.FindStringSubmatch(showCmdLog)
+ if !assert.Equal(collect, 2, len(matches)) {
+ return
+ }
+ endTime, _ = time.Parse("2006-01-02 15:04:05-0700", matches[1])
+ endTime = endTime.Truncate(time.Second)
+
+ if firstStreamedEventTime == nil {
+ for i := range pub.events {
+ if pub.cursors[i] == nil {
+ first := pub.events[i].Timestamp.Add(time.Second).Truncate(time.Second)
+ firstStreamedEventTime = &first
+ break
+ }
+ }
+ }
+ assert.NotNil(collect, firstStreamedEventTime)
+ assert.EqualValues(collect, endTime, *firstStreamedEventTime)
+ assert.True(collect, strings.HasPrefix(showCmdLog, filterEndLogShowLogline(buf.Bytes())))
+ },
+ 30*time.Second, time.Second,
+ )
+
+ cancel()
+ wg.Wait()
+}
+
+const (
+ cmdStartPrefix = "exec command start: "
+ cmdEndPrefix = "exec command end: "
+)
+
+func filterStartLogStreamLogline(buf []byte) string {
+ const cmd = "/usr/bin/log stream"
+ return filterLogCmdLine(buf, cmd, cmdStartPrefix)
+}
+
+func filterStartLogShowLogline(buf []byte) string {
+ const cmd = "/usr/bin/log show"
+ return filterLogCmdLine(buf, cmd, cmdStartPrefix)
+}
+
+func filterEndLogShowLogline(buf []byte) string {
+ const cmd = "/usr/bin/log show"
+ return filterLogCmdLine(buf, cmd, cmdEndPrefix)
+}
+
+func filterLogCmdLine(buf []byte, cmd, cmdPrefix string) string {
+ scanner := bufio.NewScanner(bytes.NewBuffer(buf))
+ for scanner.Scan() {
+ text := scanner.Text()
+ parts := strings.Split(text, "\t")
+ if len(parts) != 4 {
+ continue
+ }
+
+ trimmed := strings.TrimPrefix(parts[3], cmdPrefix)
+ if strings.HasPrefix(trimmed, cmd) {
+ return trimmed
+ }
+ }
+ return ""
+}
+
+func eventsAndCursorAssertN(n int) func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) {
+ return func(collect *assert.CollectT, events []beat.Event, cursors []*time.Time) {
+ assert.Equal(collect, n, len(events))
+ assert.Equal(collect, n, len(cursors))
+ lastEvent := events[len(events)-1]
+ lastCursor := cursors[len(cursors)-1]
+ assert.EqualValues(collect, &lastEvent.Timestamp, lastCursor)
+ }
+}
+
+func openArchive() (string, error) {
+ return extractTarGz(path.Join("testdata", "test.logarchive.tar.gz"))
+}
+
+func extractTarGz(tarGzPath string) (string, error) {
+ // Create a temporary directory
+ tempDir, err := os.MkdirTemp("", "extracted-*")
+ if err != nil {
+ return "", fmt.Errorf("failed to create temporary directory: %v", err)
+ }
+
+ // Use the 'tar' command to extract the .tar.gz file
+ cmd := exec.Command("tar", "-xzf", tarGzPath, "-C", tempDir)
+
+ // Run the command
+ if err := cmd.Run(); err != nil {
+ return "", fmt.Errorf("failed to extract .tar.gz: %v", err)
+ }
+
+ return path.Join(tempDir, "test.logarchive"), nil
+}
diff --git a/x-pack/filebeat/input/unifiedlogs/metrics.go b/x-pack/filebeat/input/unifiedlogs/metrics.go
new file mode 100644
index 000000000000..2e24c0b4121f
--- /dev/null
+++ b/x-pack/filebeat/input/unifiedlogs/metrics.go
@@ -0,0 +1,27 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+//go:build darwin
+
+package unifiedlogs
+
+import (
+ "github.com/elastic/elastic-agent-libs/monitoring"
+)
+
+type inputMetrics struct {
+ errs *monitoring.Uint // total number of errors
+}
+
+func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
+ if reg == nil {
+ return nil
+ }
+
+ out := &inputMetrics{
+ errs: monitoring.NewUint(reg, "errors_total"),
+ }
+
+ return out
+}
diff --git a/x-pack/filebeat/input/unifiedlogs/testdata/test.logarchive.tar.gz b/x-pack/filebeat/input/unifiedlogs/testdata/test.logarchive.tar.gz
new file mode 100644
index 000000000000..fc507f9b1e9e
Binary files /dev/null and b/x-pack/filebeat/input/unifiedlogs/testdata/test.logarchive.tar.gz differ