diff --git a/collector/admission/boundedqueue.go b/collector/admission/boundedqueue.go index b19e72e8..bd5c5921 100644 --- a/collector/admission/boundedqueue.go +++ b/collector/admission/boundedqueue.go @@ -7,30 +7,31 @@ import ( "github.com/google/uuid" orderedmap "github.com/wk8/go-ordered-map/v2" + "go.opentelemetry.io/otel/trace" ) var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters") type BoundedQueue struct { - maxLimitBytes int64 + maxLimitBytes int64 maxLimitWaiters int64 - currentBytes int64 - currentWaiters int64 - lock sync.Mutex - waiters *orderedmap.OrderedMap[uuid.UUID, waiter] + currentBytes int64 + currentWaiters int64 + lock sync.Mutex + waiters *orderedmap.OrderedMap[uuid.UUID, waiter] } type waiter struct { - readyCh chan struct{} + readyCh chan struct{} pendingBytes int64 - ID uuid.UUID + ID uuid.UUID } func NewBoundedQueue(maxLimitBytes, maxLimitWaiters int64) *BoundedQueue { return &BoundedQueue{ - maxLimitBytes: maxLimitBytes, + maxLimitBytes: maxLimitBytes, maxLimitWaiters: maxLimitWaiters, - waiters: orderedmap.New[uuid.UUID, waiter](), + waiters: orderedmap.New[uuid.UUID, waiter](), } } @@ -42,13 +43,13 @@ func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) { return false, fmt.Errorf("rejecting request, request size larger than configured limit") } - if bq.currentBytes + pendingBytes <= bq.maxLimitBytes { // no need to wait to admit + if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit bq.currentBytes += pendingBytes return true, nil } // since we were unable to admit, check if we can wait. - if bq.currentWaiters + 1 > bq.maxLimitWaiters { // too many waiters + if bq.currentWaiters+1 > bq.maxLimitWaiters { // too many waiters return false, ErrTooManyWaiters } @@ -66,7 +67,7 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error { // otherwise we need to wait for bytes to be released curWaiter := waiter{ pendingBytes: pendingBytes, - readyCh: make(chan struct{}), + readyCh: make(chan struct{}), } bq.lock.Lock() @@ -85,8 +86,11 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error { bq.lock.Unlock() + trace.SpanFromContext(ctx).AddEvent("request blocked, pending bytes at limit") + select { case <-curWaiter.readyCh: + trace.SpanFromContext(ctx).AddEvent("request admitted") return nil case <-ctx.Done(): // canceled before acquired so remove waiter. @@ -114,14 +118,11 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error { return fmt.Errorf("released more bytes than acquired") } - for { - if bq.waiters.Len() == 0 { - return nil - } + for bq.waiters.Len() != 0 { next := bq.waiters.Oldest() nextWaiter := next.Value nextKey := next.Key - if bq.currentBytes + nextWaiter.pendingBytes <= bq.maxLimitBytes { + if bq.currentBytes+nextWaiter.pendingBytes <= bq.maxLimitBytes { bq.currentBytes += nextWaiter.pendingBytes bq.currentWaiters -= 1 close(nextWaiter.readyCh) @@ -142,9 +143,9 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error { func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool { bq.lock.Lock() defer bq.lock.Unlock() - if bq.currentBytes + pendingBytes <= bq.maxLimitBytes { + if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { bq.currentBytes += pendingBytes return true } return false -} \ No newline at end of file +}