diff --git a/x-pack/filebeat/input/default-inputs/inputs_darwin.go b/x-pack/filebeat/input/default-inputs/inputs_darwin.go new file mode 100644 index 000000000000..b43d75258b35 --- /dev/null +++ b/x-pack/filebeat/input/default-inputs/inputs_darwin.go @@ -0,0 +1,54 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build darwin + +package inputs + +import ( + "github.com/elastic/beats/v7/filebeat/beater" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" + "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" + "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage" + "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" + "github.com/elastic/beats/v7/x-pack/filebeat/input/benchmark" + "github.com/elastic/beats/v7/x-pack/filebeat/input/cel" + "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" + "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics" + "github.com/elastic/beats/v7/x-pack/filebeat/input/gcs" + "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" + "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" + "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" + "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" + "github.com/elastic/beats/v7/x-pack/filebeat/input/salesforce" + "github.com/elastic/beats/v7/x-pack/filebeat/input/streaming" + "github.com/elastic/beats/v7/x-pack/filebeat/input/unifiedlogs" + "github.com/elastic/elastic-agent-libs/logp" +) + +func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { + return []v2.Plugin{ + azureblobstorage.Plugin(log, store), + azureeventhub.Plugin(log), + cel.Plugin(log, store), + cloudfoundry.Plugin(), + entityanalytics.Plugin(log), + gcs.Plugin(log, store), + http_endpoint.Plugin(), + httpjson.Plugin(log, store), + o365audit.Plugin(log, store), + awss3.Plugin(store), + awscloudwatch.Plugin(), + lumberjack.Plugin(), + salesforce.Plugin(log, store), + streaming.Plugin(log, store), + streaming.PluginWebsocketAlias(log, store), + netflow.Plugin(log), + benchmark.Plugin(), + unifiedlogs.Plugin(log, store), + } +} diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index 731f8979766e..a14145a5265f 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build !aix && !windows +//go:build !aix && !darwin && !windows package inputs diff --git a/x-pack/filebeat/input/unifiedlogs/config.go b/x-pack/filebeat/input/unifiedlogs/config.go new file mode 100644 index 000000000000..6f5dfb3114fc --- /dev/null +++ b/x-pack/filebeat/input/unifiedlogs/config.go @@ -0,0 +1,59 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build darwin + +package unifiedlogs + +import ( + "fmt" + "strings" + "time" +) + +type config struct { + ArchiveFile string `config:"archive_file"` + TraceFile string `config:"trace_file"` + Predicate string `config:"predicate"` + Process []string `config:"process"` + Source bool `config:"source"` + Info bool `config:"info"` + Debug bool `config:"debug"` + Signposts bool `config:"signposts"` + Start string `config:"start"` + Timezone string `config:"timezone"` +} + +func (c config) Validate() error { + if err := checkDateFormat(c.Start); err != nil { + return fmt.Errorf("start date is not valid: %w", err) + } + if c.ArchiveFile != "" && !strings.HasSuffix(c.ArchiveFile, ".logarchive") { + return fmt.Errorf("archive_file %v has the wrong extension", c.ArchiveFile) + } + if c.TraceFile != "" && !strings.HasSuffix(c.TraceFile, ".tracev3") { + return fmt.Errorf("trace_file %v has the wrong extension", c.TraceFile) + } + return nil +} + +func defaultConfig() config { + return config{ + Timezone: "UTC", + } +} + +func checkDateFormat(date string) error { + acceptedLayouts := []string{ + "2006-01-02", + "2006-01-02 15:04:05", + "2006-01-02 15:04:05-0700", + } + for _, layout := range acceptedLayouts { + if _, err := time.Parse(layout, date); err != nil { + return nil + } + } + return fmt.Errorf("not a valid date, accepted layouts are: %v", acceptedLayouts) +} diff --git a/x-pack/filebeat/input/unifiedlogs/input.go b/x-pack/filebeat/input/unifiedlogs/input.go new file mode 100644 index 000000000000..f47236f7d65b --- /dev/null +++ b/x-pack/filebeat/input/unifiedlogs/input.go @@ -0,0 +1,261 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build darwin + +package unifiedlogs + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "os" + "os/exec" + "time" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/go-concert/ctxtool" +) + +const ( + inputName = "unifiedlogs" + srcArchiveName = "log-cmd-archive" + srcPollName = "log-cmd-poll" +) + +var ( + // override for testing + timeNow = time.Now +) + +func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Beta, + Deprecated: false, + Manager: &inputcursor.InputManager{ + Logger: log, + StateStore: store, + Type: inputName, + Configure: cursorConfigure, + }, + } +} + +type cursor struct { + Timestamp string `json:"timestamp"` +} + +type source struct { + name string +} + +func newSource(config config) source { + if config.ArchiveFile != "" || config.TraceFile != "" { + return source{name: srcArchiveName} + } + return source{name: srcPollName} +} + +func (src source) Name() string { return src.name } + +type input struct { + config +} + +func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) { + conf := defaultConfig() + if err := cfg.Unpack(&conf); err != nil { + return nil, nil, err + } + sources, inp := newCursorInput(conf) + return sources, inp, nil +} + +func newCursorInput(config config) ([]inputcursor.Source, inputcursor.Input) { + input := &input{config: config} + return []inputcursor.Source{newSource(config)}, input +} + +func (input) Name() string { return inputName } + +func (input input) Test(src inputcursor.Source, _ v2.TestContext) error { + if _, err := exec.LookPath("log"); err != nil { + return err + } + if src.Name() == srcArchiveName { + if _, err := os.Stat(input.ArchiveFile); input.ArchiveFile != "" && os.IsNotExist(err) { + return err + } + if _, err := os.Stat(input.TraceFile); input.TraceFile != "" && os.IsNotExist(err) { + return err + } + } + return nil +} + +// Run starts the input and blocks until it ends the execution. +func (input *input) Run(ctxt v2.Context, src inputcursor.Source, resumeCursor inputcursor.Cursor, pub inputcursor.Publisher) error { + reg, unreg := inputmon.NewInputRegistry(input.Name(), ctxt.ID, nil) + defer unreg() + + stdCtx := ctxtool.FromCanceller(ctxt.Cancelation) + metrics := newInputMetrics(reg) + log := ctxt.Logger.With("source", src.Name()) + logCmd, err := newLogCmd(stdCtx, input.config, resumeCursor) + if err != nil { + return err + } + + return input.runWithMetrics(stdCtx, logCmd, pub, metrics, log) +} + +func (input *input) runWithMetrics(ctx context.Context, logCmd *exec.Cmd, pub inputcursor.Publisher, metrics *inputMetrics, log *logp.Logger) error { + for { + metrics.intervals.Add(1) + + select { + case <-ctx.Done(): + log.Infof("input stopped because context was cancelled with: %w", ctx.Err()) + return nil + default: + } + + pipe, err := logCmd.StdoutPipe() + if err != nil { + return fmt.Errorf("get stdout pipe: %w", err) + } + + if err := logCmd.Start(); err != nil { + return fmt.Errorf("start log command: %w", err) + } + + if err = input.processLogs(pipe, pub, metrics, log); err != nil { + log.Errorf("process logs: %w", err) + } + + if err := logCmd.Wait(); err != nil { + return fmt.Errorf("log command exited with an error: %w", err) + } + + if input.isArchive() { + log.Info("finished processing the archived logs, stopping") + return nil + } + } +} + +func (input *input) isArchive() bool { return input.ArchiveFile != "" || input.TraceFile != "" } + +func (input *input) processLogs(stdout io.Reader, pub inputcursor.Publisher, metrics *inputMetrics, log *logp.Logger) error { + scanner := bufio.NewScanner(stdout) + + var c int64 + defer func() { metrics.intervalEvents.Update(c) }() + + for scanner.Scan() { + line := scanner.Text() + var logRecord cursor + if err := json.Unmarshal([]byte(line), &logRecord); err != nil { + log.Errorf("invalid json log: %w", err) + metrics.errs.Add(1) + continue + } + + event, err := makeEvent(logRecord.Timestamp, line) + if err != nil { + log.Errorf("makeEvent: %w", err) + metrics.errs.Add(1) + continue + } + + if err := pub.Publish(event, logRecord); err != nil { + log.Errorf("publish event: %w", err) + metrics.errs.Add(1) + continue + } + + c++ + } + if err := scanner.Err(); err != nil { + metrics.errs.Add(1) + return fmt.Errorf("scanning stdout: %w", err) + } + return nil +} + +func newLogCmd(ctx context.Context, config config, resumeCursor inputcursor.Cursor) (*exec.Cmd, error) { + args := []string{"show", "--style", "ndjson"} + if config.ArchiveFile != "" { + args = append(args, "--archive", config.ArchiveFile) + } + if config.TraceFile != "" { + args = append(args, "--file", config.TraceFile) + } + if config.Predicate != "" { + args = append(args, "--predicate", config.Predicate) + } + if len(config.Process) > 0 { + for _, p := range config.Process { + args = append(args, "--process", p) + } + } + if config.Source { + args = append(args, "--source") + } + if config.Info { + args = append(args, "--info") + } + if config.Debug { + args = append(args, "--debug") + } + if config.Signposts { + args = append(args, "--signposts") + } + if config.Timezone != "" { + args = append(args, "--timezone", config.Timezone) + } + start := config.Start + if !resumeCursor.IsNew() { + cursor := cursor{} + if err := resumeCursor.Unpack(&cursor); err != nil { + return nil, fmt.Errorf("unpacking cursor: %w", err) + } + start = cursor.Timestamp + } + if start != "" { + args = append(args, "--start", start) + } + return exec.CommandContext(ctx, "log", args...), nil +} + +func makeEvent(timestamp, message string) (beat.Event, error) { + const layout = "2006-01-02 15:04:05-0700" + + ts, err := time.Parse(layout, timestamp) + if err != nil { + return beat.Event{}, fmt.Errorf("invalid timestamp: %w", err) + } + now := timeNow() + fields := mapstr.M{ + "event": mapstr.M{ + "created": now, + }, + "message": message, + } + + return beat.Event{ + Timestamp: ts, + Fields: fields, + }, nil +} diff --git a/x-pack/filebeat/input/unifiedlogs/metrics.go b/x-pack/filebeat/input/unifiedlogs/metrics.go new file mode 100644 index 000000000000..fdce924fbb85 --- /dev/null +++ b/x-pack/filebeat/input/unifiedlogs/metrics.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build darwin + +package unifiedlogs + +import ( + "github.com/rcrowley/go-metrics" + + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +type inputMetrics struct { + intervalEvents metrics.Sample // histogram of the total events per interval + intervals *monitoring.Uint // total number of intervals executed + errs *monitoring.Uint // total number of errors +} + +func newInputMetrics(reg *monitoring.Registry) *inputMetrics { + if reg == nil { + return nil + } + + out := &inputMetrics{ + intervals: monitoring.NewUint(reg, "unifiedlogs_interval_total"), + errs: monitoring.NewUint(reg, "unifiedlogs_errors_total"), + intervalEvents: metrics.NewUniformSample(1024), + } + + _ = adapter.GetGoMetrics(reg, "unifiedlogs_interval_events", adapter.Accept). + GetOrRegister("histogram", metrics.NewHistogram(out.intervalEvents)) + + return out +}