diff --git a/.chloggen/consumer-queue.yaml b/.chloggen/consumer-queue.yaml new file mode 100644 index 00000000000..cdf1c224b52 --- /dev/null +++ b/.chloggen/consumer-queue.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change queue to embed the async consumers. + +# One or more tracking issues or pull requests related to the change +issues: [12242] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/internal/obs_queue_test.go b/exporter/exporterhelper/internal/obs_queue_test.go index 44c3763bce1..5350c8192fa 100644 --- a/exporter/exporterhelper/internal/obs_queue_test.go +++ b/exporter/exporterhelper/internal/obs_queue_test.go @@ -24,8 +24,7 @@ import ( ) type fakeQueue[T any] struct { - component.StartFunc - component.ShutdownFunc + exporterqueue.Queue[T] offerErr error size int64 capacity int64 @@ -39,10 +38,6 @@ func (fq *fakeQueue[T]) Capacity() int64 { return fq.capacity } -func (fq *fakeQueue[T]) Read(context.Context) (context.Context, T, exporterqueue.DoneCallback, bool) { - panic("implement me") -} - func (fq *fakeQueue[T]) Offer(context.Context, T) error { return fq.offerErr } diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 35d0b45cebb..e2b125bbd44 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -66,9 +66,8 @@ func (qCfg *QueueConfig) Validate() error { } type QueueSender struct { - queue exporterqueue.Queue[internal.Request] - batcher queue.Batcher - consumers *queue.Consumers[internal.Request] + queue exporterqueue.Queue[internal.Request] + batcher component.Component } func NewQueueSender( @@ -79,15 +78,6 @@ func NewQueueSender( exportFailureMessage string, next Sender[internal.Request], ) (*QueueSender, error) { - q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg)) - if err != nil { - return nil, err - } - - qs := &QueueSender{ - queue: q, - } - exportFunc := func(ctx context.Context, req internal.Request) error { err := next.Send(ctx, req) if err != nil { @@ -96,12 +86,30 @@ func NewQueueSender( } return err } - if usePullingBasedExporterQueueBatcher.IsEnabled() { - qs.batcher, _ = queue.NewBatcher(bCfg, q, exportFunc, qCfg.NumConsumers) - } else { - qs.consumers = queue.NewQueueConsumers[internal.Request](q, qCfg.NumConsumers, exportFunc) + if !usePullingBasedExporterQueueBatcher.IsEnabled() { + q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, func(ctx context.Context, req internal.Request, done exporterqueue.DoneCallback) { + done(exportFunc(ctx, req)) + })) + if err != nil { + return nil, err + } + return &QueueSender{queue: q}, nil } 
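+ // With the pull-based batcher feature gate enabled, the batcher is created first so that its Consume callback can be wired into the queue factory below.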
- return qs, nil + + b, err := queue.NewBatcher(bCfg, exportFunc, qCfg.NumConsumers) + if err != nil { + return nil, err + } + // TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244 + if bCfg.Enabled { + qCfg.NumConsumers = 1 + } + q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, b.Consume)) + if err != nil { + return nil, err + } + + return &QueueSender{queue: q, batcher: b}, nil } // Start is invoked during service startup. @@ -113,25 +121,22 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { if usePullingBasedExporterQueueBatcher.IsEnabled() { return qs.batcher.Start(ctx, host) } - return qs.consumers.Start(ctx, host) + + return nil } // Shutdown is invoked during service shutdown. func (qs *QueueSender) Shutdown(ctx context.Context) error { - // Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only + // Stop the queue and batcher, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. - - if err := qs.queue.Shutdown(ctx); err != nil { - return err - } + err := qs.queue.Shutdown(ctx) if usePullingBasedExporterQueueBatcher.IsEnabled() { - return qs.batcher.Shutdown(ctx) + return errors.Join(err, qs.batcher.Shutdown(ctx)) } - err := qs.consumers.Shutdown(ctx) return err } -// send implements the requestSender interface. It puts the request in the queue. +// Send implements the requestSender interface. It puts the request in the queue. func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error { // Prevent cancellation and deadline to propagate to the context stored in the queue. // The grpc/http based receivers will cancel the request context after this function returns. diff --git a/exporter/exporterqueue/bounded_memory_queue.go b/exporter/exporterqueue/bounded_memory_queue.go index f3cc8bf58ce..2c8e1d1e0ff 100644 --- a/exporter/exporterqueue/bounded_memory_queue.go +++ b/exporter/exporterqueue/bounded_memory_queue.go @@ -30,7 +30,7 @@ type memoryQueueSettings[T any] struct { // newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). 
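+// It returns the unexported readableQueue type so that the queue factory can wrap it with newConsumerQueue.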
-func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] { +func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) readableQueue[T] { return &boundedMemoryQueue[T]{ sizedQueue: newSizedQueue[T](set.capacity, set.sizer, set.blocking), } diff --git a/exporter/exporterqueue/bounded_memory_queue_test.go b/exporter/exporterqueue/bounded_memory_queue_test.go index baff4bbc7ac..b408fbcecb7 100644 --- a/exporter/exporterqueue/bounded_memory_queue_test.go +++ b/exporter/exporterqueue/bounded_memory_queue_test.go @@ -74,11 +74,11 @@ func TestBoundedQueue(t *testing.T) { func TestShutdownWhileNotEmpty(t *testing.T) { q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1000}) - assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) for i := 0; i < 10; i++ { require.NoError(t, q.Offer(context.Background(), strconv.FormatInt(int64(i), 10))) } - assert.NoError(t, q.Shutdown(context.Background())) + require.NoError(t, q.Shutdown(context.Background())) assert.Equal(t, int64(10), q.Size()) numConsumed := 0 @@ -115,16 +115,15 @@ func TestQueueUsage(t *testing.T) { t.Run(tt.name, func(t *testing.T) { q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100)}) consumed := &atomic.Int64{} - require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) - ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error { + ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) { consumed.Add(1) - return nil + done(nil) }) + require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost())) for j := 0; j < 10; j++ { require.NoError(t, q.Offer(context.Background(), uint64(10))) } - assert.NoError(t, q.Shutdown(context.Background())) - assert.NoError(t, ac.Shutdown(context.Background())) + require.NoError(t, ac.Shutdown(context.Background())) assert.Equal(t, int64(10), consumed.Load()) }) } @@ -148,11 +147,11 @@ func TestBlockingQueueUsage(t *testing.T) { t.Run(tt.name, func(t *testing.T) { q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(100), blocking: true}) consumed := &atomic.Int64{} - require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) - ac := newAsyncConsumer(q, 10, func(context.Context, uint64) error { + ac := newConsumerQueue(q, 10, func(_ context.Context, _ uint64, done DoneCallback) { consumed.Add(1) - return nil + done(nil) }) + require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost())) wg := &sync.WaitGroup{} for i := 0; i < 10; i++ { wg.Add(1) @@ -164,8 +163,7 @@ func TestBlockingQueueUsage(t *testing.T) { }() } wg.Wait() - assert.NoError(t, q.Shutdown(context.Background())) - assert.NoError(t, ac.Shutdown(context.Background())) + require.NoError(t, ac.Shutdown(context.Background())) assert.Equal(t, int64(1_000_000), consumed.Load()) }) } @@ -173,16 +171,13 @@ func TestBlockingQueueUsage(t *testing.T) { func TestZeroSizeNoConsumers(t *testing.T) { q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0}) - err := q.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) - require.ErrorIs(t, q.Offer(context.Background(), "a"), ErrQueueIsFull) // in process - assert.NoError(t, q.Shutdown(context.Background())) } -func consume[T any](q Queue[T], consumeFunc 
func(context.Context, T) error) bool { +func consume[T any](q readableQueue[T], consumeFunc func(context.Context, T) error) bool { ctx, req, done, ok := q.Read(context.Background()) if !ok { return false @@ -191,35 +186,6 @@ func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool return true } -type asyncConsumer struct { - stopWG sync.WaitGroup -} - -func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *asyncConsumer { - ac := &asyncConsumer{} - - ac.stopWG.Add(numConsumers) - for i := 0; i < numConsumers; i++ { - go func() { - defer ac.stopWG.Done() - for { - ctx, req, done, ok := q.Read(context.Background()) - if !ok { - return - } - done(consumeFunc(ctx, req)) - } - }() - } - return ac -} - -// Shutdown ensures that queue and all consumers are stopped. -func (qc *asyncConsumer) Shutdown(_ context.Context) error { - qc.stopWG.Wait() - return nil -} - func BenchmarkOffer(b *testing.B) { tests := []struct { name string @@ -236,20 +202,20 @@ func BenchmarkOffer(b *testing.B) { } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: &requestSizer[uint64]{}, capacity: int64(10 * b.N)}) + q := newBoundedMemoryQueue[uint64](memoryQueueSettings[uint64]{sizer: tt.sizer, capacity: int64(10 * b.N)}) consumed := &atomic.Int64{} require.NoError(b, q.Start(context.Background(), componenttest.NewNopHost())) - ac := newAsyncConsumer(q, 1, func(context.Context, uint64) error { + ac := newConsumerQueue(q, 1, func(_ context.Context, _ uint64, done DoneCallback) { consumed.Add(1) - return nil + done(nil) }) + require.NoError(b, ac.Start(context.Background(), componenttest.NewNopHost())) b.ResetTimer() b.ReportAllocs() for j := 0; j < b.N; j++ { require.NoError(b, q.Offer(context.Background(), uint64(10))) } - assert.NoError(b, q.Shutdown(context.Background())) - assert.NoError(b, ac.Shutdown(context.Background())) + require.NoError(b, ac.Shutdown(context.Background())) assert.Equal(b, int64(b.N), consumed.Load()) }) } diff --git a/exporter/exporterqueue/consumers.go b/exporter/exporterqueue/consumers.go new file mode 100644 index 00000000000..022f04b9310 --- /dev/null +++ b/exporter/exporterqueue/consumers.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" + +import ( + "context" + "sync" + + "go.opentelemetry.io/collector/component" +) + +type consumerQueue[T any] struct { + readableQueue[T] + numConsumers int + consumeFunc ConsumeFunc[T] + stopWG sync.WaitGroup +} + +func newConsumerQueue[T any](q readableQueue[T], numConsumers int, consumeFunc ConsumeFunc[T]) *consumerQueue[T] { + return &consumerQueue[T]{ + readableQueue: q, + numConsumers: numConsumers, + consumeFunc: consumeFunc, + } +} + +// Start ensures that queue and all consumers are started. +func (qc *consumerQueue[T]) Start(ctx context.Context, host component.Host) error { + if err := qc.readableQueue.Start(ctx, host); err != nil { + return err + } + var startWG sync.WaitGroup + for i := 0; i < qc.numConsumers; i++ { + qc.stopWG.Add(1) + startWG.Add(1) + go func() { + startWG.Done() + defer qc.stopWG.Done() + for { + ctx, req, done, ok := qc.readableQueue.Read(context.Background()) + if !ok { + return + } + qc.consumeFunc(ctx, req, done) + } + }() + } + startWG.Wait() + + return nil +} + +// Shutdown ensures that queue and all consumers are stopped. 
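+// The embedded queue is shut down first so that Read returns false, then the call blocks until every consumer goroutine has exited.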
+func (qc *consumerQueue[T]) Shutdown(ctx context.Context) error { + err := qc.readableQueue.Shutdown(ctx) + qc.stopWG.Wait() + return err +} diff --git a/exporter/exporterqueue/persistent_queue.go b/exporter/exporterqueue/persistent_queue.go index f7665357a9d..b35f98682be 100644 --- a/exporter/exporterqueue/persistent_queue.go +++ b/exporter/exporterqueue/persistent_queue.go @@ -92,7 +92,7 @@ type persistentQueue[T any] struct { } // newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func newPersistentQueue[T any](set persistentQueueSettings[T]) Queue[T] { +func newPersistentQueue[T any](set persistentQueueSettings[T]) readableQueue[T] { _, isRequestSized := set.sizer.(*requestSizer[T]) pq := &persistentQueue[T]{ set: set, diff --git a/exporter/exporterqueue/persistent_queue_test.go b/exporter/exporterqueue/persistent_queue_test.go index 11d7cd7f0c8..7abdf3e481c 100644 --- a/exporter/exporterqueue/persistent_queue_test.go +++ b/exporter/exporterqueue/persistent_queue_test.go @@ -229,13 +229,14 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer sizer[uint64], capaci unmarshaler: uint64Unmarshaler, set: exportertest.NewNopSettings(), }) + ac := newConsumerQueue(pq, numConsumers, func(ctx context.Context, item uint64, done DoneCallback) { + done(consumeFunc(ctx, item)) + }) host := &mockHost{ext: map[component.ID]component.Component{ {}: storagetest.NewMockStorageExtension(nil), }} - require.NoError(t, pq.Start(context.Background(), host)) - ac := newAsyncConsumer(pq, numConsumers, consumeFunc) + require.NoError(t, ac.Start(context.Background(), host)) t.Cleanup(func() { - require.NoError(t, pq.Shutdown(context.Background())) assert.NoError(t, ac.Shutdown(context.Background())) }) return pq @@ -423,15 +424,15 @@ func TestPersistentBlockingQueue(t *testing.T) { unmarshaler: uint64Unmarshaler, set: exportertest.NewNopSettings(), }) + consumed := &atomic.Int64{} + ac := newConsumerQueue(pq, 10, func(_ context.Context, _ uint64, done DoneCallback) { + consumed.Add(1) + done(nil) + }) host := &mockHost{ext: map[component.ID]component.Component{ {}: storagetest.NewMockStorageExtension(nil), }} - require.NoError(t, pq.Start(context.Background(), host)) - consumed := &atomic.Int64{} - ac := newAsyncConsumer(pq, 10, func(context.Context, uint64) error { - consumed.Add(int64(1)) - return nil - }) + require.NoError(t, ac.Start(context.Background(), host)) td := uint64(10) wg := &sync.WaitGroup{} @@ -449,8 +450,7 @@ func TestPersistentBlockingQueue(t *testing.T) { assert.Eventually(t, func() bool { return 1_000_000 == int(consumed.Load()) }, 5*time.Second, 10*time.Millisecond) - assert.NoError(t, pq.Shutdown(context.Background())) - assert.NoError(t, ac.Shutdown(context.Background())) + require.NoError(t, ac.Shutdown(context.Background())) }) } } @@ -800,8 +800,8 @@ func TestPersistentQueue_PutCloseReadClose(t *testing.T) { assert.Equal(t, int64(0), ps.Size()) // Put two elements and close the extension - assert.NoError(t, ps.Offer(context.Background(), req)) - assert.NoError(t, ps.Offer(context.Background(), req)) + require.NoError(t, ps.Offer(context.Background(), req)) + require.NoError(t, ps.Offer(context.Background(), req)) assert.Equal(t, int64(2), ps.Size()) // TODO: Remove this, after the initialization writes the readIndex. 
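+ // The throwaway Read persists the read index so that the queued items are restored correctly when the storage is reopened below.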
_, _, _, _ = ps.Read(context.Background()) @@ -822,7 +822,7 @@ func TestPersistentQueue_PutCloseReadClose(t *testing.T) { return nil }) require.Equal(t, int64(0), newPs.Size()) - assert.NoError(t, newPs.Shutdown(context.Background())) + require.NoError(t, newPs.Shutdown(context.Background())) } func BenchmarkPersistentQueue(b *testing.B) { @@ -1018,9 +1018,9 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { assert.Equal(t, int64(0), pq.Size()) // Fill the queue up to the capacity. - assert.NoError(t, pq.Offer(context.Background(), uint64(40))) - assert.NoError(t, pq.Offer(context.Background(), uint64(40))) - assert.NoError(t, pq.Offer(context.Background(), uint64(20))) + require.NoError(t, pq.Offer(context.Background(), uint64(40))) + require.NoError(t, pq.Offer(context.Background(), uint64(40))) + require.NoError(t, pq.Offer(context.Background(), uint64(20))) assert.Equal(t, int64(100), pq.Size()) require.ErrorIs(t, pq.Offer(context.Background(), uint64(25)), ErrQueueIsFull) @@ -1056,7 +1056,7 @@ func TestPersistentQueue_ItemsCapacityUsageRestoredOnShutdown(t *testing.T) { })) assert.Equal(t, int64(10), newPQ.Size()) - assert.NoError(t, newPQ.Shutdown(context.Background())) + require.NoError(t, newPQ.Shutdown(context.Background())) } // This test covers the case when the items capacity queue is enabled for the first time. @@ -1066,9 +1066,9 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { assert.Equal(t, int64(0), pq.Size()) - assert.NoError(t, pq.Offer(context.Background(), uint64(40))) - assert.NoError(t, pq.Offer(context.Background(), uint64(20))) - assert.NoError(t, pq.Offer(context.Background(), uint64(25))) + require.NoError(t, pq.Offer(context.Background(), uint64(40))) + require.NoError(t, pq.Offer(context.Background(), uint64(20))) + require.NoError(t, pq.Offer(context.Background(), uint64(25))) assert.Equal(t, int64(3), pq.Size()) assert.True(t, consume(pq, func(_ context.Context, val uint64) error { @@ -1113,7 +1113,7 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) { })) assert.Equal(t, int64(15), newPQ.Size()) - assert.NoError(t, newPQ.Shutdown(context.Background())) + require.NoError(t, newPQ.Shutdown(context.Background())) } // This test covers the case when the queue is restarted with the less capacity than needed to restore the queued items. @@ -1124,10 +1124,10 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { assert.Equal(t, int64(0), pq.Size()) - assert.NoError(t, pq.Offer(context.Background(), uint64(40))) - assert.NoError(t, pq.Offer(context.Background(), uint64(20))) - assert.NoError(t, pq.Offer(context.Background(), uint64(25))) - assert.NoError(t, pq.Offer(context.Background(), uint64(5))) + require.NoError(t, pq.Offer(context.Background(), uint64(40))) + require.NoError(t, pq.Offer(context.Background(), uint64(20))) + require.NoError(t, pq.Offer(context.Background(), uint64(25))) + require.NoError(t, pq.Offer(context.Background(), uint64(5))) // Read the first request just to populate the read index in the storage. // Otherwise, the write index won't be restored either. 
@@ -1165,9 +1165,9 @@ func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) { assert.Equal(t, int64(1), newPQ.Size()) // Now it can accept new items - assert.NoError(t, newPQ.Offer(context.Background(), uint64(10))) + require.NoError(t, newPQ.Offer(context.Background(), uint64(10))) - assert.NoError(t, newPQ.Shutdown(context.Background())) + require.NoError(t, newPQ.Shutdown(context.Background())) } // This test covers the case when the persistent storage is recovered from a snapshot which has @@ -1205,8 +1205,8 @@ func TestPersistentQueue_RestoredUsedSizeIsCorrectedOnDrain(t *testing.T) { assert.True(t, consume(newPQ, func(context.Context, uint64) error { return nil })) assert.Equal(t, int64(0), newPQ.Size()) - assert.NoError(t, newPQ.Shutdown(context.Background())) - assert.NoError(t, pq.Shutdown(context.Background())) + require.NoError(t, newPQ.Shutdown(context.Background())) + require.NoError(t, pq.Shutdown(context.Background())) } func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[uint64], compare []uint64) { diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go index b4f93fc332f..3a99906a170 100644 --- a/exporter/exporterqueue/queue.go +++ b/exporter/exporterqueue/queue.go @@ -24,6 +24,8 @@ var ErrQueueIsFull = errors.New("sending queue is full") // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type DoneCallback func(processErr error) +type ConsumeFunc[T any] func(context.Context, T, DoneCallback) + // Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue // (boundedMemoryQueue) or via a disk-based queue (persistentQueue) // Experimental: This API is at the early stage of development and may change without backward compatibility @@ -38,6 +40,11 @@ type Queue[T any] interface { Size() int64 // Capacity returns the capacity of the queue. Capacity() int64 +} + +// TODO: Investigate why linter "unused" fails if add a private "read" func on the Queue. +type readableQueue[T any] interface { + Queue[T] // Read pulls the next available item from the queue along with its done callback. Once processing is // finished, the done callback must be called to clean up the storage. // The function blocks until an item is available or if the queue is stopped. @@ -64,18 +71,19 @@ type Unmarshaler[T any] func([]byte) (T, error) // Factory is a function that creates a new queue. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type Factory[T any] func(context.Context, Settings, Config) Queue[T] +type Factory[T any] func(context.Context, Settings, Config, ConsumeFunc[T]) Queue[T] // NewMemoryQueueFactory returns a factory to create a new memory queue. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
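+// The factory now receives the ConsumeFunc up front and returns a queue that already embeds its consumers:
+// Start on the returned Queue spawns cfg.NumConsumers goroutines that invoke the ConsumeFunc for each item,
+// e.g. (illustrative names) q := NewMemoryQueueFactory[myRequest]()(ctx, set, cfg, consumeFn).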
func NewMemoryQueueFactory[T any]() Factory[T] { - return func(_ context.Context, _ Settings, cfg Config) Queue[T] { - return newBoundedMemoryQueue[T](memoryQueueSettings[T]{ + return func(_ context.Context, _ Settings, cfg Config, consume ConsumeFunc[T]) Queue[T] { + q := newBoundedMemoryQueue[T](memoryQueueSettings[T]{ sizer: &requestSizer[T]{}, capacity: int64(cfg.QueueSize), blocking: cfg.Blocking, }) + return newConsumerQueue(q, cfg.NumConsumers, consume) } } @@ -97,8 +105,8 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P if storageID == nil { return NewMemoryQueueFactory[T]() } - return func(_ context.Context, set Settings, cfg Config) Queue[T] { - return newPersistentQueue[T](persistentQueueSettings[T]{ + return func(_ context.Context, set Settings, cfg Config, consume ConsumeFunc[T]) Queue[T] { + q := newPersistentQueue[T](persistentQueueSettings[T]{ sizer: &requestSizer[T]{}, capacity: int64(cfg.QueueSize), blocking: cfg.Blocking, @@ -108,5 +116,6 @@ func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings P unmarshaler: factorySettings.Unmarshaler, set: set.ExporterSettings, }) + return newConsumerQueue(q, cfg.NumConsumers, consume) } } diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go index 8089323d2ea..7e71dd9f826 100644 --- a/exporter/internal/queue/batcher.go +++ b/exporter/internal/queue/batcher.go @@ -5,7 +5,6 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" import ( "context" - "sync" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exporterbatcher" @@ -13,64 +12,18 @@ import ( "go.opentelemetry.io/collector/exporter/internal" ) -type batch struct { - ctx context.Context - req internal.Request - dones []exporterqueue.DoneCallback -} - // Batcher is in charge of reading items from the queue and send them out asynchronously. type Batcher interface { component.Component -} - -type BaseBatcher struct { - batchCfg exporterbatcher.Config - queue exporterqueue.Queue[internal.Request] - workerPool chan struct{} - exportFunc func(ctx context.Context, req internal.Request) error - stopWG sync.WaitGroup + Consume(context.Context, internal.Request, exporterqueue.DoneCallback) } func NewBatcher(batchCfg exporterbatcher.Config, - queue exporterqueue.Queue[internal.Request], exportFunc func(ctx context.Context, req internal.Request) error, maxWorkers int, ) (Batcher, error) { if !batchCfg.Enabled { - return &DisabledBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil - } - return &DefaultBatcher{BaseBatcher: newBaseBatcher(batchCfg, queue, exportFunc, maxWorkers)}, nil -} - -func newBaseBatcher(batchCfg exporterbatcher.Config, - queue exporterqueue.Queue[internal.Request], - exportFunc func(ctx context.Context, req internal.Request) error, - maxWorkers int, -) BaseBatcher { - workerPool := make(chan struct{}, maxWorkers) - for i := 0; i < maxWorkers; i++ { - workerPool <- struct{}{} + return newDisabledBatcher(exportFunc), nil } - return BaseBatcher{ - batchCfg: batchCfg, - queue: queue, - workerPool: workerPool, - exportFunc: exportFunc, - stopWG: sync.WaitGroup{}, - } -} - -// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary. 
-func (qb *BaseBatcher) flush(ctx context.Context, req internal.Request, dones []exporterqueue.DoneCallback) { - qb.stopWG.Add(1) - <-qb.workerPool - go func() { - defer qb.stopWG.Done() - err := qb.exportFunc(ctx, req) - for _, done := range dones { - done(err) - } - qb.workerPool <- struct{}{} - }() + return newDefaultBatcher(batchCfg, exportFunc, maxWorkers), nil } diff --git a/exporter/internal/queue/consumers.go b/exporter/internal/queue/consumers.go deleted file mode 100644 index dc7418c4e5a..00000000000 --- a/exporter/internal/queue/consumers.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" - -import ( - "context" - "sync" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/exporter/exporterqueue" -) - -type Consumers[T any] struct { - queue exporterqueue.Queue[T] - numConsumers int - consumeFunc func(context.Context, T) error - stopWG sync.WaitGroup -} - -func NewQueueConsumers[T any](q exporterqueue.Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *Consumers[T] { - return &Consumers[T]{ - queue: q, - numConsumers: numConsumers, - consumeFunc: consumeFunc, - } -} - -// Start ensures that queue and all consumers are started. -func (qc *Consumers[T]) Start(_ context.Context, _ component.Host) error { - var startWG sync.WaitGroup - for i := 0; i < qc.numConsumers; i++ { - qc.stopWG.Add(1) - startWG.Add(1) - go func() { - startWG.Done() - defer qc.stopWG.Done() - for { - ctx, req, done, ok := qc.queue.Read(context.Background()) - if !ok { - return - } - done(qc.consumeFunc(ctx, req)) - } - }() - } - startWG.Wait() - - return nil -} - -// Shutdown ensures that queue and all consumers are stopped. -func (qc *Consumers[T]) Shutdown(_ context.Context) error { - qc.stopWG.Wait() - return nil -} diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index b9ff68b1dcd..c625fc15a46 100644 --- a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -5,119 +5,130 @@ package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" import ( "context" - "math" "sync" "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/internal" ) -// DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout. -type DefaultBatcher struct { - BaseBatcher +type batch struct { + ctx context.Context + req internal.Request + dones []exporterqueue.DoneCallback +} + +// defaultBatcher continuously batch incoming requests and flushes asynchronously if minimum size limit is met or on timeout. 
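+// It no longer reads from the queue itself: the queue's consumer goroutines deliver requests to it through Consume.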
+type defaultBatcher struct { + batchCfg exporterbatcher.Config + workerPool chan struct{} + exportFunc func(ctx context.Context, req internal.Request) error + stopWG sync.WaitGroup currentBatchMu sync.Mutex currentBatch *batch timer *time.Timer shutdownCh chan struct{} } -func (qb *DefaultBatcher) resetTimer() { +func newDefaultBatcher(batchCfg exporterbatcher.Config, + exportFunc func(ctx context.Context, req internal.Request) error, + maxWorkers int, +) *defaultBatcher { + workerPool := make(chan struct{}, maxWorkers) + for i := 0; i < maxWorkers; i++ { + workerPool <- struct{}{} + } + return &defaultBatcher{ + batchCfg: batchCfg, + workerPool: workerPool, + exportFunc: exportFunc, + stopWG: sync.WaitGroup{}, + shutdownCh: make(chan struct{}, 1), + } +} + +func (qb *defaultBatcher) resetTimer() { if qb.batchCfg.FlushTimeout != 0 { qb.timer.Reset(qb.batchCfg.FlushTimeout) } } -// startReadingFlushingGoroutine starts a goroutine that reads and then flushes. -func (qb *DefaultBatcher) startReadingFlushingGoroutine() { - qb.stopWG.Add(1) - go func() { - defer qb.stopWG.Done() - for { - // Read() blocks until the queue is non-empty or until the queue is stopped. - ctx, req, done, ok := qb.queue.Read(context.Background()) - if !ok { - close(qb.shutdownCh) +func (qb *defaultBatcher) Consume(ctx context.Context, req internal.Request, done exporterqueue.DoneCallback) { + qb.currentBatchMu.Lock() + if qb.batchCfg.MaxSizeItems > 0 { + var reqList []internal.Request + var mergeSplitErr error + if qb.currentBatch == nil || qb.currentBatch.req == nil { + qb.resetTimer() + reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil) + } else { + reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req) + } + + if mergeSplitErr != nil || reqList == nil { + done(mergeSplitErr) + qb.currentBatchMu.Unlock() + return + } + + // If there was a split, we flush everything immediately. + if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 { + qb.currentBatch = nil + qb.currentBatchMu.Unlock() + for i := 0; i < len(reqList); i++ { + qb.flush(ctx, reqList[i], []exporterqueue.DoneCallback{done}) + // TODO: handle partial failure + } + qb.resetTimer() + } else { + qb.currentBatch = &batch{ + req: reqList[0], + ctx: ctx, + dones: []exporterqueue.DoneCallback{done}, + } + qb.currentBatchMu.Unlock() + } + } else { + if qb.currentBatch == nil || qb.currentBatch.req == nil { + qb.resetTimer() + qb.currentBatch = &batch{ + req: req, + ctx: ctx, + dones: []exporterqueue.DoneCallback{done}, + } + } else { + // TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified + mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req) + if mergeErr != nil { + done(mergeErr) + qb.currentBatchMu.Unlock() return } - - qb.currentBatchMu.Lock() - - if qb.batchCfg.MaxSizeItems > 0 { - var reqList []internal.Request - var mergeSplitErr error - if qb.currentBatch == nil || qb.currentBatch.req == nil { - qb.resetTimer() - reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil) - } else { - reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req) - } - - if mergeSplitErr != nil || reqList == nil { - done(mergeSplitErr) - qb.currentBatchMu.Unlock() - continue - } - - // If there was a split, we flush everything immediately. 
- if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 { - qb.currentBatch = nil - qb.currentBatchMu.Unlock() - for i := 0; i < len(reqList); i++ { - qb.flush(ctx, reqList[i], []exporterqueue.DoneCallback{done}) - // TODO: handle partial failure - } - qb.resetTimer() - } else { - qb.currentBatch = &batch{ - req: reqList[0], - ctx: ctx, - dones: []exporterqueue.DoneCallback{done}, - } - qb.currentBatchMu.Unlock() - } - } else { - if qb.currentBatch == nil || qb.currentBatch.req == nil { - qb.resetTimer() - qb.currentBatch = &batch{ - req: req, - ctx: ctx, - dones: []exporterqueue.DoneCallback{done}, - } - } else { - // TODO: consolidate implementation for the cases where MaxSizeConfig is specified and the case where it is not specified - mergedReq, mergeErr := qb.currentBatch.req.MergeSplit(qb.currentBatch.ctx, qb.batchCfg.MaxSizeConfig, req) - if mergeErr != nil { - done(mergeErr) - qb.currentBatchMu.Unlock() - continue - } - qb.currentBatch = &batch{ - req: mergedReq[0], - ctx: qb.currentBatch.ctx, - dones: append(qb.currentBatch.dones, done), - } - } - - if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems { - batchToFlush := *qb.currentBatch - qb.currentBatch = nil - qb.currentBatchMu.Unlock() - - // flush() blocks until successfully started a goroutine for flushing. - qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.dones) - qb.resetTimer() - } else { - qb.currentBatchMu.Unlock() - } + qb.currentBatch = &batch{ + req: mergedReq[0], + ctx: qb.currentBatch.ctx, + dones: append(qb.currentBatch.dones, done), } } - }() + + if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems { + batchToFlush := *qb.currentBatch + qb.currentBatch = nil + qb.currentBatchMu.Unlock() + + // flush() blocks until successfully started a goroutine for flushing. + qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.dones) + qb.resetTimer() + } else { + qb.currentBatchMu.Unlock() + } + } } // startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout. -func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { +func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() { qb.stopWG.Add(1) go func() { defer qb.stopWG.Done() @@ -133,23 +144,17 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { } // Start starts the goroutine that reads from the queue and flushes asynchronously. -func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { - qb.shutdownCh = make(chan struct{}, 1) - - if qb.batchCfg.FlushTimeout == 0 { - qb.timer = time.NewTimer(math.MaxInt) - qb.timer.Stop() - } else { +func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error { + if qb.batchCfg.FlushTimeout != 0 { qb.timer = time.NewTimer(qb.batchCfg.FlushTimeout) + qb.startTimeBasedFlushingGoroutine() } - qb.startReadingFlushingGoroutine() - qb.startTimeBasedFlushingGoroutine() return nil } // flushCurrentBatchIfNecessary sends out the current request batch if it is not nil -func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() { +func (qb *defaultBatcher) flushCurrentBatchIfNecessary() { qb.currentBatchMu.Lock() if qb.currentBatch == nil || qb.currentBatch.req == nil { qb.currentBatchMu.Unlock() @@ -164,8 +169,24 @@ func (qb *DefaultBatcher) flushCurrentBatchIfNecessary() { qb.resetTimer() } +// flush starts a goroutine that calls exportFunc. It blocks until a worker is available if necessary. 
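+// The workerPool channel is pre-filled with maxWorkers tokens, so at most maxWorkers export calls run concurrently.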
+func (qb *defaultBatcher) flush(ctx context.Context, req internal.Request, dones []exporterqueue.DoneCallback) { + qb.stopWG.Add(1) + <-qb.workerPool + go func() { + defer qb.stopWG.Done() + err := qb.exportFunc(ctx, req) + for _, done := range dones { + done(err) + } + qb.workerPool <- struct{}{} + }() +} + // Shutdown ensures that queue and all Batcher are stopped. -func (qb *DefaultBatcher) Shutdown(_ context.Context) error { +func (qb *defaultBatcher) Shutdown(_ context.Context) error { + close(qb.shutdownCh) + // Make sure execute one last flush if necessary. qb.flushCurrentBatchIfNecessary() qb.stopWG.Wait() return nil diff --git a/exporter/internal/queue/default_batcher_test.go b/exporter/internal/queue/default_batcher_test.go index f9bb11ee1c5..a46649e0b80 100644 --- a/exporter/internal/queue/default_batcher_test.go +++ b/exporter/internal/queue/default_batcher_test.go @@ -45,18 +45,22 @@ func TestDefaultBatcher_NoSplit_MinThresholdZero_TimeoutDisabled(t *testing.T) { MinSizeItems: 0, } + ba, err := NewBatcher(cfg, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) + require.NoError(t, err) + + // TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244 + qCfg := exporterqueue.NewDefaultConfig() + qCfg.NumConsumers = 1 q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( context.Background(), exporterqueue.Settings{ Signal: pipeline.SignalTraces, ExporterSettings: exportertest.NewNopSettings(), }, - exporterqueue.NewDefaultConfig()) - - ba, err := NewBatcher(cfg, q, - func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, - tt.maxWorkers) - require.NoError(t, err) + qCfg, + ba.Consume) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) @@ -103,18 +107,22 @@ func TestDefaultBatcher_NoSplit_TimeoutDisabled(t *testing.T) { MinSizeItems: 10, } + ba, err := NewBatcher(cfg, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) + require.NoError(t, err) + + // TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244 + qCfg := exporterqueue.NewDefaultConfig() + qCfg.NumConsumers = 1 q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( context.Background(), exporterqueue.Settings{ Signal: pipeline.SignalTraces, ExporterSettings: exportertest.NewNopSettings(), }, - exporterqueue.NewDefaultConfig()) - - ba, err := NewBatcher(cfg, q, - func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, - tt.maxWorkers) - require.NoError(t, err) + qCfg, + ba.Consume) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) @@ -171,18 +179,22 @@ func TestDefaultBatcher_NoSplit_WithTimeout(t *testing.T) { MinSizeItems: 100, } + ba, err := NewBatcher(cfg, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) + require.NoError(t, err) + + // TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244 + qCfg := exporterqueue.NewDefaultConfig() + qCfg.NumConsumers = 1 q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( context.Background(), exporterqueue.Settings{ Signal: pipeline.SignalTraces, ExporterSettings: exportertest.NewNopSettings(), }, - exporterqueue.NewDefaultConfig()) - - ba, err := NewBatcher(cfg, q, - func(ctx 
context.Context, req internal.Request) error { return req.Export(ctx) }, - tt.maxWorkers) - require.NoError(t, err) + qCfg, + ba.Consume) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) @@ -239,18 +251,22 @@ func TestDefaultBatcher_Split_TimeoutDisabled(t *testing.T) { MaxSizeItems: 100, } + ba, err := NewBatcher(cfg, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) + require.NoError(t, err) + + // TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244 + qCfg := exporterqueue.NewDefaultConfig() + qCfg.NumConsumers = 1 q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( context.Background(), exporterqueue.Settings{ Signal: pipeline.SignalTraces, ExporterSettings: exportertest.NewNopSettings(), }, - exporterqueue.NewDefaultConfig()) - - ba, err := NewBatcher(cfg, q, - func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, - tt.maxWorkers) - require.NoError(t, err) + qCfg, + ba.Consume) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) @@ -288,18 +304,22 @@ func TestDefaultBatcher_Shutdown(t *testing.T) { batchCfg.MinSizeItems = 10 batchCfg.FlushTimeout = 100 * time.Second + ba, err := NewBatcher(batchCfg, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + 2) + require.NoError(t, err) + + // TODO: https://github.com/open-telemetry/opentelemetry-collector/issues/12244 + qCfg := exporterqueue.NewDefaultConfig() + qCfg.NumConsumers = 1 q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( context.Background(), exporterqueue.Settings{ Signal: pipeline.SignalTraces, ExporterSettings: exportertest.NewNopSettings(), }, - exporterqueue.NewDefaultConfig()) - - ba, err := NewBatcher(batchCfg, q, - func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, - 2) - require.NoError(t, err) + qCfg, + ba.Consume) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go index 5b8b43476cd..9408aa34ce3 100644 --- a/exporter/internal/queue/disabled_batcher.go +++ b/exporter/internal/queue/disabled_batcher.go @@ -8,35 +8,21 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exporterqueue" + "go.opentelemetry.io/collector/exporter/internal" ) -// DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will +// disabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will // be sent out (asynchronously) immediately regardless of the size. -type DisabledBatcher struct { - BaseBatcher +type disabledBatcher[T any] struct { + component.StartFunc + component.ShutdownFunc + exportFunc func(context.Context, T) error } -// Start starts the goroutine that reads from the queue and flushes asynchronously. -func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { - // This goroutine reads and then flushes. - // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped. - // 2. 
flush() blocks until there are idle workers in the worker pool. - qb.stopWG.Add(1) - go func() { - defer qb.stopWG.Done() - for { - ctx, req, done, ok := qb.queue.Read(context.Background()) - if !ok { - return - } - qb.flush(ctx, req, []exporterqueue.DoneCallback{done}) - } - }() - return nil +func (db *disabledBatcher[T]) Consume(ctx context.Context, req T, done exporterqueue.DoneCallback) { + done(db.exportFunc(ctx, req)) } -// Shutdown ensures that queue and all Batcher are stopped. -func (qb *DisabledBatcher) Shutdown(_ context.Context) error { - qb.stopWG.Wait() - return nil +func newDisabledBatcher(exportFunc func(ctx context.Context, req internal.Request) error) Batcher { + return &disabledBatcher[internal.Request]{exportFunc: exportFunc} } diff --git a/exporter/internal/queue/disabled_batcher_test.go b/exporter/internal/queue/disabled_batcher_test.go index bba4427c2c3..dffa77fafeb 100644 --- a/exporter/internal/queue/disabled_batcher_test.go +++ b/exporter/internal/queue/disabled_batcher_test.go @@ -40,18 +40,19 @@ func TestDisabledBatcher_Basic(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.Enabled = false + ba, err := NewBatcher(cfg, + func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, + tt.maxWorkers) + require.NoError(t, err) + q := exporterqueue.NewMemoryQueueFactory[internal.Request]()( context.Background(), exporterqueue.Settings{ Signal: pipeline.SignalTraces, ExporterSettings: exportertest.NewNopSettings(), }, - exporterqueue.NewDefaultConfig()) - - ba, err := NewBatcher(cfg, q, - func(ctx context.Context, req internal.Request) error { return req.Export(ctx) }, - tt.maxWorkers) - require.NoError(t, err) + exporterqueue.NewDefaultConfig(), + ba.Consume) require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))