Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental: DeferredTracer #200

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ jobs:
run: go mod download

- name: Start containers
run: docker-compose up -d --build
run: docker compose up -d --build

- name: Test
run: make test

- name: Stop containers
if: always()
run: docker-compose down
run: docker compose down
165 changes: 165 additions & 0 deletions tracing/deferred.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package tracing

import (
"context"
"time"

"github.com/mailgun/holster/v4/clock"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// DeferredTracer collects deferred OpenTelemetry spans. Call StartSpan/EndSpan
// to build spans, then Flush to send queued events to OpenTelemetry. This
// generates trace spans at runtime but deferred until convenient to process.
// Useful for high concurrent applications where lock contention from frequent
// tracing operations would affect performance.
//
// Note: Not thread-safe. Use in synchronous code paths only.
type DeferredTracer struct {
spans []*DeferredSpan
}

type DeferredSpan struct {
name string
err error
end time.Time
opts []trace.SpanStartOption
children []*DeferredSpan
spanContext trace.SpanContext
flushed bool
}

// NewDeferredTracer creates a DeferredTracer instance.
func NewDeferredTracer() *DeferredTracer {
return new(DeferredTracer)
}

func (t *DeferredTracer) StartSpan(opts ...trace.SpanStartOption) *DeferredSpan {
start := clock.Now()
name, fileTag := getCallerSpanName(1)
opts = append(opts,
trace.WithTimestamp(start),
trace.WithAttributes(
attribute.String("file", fileTag),
),
)
span := &DeferredSpan{
name: name,
opts: opts,
}
t.spans = append(t.spans, span)
return span
}

func (t *DeferredTracer) StartNamedSpan(name string, opts ...trace.SpanStartOption) *DeferredSpan {
start := clock.Now()
fileTag := getFileTag(1)
opts = append(opts,
trace.WithTimestamp(start),
trace.WithAttributes(
attribute.String("file", fileTag),
),
)
span := &DeferredSpan{
name: name,
opts: opts,
}
t.spans = append(t.spans, span)
return span
}

// Flush sends all deferred events to OpenTelemetry.
// Deferred spans will be created as children to span assigned to ctx.
// Returns true if any spans still remain open.
func (t *DeferredTracer) Flush(ctx context.Context) (remaining bool) {
var keepSpans []*DeferredSpan

for _, span := range t.spans {
if t.flushSpan(ctx, span) {
keepSpans = append(keepSpans, span)
}
}

t.spans = keepSpans
return len(t.spans) > 0
}

// flushSpan sends a span and its children to OpenTelemetry.
// If parent span is closed, traverse child spans.
// Idempotent: flushes spans only once. May call repeatedly until all spans
// are flushed.
// Returns true if any spans still remain open.
func (t *DeferredTracer) flushSpan(ctx context.Context, span *DeferredSpan) (remaining bool) {
if span.end.IsZero() {
// Return if not closed. Do not traverse children.
return true
}
if !span.flushed {
// Create real OpenTelemetry span.
ctx2 := StartNamedScope(ctx, span.name, span.opts...)
realspan := trace.SpanFromContext(ctx2)
span.spanContext = realspan.SpanContext()
if span.err != nil {
realspan.RecordError(span.err)
realspan.SetStatus(codes.Error, span.err.Error())
}
realspan.End(trace.WithTimestamp(span.end))
span.flushed = true
}

// Traverse children.
ctx2 := trace.ContextWithSpanContext(ctx, span.spanContext)
var keepChildren []*DeferredSpan
for _, childSpan := range span.children {
if t.flushSpan(ctx2, childSpan) {
keepChildren = append(keepChildren, childSpan)
}
}
span.children = keepChildren

return len(keepChildren) > 0
}

func (span *DeferredSpan) EndSpan(err error, opts ...trace.SpanStartOption) {
if span.end.IsZero() {
span.err = err
span.end = clock.Now()
span.opts = append(span.opts, opts...)
}
}

func (span *DeferredSpan) StartChildSpan(opts ...trace.SpanStartOption) *DeferredSpan {
start := clock.Now()
name, fileTag := getCallerSpanName(1)
opts = append(opts,
trace.WithTimestamp(start),
trace.WithAttributes(
attribute.String("file", fileTag),
),
)
childSpan := &DeferredSpan{
name: name,
opts: opts,
}
span.children = append(span.children, childSpan)
return childSpan
}

func (span *DeferredSpan) StartNamedChildSpan(name string, opts ...trace.SpanStartOption) *DeferredSpan {
start := clock.Now()
fileTag := getFileTag(1)
opts = append(opts,
trace.WithTimestamp(start),
trace.WithAttributes(
attribute.String("file", fileTag),
),
)
childSpan := &DeferredSpan{
name: name,
opts: opts,
}
span.children = append(span.children, childSpan)
return childSpan
}
117 changes: 117 additions & 0 deletions tracing/deferred_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package tracing_test

import (
"context"
"os"
"testing"

"github.com/mailgun/holster/v4/tracing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/trace"
)

func TestDeferredTracer(t *testing.T) {
ctx := context.Background()
initTracing := func(t *testing.T) {
os.Setenv("OTEL_TRACES_EXPORTER", "test")
defer os.Unsetenv("OTEL_TRACES_EXPORTER")
err := tracing.InitTracing(ctx, t.Name())
require.NoError(t, err)
}

t.Run("Single span", func(t *testing.T) {
// Given
initTracing(t)
prefix := t.Name() + "_"

// When
dtracer := tracing.NewDeferredTracer()
span1 := dtracer.StartNamedSpan(prefix + "Span 1")
span1.EndSpan(nil)
remaining := dtracer.Flush(ctx)
err := tracing.CloseTracing(ctx)
require.NoError(t, err)

// Then
assert.Zero(t, remaining)
count := tracing.GlobalTestExporter.Count()
assert.Equal(t, 1, count)
reader := tracing.GlobalTestExporter.NewSpanReader()
spans := make([]trace.ReadOnlySpan, count)
n, err := reader.Read(spans)
require.NoError(t, err)
assert.Equal(t, count, n)
assert.Equal(t, prefix+"Span 1", spans[0].Name())
})

t.Run("Multiple spans", func(t *testing.T) {
// Given
initTracing(t)
prefix := t.Name() + "_"

// When
dtracer := tracing.NewDeferredTracer()
span1 := dtracer.StartNamedSpan(prefix + "Span 1")
span1.EndSpan(nil)
span2 := dtracer.StartNamedSpan(prefix + "Span 2")
span2.EndSpan(nil)
span3 := dtracer.StartNamedSpan(prefix + "Span 3")
span3.EndSpan(nil)
remaining := dtracer.Flush(ctx)
err := tracing.CloseTracing(ctx)
require.NoError(t, err)

// Then
assert.Zero(t, remaining)
count := tracing.GlobalTestExporter.Count()
assert.Equal(t, 3, count)
reader := tracing.GlobalTestExporter.NewSpanReader()
spans := make([]trace.ReadOnlySpan, count)
n, err := reader.Read(spans)
require.NoError(t, err)
assert.Equal(t, count, n)
assert.Equal(t, prefix+"Span 1", spans[0].Name())
assert.Equal(t, prefix+"Span 2", spans[1].Name())
assert.Equal(t, prefix+"Span 3", spans[2].Name())

// Check same parent span IDs.
assert.Equal(t, spans[0].Parent().SpanID(), spans[1].Parent().SpanID())
assert.Equal(t, spans[0].Parent().SpanID(), spans[2].Parent().SpanID())
})

t.Run("Nested spans", func(t *testing.T) {
// Given
initTracing(t)
prefix := t.Name() + "_"

// When
dtracer := tracing.NewDeferredTracer()
span1 := dtracer.StartNamedSpan(prefix + "Span 1")
span2 := span1.StartNamedChildSpan(prefix + "Span 2")
span3 := span2.StartNamedChildSpan(prefix + "Span 3")
span3.EndSpan(nil)
span2.EndSpan(nil)
span1.EndSpan(nil)
remaining := dtracer.Flush(ctx)
err := tracing.CloseTracing(ctx)
require.NoError(t, err)

// Then
assert.Zero(t, remaining)
count := tracing.GlobalTestExporter.Count()
assert.Equal(t, 3, count)
reader := tracing.GlobalTestExporter.NewSpanReader()
spans := make([]trace.ReadOnlySpan, count)
n, err := reader.Read(spans)
require.NoError(t, err)
assert.Equal(t, count, n)
assert.Equal(t, prefix+"Span 1", spans[0].Name())
assert.Equal(t, prefix+"Span 2", spans[1].Name())
assert.Equal(t, prefix+"Span 3", spans[2].Name())

// Check parent/child span IDs.
assert.Equal(t, spans[1].SpanContext().SpanID(), spans[2].Parent().SpanID())
assert.Equal(t, spans[0].SpanContext().SpanID(), spans[1].Parent().SpanID())
})
}
96 changes: 96 additions & 0 deletions tracing/testexporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package tracing

import (
"context"
"fmt"
"io"
"sync"

"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
)

// TestExporter used for testing.
type TestExporter struct {
Spans []trace.ReadOnlySpan
mutex sync.Mutex
}

func (e *TestExporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
logrus.WithFields(logrus.Fields{
"count": len(spans),
}).Info("TestExporter.ExportSpans()")

for _, span := range spans {
sc := span.SpanContext()
parentsc := span.Parent()
logrus.WithFields(logrus.Fields{
"name": span.Name(),
"traceID": fmt.Sprintf("%x", sc.TraceID()),
"spanID": fmt.Sprintf("%x", sc.SpanID()),
"parentSpanID": fmt.Sprintf("%x", parentsc.SpanID()),
"attributes": attrToMap(span.Attributes()),
}).Info("Span")
}

e.mutex.Lock()
defer e.mutex.Unlock()
e.Spans = append(e.Spans, spans...)
return nil
}

func (e *TestExporter) Shutdown(ctx context.Context) error {
logrus.Info("TestExporter.Shutdown()")
return nil
}

type SpanReader struct {
exporter *TestExporter
index int
}

func (e *TestExporter) Count() int {
e.mutex.Lock()
defer e.mutex.Unlock()
return len(e.Spans)
}

func (e *TestExporter) NewSpanReader() *SpanReader {
return &SpanReader{
exporter: e,
}
}

func (e *TestExporter) Reset() {
e.mutex.Lock()
defer e.mutex.Unlock()
e.Spans = make([]trace.ReadOnlySpan, 0)
}

func (r *SpanReader) Read(p []trace.ReadOnlySpan) (n int, err error) {
r.exporter.mutex.Lock()
defer r.exporter.mutex.Unlock()

if r.index >= len(r.exporter.Spans) {
return 0, io.EOF
}
pindex := 0
for {
if r.index >= len(r.exporter.Spans) || pindex >= len(p) {
return n, nil
}
p[pindex] = r.exporter.Spans[r.index]
pindex++
r.index++
n++
}
}

func attrToMap(attrs []attribute.KeyValue) map[string]any {
m := make(map[string]any)
for _, attr := range attrs {
m[string(attr.Key)] = attr.Value.AsInterface()
}
return m
}
Loading