From fe188e91fecf08b74557a41be3d4be9f64b61a36 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 13 Dec 2023 02:44:29 -0500 Subject: [PATCH 1/2] add suffix --- .../batch_processor.go | 2 +- .../batch_processor_test.go | 88 +++++++++---------- .../concurrentbatchprocessor/config.go | 14 ++- .../concurrentbatchprocessor/config_test.go | 4 +- .../concurrentbatchprocessor/factory.go | 4 +- .../testdata/config.yaml | 2 +- 6 files changed, 63 insertions(+), 51 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 853ca2c7..049cd61b 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -166,7 +166,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func shutdownC: make(chan struct{}, 1), metadataKeys: mks, metadataLimit: int(cfg.MetadataCardinalityLimit), - sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytes)), + sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB)), } if len(bp.metadataKeys) == 0 { bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)} diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 1c0f9c0c..399ece2d 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -234,9 +234,9 @@ func (bc *blockingConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } -// helper function to help determine a setting for cfg.MaxInFlightBytes based +// helper function to help determine a setting for cfg.MaxInFlightBytesMiB based // on the number of requests and number of spans per request. -func calculateMaxInFlightBytes(numRequests, spansPerRequest int) uint32 { +func calculateMaxInFlightBytesMiB(numRequests, spansPerRequest int) uint32 { sentResourceSpans := ptrace.NewTraces().ResourceSpans() td := testdata.GenerateTraces(spansPerRequest) spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -257,7 +257,7 @@ func TestBatchProcessorCancelContext(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.Timeout = 10 * time.Second - cfg.MaxInFlightBytes = calculateMaxInFlightBytes(requestCount, spansPerRequest) + cfg.MaxInFlightBytesMiB = calculateMaxInFlightBytesMiB(requestCount, spansPerRequest) creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed bc := &blockingConsumer{blocking: make(chan struct{}, 1)} @@ -292,7 +292,7 @@ func TestBatchProcessorCancelContext(t *testing.T) { }, 5*time.Second, 10*time.Millisecond) // semaphore should be fully acquired at this point. - assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1))) + assert.False(t, bp.sem.TryAcquire(int64(1))) wg.Add(1) go func() { @@ -307,14 +307,14 @@ func TestBatchProcessorCancelContext(t *testing.T) { wg.Wait() // check sending another request does not change the semaphore count, even after ConsumeTraces returns. - assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1))) + assert.False(t, bp.sem.TryAcquire(int64(1))) // signal to the blockingConsumer to return response to waiters. bc.unblock() - // Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytes bytes. + // Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytesMiB bytes. require.Eventually(t, func() bool { - return bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(cfg.MaxInFlightBytes)) + return bp.sem.TryAcquire(int64(cfg.MaxInFlightBytesMiB)) }, 5*time.Second, 10*time.Millisecond) require.NoError(t, bp.Shutdown(context.Background())) } @@ -582,9 +582,9 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 3 * time.Second, + SendBatchSize: 1000, + MaxInFlightBytesMiB: defaultMaxBytes, } sink := new(consumertest.TracesSink) @@ -617,9 +617,9 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 200 * time.Millisecond, - SendBatchSize: 50, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 200 * time.Millisecond, + SendBatchSize: 50, + MaxInFlightBytesMiB: defaultMaxBytes, } requestCount := 100 @@ -683,9 +683,9 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 2 * time.Second, - SendBatchSize: 50, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 2 * time.Second, + SendBatchSize: 50, + MaxInFlightBytesMiB: defaultMaxBytes, } requestCount := 100 @@ -757,9 +757,9 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { func TestBatchMetricsProcessor_Timeout(t *testing.T) { cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 101, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 100 * time.Millisecond, + SendBatchSize: 101, + MaxInFlightBytesMiB: defaultMaxBytes, } requestCount := 5 metricsPerRequest := 10 @@ -803,9 +803,9 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { func TestBatchMetricProcessor_Shutdown(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 3 * time.Second, + SendBatchSize: 1000, + MaxInFlightBytesMiB: defaultMaxBytes, } requestCount := 5 metricsPerRequest := 10 @@ -896,9 +896,9 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) { func BenchmarkBatchMetricProcessor(b *testing.B) { b.StopTimer() cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 2000, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 100 * time.Millisecond, + SendBatchSize: 2000, + MaxInFlightBytesMiB: defaultMaxBytes, } runMetricsProcessorBenchmark(b, cfg) } @@ -906,10 +906,10 @@ func BenchmarkBatchMetricProcessor(b *testing.B) { func BenchmarkMultiBatchMetricProcessor(b *testing.B) { b.StopTimer() cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 2000, - MetadataKeys: []string{"test", "test2"}, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 100 * time.Millisecond, + SendBatchSize: 2000, + MetadataKeys: []string{"test", "test2"}, + MaxInFlightBytesMiB: defaultMaxBytes, } runMetricsProcessorBenchmark(b, cfg) } @@ -957,9 +957,9 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 200 * time.Millisecond, - SendBatchSize: 50, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 200 * time.Millisecond, + SendBatchSize: 50, + MaxInFlightBytesMiB: defaultMaxBytes, } requestCount := 100 @@ -1023,9 +1023,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 2 * time.Second, - SendBatchSize: 50, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 2 * time.Second, + SendBatchSize: 50, + MaxInFlightBytesMiB: defaultMaxBytes, } requestCount := 100 @@ -1075,9 +1075,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo func TestBatchLogsProcessor_Timeout(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 100, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 3 * time.Second, + SendBatchSize: 100, + MaxInFlightBytesMiB: defaultMaxBytes, } requestCount := 5 logsPerRequest := 10 @@ -1121,9 +1121,9 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { func TestBatchLogProcessor_Shutdown(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, - MaxInFlightBytes: defaultMaxBytes, + Timeout: 3 * time.Second, + SendBatchSize: 1000, + MaxInFlightBytesMiB: defaultMaxBytes, } requestCount := 5 logsPerRequest := 10 @@ -1391,7 +1391,7 @@ func TestBatchZeroConfig(t *testing.T) { // This is a no-op configuration. No need for a timer, no // minimum, no mxaimum, just a pass through. cfg := Config{ - MaxInFlightBytes: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxBytes, } require.NoError(t, cfg.Validate()) @@ -1433,8 +1433,8 @@ func TestBatchSplitOnly(t *testing.T) { const logsPerRequest = 100 cfg := Config{ - SendBatchMaxSize: maxBatch, - MaxInFlightBytes: defaultMaxBytes, + SendBatchMaxSize: maxBatch, + MaxInFlightBytesMiB: defaultMaxBytes, } require.NoError(t, cfg.Validate()) diff --git a/collector/processor/concurrentbatchprocessor/config.go b/collector/processor/concurrentbatchprocessor/config.go index 91c60ff7..d2e40f06 100644 --- a/collector/processor/concurrentbatchprocessor/config.go +++ b/collector/processor/concurrentbatchprocessor/config.go @@ -47,6 +47,9 @@ type Config struct { // MaxInFlightBytes limits the number of bytes in queue waiting to be // processed by the senders. + MaxInFlightBytesMiB uint32 `mapstructure:"max_in_flight_bytes_mib"` + + // Deprecated: Use MaxInFlightBytesMiB instead. MaxInFlightBytes uint32 `mapstructure:"max_in_flight_bytes"` } @@ -68,5 +71,14 @@ func (cfg *Config) Validate() error { if cfg.Timeout < 0 { return errors.New("timeout must be greater or equal to 0") } + + // remove this check once MaxInFlightBytes is removed. + if cfg.MaxInFlightBytes != 0 { + return errors.New("max_in_flight_bytes is deprecated, use max_in_flight_bytes_mib instead") + } + + if cfg.MaxInFlightBytesMiB < 0 { + return errors.New("max_in_flight_bytes_mib must be greater than or equal to 0") + } return nil -} \ No newline at end of file +} diff --git a/collector/processor/concurrentbatchprocessor/config_test.go b/collector/processor/concurrentbatchprocessor/config_test.go index eaed5e30..0a1cff20 100644 --- a/collector/processor/concurrentbatchprocessor/config_test.go +++ b/collector/processor/concurrentbatchprocessor/config_test.go @@ -35,7 +35,7 @@ func TestUnmarshalConfig(t *testing.T) { SendBatchMaxSize: uint32(11000), Timeout: time.Second * 10, MetadataCardinalityLimit: 1000, - MaxInFlightBytes: 12345, + MaxInFlightBytesMiB: 12345, }, cfg) } @@ -74,4 +74,4 @@ func TestValidateConfig_InvalidTimeout(t *testing.T) { func TestValidateConfig_ValidZero(t *testing.T) { cfg := &Config{} assert.NoError(t, cfg.Validate()) -} \ No newline at end of file +} diff --git a/collector/processor/concurrentbatchprocessor/factory.go b/collector/processor/concurrentbatchprocessor/factory.go index 7fe7b255..5694534a 100644 --- a/collector/processor/concurrentbatchprocessor/factory.go +++ b/collector/processor/concurrentbatchprocessor/factory.go @@ -19,7 +19,7 @@ const ( defaultSendBatchSize = uint32(8192) defaultTimeout = 200 * time.Millisecond // default inflight bytes is 2 MiB - defaultMaxBytes = 2 * 1048576 + defaultMaxBytes = 2 * 1048576 // defaultMetadataCardinalityLimit should be set to the number // of metadata configurations the user expects to submit to @@ -45,7 +45,7 @@ func createDefaultConfig() component.Config { return &Config{ SendBatchSize: defaultSendBatchSize, Timeout: defaultTimeout, - MaxInFlightBytes: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxBytes, MetadataCardinalityLimit: defaultMetadataCardinalityLimit, } } diff --git a/collector/processor/concurrentbatchprocessor/testdata/config.yaml b/collector/processor/concurrentbatchprocessor/testdata/config.yaml index ec151d47..c860cb06 100644 --- a/collector/processor/concurrentbatchprocessor/testdata/config.yaml +++ b/collector/processor/concurrentbatchprocessor/testdata/config.yaml @@ -1,4 +1,4 @@ timeout: 10s send_batch_size: 10000 send_batch_max_size: 11000 -max_in_flight_bytes: 12345 \ No newline at end of file +max_in_flight_bytes_mib: 12345 \ No newline at end of file From f9db7f1ca7a0d975f0b01b2c57909263153008db Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Thu, 14 Dec 2023 18:23:00 -0500 Subject: [PATCH 2/2] finally figure out unit test --- .../batch_processor.go | 2 +- .../batch_processor_test.go | 69 +++++++++++-------- .../concurrentbatchprocessor/config.go | 11 ++- .../concurrentbatchprocessor/factory.go | 4 +- 4 files changed, 50 insertions(+), 36 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 049cd61b..c6283401 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -166,7 +166,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func shutdownC: make(chan struct{}, 1), metadataKeys: mks, metadataLimit: int(cfg.MetadataCardinalityLimit), - sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB)), + sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB)<<20), } if len(bp.metadataKeys) == 0 { bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)} diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 399ece2d..1868278b 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata" "go.opentelemetry.io/collector/client" @@ -207,7 +208,10 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) { type blockingConsumer struct { lock sync.Mutex numItems int + numBytesAcquired int64 blocking chan struct{} + sem *semaphore.Weighted + szr *ptrace.ProtoMarshaler } func (bc *blockingConsumer) getItemsWaiting() int { @@ -217,10 +221,15 @@ func (bc *blockingConsumer) getItemsWaiting() int { } func (bc *blockingConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + sz := int64(bc.szr.TracesSize(td)) bc.lock.Lock() bc.numItems += td.SpanCount() + bc.numBytesAcquired += sz bc.lock.Unlock() + bc.sem.Acquire(ctx, sz) + defer bc.sem.Release(sz) <-bc.blocking + return nil } @@ -246,7 +255,11 @@ func calculateMaxInFlightBytesMiB(numRequests, spansPerRequest int) uint32 { td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) szr := &ptrace.ProtoMarshaler{} - return uint32(szr.TracesSize(td) * numRequests) + singleSz := szr.TracesSize(td) + numBytes := uint32(singleSz * numRequests) + numMiB := (numBytes - 1 + 1<<20) >> 20 + + return numMiB } // This test is meant to confirm that semaphore is still @@ -260,7 +273,11 @@ func TestBatchProcessorCancelContext(t *testing.T) { cfg.MaxInFlightBytesMiB = calculateMaxInFlightBytesMiB(requestCount, spansPerRequest) creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - bc := &blockingConsumer{blocking: make(chan struct{}, 1)} + bc := &blockingConsumer{ + blocking: make(chan struct{}, 1), + sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB<<20)), + szr: &ptrace.ProtoMarshaler{}, + } bp, err := newBatchTracesProcessor(creationSet, bc, cfg, true) require.NoError(t, err) require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) @@ -287,34 +304,26 @@ func TestBatchProcessorCancelContext(t *testing.T) { // check all spans arrived in blockingConsumer. require.Eventually(t, func() bool { - numSpans := requestCount * spansPerRequest + numSpans := (requestCount) * spansPerRequest return bc.getItemsWaiting() == numSpans }, 5*time.Second, 10*time.Millisecond) - // semaphore should be fully acquired at this point. - assert.False(t, bp.sem.TryAcquire(int64(1))) - - wg.Add(1) - go func() { - td := testdata.GenerateTraces(spansPerRequest) - err = bp.ConsumeTraces(ctx, td) - assert.Contains(t, err.Error(), "context canceled") - wg.Done() - }() + // MaxInFlightBytesMiB is the upperbound on in flight bytes, so calculate + // how many free bytes the semaphore has. + excess := int64(cfg.MaxInFlightBytesMiB<<20) - bc.numBytesAcquired + assert.False(t, bp.sem.TryAcquire(excess+1)) // cancel context and wait for ConsumeTraces to return. cancel() wg.Wait() - - // check sending another request does not change the semaphore count, even after ConsumeTraces returns. - assert.False(t, bp.sem.TryAcquire(int64(1))) + assert.False(t, bp.sem.TryAcquire(excess+1)) // signal to the blockingConsumer to return response to waiters. bc.unblock() // Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytesMiB bytes. require.Eventually(t, func() bool { - return bp.sem.TryAcquire(int64(cfg.MaxInFlightBytesMiB)) + return bp.sem.TryAcquire(int64(cfg.MaxInFlightBytesMiB<<20)) }, 5*time.Second, 10*time.Millisecond) require.NoError(t, bp.Shutdown(context.Background())) } @@ -584,7 +593,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { cfg := Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } sink := new(consumertest.TracesSink) @@ -619,7 +628,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { cfg := Config{ Timeout: 200 * time.Millisecond, SendBatchSize: 50, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } requestCount := 100 @@ -685,7 +694,7 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel cfg := Config{ Timeout: 2 * time.Second, SendBatchSize: 50, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } requestCount := 100 @@ -759,7 +768,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { cfg := Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 101, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } requestCount := 5 metricsPerRequest := 10 @@ -805,7 +814,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { cfg := Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } requestCount := 5 metricsPerRequest := 10 @@ -898,7 +907,7 @@ func BenchmarkBatchMetricProcessor(b *testing.B) { cfg := Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 2000, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } runMetricsProcessorBenchmark(b, cfg) } @@ -909,7 +918,7 @@ func BenchmarkMultiBatchMetricProcessor(b *testing.B) { Timeout: 100 * time.Millisecond, SendBatchSize: 2000, MetadataKeys: []string{"test", "test2"}, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } runMetricsProcessorBenchmark(b, cfg) } @@ -959,7 +968,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { cfg := Config{ Timeout: 200 * time.Millisecond, SendBatchSize: 50, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } requestCount := 100 @@ -1025,7 +1034,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo cfg := Config{ Timeout: 2 * time.Second, SendBatchSize: 50, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } requestCount := 100 @@ -1077,7 +1086,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { cfg := Config{ Timeout: 3 * time.Second, SendBatchSize: 100, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } requestCount := 5 logsPerRequest := 10 @@ -1123,7 +1132,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { cfg := Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } requestCount := 5 logsPerRequest := 10 @@ -1391,7 +1400,7 @@ func TestBatchZeroConfig(t *testing.T) { // This is a no-op configuration. No need for a timer, no // minimum, no mxaimum, just a pass through. cfg := Config{ - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } require.NoError(t, cfg.Validate()) @@ -1434,7 +1443,7 @@ func TestBatchSplitOnly(t *testing.T) { cfg := Config{ SendBatchMaxSize: maxBatch, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, } require.NoError(t, cfg.Validate()) diff --git a/collector/processor/concurrentbatchprocessor/config.go b/collector/processor/concurrentbatchprocessor/config.go index d2e40f06..6fa48503 100644 --- a/collector/processor/concurrentbatchprocessor/config.go +++ b/collector/processor/concurrentbatchprocessor/config.go @@ -72,9 +72,14 @@ func (cfg *Config) Validate() error { return errors.New("timeout must be greater or equal to 0") } - // remove this check once MaxInFlightBytes is removed. - if cfg.MaxInFlightBytes != 0 { - return errors.New("max_in_flight_bytes is deprecated, use max_in_flight_bytes_mib instead") + if cfg.MaxInFlightBytes != 0 && cfg.MaxInFlightBytesMiB != 0 { + return errors.New("max_in_flight_bytes is deprecated, use only max_in_flight_bytes_mib instead") + } + + if cfg.MaxInFlightBytes > 0 { + // Round up + cfg.MaxInFlightBytesMiB = (cfg.MaxInFlightBytes - 1 + 1<<20) >> 20 + cfg.MaxInFlightBytes = 0 } if cfg.MaxInFlightBytesMiB < 0 { diff --git a/collector/processor/concurrentbatchprocessor/factory.go b/collector/processor/concurrentbatchprocessor/factory.go index 5694534a..2c66276f 100644 --- a/collector/processor/concurrentbatchprocessor/factory.go +++ b/collector/processor/concurrentbatchprocessor/factory.go @@ -19,7 +19,7 @@ const ( defaultSendBatchSize = uint32(8192) defaultTimeout = 200 * time.Millisecond // default inflight bytes is 2 MiB - defaultMaxBytes = 2 * 1048576 + defaultMaxMiB = 2 // defaultMetadataCardinalityLimit should be set to the number // of metadata configurations the user expects to submit to @@ -45,7 +45,7 @@ func createDefaultConfig() component.Config { return &Config{ SendBatchSize: defaultSendBatchSize, Timeout: defaultTimeout, - MaxInFlightBytesMiB: defaultMaxBytes, + MaxInFlightBytesMiB: defaultMaxMiB, MetadataCardinalityLimit: defaultMetadataCardinalityLimit, } }