Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[concurrentbatchprocessor] Consistent span creation, latency, bi-directional links #241

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ func (b *shard) sendItems(trigger trigger) {
before := time.Now()
var err error

var parentSpan trace.Span
var parent context.Context
isSingleCtx := allSame(contexts)

Expand All @@ -382,13 +383,30 @@ func (b *shard) sendItems(trigger trigger) {
// because batch items can be incoming from multiple receivers.
if isSingleCtx {
parent = contexts[0]
parent, parentSpan = b.tracer.Tracer("otel").Start(parent, "concurrent_batch_processor/export")
} else {
var sp trace.Span
links := buildLinks(contexts)
parent, sp = b.tracer.Tracer("otel").Start(b.exportCtx, "concurrent_batch_processor/export", trace.WithLinks(links...))
sp.End()
spans := parentSpans(contexts)

links := make([]trace.Link, len(spans))
for i, span := range spans {
links[i] = trace.Link{SpanContext: span.SpanContext()}
}
parent, parentSpan = b.tracer.Tracer("otel").Start(b.exportCtx, "concurrent_batch_processor/export", trace.WithLinks(links...))

// Note: linking in the opposite direction.
// This could be inferred by the trace
// backend, but this adds helpful information
// in cases where sampling may break links.
// See https://github.com/open-telemetry/opentelemetry-specification/issues/1877
for _, span := range spans {
span.AddLink(trace.Link{SpanContext: parentSpan.SpanContext()})
}
}
err = b.batch.export(parent, req)
// Note: call End() before returning to caller contexts, otherwise
// trace-based tests will not recognize unfinished spans when the test
// terminates.
parentSpan.End()

latency := time.Since(before)
for i := range waiters {
Expand All @@ -407,8 +425,8 @@ func (b *shard) sendItems(trigger trigger) {
b.totalSent = numItemsAfter
}

func buildLinks(contexts []context.Context) []trace.Link {
var links []trace.Link
func parentSpans(contexts []context.Context) []trace.Span {
var spans []trace.Span
unique := make(map[context.Context]bool)
for i := range contexts {
_, ok := unique[contexts[i]]
Expand All @@ -418,11 +436,10 @@ func buildLinks(contexts []context.Context) []trace.Link {

unique[contexts[i]] = true

link := trace.Link{SpanContext: trace.SpanContextFromContext(contexts[i])}
links = append(links, link)
spans = append(spans, trace.SpanFromContext(contexts[i]))
}

return links
return spans
}

// helper function to check if a slice of contexts contains more than one unique context.
Expand Down
120 changes: 82 additions & 38 deletions collector/processor/concurrentbatchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.opentelemetry.io/otel"
noopmetric "go.opentelemetry.io/otel/metric/noop"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
)

Expand Down Expand Up @@ -354,14 +355,12 @@ func TestBatchProcessorCancelContext(t *testing.T) {
require.NoError(t, bp.Shutdown(context.Background()))
}

func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
func TestBatchProcessorUnbrokenParentContextSingle(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 100
cfg.SendBatchMaxSize = 100
cfg.Timeout = 3 * time.Second
cfg.MaxInFlightSizeMiB = 2
creationSet := processortest.NewNopSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
requestCount := 10
spansPerRequest := 5249
exp := tracetest.NewInMemoryExporter()
Expand All @@ -370,10 +369,10 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
)
otel.SetTracerProvider(tp)
tracer := tp.Tracer("otel")
bg, rootSp := tracer.Start(context.Background(), "test_start_parent")
rootSp.End()
bg, rootSp := tracer.Start(context.Background(), "test_parent")

createSet := exporter.Settings{
ID: component.MustNewID("test_exporter"),
TelemetrySettings: component.TelemetrySettings{
TracerProvider: tp,
MeterProvider: noopmetric.MeterProvider{},
Expand All @@ -387,7 +386,10 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(ctx context.Context, td ptrace.Traces) error { return nil }, opt)
require.NoError(t, err)

bp, err := newBatchTracesProcessor(creationSet, next, cfg)
processorSet := processortest.NewNopSettings()
processorSet.MetricsLevel = configtelemetry.LevelDetailed
processorSet.TracerProvider = tp
bp, err := newBatchTracesProcessor(processorSet, next, cfg)
require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

Expand All @@ -409,20 +411,27 @@ func TestBatchProcessorUnbrokenParentContext(t *testing.T) {
}()
}
wg.Wait()
rootSp.End()

// need to flush tracerprovider
tp.ForceFlush(bg)
td := exp.GetSpans()

numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize)
assert.Equal(t, int(math.Ceil(numBatches))+1, len(td))
for i := range td {
if !td[i].Parent.HasTraceID() {
assert.Equal(t, td[i].SpanContext, rootSp.SpanContext())
assert.Equal(t, 2*int(math.Ceil(numBatches))+1, len(td))
for _, span := range td {
switch span.Name {
case "concurrent_batch_processor/export":
// more test below
break
case "exporter/test_exporter/traces":
continue
case "test_parent":
continue
default:
t.Error("unexpected span name:", span.Name)
}
// confirm parent is rootSp
assert.Equal(t, td[i].Parent, rootSp.SpanContext())
assert.Equal(t, span.Parent, rootSp.SpanContext())
}

require.NoError(t, bp.Shutdown(context.Background()))
Expand All @@ -434,8 +443,6 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
cfg.SendBatchMaxSize = 100
cfg.Timeout = 3 * time.Second
cfg.MaxInFlightSizeMiB = 2
creationSet := processortest.NewNopSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
requestCount := 50
// keep spansPerRequest small to ensure multiple contexts end up in the same batch.
spansPerRequest := 5
Expand All @@ -445,10 +452,10 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
)
otel.SetTracerProvider(tp)
tracer := tp.Tracer("otel")
bg, rootSp := tracer.Start(context.Background(), "test_start_parent")
rootSp.End()
bg := context.Background()

createSet := exporter.Settings{
ID: component.MustNewID("test_exporter"),
TelemetrySettings: component.TelemetrySettings{
TracerProvider: tp,
MeterProvider: noopmetric.MeterProvider{},
Expand All @@ -461,25 +468,23 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
next, err := exporterhelper.NewTracesExporter(bg, createSet, Config{}, func(ctx context.Context, td ptrace.Traces) error { return nil }, opt)
require.NoError(t, err)

bp, err := newBatchTracesProcessor(creationSet, next, cfg)
processorSet := processortest.NewNopSettings()
processorSet.MetricsLevel = configtelemetry.LevelDetailed
processorSet.TracerProvider = tp
bp, err := newBatchTracesProcessor(processorSet, next, cfg)
require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, bp.Start(bg, componenttest.NewNopHost()))

var endLater []trace.Span
mkCtx := func() context.Context {
ctx, span := tracer.Start(bg, "test_context")
endLater = append(endLater, span)
return ctx
}
callCtxs := []context.Context{
bg,
client.NewContext(bg, client.Info{
Metadata: client.NewMetadata(map[string][]string{
"token1": {"single"},
"token3": {"n/a"},
}),
}),
client.NewContext(bg, client.Info{
Metadata: client.NewMetadata(map[string][]string{
"token1": {"single"},
"token2": {"one", "two"},
"token4": {"n/a"},
}),
}),
mkCtx(),
mkCtx(),
mkCtx(),
}

sentResourceSpans := ptrace.NewTraces().ResourceSpans()
Expand All @@ -502,16 +507,55 @@ func TestBatchProcessorUnbrokenParentContextMultiple(t *testing.T) {
}
wg.Wait()

// need to flush tracerprovider
// Flush and reset the internal traces exporter.
tp.ForceFlush(bg)
td := exp.GetSpans()
exp.Reset()

// Expect 2 spans per batch, one exporter and one batch processor.
numBatches := float64(spansPerRequest*requestCount) / float64(cfg.SendBatchMaxSize)
assert.Equal(t, 2*int(math.Ceil(numBatches))+1, len(td))
for i := range td {
if len(td[i].Links) != 0 {
assert.Equal(t, len(td[i].Links), len(callCtxs))
assert.Equal(t, td[i].Links[0].SpanContext, rootSp.SpanContext())
assert.Equal(t, 2*int(math.Ceil(numBatches)), len(td))

var expectSpanCtxs []trace.SpanContext
for _, span := range endLater {
expectSpanCtxs = append(expectSpanCtxs, span.SpanContext())
}
for _, span := range td {
switch span.Name {
case "concurrent_batch_processor/export":
// more test below
break
case "exporter/test_exporter/traces":
continue
default:
t.Error("unexpected span name:", span.Name)
}
assert.Equal(t, len(callCtxs), len(span.Links))

var haveSpanCtxs []trace.SpanContext
for _, link := range span.Links {
haveSpanCtxs = append(haveSpanCtxs, link.SpanContext)
}

assert.ElementsMatch(t, expectSpanCtxs, haveSpanCtxs)
}

// End the parent spans
for _, span := range endLater {
span.End()
}

tp.ForceFlush(bg)
td = exp.GetSpans()

assert.Equal(t, len(callCtxs), len(td))
for _, span := range td {
switch span.Name {
case "test_context":
default:
t.Error("unexpected span name:", span.Name)
}
assert.Less(t, 0, len(span.Links))
}

require.NoError(t, bp.Shutdown(context.Background()))
Expand Down
Loading