diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go
index 581f6621..f463e14a 100644
--- a/collector/processor/concurrentbatchprocessor/batch_processor.go
+++ b/collector/processor/concurrentbatchprocessor/batch_processor.go
@@ -46,7 +46,6 @@ type batchProcessor struct {
 	timeout          time.Duration
 	sendBatchSize    int
 	sendBatchMaxSize int
-	maxInFlightBytes int
 
 	// batchFunc is a factory for new batch objects corresponding
 	// with the appropriate signal.
@@ -68,6 +67,8 @@ type batchProcessor struct {
 
 	// batcher will be either *singletonBatcher or *multiBatcher
 	batcher batcher
+
+	sem *semaphore.Weighted
 }
 
 type batcher interface {
@@ -93,7 +94,6 @@ type shard struct {
 	// newItem is used to receive data items from producers.
 	newItem chan dataItem
 
-	sem *semaphore.Weighted
 	// batch is an in-flight data item containing one of the
 	// underlying data types.
 	batch batch
@@ -166,7 +166,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
 		shutdownC:        make(chan struct{}, 1),
 		metadataKeys:     mks,
 		metadataLimit:    int(cfg.MetadataCardinalityLimit),
-		maxInFlightBytes: int(cfg.MaxInFlightBytes),
+		sem:              semaphore.NewWeighted(int64(cfg.MaxInFlightBytes)),
 	}
 	if len(bp.metadataKeys) == 0 {
 		bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
@@ -195,7 +195,6 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard {
 		newItem:   make(chan dataItem, runtime.NumCPU()),
 		exportCtx: exportCtx,
 		batch:     bp.batchFunc(),
-		sem:       semaphore.NewWeighted(int64(bp.maxInFlightBytes)),
 	}
 
 	b.processor.goroutines.Add(1)
@@ -361,6 +360,33 @@ func (b *shard) sendItems(trigger trigger) {
 	b.totalSent = numItemsAfter
 }
 
+// countAcquire reserves bytes on the processor-wide semaphore and, once
+// the reservation succeeds, adds them to the in-flight bytes counter.
+// It returns the error from Acquire (e.g. when ctx is done first), in
+// which case nothing is counted and countRelease must not be called.
+func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error {
+	if err := bp.sem.Acquire(ctx, bytes); err != nil {
+		return err
+	}
+	// Count only after a successful Acquire: a failed Acquire is never
+	// paired with countRelease, so counting first would leave the
+	// up-down counter permanently inflated.
+	if bp.telemetry.batchInFlightBytes != nil {
+		bp.telemetry.batchInFlightBytes.Add(ctx, bytes, bp.telemetry.processorAttrOption)
+	}
+	return nil
+}
+
+// countRelease subtracts bytes from the in-flight bytes counter and
+// returns them to the processor-wide semaphore. It pairs with a
+// successful countAcquire.
+func (bp *batchProcessor) countRelease(bytes int64) {
+	if bp.telemetry.batchInFlightBytes != nil {
+		bp.telemetry.batchInFlightBytes.Add(context.Background(), -bytes, bp.telemetry.processorAttrOption)
+	}
+	bp.sem.Release(bytes)
+}
+
 func (b *shard) consumeAndWait(ctx context.Context, data any) error {
 	respCh := make(chan error, 1)
 	item := dataItem{
@@ -378,7 +404,7 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
 	}
 
 	bytes := int64(b.batch.sizeBytes(data))
-	err := b.sem.Acquire(ctx, bytes)
+	err := b.processor.countAcquire(ctx, bytes)
 	if err != nil {
 		return err
 	}
@@ -387,7 +413,7 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
 	// releases all previously acquired bytes
 	defer func() {
 		if item.count == 0 {
-			b.sem.Release(bytes)
+			b.processor.countRelease(bytes)
 			return
 		}
 
@@ -404,7 +430,7 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
 				}
 				break
 			}
-			b.sem.Release(bytes)
+			b.processor.countRelease(bytes)
 		}()
 	}()
 
diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go
index 01ebdfcd..1c0f9c0c 100644
--- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go
+++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go
@@ -292,7 +292,7 @@ func TestBatchProcessorCancelContext(t *testing.T) {
 	}, 5*time.Second, 10*time.Millisecond)
 
 	// semaphore should be fully acquired at this point.
-	assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1)))
+	assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1)))
 
 	wg.Add(1)
 	go func() {
@@ -307,14 +307,14 @@ func TestBatchProcessorCancelContext(t *testing.T) {
 	wg.Wait()
 
 	// check sending another request does not change the semaphore count, even after ConsumeTraces returns.
-	assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1)))
+	assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1)))
 
 	// signal to the blockingConsumer to return response to waiters.
 	bc.unblock()
 
 	// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytes bytes.
 	require.Eventually(t, func() bool {
-		return bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes))
+		return bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(cfg.MaxInFlightBytes))
 	}, 5*time.Second, 10*time.Millisecond)
 	require.NoError(t, bp.Shutdown(context.Background()))
 }
diff --git a/collector/processor/concurrentbatchprocessor/metrics.go b/collector/processor/concurrentbatchprocessor/metrics.go
index ec5b7e42..126d495a 100644
--- a/collector/processor/concurrentbatchprocessor/metrics.go
+++ b/collector/processor/concurrentbatchprocessor/metrics.go
@@ -103,12 +103,18 @@ type batchProcessorTelemetry struct {
 	exportCtx context.Context
 
 	processorAttr            []attribute.KeyValue
+	processorAttrOption      metric.MeasurementOption
 	batchSizeTriggerSend     metric.Int64Counter
 	timeoutTriggerSend       metric.Int64Counter
 	batchSendSize            metric.Int64Histogram
 	batchSendSizeBytes       metric.Int64Histogram
 	batchSendLatency         metric.Float64Histogram
 	batchMetadataCardinality metric.Int64ObservableUpDownCounter
+
+	// Note: since the semaphore does not provide access to its current
+	// value, we instrument the number of in-flight bytes using parallel
+	// instrumentation counting acquired and released bytes.
+	batchInFlightBytes metric.Int64UpDownCounter
 }
 
 func newBatchProcessorTelemetry(set processor.CreateSettings, currentMetadataCardinality func() int, useOtel bool) (*batchProcessorTelemetry, error) {
@@ -137,6 +143,8 @@ func (bpt *batchProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider, c
 		return nil
 	}
 
+	bpt.processorAttrOption = metric.WithAttributes(bpt.processorAttr...)
+
 	var errors, err error
 	meter := mp.Meter(scopeName)
 
@@ -180,12 +188,19 @@ func (bpt *batchProcessorTelemetry) createOtelMetrics(mp metric.MeterProvider, c
 		metric.WithDescription("Number of distinct metadata value combinations being processed"),
 		metric.WithUnit("1"),
 		metric.WithInt64Callback(func(_ context.Context, obs metric.Int64Observer) error {
-			obs.Observe(int64(currentMetadataCardinality()))
+			obs.Observe(int64(currentMetadataCardinality()), bpt.processorAttrOption)
 			return nil
 		}),
 	)
 	errors = multierr.Append(errors, err)
 
+	bpt.batchInFlightBytes, err = meter.Int64UpDownCounter(
+		processorhelper.BuildCustomMetricName(metricTypeStr, "in_flight_bytes"),
+		metric.WithDescription("Number of bytes in flight"),
+		metric.WithUnit("By"),
+	)
+	errors = multierr.Append(errors, err)
+
 	return errors
 }
 
@@ -214,17 +229,16 @@ func (bpt *batchProcessorTelemetry) recordWithOC(latency time.Duration, trigger
 }
 
 func (bpt *batchProcessorTelemetry) recordWithOtel(latency time.Duration, trigger trigger, sent, bytes int64) {
-	attrs := metric.WithAttributes(bpt.processorAttr...)
 	switch trigger {
 	case triggerBatchSize:
-		bpt.batchSizeTriggerSend.Add(bpt.exportCtx, 1, attrs)
+		bpt.batchSizeTriggerSend.Add(bpt.exportCtx, 1, bpt.processorAttrOption)
 	case triggerTimeout:
-		bpt.timeoutTriggerSend.Add(bpt.exportCtx, 1, attrs)
+		bpt.timeoutTriggerSend.Add(bpt.exportCtx, 1, bpt.processorAttrOption)
 	}
 
-	bpt.batchSendSize.Record(bpt.exportCtx, sent, attrs)
+	bpt.batchSendSize.Record(bpt.exportCtx, sent, bpt.processorAttrOption)
 	if bpt.detailed {
-		bpt.batchSendLatency.Record(bpt.exportCtx, latency.Seconds(), attrs)
-		bpt.batchSendSizeBytes.Record(bpt.exportCtx, bytes, attrs)
+		bpt.batchSendLatency.Record(bpt.exportCtx, latency.Seconds(), bpt.processorAttrOption)
+		bpt.batchSendSizeBytes.Record(bpt.exportCtx, bytes, bpt.processorAttrOption)
 	}
 }