From 55c1ee7dc34cfb51061de18c9146938623fb1037 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 15 Nov 2023 17:37:26 -0500 Subject: [PATCH] fix tests --- collector/cmd/otelarrowcol/go.mod | 9 -------- .../batch_processor.go | 23 +++++++++++++++---- .../batch_processor_test.go | 14 +++++++---- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/collector/cmd/otelarrowcol/go.mod b/collector/cmd/otelarrowcol/go.mod index 1703961e..e958cae0 100644 --- a/collector/cmd/otelarrowcol/go.mod +++ b/collector/cmd/otelarrowcol/go.mod @@ -170,12 +170,3 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/open-telemetry/otel-arrow => ../../.. - -replace github.com/open-telemetry/otel-arrow/collector => ../../ - -// ambiguous import: found package cloud.google.com/go/compute/metadata in multiple modules -replace cloud.google.com/go => cloud.google.com/go v0.110.2 - -replace github.com/lightstep/telemetry-generator/generatorreceiver => ../../../../telemetry-generator/generatorreceiver/ diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 77b0aa03..e21a2a47 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -565,7 +565,12 @@ func (bt *batchTraces) sizeBytes(data any) int { return bt.sizer.TracesSize(data.(ptrace.Traces)) } -func (bt *batchTraces) export(ctx context.Context, req any) error { +func (bt *batchTraces) export(ctx context.Context, req any) (retErr error) { + defer func() { + if r := recover(); r != nil { + retErr = errors.New(r.(string)) + } + }() td := req.(ptrace.Traces) return bt.nextConsumer.ConsumeTraces(ctx, td) } @@ -605,7 +610,12 @@ func (bm *batchMetrics) sizeBytes(data any) int { return bm.sizer.MetricsSize(data.(pmetric.Metrics)) } -func (bm *batchMetrics) export(ctx context.Context, req any) error { +func (bm *batchMetrics) export(ctx context.Context, req any) (retErr error) { + defer func() { + if r := recover(); r != nil { + retErr = errors.New(r.(string)) + } + }() md := req.(pmetric.Metrics) return bm.nextConsumer.ConsumeMetrics(ctx, md) } @@ -657,7 +667,12 @@ func (bl *batchLogs) sizeBytes(data any) int { return bl.sizer.LogsSize(data.(plog.Logs)) } -func (bl *batchLogs) export(ctx context.Context, req any) error { +func (bl *batchLogs) export(ctx context.Context, req any) (retErr error) { + defer func() { + if r := recover(); r != nil { + retErr = errors.New(r.(string)) + } + }() ld := req.(plog.Logs) return bl.nextConsumer.ConsumeLogs(ctx, ld) } @@ -692,4 +707,4 @@ func (bl *batchLogs) add(item any) { } bl.logCount += newLogsCount ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs()) -} +} \ No newline at end of file diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index f8ad5f4a..625b3476 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -87,7 +87,11 @@ func (pc *panicConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) er panic("testing panic") return nil } -func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) error { +func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + panic("testing panic") + return nil +} +func (pc *panicConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { panic("testing panic") return nil } @@ -166,13 +170,13 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) { require.NoError(t, bp.Shutdown(context.Background())) } -func TestBatchProcessorMetricsPanicRecover(t *testing.T) { +func TestBatchProcessorLogsPanicRecover(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.Timeout = 10 * time.Second creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true) + bp, err := newBatchLogsProcessor(creationSet, &panicConsumer{}, cfg, true) require.NoError(t, err) require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) @@ -190,7 +194,7 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) { ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty()) wg.Add(1) go func() { - err = batcher.ConsumeLogs(context.Background(), ld) + err = bp.ConsumeLogs(context.Background(), ld) assert.Contains(t, err.Error(), "testing panic") wg.Done() }() @@ -1343,4 +1347,4 @@ func TestBatchSplitOnly(t *testing.T) { for _, ld := range receivedMds { require.Equal(t, maxBatch, ld.LogRecordCount()) } -} +} \ No newline at end of file