diff --git a/pkg/stanza/operator/helper/emitter.go b/pkg/stanza/operator/helper/emitter.go index 9efd84153f4c..152f95f0ce0c 100644 --- a/pkg/stanza/operator/helper/emitter.go +++ b/pkg/stanza/operator/helper/emitter.go @@ -104,9 +104,9 @@ func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error { } // ProcessBatch emits the entries to the consumerFunc -func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []entry.Entry) error { - for _, entry := range entries { - e.Process(ctx, &entry) +func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []*entry.Entry) error { + if oldBatch := e.appendEntries(entries); len(oldBatch) > 0 { + e.consumerFunc(ctx, oldBatch) } return nil @@ -128,6 +128,22 @@ func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry { return nil } +// appendEntries appends the entries to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch +// (which should be flushed) will be returned +func (e *LogEmitter) appendEntries(entries []*entry.Entry) []*entry.Entry { + e.batchMux.Lock() + defer e.batchMux.Unlock() + + e.batch = append(e.batch, entries...) + if uint(len(e.batch)) >= e.maxBatchSize { + var oldBatch []*entry.Entry + oldBatch, e.batch = e.batch, make([]*entry.Entry, 0, e.maxBatchSize) + return oldBatch + } + + return nil +} + // flusher flushes the current batch every flush interval. Intended to be run as a goroutine func (e *LogEmitter) flusher() { defer e.wg.Done()