From 46be7f5553d5451cfaeae5fb137e3895d28e3e27 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 30 Nov 2023 13:02:45 -0800 Subject: [PATCH] Count the in-flight metric after acquire succeeds (#115) Fixes a leak in the in_flight_bytes accounting by not counting the bytes until after the semaphore is acquired. We will want to add some kind of limit around the receiver's in-flight bytes. --- .../processor/concurrentbatchprocessor/batch_processor.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index f463e14a..853ca2c7 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -361,10 +361,11 @@ func (b *shard) sendItems(trigger trigger) { } func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error { - if bp.telemetry.batchInFlightBytes != nil { + err := bp.sem.Acquire(ctx, bytes) + if err == nil && bp.telemetry.batchInFlightBytes != nil { bp.telemetry.batchInFlightBytes.Add(ctx, bytes, bp.telemetry.processorAttrOption) } - return bp.sem.Acquire(ctx, bytes) + return err } func (bp *batchProcessor) countRelease(bytes int64) {