Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add span events for admission blocked/released events #242

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions collector/admission/boundedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,31 @@ import (

"github.com/google/uuid"
orderedmap "github.com/wk8/go-ordered-map/v2"
"go.opentelemetry.io/otel/trace"
)

var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters")

type BoundedQueue struct {
maxLimitBytes int64
maxLimitBytes int64
maxLimitWaiters int64
currentBytes int64
currentWaiters int64
lock sync.Mutex
waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
currentBytes int64
currentWaiters int64
lock sync.Mutex
waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
}

type waiter struct {
readyCh chan struct{}
readyCh chan struct{}
pendingBytes int64
ID uuid.UUID
ID uuid.UUID
}

func NewBoundedQueue(maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
return &BoundedQueue{
maxLimitBytes: maxLimitBytes,
maxLimitBytes: maxLimitBytes,
maxLimitWaiters: maxLimitWaiters,
waiters: orderedmap.New[uuid.UUID, waiter](),
waiters: orderedmap.New[uuid.UUID, waiter](),
}
}

Expand All @@ -42,13 +43,13 @@ func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
return false, fmt.Errorf("rejecting request, request size larger than configured limit")
}

if bq.currentBytes + pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
bq.currentBytes += pendingBytes
return true, nil
}

// since we were unable to admit, check if we can wait.
if bq.currentWaiters + 1 > bq.maxLimitWaiters { // too many waiters
if bq.currentWaiters+1 > bq.maxLimitWaiters { // too many waiters
return false, ErrTooManyWaiters
}

Expand All @@ -66,7 +67,7 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
// otherwise we need to wait for bytes to be released
curWaiter := waiter{
pendingBytes: pendingBytes,
readyCh: make(chan struct{}),
readyCh: make(chan struct{}),
}

bq.lock.Lock()
Expand All @@ -85,8 +86,11 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {

bq.lock.Unlock()

trace.SpanFromContext(ctx).AddEvent("request blocked, pending bytes at limit")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm why add instrumentation in this package directly instead of adding instrumentation in the calling code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the calling code could unconditionally emit "requesting admission" and then "admission accepted" -- we would know the admission was blocked by the duration between these events. I thought that by instrumenting this package, we could limit the annotation to only the case where the request is blocked. Closed in favor of a different approach.

I've seen a span used to annotate the block


select {
case <-curWaiter.readyCh:
trace.SpanFromContext(ctx).AddEvent("request admitted")
return nil
case <-ctx.Done():
// canceled before acquired so remove waiter.
Expand Down Expand Up @@ -114,14 +118,11 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error {
return fmt.Errorf("released more bytes than acquired")
}

for {
if bq.waiters.Len() == 0 {
return nil
}
for bq.waiters.Len() != 0 {
next := bq.waiters.Oldest()
nextWaiter := next.Value
nextKey := next.Key
if bq.currentBytes + nextWaiter.pendingBytes <= bq.maxLimitBytes {
if bq.currentBytes+nextWaiter.pendingBytes <= bq.maxLimitBytes {
bq.currentBytes += nextWaiter.pendingBytes
bq.currentWaiters -= 1
close(nextWaiter.readyCh)
Expand All @@ -142,9 +143,9 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error {
func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool {
bq.lock.Lock()
defer bq.lock.Unlock()
if bq.currentBytes + pendingBytes <= bq.maxLimitBytes {
if bq.currentBytes+pendingBytes <= bq.maxLimitBytes {
bq.currentBytes += pendingBytes
return true
}
return false
}
}
Loading