Skip to content

Commit

Permalink
Remove concurrent batch processor in-flight bytes (#247)
Browse files Browse the repository at this point in the history
This leaves the in-flight metric, which is still useful. Receivers
should perform admission control or another form of memory limit should
be used.
  • Loading branch information
jmacd authored Sep 6, 2024
1 parent a2a35ab commit cb892b8
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 181 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## Unreleased

- Concurrent batch processor: tracing improvements. [#238](https://github.com/open-telemetry/otel-arrow/pull/238), [#241](https://github.com/open-telemetry/otel-arrow/pull/241)
- Concurrent batch processor: support disabling in-flight limits. [#243](https://github.com/open-telemetry/otel-arrow/pull/243)
- Update to latest OTel-Collector & OTel-Go dependencies. Remove collector packages now included in collector-contrib/internal/otelarrow. [#245](https://github.com/open-telemetry/otel-arrow/pull/245)
- Concurrent batch processor: remove support for in-flight limits. [#247](https://github.com/open-telemetry/otel-arrow/pull/248)

## [0.25.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.24.0) - 2024-07-17

Expand Down
7 changes: 1 addition & 6 deletions collector/processor/concurrentbatchprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ The differences in this component, relative to that component are:

1. Synchronous pipeline support: this component blocks each producer
until the request returns with success or an error status code.
2. Maximim in-flight-bytes setting. This component measures the
in-memory size of each request it admits to the pipeline and
otherwise stalls requests until they timeout. This function is
disabled by `max_in_flight_size_mib: 0`.
3. Unlimited concurrency: this component will start as many goroutines
2. Unlimited concurrency: this component will start as many goroutines
as needed to send batches through the pipeline.

Here is an example configuration:
Expand All @@ -22,7 +18,6 @@ Here is an example configuration:
send_batch_max_size: 1500
send_batch_size: 1000
timeout: 1s
max_in_flight_size_mib: 128
```

In this configuration, the component will admit up to 128MiB of
Expand Down
22 changes: 0 additions & 22 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -70,10 +69,6 @@ type batchProcessor struct {
// batcher will be either *singletonBatcher or *multiBatcher
batcher batcher

// in-flight bytes limit mechanism
limitBytes int64
sem *semaphore.Weighted

tracer trace.TracerProvider
}

Expand Down Expand Up @@ -171,8 +166,6 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
}
sort.Strings(mks)

limitBytes := int64(cfg.MaxInFlightSizeMiB) << 20

tp := set.TelemetrySettings.TracerProvider
if tp == nil {
tp = otel.GetTracerProvider()
Expand All @@ -188,14 +181,9 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
limitBytes: limitBytes,
tracer: tp,
}

if limitBytes != 0 {
bp.sem = semaphore.NewWeighted(limitBytes)
}

if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
} else {
Expand Down Expand Up @@ -459,9 +447,6 @@ func allSame(x []context.Context) bool {

func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error {
var err error
if bp.sem != nil {
err = bp.sem.Acquire(ctx, bytes)
}
if err == nil && bp.telemetry.batchInFlightBytes != nil {
bp.telemetry.batchInFlightBytes.Add(ctx, bytes, bp.telemetry.processorAttrOption)
}
Expand All @@ -472,9 +457,6 @@ func (bp *batchProcessor) countRelease(bytes int64) {
if bp.telemetry.batchInFlightBytes != nil {
bp.telemetry.batchInFlightBytes.Add(context.Background(), -bytes, bp.telemetry.processorAttrOption)
}
if bp.sem != nil {
bp.sem.Release(bytes)
}
}

func (b *shard) consumeAndWait(ctx context.Context, data any) error {
Expand Down Expand Up @@ -502,10 +484,6 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error {
}
bytes := int64(b.batch.sizeBytes(data))

if bytes > b.processor.limitBytes {
return fmt.Errorf("request size exceeds max-in-flight bytes: %d", bytes)
}

err := b.processor.countAcquire(ctx, bytes)
if err != nil {
return err
Expand Down
Loading

0 comments on commit cb892b8

Please sign in to comment.