Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(concurrentbatchprocessor): adds mib suffix to max bytes setting #121

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytes)),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB)<<20),
}
if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
Expand Down
125 changes: 67 additions & 58 deletions collector/processor/concurrentbatchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"

"github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata"
"go.opentelemetry.io/collector/client"
Expand Down Expand Up @@ -207,7 +208,10 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) {
// blockingConsumer is a test consumer whose ConsumeTraces call parks on a
// channel while holding a share of a weighted semaphore, so a test can
// observe how many requests and bytes are in flight at once.
type blockingConsumer struct {
	// blocking is the channel ConsumeTraces receives from before returning.
	blocking chan struct{}
	// sem is held by ConsumeTraces, weighted by the request's serialized
	// size, for as long as the call is parked.
	sem *semaphore.Weighted
	// szr computes the serialized size of each incoming ptrace.Traces.
	szr *ptrace.ProtoMarshaler

	// lock is held by ConsumeTraces while it updates the counters below.
	lock sync.Mutex
	// numItems accumulates the span count of every request received.
	numItems int
	// numBytesAcquired accumulates the serialized size of every request
	// received.
	numBytesAcquired int64
}

func (bc *blockingConsumer) getItemsWaiting() int {
Expand All @@ -217,10 +221,15 @@ func (bc *blockingConsumer) getItemsWaiting() int {
}

// ConsumeTraces records the span count and serialized size of td, acquires
// that many units from bc.sem, and then parks until a receive on bc.blocking
// completes. The semaphore units are released when the call returns.
//
// It returns the error from the semaphore acquisition (the context's error)
// if ctx is done before the units could be acquired; otherwise it returns nil.
func (bc *blockingConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	sz := int64(bc.szr.TracesSize(td))

	bc.lock.Lock()
	bc.numItems += td.SpanCount()
	bc.numBytesAcquired += sz
	bc.lock.Unlock()

	// Acquire leaves the semaphore unchanged on failure, so Release must only
	// be scheduled after a successful acquisition; releasing units that were
	// never acquired panics with "released more than held".
	if err := bc.sem.Acquire(ctx, sz); err != nil {
		return err
	}
	defer bc.sem.Release(sz)

	<-bc.blocking

	return nil
}

Expand All @@ -234,9 +243,9 @@ func (bc *blockingConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// helper function to help determine a setting for cfg.MaxInFlightBytes based
// helper function to help determine a setting for cfg.MaxInFlightBytesMiB based
// on the number of requests and number of spans per request.
func calculateMaxInFlightBytes(numRequests, spansPerRequest int) uint32 {
func calculateMaxInFlightBytesMiB(numRequests, spansPerRequest int) uint32 {
sentResourceSpans := ptrace.NewTraces().ResourceSpans()
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
Expand All @@ -246,7 +255,11 @@ func calculateMaxInFlightBytes(numRequests, spansPerRequest int) uint32 {
td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty())

szr := &ptrace.ProtoMarshaler{}
return uint32(szr.TracesSize(td) * numRequests)
singleSz := szr.TracesSize(td)
numBytes := uint32(singleSz * numRequests)
numMiB := (numBytes - 1 + 1<<20) >> 20

return numMiB
}

// This test is meant to confirm that semaphore is still
Expand All @@ -257,10 +270,14 @@ func TestBatchProcessorCancelContext(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
cfg.MaxInFlightBytes = calculateMaxInFlightBytes(requestCount, spansPerRequest)
cfg.MaxInFlightBytesMiB = calculateMaxInFlightBytesMiB(requestCount, spansPerRequest)
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bc := &blockingConsumer{blocking: make(chan struct{}, 1)}
bc := &blockingConsumer{
blocking: make(chan struct{}, 1),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB<<20)),
szr: &ptrace.ProtoMarshaler{},
}
bp, err := newBatchTracesProcessor(creationSet, bc, cfg, true)
require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -287,34 +304,26 @@ func TestBatchProcessorCancelContext(t *testing.T) {

// check all spans arrived in blockingConsumer.
require.Eventually(t, func() bool {
numSpans := requestCount * spansPerRequest
numSpans := (requestCount) * spansPerRequest
return bc.getItemsWaiting() == numSpans
}, 5*time.Second, 10*time.Millisecond)

// semaphore should be fully acquired at this point.
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1)))

wg.Add(1)
go func() {
td := testdata.GenerateTraces(spansPerRequest)
err = bp.ConsumeTraces(ctx, td)
assert.Contains(t, err.Error(), "context canceled")
wg.Done()
}()
// MaxInFlightBytesMiB (converted to bytes) is the upper bound on in-flight
// bytes, so calculate how many free bytes the semaphore has.
excess := int64(cfg.MaxInFlightBytesMiB<<20) - bc.numBytesAcquired
assert.False(t, bp.sem.TryAcquire(excess+1))

// cancel context and wait for ConsumeTraces to return.
cancel()
wg.Wait()

// check sending another request does not change the semaphore count, even after ConsumeTraces returns.
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1)))
assert.False(t, bp.sem.TryAcquire(excess+1))

// signal to the blockingConsumer to return response to waiters.
bc.unblock()

// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytes bytes.
// Semaphore should be released once all responses are returned. Confirm we can acquire the full MaxInFlightBytesMiB limit (in bytes).
require.Eventually(t, func() bool {
return bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(cfg.MaxInFlightBytes))
return bp.sem.TryAcquire(int64(cfg.MaxInFlightBytesMiB<<20))
}, 5*time.Second, 10*time.Millisecond)
require.NoError(t, bp.Shutdown(context.Background()))
}
Expand Down Expand Up @@ -582,9 +591,9 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {

func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxMiB,
}
sink := new(consumertest.TracesSink)

Expand Down Expand Up @@ -617,9 +626,9 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
}

requestCount := 100
Expand Down Expand Up @@ -683,9 +692,9 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
}

requestCount := 100
Expand Down Expand Up @@ -757,9 +766,9 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {

func TestBatchMetricsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 101,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 100 * time.Millisecond,
SendBatchSize: 101,
MaxInFlightBytesMiB: defaultMaxMiB,
}
requestCount := 5
metricsPerRequest := 10
Expand Down Expand Up @@ -803,9 +812,9 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {

func TestBatchMetricProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxMiB,
}
requestCount := 5
metricsPerRequest := 10
Expand Down Expand Up @@ -896,20 +905,20 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
func BenchmarkBatchMetricProcessor(b *testing.B) {
b.StopTimer()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MaxInFlightBytesMiB: defaultMaxMiB,
}
runMetricsProcessorBenchmark(b, cfg)
}

func BenchmarkMultiBatchMetricProcessor(b *testing.B) {
b.StopTimer()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MetadataKeys: []string{"test", "test2"},
MaxInFlightBytes: defaultMaxBytes,
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MetadataKeys: []string{"test", "test2"},
MaxInFlightBytesMiB: defaultMaxMiB,
}
runMetricsProcessorBenchmark(b, cfg)
}
Expand Down Expand Up @@ -957,9 +966,9 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
}

requestCount := 100
Expand Down Expand Up @@ -1023,9 +1032,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxMiB,
}

requestCount := 100
Expand Down Expand Up @@ -1075,9 +1084,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo

func TestBatchLogsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 100,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 3 * time.Second,
SendBatchSize: 100,
MaxInFlightBytesMiB: defaultMaxMiB,
}
requestCount := 5
logsPerRequest := 10
Expand Down Expand Up @@ -1121,9 +1130,9 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {

func TestBatchLogProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxMiB,
}
requestCount := 5
logsPerRequest := 10
Expand Down Expand Up @@ -1391,7 +1400,7 @@ func TestBatchZeroConfig(t *testing.T) {
// This is a no-op configuration. No need for a timer, no
// minimum, no maximum, just a pass through.
cfg := Config{
MaxInFlightBytes: defaultMaxBytes,
MaxInFlightBytesMiB: defaultMaxMiB,
}

require.NoError(t, cfg.Validate())
Expand Down Expand Up @@ -1433,8 +1442,8 @@ func TestBatchSplitOnly(t *testing.T) {
const logsPerRequest = 100

cfg := Config{
SendBatchMaxSize: maxBatch,
MaxInFlightBytes: defaultMaxBytes,
SendBatchMaxSize: maxBatch,
MaxInFlightBytesMiB: defaultMaxMiB,
}

require.NoError(t, cfg.Validate())
Expand Down
19 changes: 18 additions & 1 deletion collector/processor/concurrentbatchprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Config struct {

// MaxInFlightBytesMiB limits the number of bytes, expressed in MiB, in
// queue waiting to be processed by the senders.
MaxInFlightBytesMiB uint32 `mapstructure:"max_in_flight_bytes_mib"`

// Deprecated: Use MaxInFlightBytesMiB instead.
MaxInFlightBytes uint32 `mapstructure:"max_in_flight_bytes"`
}

Expand All @@ -68,5 +71,19 @@ func (cfg *Config) Validate() error {
if cfg.Timeout < 0 {
return errors.New("timeout must be greater or equal to 0")
}

if cfg.MaxInFlightBytes != 0 && cfg.MaxInFlightBytesMiB != 0 {
return errors.New("max_in_flight_bytes is deprecated, use only max_in_flight_bytes_mib instead")
}

if cfg.MaxInFlightBytes > 0 {
// Round up
cfg.MaxInFlightBytesMiB = (cfg.MaxInFlightBytes - 1 + 1<<20) >> 20
cfg.MaxInFlightBytes = 0
}

if cfg.MaxInFlightBytesMiB < 0 {
return errors.New("max_in_flight_bytes_mib must be greater than or equal to 0")
}
return nil
}
}
4 changes: 2 additions & 2 deletions collector/processor/concurrentbatchprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestUnmarshalConfig(t *testing.T) {
SendBatchMaxSize: uint32(11000),
Timeout: time.Second * 10,
MetadataCardinalityLimit: 1000,
MaxInFlightBytes: 12345,
MaxInFlightBytesMiB: 12345,
}, cfg)
}

Expand Down Expand Up @@ -74,4 +74,4 @@ func TestValidateConfig_InvalidTimeout(t *testing.T) {
// TestValidateConfig_ValidZero checks that a zero-value Config passes
// validation.
func TestValidateConfig_ValidZero(t *testing.T) {
	var zero Config
	assert.NoError(t, zero.Validate())
}
}
4 changes: 2 additions & 2 deletions collector/processor/concurrentbatchprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
defaultSendBatchSize = uint32(8192)
defaultTimeout = 200 * time.Millisecond
// default inflight bytes is 2 MiB
defaultMaxBytes = 2 * 1048576
defaultMaxMiB = 2

// defaultMetadataCardinalityLimit should be set to the number
// of metadata configurations the user expects to submit to
Expand All @@ -45,7 +45,7 @@ func createDefaultConfig() component.Config {
return &Config{
SendBatchSize: defaultSendBatchSize,
Timeout: defaultTimeout,
MaxInFlightBytes: defaultMaxBytes,
MaxInFlightBytesMiB: defaultMaxMiB,
MetadataCardinalityLimit: defaultMetadataCardinalityLimit,
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
timeout: 10s
send_batch_size: 10000
send_batch_max_size: 11000
max_in_flight_bytes: 12345
max_in_flight_bytes_mib: 12345