Skip to content

Commit

Permalink
Make otelarrow admission control metrics consistent (open-telemetry#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd authored Nov 15, 2024
1 parent c406f44 commit a7d32bf
Show file tree
Hide file tree
Showing 21 changed files with 425 additions and 217 deletions.
31 changes: 31 additions & 0 deletions .chloggen/otelarrow-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: New admission control metrics are consistent across Arrow and OTLP data paths.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36334]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
`otelcol_otelarrow_admission_in_flight_bytes`: new, replaces `otelcol_otel_arrow_receiver_in_flight_bytes`
`otelcol_otelarrow_admission_waiting_bytes`: new, describes waiting requests
`otelcol_otel_arrow_receiver_in_flight_items`: removed
`otelcol_otel_arrow_receiver_in_flight_requests`: removed
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
39 changes: 34 additions & 5 deletions internal/otelarrow/admission2/boundedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
)

var (
Expand All @@ -22,9 +26,10 @@ var (

// BoundedQueue is a LIFO-oriented admission-controlled Queue.
type BoundedQueue struct {
maxLimitAdmit uint64
maxLimitWait uint64
tracer trace.Tracer
maxLimitAdmit uint64
maxLimitWait uint64
tracer trace.Tracer
telemetryBuilder *internalmetadata.TelemetryBuilder

// lock protects currentAdmitted, currentWaiting, and waiters

Expand All @@ -45,13 +50,37 @@ type waiter struct {
// NewBoundedQueue returns a LIFO-oriented Queue implementation which
// admits `maxLimitAdmit` bytes concurrently and allows up to
// `maxLimitWait` bytes to wait for admission.
func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) Queue {
return &BoundedQueue{
func NewBoundedQueue(id component.ID, ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) (Queue, error) {
	const scopeName = "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"

	queue := &BoundedQueue{
		maxLimitAdmit: maxLimitAdmit,
		maxLimitWait:  maxLimitWait,
		waiters:       list.New(),
		tracer:        ts.TracerProvider.Tracer(scopeName),
	}

	// Both callbacks are registered with the component ID attached as the
	// netstats.ReceiverKey attribute.
	receiverAttr := metric.WithAttributes(attribute.String(netstats.ReceiverKey, id.String()))

	builder, err := internalmetadata.NewTelemetryBuilder(
		ts,
		internalmetadata.WithOtelarrowAdmissionInFlightBytesCallback(queue.inFlightCB, receiverAttr),
		internalmetadata.WithOtelarrowAdmissionWaitingBytesCallback(queue.waitingCB, receiverAttr),
	)
	if err != nil {
		// The queue is not handed out when telemetry setup fails.
		return nil, err
	}

	queue.telemetryBuilder = builder
	return queue, nil
}

// inFlightCB returns the current value of currentAdmitted as an int64.
// NewBoundedQueue registers it as the in-flight-bytes metric callback.
func (bq *BoundedQueue) inFlightCB() int64 {
	// Note, see https://github.com/open-telemetry/otel-arrow/issues/270
	bq.lock.Lock()
	admitted := bq.currentAdmitted
	bq.lock.Unlock()

	return int64(admitted)
}

// waitingCB returns the current value of currentWaiting as an int64.
// NewBoundedQueue registers it as the waiting-bytes metric callback.
func (bq *BoundedQueue) waitingCB() int64 {
	// Note, see https://github.com/open-telemetry/otel-arrow/issues/270
	bq.lock.Lock()
	pending := bq.currentWaiting
	bq.lock.Unlock()

	return int64(pending)
}

// acquireOrGetWaiter returns with three distinct conditions depending
Expand Down
198 changes: 149 additions & 49 deletions internal/otelarrow/admission2/boundedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,53 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
"google.golang.org/grpc/codes"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
)

// Names that verifyMetrics matches against collected metric data: the
// instrumentation scope name and the two admission metric names.
const (
expectScope = "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"
expectInFlightName = "otelcol_otelarrow_admission_in_flight_bytes"
expectWaitingName = "otelcol_otelarrow_admission_waiting_bytes"
)

// bqTest bundles the BoundedQueue under test with the testing handle and
// the manual metric reader and meter provider set up by newBQTest.
type bqTest struct {
t *testing.T
t *testing.T
reader *sdkmetric.ManualReader
provider *sdkmetric.MeterProvider
*BoundedQueue
}

var noopTelemetry = componenttest.NewNopTelemetrySettings()

// newBQTest builds a bqTest whose BoundedQueue is constructed with telemetry
// settings that use an SDK MeterProvider backed by a ManualReader.
func newBQTest(t *testing.T, maxAdmit, maxWait uint64) bqTest {
settings := componenttest.NewNopTelemetrySettings()

// The manual reader is kept on the returned bqTest so that verifyMetrics
// can collect from it on demand.
reader := sdkmetric.NewManualReader()
provider := sdkmetric.NewMeterProvider(
sdkmetric.WithResource(resource.Empty()),
sdkmetric.WithReader(reader),
)
settings.MeterProvider = provider
// Return the same provider regardless of the requested level.
settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
return settings.MeterProvider
}

// The "admission_testing" ID is the attribute value verifyPoint looks for.
bq, err := NewBoundedQueue(component.MustNewID("admission_testing"), settings, maxAdmit, maxWait)
require.NoError(t, err)
return bqTest{
t: t,
// NOTE(review): the next line calls NewBoundedQueue with the older
// signature and repeats the BoundedQueue key set further down; it looks
// like a removed line carried over from the diff view — confirm.
BoundedQueue: NewBoundedQueue(noopTelemetry, maxAdmit, maxWait).(*BoundedQueue),
reader: reader,
provider: provider,
BoundedQueue: bq.(*BoundedQueue),
}
}

Expand Down Expand Up @@ -67,87 +97,97 @@ func mkRange(from, to uint64) []uint64 {

func TestBoundedQueueLimits(t *testing.T) {
for _, test := range []struct {
name string
maxLimitAdmit uint64
maxLimitWait uint64
requestSizes []uint64
timeout time.Duration
expectErrs map[string]int
name string
maxLimitAdmit uint64
maxLimitWait uint64
expectAcquired [2]int64
requestSizes []uint64
timeout time.Duration
expectErrs map[string]int
}{
{
name: "simple_no_waiters_25",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(25, 40),
timeout: 0,
expectErrs: map[string]int{},
name: "simple_no_waiters_25",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(25, 40),
expectAcquired: [2]int64{1000, 1000},
timeout: 0,
expectErrs: map[string]int{},
},
{
name: "simple_no_waiters_1",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(1, 1000),
timeout: 0,
expectErrs: map[string]int{},
name: "simple_no_waiters_1",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(1, 1000),
expectAcquired: [2]int64{1000, 1000},
timeout: 0,
expectErrs: map[string]int{},
},
{
name: "without_waiting_remainder",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(30, 40),
timeout: 0,
name: "without_waiting_remainder",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: mkRepeat(30, 40),
expectAcquired: [2]int64{990, 990},
timeout: 0,
expectErrs: map[string]int{
// 7 failures with a remainder of 10
// 30 * (40 - 7) = 990
ErrTooMuchWaiting.Error(): 7,
},
},
{
name: "without_waiting_complete",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: append(mkRepeat(30, 40), 10),
timeout: 0,
name: "without_waiting_complete",
maxLimitAdmit: 1000,
maxLimitWait: 0,
requestSizes: append(mkRepeat(30, 40), 10),
expectAcquired: [2]int64{1000, 1000},
timeout: 0,
expectErrs: map[string]int{
// 30*33+10 succeed, 7 failures (as above)
ErrTooMuchWaiting.Error(): 7,
},
},
{
name: "with_waiters_timeout",
maxLimitAdmit: 1000,
maxLimitWait: 1000,
requestSizes: mkRepeat(20, 100),
timeout: time.Second,
name: "with_waiters_timeout",
maxLimitAdmit: 1000,
maxLimitWait: 1000,
requestSizes: mkRepeat(20, 100),
expectAcquired: [2]int64{1000, 1000},
timeout: time.Second,
expectErrs: map[string]int{
// 20*50=1000 is half of the requests timing out
status.Error(grpccodes.Canceled, context.DeadlineExceeded.Error()).Error(): 50,
},
},
{
name: "with_size_exceeded",
maxLimitAdmit: 1000,
maxLimitWait: 2000,
requestSizes: []uint64{1001},
timeout: 0,
name: "with_size_exceeded",
maxLimitAdmit: 1000,
maxLimitWait: 2000,
requestSizes: []uint64{1001},
expectAcquired: [2]int64{0, 0},
timeout: 0,
expectErrs: map[string]int{
ErrRequestTooLarge.Error(): 1,
},
},
{
name: "mixed_sizes",
maxLimitAdmit: 45, // 45 is the exact sum of request sizes
maxLimitWait: 0,
requestSizes: mkRange(1, 9),
timeout: 0,
expectErrs: map[string]int{},
name: "mixed_sizes",
maxLimitAdmit: 45, // 45 is the exact sum of request sizes
maxLimitWait: 0,
requestSizes: mkRange(1, 9),
expectAcquired: [2]int64{45, 45},
timeout: 0,
expectErrs: map[string]int{},
},
{
name: "too_many_mixed_sizes",
maxLimitAdmit: 44, // all but one request will succeed
maxLimitWait: 0,
requestSizes: mkRange(1, 9),
timeout: 0,
// worst case is the size=9 fails, so minimum is 44-9+1
expectAcquired: [2]int64{36, 44},
timeout: 0,
expectErrs: map[string]int{
ErrTooMuchWaiting.Error(): 1,
},
Expand Down Expand Up @@ -190,6 +230,12 @@ func TestBoundedQueueLimits(t *testing.T) {

wait1.Wait()

// The in-flight bytes are in-range, none waiting.
inflight, waiting := bq.verifyMetrics(t)
require.LessOrEqual(t, test.expectAcquired[0], inflight)
require.GreaterOrEqual(t, test.expectAcquired[1], inflight)
require.Equal(t, int64(0), waiting)

close(releaseChan)

wait2.Wait()
Expand All @@ -214,10 +260,55 @@ func TestBoundedQueueLimits(t *testing.T) {

// and the final state is all 0.
bq.waitForPending(0, 0)

// metrics are zero
inflight, waiting = bq.verifyMetrics(t)
require.Equal(t, int64(0), inflight)
require.Equal(t, int64(0), waiting)
})
}
}

// verifyPoint extracts the value of the single int64 sum data point in m.
// The point must carry the netstats.ReceiverKey attribute with the value
// "admission_testing"; otherwise, or when m holds a different data type,
// an error is recorded on t and -1 is returned.
func (bq bqTest) verifyPoint(t *testing.T, m metricdata.Metrics) int64 {
	sum, isSum := m.Data.(metricdata.Sum[int64])
	if !isSum {
		t.Errorf("incorrect metric data type: %T", m.Data)
		return -1
	}

	// Exactly one data point is expected per metric.
	require.Len(t, sum.DataPoints, 1)
	point := sum.DataPoints[0]

	for _, kv := range point.Attributes.ToSlice() {
		if kv.Key != netstats.ReceiverKey {
			continue
		}
		if kv.Value.AsString() == "admission_testing" {
			return point.Value
		}
	}

	t.Errorf("point value not found: %v", m.Data)
	return -1
}

// verifyMetrics collects from the manual reader and returns the values of
// the in-flight and waiting metrics found under expectScope. A metric that
// is not present in the collected data is reported as -1.
func (bq bqTest) verifyMetrics(t *testing.T) (inflight int64, waiting int64) {
	inflight, waiting = -1, -1

	var collected metricdata.ResourceMetrics
	require.NoError(t, bq.reader.Collect(context.Background(), &collected))

	for _, scope := range collected.ScopeMetrics {
		// Only metrics from the expected instrumentation scope are examined.
		if scope.Scope.Name == expectScope {
			for _, m := range scope.Metrics {
				if m.Name == expectInFlightName {
					inflight = bq.verifyPoint(t, m)
				} else if m.Name == expectWaitingName {
					waiting = bq.verifyPoint(t, m)
				}
			}
		}
	}

	return inflight, waiting
}

func TestBoundedQueueLIFO(t *testing.T) {
const maxAdmit = 10

Expand Down Expand Up @@ -252,6 +343,11 @@ func TestBoundedQueueLIFO(t *testing.T) {
notW1 := bq.startWaiter(ctx, secondWait, &relW1)
bq.waitForPending(maxAdmit, maxAdmit)

// The in-flight and waiting bytes are counted.
inflight, waiting := bq.verifyMetrics(t)
require.Equal(t, int64(maxAdmit), inflight)
require.Equal(t, int64(maxAdmit), waiting)

relFirst()

// early is true when releasing the first acquired
Expand All @@ -278,6 +374,10 @@ func TestBoundedQueueLIFO(t *testing.T) {
relW1()

bq.waitForPending(0, 0)

inflight, waiting = bq.verifyMetrics(t)
require.Equal(t, int64(0), inflight)
require.Equal(t, int64(0), waiting)
})
}
}
Expand Down
6 changes: 6 additions & 0 deletions internal/otelarrow/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

package otelarrow // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"
Loading

0 comments on commit a7d32bf

Please sign in to comment.