diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c07ab45..489b9603 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,8 +9,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Unreleased - Concurrent batch processor: tracing improvements. [#238](https://github.com/open-telemetry/otel-arrow/pull/238), [#241](https://github.com/open-telemetry/otel-arrow/pull/241) -- Concurrent batch processor: support disabling in-flight limits. [#243](https://github.com/open-telemetry/otel-arrow/pull/243) - Update to latest OTel-Collector & OTel-Go dependencies. Remove collector packages now included in collector-contrib/internal/otelarrow. [#245](https://github.com/open-telemetry/otel-arrow/pull/245) +- Concurrent batch processor: remove support for in-flight limits. [#247](https://github.com/open-telemetry/otel-arrow/pull/248) ## [0.25.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.24.0) - 2024-07-17 diff --git a/collector/processor/concurrentbatchprocessor/README.md b/collector/processor/concurrentbatchprocessor/README.md index 6d5419c1..66a1e85f 100644 --- a/collector/processor/concurrentbatchprocessor/README.md +++ b/collector/processor/concurrentbatchprocessor/README.md @@ -7,11 +7,7 @@ The differences in this component, relative to that component are: 1. Synchronous pipeline support: this component blocks each producer until the request returns with success or an error status code. -2. Maximim in-flight-bytes setting. This component measures the - in-memory size of each request it admits to the pipeline and - otherwise stalls requests until they timeout. This function is - disabled by `max_in_flight_size_mib: 0`. -3. Unlimited concurrency: this component will start as many goroutines +2. Unlimited concurrency: this component will start as many goroutines as needed to send batches through the pipeline. Here is an example configuration: @@ -22,7 +18,6 @@ Here is an example configuration: send_batch_max_size: 1500 send_batch_size: 1000 timeout: 1s - max_in_flight_size_mib: 128 ``` In this configuration, the component will admit up to 128MiB of diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index b655ea03..22706fbe 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -18,7 +18,6 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.uber.org/zap" - "golang.org/x/sync/semaphore" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" @@ -70,10 +69,6 @@ type batchProcessor struct { // batcher will be either *singletonBatcher or *multiBatcher batcher batcher - // in-flight bytes limit mechanism - limitBytes int64 - sem *semaphore.Weighted - tracer trace.TracerProvider } @@ -171,8 +166,6 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat } sort.Strings(mks) - limitBytes := int64(cfg.MaxInFlightSizeMiB) << 20 - tp := set.TelemetrySettings.TracerProvider if tp == nil { tp = otel.GetTracerProvider() @@ -188,14 +181,9 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat shutdownC: make(chan struct{}, 1), metadataKeys: mks, metadataLimit: int(cfg.MetadataCardinalityLimit), - limitBytes: limitBytes, tracer: tp, } - if limitBytes != 0 { - bp.sem = semaphore.NewWeighted(limitBytes) - } - if len(bp.metadataKeys) == 0 { bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)} } else { @@ -459,9 +447,6 @@ func allSame(x []context.Context) bool { func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error { var err error - if bp.sem != nil { - err = bp.sem.Acquire(ctx, bytes) - } if err == nil && bp.telemetry.batchInFlightBytes != nil { bp.telemetry.batchInFlightBytes.Add(ctx, bytes, bp.telemetry.processorAttrOption) } @@ -472,9 +457,6 @@ func (bp *batchProcessor) countRelease(bytes int64) { if bp.telemetry.batchInFlightBytes != nil { bp.telemetry.batchInFlightBytes.Add(context.Background(), -bytes, bp.telemetry.processorAttrOption) } - if bp.sem != nil { - bp.sem.Release(bytes) - } } func (b *shard) consumeAndWait(ctx context.Context, data any) error { @@ -502,10 +484,6 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error { } bytes := int64(b.batch.sizeBytes(data)) - if bytes > b.processor.limitBytes { - return fmt.Errorf("request size exceeds max-in-flight bytes: %d", bytes) - } - err := b.processor.countAcquire(ctx, bytes) if err != nil { return err diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index fb01d366..efc5fcba 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -34,8 +34,8 @@ import ( "go.opentelemetry.io/otel" noopmetric "go.opentelemetry.io/otel/metric/noop" sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" ) type testError struct{} @@ -289,78 +289,11 @@ func calculateMaxInFlightSizeMiB(numRequests, spansPerRequest int) uint32 { return numMiB } -// This test is meant to confirm that semaphore is still -// released if the client context is canceled. -func TestBatchProcessorCancelContext(t *testing.T) { - requestCount := 10 - spansPerRequest := 250 - cfg := createDefaultConfig().(*Config) - cfg.SendBatchSize = 128 - cfg.Timeout = 10 * time.Second - cfg.MaxInFlightSizeMiB = calculateMaxInFlightSizeMiB(requestCount, spansPerRequest) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - bc := &blockingConsumer{ - blocking: make(chan struct{}, 1), - sem: semaphore.NewWeighted(int64(cfg.MaxInFlightSizeMiB << 20)), - szr: &ptrace.ProtoMarshaler{}, - } - bp, err := newBatchTracesProcessor(creationSet, bc, cfg) - require.NoError(t, err) - require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) - - sentResourceSpans := ptrace.NewTraces().ResourceSpans() - var wg sync.WaitGroup - ctx, cancel := context.WithCancel(context.Background()) - for requestNum := 0; requestNum < requestCount; requestNum++ { - td := testdata.GenerateTraces(spansPerRequest) - spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() - for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { - spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) - } - td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) - // ConsumeTraces is a blocking function and should be run in a go routine - // until batch size reached to unblock. - wg.Add(1) - go func() { - consumeErr := bp.ConsumeTraces(ctx, td) - assert.Contains(t, consumeErr.Error(), "context canceled") - wg.Done() - }() - } - - // check all spans arrived in blockingConsumer. - require.Eventually(t, func() bool { - numSpans := (requestCount) * spansPerRequest - return bc.getItemsWaiting() == numSpans - }, 5*time.Second, 10*time.Millisecond) - - // MaxInFlightSizeMiB is the upperbound on in flight bytes, so calculate - // how many free bytes the semaphore has. - excess := int64(cfg.MaxInFlightSizeMiB<<20) - bc.numBytesAcquired - assert.False(t, bp.sem.TryAcquire(excess+1)) - - // cancel context and wait for ConsumeTraces to return. - cancel() - wg.Wait() - assert.False(t, bp.sem.TryAcquire(excess+1)) - - // signal to the blockingConsumer to return response to waiters. - bc.unblock() - - // Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightSizeMiB bytes. - require.Eventually(t, func() bool { - return bp.sem.TryAcquire(int64(cfg.MaxInFlightSizeMiB << 20)) - }, 5*time.Second, 10*time.Millisecond) - require.NoError(t, bp.Shutdown(context.Background())) -} - func TestBatchProcessorUnbrokenParentContextSingle(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 100 cfg.SendBatchMaxSize = 100 cfg.Timeout = 3 * time.Second - cfg.MaxInFlightSizeMiB = 2 requestCount := 10 spansPerRequest := 5249 exp := tracetest.NewInMemoryExporter() @@ -442,7 +375,6 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { cfg.SendBatchSize = 100 cfg.SendBatchMaxSize = 100 cfg.Timeout = 3 * time.Second - cfg.MaxInFlightSizeMiB = 2 requestCount := 50 // keep spansPerRequest small to ensure multiple contexts end up in the same batch. spansPerRequest := 5 @@ -536,7 +468,7 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { for _, link := range span.Links { haveSpanCtxs = append(haveSpanCtxs, link.SpanContext) } - + assert.ElementsMatch(t, expectSpanCtxs, haveSpanCtxs) } @@ -547,7 +479,7 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { tp.ForceFlush(bg) td = exp.GetSpans() - + assert.Equal(t, len(callCtxs), len(td)) for _, span := range td { switch span.Name { @@ -824,9 +756,8 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 3 * time.Second, + SendBatchSize: 1000, } sink := new(consumertest.TracesSink) @@ -859,9 +790,8 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 200 * time.Millisecond, - SendBatchSize: 50, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 200 * time.Millisecond, + SendBatchSize: 50, } requestCount := 100 @@ -925,9 +855,8 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 2 * time.Second, - SendBatchSize: 50, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 2 * time.Second, + SendBatchSize: 50, } requestCount := 100 @@ -999,9 +928,8 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { func TestBatchMetricsProcessor_Timeout(t *testing.T) { cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 101, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 100 * time.Millisecond, + SendBatchSize: 101, } requestCount := 5 metricsPerRequest := 10 @@ -1045,9 +973,8 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { func TestBatchMetricProcessor_Shutdown(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 3 * time.Second, + SendBatchSize: 1000, } requestCount := 5 metricsPerRequest := 10 @@ -1138,9 +1065,8 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) { func BenchmarkBatchMetricProcessor(b *testing.B) { b.StopTimer() cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 2000, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 100 * time.Millisecond, + SendBatchSize: 2000, } runMetricsProcessorBenchmark(b, cfg) } @@ -1148,10 +1074,9 @@ func BenchmarkBatchMetricProcessor(b *testing.B) { func BenchmarkMultiBatchMetricProcessor(b *testing.B) { b.StopTimer() cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 2000, - MetadataKeys: []string{"test", "test2"}, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 100 * time.Millisecond, + SendBatchSize: 2000, + MetadataKeys: []string{"test", "test2"}, } runMetricsProcessorBenchmark(b, cfg) } @@ -1199,9 +1124,8 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 200 * time.Millisecond, - SendBatchSize: 50, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 200 * time.Millisecond, + SendBatchSize: 50, } requestCount := 100 @@ -1265,9 +1189,8 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 2 * time.Second, - SendBatchSize: 50, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 2 * time.Second, + SendBatchSize: 50, } requestCount := 100 @@ -1317,9 +1240,8 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry) { func TestBatchLogsProcessor_Timeout(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 100, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 3 * time.Second, + SendBatchSize: 100, } requestCount := 5 logsPerRequest := 10 @@ -1363,9 +1285,8 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { func TestBatchLogProcessor_Shutdown(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + Timeout: 3 * time.Second, + SendBatchSize: 1000, } requestCount := 5 logsPerRequest := 10 @@ -1632,9 +1553,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { func TestBatchZeroConfig(t *testing.T) { // This is a no-op configuration. No need for a timer, no // minimum, no mxaimum, just a pass through. - cfg := Config{ - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, - } + cfg := Config{} require.NoError(t, cfg.Validate()) @@ -1675,8 +1594,7 @@ func TestBatchSplitOnly(t *testing.T) { const logsPerRequest = 100 cfg := Config{ - SendBatchMaxSize: maxBatch, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, + SendBatchMaxSize: maxBatch, } require.NoError(t, cfg.Validate()) @@ -1706,28 +1624,6 @@ func TestBatchSplitOnly(t *testing.T) { } } -func TestBatchTooLarge(t *testing.T) { - cfg := Config{ - SendBatchMaxSize: 100000, - SendBatchSize: 100000, - MaxInFlightSizeMiB: 1, - } - - require.NoError(t, cfg.Validate()) - - sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) - require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) - - ld := testdata.GenerateLogs(100000) - err = batcher.ConsumeLogs(context.Background(), ld) - assert.Error(t, err) - assert.Contains(t, err.Error(), "request size exceeds max-in-flight bytes") -} - func TestBatchProcessorEmptyBatch(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) diff --git a/collector/processor/concurrentbatchprocessor/config.go b/collector/processor/concurrentbatchprocessor/config.go index e01aa918..1c7ada77 100644 --- a/collector/processor/concurrentbatchprocessor/config.go +++ b/collector/processor/concurrentbatchprocessor/config.go @@ -45,9 +45,10 @@ type Config struct { // combination of MetadataKeys. MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"` - // MaxInFlightSizeMiB limits the number of bytes in queue waiting to be - // processed by the senders. If zero, this functionality is disabled. - MaxInFlightSizeMiB uint32 `mapstructure:"max_in_flight_size_mib"` + // deprecatedMaxInFlightSizeMiB is deprecated. This functionality has + // been eliminated, the OTel-Arrow receiver admission control + // is recommended. + deprecatedMaxInFlightSizeMiB uint32 `mapstructure:"max_in_flight_size_mib"` } var _ component.Config = (*Config)(nil) diff --git a/collector/processor/concurrentbatchprocessor/config_test.go b/collector/processor/concurrentbatchprocessor/config_test.go index 63758f5c..d2f6c839 100644 --- a/collector/processor/concurrentbatchprocessor/config_test.go +++ b/collector/processor/concurrentbatchprocessor/config_test.go @@ -35,24 +35,21 @@ func TestUnmarshalConfig(t *testing.T) { SendBatchMaxSize: uint32(11000), Timeout: time.Second * 10, MetadataCardinalityLimit: 1000, - MaxInFlightSizeMiB: 12345, }, cfg) } func TestValidateConfig_DefaultBatchMaxSize(t *testing.T) { cfg := &Config{ - SendBatchSize: 100, - SendBatchMaxSize: 0, - MaxInFlightSizeMiB: 1, + SendBatchSize: 100, + SendBatchMaxSize: 0, } assert.NoError(t, cfg.Validate()) } func TestValidateConfig_ValidBatchSizes(t *testing.T) { cfg := &Config{ - SendBatchSize: 100, - SendBatchMaxSize: 1000, - MaxInFlightSizeMiB: 1, + SendBatchSize: 100, + SendBatchMaxSize: 1000, } assert.NoError(t, cfg.Validate()) @@ -60,17 +57,15 @@ func TestValidateConfig_ValidBatchSizes(t *testing.T) { func TestValidateConfig_InvalidBatchSize(t *testing.T) { cfg := &Config{ - SendBatchSize: 1000, - SendBatchMaxSize: 100, - MaxInFlightSizeMiB: 1, + SendBatchSize: 1000, + SendBatchMaxSize: 100, } assert.Error(t, cfg.Validate()) } func TestValidateConfig_InvalidTimeout(t *testing.T) { cfg := &Config{ - Timeout: -time.Second, - MaxInFlightSizeMiB: 1, + Timeout: -time.Second, } assert.Error(t, cfg.Validate()) } diff --git a/collector/processor/concurrentbatchprocessor/factory.go b/collector/processor/concurrentbatchprocessor/factory.go index a31de5d4..25a9172f 100644 --- a/collector/processor/concurrentbatchprocessor/factory.go +++ b/collector/processor/concurrentbatchprocessor/factory.go @@ -19,9 +19,6 @@ const ( defaultSendBatchSize = uint32(8192) defaultTimeout = 200 * time.Millisecond - // default inflight bytes is 32 MiB - defaultMaxInFlightSizeMiB = 32 - // defaultMetadataCardinalityLimit should be set to the number // of metadata configurations the user expects to submit to // the collector. @@ -42,7 +39,6 @@ func createDefaultConfig() component.Config { return &Config{ SendBatchSize: defaultSendBatchSize, Timeout: defaultTimeout, - MaxInFlightSizeMiB: defaultMaxInFlightSizeMiB, MetadataCardinalityLimit: defaultMetadataCardinalityLimit, } } diff --git a/collector/processor/concurrentbatchprocessor/testdata/config.yaml b/collector/processor/concurrentbatchprocessor/testdata/config.yaml index d549d7d7..3ed4c8db 100644 --- a/collector/processor/concurrentbatchprocessor/testdata/config.yaml +++ b/collector/processor/concurrentbatchprocessor/testdata/config.yaml @@ -1,4 +1,3 @@ timeout: 10s send_batch_size: 10000 send_batch_max_size: 11000 -max_in_flight_size_mib: 12345