diff --git a/tracing/deferred.go b/tracing/deferred.go index bfbccfa..58be945 100644 --- a/tracing/deferred.go +++ b/tracing/deferred.go @@ -22,30 +22,32 @@ type DeferredTracer struct { } type DeferredSpan struct { - name string - err error - start time.Time - end time.Time - opts []trace.SpanStartOption - children []*DeferredSpan - flushed bool + name string + err error + end time.Time + opts []trace.SpanStartOption + children []*DeferredSpan + spanContext trace.SpanContext + flushed bool } -// NewTracer creates a DeferredTracer instance. -func NewTracer() *DeferredTracer { +// NewDeferredTracer creates a DeferredTracer instance. +func NewDeferredTracer() *DeferredTracer { return new(DeferredTracer) } func (t *DeferredTracer) StartSpan(opts ...trace.SpanStartOption) *DeferredSpan { start := clock.Now() name, fileTag := getCallerSpanName(1) - opts = append(opts, trace.WithAttributes( - attribute.String("file", fileTag), - )) + opts = append(opts, + trace.WithTimestamp(start), + trace.WithAttributes( + attribute.String("file", fileTag), + ), + ) span := &DeferredSpan{ - name: name, - start: start, - opts: opts, + name: name, + opts: opts, } t.spans = append(t.spans, span) return span @@ -54,13 +56,15 @@ func (t *DeferredTracer) StartSpan(opts ...trace.SpanStartOption) *DeferredSpan func (t *DeferredTracer) StartNamedSpan(name string, opts ...trace.SpanStartOption) *DeferredSpan { start := clock.Now() fileTag := getFileTag(1) - opts = append(opts, trace.WithAttributes( - attribute.String("file", fileTag), - )) + opts = append(opts, + trace.WithTimestamp(start), + trace.WithAttributes( + attribute.String("file", fileTag), + ), + ) span := &DeferredSpan{ - name: name, - start: start, - opts: opts, + name: name, + opts: opts, } t.spans = append(t.spans, span) return span @@ -68,14 +72,13 @@ func (t *DeferredTracer) StartNamedSpan(name string, opts ...trace.SpanStartOpti // Flush sends all deferred events to OpenTelemetry. // Deferred spans will be created as children to span assigned to ctx. -// Returns true if any spans are still open. -func (t *DeferredTracer) Flush(ctx context.Context) bool { +// Returns true if any spans still remain open. +func (t *DeferredTracer) Flush(ctx context.Context) (remaining bool) { var keepSpans []*DeferredSpan for _, span := range t.spans { - remaining := t.flushSpan(ctx, span) - if len(remaining) > 0 { - keepSpans = append(keepSpans, remaining...) + if t.flushSpan(ctx, span) { + keepSpans = append(keepSpans, span) } } @@ -83,17 +86,21 @@ func (t *DeferredTracer) Flush(ctx context.Context) bool { return len(t.spans) > 0 } -// flushSpan sends a span and its children to OpenTelemetry. Any unclosed -// spans are returned in remaining return value. -func (t *DeferredTracer) flushSpan(ctx context.Context, span *DeferredSpan) (remaining []*DeferredSpan) { +// flushSpan sends a span and its children to OpenTelemetry. +// If parent span is closed, traverse child spans. +// Idempotent: flushes spans only once. May call repeatedly until all spans +// are flushed. +// Returns true if any spans still remain open. +func (t *DeferredTracer) flushSpan(ctx context.Context, span *DeferredSpan) (remaining bool) { if span.end.IsZero() { - // Preserve unclosed spans. - remaining = append(remaining, span) - } else if !span.flushed { + // Return if not closed. Do not traverse children. + return true + } + if !span.flushed { // Create real OpenTelemetry span. - opts := append(span.opts, trace.WithTimestamp(span.start)) - ctx2 := StartNamedScope(ctx, span.name, opts...) + ctx2 := StartNamedScope(ctx, span.name, span.opts...) realspan := trace.SpanFromContext(ctx2) + span.spanContext = realspan.SpanContext() if span.err != nil { realspan.RecordError(span.err) realspan.SetStatus(codes.Error, span.err.Error()) @@ -103,19 +110,16 @@ func (t *DeferredTracer) flushSpan(ctx context.Context, span *DeferredSpan) (rem } // Traverse children. - childrenRemaining := false + ctx2 := trace.ContextWithSpanContext(ctx, span.spanContext) + var keepChildren []*DeferredSpan for _, childSpan := range span.children { - childRemaining := t.flushSpan(ctx, childSpan) - if len(childRemaining) > 0 { - remaining = append(remaining, childRemaining...) - childrenRemaining = true + if t.flushSpan(ctx2, childSpan) { + keepChildren = append(keepChildren, childSpan) } } - if !childrenRemaining { - span.children = span.children[:0] - } + span.children = keepChildren - return + return len(keepChildren) > 0 } func (span *DeferredSpan) EndSpan(err error, opts ...trace.SpanStartOption) { @@ -129,13 +133,15 @@ func (span *DeferredSpan) EndSpan(err error, opts ...trace.SpanStartOption) { func (span *DeferredSpan) StartChildSpan(opts ...trace.SpanStartOption) *DeferredSpan { start := clock.Now() name, fileTag := getCallerSpanName(1) - opts = append(opts, trace.WithAttributes( - attribute.String("file", fileTag), - )) + opts = append(opts, + trace.WithTimestamp(start), + trace.WithAttributes( + attribute.String("file", fileTag), + ), + ) childSpan := &DeferredSpan{ - name: name, - start: start, - opts: opts, + name: name, + opts: opts, } span.children = append(span.children, childSpan) return childSpan @@ -144,13 +150,15 @@ func (span *DeferredSpan) StartChildSpan(opts ...trace.SpanStartOption) *Deferre func (span *DeferredSpan) StartNamedChildSpan(name string, opts ...trace.SpanStartOption) *DeferredSpan { start := clock.Now() fileTag := getFileTag(1) - opts = append(opts, trace.WithAttributes( - attribute.String("file", fileTag), - )) + opts = append(opts, + trace.WithTimestamp(start), + trace.WithAttributes( + attribute.String("file", fileTag), + ), + ) childSpan := &DeferredSpan{ - name: name, - start: start, - opts: opts, + name: name, + opts: opts, } span.children = append(span.children, childSpan) return childSpan diff --git a/tracing/deferred_test.go b/tracing/deferred_test.go new file mode 100644 index 0000000..b0f2e96 --- /dev/null +++ b/tracing/deferred_test.go @@ -0,0 +1,117 @@ +package tracing_test + +import ( + "context" + "os" + "testing" + + "github.com/mailgun/holster/v4/tracing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/trace" +) + +func TestDeferredTracer(t *testing.T) { + ctx := context.Background() + initTracing := func(t *testing.T) { + os.Setenv("OTEL_TRACES_EXPORTER", "test") + defer os.Unsetenv("OTEL_TRACES_EXPORTER") + err := tracing.InitTracing(ctx, t.Name()) + require.NoError(t, err) + } + + t.Run("Single span", func(t *testing.T) { + // Given + initTracing(t) + prefix := t.Name() + "_" + + // When + dtracer := tracing.NewDeferredTracer() + span1 := dtracer.StartNamedSpan(prefix + "Span 1") + span1.EndSpan(nil) + remaining := dtracer.Flush(ctx) + err := tracing.CloseTracing(ctx) + require.NoError(t, err) + + // Then + assert.Zero(t, remaining) + count := tracing.GlobalTestExporter.Count() + assert.Equal(t, 1, count) + reader := tracing.GlobalTestExporter.NewSpanReader() + spans := make([]trace.ReadOnlySpan, count) + n, err := reader.Read(spans) + require.NoError(t, err) + assert.Equal(t, count, n) + assert.Equal(t, prefix+"Span 1", spans[0].Name()) + }) + + t.Run("Multiple spans", func(t *testing.T) { + // Given + initTracing(t) + prefix := t.Name() + "_" + + // When + dtracer := tracing.NewDeferredTracer() + span1 := dtracer.StartNamedSpan(prefix + "Span 1") + span1.EndSpan(nil) + span2 := dtracer.StartNamedSpan(prefix + "Span 2") + span2.EndSpan(nil) + span3 := dtracer.StartNamedSpan(prefix + "Span 3") + span3.EndSpan(nil) + remaining := dtracer.Flush(ctx) + err := tracing.CloseTracing(ctx) + require.NoError(t, err) + + // Then + assert.Zero(t, remaining) + count := tracing.GlobalTestExporter.Count() + assert.Equal(t, 3, count) + reader := tracing.GlobalTestExporter.NewSpanReader() + spans := make([]trace.ReadOnlySpan, count) + n, err := reader.Read(spans) + require.NoError(t, err) + assert.Equal(t, count, n) + assert.Equal(t, prefix+"Span 1", spans[0].Name()) + assert.Equal(t, prefix+"Span 2", spans[1].Name()) + assert.Equal(t, prefix+"Span 3", spans[2].Name()) + + // Check same parent span IDs. + assert.Equal(t, spans[0].Parent().SpanID(), spans[1].Parent().SpanID()) + assert.Equal(t, spans[0].Parent().SpanID(), spans[2].Parent().SpanID()) + }) + + t.Run("Nested spans", func(t *testing.T) { + // Given + initTracing(t) + prefix := t.Name() + "_" + + // When + dtracer := tracing.NewDeferredTracer() + span1 := dtracer.StartNamedSpan(prefix + "Span 1") + span2 := span1.StartNamedChildSpan(prefix + "Span 2") + span3 := span2.StartNamedChildSpan(prefix + "Span 3") + span3.EndSpan(nil) + span2.EndSpan(nil) + span1.EndSpan(nil) + remaining := dtracer.Flush(ctx) + err := tracing.CloseTracing(ctx) + require.NoError(t, err) + + // Then + assert.Zero(t, remaining) + count := tracing.GlobalTestExporter.Count() + assert.Equal(t, 3, count) + reader := tracing.GlobalTestExporter.NewSpanReader() + spans := make([]trace.ReadOnlySpan, count) + n, err := reader.Read(spans) + require.NoError(t, err) + assert.Equal(t, count, n) + assert.Equal(t, prefix+"Span 1", spans[0].Name()) + assert.Equal(t, prefix+"Span 2", spans[1].Name()) + assert.Equal(t, prefix+"Span 3", spans[2].Name()) + + // Check parent/child span IDs. + assert.Equal(t, spans[1].SpanContext().SpanID(), spans[2].Parent().SpanID()) + assert.Equal(t, spans[0].SpanContext().SpanID(), spans[1].Parent().SpanID()) + }) +} diff --git a/tracing/testexporter.go b/tracing/testexporter.go new file mode 100644 index 0000000..76c4c4d --- /dev/null +++ b/tracing/testexporter.go @@ -0,0 +1,96 @@ +package tracing + +import ( + "context" + "fmt" + "io" + "sync" + + "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" +) + +// TestExporter used for testing. +type TestExporter struct { + Spans []trace.ReadOnlySpan + mutex sync.Mutex +} + +func (e *TestExporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error { + logrus.WithFields(logrus.Fields{ + "count": len(spans), + }).Info("TestExporter.ExportSpans()") + + for _, span := range spans { + sc := span.SpanContext() + parentsc := span.Parent() + logrus.WithFields(logrus.Fields{ + "name": span.Name(), + "traceID": fmt.Sprintf("%x", sc.TraceID()), + "spanID": fmt.Sprintf("%x", sc.SpanID()), + "parentSpanID": fmt.Sprintf("%x", parentsc.SpanID()), + "attributes": attrToMap(span.Attributes()), + }).Info("Span") + } + + e.mutex.Lock() + defer e.mutex.Unlock() + e.Spans = append(e.Spans, spans...) + return nil +} + +func (e *TestExporter) Shutdown(ctx context.Context) error { + logrus.Info("TestExporter.Shutdown()") + return nil +} + +type SpanReader struct { + exporter *TestExporter + index int +} + +func (e *TestExporter) Count() int { + e.mutex.Lock() + defer e.mutex.Unlock() + return len(e.Spans) +} + +func (e *TestExporter) NewSpanReader() *SpanReader { + return &SpanReader{ + exporter: e, + } +} + +func (e *TestExporter) Reset() { + e.mutex.Lock() + defer e.mutex.Unlock() + e.Spans = make([]trace.ReadOnlySpan, 0) +} + +func (r *SpanReader) Read(p []trace.ReadOnlySpan) (n int, err error) { + r.exporter.mutex.Lock() + defer r.exporter.mutex.Unlock() + + if r.index >= len(r.exporter.Spans) { + return 0, io.EOF + } + pindex := 0 + for { + if r.index >= len(r.exporter.Spans) || pindex >= len(p) { + return n, nil + } + p[pindex] = r.exporter.Spans[r.index] + pindex++ + r.index++ + n++ + } +} + +func attrToMap(attrs []attribute.KeyValue) map[string]any { + m := make(map[string]any) + for _, attr := range attrs { + m[string(attr.Key)] = attr.Value.AsInterface() + } + return m +} diff --git a/tracing/testexporter_test.go b/tracing/testexporter_test.go new file mode 100644 index 0000000..3ddd8f6 --- /dev/null +++ b/tracing/testexporter_test.go @@ -0,0 +1,108 @@ +package tracing_test + +import ( + "context" + "os" + "testing" + + "github.com/mailgun/holster/v4/tracing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/trace" +) + +func TestTestExporter(t *testing.T) { + ctx := context.Background() + initTracing := func(t *testing.T) { + os.Setenv("OTEL_TRACES_EXPORTER", "test") + defer os.Unsetenv("OTEL_TRACES_EXPORTER") + err := tracing.InitTracing(ctx, t.Name()) + require.NoError(t, err) + } + + t.Run("Single span", func(t *testing.T) { + // Given + initTracing(t) + prefix := t.Name() + "_" + + // When + ctx1 := tracing.StartNamedScope(ctx, prefix+"Span 1") + tracing.EndScope(ctx1, nil) + err := tracing.CloseTracing(ctx) + require.NoError(t, err) + + // Then + count := tracing.GlobalTestExporter.Count() + assert.Equal(t, 1, count) + reader := tracing.GlobalTestExporter.NewSpanReader() + spans := make([]trace.ReadOnlySpan, count) + n, err := reader.Read(spans) + require.NoError(t, err) + assert.Equal(t, count, n) + assert.Equal(t, prefix+"Span 1", spans[0].Name()) + }) + + t.Run("Multiple spans", func(t *testing.T) { + // Given + initTracing(t) + prefix := t.Name() + "_" + + // When + ctx1 := tracing.StartNamedScope(ctx, prefix+"Span 1") + tracing.EndScope(ctx1, nil) + ctx2 := tracing.StartNamedScope(ctx, prefix+"Span 2") + tracing.EndScope(ctx2, nil) + ctx3 := tracing.StartNamedScope(ctx, prefix+"Span 3") + tracing.EndScope(ctx3, nil) + err := tracing.CloseTracing(ctx) + require.NoError(t, err) + + // Then + count := tracing.GlobalTestExporter.Count() + assert.Equal(t, 3, count) + reader := tracing.GlobalTestExporter.NewSpanReader() + spans := make([]trace.ReadOnlySpan, count) + n, err := reader.Read(spans) + require.NoError(t, err) + assert.Equal(t, count, n) + assert.Equal(t, prefix+"Span 1", spans[0].Name()) + assert.Equal(t, prefix+"Span 2", spans[1].Name()) + assert.Equal(t, prefix+"Span 3", spans[2].Name()) + + // Check same parent span IDs. + assert.Equal(t, spans[0].Parent().SpanID(), spans[1].Parent().SpanID()) + assert.Equal(t, spans[0].Parent().SpanID(), spans[2].Parent().SpanID()) + }) + + t.Run("Nested spans", func(t *testing.T) { + // Given + initTracing(t) + prefix := t.Name() + "_" + + // When + ctx1 := tracing.StartNamedScope(ctx, prefix+"Span 1") + ctx2 := tracing.StartNamedScope(ctx1, prefix+"Span 2") + ctx3 := tracing.StartNamedScope(ctx2, prefix+"Span 3") + tracing.EndScope(ctx3, nil) + tracing.EndScope(ctx2, nil) + tracing.EndScope(ctx1, nil) + err := tracing.CloseTracing(ctx) + require.NoError(t, err) + + // Then + count := tracing.GlobalTestExporter.Count() + assert.Equal(t, 3, count) + reader := tracing.GlobalTestExporter.NewSpanReader() + spans := make([]trace.ReadOnlySpan, count) + n, err := reader.Read(spans) + require.NoError(t, err) + assert.Equal(t, count, n) + assert.Equal(t, prefix+"Span 3", spans[0].Name()) + assert.Equal(t, prefix+"Span 2", spans[1].Name()) + assert.Equal(t, prefix+"Span 1", spans[2].Name()) + + // Check parent/child span IDs. + assert.Equal(t, spans[1].SpanContext().SpanID(), spans[0].Parent().SpanID()) + assert.Equal(t, spans[2].SpanContext().SpanID(), spans[1].Parent().SpanID()) + }) +} diff --git a/tracing/tracing.go b/tracing/tracing.go index bd889da..13eaa4e 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -35,9 +35,10 @@ var ( logrus.WarnLevel, logrus.InfoLevel, logrus.DebugLevel, logrus.TraceLevel, } - log = logrus.WithField("category", "tracing") - globalLibraryName string - SemconvSchemaURL = semconv.SchemaURL + log = logrus.WithField("category", "tracing") + globalLibraryName string + SemconvSchemaURL = semconv.SchemaURL + GlobalTestExporter *TestExporter ) // InitTracing initializes a global OpenTelemetry tracer provider singleton. @@ -63,6 +64,9 @@ func InitTracing(ctx context.Context, libraryName string, opts ...TracingOption) case "none": // No exporter. Used with unit tests. continue + case "test": + // Used in tests. + exporter = makeTestExporter() case "jaeger": exporter, err = makeJaegerExporter() if err != nil { @@ -268,3 +272,8 @@ func makeJaegerExporter() (*jaeger.Exporter, error) { return exp, nil } + +func makeTestExporter() sdktrace.SpanExporter { + GlobalTestExporter = new(TestExporter) + return GlobalTestExporter +}