From ec4437551bc1ca68082fbb2718a5cdcfd28c57a2 Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Fri, 8 Nov 2024 15:25:48 +0100 Subject: [PATCH] do not split batches in Log Emitter This changes the Log Emitter to run the `consumerFunc` on the whole batch, instead of splitting the batch into individual entries and calling `consumerFunc` on each of them. This doesn't change much while the Log Emitter has its own `batch` buffer, but if we remove the `batch` buffer (see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35456), this should prevent the performance drop described in https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35454. --- pkg/stanza/operator/helper/emitter.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/pkg/stanza/operator/helper/emitter.go b/pkg/stanza/operator/helper/emitter.go index 9efd84153f4c..152f95f0ce0c 100644 --- a/pkg/stanza/operator/helper/emitter.go +++ b/pkg/stanza/operator/helper/emitter.go @@ -104,9 +104,9 @@ func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error { } // ProcessBatch emits the entries to the consumerFunc -func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []entry.Entry) error { - for _, entry := range entries { - e.Process(ctx, &entry) +func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []*entry.Entry) error { + if oldBatch := e.appendEntries(entries); len(oldBatch) > 0 { + e.consumerFunc(ctx, oldBatch) } return nil @@ -128,6 +128,22 @@ func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry { return nil } +// appendEntries appends the entries to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch +// (which should be flushed) will be returned +func (e *LogEmitter) appendEntries(entries []*entry.Entry) []*entry.Entry { + e.batchMux.Lock() + defer e.batchMux.Unlock() + + e.batch = append(e.batch, entries...) + if uint(len(e.batch)) >= e.maxBatchSize { + var oldBatch []*entry.Entry + oldBatch, e.batch = e.batch, make([]*entry.Entry, 0, e.maxBatchSize) + return oldBatch + } + + return nil +} + // flusher flushes the current batch every flush interval. Intended to be run as a goroutine func (e *LogEmitter) flusher() { defer e.wg.Done()