Skip to content

Commit

Permalink
[chore] Remove unnecessary code from obsQueue wrapper (#12241)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Feb 2, 2025
1 parent aac0a30 commit d89466c
Showing 1 changed file with 4 additions and 27 deletions.
31 changes: 4 additions & 27 deletions exporter/exporterhelper/internal/obs_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal"
Expand All @@ -18,7 +17,7 @@ import (

// obsQueue is a helper to add observability to a queue.
type obsQueue[T internal.Request] struct {
delegate exporterqueue.Queue[T]
exporterqueue.Queue[T]
tb *metadata.TelemetryBuilder
metricAttr metric.MeasurementOption
enqueueFailedInst metric.Int64Counter
Expand Down Expand Up @@ -49,7 +48,7 @@ func newObsQueue[T internal.Request](set exporterqueue.Settings, delegate export
}

or := &obsQueue[T]{
delegate: delegate,
Queue: delegate,
tb: tb,
metricAttr: metric.WithAttributeSet(attribute.NewSet(exporterAttr)),
}
Expand All @@ -66,39 +65,17 @@ func newObsQueue[T internal.Request](set exporterqueue.Settings, delegate export
return or, nil
}

func (or *obsQueue[T]) Start(ctx context.Context, host component.Host) error {
return or.delegate.Start(ctx, host)
}

func (or *obsQueue[T]) Shutdown(ctx context.Context) error {
defer or.tb.Shutdown()
return or.delegate.Shutdown(ctx)
return or.Queue.Shutdown(ctx)
}

func (or *obsQueue[T]) Offer(ctx context.Context, req T) error {
numItems := req.ItemsCount()
err := or.delegate.Offer(ctx, req)
err := or.Queue.Offer(ctx, req)
// No metrics recorded for profiles, remove enqueueFailedInst check with nil when profiles metrics available.
if err != nil && or.enqueueFailedInst != nil {
or.enqueueFailedInst.Add(ctx, int64(numItems), or.metricAttr)
}
return err
}

// Size returns the current Size of the queue
func (or *obsQueue[T]) Size() int64 {
return or.delegate.Size()
}

// Capacity returns the capacity of the queue.
func (or *obsQueue[T]) Capacity() int64 {
return or.delegate.Capacity()
}

// Read pulls the next available item from the queue along with its done callback. Once processing is
// finished, the done callback must be called to clean up the storage.
// The function blocks until an item is available or if the queue is stopped.
// If the queue is stopped returns false, otherwise true.
func (or *obsQueue[T]) Read(ctx context.Context) (context.Context, T, exporterqueue.DoneCallback, bool) {
return or.delegate.Read(ctx)
}

0 comments on commit d89466c

Please sign in to comment.