Skip to content

Commit

Permalink
do not split batches in Log Emitter
Browse files Browse the repository at this point in the history
This changes the Log Emitter to run the `consumerFunc`
on the whole batch, instead of splitting the batch into individual entries
and calling `consumerFunc` on each of them.

This doesn't change much while the Log Emitter has its own `batch` buffer,
but if we remove the `batch` buffer (see #35456),
this should prevent the performance drop described in #35454.
  • Loading branch information
andrzej-stencel committed Nov 19, 2024
1 parent b2d8204 commit ec44375
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions pkg/stanza/operator/helper/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ func (e *LogEmitter) Process(ctx context.Context, ent *entry.Entry) error {
}

// ProcessBatch emits the entries to the consumerFunc
func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []entry.Entry) error {
for _, entry := range entries {
e.Process(ctx, &entry)
func (e *LogEmitter) ProcessBatch(ctx context.Context, entries []*entry.Entry) error {
if oldBatch := e.appendEntries(entries); len(oldBatch) > 0 {
e.consumerFunc(ctx, oldBatch)
}

return nil
Expand All @@ -128,6 +128,22 @@ func (e *LogEmitter) appendEntry(ent *entry.Entry) []*entry.Entry {
return nil
}

// appendEntries appends the entries to the current batch. If maxBatchSize is reached, a new batch will be made, and the old batch
// (which should be flushed) will be returned
func (e *LogEmitter) appendEntries(entries []*entry.Entry) []*entry.Entry {
e.batchMux.Lock()
defer e.batchMux.Unlock()

e.batch = append(e.batch, entries...)
if uint(len(e.batch)) >= e.maxBatchSize {
var oldBatch []*entry.Entry
oldBatch, e.batch = e.batch, make([]*entry.Entry, 0, e.maxBatchSize)
return oldBatch
}

return nil
}

// flusher flushes the current batch every flush interval. Intended to be run as a goroutine
func (e *LogEmitter) flusher() {
defer e.wg.Done()
Expand Down

0 comments on commit ec44375

Please sign in to comment.