Commit b44f636

[concurrentbatchprocessor] Consistent span creation, latency, bi-directional links

jmacd committed Sep 4, 2024
1 parent 1a301f7 commit b44f636
Showing 2 changed files with 108 additions and 47 deletions.
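
This commit gives each batch export its own span: with a single incoming context the export span is started as a child of that context, and with multiple contexts it is started with links to every unique parent span while each parent also receives a link back to the export span. Below is a minimal standalone sketch of that linking pattern; it uses the global otel.Tracer and an illustrative exportWithLinks helper, neither of which comes from the repository, so treat it as an approximation of the pattern rather than the processor's actual code.

package sketch

import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/trace"
)

// exportWithLinks starts one export span carrying links to every parent span,
// then adds a reverse link from each parent back to the export span so the
// relationship survives cases where sampling would otherwise break it.
func exportWithLinks(exportCtx context.Context, parents []trace.Span) (context.Context, trace.Span) {
    links := make([]trace.Link, len(parents))
    for i, span := range parents {
        links[i] = trace.Link{SpanContext: span.SpanContext()}
    }

    ctx, exportSpan := otel.Tracer("otel").Start(exportCtx, "concurrent_batch_processor/export",
        trace.WithLinks(links...))

    // Reverse direction: each caller's span records a link to the export span.
    for _, span := range parents {
        span.AddLink(trace.Link{SpanContext: exportSpan.SpanContext()})
    }
    return ctx, exportSpan
}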
collector/processor/concurrentbatchprocessor/batch_processor.go (35 changes: 26 additions & 9 deletions)
@@ -373,6 +373,7 @@ func (b *shard) sendItems(trigger trigger) {
     before := time.Now()
     var err error

+    var parentSpan trace.Span
     var parent context.Context
     isSingleCtx := allSame(contexts)

@@ -382,13 +383,30 @@ func (b *shard) sendItems(trigger trigger) {
     // because batch items can be incoming from multiple receivers.
     if isSingleCtx {
         parent = contexts[0]
+        parent, parentSpan = b.tracer.Tracer("otel").Start(parent, "concurrent_batch_processor/export")
     } else {
-        var sp trace.Span
-        links := buildLinks(contexts)
-        parent, sp = b.tracer.Tracer("otel").Start(b.exportCtx, "concurrent_batch_processor/export", trace.WithLinks(links...))
-        sp.End()
+        spans := parentSpans(contexts)
+
+        links := make([]trace.Link, len(spans))
+        for i, span := range spans {
+            links[i] = trace.Link{SpanContext: span.SpanContext()}
+        }
+        parent, parentSpan = b.tracer.Tracer("otel").Start(b.exportCtx, "concurrent_batch_processor/export", trace.WithLinks(links...))
+
+        // Note: linking in the opposite direction.
+        // This could be inferred by the trace
+        // backend, but this adds helpful information
+        // in cases where sampling may break links.
+        // See https://github.com/open-telemetry/opentelemetry-specification/issues/1877
+        for _, span := range spans {
+            span.AddLink(trace.Link{SpanContext: parentSpan.SpanContext()})
+        }
     }
     err = b.batch.export(parent, req)
+    // Note: call End() before returning to caller contexts, otherwise
+    // trace-based tests will not recognize unfinished spans when the test
+    // terminates.
+    parentSpan.End()

     latency := time.Since(before)
     for i := range waiters {
@@ -407,8 +425,8 @@ func (b *shard) sendItems(trigger trigger) {
     b.totalSent = numItemsAfter
 }

-func buildLinks(contexts []context.Context) []trace.Link {
-    var links []trace.Link
+func parentSpans(contexts []context.Context) []trace.Span {
+    var spans []trace.Span
     unique := make(map[context.Context]bool)
     for i := range contexts {
         _, ok := unique[contexts[i]]
@@ -418,11 +436,10 @@ func buildLinks(contexts []context.Context) []trace.Link {

         unique[contexts[i]] = true

-        link := trace.Link{SpanContext: trace.SpanContextFromContext(contexts[i])}
-        links = append(links, link)
+        spans = append(spans, trace.SpanFromContext(contexts[i]))
     }

-    return links
+    return spans
 }

 // helper function to check if a slice of contexts contains more than one unique context.
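
The test changes that follow verify this behavior end to end through the processor. As a rough, self-contained illustration of the same verification idea, the SDK's in-memory exporter can confirm both link directions; the package name, test name, and the "caller" span name are made up for this sketch and do not appear in the repository's tests.

package sketch

import (
    "context"
    "testing"

    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    "go.opentelemetry.io/otel/sdk/trace/tracetest"
    "go.opentelemetry.io/otel/trace"
)

// TestBidirectionalLinkSketch records one caller span and one export span,
// links them in both directions, and checks the links via the in-memory exporter.
func TestBidirectionalLinkSketch(t *testing.T) {
    exp := tracetest.NewInMemoryExporter()
    tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exp))
    tracer := tp.Tracer("otel")

    _, caller := tracer.Start(context.Background(), "caller")
    _, export := tracer.Start(context.Background(), "concurrent_batch_processor/export",
        trace.WithLinks(trace.Link{SpanContext: caller.SpanContext()}))
    caller.AddLink(trace.Link{SpanContext: export.SpanContext()})
    export.End()
    caller.End()

    for _, stub := range exp.GetSpans() {
        // Each recorded span should carry exactly one link pointing at the other span.
        if len(stub.Links) != 1 {
            t.Errorf("span %q: want 1 link, got %d", stub.Name, len(stub.Links))
        }
    }
}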
collector/processor/concurrentbatchprocessor/batch_processor_test.go (120 changes: 82 additions & 38 deletions)
@@ -34,6 +34,7 @@ import (
     "go.opentelemetry.io/otel"
     noopmetric "go.opentelemetry.io/otel/metric/noop"
     sdktrace "go.opentelemetry.io/otel/sdk/trace"
+    "go.opentelemetry.io/otel/trace"
     "go.opentelemetry.io/otel/sdk/trace/tracetest"
 )

@@ -354,14 +355,12 @@ func TestBatchProcessorCancelContext(t *testing.T) {
     require.NoError(t, bp.Shutdown(context.Background()))
 }

-func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
+func TestBatchProcessorUnbrokenParentContextSingle(t *testing.T) {
     cfg := createDefaultConfig().(*Config)
     cfg.SendBatchSize = 100
     cfg.SendBatchMaxSize = 100
     cfg.Timeout = 3 * time.Second
     cfg.MaxInFlightSizeMiB = 2
-    creationSet := processortest.NewNopSettings()
-    creationSet.MetricsLevel = configtelemetry.LevelDetailed
     requestCount := 10
     spansPerRequest := 5249
     exp := tracetest.NewInMemoryExporter()
@@ -370,10 +369,10 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
     )
     otel.SetTracerProvider(tp)
     tracer := tp.Tracer("otel")
-    bg, rootSp := tracer.Start(context.Background(), "test_start_parent")
-    rootSp.End()
+    bg, rootSp := tracer.Start(context.Background(), "test_parent")

     createSet := exporter.Settings{
+        ID: component.MustNewID("test_exporter"),
         TelemetrySettings: component.TelemetrySettings{
             TracerProvider: tp,
             MeterProvider: noopmetric.MeterProvider{},
@@ -387,7 +386,10 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
     next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(ctx context.Context, td ptrace.Traces) error { return nil }, opt)
     require.NoError(t, err)

-    bp, err := newBatchTracesProcessor(creationSet, next, cfg)
+    processorSet := processortest.NewNopSettings()
+    processorSet.MetricsLevel = configtelemetry.LevelDetailed
+    processorSet.TracerProvider = tp
+    bp, err := newBatchTracesProcessor(processorSet, next, cfg)
     require.NoError(t, err)
     require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

@@ -409,20 +411,27 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
         }()
     }
     wg.Wait()
+    rootSp.End()

     // need to flush tracerprovider
     tp.ForceFlush(bg)
     td := exp.GetSpans()

     numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize)
-    assert.Equal(t, int(math.Ceil(numBatches))+1, len(td))
-    for i := range td {
-        if !td[i].Parent.HasTraceID() {
-            assert.Equal(t, td[i].SpanContext, rootSp.SpanContext())
+    assert.Equal(t, 2*int(math.Ceil(numBatches))+1, len(td))
+    for _, span := range td {
+        switch span.Name {
+        case "concurrent_batch_processor/export":
+            // more test below
+            break
+        case "exporter/test_exporter/traces":
+            continue
+        case "test_parent":
+            continue
+        default:
+            t.Error("unexpected span name:", span.Name)
         }
-        // confirm parent is rootSp
-        assert.Equal(t, td[i].Parent, rootSp.SpanContext())
+        assert.Equal(t, span.Parent, rootSp.SpanContext())
     }

     require.NoError(t, bp.Shutdown(context.Background()))
@@ -434,8 +443,6 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
     cfg.SendBatchMaxSize = 100
     cfg.Timeout = 3 * time.Second
     cfg.MaxInFlightSizeMiB = 2
-    creationSet := processortest.NewNopSettings()
-    creationSet.MetricsLevel = configtelemetry.LevelDetailed
     requestCount := 50
     // keep spansPerRequest small to ensure multiple contexts end up in the same batch.
     spansPerRequest := 5
@@ -445,10 +452,10 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
     )
     otel.SetTracerProvider(tp)
     tracer := tp.Tracer("otel")
-    bg, rootSp := tracer.Start(context.Background(), "test_start_parent")
-    rootSp.End()
+    bg := context.Background()

     createSet := exporter.Settings{
+        ID: component.MustNewID("test_exporter"),
         TelemetrySettings: component.TelemetrySettings{
             TracerProvider: tp,
             MeterProvider: noopmetric.MeterProvider{},
@@ -461,25 +468,23 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
     next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(ctx context.Context, td ptrace.Traces) error { return nil }, opt)
     require.NoError(t, err)

-    bp, err := newBatchTracesProcessor(creationSet, next, cfg)
+    processorSet := processortest.NewNopSettings()
+    processorSet.MetricsLevel = configtelemetry.LevelDetailed
+    processorSet.TracerProvider = tp
+    bp, err := newBatchTracesProcessor(processorSet, next, cfg)
     require.NoError(t, err)
-    require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
+    require.NoError(t, bp.Start(bg, componenttest.NewNopHost()))

+    var endLater []trace.Span
+    mkCtx := func() context.Context {
+        ctx, span := tracer.Start(bg, "test_context")
+        endLater = append(endLater, span)
+        return ctx
+    }
     callCtxs := []context.Context{
-        bg,
-        client.NewContext(bg, client.Info{
-            Metadata: client.NewMetadata(map[string][]string{
-                "token1": {"single"},
-                "token3": {"n/a"},
-            }),
-        }),
-        client.NewContext(bg, client.Info{
-            Metadata: client.NewMetadata(map[string][]string{
-                "token1": {"single"},
-                "token2": {"one", "two"},
-                "token4": {"n/a"},
-            }),
-        }),
+        mkCtx(),
+        mkCtx(),
+        mkCtx(),
     }

     sentResourceSpans := ptrace.NewTraces().ResourceSpans()
@@ -502,16 +507,55 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
     }
     wg.Wait()

-    // need to flush tracerprovider
+    // Flush and reset the internal traces exporter.
     tp.ForceFlush(bg)
     td := exp.GetSpans()
+    exp.Reset()

+    // Expect 2 spans per batch, one exporter and one batch processor.
     numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize)
-    assert.Equal(t, 2*int(math.Ceil(numBatches))+1, len(td))
-    for i := range td {
-        if len(td[i].Links) != 0 {
-            assert.Equal(t, len(td[i].Links), len(callCtxs))
-            assert.Equal(t, td[i].Links[0].SpanContext, rootSp.SpanContext())
+    assert.Equal(t, 2*int(math.Ceil(numBatches)), len(td))
+
+    var expectSpanCtxs []trace.SpanContext
+    for _, span := range endLater {
+        expectSpanCtxs = append(expectSpanCtxs, span.SpanContext())
+    }
+    for _, span := range td {
+        switch span.Name {
+        case "concurrent_batch_processor/export":
+            // more test below
+            break
+        case "exporter/test_exporter/traces":
+            continue
+        default:
+            t.Error("unexpected span name:", span.Name)
         }
+        assert.Equal(t, len(callCtxs), len(span.Links))
+
+        var haveSpanCtxs []trace.SpanContext
+        for _, link := range span.Links {
+            haveSpanCtxs = append(haveSpanCtxs, link.SpanContext)
+        }
+
+        assert.ElementsMatch(t, expectSpanCtxs, haveSpanCtxs)
     }

+    // End the parent spans
+    for _, span := range endLater {
+        span.End()
+    }
+
+    tp.ForceFlush(bg)
+    td = exp.GetSpans()
+
+    assert.Equal(t, len(callCtxs), len(td))
+    for _, span := range td {
+        switch span.Name {
+        case "test_context":
+        default:
+            t.Error("unexpected span name:", span.Name)
+        }
+        assert.Less(t, 0, len(span.Links))
+    }
+
     require.NoError(t, bp.Shutdown(context.Background()))