Skip to content

Commit

Permalink
(otelarrowreceiver): LIFO-based admission control, waiting limit expr…
Browse files Browse the repository at this point in the history
…essed in MiB of request data
  • Loading branch information
jmacd committed Nov 1, 2024
1 parent 0dcdebf commit 216257d
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 37 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-admission.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Admission control improvements (LIFO); admission.waiter_limit is deprecated, replaced with admission.waiting_limit_mib.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36074]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
7 changes: 1 addition & 6 deletions internal/otelarrow/test/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,12 +714,7 @@ func TestIntegrationAdmissionLimited(t *testing.T) {

testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) {
rcfg.Admission.RequestLimitMiB = admitLimit

// Note: #36074 will change WaiterLimit to WaitingLimitMiB
// measured in bytes, not request count. This test is designed
// to work either way by virtue of having requests that are
// just shy of 1MiB.
rcfg.Admission.WaiterLimit = int64(waitingLimit)
rcfg.Admission.WaitingLimitMiB = waitingLimit

ecfg.Arrow.NumStreams = 10

Expand Down
7 changes: 2 additions & 5 deletions receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ type AdmissionConfig struct {
// for processing. When this field is zero, admission control is disabled.
RequestLimitMiB uint64 `mapstructure:"request_limit_mib"`

// WaiterLimit is the limit on the number of waiters waiting to be processed and consumed.
// WaitingLimitMiB is the limit on the amount of data waiting to be consumed.
// This is a dimension of memory limiting to ensure waiters are not consuming an
// unexpectedly large amount of memory in the arrow receiver.
WaiterLimit int64 `mapstructure:"waiter_limit"`
WaitingLimitMiB uint64 `mapstructure:"waiting_limit_mib"`
}

// ArrowConfig support configuring the Arrow receiver.
Expand Down Expand Up @@ -84,8 +84,5 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
if cfg.Admission.RequestLimitMiB == 0 && cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 {
cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB
}
if cfg.Admission.WaiterLimit == 0 && cfg.Arrow.DeprecatedWaiterLimit != 0 {
cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit
}
return nil
}
5 changes: 2 additions & 3 deletions receiver/otelarrowreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestUnmarshalConfig(t *testing.T) {
},
Admission: AdmissionConfig{
RequestLimitMiB: 80,
WaiterLimit: 100,
WaitingLimitMiB: 100,
},
}, cfg)

Expand All @@ -107,7 +107,6 @@ func TestValidateDeprecatedConfig(t *testing.T) {
Admission: AdmissionConfig{
// cfg.Validate should now set these fields.
RequestLimitMiB: 80,
WaiterLimit: 100,
},
}, cfg)
}
Expand All @@ -134,7 +133,7 @@ func TestUnmarshalConfigUnix(t *testing.T) {
},
Admission: AdmissionConfig{
RequestLimitMiB: defaultRequestLimitMiB,
WaiterLimit: defaultWaiterLimit,
WaitingLimitMiB: defaultWaitingLimitMiB,
},
}, cfg)
}
Expand Down
4 changes: 2 additions & 2 deletions receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (

defaultMemoryLimitMiB = 128
defaultRequestLimitMiB = 128
defaultWaiterLimit = 1000
defaultWaitingLimitMiB = 1000
)

// NewFactory creates a new OTel-Arrow receiver factory.
Expand Down Expand Up @@ -52,7 +52,7 @@ func createDefaultConfig() component.Config {
},
Admission: AdmissionConfig{
RequestLimitMiB: defaultRequestLimitMiB,
WaiterLimit: defaultWaiterLimit,
WaitingLimitMiB: defaultWaitingLimitMiB,
},
}
}
Expand Down
10 changes: 5 additions & 5 deletions receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ type inFlightData struct {

numItems int // how many items
uncompSize int64 // uncompressed data size == how many bytes held in the semaphore
releaser admission.ReleaseFunc
}

func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) {
Expand Down Expand Up @@ -501,10 +502,8 @@ func (id *inFlightData) anyDone(ctx context.Context) {

id.span.End()

if id.uncompSize != 0 {
if err := id.boundedQueue.Release(id.uncompSize); err != nil {
id.telemetry.Logger.Error("release error", zap.Error(err))
}
if id.releaser != nil {
id.releaser()
}

if id.uncompSize != 0 {
Expand Down Expand Up @@ -635,12 +634,13 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
// immediately if there are too many waiters, or will
// otherwise block until timeout or enough memory becomes
// available.
acquireErr := r.boundedQueue.Acquire(inflightCtx, uncompSize)
releaser, acquireErr := r.boundedQueue.Acquire(inflightCtx, uint64(uncompSize))
if acquireErr != nil {
return acquireErr
}
flight.uncompSize = uncompSize
flight.numItems = numItems
flight.releaser = releaser

r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize)
r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems))
Expand Down
11 changes: 9 additions & 2 deletions receiver/otelarrowreceiver/internal/arrow/arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,12 +449,19 @@ func TestBoundedQueueLimits(t *testing.T) {
require.NoError(t, err)

var bq admission.Queue
// Note that this test exercises the case where there is or is not an
// error unrelated to pending data, thus we pass 0 in both cases as
// the WaitingLimitMiB below.
//
// There is an end-to-end test of admission control, including the
// ResourceExhausted status code we expect, in
// internal/otelarrow/test/e2e_test.go.
if tt.expectErr {
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0)
bq = admission.NewBoundedQueue(noopTelemetry, int64(sizer.TracesSize(td)-100), 10)
bq = admission.NewBoundedQueue(noopTelemetry, uint64(sizer.TracesSize(td)-100), 0)
} else {
ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil)
bq = admission.NewBoundedQueue(noopTelemetry, tt.admitLimit, 10)
bq = admission.NewBoundedQueue(noopTelemetry, uint64(tt.admitLimit), 0)
}

ctc.start(ctc.newRealConsumer, bq)
Expand Down
7 changes: 3 additions & 4 deletions receiver/otelarrowreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog
ctx = r.obsrecv.StartLogsOp(ctx)

var err error
sizeBytes := int64(r.sizer.LogsSize(req.Logs()))
if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil {
sizeBytes := uint64(r.sizer.LogsSize(req.Logs()))
if releaser, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil {
err = r.nextConsumer.ConsumeLogs(ctx, ld)
// Release() is not checked, see #36074.
_ = r.boundedQueue.Release(sizeBytes) // immediate release
releaser() // immediate release
} else {
err = acqErr
}
Expand Down
7 changes: 3 additions & 4 deletions receiver/otelarrowreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p
ctx = r.obsrecv.StartMetricsOp(ctx)

var err error
sizeBytes := int64(r.sizer.MetricsSize(req.Metrics()))
if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil {
sizeBytes := uint64(r.sizer.MetricsSize(req.Metrics()))
if releaser, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil {
err = r.nextConsumer.ConsumeMetrics(ctx, md)
// Release() is not checked, see #36074.
_ = r.boundedQueue.Release(sizeBytes) // immediate release
releaser() // immediate release
} else {
err = acqErr
}
Expand Down
7 changes: 3 additions & 4 deletions receiver/otelarrowreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt
ctx = r.obsrecv.StartTracesOp(ctx)

var err error
sizeBytes := int64(r.sizer.TracesSize(req.Traces()))
if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil {
sizeBytes := uint64(r.sizer.TracesSize(req.Traces()))
if releaser, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil {
err = r.nextConsumer.ConsumeTraces(ctx, td)
// Release() is not checked, see #36074.
_ = r.boundedQueue.Release(sizeBytes) // immediate release
releaser() // immediate release
} else {
err = acqErr
}
Expand Down
2 changes: 1 addition & 1 deletion receiver/otelarrowreceiver/otelarrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive
if cfg.Admission.RequestLimitMiB == 0 {
bq = admission.NewUnboundedQueue()
} else {
bq = admission.NewBoundedQueue(set.TelemetrySettings, int64(cfg.Admission.RequestLimitMiB<<20), cfg.Admission.WaiterLimit)
bq = admission.NewBoundedQueue(set.TelemetrySettings, uint64(cfg.Admission.RequestLimitMiB<<20), uint64(cfg.Admission.WaitingLimitMiB<<20))
}
r := &otelArrowReceiver{
cfg: cfg,
Expand Down
2 changes: 1 addition & 1 deletion receiver/otelarrowreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ protocols:
memory_limit_mib: 123
admission:
request_limit_mib: 80
waiter_limit: 100
waiting_limit_mib: 100

0 comments on commit 216257d

Please sign in to comment.