From fb4bbe8f47e7f1138244d8ecbf6499cdfe7f6983 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Thu, 5 Sep 2024 03:00:14 -0500 Subject: [PATCH 01/24] add admission control to otlp path --- receiver/otelarrowreceiver/config.go | 21 ++++++++++-------- receiver/otelarrowreceiver/config_test.go | 8 +++++-- receiver/otelarrowreceiver/factory.go | 4 +++- .../otelarrowreceiver/internal/logs/otlp.go | 19 ++++++++++++++-- .../internal/metrics/otlp.go | 19 ++++++++++++++-- .../otelarrowreceiver/internal/trace/otlp.go | 22 ++++++++++++++++--- receiver/otelarrowreceiver/otelarrow.go | 21 ++++++++++-------- .../otelarrowreceiver/testdata/config.yaml | 1 + 8 files changed, 87 insertions(+), 28 deletions(-) diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index cc2d47c929cb..1773adf1c28d 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -14,17 +14,12 @@ import ( // Protocols is the configuration for the supported protocols. type Protocols struct { - GRPC configgrpc.ServerConfig `mapstructure:"grpc"` - Arrow ArrowConfig `mapstructure:"arrow"` + GRPC configgrpc.ServerConfig `mapstructure:"grpc"` + Arrow ArrowConfig `mapstructure:"arrow"` + Admission AdmissionConfig `mapstructure:"admission"` } -// ArrowConfig support configuring the Arrow receiver. -type ArrowConfig struct { - // MemoryLimitMiB is the size of a shared memory region used - // by all Arrow streams, in MiB. When too much load is - // passing through, they will see ResourceExhausted errors. - MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"` - +type AdmissionConfig struct { // AdmissionLimitMiB limits the number of requests that are received by the stream based on // request size information available. Request size is used to control how much traffic we admit // for processing, but does not control how much memory is used during request processing. @@ -34,6 +29,14 @@ type ArrowConfig struct { // This is a dimension of memory limiting to ensure waiters are not consuming an // unexpectedly large amount of memory in the arrow receiver. WaiterLimit int64 `mapstructure:"waiter_limit"` +} + +// ArrowConfig support configuring the Arrow receiver. +type ArrowConfig struct { + // MemoryLimitMiB is the size of a shared memory region used + // by all Arrow streams, in MiB. When too much load is + // passing through, they will see ResourceExhausted errors. + MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"` // Zstd settings apply to OTel-Arrow use of gRPC specifically. Zstd zstd.DecoderConfig `mapstructure:"zstd"` diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go index 0f373c65df51..9d99499901ab 100644 --- a/receiver/otelarrowreceiver/config_test.go +++ b/receiver/otelarrowreceiver/config_test.go @@ -77,7 +77,9 @@ func TestUnmarshalConfig(t *testing.T) { }, }, Arrow: ArrowConfig{ - MemoryLimitMiB: 123, + MemoryLimitMiB: 123, + }, + Admission: AdmissionConfig{ AdmissionLimitMiB: 80, WaiterLimit: 100, }, @@ -103,7 +105,9 @@ func TestUnmarshalConfigUnix(t *testing.T) { ReadBufferSize: 512 * 1024, }, Arrow: ArrowConfig{ - MemoryLimitMiB: defaultMemoryLimitMiB, + MemoryLimitMiB: defaultMemoryLimitMiB, + }, + Admission: AdmissionConfig{ AdmissionLimitMiB: defaultAdmissionLimitMiB, WaiterLimit: defaultWaiterLimit, }, diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go index 9c4c57dcab60..e79349e61cf1 100644 --- a/receiver/otelarrowreceiver/factory.go +++ b/receiver/otelarrowreceiver/factory.go @@ -47,7 +47,9 @@ func createDefaultConfig() component.Config { ReadBufferSize: 512 * 1024, }, Arrow: ArrowConfig{ - MemoryLimitMiB: defaultMemoryLimitMiB, + MemoryLimitMiB: defaultMemoryLimitMiB, + }, + Admission: AdmissionConfig{ AdmissionLimitMiB: defaultAdmissionLimitMiB, WaiterLimit: defaultWaiterLimit, }, diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 72f62dc9d28d..2e9e84ee1811 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -7,8 +7,11 @@ import ( "context" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" ) const dataFormatProtobuf = "protobuf" @@ -18,13 +21,17 @@ type Receiver struct { plogotlp.UnimplementedGRPCServer nextConsumer consumer.Logs obsrecv *receiverhelper.ObsReport + boundedQueue *admission.BoundedQueue + sizer *plog.ProtoMarshaler } // New creates a new Receiver reference. -func New(nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport) *Receiver { +func New(nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, + boundedQueue: bq, + sizer: &plog.ProtoMarshaler{}, } } @@ -37,7 +44,15 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog } ctx = r.obsrecv.StartLogsOp(ctx) - err := r.nextConsumer.ConsumeLogs(ctx, ld) + + sizeBytes := int64(r.sizer.LogsSize(req.Logs())) + err := r.boundedQueue.Acquire(ctx, sizeBytes) + if err != nil { + return plogotlp.NewExportResponse(), err + } + defer r.boundedQueue.Release(sizeBytes) + + err = r.nextConsumer.ConsumeLogs(ctx, ld) r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err) return plogotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index 77e12a86ce14..e4ee5151f31d 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -7,8 +7,11 @@ import ( "context" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" ) const dataFormatProtobuf = "protobuf" @@ -18,13 +21,17 @@ type Receiver struct { pmetricotlp.UnimplementedGRPCServer nextConsumer consumer.Metrics obsrecv *receiverhelper.ObsReport + boundedQueue *admission.BoundedQueue + sizer *pmetric.ProtoMarshaler } // New creates a new Receiver reference. -func New(nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport) *Receiver { +func New(nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, + boundedQueue: bq, + sizer: &pmetric.ProtoMarshaler{}, } } @@ -37,7 +44,15 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p } ctx = r.obsrecv.StartMetricsOp(ctx) - err := r.nextConsumer.ConsumeMetrics(ctx, md) + + sizeBytes := int64(r.sizer.MetricsSize(req.Metrics())) + err := r.boundedQueue.Acquire(ctx, sizeBytes) + if err != nil { + return pmetricotlp.NewExportResponse(), err + } + defer r.boundedQueue.Release(sizeBytes) + + err = r.nextConsumer.ConsumeMetrics(ctx, md) r.obsrecv.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) return pmetricotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index 82d836ed8b7e..65821c1cc361 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -5,10 +5,14 @@ package trace // import "github.com/open-telemetry/opentelemetry-collector-contr import ( "context" + "fmt" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" ) const dataFormatProtobuf = "protobuf" @@ -18,13 +22,17 @@ type Receiver struct { ptraceotlp.UnimplementedGRPCServer nextConsumer consumer.Traces obsrecv *receiverhelper.ObsReport + boundedQueue *admission.BoundedQueue + sizer *ptrace.ProtoMarshaler } // New creates a new Receiver reference. -func New(nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport) *Receiver { +func New(nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, + boundedQueue: bq, + sizer: &ptrace.ProtoMarshaler{}, } } @@ -36,9 +44,17 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt if numSpans == 0 { return ptraceotlp.NewExportResponse(), nil } - ctx = r.obsrecv.StartTracesOp(ctx) - err := r.nextConsumer.ConsumeTraces(ctx, td) + + sizeBytes := int64(r.sizer.TracesSize(req.Traces())) + fmt.Println(sizeBytes) + err := r.boundedQueue.Acquire(ctx, sizeBytes) + if err != nil { + return ptraceotlp.NewExportResponse(), err + } + defer r.boundedQueue.Release(sizeBytes) + + err = r.nextConsumer.ConsumeTraces(ctx, td) r.obsrecv.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err) return ptraceotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index b90c9c36a615..e678c58b2437 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -43,8 +43,9 @@ type otelArrowReceiver struct { arrowReceiver *arrow.Receiver shutdownWG sync.WaitGroup - obsrepGRPC *receiverhelper.ObsReport - netReporter *netstats.NetworkReporter + obsrepGRPC *receiverhelper.ObsReport + netReporter *netstats.NetworkReporter + boundedQueue *admission.BoundedQueue settings receiver.Settings } @@ -57,10 +58,12 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive if err != nil { return nil, err } + bq := admission.NewBoundedQueue(int64(cfg.Admission.AdmissionLimitMiB<<20), cfg.Admission.WaiterLimit) r := &otelArrowReceiver{ - cfg: cfg, - settings: set, - netReporter: netReporter, + cfg: cfg, + settings: set, + netReporter: netReporter, + boundedQueue: bq, } if err = zstd.SetDecoderConfig(cfg.Arrow.Zstd); err != nil { return nil, err @@ -127,7 +130,7 @@ func (r *otelArrowReceiver) startProtocolServers(ctx context.Context, host compo opts = append(opts, arrowRecord.WithMeterProvider(r.settings.TelemetrySettings.MeterProvider, r.settings.TelemetrySettings.MetricsLevel)) } return arrowRecord.NewConsumer(opts...) - }, bq, r.netReporter) + }, r.boundedQueue, r.netReporter) if err != nil { return err @@ -178,15 +181,15 @@ func (r *otelArrowReceiver) Shutdown(_ context.Context) error { } func (r *otelArrowReceiver) registerTraceConsumer(tc consumer.Traces) { - r.tracesReceiver = trace.New(tc, r.obsrepGRPC) + r.tracesReceiver = trace.New(tc, r.obsrepGRPC, r.boundedQueue) } func (r *otelArrowReceiver) registerMetricsConsumer(mc consumer.Metrics) { - r.metricsReceiver = metrics.New(mc, r.obsrepGRPC) + r.metricsReceiver = metrics.New(mc, r.obsrepGRPC, r.boundedQueue) } func (r *otelArrowReceiver) registerLogsConsumer(lc consumer.Logs) { - r.logsReceiver = logs.New(lc, r.obsrepGRPC) + r.logsReceiver = logs.New(lc, r.obsrepGRPC, r.boundedQueue) } var _ arrow.Consumers = &otelArrowReceiver{} diff --git a/receiver/otelarrowreceiver/testdata/config.yaml b/receiver/otelarrowreceiver/testdata/config.yaml index 726263f82f9f..f565b1290892 100644 --- a/receiver/otelarrowreceiver/testdata/config.yaml +++ b/receiver/otelarrowreceiver/testdata/config.yaml @@ -27,5 +27,6 @@ protocols: permit_without_stream: true arrow: memory_limit_mib: 123 + admission: admission_limit_mib: 80 waiter_limit: 100 From 9706583ca1967f705a36a99326ec1f6465150b1a Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 00:51:38 -0500 Subject: [PATCH 02/24] support deprecated option + changelog + unit tests --- .chloggen/admission-control-for-otlp.yaml | 27 ++++++++ receiver/otelarrowreceiver/config.go | 28 +++++++- .../internal/common/blocking_consumer.go | 46 +++++++++++++ .../internal/logs/otlp_test.go | 69 ++++++++++++++++++- .../internal/metrics/otlp_test.go | 69 ++++++++++++++++++- .../otelarrowreceiver/internal/trace/otlp.go | 2 - .../internal/trace/otlp_test.go | 69 ++++++++++++++++++- 7 files changed, 302 insertions(+), 8 deletions(-) create mode 100644 .chloggen/admission-control-for-otlp.yaml create mode 100644 receiver/otelarrowreceiver/internal/common/blocking_consumer.go diff --git a/.chloggen/admission-control-for-otlp.yaml b/.chloggen/admission-control-for-otlp.yaml new file mode 100644 index 000000000000..f69d3149a33f --- /dev/null +++ b/.chloggen/admission-control-for-otlp.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add admission control in the otelarrow receiver's standard otlp data path. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35021] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index 1773adf1c28d..db279c836b59 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -14,9 +14,9 @@ import ( // Protocols is the configuration for the supported protocols. type Protocols struct { - GRPC configgrpc.ServerConfig `mapstructure:"grpc"` - Arrow ArrowConfig `mapstructure:"arrow"` - Admission AdmissionConfig `mapstructure:"admission"` + GRPC configgrpc.ServerConfig `mapstructure:"grpc"` + Arrow ArrowConfig `mapstructure:"arrow"` + Admission AdmissionConfig `mapstructure:"admission"` } type AdmissionConfig struct { @@ -38,6 +38,12 @@ type ArrowConfig struct { // passing through, they will see ResourceExhausted errors. MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"` + // Deprecated: This field is no longer supported, use cfg.Admission.AdmissionLimitMiB instead. + AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"` + + // Deprecated: This field is no longer supported, use cfg.Admission.WaiterLimit instead. + WaiterLimit int64 `mapstructure:"waiter_limit"` + // Zstd settings apply to OTel-Arrow use of gRPC specifically. Zstd zstd.DecoderConfig `mapstructure:"zstd"` } @@ -57,3 +63,19 @@ func (cfg *ArrowConfig) Validate() error { } return nil } + +func (cfg *Config) Validate() error { + if err := cfg.GRPC.Validate(); err != nil { + return err + } + if err := cfg.Arrow.Validate(); err != nil { + return err + } + if cfg.Arrow.AdmissionLimitMiB != 0 { + cfg.Admission.AdmissionLimitMiB = cfg.Arrow.AdmissionLimitMiB + } + if cfg.Arrow.WaiterLimit != 0 { + cfg.Admission.WaiterLimit = cfg.Arrow.WaiterLimit + } + return nil +} diff --git a/receiver/otelarrowreceiver/internal/common/blocking_consumer.go b/receiver/otelarrowreceiver/internal/common/blocking_consumer.go new file mode 100644 index 000000000000..fa7bd721727c --- /dev/null +++ b/receiver/otelarrowreceiver/internal/common/blocking_consumer.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package common + +import( + "context" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type BlockingConsumer struct{ + block chan struct{} +} + +func NewBlockingConsumer() *BlockingConsumer { + return &BlockingConsumer{ + block: make(chan struct{}), + } +} + +func (bc *BlockingConsumer) ConsumeTraces(ctx context.Context, _ ptrace.Traces) error { + <-bc.block + return nil +} + +func (bc *BlockingConsumer) ConsumeMetrics(ctx context.Context, _ pmetric.Metrics) error { + <-bc.block + return nil +} + +func (bc *BlockingConsumer) ConsumeLogs(ctx context.Context, _ plog.Logs) error { + <-bc.block + return nil +} + +func (bc *BlockingConsumer) Unblock() { + close(bc.block) +} + +func (bc *BlockingConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} \ No newline at end of file diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index 8c00b1c78f17..fcc7ffec41a0 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -7,7 +7,9 @@ import ( "context" "errors" "net" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,8 +21,16 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "go.uber.org/multierr" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/common" +) + +const ( + maxWaiters = 10 + maxBytes = int64(250) ) func TestExport(t *testing.T) { @@ -57,6 +67,61 @@ func TestExport_ErrorConsumer(t *testing.T) { assert.Equal(t, plogotlp.ExportResponse{}, resp) } +func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { + ld := testdata.GenerateLogs(10) + logSink := new(consumertest.LogsSink) + req := plogotlp.NewExportRequestFromLogs(ld) + + logClient := makeLogsServiceClient(t, logSink) + resp, err := logClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.Equal(t, plogotlp.ExportResponse{}, resp) +} + +func TestExport_TooManyWaiters(t *testing.T) { + bc := common.NewBlockingConsumer() + + logsClient := makeLogsServiceClient(t, bc) + bg := context.Background() + var errs, err error + ld := testdata.GenerateLogs(1) + req := plogotlp.NewExportRequestFromLogs(ld) + var mtx sync.Mutex + numResponses := 0 + // Send request that will acquire all of the semaphores bytes and block. + go func() { + _, err = logsClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses+=1 + mtx.Unlock() + }() + + for i := 0; i < maxWaiters+1; i++ { + go func() { + _, err := logsClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses+=1 + mtx.Unlock() + }() + } + + // sleep so all async requests are blocked on semaphore Acquire. + time.Sleep(1 * time.Second) + + // unblock and wait for errors to be returned and written. + bc.Unblock() + assert.Eventually(t, func() bool { + mtx.Lock() + defer mtx.Unlock() + errSlice := multierr.Errors(errs) + return numResponses == maxWaiters+2 && len(errSlice) == 1 + }, 3 * time.Second, 10 * time.Millisecond) + + assert.ErrorContains(t, errs, "too many waiters") +} + func makeLogsServiceClient(t *testing.T, lc consumer.Logs) plogotlp.GRPCClient { addr := otlpReceiverOnGRPCServer(t, lc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -84,7 +149,9 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - r := New(lc, obsrecv) + + bq := admission.NewBoundedQueue(maxBytes, maxWaiters) + r := New(lc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() plogotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index 71c4f939d813..f25a3e21cf3a 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -7,7 +7,9 @@ import ( "context" "errors" "net" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,8 +21,16 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "go.uber.org/multierr" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/common" +) + +const ( + maxWaiters = 10 + maxBytes = int64(250) ) func TestExport(t *testing.T) { @@ -57,6 +67,61 @@ func TestExport_ErrorConsumer(t *testing.T) { assert.Equal(t, pmetricotlp.ExportResponse{}, resp) } +func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { + md := testdata.GenerateMetrics(10) + metricSink := new(consumertest.MetricsSink) + req := pmetricotlp.NewExportRequestFromMetrics(md) + + metricsClient := makeMetricsServiceClient(t, metricSink) + resp, err := metricsClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.Equal(t, pmetricotlp.ExportResponse{}, resp) +} + +func TestExport_TooManyWaiters(t *testing.T) { + bc := common.NewBlockingConsumer() + + metricsClient := makeMetricsServiceClient(t, bc) + bg := context.Background() + var errs, err error + md := testdata.GenerateMetrics(1) + req := pmetricotlp.NewExportRequestFromMetrics(md) + var mtx sync.Mutex + numResponses := 0 + // Send request that will acquire all of the semaphores bytes and block. + go func() { + _, err = metricsClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses+=1 + mtx.Unlock() + }() + + for i := 0; i < maxWaiters+1; i++ { + go func() { + _, err := metricsClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses+=1 + mtx.Unlock() + }() + } + + // sleep so all async requests are blocked on semaphore Acquire. + time.Sleep(1 * time.Second) + + // unblock and wait for errors to be returned and written. + bc.Unblock() + assert.Eventually(t, func() bool { + mtx.Lock() + defer mtx.Unlock() + errSlice := multierr.Errors(errs) + return numResponses == maxWaiters+2 && len(errSlice) == 1 + }, 3 * time.Second, 10 * time.Millisecond) + + assert.ErrorContains(t, errs, "too many waiters") +} + func makeMetricsServiceClient(t *testing.T, mc consumer.Metrics) pmetricotlp.GRPCClient { addr := otlpReceiverOnGRPCServer(t, mc) @@ -85,7 +150,9 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - r := New(mc, obsrecv) + + bq := admission.NewBoundedQueue(maxBytes, maxWaiters) + r := New(mc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() pmetricotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index 65821c1cc361..9200794a7e3a 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -5,7 +5,6 @@ package trace // import "github.com/open-telemetry/opentelemetry-collector-contr import ( "context" - "fmt" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" @@ -47,7 +46,6 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt ctx = r.obsrecv.StartTracesOp(ctx) sizeBytes := int64(r.sizer.TracesSize(req.Traces())) - fmt.Println(sizeBytes) err := r.boundedQueue.Acquire(ctx, sizeBytes) if err != nil { return ptraceotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index 2ce1929e875a..9d6ec891e4b0 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -7,7 +7,9 @@ import ( "context" "errors" "net" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,8 +21,16 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "go.uber.org/multierr" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/common" +) + +const ( + maxWaiters = 10 + maxBytes = int64(250) ) func TestExport(t *testing.T) { @@ -55,6 +65,62 @@ func TestExport_ErrorConsumer(t *testing.T) { assert.Equal(t, ptraceotlp.ExportResponse{}, resp) } +func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { + td := testdata.GenerateTraces(10) + traceSink := new(consumertest.TracesSink) + req := ptraceotlp.NewExportRequestFromTraces(td) + + traceClient := makeTraceServiceClient(t, traceSink) + + resp, err := traceClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.Equal(t, ptraceotlp.ExportResponse{}, resp) +} + +func TestExport_TooManyWaiters(t *testing.T) { + bc := common.NewBlockingConsumer() + + traceClient := makeTraceServiceClient(t, bc) + bg := context.Background() + var errs, err error + td := testdata.GenerateTraces(1) + req := ptraceotlp.NewExportRequestFromTraces(td) + var mtx sync.Mutex + numResponses := 0 + // Send request that will acquire all of the semaphores bytes and block. + go func() { + _, err = traceClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses+=1 + mtx.Unlock() + }() + + for i := 0; i < maxWaiters+1; i++ { + go func() { + _, err := traceClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses+=1 + mtx.Unlock() + }() + } + + // sleep so all async requests are blocked on semaphore Acquire. + time.Sleep(1 * time.Second) + + // unblock and wait for errors to be returned and written. + bc.Unblock() + assert.Eventually(t, func() bool { + mtx.Lock() + defer mtx.Unlock() + errSlice := multierr.Errors(errs) + return numResponses == maxWaiters+2 && len(errSlice) == 1 + }, 3 * time.Second, 10 * time.Millisecond) + + assert.ErrorContains(t, errs, "too many waiters") +} + func makeTraceServiceClient(t *testing.T, tc consumer.Traces) ptraceotlp.GRPCClient { addr := otlpReceiverOnGRPCServer(t, tc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -82,7 +148,8 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - r := New(tc, obsrecv) + bq := admission.NewBoundedQueue(maxBytes, maxWaiters) + r := New(tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() ptraceotlp.RegisterGRPCServer(srv, r) From 35d9d0986de1537f33a4565e69226d052566b71d Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 01:04:29 -0500 Subject: [PATCH 03/24] rename --- .../internal/{common => testing}/blocking_consumer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename receiver/otelarrowreceiver/internal/{common => testing}/blocking_consumer.go (73%) diff --git a/receiver/otelarrowreceiver/internal/common/blocking_consumer.go b/receiver/otelarrowreceiver/internal/testing/blocking_consumer.go similarity index 73% rename from receiver/otelarrowreceiver/internal/common/blocking_consumer.go rename to receiver/otelarrowreceiver/internal/testing/blocking_consumer.go index fa7bd721727c..e658e4d2940e 100644 --- a/receiver/otelarrowreceiver/internal/common/blocking_consumer.go +++ b/receiver/otelarrowreceiver/internal/testing/blocking_consumer.go @@ -22,17 +22,17 @@ func NewBlockingConsumer() *BlockingConsumer { } } -func (bc *BlockingConsumer) ConsumeTraces(ctx context.Context, _ ptrace.Traces) error { +func (bc *BlockingConsumer) ConsumeTraces(_ context.Context, _ ptrace.Traces) error { <-bc.block return nil } -func (bc *BlockingConsumer) ConsumeMetrics(ctx context.Context, _ pmetric.Metrics) error { +func (bc *BlockingConsumer) ConsumeMetrics(_ context.Context, _ pmetric.Metrics) error { <-bc.block return nil } -func (bc *BlockingConsumer) ConsumeLogs(ctx context.Context, _ plog.Logs) error { +func (bc *BlockingConsumer) ConsumeLogs(_ context.Context, _ plog.Logs) error { <-bc.block return nil } From b3feccfcf09fd526d4c24aeed3d101a6bf4b4a9d Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 01:57:18 -0500 Subject: [PATCH 04/24] replace statement? --- receiver/otelarrowreceiver/go.mod | 2 ++ receiver/otelarrowreceiver/internal/logs/otlp_test.go | 4 ++-- receiver/otelarrowreceiver/internal/metrics/otlp_test.go | 4 ++-- .../internal/{testing => testconsumer}/blocking_consumer.go | 2 +- receiver/otelarrowreceiver/internal/trace/otlp_test.go | 4 ++-- 5 files changed, 9 insertions(+), 7 deletions(-) rename receiver/otelarrowreceiver/internal/{testing => testconsumer}/blocking_consumer.go (97%) diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index d1456a4725a4..7e2fc0f7442f 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -108,3 +108,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/share replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter => ../../exporter/otelarrowexporter replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/internal/testconsumer => ./internal/testconsumer diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index fcc7ffec41a0..631f400b3618 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -25,7 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" ) const ( @@ -79,7 +79,7 @@ func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { } func TestExport_TooManyWaiters(t *testing.T) { - bc := common.NewBlockingConsumer() + bc := testconsumer.NewBlockingConsumer() logsClient := makeLogsServiceClient(t, bc) bg := context.Background() diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index f25a3e21cf3a..c0b915b91b94 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -25,7 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" ) const ( @@ -79,7 +79,7 @@ func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { } func TestExport_TooManyWaiters(t *testing.T) { - bc := common.NewBlockingConsumer() + bc := testconsumer.NewBlockingConsumer() metricsClient := makeMetricsServiceClient(t, bc) bg := context.Background() diff --git a/receiver/otelarrowreceiver/internal/testing/blocking_consumer.go b/receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go similarity index 97% rename from receiver/otelarrowreceiver/internal/testing/blocking_consumer.go rename to receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go index e658e4d2940e..b23349876607 100644 --- a/receiver/otelarrowreceiver/internal/testing/blocking_consumer.go +++ b/receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package common +package testconsumer import( "context" diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index 9d6ec891e4b0..2efa7a8d4d5a 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -25,7 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" ) const ( @@ -78,7 +78,7 @@ func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { } func TestExport_TooManyWaiters(t *testing.T) { - bc := common.NewBlockingConsumer() + bc := testconsumer.NewBlockingConsumer() traceClient := makeTraceServiceClient(t, bc) bg := context.Background() From 36fc7b2b77219d1eb0a3c8b509ded78b506272a5 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 02:53:41 -0500 Subject: [PATCH 05/24] lint --- receiver/otelarrowreceiver/internal/logs/otlp.go | 4 +++- receiver/otelarrowreceiver/internal/metrics/otlp.go | 4 +++- .../internal/testconsumer/blocking_consumer.go | 8 ++++---- receiver/otelarrowreceiver/internal/trace/otlp.go | 4 +++- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 2e9e84ee1811..0da759260dd8 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -50,7 +50,9 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog if err != nil { return plogotlp.NewExportResponse(), err } - defer r.boundedQueue.Release(sizeBytes) + defer func() { + err = r.boundedQueue.Release(sizeBytes) + }() err = r.nextConsumer.ConsumeLogs(ctx, ld) r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err) diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index e4ee5151f31d..8767b84ccb75 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -50,7 +50,9 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p if err != nil { return pmetricotlp.NewExportResponse(), err } - defer r.boundedQueue.Release(sizeBytes) + defer func() { + err = r.boundedQueue.Release(sizeBytes) + }() err = r.nextConsumer.ConsumeMetrics(ctx, md) r.obsrecv.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) diff --git a/receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go b/receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go index b23349876607..b132048dbb8b 100644 --- a/receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go +++ b/receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go @@ -1,9 +1,9 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package testconsumer +package testconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" -import( +import ( "context" "go.opentelemetry.io/collector/consumer" @@ -12,7 +12,7 @@ import( "go.opentelemetry.io/collector/pdata/ptrace" ) -type BlockingConsumer struct{ +type BlockingConsumer struct { block chan struct{} } @@ -43,4 +43,4 @@ func (bc *BlockingConsumer) Unblock() { func (bc *BlockingConsumer) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} -} \ No newline at end of file +} diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index 9200794a7e3a..6837fcbea4b3 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -50,7 +50,9 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt if err != nil { return ptraceotlp.NewExportResponse(), err } - defer r.boundedQueue.Release(sizeBytes) + defer func() { + err = r.boundedQueue.Release(sizeBytes) + }() err = r.nextConsumer.ConsumeTraces(ctx, td) r.obsrecv.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err) From 75788fd78b8e8f2719b467fa3580634ba4a3efc5 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 03:00:22 -0500 Subject: [PATCH 06/24] lint --- receiver/otelarrowreceiver/internal/logs/otlp_test.go | 10 +++++----- .../otelarrowreceiver/internal/metrics/otlp_test.go | 10 +++++----- receiver/otelarrowreceiver/internal/trace/otlp_test.go | 10 +++++----- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index 631f400b3618..3fccaeaadda1 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -19,9 +19,9 @@ import ( "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/multierr" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" @@ -30,7 +30,7 @@ import ( const ( maxWaiters = 10 - maxBytes = int64(250) + maxBytes = int64(250) ) func TestExport(t *testing.T) { @@ -93,7 +93,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err = logsClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses+=1 + numResponses += 1 mtx.Unlock() }() @@ -102,7 +102,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err := logsClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses+=1 + numResponses += 1 mtx.Unlock() }() } @@ -117,7 +117,7 @@ func TestExport_TooManyWaiters(t *testing.T) { defer mtx.Unlock() errSlice := multierr.Errors(errs) return numResponses == maxWaiters+2 && len(errSlice) == 1 - }, 3 * time.Second, 10 * time.Millisecond) + }, 3*time.Second, 10*time.Millisecond) assert.ErrorContains(t, errs, "too many waiters") } diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index c0b915b91b94..03a14dbc721d 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -19,9 +19,9 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/multierr" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" @@ -30,7 +30,7 @@ import ( const ( maxWaiters = 10 - maxBytes = int64(250) + maxBytes = int64(250) ) func TestExport(t *testing.T) { @@ -93,7 +93,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err = metricsClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses+=1 + numResponses += 1 mtx.Unlock() }() @@ -102,7 +102,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err := metricsClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses+=1 + numResponses += 1 mtx.Unlock() }() } @@ -117,7 +117,7 @@ func TestExport_TooManyWaiters(t *testing.T) { defer mtx.Unlock() errSlice := multierr.Errors(errs) return numResponses == maxWaiters+2 && len(errSlice) == 1 - }, 3 * time.Second, 10 * time.Millisecond) + }, 3*time.Second, 10*time.Millisecond) assert.ErrorContains(t, errs, "too many waiters") } diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index 2efa7a8d4d5a..98a527b8af35 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -19,9 +19,9 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/multierr" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "go.uber.org/multierr" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" @@ -30,7 +30,7 @@ import ( const ( maxWaiters = 10 - maxBytes = int64(250) + maxBytes = int64(250) ) func TestExport(t *testing.T) { @@ -92,7 +92,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err = traceClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses+=1 + numResponses += 1 mtx.Unlock() }() @@ -101,7 +101,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err := traceClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses+=1 + numResponses += 1 mtx.Unlock() }() } @@ -116,7 +116,7 @@ func TestExport_TooManyWaiters(t *testing.T) { defer mtx.Unlock() errSlice := multierr.Errors(errs) return numResponses == maxWaiters+2 && len(errSlice) == 1 - }, 3 * time.Second, 10 * time.Millisecond) + }, 3*time.Second, 10*time.Millisecond) assert.ErrorContains(t, errs, "too many waiters") } From 09447574d88f7c4bbc7047f3a3cdab8186114a8e Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 03:06:40 -0500 Subject: [PATCH 07/24] make crosslink? --- receiver/otelarrowreceiver/go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index 7e2fc0f7442f..d1456a4725a4 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -108,5 +108,3 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/share replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter => ../../exporter/otelarrowexporter replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil - -replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/internal/testconsumer => ./internal/testconsumer From 6eeee4bd094aa99da1b80edcf34f612fa93cf2de Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 04:01:21 -0500 Subject: [PATCH 08/24] lint again --- receiver/otelarrowreceiver/config.go | 6 +++--- receiver/otelarrowreceiver/internal/logs/otlp_test.go | 4 ++-- receiver/otelarrowreceiver/internal/metrics/otlp_test.go | 4 ++-- receiver/otelarrowreceiver/internal/trace/otlp_test.go | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index db279c836b59..8fa3afeb77d4 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -14,9 +14,9 @@ import ( // Protocols is the configuration for the supported protocols. type Protocols struct { - GRPC configgrpc.ServerConfig `mapstructure:"grpc"` - Arrow ArrowConfig `mapstructure:"arrow"` - Admission AdmissionConfig `mapstructure:"admission"` + GRPC configgrpc.ServerConfig `mapstructure:"grpc"` + Arrow ArrowConfig `mapstructure:"arrow"` + Admission AdmissionConfig `mapstructure:"admission"` } type AdmissionConfig struct { diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index 3fccaeaadda1..1f8d35c71edd 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -93,7 +93,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err = logsClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses += 1 + numResponses++ mtx.Unlock() }() @@ -102,7 +102,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err := logsClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses += 1 + numResponses++ mtx.Unlock() }() } diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index 03a14dbc721d..d6e7aeff3e66 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -93,7 +93,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err = metricsClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses += 1 + numResponses++ mtx.Unlock() }() @@ -102,7 +102,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err := metricsClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses += 1 + numResponses++ mtx.Unlock() }() } diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index 98a527b8af35..2a2bf1f6119e 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -92,7 +92,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err = traceClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses += 1 + numResponses++ mtx.Unlock() }() @@ -101,7 +101,7 @@ func TestExport_TooManyWaiters(t *testing.T) { _, err := traceClient.Export(bg, req) mtx.Lock() errs = multierr.Append(errs, err) - numResponses += 1 + numResponses++ mtx.Unlock() }() } From 692484fd5ef40c5e2e6698120e624b2898a5dca8 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 14:36:32 -0500 Subject: [PATCH 09/24] address review feedback --- receiver/otelarrowreceiver/config.go | 12 +++++----- receiver/otelarrowreceiver/config_test.go | 24 +++++++++++++++++++ .../otelarrowreceiver/internal/logs/otlp.go | 8 +++++-- .../internal/metrics/otlp.go | 8 +++++-- .../otelarrowreceiver/internal/trace/otlp.go | 8 +++++-- receiver/otelarrowreceiver/otelarrow.go | 6 ++--- 6 files changed, 51 insertions(+), 15 deletions(-) diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index 8fa3afeb77d4..e83869d77a5d 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -39,10 +39,10 @@ type ArrowConfig struct { MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"` // Deprecated: This field is no longer supported, use cfg.Admission.AdmissionLimitMiB instead. - AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"` + DeprecatedAdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"` // Deprecated: This field is no longer supported, use cfg.Admission.WaiterLimit instead. - WaiterLimit int64 `mapstructure:"waiter_limit"` + DeprecatedWaiterLimit int64 `mapstructure:"waiter_limit"` // Zstd settings apply to OTel-Arrow use of gRPC specifically. Zstd zstd.DecoderConfig `mapstructure:"zstd"` @@ -71,11 +71,11 @@ func (cfg *Config) Validate() error { if err := cfg.Arrow.Validate(); err != nil { return err } - if cfg.Arrow.AdmissionLimitMiB != 0 { - cfg.Admission.AdmissionLimitMiB = cfg.Arrow.AdmissionLimitMiB + if cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { + cfg.Admission.AdmissionLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB } - if cfg.Arrow.WaiterLimit != 0 { - cfg.Admission.WaiterLimit = cfg.Arrow.WaiterLimit + if cfg.Arrow.DeprecatedWaiterLimit != 0 { + cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit } return nil } diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go index 9d99499901ab..8bd0d1aa072c 100644 --- a/receiver/otelarrowreceiver/config_test.go +++ b/receiver/otelarrowreceiver/config_test.go @@ -88,6 +88,30 @@ func TestUnmarshalConfig(t *testing.T) { } +// Tests that a deprecated config validation sets AdmissionLimitMiB and WaiterLimit in the correct config block. +func TestValidateDeprecatedConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "deprecated.yaml")) + require.NoError(t, err) + cfg := &Config{} + assert.NoError(t, cm.Unmarshal(cfg)) + assert.NoError(t, cfg.Validate()) + assert.Equal(t, + &Config{ + Protocols: Protocols{ + Arrow: ArrowConfig{ + MemoryLimitMiB: 123, + DeprecatedAdmissionLimitMiB: 80, + DeprecatedWaiterLimit: 100, + }, + Admission: AdmissionConfig{ + // cfg.Validate should now set these fields. + AdmissionLimitMiB: 80, + WaiterLimit: 100, + }, + }, + }, cfg) +} + func TestUnmarshalConfigUnix(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", "uds.yaml")) require.NoError(t, err) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 0da759260dd8..02a81fd813da 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" ) @@ -23,15 +24,17 @@ type Receiver struct { obsrecv *receiverhelper.ObsReport boundedQueue *admission.BoundedQueue sizer *plog.ProtoMarshaler + logger *zap.Logger } // New creates a new Receiver reference. -func New(nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, boundedQueue: bq, sizer: &plog.ProtoMarshaler{}, + logger: logger, } } @@ -51,7 +54,8 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog return plogotlp.NewExportResponse(), err } defer func() { - err = r.boundedQueue.Release(sizeBytes) + releaseErr := r.boundedQueue.Release(sizeBytes) + r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) }() err = r.nextConsumer.ConsumeLogs(ctx, ld) diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index 8767b84ccb75..58796c7c044e 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" ) @@ -23,15 +24,17 @@ type Receiver struct { obsrecv *receiverhelper.ObsReport boundedQueue *admission.BoundedQueue sizer *pmetric.ProtoMarshaler + logger *zap.Logger } // New creates a new Receiver reference. -func New(nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, boundedQueue: bq, sizer: &pmetric.ProtoMarshaler{}, + logger: logger, } } @@ -51,7 +54,8 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p return pmetricotlp.NewExportResponse(), err } defer func() { - err = r.boundedQueue.Release(sizeBytes) + releaseErr := r.boundedQueue.Release(sizeBytes) + r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) }() err = r.nextConsumer.ConsumeMetrics(ctx, md) diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index 6837fcbea4b3..a510f25fca43 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" ) @@ -23,15 +24,17 @@ type Receiver struct { obsrecv *receiverhelper.ObsReport boundedQueue *admission.BoundedQueue sizer *ptrace.ProtoMarshaler + logger *zap.Logger } // New creates a new Receiver reference. -func New(nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, boundedQueue: bq, sizer: &ptrace.ProtoMarshaler{}, + logger: logger, } } @@ -51,7 +54,8 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt return ptraceotlp.NewExportResponse(), err } defer func() { - err = r.boundedQueue.Release(sizeBytes) + releaseErr := r.boundedQueue.Release(sizeBytes) + r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) }() err = r.nextConsumer.ConsumeTraces(ctx, td) diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index e678c58b2437..f4ec7cd70d14 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -181,15 +181,15 @@ func (r *otelArrowReceiver) Shutdown(_ context.Context) error { } func (r *otelArrowReceiver) registerTraceConsumer(tc consumer.Traces) { - r.tracesReceiver = trace.New(tc, r.obsrepGRPC, r.boundedQueue) + r.tracesReceiver = trace.New(r.settings.Logger, tc, r.obsrepGRPC, r.boundedQueue) } func (r *otelArrowReceiver) registerMetricsConsumer(mc consumer.Metrics) { - r.metricsReceiver = metrics.New(mc, r.obsrepGRPC, r.boundedQueue) + r.metricsReceiver = metrics.New(r.settings.Logger, mc, r.obsrepGRPC, r.boundedQueue) } func (r *otelArrowReceiver) registerLogsConsumer(lc consumer.Logs) { - r.logsReceiver = logs.New(lc, r.obsrepGRPC, r.boundedQueue) + r.logsReceiver = logs.New(r.settings.Logger, lc, r.obsrepGRPC, r.boundedQueue) } var _ arrow.Consumers = &otelArrowReceiver{} From 2dcdd8e4b66c87c7d790b3dff0153a77475533ed Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 14:48:13 -0500 Subject: [PATCH 10/24] fix tests --- receiver/otelarrowreceiver/internal/logs/otlp_test.go | 3 ++- receiver/otelarrowreceiver/internal/metrics/otlp_test.go | 3 ++- receiver/otelarrowreceiver/internal/trace/otlp_test.go | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index 1f8d35c71edd..5f28d09099f1 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/multierr" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -151,7 +152,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { require.NoError(t, err) bq := admission.NewBoundedQueue(maxBytes, maxWaiters) - r := New(lc, obsrecv, bq) + r := New(zap.NewNop(), lc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() plogotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index d6e7aeff3e66..a4b7107b5315 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/multierr" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -152,7 +153,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { require.NoError(t, err) bq := admission.NewBoundedQueue(maxBytes, maxWaiters) - r := New(mc, obsrecv, bq) + r := New(zap.NewNop(), mc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() pmetricotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index 2a2bf1f6119e..e217b7b915b6 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/multierr" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -149,7 +150,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { }) require.NoError(t, err) bq := admission.NewBoundedQueue(maxBytes, maxWaiters) - r := New(tc, obsrecv, bq) + r := New(zap.NewNop(),tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() ptraceotlp.RegisterGRPCServer(srv, r) From 4e4a4eda6ee9d5051f5bf19c0f477a55d007d9d7 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 14:56:36 -0500 Subject: [PATCH 11/24] lint --- receiver/otelarrowreceiver/internal/trace/otlp_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index e217b7b915b6..f45f4b653e50 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -150,7 +150,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { }) require.NoError(t, err) bq := admission.NewBoundedQueue(maxBytes, maxWaiters) - r := New(zap.NewNop(),tc, obsrecv, bq) + r := New(zap.NewNop(), tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() ptraceotlp.RegisterGRPCServer(srv, r) From 4202afd124bee1ee824fae0886e1d4fc08ee4e21 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Fri, 6 Sep 2024 15:23:18 -0500 Subject: [PATCH 12/24] add yaml --- receiver/otelarrowreceiver/testdata/deprecated.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 receiver/otelarrowreceiver/testdata/deprecated.yaml diff --git a/receiver/otelarrowreceiver/testdata/deprecated.yaml b/receiver/otelarrowreceiver/testdata/deprecated.yaml new file mode 100644 index 000000000000..5675a753cb2d --- /dev/null +++ b/receiver/otelarrowreceiver/testdata/deprecated.yaml @@ -0,0 +1,6 @@ +protocols: + arrow: + memory_limit_mib: 123 + # these fields are deprecated and should populate cfg.Admission.AdmissionLimitMiB and cfg.Admission.WaiterLimit instead. + admission_limit_mib: 80 + waiter_limit: 100 \ No newline at end of file From 2d26469ae7facfeeff4cd2928c607836fbd72a2e Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 10 Sep 2024 02:47:46 -0600 Subject: [PATCH 13/24] fix test after rebase --- receiver/otelarrowreceiver/otelarrow.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index f4ec7cd70d14..084c6deaad1f 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -58,7 +58,7 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive if err != nil { return nil, err } - bq := admission.NewBoundedQueue(int64(cfg.Admission.AdmissionLimitMiB<<20), cfg.Admission.WaiterLimit) + bq := admission.NewBoundedQueue(set.TracerProvider, int64(cfg.Admission.AdmissionLimitMiB<<20), cfg.Admission.WaiterLimit) r := &otelArrowReceiver{ cfg: cfg, settings: set, @@ -118,7 +118,6 @@ func (r *otelArrowReceiver) startProtocolServers(ctx context.Context, host compo return err } } - bq := admission.NewBoundedQueue(r.settings.TracerProvider, int64(r.cfg.Arrow.AdmissionLimitMiB<<20), r.cfg.Arrow.WaiterLimit) r.arrowReceiver, err = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI { var opts []arrowRecord.Option From d80fff47fc7dc9b6c9300ef23ab6f2ccf3a66ca9 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 10 Sep 2024 02:59:15 -0600 Subject: [PATCH 14/24] fix test --- receiver/otelarrowreceiver/internal/logs/otlp_test.go | 3 ++- receiver/otelarrowreceiver/internal/metrics/otlp_test.go | 3 ++- receiver/otelarrowreceiver/internal/trace/otlp_test.go | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index 5f28d09099f1..bf58f738c34e 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/grpc" @@ -151,7 +152,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { }) require.NoError(t, err) - bq := admission.NewBoundedQueue(maxBytes, maxWaiters) + bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) r := New(zap.NewNop(), lc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index a4b7107b5315..9bd0b9911e57 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/grpc" @@ -152,7 +153,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { }) require.NoError(t, err) - bq := admission.NewBoundedQueue(maxBytes, maxWaiters) + bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) r := New(zap.NewNop(), mc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index f45f4b653e50..b968b79d20d8 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/grpc" @@ -149,7 +150,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission.NewBoundedQueue(maxBytes, maxWaiters) + bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) r := New(zap.NewNop(), tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() From 58c525e6e4519b37ad8a77b379283b1f2a936dd5 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 10 Sep 2024 13:25:22 -0600 Subject: [PATCH 15/24] change admission config level and name --- receiver/otelarrowreceiver/config.go | 15 +++++----- receiver/otelarrowreceiver/config_test.go | 28 +++++++++---------- receiver/otelarrowreceiver/factory.go | 14 +++++----- receiver/otelarrowreceiver/otelarrow.go | 2 +- .../otelarrowreceiver/testdata/config.yaml | 6 ++-- 5 files changed, 33 insertions(+), 32 deletions(-) diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index e83869d77a5d..9714566b73fb 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -14,16 +14,15 @@ import ( // Protocols is the configuration for the supported protocols. type Protocols struct { - GRPC configgrpc.ServerConfig `mapstructure:"grpc"` - Arrow ArrowConfig `mapstructure:"arrow"` - Admission AdmissionConfig `mapstructure:"admission"` + GRPC configgrpc.ServerConfig `mapstructure:"grpc"` + Arrow ArrowConfig `mapstructure:"arrow"` } type AdmissionConfig struct { - // AdmissionLimitMiB limits the number of requests that are received by the stream based on + // RequestLimitMiB limits the number of requests that are received by the stream based on // request size information available. Request size is used to control how much traffic we admit // for processing, but does not control how much memory is used during request processing. - AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"` + RequestLimitMiB uint64 `mapstructure:"request_limit_mib"` // WaiterLimit is the limit on the number of waiters waiting to be processed and consumed. // This is a dimension of memory limiting to ensure waiters are not consuming an @@ -38,7 +37,7 @@ type ArrowConfig struct { // passing through, they will see ResourceExhausted errors. MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"` - // Deprecated: This field is no longer supported, use cfg.Admission.AdmissionLimitMiB instead. + // Deprecated: This field is no longer supported, use cfg.Admission.RequestLimitMiB instead. DeprecatedAdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"` // Deprecated: This field is no longer supported, use cfg.Admission.WaiterLimit instead. @@ -52,6 +51,8 @@ type ArrowConfig struct { type Config struct { // Protocols is the configuration for gRPC and Arrow. Protocols `mapstructure:"protocols"` + // Admission is the configuration for controlling amount of request memory entering the receiver. + Admission AdmissionConfig `mapstructure:"admission"` } var _ component.Config = (*Config)(nil) @@ -72,7 +73,7 @@ func (cfg *Config) Validate() error { return err } if cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { - cfg.Admission.AdmissionLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB + cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB } if cfg.Arrow.DeprecatedWaiterLimit != 0 { cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go index 8bd0d1aa072c..7c42af5e07b0 100644 --- a/receiver/otelarrowreceiver/config_test.go +++ b/receiver/otelarrowreceiver/config_test.go @@ -79,16 +79,16 @@ func TestUnmarshalConfig(t *testing.T) { Arrow: ArrowConfig{ MemoryLimitMiB: 123, }, - Admission: AdmissionConfig{ - AdmissionLimitMiB: 80, - WaiterLimit: 100, - }, + }, + Admission: AdmissionConfig{ + RequestLimitMiB: 80, + WaiterLimit: 100, }, }, cfg) } -// Tests that a deprecated config validation sets AdmissionLimitMiB and WaiterLimit in the correct config block. +// Tests that a deprecated config validation sets RequestLimitMiB and WaiterLimit in the correct config block. func TestValidateDeprecatedConfig(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", "deprecated.yaml")) require.NoError(t, err) @@ -103,11 +103,11 @@ func TestValidateDeprecatedConfig(t *testing.T) { DeprecatedAdmissionLimitMiB: 80, DeprecatedWaiterLimit: 100, }, - Admission: AdmissionConfig{ - // cfg.Validate should now set these fields. - AdmissionLimitMiB: 80, - WaiterLimit: 100, - }, + }, + Admission: AdmissionConfig{ + // cfg.Validate should now set these fields. + RequestLimitMiB: 80, + WaiterLimit: 100, }, }, cfg) } @@ -131,10 +131,10 @@ func TestUnmarshalConfigUnix(t *testing.T) { Arrow: ArrowConfig{ MemoryLimitMiB: defaultMemoryLimitMiB, }, - Admission: AdmissionConfig{ - AdmissionLimitMiB: defaultAdmissionLimitMiB, - WaiterLimit: defaultWaiterLimit, - }, + }, + Admission: AdmissionConfig{ + RequestLimitMiB: defaultRequestLimitMiB, + WaiterLimit: defaultWaiterLimit, }, }, cfg) } diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go index e79349e61cf1..75e454e2957c 100644 --- a/receiver/otelarrowreceiver/factory.go +++ b/receiver/otelarrowreceiver/factory.go @@ -19,9 +19,9 @@ import ( const ( defaultGRPCEndpoint = "0.0.0.0:4317" - defaultMemoryLimitMiB = 128 - defaultAdmissionLimitMiB = defaultMemoryLimitMiB / 2 - defaultWaiterLimit = 1000 + defaultMemoryLimitMiB = 128 + defaultRequestLimitMiB = defaultMemoryLimitMiB / 2 + defaultWaiterLimit = 1000 ) // NewFactory creates a new OTel-Arrow receiver factory. @@ -49,10 +49,10 @@ func createDefaultConfig() component.Config { Arrow: ArrowConfig{ MemoryLimitMiB: defaultMemoryLimitMiB, }, - Admission: AdmissionConfig{ - AdmissionLimitMiB: defaultAdmissionLimitMiB, - WaiterLimit: defaultWaiterLimit, - }, + }, + Admission: AdmissionConfig{ + RequestLimitMiB: defaultRequestLimitMiB, + WaiterLimit: defaultWaiterLimit, }, } } diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 084c6deaad1f..41784175a7a6 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -58,7 +58,7 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive if err != nil { return nil, err } - bq := admission.NewBoundedQueue(set.TracerProvider, int64(cfg.Admission.AdmissionLimitMiB<<20), cfg.Admission.WaiterLimit) + bq := admission.NewBoundedQueue(set.TracerProvider, int64(cfg.Admission.RequestLimitMiB<<20), cfg.Admission.WaiterLimit) r := &otelArrowReceiver{ cfg: cfg, settings: set, diff --git a/receiver/otelarrowreceiver/testdata/config.yaml b/receiver/otelarrowreceiver/testdata/config.yaml index f565b1290892..e911cafdd0c5 100644 --- a/receiver/otelarrowreceiver/testdata/config.yaml +++ b/receiver/otelarrowreceiver/testdata/config.yaml @@ -27,6 +27,6 @@ protocols: permit_without_stream: true arrow: memory_limit_mib: 123 - admission: - admission_limit_mib: 80 - waiter_limit: 100 +admission: + request_limit_mib: 80 + waiter_limit: 100 \ No newline at end of file From 1fae41022c4935034b4dee4d9cba2fdb79e337da Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 10 Sep 2024 13:25:58 -0600 Subject: [PATCH 16/24] gofmt --- receiver/otelarrowreceiver/config_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go index 7c42af5e07b0..60edaf00cf61 100644 --- a/receiver/otelarrowreceiver/config_test.go +++ b/receiver/otelarrowreceiver/config_test.go @@ -82,7 +82,7 @@ func TestUnmarshalConfig(t *testing.T) { }, Admission: AdmissionConfig{ RequestLimitMiB: 80, - WaiterLimit: 100, + WaiterLimit: 100, }, }, cfg) @@ -107,7 +107,7 @@ func TestValidateDeprecatedConfig(t *testing.T) { Admission: AdmissionConfig{ // cfg.Validate should now set these fields. RequestLimitMiB: 80, - WaiterLimit: 100, + WaiterLimit: 100, }, }, cfg) } @@ -134,7 +134,7 @@ func TestUnmarshalConfigUnix(t *testing.T) { }, Admission: AdmissionConfig{ RequestLimitMiB: defaultRequestLimitMiB, - WaiterLimit: defaultWaiterLimit, + WaiterLimit: defaultWaiterLimit, }, }, cfg) } From 57f9d56e8d726fbd3995dd272bc1c5db16101296 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 10 Sep 2024 13:35:39 -0600 Subject: [PATCH 17/24] add nil check --- receiver/otelarrowreceiver/internal/logs/otlp.go | 4 +++- receiver/otelarrowreceiver/internal/metrics/otlp.go | 4 +++- receiver/otelarrowreceiver/internal/trace/otlp.go | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 02a81fd813da..09dacbfd00a3 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -55,7 +55,9 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog } defer func() { releaseErr := r.boundedQueue.Release(sizeBytes) - r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) + if releaseErr != nil { + r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) + } }() err = r.nextConsumer.ConsumeLogs(ctx, ld) diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index 58796c7c044e..e4cc8dcd1def 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -55,7 +55,9 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p } defer func() { releaseErr := r.boundedQueue.Release(sizeBytes) - r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) + if releaseErr != nil { + r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) + } }() err = r.nextConsumer.ConsumeMetrics(ctx, md) diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index a510f25fca43..d873e3324889 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -55,7 +55,9 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt } defer func() { releaseErr := r.boundedQueue.Release(sizeBytes) - r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) + if releaseErr != nil { + r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) + } }() err = r.nextConsumer.ConsumeTraces(ctx, td) From c04c5bd4ad2f4756873de666f528d7faec6a5f01 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Thu, 12 Sep 2024 12:19:51 -0600 Subject: [PATCH 18/24] rebase + review feedback --- receiver/otelarrowreceiver/factory.go | 2 +- receiver/otelarrowreceiver/internal/logs/otlp.go | 3 +-- receiver/otelarrowreceiver/internal/metrics/otlp.go | 3 +-- receiver/otelarrowreceiver/internal/trace/otlp.go | 3 +-- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go index 75e454e2957c..92d154060d86 100644 --- a/receiver/otelarrowreceiver/factory.go +++ b/receiver/otelarrowreceiver/factory.go @@ -20,7 +20,7 @@ const ( defaultGRPCEndpoint = "0.0.0.0:4317" defaultMemoryLimitMiB = 128 - defaultRequestLimitMiB = defaultMemoryLimitMiB / 2 + defaultRequestLimitMiB = 128 defaultWaiterLimit = 1000 ) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 09dacbfd00a3..23ec4c96fbbc 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -54,8 +54,7 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog return plogotlp.NewExportResponse(), err } defer func() { - releaseErr := r.boundedQueue.Release(sizeBytes) - if releaseErr != nil { + if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) } }() diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index e4cc8dcd1def..d038d63bef3d 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -54,8 +54,7 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p return pmetricotlp.NewExportResponse(), err } defer func() { - releaseErr := r.boundedQueue.Release(sizeBytes) - if releaseErr != nil { + if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) } }() diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index d873e3324889..af9bc335ea19 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -54,8 +54,7 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt return ptraceotlp.NewExportResponse(), err } defer func() { - releaseErr := r.boundedQueue.Release(sizeBytes) - if releaseErr != nil { + if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) } }() From 3469d9a39524871373483b192be1bb9d681522d6 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 17 Sep 2024 15:20:03 -0700 Subject: [PATCH 19/24] review feedback --- .chloggen/admission-control-for-otlp.yaml | 6 +++++- receiver/otelarrowreceiver/README.md | 17 ++++++++++------- receiver/otelarrowreceiver/config.go | 8 ++++++-- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/.chloggen/admission-control-for-otlp.yaml b/.chloggen/admission-control-for-otlp.yaml index f69d3149a33f..f9c1ca8db679 100644 --- a/.chloggen/admission-control-for-otlp.yaml +++ b/.chloggen/admission-control-for-otlp.yaml @@ -7,7 +7,11 @@ change_type: enhancement component: otelarrowreceiver # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Add admission control in the otelarrow receiver's standard otlp data path. +note: | +Add admission control in the otelarrow receiver's standard otlp data path. +Also moves admission control config options to a separate block. +arrow.admission_limit_mib -> admission.request_limit_mib +arrow.waiter_limit -> admission.waiter_limit # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [35021] diff --git a/receiver/otelarrowreceiver/README.md b/receiver/otelarrowreceiver/README.md index b3451502c968..2ecaaebcc459 100644 --- a/receiver/otelarrowreceiver/README.md +++ b/receiver/otelarrowreceiver/README.md @@ -77,6 +77,16 @@ Several common configuration structures provide additional capabilities automati - [gRPC settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configgrpc/README.md) - [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) +### Admission Control Configuration + +In the `admission` configuration block the following settings are available: + +- `request_limit_mib` (default: 128): limits the number of requests that are received by the stream based on request size information available. This should not be confused with `arrow.memory_limit_mib` which limits allocations made by the consumer when translating arrow records into pdata objects. i.e. request size is used to control how much traffic we admit, but does not control how much memory is used during request processing. + +- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. + +`request_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/otel-arrow/tree/main/collector/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. + ### Arrow-specific Configuration In the `arrow` configuration block, the following settings are available: @@ -87,13 +97,6 @@ When the limit is reached, the receiver will return RESOURCE_EXHAUSTED error codes to the receiver, which are [conditionally retryable, see exporter retry configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md). -- `admission_limit_mib` (default: 64): limits the number of requests that are received by the stream based on request size information available. This should not be confused with `memory_limit_mib` which limits allocations made by the consumer when translating arrow records into pdata objects. i.e. request size is used to control how much traffic we admit, but does not control how much memory is used during request processing. - -- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. - -`admission_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/otel-arrow/tree/main/collector/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. - - ### Compression Configuration In the `arrow` configuration block, `zstd` sub-section applies to all diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index 9714566b73fb..b405e7386b03 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd" ) @@ -66,16 +67,19 @@ func (cfg *ArrowConfig) Validate() error { } func (cfg *Config) Validate() error { + logger := zap.Must(zap.NewDevelopment()) if err := cfg.GRPC.Validate(); err != nil { return err } if err := cfg.Arrow.Validate(); err != nil { return err } - if cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { + if cfg.Admission.RequestLimitMiB == 0 && cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { + logger.Warn("arrow.admission_limit_mib is deprecated, using admission.request_limit_mib instead.") cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB } - if cfg.Arrow.DeprecatedWaiterLimit != 0 { + if cfg.Admission.WaiterLimit == 0 && cfg.Arrow.DeprecatedWaiterLimit != 0 { + logger.Warn("arrow.waiter_limit is deprecated, using admission.waiter_limit instead.") cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit } return nil From c5791ae695fb84b855f566623001d0f18c5c7cc0 Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 17 Sep 2024 16:01:22 -0700 Subject: [PATCH 20/24] indent chlog? --- .chloggen/admission-control-for-otlp.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.chloggen/admission-control-for-otlp.yaml b/.chloggen/admission-control-for-otlp.yaml index f9c1ca8db679..92f27e26b56c 100644 --- a/.chloggen/admission-control-for-otlp.yaml +++ b/.chloggen/admission-control-for-otlp.yaml @@ -8,10 +8,10 @@ component: otelarrowreceiver # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: | -Add admission control in the otelarrow receiver's standard otlp data path. -Also moves admission control config options to a separate block. -arrow.admission_limit_mib -> admission.request_limit_mib -arrow.waiter_limit -> admission.waiter_limit + Add admission control in the otelarrow receiver's standard otlp data path. + Also moves admission control config options to a separate block. + arrow.admission_limit_mib -> admission.request_limit_mib + arrow.waiter_limit -> admission.waiter_limit # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [35021] From 3a878ac67dd9465562ad1ef5cdfb135af9162c6c Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Tue, 17 Sep 2024 16:22:04 -0700 Subject: [PATCH 21/24] update readme link --- receiver/otelarrowreceiver/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/otelarrowreceiver/README.md b/receiver/otelarrowreceiver/README.md index 2ecaaebcc459..e37c71141f94 100644 --- a/receiver/otelarrowreceiver/README.md +++ b/receiver/otelarrowreceiver/README.md @@ -85,7 +85,7 @@ In the `admission` configuration block the following settings are available: - `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. -`request_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/otel-arrow/tree/main/collector/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. +`request_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. ### Arrow-specific Configuration From f5f170bd78c80e9c546edcd4a5b827df932bb66b Mon Sep 17 00:00:00 2001 From: moh-osman3 Date: Wed, 18 Sep 2024 15:42:44 -0700 Subject: [PATCH 22/24] move log warning to newOtelArrowReceiver --- receiver/otelarrowreceiver/config.go | 4 ---- receiver/otelarrowreceiver/otelarrow.go | 8 ++++++++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index b405e7386b03..f2ec2581303c 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -8,7 +8,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd" ) @@ -67,7 +66,6 @@ func (cfg *ArrowConfig) Validate() error { } func (cfg *Config) Validate() error { - logger := zap.Must(zap.NewDevelopment()) if err := cfg.GRPC.Validate(); err != nil { return err } @@ -75,11 +73,9 @@ func (cfg *Config) Validate() error { return err } if cfg.Admission.RequestLimitMiB == 0 && cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { - logger.Warn("arrow.admission_limit_mib is deprecated, using admission.request_limit_mib instead.") cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB } if cfg.Admission.WaiterLimit == 0 && cfg.Arrow.DeprecatedWaiterLimit != 0 { - logger.Warn("arrow.waiter_limit is deprecated, using admission.waiter_limit instead.") cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit } return nil diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 41784175a7a6..a9a3e2c83ebc 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -54,6 +54,14 @@ type otelArrowReceiver struct { // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceiver, error) { + if cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { + set.Logger.Warn("arrow.admission_limit_mib is deprecated, using admission.request_limit_mib instead.") + } + + if cfg.Arrow.DeprecatedWaiterLimit != 0 { + set.Logger.Warn("arrow.waiter_limit is deprecated, using admission.waiter_limit instead.") + } + netReporter, err := netstats.NewReceiverNetworkReporter(set) if err != nil { return nil, err From c748c5ed289263665fc37377af00966d9bb9d3f8 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 20 Sep 2024 10:27:51 -0700 Subject: [PATCH 23/24] move mutation to Unmarshal --- receiver/otelarrowreceiver/config.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index f2ec2581303c..a85cb91ec72f 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/confmap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd" ) @@ -72,6 +73,14 @@ func (cfg *Config) Validate() error { if err := cfg.Arrow.Validate(); err != nil { return err } + return nil +} + +// Unmarshal will apply deprecated field values to assist the user with migration. +func (cfg *Config) Unmarshal(conf *confmap.Conf) error { + if err := conf.Unmarshal(cfg); err != nil { + return err + } if cfg.Admission.RequestLimitMiB == 0 && cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB } From 62e8f823c93667cd89908011eaa9a6ea13053cf0 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 20 Sep 2024 11:41:42 -0700 Subject: [PATCH 24/24] tidy --- internal/otelarrow/go.mod | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/otelarrow/go.mod b/internal/otelarrow/go.mod index a0fe0c618331..5d104c8e2ac4 100644 --- a/internal/otelarrow/go.mod +++ b/internal/otelarrow/go.mod @@ -44,6 +44,7 @@ require ( github.com/fxamacker/cbor/v2 v2.4.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.1.0 // indirect github.com/goccy/go-json v0.10.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect @@ -51,7 +52,12 @@ require ( github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.2.3 // indirect @@ -70,6 +76,7 @@ require ( go.opentelemetry.io/collector/config/configretry v1.15.1-0.20240918193345-a3c0565031b0 // indirect go.opentelemetry.io/collector/config/configtls v1.15.1-0.20240918193345-a3c0565031b0 // indirect go.opentelemetry.io/collector/config/internal v0.109.1-0.20240918193345-a3c0565031b0 // indirect + go.opentelemetry.io/collector/confmap v1.15.1-0.20240918193345-a3c0565031b0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.1-0.20240918193345-a3c0565031b0 // indirect go.opentelemetry.io/collector/extension v0.109.1-0.20240918193345-a3c0565031b0 // indirect go.opentelemetry.io/collector/extension/auth v0.109.1-0.20240918193345-a3c0565031b0 // indirect