Add in-flight-bytes metric to concurrent batch processor (#112)
Fixes #110 (with #111).

This adds an up-down counter of bytes acquired and released, since the
semaphore does not expose its current value.

This also moves the semaphore from the shard into the batch processor, so
the limit applies across all shards at once. I think that is what we
intend; at the very least it is simpler to reason about than giving each
shard an independent limit.
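
For reference, a minimal standalone sketch of the acquire/release pattern this commit applies; the package, meter scope, and instrument name below are illustrative, not the processor's actual wiring:

package inflight

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
	"golang.org/x/sync/semaphore"
)

// limiter pairs a weighted semaphore (which enforces the byte limit but does
// not expose how much is currently acquired) with an up-down counter that
// records the same acquires and releases, making the in-flight total
// observable as a metric.
type limiter struct {
	sem      *semaphore.Weighted
	inFlight metric.Int64UpDownCounter
}

func newLimiter(limitBytes int64) (*limiter, error) {
	meter := otel.Meter("example/inflight") // illustrative scope name
	ctr, err := meter.Int64UpDownCounter("in_flight_bytes", metric.WithUnit("By"))
	if err != nil {
		return nil, err
	}
	return &limiter{sem: semaphore.NewWeighted(limitBytes), inFlight: ctr}, nil
}

func (l *limiter) acquire(ctx context.Context, bytes int64) error {
	// Record before blocking, mirroring countAcquire in the diff below.
	l.inFlight.Add(ctx, bytes)
	return l.sem.Acquire(ctx, bytes)
}

func (l *limiter) release(bytes int64) {
	l.inFlight.Add(context.Background(), -bytes)
	l.sem.Release(bytes)
}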
jmacd authored Nov 28, 2023
1 parent 72b566b commit e589559
Showing 3 changed files with 44 additions and 17 deletions.
27 changes: 20 additions & 7 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
@@ -46,7 +46,6 @@ type batchProcessor struct {
timeout time.Duration
sendBatchSize int
sendBatchMaxSize int
maxInFlightBytes int

// batchFunc is a factory for new batch objects corresponding
// with the appropriate signal.
@@ -68,6 +67,8 @@ type batchProcessor struct {

// batcher will be either *singletonBatcher or *multiBatcher
batcher batcher

sem *semaphore.Weighted
}

type batcher interface {
@@ -93,7 +94,6 @@ type shard struct {
// newItem is used to receive data items from producers.
newItem chan dataItem

sem *semaphore.Weighted
// batch is an in-flight data item containing one of the
// underlying data types.
batch batch
@@ -166,7 +166,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
maxInFlightBytes: int(cfg.MaxInFlightBytes),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytes)),
}
if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
@@ -195,7 +195,6 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard {
newItem: make(chan dataItem, runtime.NumCPU()),
exportCtx: exportCtx,
batch: bp.batchFunc(),
sem: semaphore.NewWeighted(int64(bp.maxInFlightBytes)),
}

b.processor.goroutines.Add(1)
@@ -361,6 +360,20 @@ func (b *shard) sendItems(trigger trigger) {
b.totalSent = numItemsAfter
}

func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error {
if bp.telemetry.batchInFlightBytes != nil {
bp.telemetry.batchInFlightBytes.Add(ctx, bytes, bp.telemetry.processorAttrOption)
}
return bp.sem.Acquire(ctx, bytes)
}

func (bp *batchProcessor) countRelease(bytes int64) {
if bp.telemetry.batchInFlightBytes != nil {
bp.telemetry.batchInFlightBytes.Add(context.Background(), -bytes, bp.telemetry.processorAttrOption)
}
bp.sem.Release(bytes)
}

func (b *shard) consumeAndWait(ctx context.Context, data any) error {
respCh := make(chan error, 1)
item := dataItem{
@@ -378,7 +391,7 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
}

bytes := int64(b.batch.sizeBytes(data))
err := b.sem.Acquire(ctx, bytes)
err := b.processor.countAcquire(ctx, bytes)
if err != nil {
return err
}
@@ -387,7 +400,7 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
// releases all previously acquired bytes
defer func() {
if item.count == 0 {
b.sem.Release(bytes)
b.processor.countRelease(bytes)
return
}

@@ -404,7 +417,7 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
}
break
}
b.sem.Release(bytes)
b.processor.countRelease(bytes)
}()
}()

6 changes: 3 additions & 3 deletions collector/processor/concurrentbatchprocessor/batch_processor_test.go
@@ -292,7 +292,7 @@ func TestBatchProcessorCancelContext(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond)

// semaphore should be fully acquired at this point.
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1)))
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1)))

wg.Add(1)
go func() {
Expand All @@ -307,14 +307,14 @@ func TestBatchProcessorCancelContext(t *testing.T) {
wg.Wait()

// check sending another request does not change the semaphore count, even after ConsumeTraces returns.
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1)))
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1)))

// signal to the blockingConsumer to return response to waiters.
bc.unblock()

// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytes bytes.
require.Eventually(t, func() bool {
return bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes))
return bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(cfg.MaxInFlightBytes))
}, 5*time.Second, 10*time.Millisecond)
require.NoError(t, bp.Shutdown(context.Background()))
}
28 changes: 21 additions & 7 deletions collector/processor/concurrentbatchprocessor/metrics.go
@@ -103,12 +103,18 @@ type batchProcessorTelemetry struct {
exportCtx context.Context

processorAttr []attribute.KeyValue
processorAttrOption metric.MeasurementOption
batchSizeTriggerSend metric.Int64Counter
timeoutTriggerSend metric.Int64Counter
batchSendSize metric.Int64Histogram
batchSendSizeBytes metric.Int64Histogram
batchSendLatency metric.Float64Histogram
batchMetadataCardinality metric.Int64ObservableUpDownCounter

// Note: since the semaphore does not provide access to its current
// value, we instrument the number of in-flight bytes using parallel
// instrumentation counting acquired and released bytes.
batchInFlightBytes metric.Int64UpDownCounter
}

func newBatchProcessorTelemetry(set processor.CreateSettings, currentMetadataCardinality func() int, useOtel bool) (*batchProcessorTelemetry, error) {
@@ -137,6 +143,8 @@ func (bpt *batchProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider, c
return nil
}

bpt.processorAttrOption = metric.WithAttributes(bpt.processorAttr...)

var errors, err error
meter := mp.Meter(scopeName)

@@ -180,12 +188,19 @@
metric.WithDescription("Number of distinct metadata value combinations being processed"),
metric.WithUnit("1"),
metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error {
obs.Observe(int64(currentMetadataCardinality()))
obs.Observe(int64(currentMetadataCardinality()), bpt.processorAttrOption)
return nil
}),
)
errors = multierr.Append(errors, err)

bpt.batchInFlightBytes, err = meter.Int64UpDownCounter(
processorhelper.BuildCustomMetricName(metricTypeStr, "in_flight_bytes"),
metric.WithDescription("Number of bytes in flight"),
metric.WithUnit("By"),
)
errors = multierr.Append(errors, err)

return errors
}

Expand Down Expand Up @@ -214,17 +229,16 @@ func (bpt *batchProcessorTelemetry) recordWithOC(latency time.Duration, trigger
}

func (bpt *batchProcessorTelemetry) recordWithOtel(latency time.Duration, trigger trigger, sent, bytes int64) {
attrs := metric.WithAttributes(bpt.processorAttr...)
switch trigger {
case triggerBatchSize:
bpt.batchSizeTriggerSend.Add(bpt.exportCtx, 1, attrs)
bpt.batchSizeTriggerSend.Add(bpt.exportCtx, 1, bpt.processorAttrOption)
case triggerTimeout:
bpt.timeoutTriggerSend.Add(bpt.exportCtx, 1, attrs)
bpt.timeoutTriggerSend.Add(bpt.exportCtx, 1, bpt.processorAttrOption)
}

bpt.batchSendSize.Record(bpt.exportCtx, sent, attrs)
bpt.batchSendSize.Record(bpt.exportCtx, sent, bpt.processorAttrOption)
if bpt.detailed {
bpt.batchSendLatency.Record(bpt.exportCtx, latency.Seconds(), attrs)
bpt.batchSendSizeBytes.Record(bpt.exportCtx, bytes, attrs)
bpt.batchSendLatency.Record(bpt.exportCtx, latency.Seconds(), bpt.processorAttrOption)
bpt.batchSendSizeBytes.Record(bpt.exportCtx, bytes, bpt.processorAttrOption)
}
}
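
One detail worth noting in the diff above: metric.WithAttributes is now built once and stored as processorAttrOption, rather than being reconstructed on every measurement. A minimal sketch of that pattern (package, scope, and instrument names here are illustrative):

package telemetry

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

type sendCounter struct {
	// attrs is computed once and reused, so each Add call avoids
	// rebuilding the attribute set.
	attrs metric.MeasurementOption
	sends metric.Int64Counter
}

func newSendCounter(processorID string) (*sendCounter, error) {
	meter := otel.Meter("example/telemetry") // illustrative scope name
	sends, err := meter.Int64Counter("send_count")
	if err != nil {
		return nil, err
	}
	return &sendCounter{
		attrs: metric.WithAttributes(attribute.String("processor", processorID)),
		sends: sends,
	}, nil
}

func (c *sendCounter) recordSend(ctx context.Context) {
	c.sends.Add(ctx, 1, c.attrs)
}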
