Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Nov 15, 2023
1 parent 677c45f commit 55c1ee7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
9 changes: 0 additions & 9 deletions collector/cmd/otelarrowcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,3 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/otel-arrow => ../../..

replace github.com/open-telemetry/otel-arrow/collector => ../../

// ambiguous import: found package cloud.google.com/go/compute/metadata in multiple modules
replace cloud.google.com/go => cloud.google.com/go v0.110.2

replace github.com/lightstep/telemetry-generator/generatorreceiver => ../../../../telemetry-generator/generatorreceiver/
23 changes: 19 additions & 4 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,12 @@ func (bt *batchTraces) sizeBytes(data any) int {
return bt.sizer.TracesSize(data.(ptrace.Traces))
}

func (bt *batchTraces) export(ctx context.Context, req any) error {
func (bt *batchTraces) export(ctx context.Context, req any) (retErr error) {
defer func() {
if r := recover(); r != nil {
retErr = errors.New(r.(string))
}
}()
td := req.(ptrace.Traces)
return bt.nextConsumer.ConsumeTraces(ctx, td)
}
Expand Down Expand Up @@ -605,7 +610,12 @@ func (bm *batchMetrics) sizeBytes(data any) int {
return bm.sizer.MetricsSize(data.(pmetric.Metrics))
}

func (bm *batchMetrics) export(ctx context.Context, req any) error {
func (bm *batchMetrics) export(ctx context.Context, req any) (retErr error) {
defer func() {
if r := recover(); r != nil {
retErr = errors.New(r.(string))
}
}()
md := req.(pmetric.Metrics)
return bm.nextConsumer.ConsumeMetrics(ctx, md)
}
Expand Down Expand Up @@ -657,7 +667,12 @@ func (bl *batchLogs) sizeBytes(data any) int {
return bl.sizer.LogsSize(data.(plog.Logs))
}

func (bl *batchLogs) export(ctx context.Context, req any) error {
func (bl *batchLogs) export(ctx context.Context, req any) (retErr error) {
defer func() {
if r := recover(); r != nil {
retErr = errors.New(r.(string))
}
}()
ld := req.(plog.Logs)
return bl.nextConsumer.ConsumeLogs(ctx, ld)
}
Expand Down Expand Up @@ -692,4 +707,4 @@ func (bl *batchLogs) add(item any) {
}
bl.logCount += newLogsCount
ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (pc *panicConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) er
panic("testing panic")
return nil
}
func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) error {
func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
panic("testing panic")
return nil
}
func (pc *panicConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
panic("testing panic")
return nil
}
Expand Down Expand Up @@ -166,13 +170,13 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
require.NoError(t, bp.Shutdown(context.Background()))
}

func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
func TestBatchProcessorLogsPanicRecover(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true)
bp, err := newBatchLogsProcessor(creationSet, &panicConsumer{}, cfg, true)

require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -190,7 +194,7 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
wg.Add(1)
go func() {
err = batcher.ConsumeLogs(context.Background(), ld)
err = bp.ConsumeLogs(context.Background(), ld)
assert.Contains(t, err.Error(), "testing panic")
wg.Done()
}()
Expand Down Expand Up @@ -1343,4 +1347,4 @@ func TestBatchSplitOnly(t *testing.T) {
for _, ld := range receivedMds {
require.Equal(t, maxBatch, ld.LogRecordCount())
}
}
}

0 comments on commit 55c1ee7

Please sign in to comment.