From 6e18d9be876dfd656304a55776b684591fbf1999 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 5 Sep 2024 07:36:18 -0700 Subject: [PATCH] Record admission-blocked event as a span & duration (#244) Alternative to https://github.com/open-telemetry/otel-arrow/pull/242 Allow us to monitor when requests are being blocked by admission limits. --- collector/admission/boundedqueue.go | 46 +++++++++++++++++++---------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/collector/admission/boundedqueue.go b/collector/admission/boundedqueue.go index b19e72e8..8aaa9ed1 100644 --- a/collector/admission/boundedqueue.go +++ b/collector/admission/boundedqueue.go @@ -7,33 +7,45 @@ import ( "github.com/google/uuid" orderedmap "github.com/wk8/go-ordered-map/v2" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" ) var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters") type BoundedQueue struct { - maxLimitBytes int64 + maxLimitBytes int64 maxLimitWaiters int64 - currentBytes int64 - currentWaiters int64 - lock sync.Mutex - waiters *orderedmap.OrderedMap[uuid.UUID, waiter] + currentBytes int64 + currentWaiters int64 + lock sync.Mutex + waiters *orderedmap.OrderedMap[uuid.UUID, waiter] + tracer trace.Tracer } type waiter struct { - readyCh chan struct{} + readyCh chan struct{} pendingBytes int64 - ID uuid.UUID + ID uuid.UUID } func NewBoundedQueue(maxLimitBytes, maxLimitWaiters int64) *BoundedQueue { return &BoundedQueue{ - maxLimitBytes: maxLimitBytes, + maxLimitBytes: maxLimitBytes, maxLimitWaiters: maxLimitWaiters, - waiters: orderedmap.New[uuid.UUID, waiter](), + waiters: orderedmap.New[uuid.UUID, waiter](), + tracer: noop.NewTracerProvider().Tracer(""), } } +func NewTracedBoundedQueue(tp trace.TracerProvider, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue { + bq := NewBoundedQueue(maxLimitBytes, maxLimitWaiters) + bq.tracer = tp.Tracer("otel-arrow/admission") + return bq +} + func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) { bq.lock.Lock() defer bq.lock.Unlock() @@ -42,13 +54,13 @@ func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) { return false, fmt.Errorf("rejecting request, request size larger than configured limit") } - if bq.currentBytes + pendingBytes <= bq.maxLimitBytes { // no need to wait to admit + if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit bq.currentBytes += pendingBytes return true, nil } // since we were unable to admit, check if we can wait. - if bq.currentWaiters + 1 > bq.maxLimitWaiters { // too many waiters + if bq.currentWaiters+1 > bq.maxLimitWaiters { // too many waiters return false, ErrTooManyWaiters } @@ -66,7 +78,7 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error { // otherwise we need to wait for bytes to be released curWaiter := waiter{ pendingBytes: pendingBytes, - readyCh: make(chan struct{}), + readyCh: make(chan struct{}), } bq.lock.Lock() @@ -84,6 +96,9 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error { } bq.lock.Unlock() + ctx, span := bq.tracer.Start(ctx, "admission_blocked", + trace.WithAttributes(attribute.Int64("pending", pendingBytes))) + defer span.End() select { case <-curWaiter.readyCh: @@ -93,6 +108,7 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error { bq.lock.Lock() defer bq.lock.Unlock() err = fmt.Errorf("context canceled: %w ", ctx.Err()) + span.SetStatus(codes.Error, "context canceled") _, found := bq.waiters.Delete(curWaiter.ID) if !found { @@ -121,7 +137,7 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error { next := bq.waiters.Oldest() nextWaiter := next.Value nextKey := next.Key - if bq.currentBytes + nextWaiter.pendingBytes <= bq.maxLimitBytes { + if bq.currentBytes+nextWaiter.pendingBytes <= bq.maxLimitBytes { bq.currentBytes += nextWaiter.pendingBytes bq.currentWaiters -= 1 close(nextWaiter.readyCh) @@ -142,9 +158,9 @@ func (bq *BoundedQueue) Release(pendingBytes int64) error { func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool { bq.lock.Lock() defer bq.lock.Unlock() - if bq.currentBytes + pendingBytes <= bq.maxLimitBytes { + if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { bq.currentBytes += pendingBytes return true } return false -} \ No newline at end of file +}