-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
(internal/otelarrow) Add new LIFO boundedqueue (#36140)
- Loading branch information
Showing
7 changed files
with
711 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: enhancement | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: otelarrowreceiver | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Add a new LIFO-based bounded queue. | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [36074] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [user] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# Admission Package | ||
|
||
## Overview | ||
|
||
The admission package provides a BoundedQueue object. This object | ||
implements a semaphore for limiting the number of bytes admitted into | ||
a collector pipeline. Additionally, the BoundedQueue limits the | ||
number of bytes allowed to block on a call to `Acquire(pending int64)`. | ||
|
||
There are two error conditions generated within this code: | ||
|
||
- `rejecting request, too much pending data`: When the limit on waiting bytes its reached, this will be returned to limit the total amount waiting. | ||
- `rejecting request, request is too large`: When an individual request exceeds the configured limit, this will be returned without acquiring or waiting. | ||
|
||
The BoundedQueue implements LIFO semantics. See this | ||
[article](https://medium.com/swlh/fifo-considered-harmful-793b76f98374) | ||
explaining why it is preferred to FIFO semantics. | ||
|
||
## Usage | ||
|
||
Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiting)` | ||
|
||
Within the component call `bq.Acquire(ctx, requestSize)` which will: | ||
|
||
1. succeed immediately if there is enough available memory, | ||
2. fail immediately if there are too many waiters, or | ||
3. block until context cancelation or enough bytes becomes available. | ||
|
||
When the resources have been acquired successfully, a closure is | ||
returned that, when called, will release the semaphore. When the | ||
semaphore is released, pending waiters that can be satisfied will | ||
acquire the resource and become unblocked. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" | ||
|
||
import ( | ||
"container/list" | ||
"context" | ||
"sync" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/otel/attribute" | ||
"go.opentelemetry.io/otel/trace" | ||
grpccodes "google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
var ErrTooMuchWaiting = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data") | ||
var ErrRequestTooLarge = status.Errorf(grpccodes.InvalidArgument, "rejecting request, request is too large") | ||
|
||
// BoundedQueue is a LIFO-oriented admission-controlled Queue. | ||
type BoundedQueue struct { | ||
maxLimitAdmit uint64 | ||
maxLimitWait uint64 | ||
tracer trace.Tracer | ||
|
||
// lock protects currentAdmitted, currentWaiting, and waiters | ||
|
||
lock sync.Mutex | ||
currentAdmitted uint64 | ||
currentWaiting uint64 | ||
waiters *list.List // of *waiter | ||
} | ||
|
||
var _ Queue = &BoundedQueue{} | ||
|
||
// waiter is an item in the BoundedQueue waiters list. | ||
type waiter struct { | ||
notify N | ||
pending uint64 | ||
} | ||
|
||
// NewBoundedQueue returns a LIFO-oriented Queue implementation which | ||
// admits `maxLimitAdmit` bytes concurrently and allows up to | ||
// `maxLimitWait` bytes to wait for admission. | ||
func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) Queue { | ||
return &BoundedQueue{ | ||
maxLimitAdmit: maxLimitAdmit, | ||
maxLimitWait: maxLimitWait, | ||
waiters: list.New(), | ||
tracer: ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"), | ||
} | ||
} | ||
|
||
// acquireOrGetWaiter returns with three distinct conditions depending | ||
// on whether it was accepted, rejected, or asked to wait. | ||
// | ||
// - element=nil, error=nil: the fast success path | ||
// - element=nil, error=non-nil: the fast failure path | ||
// - element=non-nil, error=non-nil: the slow success path | ||
func (bq *BoundedQueue) acquireOrGetWaiter(pending uint64) (*list.Element, error) { | ||
if pending > bq.maxLimitAdmit { | ||
// when the request will never succeed because it is | ||
// individually over the total limit, fail fast. | ||
return nil, ErrRequestTooLarge | ||
} | ||
|
||
bq.lock.Lock() | ||
defer bq.lock.Unlock() | ||
|
||
if bq.currentAdmitted+pending <= bq.maxLimitAdmit { | ||
// the fast success path. | ||
bq.currentAdmitted += pending | ||
return nil, nil | ||
} | ||
|
||
// since we were unable to admit, check if we can wait. | ||
if bq.currentWaiting+pending > bq.maxLimitWait { | ||
return nil, ErrTooMuchWaiting | ||
} | ||
|
||
// otherwise we need to wait | ||
return bq.addWaiterLocked(pending), nil | ||
} | ||
|
||
// Acquire implements Queue. | ||
func (bq *BoundedQueue) Acquire(ctx context.Context, pending uint64) (ReleaseFunc, error) { | ||
element, err := bq.acquireOrGetWaiter(pending) | ||
parentSpan := trace.SpanFromContext(ctx) | ||
pendingAttr := trace.WithAttributes(attribute.Int64("pending", int64(pending))) | ||
|
||
if err != nil { | ||
parentSpan.AddEvent("admission rejected (fast path)", pendingAttr) | ||
return noopRelease, err | ||
} else if element == nil { | ||
parentSpan.AddEvent("admission accepted (fast path)", pendingAttr) | ||
return bq.releaseFunc(pending), nil | ||
} | ||
|
||
parentSpan.AddEvent("enter admission queue") | ||
|
||
ctx, span := bq.tracer.Start(ctx, "admission_blocked", pendingAttr) | ||
defer span.End() | ||
|
||
waiter := element.Value.(*waiter) | ||
|
||
select { | ||
case <-waiter.notify.Chan(): | ||
parentSpan.AddEvent("admission accepted (slow path)", pendingAttr) | ||
return bq.releaseFunc(pending), nil | ||
|
||
case <-ctx.Done(): | ||
bq.lock.Lock() | ||
defer bq.lock.Unlock() | ||
|
||
if waiter.notify.HasBeenNotified() { | ||
// We were also admitted, which can happen | ||
// concurrently with cancellation. Make sure | ||
// to release since no one else will do it. | ||
bq.releaseLocked(pending) | ||
} else { | ||
// Remove ourselves from the list of waiters | ||
// so that we can't be admitted in the future. | ||
bq.removeWaiterLocked(pending, element) | ||
bq.admitWaitersLocked() | ||
} | ||
|
||
parentSpan.AddEvent("admission rejected (canceled)", pendingAttr) | ||
return noopRelease, status.Error(grpccodes.Canceled, context.Cause(ctx).Error()) | ||
} | ||
} | ||
|
||
func (bq *BoundedQueue) admitWaitersLocked() { | ||
for bq.waiters.Len() != 0 { | ||
// Ensure there is enough room to admit the next waiter. | ||
element := bq.waiters.Back() | ||
waiter := element.Value.(*waiter) | ||
if bq.currentAdmitted+waiter.pending > bq.maxLimitAdmit { | ||
// Returning means continuing to wait for the | ||
// most recent arrival to get service by another release. | ||
return | ||
} | ||
|
||
// Release the next waiter and tell it that it has been admitted. | ||
bq.removeWaiterLocked(waiter.pending, element) | ||
bq.currentAdmitted += waiter.pending | ||
|
||
waiter.notify.Notify() | ||
} | ||
} | ||
|
||
func (bq *BoundedQueue) addWaiterLocked(pending uint64) *list.Element { | ||
bq.currentWaiting += pending | ||
return bq.waiters.PushBack(&waiter{ | ||
pending: pending, | ||
notify: newNotification(), | ||
}) | ||
} | ||
|
||
func (bq *BoundedQueue) removeWaiterLocked(pending uint64, element *list.Element) { | ||
bq.currentWaiting -= pending | ||
bq.waiters.Remove(element) | ||
} | ||
|
||
func (bq *BoundedQueue) releaseLocked(pending uint64) { | ||
bq.currentAdmitted -= pending | ||
bq.admitWaitersLocked() | ||
} | ||
|
||
func (bq *BoundedQueue) releaseFunc(pending uint64) ReleaseFunc { | ||
return func() { | ||
bq.lock.Lock() | ||
defer bq.lock.Unlock() | ||
|
||
bq.releaseLocked(pending) | ||
} | ||
} |
Oops, something went wrong.