Skip to content

Commit

Permalink
First unifiedlogs implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-gr committed Nov 26, 2024
1 parent 24d7cf0 commit a798085
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 1 deletion.
54 changes: 54 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build darwin

package inputs

import (
"github.com/elastic/beats/v7/filebeat/beater"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch"
"github.com/elastic/beats/v7/x-pack/filebeat/input/awss3"
"github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage"
"github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub"
"github.com/elastic/beats/v7/x-pack/filebeat/input/benchmark"
"github.com/elastic/beats/v7/x-pack/filebeat/input/cel"
"github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics"
"github.com/elastic/beats/v7/x-pack/filebeat/input/gcs"
"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
"github.com/elastic/beats/v7/x-pack/filebeat/input/salesforce"
"github.com/elastic/beats/v7/x-pack/filebeat/input/streaming"
"github.com/elastic/beats/v7/x-pack/filebeat/input/unifiedlogs"
"github.com/elastic/elastic-agent-libs/logp"
)

func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin {
return []v2.Plugin{
azureblobstorage.Plugin(log, store),
azureeventhub.Plugin(log),
cel.Plugin(log, store),
cloudfoundry.Plugin(),
entityanalytics.Plugin(log),
gcs.Plugin(log, store),
http_endpoint.Plugin(),
httpjson.Plugin(log, store),
o365audit.Plugin(log, store),
awss3.Plugin(store),
awscloudwatch.Plugin(),
lumberjack.Plugin(),
salesforce.Plugin(log, store),
streaming.Plugin(log, store),
streaming.PluginWebsocketAlias(log, store),
netflow.Plugin(log),
benchmark.Plugin(),
unifiedlogs.Plugin(log, store),
}
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/default-inputs/inputs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !aix && !windows
//go:build !aix && !darwin && !windows

package inputs

Expand Down
59 changes: 59 additions & 0 deletions x-pack/filebeat/input/unifiedlogs/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build darwin

package unifiedlogs

import (
"fmt"
"strings"
"time"
)

type config struct {
ArchiveFile string `config:"archive_file"`
TraceFile string `config:"trace_file"`
Predicate string `config:"predicate"`
Process []string `config:"process"`
Source bool `config:"source"`
Info bool `config:"info"`
Debug bool `config:"debug"`
Signposts bool `config:"signposts"`
Start string `config:"start"`
Timezone string `config:"timezone"`
}

func (c config) Validate() error {
if err := checkDateFormat(c.Start); err != nil {
return fmt.Errorf("start date is not valid: %w", err)
}
if c.ArchiveFile != "" && !strings.HasSuffix(c.ArchiveFile, ".logarchive") {
return fmt.Errorf("archive_file %v has the wrong extension", c.ArchiveFile)
}
if c.TraceFile != "" && !strings.HasSuffix(c.TraceFile, ".tracev3") {
return fmt.Errorf("trace_file %v has the wrong extension", c.TraceFile)
}
return nil
}

func defaultConfig() config {
return config{
Timezone: "UTC",
}
}

func checkDateFormat(date string) error {
acceptedLayouts := []string{
"2006-01-02",
"2006-01-02 15:04:05",
"2006-01-02 15:04:05-0700",
}
for _, layout := range acceptedLayouts {
if _, err := time.Parse(layout, date); err == nil {
return nil
}
}
return fmt.Errorf("not a valid date, accepted layouts are: %v", acceptedLayouts)
}
261 changes: 261 additions & 0 deletions x-pack/filebeat/input/unifiedlogs/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build darwin

package unifiedlogs

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"time"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/go-concert/ctxtool"
)

const (
inputName = "unifiedlogs"
srcArchiveName = "log-cmd-archive"
srcPollName = "log-cmd-poll"
)

var (
// override for testing
timeNow = time.Now
)

func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin {
return v2.Plugin{
Name: inputName,
Stability: feature.Beta,
Deprecated: false,
Manager: &inputcursor.InputManager{
Logger: log,
StateStore: store,
Type: inputName,
Configure: cursorConfigure,
},
}
}

type cursor struct {
Timestamp string `json:"timestamp"`
}

type source struct {
name string
}

func newSource(config config) source {
if config.ArchiveFile != "" || config.TraceFile != "" {
return source{name: srcArchiveName}
}
return source{name: srcPollName}
}

func (src source) Name() string { return src.name }

type input struct {
config
}

func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) {
conf := defaultConfig()
if err := cfg.Unpack(&conf); err != nil {
return nil, nil, err
}
sources, inp := newCursorInput(conf)
return sources, inp, nil
}

func newCursorInput(config config) ([]inputcursor.Source, inputcursor.Input) {
input := &input{config: config}
return []inputcursor.Source{newSource(config)}, input
}

func (input) Name() string { return inputName }

func (input input) Test(src inputcursor.Source, _ v2.TestContext) error {
if _, err := exec.LookPath("log"); err != nil {
return err
}
if src.Name() == srcArchiveName {
if _, err := os.Stat(input.ArchiveFile); input.ArchiveFile != "" && os.IsNotExist(err) {
return err
}
if _, err := os.Stat(input.TraceFile); input.TraceFile != "" && os.IsNotExist(err) {
return err
}
}
return nil
}

// Run starts the input and blocks until it ends the execution.
func (input *input) Run(ctxt v2.Context, src inputcursor.Source, resumeCursor inputcursor.Cursor, pub inputcursor.Publisher) error {
reg, unreg := inputmon.NewInputRegistry(input.Name(), ctxt.ID, nil)
defer unreg()

stdCtx := ctxtool.FromCanceller(ctxt.Cancelation)
metrics := newInputMetrics(reg)
log := ctxt.Logger.With("source", src.Name())
logCmd, err := newLogCmd(stdCtx, input.config, resumeCursor)
if err != nil {
return err
}

return input.runWithMetrics(stdCtx, logCmd, pub, metrics, log)
}

func (input *input) runWithMetrics(ctx context.Context, logCmd *exec.Cmd, pub inputcursor.Publisher, metrics *inputMetrics, log *logp.Logger) error {
for {
metrics.intervals.Add(1)

select {
case <-ctx.Done():
log.Infof("input stopped because context was cancelled with: %w", ctx.Err())
return nil
default:
}

pipe, err := logCmd.StdoutPipe()
if err != nil {
return fmt.Errorf("get stdout pipe: %w", err)
}

if err := logCmd.Start(); err != nil {
return fmt.Errorf("start log command: %w", err)
}

if err = input.processLogs(pipe, pub, metrics, log); err != nil {
log.Errorf("process logs: %w", err)
}

if err := logCmd.Wait(); err != nil {
return fmt.Errorf("log command exited with an error: %w", err)
}

if input.isArchive() {
log.Info("finished processing the archived logs, stopping")
return nil
}
}
}

func (input *input) isArchive() bool { return input.ArchiveFile != "" || input.TraceFile != "" }

func (input *input) processLogs(stdout io.Reader, pub inputcursor.Publisher, metrics *inputMetrics, log *logp.Logger) error {
scanner := bufio.NewScanner(stdout)

var c int64
defer func() { metrics.intervalEvents.Update(c) }()

for scanner.Scan() {
line := scanner.Text()
var logRecord cursor
if err := json.Unmarshal([]byte(line), &logRecord); err != nil {
log.Errorf("invalid json log: %w", err)
metrics.errs.Add(1)
continue
}

event, err := makeEvent(logRecord.Timestamp, line)
if err != nil {
log.Errorf("makeEvent: %w", err)
metrics.errs.Add(1)
continue
}

if err := pub.Publish(event, logRecord); err != nil {
log.Errorf("publish event: %w", err)
metrics.errs.Add(1)
continue
}

c++
}
if err := scanner.Err(); err != nil {
metrics.errs.Add(1)
return fmt.Errorf("scanning stdout: %w", err)
}
return nil
}

func newLogCmd(ctx context.Context, config config, resumeCursor inputcursor.Cursor) (*exec.Cmd, error) {
args := []string{"show", "--style", "ndjson"}
if config.ArchiveFile != "" {
args = append(args, "--archive", config.ArchiveFile)
}
if config.TraceFile != "" {
args = append(args, "--file", config.TraceFile)
}
if config.Predicate != "" {
args = append(args, "--predicate", config.Predicate)
}
if len(config.Process) > 0 {
for _, p := range config.Process {
args = append(args, "--process", p)
}
}
if config.Source {
args = append(args, "--source")
}
if config.Info {
args = append(args, "--info")
}
if config.Debug {
args = append(args, "--debug")
}
if config.Signposts {
args = append(args, "--signposts")
}
if config.Timezone != "" {
args = append(args, "--timezone", config.Timezone)
}
start := config.Start
if !resumeCursor.IsNew() {
cursor := cursor{}
if err := resumeCursor.Unpack(&cursor); err != nil {
return nil, fmt.Errorf("unpacking cursor: %w", err)
}
start = cursor.Timestamp
}
if start != "" {
args = append(args, "--start", start)
}
return exec.CommandContext(ctx, "log", args...), nil
}

func makeEvent(timestamp, message string) (beat.Event, error) {
const layout = "2006-01-02 15:04:05-0700"

ts, err := time.Parse(layout, timestamp)
if err != nil {
return beat.Event{}, fmt.Errorf("invalid timestamp: %w", err)
}
now := timeNow()
fields := mapstr.M{
"event": mapstr.M{
"created": now,
},
"message": message,
}

return beat.Event{
Timestamp: ts,
Fields: fields,
}, nil
}
Loading

0 comments on commit a798085

Please sign in to comment.