Skip to content

Commit

Permalink
Implemented disabled queue
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Jan 18, 2025
1 parent a3c7d95 commit ff4a975
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 6 deletions.
18 changes: 18 additions & 0 deletions .chloggen/disabled_queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds "disabled" queue which is used when the user sets up batching but not queuing.

# One or more tracking issues or pull requests related to the change
issues: [8122, 10368]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
20 changes: 18 additions & 2 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,24 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg)
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled ||
usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
if usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled {
be.queueFactory = exporterqueue.NewDisabledQueueFactory[internal.Request]()
be.queueCfg.QueueSize = be.BatcherCfg.MinSizeItems
q := be.queueFactory(
context.Background(),
exporterqueue.Settings{
Signal: signal,
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(
q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg).MarkAsDisabled()
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestBatchSender_PostShutdown(t *testing.T) {
assert.Equal(t, int64(8), sink.itemsCount.Load())
})
}
runTest("enable_queue_batcher", true)
// We don't expect the same behavior when disable_queue_batcher is true
runTest("disable_queue_batcher", false)
}

Expand Down
18 changes: 15 additions & 3 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type QueueSender struct {
batcher queue.Batcher
consumers *queue.Consumers[internal.Request]

enabled bool

obsrep *ObsReport
exporterID component.ID
logger *zap.Logger
Expand All @@ -97,6 +99,7 @@ func NewQueueSender(
obsrep: obsrep,
exporterID: set.ID,
logger: set.Logger,
enabled: true,
}

exportFunc := func(ctx context.Context, req internal.Request) error {
Expand All @@ -115,6 +118,13 @@ func NewQueueSender(
return qs
}

// MarkAsDisabled marks queue sender as disabled and returns the queue sender.
// If queue sender is disabled, then requests are blocked until they are successfully sent out.
func (qs *QueueSender) MarkAsDisabled() *QueueSender {
qs.enabled = false
return qs
}

// Start is invoked during service startup.
func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
if err := qs.queue.Start(ctx, host); err != nil {
Expand Down Expand Up @@ -180,10 +190,12 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error {
// Prevent cancellation and deadline to propagate to the context stored in the queue.
// The grpc/http based receivers will cancel the request context after this function returns.
c := context.WithoutCancel(ctx)
if qs.enabled {
ctx = context.WithoutCancel(ctx)
}

span := trace.SpanFromContext(c)
if err := qs.queue.Offer(c, req); err != nil {
span := trace.SpanFromContext(ctx)
if err := qs.queue.Offer(ctx, req); err != nil {
span.AddEvent("Failed to enqueue item.", trace.WithAttributes(qs.traceAttribute))
return err
}
Expand Down
87 changes: 87 additions & 0 deletions exporter/exporterqueue/disabled_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
)

// boundedQueue blocks insert until the batch containing the request is sent out.
type disabledQueue[T any] struct {
component.StartFunc
*sizedQueue[disabledMemQueueEl[T]]

mu sync.Mutex
nextIndex uint64
doneCh map[uint64](chan error)
}

type disabledMemQueueEl[T any] struct {
index uint64
req T
}

// QueueSettings defines internal parameters for boundedQueue creation.
type disabledQueueSettings[T any] struct {
sizer sizer[T]
capacity int64
}

type disabledQueueSizer[T any] struct {
sizer sizer[T]
}

func (s disabledQueueSizer[T]) Sizeof(item disabledMemQueueEl[T]) int64 {
return s.sizer.Sizeof(item.req)
}

// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewDisabledQueue[T any](set disabledQueueSettings[T]) Queue[T] {
return &disabledQueue[T]{
sizedQueue: newSizedQueue[disabledMemQueueEl[T]](
set.capacity,
disabledQueueSizer[T]{sizer: set.sizer},
true /*blocking*/),
doneCh: make(map[uint64](chan error)),
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *disabledQueue[T]) Offer(ctx context.Context, req T) error {
q.mu.Lock()
index := q.nextIndex
q.nextIndex++
done := make(chan error)
q.doneCh[index] = done

if err := q.sizedQueue.Offer(
ctx,
disabledMemQueueEl[T]{req: req, index: index}); err != nil {
delete(q.doneCh, index)
q.mu.Unlock()
return err
}
q.mu.Unlock()
err := <-done
return err
}

func (q *disabledQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
ctx, item, ok := q.sizedQueue.pop()
return item.index, ctx, item.req, ok
}

// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
// For in queue, this function is noop.
func (q *disabledQueue[T]) OnProcessingFinished(index uint64, err error) {
q.mu.Lock()
defer q.mu.Unlock()

q.doneCh[index] <- err
delete(q.doneCh, index)
}
37 changes: 37 additions & 0 deletions exporter/exporterqueue/disabled_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterqueue

import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBlockingMemoryQueue(t *testing.T) {
var wg sync.WaitGroup
q := NewDisabledQueue[string](disabledQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1})

err := errors.New("This is an error")
wg.Add(1)
go func() {
assert.EqualError(t, q.Offer(context.Background(), "a"), err.Error()) // Blocks until OnProcessingFinished is called
wg.Done()
}()

index, ctx, req, ok := q.Read(context.Background())
for !ok {
index, ctx, req, ok = q.Read(context.Background())
}

require.Equal(t, uint64(0), index)
require.Equal(t, context.Background(), ctx)
require.Equal(t, "a", req)
q.OnProcessingFinished(index, err)
wg.Wait()
}
12 changes: 12 additions & 0 deletions exporter/exporterqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,15 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P
})
}
}

// NewDisabledQueueFactory returns a factory to create a new disabled queue.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewDisabledQueueFactory[T any]() Factory[T] {
return func(_ context.Context, _ Settings, cfg Config) Queue[T] {
return NewDisabledQueue[T](disabledQueueSettings[T]{
sizer: &requestSizer[T]{},
capacity: int64(cfg.QueueSize),
})
}
}

0 comments on commit ff4a975

Please sign in to comment.