Skip to content

Commit

Permalink
add WriterOperator::WriteBatch and Operator::ProcessBatch methods
Browse files Browse the repository at this point in the history
The File input's `emitBatch` function now calls `ProcessBatch` instead of `Process`.

The added `ProcessBatch` method will make each Stanza operator
capable of accepting a batch of entries.
  • Loading branch information
andrzej-stencel committed Nov 8, 2024
1 parent 8780b9a commit 187b345
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 13 deletions.
11 changes: 10 additions & 1 deletion pkg/stanza/operator/helper/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (e *LogEmitter) Stop() error {
return nil
}

// Process will emit an entry to the output channel
// Process emits an entry to the consumerFunc
func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
if oldBatch := e.appendEntry(ent); len(oldBatch) > 0 {
e.consumerFunc(ctx, oldBatch)
Expand All @@ -103,6 +103,15 @@ func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
return nil
}

// ProcessBatch emits the entries to the consumerFunc
func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
for _, entry := range entries {
e.Process(ctx, &entry)
}

return nil
}

// appendEntry appends the entry to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch
// (which should be flushed) will be returned
func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry {
Expand Down
9 changes: 9 additions & 0 deletions pkg/stanza/operator/helper/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,12 @@ func (i *InputOperator) Process(_ context.Context, _ *entry.Entry) error {
"Ensure that operator is not configured to receive logs from other operators",
)
}

// ProcessBatch will always return an error if called.
func (i *InputOperator) ProcessBatch(_ context.Context, _ []entry.Entry) error {
i.Logger().Error("Operator received a batch of entries, but can not process")
return errors.NewError(
"Operator can not process logs.",
"Ensure that operator is not configured to receive logs from other operators",
)
}
19 changes: 19 additions & 0 deletions pkg/stanza/operator/helper/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) error {
return nil
}

// Write writes a batch of entries to the outputs of the operator.
// A batch is a collection of entries that are sent in one go.
func (w *WriterOperator) WriteBatch(ctx context.Context, entries []entry.Entry) error {
for i, op := range w.OutputOperators {
if i == len(w.OutputOperators)-1 {
return op.ProcessBatch(ctx, entries)
}
copyOfEntries := make([]entry.Entry, 0, len(entries))
for _, entry := range entries {
copyOfEntries = append(copyOfEntries, *entry.Copy())
}
err := op.ProcessBatch(ctx, copyOfEntries)
if err != nil {
w.Logger().Error("Failed to process entries", zap.Error(err))
}
}
return nil
}

// CanOutput always returns true for a writer operator.
func (w *WriterOperator) CanOutput() bool {
return true
Expand Down
37 changes: 25 additions & 12 deletions pkg/stanza/operator/input/file/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,46 @@ func (i *Input) Stop() error {
}

func (i *Input) emitBatch(ctx context.Context, tokens []emit.Token) error {
entries, conversionError := i.convertTokens(tokens)
if conversionError != nil {
conversionError = fmt.Errorf("convert tokens: %w", conversionError)
}

consumeError := i.WriteBatch(ctx, entries)
if consumeError != nil {
consumeError = fmt.Errorf("consume entries: %w", consumeError)
}

return errors.Join(conversionError, consumeError)
}

func (i *Input) convertTokens(tokens []emit.Token) ([]entry.Entry, error) {
entries := make([]entry.Entry, 0, len(tokens))
var errs []error
for _, token := range tokens {
err := i.emit(ctx, token)
if len(token.Body) == 0 {
continue
}
entry, err := i.convertToken(token)
if err != nil {
errs = append(errs, err)
continue
}
entries = append(entries, *entry)
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
return entries, errors.Join(errs...)
}

func (i *Input) emit(ctx context.Context, token emit.Token) error {
if len(token.Body) == 0 {
return nil
}

func (i *Input) convertToken(token emit.Token) (*entry.Entry, error) {
ent, err := i.NewEntry(i.toBody(token.Body))
if err != nil {
return fmt.Errorf("create entry: %w", err)
return nil, fmt.Errorf("create entry: %w", err)
}

for k, v := range token.Attributes {
if err := ent.Set(entry.NewAttributeField(k), v); err != nil {
i.Logger().Error("set attribute", zap.Error(err))
}
}
return i.Write(ctx, ent)
return ent, nil
}
2 changes: 2 additions & 0 deletions pkg/stanza/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Operator interface {
CanProcess() bool
// Process will process an entry from an operator.
Process(context.Context, *entry.Entry) error
// Process processes a batch of entries from an operator.
ProcessBatch(context.Context, []entry.Entry) error
// Logger returns the operator's logger
Logger() *zap.Logger
}

0 comments on commit 187b345

Please sign in to comment.