From 187b345bfd232497c205b32c01026f38bb1d501b Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Fri, 8 Nov 2024 12:58:41 +0100 Subject: [PATCH] add `WriterOperator::WriteBatch` and `Operator::ProcessBatch` methods The File input's `emitBatch` function now calls `ProcessBatch` instead of `Process`. The added `ProcessBatch` method will make each Stanza operator capable of accepting a batch of entries. --- pkg/stanza/operator/helper/emitter.go | 11 +++++++- pkg/stanza/operator/helper/input.go | 9 ++++++ pkg/stanza/operator/helper/writer.go | 19 +++++++++++++ pkg/stanza/operator/input/file/input.go | 37 +++++++++++++++++-------- pkg/stanza/operator/operator.go | 2 ++ 5 files changed, 65 insertions(+), 13 deletions(-) diff --git a/pkg/stanza/operator/helper/emitter.go b/pkg/stanza/operator/helper/emitter.go index aa91b85c92be..9efd84153f4c 100644 --- a/pkg/stanza/operator/helper/emitter.go +++ b/pkg/stanza/operator/helper/emitter.go @@ -94,7 +94,7 @@ func (e *LogEmitter) Stop() error { return nil } -// Process will emit an entry to the output channel +// Process emits an entry to the consumerFunc func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error { if oldBatch := e.appendEntry(ent); len(oldBatch) > 0 { e.consumerFunc(ctx, oldBatch) @@ -103,6 +103,15 @@ func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error { return nil } +// ProcessBatch emits the entries to the consumerFunc +func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []entry.Entry) error { + for _, entry := range entries { + e.Process(ctx, &entry) + } + + return nil +} + // appendEntry appends the entry to the current batch. 
If maxBatchSize is reached, a new batch will be made, and the old batch // (which should be flushed) will be returned func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry { diff --git a/pkg/stanza/operator/helper/input.go b/pkg/stanza/operator/helper/input.go index b42b1d33c329..f1293de9e0ef 100644 --- a/pkg/stanza/operator/helper/input.go +++ b/pkg/stanza/operator/helper/input.go @@ -90,3 +90,12 @@ func (i *InputOperator) Process(_ context.Context, _ *entry.Entry) error { "Ensure that operator is not configured to receive logs from other operators", ) } + +// ProcessBatch will always return an error if called. +func (i *InputOperator) ProcessBatch(_ context.Context, _ []entry.Entry) error { + i.Logger().Error("Operator received a batch of entries, but can not process") + return errors.NewError( + "Operator can not process logs.", + "Ensure that operator is not configured to receive logs from other operators", + ) +} diff --git a/pkg/stanza/operator/helper/writer.go b/pkg/stanza/operator/helper/writer.go index a07b91a714c5..de93cdcbf120 100644 --- a/pkg/stanza/operator/helper/writer.go +++ b/pkg/stanza/operator/helper/writer.go @@ -61,6 +61,25 @@ func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) error { return nil } +// WriteBatch writes a batch of entries to the outputs of the operator. +// A batch is a collection of entries that are sent in one go. +func (w *WriterOperator) WriteBatch(ctx context.Context, entries []entry.Entry) error { + for i, op := range w.OutputOperators { + if i == len(w.OutputOperators)-1 { + return op.ProcessBatch(ctx, entries) + } + copyOfEntries := make([]entry.Entry, 0, len(entries)) + for _, entry := range entries { + copyOfEntries = append(copyOfEntries, *entry.Copy()) + } + err := op.ProcessBatch(ctx, copyOfEntries) + if err != nil { + w.Logger().Error("Failed to process entries", zap.Error(err)) + } + } + return nil +} + // CanOutput always returns true for a writer operator. 
func (w *WriterOperator) CanOutput() bool { return true diff --git a/pkg/stanza/operator/input/file/input.go b/pkg/stanza/operator/input/file/input.go index a261f083c268..0a2913cf9b1e 100644 --- a/pkg/stanza/operator/input/file/input.go +++ b/pkg/stanza/operator/input/file/input.go @@ -39,27 +39,40 @@ func (i *Input) Stop() error { } func (i *Input) emitBatch(ctx context.Context, tokens []emit.Token) error { + entries, conversionError := i.convertTokens(tokens) + if conversionError != nil { + conversionError = fmt.Errorf("convert tokens: %w", conversionError) + } + + consumeError := i.WriteBatch(ctx, entries) + if consumeError != nil { + consumeError = fmt.Errorf("consume entries: %w", consumeError) + } + + return errors.Join(conversionError, consumeError) +} + +func (i *Input) convertTokens(tokens []emit.Token) ([]entry.Entry, error) { + entries := make([]entry.Entry, 0, len(tokens)) var errs []error for _, token := range tokens { - err := i.emit(ctx, token) + if len(token.Body) == 0 { + continue + } + entry, err := i.convertToken(token) if err != nil { errs = append(errs, err) + continue } + entries = append(entries, *entry) } - if len(errs) > 0 { - return errors.Join(errs...) - } - return nil + return entries, errors.Join(errs...) 
} -func (i *Input) emit(ctx context.Context, token emit.Token) error { - if len(token.Body) == 0 { - return nil - } - +func (i *Input) convertToken(token emit.Token) (*entry.Entry, error) { ent, err := i.NewEntry(i.toBody(token.Body)) if err != nil { - return fmt.Errorf("create entry: %w", err) + return nil, fmt.Errorf("create entry: %w", err) } for k, v := range token.Attributes { @@ -67,5 +80,5 @@ func (i *Input) emit(ctx context.Context, token emit.Token) error { i.Logger().Error("set attribute", zap.Error(err)) } } - return i.Write(ctx, ent) + return ent, nil } diff --git a/pkg/stanza/operator/operator.go b/pkg/stanza/operator/operator.go index 21d9a176cc38..7c0f7e57956a 100644 --- a/pkg/stanza/operator/operator.go +++ b/pkg/stanza/operator/operator.go @@ -38,6 +38,8 @@ type Operator interface { CanProcess() bool // Process will process an entry from an operator. Process(context.Context, *entry.Entry) error + // ProcessBatch processes a batch of entries from an operator. + ProcessBatch(context.Context, []entry.Entry) error // Logger returns the operator's logger Logger() *zap.Logger }