diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index ac8bc54a..e007759c 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -373,6 +373,7 @@ func (b *shard) sendItems(trigger trigger) { before := time.Now() var err error + var parentSpan trace.Span var parent context.Context isSingleCtx := allSame(contexts) @@ -382,13 +383,30 @@ func (b *shard) sendItems(trigger trigger) { // because batch items can be incoming from multiple receivers. if isSingleCtx { parent = contexts[0] + parent, parentSpan = b.tracer.Tracer("otel").Start(parent, "concurrent_batch_processor/export") } else { - var sp trace.Span - links := buildLinks(contexts) - parent, sp = b.tracer.Tracer("otel").Start(b.exportCtx, "concurrent_batch_processor/export", trace.WithLinks(links...)) - sp.End() + spans := parentSpans(contexts) + + links := make([]trace.Link, len(spans)) + for i, span := range spans { + links[i] = trace.Link{SpanContext: span.SpanContext()} + } + parent, parentSpan = b.tracer.Tracer("otel").Start(b.exportCtx, "concurrent_batch_processor/export", trace.WithLinks(links...)) + + // Note: linking in the opposite direction. + // This could be inferred by the trace + // backend, but this adds helpful information + // in cases where sampling may break links. + // See https://github.com/open-telemetry/opentelemetry-specification/issues/1877 + for _, span := range spans { + span.AddLink(trace.Link{SpanContext: parentSpan.SpanContext()}) + } } err = b.batch.export(parent, req) + // Note: call End() before returning to caller contexts, otherwise + // trace-based tests will not recognize unfinished spans when the test + // terminates. + parentSpan.End() latency := time.Since(before) for i := range waiters { @@ -407,8 +425,8 @@ func (b *shard) sendItems(trigger trigger) { b.totalSent = numItemsAfter } -func buildLinks(contexts []context.Context) []trace.Link { - var links []trace.Link +func parentSpans(contexts []context.Context) []trace.Span { + var spans []trace.Span unique := make(map[context.Context]bool) for i := range contexts { _, ok := unique[contexts[i]] @@ -418,11 +436,10 @@ func buildLinks(contexts []context.Context) []trace.Link { unique[contexts[i]] = true - link := trace.Link{SpanContext: trace.SpanContextFromContext(contexts[i])} - links = append(links, link) + spans = append(spans, trace.SpanFromContext(contexts[i])) } - return links + return spans } // helper function to check if a slice of contexts contains more than one unique context. diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index d1310fe5..fb01d366 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -34,6 +34,7 @@ import ( "go.opentelemetry.io/otel" noopmetric "go.opentelemetry.io/otel/metric/noop" sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" ) @@ -354,14 +355,12 @@ func TestBatchProcessorCancelContext(t *testing.T) { require.NoError(t, bp.Shutdown(context.Background())) } -func TestBatchProcessorUnbrokenParentContext(t *testing.T) { +func TestBatchProcessorUnbrokenParentContextSingle(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 100 cfg.SendBatchMaxSize = 100 cfg.Timeout = 3 * time.Second cfg.MaxInFlightSizeMiB = 2 - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed requestCount := 10 spansPerRequest := 5249 exp := tracetest.NewInMemoryExporter() @@ -370,10 +369,10 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) { ) otel.SetTracerProvider(tp) tracer := tp.Tracer("otel") - bg, rootSp := tracer.Start(context.Background(), "test_start_parent") - rootSp.End() + bg, rootSp := tracer.Start(context.Background(), "test_parent") createSet := exporter.Settings{ + ID: component.MustNewID("test_exporter"), TelemetrySettings: component.TelemetrySettings{ TracerProvider: tp, MeterProvider: noopmetric.MeterProvider{}, @@ -387,7 +386,10 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) { next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(ctx context.Context, td ptrace.Traces) error { return nil }, opt) require.NoError(t, err) - bp, err := newBatchTracesProcessor(creationSet, next, cfg) + processorSet := processortest.NewNopSettings() + processorSet.MetricsLevel = configtelemetry.LevelDetailed + processorSet.TracerProvider = tp + bp, err := newBatchTracesProcessor(processorSet, next, cfg) require.NoError(t, err) require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) @@ -409,20 +411,27 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) { }() } wg.Wait() + rootSp.End() // need to flush tracerprovider tp.ForceFlush(bg) td := exp.GetSpans() - numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize) - assert.Equal(t, int(math.Ceil(numBatches))+1, len(td)) - for i := range td { - if !td[i].Parent.HasTraceID() { - assert.Equal(t, td[i].SpanContext, rootSp.SpanContext()) + assert.Equal(t, 2*int(math.Ceil(numBatches))+1, len(td)) + for _, span := range td { + switch span.Name { + case "concurrent_batch_processor/export": + // more test below + break + case "exporter/test_exporter/traces": + continue + case "test_parent": continue + default: + t.Error("unexpected span name:", span.Name) } // confirm parent is rootSp - assert.Equal(t, td[i].Parent, rootSp.SpanContext()) + assert.Equal(t, span.Parent, rootSp.SpanContext()) } require.NoError(t, bp.Shutdown(context.Background())) @@ -434,8 +443,6 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { cfg.SendBatchMaxSize = 100 cfg.Timeout = 3 * time.Second cfg.MaxInFlightSizeMiB = 2 - creationSet := processortest.NewNopSettings() - creationSet.MetricsLevel = configtelemetry.LevelDetailed requestCount := 50 // keep spansPerRequest small to ensure multiple contexts end up in the same batch. spansPerRequest := 5 @@ -445,10 +452,10 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { ) otel.SetTracerProvider(tp) tracer := tp.Tracer("otel") - bg, rootSp := tracer.Start(context.Background(), "test_start_parent") - rootSp.End() + bg := context.Background() createSet := exporter.Settings{ + ID: component.MustNewID("test_exporter"), TelemetrySettings: component.TelemetrySettings{ TracerProvider: tp, MeterProvider: noopmetric.MeterProvider{}, @@ -461,25 +468,23 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(ctx context.Context, td ptrace.Traces) error { return nil }, opt) require.NoError(t, err) - bp, err := newBatchTracesProcessor(creationSet, next, cfg) + processorSet := processortest.NewNopSettings() + processorSet.MetricsLevel = configtelemetry.LevelDetailed + processorSet.TracerProvider = tp + bp, err := newBatchTracesProcessor(processorSet, next, cfg) require.NoError(t, err) - require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, bp.Start(bg, componenttest.NewNopHost())) + var endLater []trace.Span + mkCtx := func() context.Context { + ctx, span := tracer.Start(bg, "test_context") + endLater = append(endLater, span) + return ctx + } callCtxs := []context.Context{ - bg, - client.NewContext(bg, client.Info{ - Metadata: client.NewMetadata(map[string][]string{ - "token1": {"single"}, - "token3": {"n/a"}, - }), - }), - client.NewContext(bg, client.Info{ - Metadata: client.NewMetadata(map[string][]string{ - "token1": {"single"}, - "token2": {"one", "two"}, - "token4": {"n/a"}, - }), - }), + mkCtx(), + mkCtx(), + mkCtx(), } sentResourceSpans := ptrace.NewTraces().ResourceSpans() @@ -502,16 +507,55 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) { } wg.Wait() - // need to flush tracerprovider + // Flush and reset the internal traces exporter. tp.ForceFlush(bg) td := exp.GetSpans() + exp.Reset() + + // Expect 2 spans per batch, one exporter and one batch processor. numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize) - assert.Equal(t, 2*int(math.Ceil(numBatches))+1, len(td)) - for i := range td { - if len(td[i].Links) != 0 { - assert.Equal(t, len(td[i].Links), len(callCtxs)) - assert.Equal(t, td[i].Links[0].SpanContext, rootSp.SpanContext()) + assert.Equal(t, 2*int(math.Ceil(numBatches)), len(td)) + + var expectSpanCtxs []trace.SpanContext + for _, span := range endLater { + expectSpanCtxs = append(expectSpanCtxs, span.SpanContext()) + } + for _, span := range td { + switch span.Name { + case "concurrent_batch_processor/export": + // more test below + break + case "exporter/test_exporter/traces": + continue + default: + t.Error("unexpected span name:", span.Name) + } + assert.Equal(t, len(callCtxs), len(span.Links)) + + var haveSpanCtxs []trace.SpanContext + for _, link := range span.Links { + haveSpanCtxs = append(haveSpanCtxs, link.SpanContext) + } + + assert.ElementsMatch(t, expectSpanCtxs, haveSpanCtxs) + } + + // End the parent spans + for _, span := range endLater { + span.End() + } + + tp.ForceFlush(bg) + td = exp.GetSpans() + + assert.Equal(t, len(callCtxs), len(td)) + for _, span := range td { + switch span.Name { + case "test_context": + default: + t.Error("unexpected span name:", span.Name) } + assert.Less(t, 0, len(span.Links)) } require.NoError(t, bp.Shutdown(context.Background()))