From 6eece9b82c8e266e7ca53f8c2cee4ff5c99b8f69 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 12 Nov 2024 08:21:51 -0800 Subject: [PATCH] (internal/otelarrow) Add new LIFO boundedqueue (#36140) --- .chloggen/otelarrow-lifo.yaml | 27 ++ internal/otelarrow/admission2/README.md | 32 ++ internal/otelarrow/admission2/boundedqueue.go | 177 ++++++++++ .../otelarrow/admission2/boundedqueue_test.go | 334 ++++++++++++++++++ internal/otelarrow/admission2/controller.go | 56 +++ internal/otelarrow/admission2/notification.go | 42 +++ .../otelarrow/admission2/notification_test.go | 43 +++ 7 files changed, 711 insertions(+) create mode 100644 .chloggen/otelarrow-lifo.yaml create mode 100644 internal/otelarrow/admission2/README.md create mode 100644 internal/otelarrow/admission2/boundedqueue.go create mode 100644 internal/otelarrow/admission2/boundedqueue_test.go create mode 100644 internal/otelarrow/admission2/controller.go create mode 100644 internal/otelarrow/admission2/notification.go create mode 100644 internal/otelarrow/admission2/notification_test.go diff --git a/.chloggen/otelarrow-lifo.yaml b/.chloggen/otelarrow-lifo.yaml new file mode 100644 index 000000000000..7d13d24fcd9f --- /dev/null +++ b/.chloggen/otelarrow-lifo.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add a new LIFO-based bounded queue. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36074] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/otelarrow/admission2/README.md b/internal/otelarrow/admission2/README.md new file mode 100644 index 000000000000..9076096e2ec5 --- /dev/null +++ b/internal/otelarrow/admission2/README.md @@ -0,0 +1,32 @@ +# Admission Package + +## Overview + +The admission package provides a BoundedQueue object. This object +implements a semaphore for limiting the number of bytes admitted into +a collector pipeline. Additionally, the BoundedQueue limits the +number of bytes allowed to block on a call to `Acquire(pending int64)`. + +There are two error conditions generated within this code: + +- `rejecting request, too much pending data`: When the limit on waiting bytes its reached, this will be returned to limit the total amount waiting. +- `rejecting request, request is too large`: When an individual request exceeds the configured limit, this will be returned without acquiring or waiting. + +The BoundedQueue implements LIFO semantics. See this +[article](https://medium.com/swlh/fifo-considered-harmful-793b76f98374) +explaining why it is preferred to FIFO semantics. + +## Usage + +Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiting)` + +Within the component call `bq.Acquire(ctx, requestSize)` which will: + +1. succeed immediately if there is enough available memory, +2. fail immediately if there are too many waiters, or +3. block until context cancelation or enough bytes becomes available. + +When the resources have been acquired successfully, a closure is +returned that, when called, will release the semaphore. When the +semaphore is released, pending waiters that can be satisfied will +acquire the resource and become unblocked. diff --git a/internal/otelarrow/admission2/boundedqueue.go b/internal/otelarrow/admission2/boundedqueue.go new file mode 100644 index 000000000000..e0cd95e3bb2c --- /dev/null +++ b/internal/otelarrow/admission2/boundedqueue.go @@ -0,0 +1,177 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" + +import ( + "container/list" + "context" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + grpccodes "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ErrTooMuchWaiting = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data") +var ErrRequestTooLarge = status.Errorf(grpccodes.InvalidArgument, "rejecting request, request is too large") + +// BoundedQueue is a LIFO-oriented admission-controlled Queue. +type BoundedQueue struct { + maxLimitAdmit uint64 + maxLimitWait uint64 + tracer trace.Tracer + + // lock protects currentAdmitted, currentWaiting, and waiters + + lock sync.Mutex + currentAdmitted uint64 + currentWaiting uint64 + waiters *list.List // of *waiter +} + +var _ Queue = &BoundedQueue{} + +// waiter is an item in the BoundedQueue waiters list. +type waiter struct { + notify N + pending uint64 +} + +// NewBoundedQueue returns a LIFO-oriented Queue implementation which +// admits `maxLimitAdmit` bytes concurrently and allows up to +// `maxLimitWait` bytes to wait for admission. +func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) Queue { + return &BoundedQueue{ + maxLimitAdmit: maxLimitAdmit, + maxLimitWait: maxLimitWait, + waiters: list.New(), + tracer: ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"), + } +} + +// acquireOrGetWaiter returns with three distinct conditions depending +// on whether it was accepted, rejected, or asked to wait. +// +// - element=nil, error=nil: the fast success path +// - element=nil, error=non-nil: the fast failure path +// - element=non-nil, error=non-nil: the slow success path +func (bq *BoundedQueue) acquireOrGetWaiter(pending uint64) (*list.Element, error) { + if pending > bq.maxLimitAdmit { + // when the request will never succeed because it is + // individually over the total limit, fail fast. + return nil, ErrRequestTooLarge + } + + bq.lock.Lock() + defer bq.lock.Unlock() + + if bq.currentAdmitted+pending <= bq.maxLimitAdmit { + // the fast success path. + bq.currentAdmitted += pending + return nil, nil + } + + // since we were unable to admit, check if we can wait. + if bq.currentWaiting+pending > bq.maxLimitWait { + return nil, ErrTooMuchWaiting + } + + // otherwise we need to wait + return bq.addWaiterLocked(pending), nil +} + +// Acquire implements Queue. +func (bq *BoundedQueue) Acquire(ctx context.Context, pending uint64) (ReleaseFunc, error) { + element, err := bq.acquireOrGetWaiter(pending) + parentSpan := trace.SpanFromContext(ctx) + pendingAttr := trace.WithAttributes(attribute.Int64("pending", int64(pending))) + + if err != nil { + parentSpan.AddEvent("admission rejected (fast path)", pendingAttr) + return noopRelease, err + } else if element == nil { + parentSpan.AddEvent("admission accepted (fast path)", pendingAttr) + return bq.releaseFunc(pending), nil + } + + parentSpan.AddEvent("enter admission queue") + + ctx, span := bq.tracer.Start(ctx, "admission_blocked", pendingAttr) + defer span.End() + + waiter := element.Value.(*waiter) + + select { + case <-waiter.notify.Chan(): + parentSpan.AddEvent("admission accepted (slow path)", pendingAttr) + return bq.releaseFunc(pending), nil + + case <-ctx.Done(): + bq.lock.Lock() + defer bq.lock.Unlock() + + if waiter.notify.HasBeenNotified() { + // We were also admitted, which can happen + // concurrently with cancellation. Make sure + // to release since no one else will do it. + bq.releaseLocked(pending) + } else { + // Remove ourselves from the list of waiters + // so that we can't be admitted in the future. + bq.removeWaiterLocked(pending, element) + bq.admitWaitersLocked() + } + + parentSpan.AddEvent("admission rejected (canceled)", pendingAttr) + return noopRelease, status.Error(grpccodes.Canceled, context.Cause(ctx).Error()) + } +} + +func (bq *BoundedQueue) admitWaitersLocked() { + for bq.waiters.Len() != 0 { + // Ensure there is enough room to admit the next waiter. + element := bq.waiters.Back() + waiter := element.Value.(*waiter) + if bq.currentAdmitted+waiter.pending > bq.maxLimitAdmit { + // Returning means continuing to wait for the + // most recent arrival to get service by another release. + return + } + + // Release the next waiter and tell it that it has been admitted. + bq.removeWaiterLocked(waiter.pending, element) + bq.currentAdmitted += waiter.pending + + waiter.notify.Notify() + } +} + +func (bq *BoundedQueue) addWaiterLocked(pending uint64) *list.Element { + bq.currentWaiting += pending + return bq.waiters.PushBack(&waiter{ + pending: pending, + notify: newNotification(), + }) +} + +func (bq *BoundedQueue) removeWaiterLocked(pending uint64, element *list.Element) { + bq.currentWaiting -= pending + bq.waiters.Remove(element) +} + +func (bq *BoundedQueue) releaseLocked(pending uint64) { + bq.currentAdmitted -= pending + bq.admitWaitersLocked() +} + +func (bq *BoundedQueue) releaseFunc(pending uint64) ReleaseFunc { + return func() { + bq.lock.Lock() + defer bq.lock.Unlock() + + bq.releaseLocked(pending) + } +} diff --git a/internal/otelarrow/admission2/boundedqueue_test.go b/internal/otelarrow/admission2/boundedqueue_test.go new file mode 100644 index 000000000000..8903e8b90773 --- /dev/null +++ b/internal/otelarrow/admission2/boundedqueue_test.go @@ -0,0 +1,334 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package admission2 + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "google.golang.org/grpc/codes" + grpccodes "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type bqTest struct { + t *testing.T + *BoundedQueue +} + +var noopTelemetry = componenttest.NewNopTelemetrySettings() + +func newBQTest(t *testing.T, maxAdmit, maxWait uint64) bqTest { + return bqTest{ + t: t, + BoundedQueue: NewBoundedQueue(noopTelemetry, maxAdmit, maxWait).(*BoundedQueue), + } +} + +func (bq *bqTest) startWaiter(ctx context.Context, size uint64, relp *ReleaseFunc) N { + n := newNotification() + go func() { + var err error + *relp, err = bq.Acquire(ctx, size) + require.NoError(bq.t, err) + n.Notify() + }() + return n +} + +func (bq *bqTest) waitForPending(admitted, waiting uint64) { + require.Eventually(bq.t, func() bool { + bq.lock.Lock() + defer bq.lock.Unlock() + return bq.currentAdmitted == admitted && bq.currentWaiting == waiting + }, time.Second, 20*time.Millisecond) +} + +func mkRepeat(x uint64, n int) []uint64 { + if n == 0 { + return nil + } + return append(mkRepeat(x, n-1), x) +} + +func mkRange(from, to uint64) []uint64 { + if from > to { + return nil + } + return append([]uint64{from}, mkRange(from+1, to)...) +} + +func TestBoundedQueueLimits(t *testing.T) { + for _, test := range []struct { + name string + maxLimitAdmit uint64 + maxLimitWait uint64 + requestSizes []uint64 + timeout time.Duration + expectErrs map[string]int + }{ + { + name: "simple_no_waiters_25", + maxLimitAdmit: 1000, + maxLimitWait: 0, + requestSizes: mkRepeat(25, 40), + timeout: 0, + expectErrs: map[string]int{}, + }, + { + name: "simple_no_waiters_1", + maxLimitAdmit: 1000, + maxLimitWait: 0, + requestSizes: mkRepeat(1, 1000), + timeout: 0, + expectErrs: map[string]int{}, + }, + { + name: "without_waiting_remainder", + maxLimitAdmit: 1000, + maxLimitWait: 0, + requestSizes: mkRepeat(30, 40), + timeout: 0, + expectErrs: map[string]int{ + // 7 failures with a remainder of 10 + // 30 * (40 - 7) = 990 + ErrTooMuchWaiting.Error(): 7, + }, + }, + { + name: "without_waiting_complete", + maxLimitAdmit: 1000, + maxLimitWait: 0, + requestSizes: append(mkRepeat(30, 40), 10), + timeout: 0, + expectErrs: map[string]int{ + // 30*33+10 succeed, 7 failures (as above) + ErrTooMuchWaiting.Error(): 7, + }, + }, + { + name: "with_waiters_timeout", + maxLimitAdmit: 1000, + maxLimitWait: 1000, + requestSizes: mkRepeat(20, 100), + timeout: time.Second, + expectErrs: map[string]int{ + // 20*50=1000 is half of the requests timing out + status.Error(grpccodes.Canceled, context.DeadlineExceeded.Error()).Error(): 50, + }, + }, + { + name: "with_size_exceeded", + maxLimitAdmit: 1000, + maxLimitWait: 2000, + requestSizes: []uint64{1001}, + timeout: 0, + expectErrs: map[string]int{ + ErrRequestTooLarge.Error(): 1, + }, + }, + { + name: "mixed_sizes", + maxLimitAdmit: 45, // 45 is the exact sum of request sizes + maxLimitWait: 0, + requestSizes: mkRange(1, 9), + timeout: 0, + expectErrs: map[string]int{}, + }, + { + name: "too_many_mixed_sizes", + maxLimitAdmit: 44, // all but one request will succeed + maxLimitWait: 0, + requestSizes: mkRange(1, 9), + timeout: 0, + expectErrs: map[string]int{ + ErrTooMuchWaiting.Error(): 1, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + bq := newBQTest(t, test.maxLimitAdmit, test.maxLimitWait) + ctx := context.Background() + + if test.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, test.timeout) + defer cancel() + } + + numRequests := len(test.requestSizes) + allErrors := make(chan error, numRequests) + + releaseChan := make(chan struct{}) + var wait1 sync.WaitGroup + var wait2 sync.WaitGroup + + wait1.Add(numRequests) + wait2.Add(numRequests) + + for _, requestSize := range test.requestSizes { + go func() { + release, err := bq.Acquire(ctx, requestSize) + allErrors <- err + + wait1.Done() + + <-releaseChan + + release() + + wait2.Done() + }() + } + + wait1.Wait() + + close(releaseChan) + + wait2.Wait() + + close(allErrors) + + errCounts := map[string]int{} + + for err := range allErrors { + if err == nil { + continue + } + errCounts[err.Error()]++ + } + + require.Equal(t, test.expectErrs, errCounts) + + // Make sure we can allocate the whole limit at end-of-test. + release, err := bq.Acquire(ctx, test.maxLimitAdmit) + assert.NoError(t, err) + release() + + // and the final state is all 0. + bq.waitForPending(0, 0) + }) + } +} + +func TestBoundedQueueLIFO(t *testing.T) { + const maxAdmit = 10 + + for _, firstAcquire := range mkRange(2, 8) { + for _, firstWait := range mkRange(2, 8) { + t.Run(fmt.Sprint(firstAcquire, ",", firstWait), func(t *testing.T) { + t.Parallel() + + bq := newBQTest(t, maxAdmit, maxAdmit) + ctx := context.Background() + + // Fill the queue + relFirst, err := bq.Acquire(ctx, firstAcquire) + require.NoError(t, err) + bq.waitForPending(firstAcquire, 0) + + relSecond, err := bq.Acquire(ctx, maxAdmit-firstAcquire-1) + require.NoError(t, err) + bq.waitForPending(maxAdmit-1, 0) + + relOne, err := bq.Acquire(ctx, 1) + require.NoError(t, err) + bq.waitForPending(maxAdmit, 0) + + // Create two half-size waiters + var relW0 ReleaseFunc + notW0 := bq.startWaiter(ctx, firstWait, &relW0) + bq.waitForPending(maxAdmit, firstWait) + + var relW1 ReleaseFunc + secondWait := maxAdmit - firstWait + notW1 := bq.startWaiter(ctx, secondWait, &relW1) + bq.waitForPending(maxAdmit, maxAdmit) + + relFirst() + + // early is true when releasing the first acquired + // will not make enough room for the first waiter + early := firstAcquire < secondWait + if early { + relSecond() + } + + // Expect notifications in LIFO order, i.e., W1 before W0. + select { + case <-notW0.Chan(): + t.Fatalf("FIFO order -- incorrect") + case <-notW1.Chan(): + if !early { + relSecond() + } + } + relOne() + + <-notW0.Chan() + + relW0() + relW1() + + bq.waitForPending(0, 0) + }) + } + } +} + +func TestBoundedQueueCancelation(t *testing.T) { + // this test attempts to exercise the race condition between + // the Acquire slow path and context cancelation. + const ( + repetition = 100 + maxAdmit = 10 + ) + bq := newBQTest(t, maxAdmit, maxAdmit) + + for number := range repetition { + ctx, cancel := context.WithCancel(context.Background()) + + tester := func() { + // This acquire either succeeds or is canceled. + testrel, err := bq.Acquire(ctx, maxAdmit) + defer testrel() + if err == nil { + return + } + serr, ok := status.FromError(err) + require.True(t, ok, "has gRPC status") + require.Equal(t, codes.Canceled, serr.Code()) + } + + release, err := bq.Acquire(ctx, maxAdmit) + require.NoError(t, err) + + go tester() + + if number%2 == 0 { + go cancel() + go release() + } else { + go release() + go cancel() + } + + bq.waitForPending(0, 0) + } +} + +func TestBoundedQueueNoop(t *testing.T) { + nq := NewUnboundedQueue() + for _, i := range mkRange(1, 100) { + rel, err := nq.Acquire(context.Background(), i<<20) + require.NoError(t, err) + defer rel() + } +} diff --git a/internal/otelarrow/admission2/controller.go b/internal/otelarrow/admission2/controller.go new file mode 100644 index 000000000000..6999a10838e9 --- /dev/null +++ b/internal/otelarrow/admission2/controller.go @@ -0,0 +1,56 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" + +import ( + "context" +) + +// Queue is a weighted admission queue interface. +type Queue interface { + // Acquire asks the controller to admit the caller. + // + // The weight parameter specifies how large of an admission to make. + // This might be used on the bytes of request (for example) to differentiate + // between large and small requests. + // + // Admit will return when one of the following events occurs: + // + // (1) admission is allowed, or + // (2) the provided ctx becomes canceled, or + // (3) there are so many existing waiters that the + // controller decides to reject this caller without + // admitting it. + // + // In case (1), the return value will be a non-nil + // ReleaseFunc. The caller must invoke it after it is finished + // with the resource being guarded by the admission + // controller. + // + // In case (2), the return value will be a Cancelled or + // DeadlineExceeded error. + // + // In case (3), the return value will be a ResourceExhausted + // error. + Acquire(ctx context.Context, weight uint64) (ReleaseFunc, error) +} + +// ReleaseFunc is returned by Acquire when the Acquire() was admitted. +type ReleaseFunc func() + +type noopController struct{} + +var _ Queue = noopController{} + +// NewUnboundedQueue returns a no-op implementation of the Queue interface. +func NewUnboundedQueue() Queue { + return noopController{} +} + +func noopRelease() {} + +// Acquire implements Queue. +func (noopController) Acquire(_ context.Context, _ uint64) (ReleaseFunc, error) { + return noopRelease, nil +} diff --git a/internal/otelarrow/admission2/notification.go b/internal/otelarrow/admission2/notification.go new file mode 100644 index 000000000000..efbf66143548 --- /dev/null +++ b/internal/otelarrow/admission2/notification.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" + +// notification.N is a minimal Go version of absl::Notification: +// +// https://github.com/abseil/abseil-cpp/blob/master/absl/synchronization/notification.h +// +// Use New() to construct a notification object (the zero value is not +// usable). +type N struct { + c chan struct{} +} + +func newNotification() N { + return N{c: make(chan struct{})} +} + +func (n *N) Notify() { + close(n.c) +} + +func (n *N) HasBeenNotified() bool { + select { + case <-n.c: + return true + default: + return false + } +} + +func (n *N) WaitForNotification() { + <-n.c +} + +// Chan allows a caller to wait for the notification as part of a +// select statement. Outside of a select statement, prefer writing +// WaitForNotification(). +func (n *N) Chan() <-chan struct{} { + return n.c +} diff --git a/internal/otelarrow/admission2/notification_test.go b/internal/otelarrow/admission2/notification_test.go new file mode 100644 index 000000000000..1e66d8445a41 --- /dev/null +++ b/internal/otelarrow/admission2/notification_test.go @@ -0,0 +1,43 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package admission2 + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestNotification(t *testing.T) { + require := require.New(t) + + start := newNotification() + require.False(start.HasBeenNotified()) + + done := make([]N, 5) + for i := 0; i < 5; i++ { + done[i] = newNotification() + go func(i int) { + start.WaitForNotification() + done[i].Notify() + }(i) + } + + // None of the goroutines can make progress until we notify + // start. + for now := time.Now(); time.Now().Before(now.Add(100 * time.Millisecond)); { + for _, n := range done { + require.False(n.HasBeenNotified()) + } + } + + start.Notify() + + // Now the goroutines can finish. + for _, n := range done { + n.WaitForNotification() + require.True(n.HasBeenNotified()) + } +}