From a7d32bfbf154285556cc0f4976489933129453ec Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 15 Nov 2024 11:49:01 -0800 Subject: [PATCH] Make otelarrow admission control metrics consistent (#36359) --- .chloggen/otelarrow-metrics.yaml | 31 +++ internal/otelarrow/admission2/boundedqueue.go | 39 +++- .../otelarrow/admission2/boundedqueue_test.go | 198 +++++++++++++----- internal/otelarrow/doc.go | 6 + internal/otelarrow/documentation.md | 23 ++ .../generated_component_telemetry_test.go | 11 +- internal/otelarrow/generated_package_test.go | 13 ++ internal/otelarrow/go.mod | 1 + .../internal/metadata/generated_telemetry.go | 97 +++++++++ .../metadata/generated_telemetry_test.go | 4 +- internal/otelarrow/metadata.yaml | 25 +++ receiver/otelarrowreceiver/README.md | 11 +- receiver/otelarrowreceiver/go.mod | 4 +- .../otelarrowreceiver/internal/arrow/arrow.go | 53 ++--- .../internal/arrow/arrow_test.go | 9 +- .../internal/logs/otlp_test.go | 3 +- .../internal/metadata/generated_telemetry.go | 77 ------- .../internal/metrics/otlp_test.go | 3 +- .../internal/trace/otlp_test.go | 3 +- receiver/otelarrowreceiver/metadata.yaml | 26 --- receiver/otelarrowreceiver/otelarrow.go | 5 +- 21 files changed, 425 insertions(+), 217 deletions(-) create mode 100644 .chloggen/otelarrow-metrics.yaml create mode 100644 internal/otelarrow/doc.go create mode 100644 internal/otelarrow/documentation.md rename {receiver/otelarrowreceiver => internal/otelarrow}/generated_component_telemetry_test.go (85%) create mode 100644 internal/otelarrow/generated_package_test.go create mode 100644 internal/otelarrow/internal/metadata/generated_telemetry.go rename {receiver/otelarrowreceiver => internal/otelarrow}/internal/metadata/generated_telemetry_test.go (95%) delete mode 100644 receiver/otelarrowreceiver/internal/metadata/generated_telemetry.go diff --git a/.chloggen/otelarrow-metrics.yaml b/.chloggen/otelarrow-metrics.yaml new file mode 100644 index 000000000000..b5531151dd7d --- /dev/null +++ b/.chloggen/otelarrow-metrics.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: New admission control metrics are consistent across Arrow and OTLP data paths. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36334] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + `otelcol_otelarrow_admission_in_flight_bytes` new, replaces `otelcol_otel_arrow_receiver_in_flight_bytes` + `otelcol_otelarrow_admission_waiting_bytes`: new, describes waiting requests + `otelcol_otel_arrow_receiver_in_flight_items`: removed + `otelcol_otel_arrow_receiver_in_flight_requests`: removed + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/otelarrow/admission2/boundedqueue.go b/internal/otelarrow/admission2/boundedqueue.go index 026ad341c317..0b5efc67fb48 100644 --- a/internal/otelarrow/admission2/boundedqueue.go +++ b/internal/otelarrow/admission2/boundedqueue.go @@ -10,9 +10,13 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" grpccodes "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" ) var ( @@ -22,9 +26,10 @@ var ( // BoundedQueue is a LIFO-oriented admission-controlled Queue. type BoundedQueue struct { - maxLimitAdmit uint64 - maxLimitWait uint64 - tracer trace.Tracer + maxLimitAdmit uint64 + maxLimitWait uint64 + tracer trace.Tracer + telemetryBuilder *internalmetadata.TelemetryBuilder // lock protects currentAdmitted, currentWaiting, and waiters @@ -45,13 +50,37 @@ type waiter struct { // NewBoundedQueue returns a LIFO-oriented Queue implementation which // admits `maxLimitAdmit` bytes concurrently and allows up to // `maxLimitWait` bytes to wait for admission. -func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) Queue { - return &BoundedQueue{ +func NewBoundedQueue(id component.ID, ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) (Queue, error) { + bq := &BoundedQueue{ maxLimitAdmit: maxLimitAdmit, maxLimitWait: maxLimitWait, waiters: list.New(), tracer: ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"), } + attr := metric.WithAttributes(attribute.String(netstats.ReceiverKey, id.String())) + telemetryBuilder, err := internalmetadata.NewTelemetryBuilder(ts, + internalmetadata.WithOtelarrowAdmissionInFlightBytesCallback(bq.inFlightCB, attr), + internalmetadata.WithOtelarrowAdmissionWaitingBytesCallback(bq.waitingCB, attr), + ) + if err != nil { + return nil, err + } + bq.telemetryBuilder = telemetryBuilder + return bq, nil +} + +func (bq *BoundedQueue) inFlightCB() int64 { + // Note, see https://github.com/open-telemetry/otel-arrow/issues/270 + bq.lock.Lock() + defer bq.lock.Unlock() + return int64(bq.currentAdmitted) +} + +func (bq *BoundedQueue) waitingCB() int64 { + // Note, see https://github.com/open-telemetry/otel-arrow/issues/270 + bq.lock.Lock() + defer bq.lock.Unlock() + return int64(bq.currentWaiting) } // acquireOrGetWaiter returns with three distinct conditions depending diff --git a/internal/otelarrow/admission2/boundedqueue_test.go b/internal/otelarrow/admission2/boundedqueue_test.go index 8903e8b90773..354419f6893a 100644 --- a/internal/otelarrow/admission2/boundedqueue_test.go +++ b/internal/otelarrow/admission2/boundedqueue_test.go @@ -12,23 +12,53 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/grpc/codes" grpccodes "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" +) + +const ( + expectScope = "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow" + expectInFlightName = "otelcol_otelarrow_admission_in_flight_bytes" + expectWaitingName = "otelcol_otelarrow_admission_waiting_bytes" ) type bqTest struct { - t *testing.T + t *testing.T + reader *sdkmetric.ManualReader + provider *sdkmetric.MeterProvider *BoundedQueue } -var noopTelemetry = componenttest.NewNopTelemetrySettings() - func newBQTest(t *testing.T, maxAdmit, maxWait uint64) bqTest { + settings := componenttest.NewNopTelemetrySettings() + + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider( + sdkmetric.WithResource(resource.Empty()), + sdkmetric.WithReader(reader), + ) + settings.MeterProvider = provider + settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { + return settings.MeterProvider + } + + bq, err := NewBoundedQueue(component.MustNewID("admission_testing"), settings, maxAdmit, maxWait) + require.NoError(t, err) return bqTest{ t: t, - BoundedQueue: NewBoundedQueue(noopTelemetry, maxAdmit, maxWait).(*BoundedQueue), + reader: reader, + provider: provider, + BoundedQueue: bq.(*BoundedQueue), } } @@ -67,35 +97,39 @@ func mkRange(from, to uint64) []uint64 { func TestBoundedQueueLimits(t *testing.T) { for _, test := range []struct { - name string - maxLimitAdmit uint64 - maxLimitWait uint64 - requestSizes []uint64 - timeout time.Duration - expectErrs map[string]int + name string + maxLimitAdmit uint64 + maxLimitWait uint64 + expectAcquired [2]int64 + requestSizes []uint64 + timeout time.Duration + expectErrs map[string]int }{ { - name: "simple_no_waiters_25", - maxLimitAdmit: 1000, - maxLimitWait: 0, - requestSizes: mkRepeat(25, 40), - timeout: 0, - expectErrs: map[string]int{}, + name: "simple_no_waiters_25", + maxLimitAdmit: 1000, + maxLimitWait: 0, + requestSizes: mkRepeat(25, 40), + expectAcquired: [2]int64{1000, 1000}, + timeout: 0, + expectErrs: map[string]int{}, }, { - name: "simple_no_waiters_1", - maxLimitAdmit: 1000, - maxLimitWait: 0, - requestSizes: mkRepeat(1, 1000), - timeout: 0, - expectErrs: map[string]int{}, + name: "simple_no_waiters_1", + maxLimitAdmit: 1000, + maxLimitWait: 0, + requestSizes: mkRepeat(1, 1000), + expectAcquired: [2]int64{1000, 1000}, + timeout: 0, + expectErrs: map[string]int{}, }, { - name: "without_waiting_remainder", - maxLimitAdmit: 1000, - maxLimitWait: 0, - requestSizes: mkRepeat(30, 40), - timeout: 0, + name: "without_waiting_remainder", + maxLimitAdmit: 1000, + maxLimitWait: 0, + requestSizes: mkRepeat(30, 40), + expectAcquired: [2]int64{990, 990}, + timeout: 0, expectErrs: map[string]int{ // 7 failures with a remainder of 10 // 30 * (40 - 7) = 990 @@ -103,51 +137,57 @@ func TestBoundedQueueLimits(t *testing.T) { }, }, { - name: "without_waiting_complete", - maxLimitAdmit: 1000, - maxLimitWait: 0, - requestSizes: append(mkRepeat(30, 40), 10), - timeout: 0, + name: "without_waiting_complete", + maxLimitAdmit: 1000, + maxLimitWait: 0, + requestSizes: append(mkRepeat(30, 40), 10), + expectAcquired: [2]int64{1000, 1000}, + timeout: 0, expectErrs: map[string]int{ // 30*33+10 succeed, 7 failures (as above) ErrTooMuchWaiting.Error(): 7, }, }, { - name: "with_waiters_timeout", - maxLimitAdmit: 1000, - maxLimitWait: 1000, - requestSizes: mkRepeat(20, 100), - timeout: time.Second, + name: "with_waiters_timeout", + maxLimitAdmit: 1000, + maxLimitWait: 1000, + requestSizes: mkRepeat(20, 100), + expectAcquired: [2]int64{1000, 1000}, + timeout: time.Second, expectErrs: map[string]int{ // 20*50=1000 is half of the requests timing out status.Error(grpccodes.Canceled, context.DeadlineExceeded.Error()).Error(): 50, }, }, { - name: "with_size_exceeded", - maxLimitAdmit: 1000, - maxLimitWait: 2000, - requestSizes: []uint64{1001}, - timeout: 0, + name: "with_size_exceeded", + maxLimitAdmit: 1000, + maxLimitWait: 2000, + requestSizes: []uint64{1001}, + expectAcquired: [2]int64{0, 0}, + timeout: 0, expectErrs: map[string]int{ ErrRequestTooLarge.Error(): 1, }, }, { - name: "mixed_sizes", - maxLimitAdmit: 45, // 45 is the exact sum of request sizes - maxLimitWait: 0, - requestSizes: mkRange(1, 9), - timeout: 0, - expectErrs: map[string]int{}, + name: "mixed_sizes", + maxLimitAdmit: 45, // 45 is the exact sum of request sizes + maxLimitWait: 0, + requestSizes: mkRange(1, 9), + expectAcquired: [2]int64{45, 45}, + timeout: 0, + expectErrs: map[string]int{}, }, { name: "too_many_mixed_sizes", maxLimitAdmit: 44, // all but one request will succeed maxLimitWait: 0, requestSizes: mkRange(1, 9), - timeout: 0, + // worst case is the size=9 fails, so minimum is 44-9+1 + expectAcquired: [2]int64{36, 44}, + timeout: 0, expectErrs: map[string]int{ ErrTooMuchWaiting.Error(): 1, }, @@ -190,6 +230,12 @@ func TestBoundedQueueLimits(t *testing.T) { wait1.Wait() + // The in-flight bytes are in-range, none waiting. + inflight, waiting := bq.verifyMetrics(t) + require.LessOrEqual(t, test.expectAcquired[0], inflight) + require.GreaterOrEqual(t, test.expectAcquired[1], inflight) + require.Equal(t, int64(0), waiting) + close(releaseChan) wait2.Wait() @@ -214,10 +260,55 @@ func TestBoundedQueueLimits(t *testing.T) { // and the final state is all 0. bq.waitForPending(0, 0) + + // metrics are zero + inflight, waiting = bq.verifyMetrics(t) + require.Equal(t, int64(0), inflight) + require.Equal(t, int64(0), waiting) }) } } +func (bq bqTest) verifyPoint(t *testing.T, m metricdata.Metrics) int64 { + switch a := m.Data.(type) { + case metricdata.Sum[int64]: + require.Len(t, a.DataPoints, 1) + dp := a.DataPoints[0] + for _, attr := range dp.Attributes.ToSlice() { + if attr.Key == netstats.ReceiverKey && attr.Value.AsString() == "admission_testing" { + return dp.Value + } + } + t.Errorf("point value not found: %v", m.Data) + default: + t.Errorf("incorrect metric data type: %T", m.Data) + } + return -1 +} + +func (bq bqTest) verifyMetrics(t *testing.T) (inflight int64, waiting int64) { + inflight = -1 + waiting = -1 + + var rm metricdata.ResourceMetrics + require.NoError(t, bq.reader.Collect(context.Background(), &rm)) + + for _, sm := range rm.ScopeMetrics { + if sm.Scope.Name != expectScope { + continue + } + for _, m := range sm.Metrics { + switch m.Name { + case expectInFlightName: + inflight = bq.verifyPoint(t, m) + case expectWaitingName: + waiting = bq.verifyPoint(t, m) + } + } + } + return +} + func TestBoundedQueueLIFO(t *testing.T) { const maxAdmit = 10 @@ -252,6 +343,11 @@ func TestBoundedQueueLIFO(t *testing.T) { notW1 := bq.startWaiter(ctx, secondWait, &relW1) bq.waitForPending(maxAdmit, maxAdmit) + // The in-flight and waiting bytes are counted. + inflight, waiting := bq.verifyMetrics(t) + require.Equal(t, int64(maxAdmit), inflight) + require.Equal(t, int64(maxAdmit), waiting) + relFirst() // early is true when releasing the first acquired @@ -278,6 +374,10 @@ func TestBoundedQueueLIFO(t *testing.T) { relW1() bq.waitForPending(0, 0) + + inflight, waiting = bq.verifyMetrics(t) + require.Equal(t, int64(0), inflight) + require.Equal(t, int64(0), waiting) }) } } diff --git a/internal/otelarrow/doc.go b/internal/otelarrow/doc.go new file mode 100644 index 000000000000..a3890efafb00 --- /dev/null +++ b/internal/otelarrow/doc.go @@ -0,0 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:generate mdatagen metadata.yaml + +package otelarrow // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow" diff --git a/internal/otelarrow/documentation.md b/internal/otelarrow/documentation.md new file mode 100644 index 000000000000..eff8f632560b --- /dev/null +++ b/internal/otelarrow/documentation.md @@ -0,0 +1,23 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# otelarrow + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### otelcol_otelarrow_admission_in_flight_bytes + +Number of bytes that have started processing but are not finished. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| By | Sum | Int | false | + +### otelcol_otelarrow_admission_waiting_bytes + +Number of items waiting to start processing. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| By | Sum | Int | false | diff --git a/receiver/otelarrowreceiver/generated_component_telemetry_test.go b/internal/otelarrow/generated_component_telemetry_test.go similarity index 85% rename from receiver/otelarrowreceiver/generated_component_telemetry_test.go rename to internal/otelarrow/generated_component_telemetry_test.go index f738ca8f5696..15169a4838c9 100644 --- a/receiver/otelarrowreceiver/generated_component_telemetry_test.go +++ b/internal/otelarrow/generated_component_telemetry_test.go @@ -1,6 +1,6 @@ // Code generated by mdatagen. DO NOT EDIT. -package otelarrowreceiver +package otelarrow import ( "context" @@ -15,8 +15,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receivertest" ) type componentTestTelemetry struct { @@ -24,13 +22,6 @@ type componentTestTelemetry struct { meterProvider *sdkmetric.MeterProvider } -func (tt *componentTestTelemetry) NewSettings() receiver.Settings { - set := receivertest.NewNopSettings() - set.TelemetrySettings = tt.newTelemetrySettings() - set.ID = component.NewID(component.MustNewType("otelarrow")) - return set -} - func (tt *componentTestTelemetry) newTelemetrySettings() component.TelemetrySettings { set := componenttest.NewNopTelemetrySettings() set.MeterProvider = tt.meterProvider diff --git a/internal/otelarrow/generated_package_test.go b/internal/otelarrow/generated_package_test.go new file mode 100644 index 000000000000..ec77bf1fb2a0 --- /dev/null +++ b/internal/otelarrow/generated_package_test.go @@ -0,0 +1,13 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package otelarrow + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/internal/otelarrow/go.mod b/internal/otelarrow/go.mod index f42ea96a0fa6..0b883120dc36 100644 --- a/internal/otelarrow/go.mod +++ b/internal/otelarrow/go.mod @@ -23,6 +23,7 @@ require ( go.opentelemetry.io/otel/sdk v1.32.0 go.opentelemetry.io/otel/sdk/metric v1.32.0 go.opentelemetry.io/otel/trace v1.32.0 + go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 google.golang.org/grpc v1.67.1 diff --git a/internal/otelarrow/internal/metadata/generated_telemetry.go b/internal/otelarrow/internal/metadata/generated_telemetry.go new file mode 100644 index 000000000000..64c1e40abefc --- /dev/null +++ b/internal/otelarrow/internal/metadata/generated_telemetry.go @@ -0,0 +1,97 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "context" + "errors" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" +) + +// Deprecated: [v0.108.0] use LeveledMeter instead. +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow") +} + +func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { + return settings.LeveledMeterProvider(level).Meter("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + OtelarrowAdmissionInFlightBytes metric.Int64ObservableUpDownCounter + observeOtelarrowAdmissionInFlightBytes func(context.Context, metric.Observer) error + OtelarrowAdmissionWaitingBytes metric.Int64ObservableUpDownCounter + observeOtelarrowAdmissionWaitingBytes func(context.Context, metric.Observer) error + meters map[configtelemetry.Level]metric.Meter +} + +// TelemetryBuilderOption applies changes to default builder. +type TelemetryBuilderOption interface { + apply(*TelemetryBuilder) +} + +type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) + +func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { + tbof(mb) +} + +// WithOtelarrowAdmissionInFlightBytesCallback sets callback for observable OtelarrowAdmissionInFlightBytes metric. +func WithOtelarrowAdmissionInFlightBytesCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption { + return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) { + builder.observeOtelarrowAdmissionInFlightBytes = func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(builder.OtelarrowAdmissionInFlightBytes, cb(), opts...) + return nil + } + }) +} + +// WithOtelarrowAdmissionWaitingBytesCallback sets callback for observable OtelarrowAdmissionWaitingBytes metric. +func WithOtelarrowAdmissionWaitingBytesCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption { + return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) { + builder.observeOtelarrowAdmissionWaitingBytes = func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(builder.OtelarrowAdmissionWaitingBytes, cb(), opts...) + return nil + } + }) +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + for _, op := range options { + op.apply(&builder) + } + builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + var err, errs error + builder.OtelarrowAdmissionInFlightBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableUpDownCounter( + "otelcol_otelarrow_admission_in_flight_bytes", + metric.WithDescription("Number of bytes that have started processing but are not finished."), + metric.WithUnit("By"), + ) + errs = errors.Join(errs, err) + _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeOtelarrowAdmissionInFlightBytes, builder.OtelarrowAdmissionInFlightBytes) + errs = errors.Join(errs, err) + builder.OtelarrowAdmissionWaitingBytes, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableUpDownCounter( + "otelcol_otelarrow_admission_waiting_bytes", + metric.WithDescription("Number of items waiting to start processing."), + metric.WithUnit("By"), + ) + errs = errors.Join(errs, err) + _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeOtelarrowAdmissionWaitingBytes, builder.OtelarrowAdmissionWaitingBytes) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/receiver/otelarrowreceiver/internal/metadata/generated_telemetry_test.go b/internal/otelarrow/internal/metadata/generated_telemetry_test.go similarity index 95% rename from receiver/otelarrowreceiver/internal/metadata/generated_telemetry_test.go rename to internal/otelarrow/internal/metadata/generated_telemetry_test.go index 25002a5869ad..ab1411c26336 100644 --- a/receiver/otelarrowreceiver/internal/metadata/generated_telemetry_test.go +++ b/internal/otelarrow/internal/metadata/generated_telemetry_test.go @@ -54,14 +54,14 @@ func TestProviders(t *testing.T) { meter := Meter(set) if m, ok := meter.(mockMeter); ok { - require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver", m.name) + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow", m.name) } else { require.Fail(t, "returned Meter not mockMeter") } tracer := Tracer(set) if m, ok := tracer.(mockTracer); ok { - require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver", m.name) + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow", m.name) } else { require.Fail(t, "returned Meter not mockTracer") } diff --git a/internal/otelarrow/metadata.yaml b/internal/otelarrow/metadata.yaml index 0bd31377fc0d..56966ea6b25b 100644 --- a/internal/otelarrow/metadata.yaml +++ b/internal/otelarrow/metadata.yaml @@ -1,3 +1,28 @@ +type: otelarrow + status: + class: pkg + stability: + beta: [traces, metrics, logs] codeowners: active: [jmacd, moh-osman3] + +telemetry: + metrics: + otelarrow_admission_in_flight_bytes: + description: Number of bytes that have started processing but are not finished. + unit: By + enabled: true + sum: + monotonic: false + value_type: int + async: true + + otelarrow_admission_waiting_bytes: + description: Number of items waiting to start processing. + enabled: true + unit: By + sum: + monotonic: false + value_type: int + async: true diff --git a/receiver/otelarrowreceiver/README.md b/receiver/otelarrowreceiver/README.md index 2088caec1bb0..2257de9d5ae9 100644 --- a/receiver/otelarrowreceiver/README.md +++ b/receiver/otelarrowreceiver/README.md @@ -173,9 +173,8 @@ exporters: In addition to the the standard [obsreport](https://pkg.go.dev/go.opentelemetry.io/collector/obsreport) -metrics, this component provides network-level measurement instruments -which we anticipate will become part of `obsreport` in the future. At -the `normal` level of metrics detail: +metrics, this component provides network-level measurement instruments +measuring bytes transported. At the `normal` level of metrics detail: - `otelcol_receiver_recv`: uncompressed bytes received, prior to compression - `otelcol_receiver_recv_wire`: compressed bytes received, on the wire. @@ -189,6 +188,12 @@ of data being returned from the receiver will be instrumented: - `otelcol_receiver_sent`: uncompressed bytes sent, prior to compression - `otelcol_receiver_sent_wire`: compressed bytes sent, on the wire. +Metric instruments expose the state of the admission controller at the +`normal` level of detail: + +- `otelcol_otelarrow_admission_in_flight_bytes`: number of uncompressed bytes admitted into the pipeline +- `otelcol_otelarrow_admission_waiting_bytes`: number of uncompressed bytes waiting for admission + There several OpenTelemetry Protocol with Apache Arrow-consumer related metrics available to help diagnose internal performance. These are disabled at the basic level of detail. At the normal level, diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index b406619d0a6d..c96e2a0e495d 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -25,9 +25,7 @@ require ( go.opentelemetry.io/collector/receiver v0.113.0 go.opentelemetry.io/collector/receiver/receivertest v0.113.0 go.opentelemetry.io/otel v1.32.0 - go.opentelemetry.io/otel/metric v1.32.0 go.opentelemetry.io/otel/sdk v1.32.0 - go.opentelemetry.io/otel/sdk/metric v1.32.0 go.opentelemetry.io/otel/trace v1.32.0 go.uber.org/goleak v1.3.0 go.uber.org/mock v0.5.0 @@ -78,6 +76,8 @@ require ( go.opentelemetry.io/collector/pipeline v0.113.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.113.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect + go.opentelemetry.io/otel/metric v1.32.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/sync v0.8.0 // indirect diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index 6b28f52d356b..080b36e2a9ef 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -40,7 +40,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" - internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata" ) const ( @@ -68,15 +67,14 @@ type Receiver struct { arrowpb.UnsafeArrowLogsServiceServer arrowpb.UnsafeArrowMetricsServiceServer - telemetry component.TelemetrySettings - tracer trace.Tracer - obsrecv *receiverhelper.ObsReport - gsettings configgrpc.ServerConfig - authServer auth.Server - newConsumer func() arrowRecord.ConsumerAPI - netReporter netstats.Interface - telemetryBuilder *internalmetadata.TelemetryBuilder - boundedQueue admission2.Queue + telemetry component.TelemetrySettings + tracer trace.Tracer + obsrecv *receiverhelper.ObsReport + gsettings configgrpc.ServerConfig + authServer auth.Server + newConsumer func() arrowRecord.ConsumerAPI + netReporter netstats.Interface + boundedQueue admission2.Queue } // receiverStream holds the inFlightWG for a single stream. @@ -97,21 +95,16 @@ func New( netReporter netstats.Interface, ) (*Receiver, error) { tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver") - telemetryBuilder, err := internalmetadata.NewTelemetryBuilder(set.TelemetrySettings) - if err != nil { - return nil, err - } return &Receiver{ - Consumers: cs, - obsrecv: obsrecv, - telemetry: set.TelemetrySettings, - tracer: tracer, - authServer: authServer, - newConsumer: newConsumer, - gsettings: gsettings, - netReporter: netReporter, - boundedQueue: bq, - telemetryBuilder: telemetryBuilder, + Consumers: cs, + obsrecv: obsrecv, + telemetry: set.TelemetrySettings, + tracer: tracer, + authServer: authServer, + newConsumer: newConsumer, + gsettings: gsettings, + netReporter: netReporter, + boundedQueue: bq, }, nil } @@ -423,7 +416,6 @@ func (r *receiverStream) newInFlightData(ctx context.Context, method string, bat _, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight") r.inFlightWG.Add(1) - r.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, 1) id := &inFlightData{ receiverStream: r, method: method, @@ -505,13 +497,6 @@ func (id *inFlightData) anyDone(ctx context.Context) { id.releaser() } - if id.uncompSize != 0 { - id.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, -id.uncompSize) - } - if id.numItems != 0 { - id.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(-id.numItems)) - } - // The netstats code knows that uncompressed size is // unreliable for arrow transport, so we instrument it // directly here. Only the primary direction of transport @@ -521,7 +506,6 @@ func (id *inFlightData) anyDone(ctx context.Context) { sized.Length = id.uncompSize id.netReporter.CountReceive(ctx, sized) - id.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, -1) id.inFlightWG.Done() } @@ -638,9 +622,6 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre flight.numItems = numItems flight.releaser = releaser - r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize) - r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems)) - // Recognize that the request is still in-flight via consumeAndRespond() flight.refs.Add(1) diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 57a92c7d922e..78ac585d5c23 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -50,9 +50,11 @@ import ( ) var noopTelemetry = componenttest.NewNopTelemetrySettings() +var testingID = component.MustNewID("testing") func defaultBQ() admission2.Queue { - return admission2.NewBoundedQueue(noopTelemetry, 100000, 10) + bq, _ := admission2.NewBoundedQueue(testingID, noopTelemetry, 100000, 10) + return bq } type ( @@ -462,11 +464,12 @@ func TestBoundedQueueLimits(t *testing.T) { // internal/otelarrow/test/e2e_test.go. if tt.expectErr { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) - bq = admission2.NewBoundedQueue(noopTelemetry, uint64(sizer.TracesSize(td)-100), 0) + bq, err = admission2.NewBoundedQueue(testingID, noopTelemetry, uint64(sizer.TracesSize(td)-100), 0) } else { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - bq = admission2.NewBoundedQueue(noopTelemetry, uint64(tt.admitLimit), 0) + bq, err = admission2.NewBoundedQueue(testingID, noopTelemetry, uint64(tt.admitLimit), 0) } + require.NoError(t, err) ctc.start(ctc.newRealConsumer, bq) ctc.putBatch(batch, nil) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index d270ff138bfb..d8c44d590542 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -206,7 +206,8 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) (net.Addr, *tracet ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission2.NewBoundedQueue(telset, maxBytes, 0) + bq, err := admission2.NewBoundedQueue(set.ID, telset, maxBytes, 0) + require.NoError(t, err) r := New(zap.NewNop(), lc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/internal/metadata/generated_telemetry.go b/receiver/otelarrowreceiver/internal/metadata/generated_telemetry.go deleted file mode 100644 index eb423c1975f8..000000000000 --- a/receiver/otelarrowreceiver/internal/metadata/generated_telemetry.go +++ /dev/null @@ -1,77 +0,0 @@ -// Code generated by mdatagen. DO NOT EDIT. - -package metadata - -import ( - "errors" - - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" -) - -// Deprecated: [v0.108.0] use LeveledMeter instead. -func Meter(settings component.TelemetrySettings) metric.Meter { - return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver") -} - -func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { - return settings.LeveledMeterProvider(level).Meter("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver") -} - -func Tracer(settings component.TelemetrySettings) trace.Tracer { - return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver") -} - -// TelemetryBuilder provides an interface for components to report telemetry -// as defined in metadata and user config. -type TelemetryBuilder struct { - meter metric.Meter - OtelArrowReceiverInFlightBytes metric.Int64UpDownCounter - OtelArrowReceiverInFlightItems metric.Int64UpDownCounter - OtelArrowReceiverInFlightRequests metric.Int64UpDownCounter - meters map[configtelemetry.Level]metric.Meter -} - -// TelemetryBuilderOption applies changes to default builder. -type TelemetryBuilderOption interface { - apply(*TelemetryBuilder) -} - -type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) - -func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { - tbof(mb) -} - -// NewTelemetryBuilder provides a struct with methods to update all internal telemetry -// for a component -func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { - builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} - for _, op := range options { - op.apply(&builder) - } - builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) - var err, errs error - builder.OtelArrowReceiverInFlightBytes, err = builder.meters[configtelemetry.LevelBasic].Int64UpDownCounter( - "otelcol_otel_arrow_receiver_in_flight_bytes", - metric.WithDescription("Number of bytes in flight"), - metric.WithUnit("By"), - ) - errs = errors.Join(errs, err) - builder.OtelArrowReceiverInFlightItems, err = builder.meters[configtelemetry.LevelBasic].Int64UpDownCounter( - "otelcol_otel_arrow_receiver_in_flight_items", - metric.WithDescription("Number of items in flight"), - metric.WithUnit("1"), - ) - errs = errors.Join(errs, err) - builder.OtelArrowReceiverInFlightRequests, err = builder.meters[configtelemetry.LevelBasic].Int64UpDownCounter( - "otelcol_otel_arrow_receiver_in_flight_requests", - metric.WithDescription("Number of requests in flight"), - metric.WithUnit("1"), - ) - errs = errors.Join(errs, err) - return &builder, errs -} diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index a78378f54bd3..b0059a059685 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -206,7 +206,8 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, *tra ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission2.NewBoundedQueue(telset, maxBytes, 0) + bq, err := admission2.NewBoundedQueue(set.ID, telset, maxBytes, 0) + require.NoError(t, err) r := New(zap.NewNop(), mc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index 251e194dde3c..49c3ffa4c2b2 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -206,7 +206,8 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, *trac ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission2.NewBoundedQueue(telset, maxBytes, 0) + bq, err := admission2.NewBoundedQueue(set.ID, telset, maxBytes, 0) + require.NoError(t, err) r := New(zap.NewNop(), tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/metadata.yaml b/receiver/otelarrowreceiver/metadata.yaml index ab229b2e5f78..b9b3b23dcc04 100644 --- a/receiver/otelarrowreceiver/metadata.yaml +++ b/receiver/otelarrowreceiver/metadata.yaml @@ -7,29 +7,3 @@ status: distributions: [contrib, k8s] codeowners: active: [jmacd, moh-osman3] - -telemetry: - metrics: - otel_arrow_receiver_in_flight_bytes: - description: Number of bytes in flight - unit: By - enabled: true - sum: - monotonic: false - value_type: int - - otel_arrow_receiver_in_flight_items: - description: Number of items in flight - enabled: true - unit: "1" - sum: - monotonic: false - value_type: int - - otel_arrow_receiver_in_flight_requests: - description: Number of requests in flight - enabled: true - unit: "1" - sum: - monotonic: false - value_type: int diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index b21bea395e96..c53c85b32190 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -70,7 +70,10 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive if cfg.Admission.RequestLimitMiB == 0 { bq = admission2.NewUnboundedQueue() } else { - bq = admission2.NewBoundedQueue(set.TelemetrySettings, cfg.Admission.RequestLimitMiB<<20, cfg.Admission.WaitingLimitMiB<<20) + bq, err = admission2.NewBoundedQueue(set.ID, set.TelemetrySettings, cfg.Admission.RequestLimitMiB<<20, cfg.Admission.WaitingLimitMiB<<20) + if err != nil { + return nil, err + } } r := &otelArrowReceiver{ cfg: cfg,