Skip to content

Commit

Permalink
batch_processor.go
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Nov 15, 2023
1 parent ebd7d0b commit 677c45f
Showing 1 changed file with 120 additions and 0 deletions.
120 changes: 120 additions & 0 deletions collector/processor/concurrentbatchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,126 @@ func TestProcessorLifecycle(t *testing.T) {
}
}

type panicConsumer struct {
}

func (pc *panicConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
panic("testing panic")
return nil
}
func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) error {
panic("testing panic")
return nil
}

func (pc *panicConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func TestBatchProcessorSpansPanicRecover(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bp, err := newBatchTracesProcessor(creationSet, &panicConsumer{}, cfg, true)

require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 10
spansPerRequest := 100
sentResourceSpans := ptrace.NewTraces().ResourceSpans()
var wg sync.WaitGroup
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex))
}
td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty())
// ConsumeTraces is a blocking function and should be run in a go routine
// until batch size reached to unblock.
wg.Add(1)
go func() {
err = bp.ConsumeTraces(context.Background(), td)
assert.Contains(t, err.Error(), "testing panic")
wg.Done()
}()
}

wg.Wait()
require.NoError(t, bp.Shutdown(context.Background()))
}

func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true)

require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 10
metricsPerRequest := 100
sentResourceMetrics := pmetric.NewMetrics().ResourceMetrics()
var wg sync.WaitGroup
for requestNum := 0; requestNum < requestCount; requestNum++ {
md := testdata.GenerateMetrics(metricsPerRequest)
metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
for metricIndex := 0; metricIndex < metricsPerRequest; metricIndex++ {
metrics.At(metricIndex).SetName(getTestMetricName(requestNum, metricIndex))
}
md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty())
wg.Add(1)
go func() {
err = bp.ConsumeMetrics(context.Background(), md)
assert.Contains(t, err.Error(), "testing panic")
wg.Done()
}()
}

wg.Wait()
require.NoError(t, bp.Shutdown(context.Background()))
}

func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true)

require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 10
logsPerRequest := 100
sentResourceLogs := plog.NewLogs().ResourceLogs()
var wg sync.WaitGroup
for requestNum := 0; requestNum < requestCount; requestNum++ {
ld := testdata.GenerateLogs(logsPerRequest)
logs := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords()
for logIndex := 0; logIndex < logsPerRequest; logIndex++ {
logs.At(logIndex).SetSeverityText(getTestLogSeverityText(requestNum, logIndex))
}
ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
wg.Add(1)
go func() {
err = batcher.ConsumeLogs(context.Background(), ld)
assert.Contains(t, err.Error(), "testing panic")
wg.Done()
}()
}

wg.Wait()
require.NoError(t, bp.Shutdown(context.Background()))
}

func TestBatchProcessorSpansDelivered(t *testing.T) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
Expand Down

0 comments on commit 677c45f

Please sign in to comment.