Skip to content

Commit

Permalink
Count the in-flight metric after acquire succeeds (#115)
Browse files Browse the repository at this point in the history
Fixes a leak in the in_flight_bytes accounting by not counting the bytes
until after the semaphore is acquired.

We will want to add some kind of limit around the receiver's in-flight
bytes.
  • Loading branch information
jmacd authored Nov 30, 2023
1 parent 167265b commit 46be7f5
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,11 @@ func (b *shard) sendItems(trigger trigger) {
}

func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error {
if bp.telemetry.batchInFlightBytes != nil {
err := bp.sem.Acquire(ctx, bytes)
if err == nil && bp.telemetry.batchInFlightBytes != nil {
bp.telemetry.batchInFlightBytes.Add(ctx, bytes, bp.telemetry.processorAttrOption)
}
return bp.sem.Acquire(ctx, bytes)
return err
}

func (bp *batchProcessor) countRelease(bytes int64) {
Expand Down

0 comments on commit 46be7f5

Please sign in to comment.