Skip to content

Commit

Permalink
Add span skip attribute for span metrics generation (#1626)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Feb 11, 2025
1 parent 04b27fd commit 72a37e5
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 23 deletions.
27 changes: 21 additions & 6 deletions pkg/export/alloy/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/export/attributes"
attr "github.com/grafana/beyla/pkg/export/attributes/names"
"github.com/grafana/beyla/pkg/export/otel"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
Expand All @@ -18,16 +19,30 @@ func TracesReceiver(
ctx context.Context,
ctxInfo *global.ContextInfo,
cfg *beyla.TracesReceiverConfig,
spanMetricsEnabled bool,
userAttribSelection attributes.Selection,
) pipe.FinalProvider[[]request.Span] {
return (&tracesReceiver{ctx: ctx, cfg: cfg, attributes: userAttribSelection, hostID: ctxInfo.HostID}).provideLoop
return (&tracesReceiver{ctx: ctx, cfg: cfg, attributes: userAttribSelection, hostID: ctxInfo.HostID, spanMetricsEnabled: spanMetricsEnabled}).provideLoop
}

type tracesReceiver struct {
ctx context.Context
cfg *beyla.TracesReceiverConfig
attributes attributes.Selection
hostID string
ctx context.Context
cfg *beyla.TracesReceiverConfig
attributes attributes.Selection
hostID string
spanMetricsEnabled bool
}

func (tr *tracesReceiver) getConstantAttributes() (map[attr.Name]struct{}, error) {
traceAttrs, err := otel.GetUserSelectedAttributes(tr.attributes)
if err != nil {
return nil, err
}

if tr.spanMetricsEnabled {
traceAttrs[attr.SkipSpanMetrics] = struct{}{}
}
return traceAttrs, nil
}

func (tr *tracesReceiver) spanDiscarded(span *request.Span) bool {
Expand All @@ -40,7 +55,7 @@ func (tr *tracesReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], error)
}
return func(in <-chan []request.Span) {
// Get user attributes
traceAttrs, err := otel.GetUserSelectedAttributes(tr.attributes)
traceAttrs, err := tr.getConstantAttributes()
if err != nil {
slog.Error("error fetching user defined attributes", "error", err)
}
Expand Down
94 changes: 93 additions & 1 deletion pkg/export/alloy/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@ package alloy

import (
"context"
"encoding/binary"
"math/rand/v2"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/grafana/beyla/pkg/beyla"
"github.com/grafana/beyla/pkg/export/attributes"
attr "github.com/grafana/beyla/pkg/export/attributes/names"
"github.com/grafana/beyla/pkg/export/otel"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
Expand Down Expand Up @@ -56,6 +63,71 @@ func TestTracesSkipsInstrumented(t *testing.T) {
}
}

func TestTraceSkipSpanMetrics(t *testing.T) {
spans := []request.Span{}
start := time.Now()
for i := 0; i < 10; i++ {
span := request.Span{Type: request.EventTypeHTTP,
RequestStart: start.UnixNano(),
Start: start.Add(time.Second).UnixNano(),
End: start.Add(3 * time.Second).UnixNano(),
Method: "GET",
Route: "/test" + strconv.Itoa(i),
Status: 200,
Service: svc.Attrs{},
TraceID: randomTraceID(),
}
spans = append(spans, span)
}

t.Run("test with span metrics on", func(t *testing.T) {
receiver := makeTracesTestReceiverWithSpanMetrics()

traces := generateTracesForSpans(t, receiver, spans)
assert.Equal(t, 10, len(traces))

for _, ts := range traces {
for i := 0; i < ts.ResourceSpans().Len(); i++ {
rs := ts.ResourceSpans().At(i)
for j := 0; j < rs.ScopeSpans().Len(); j++ {
ss := rs.ScopeSpans().At(j)
for k := 0; k < ss.Spans().Len(); k++ {
span := ss.Spans().At(k)
if strings.HasPrefix(span.Name(), "GET /test") {
v, ok := span.Attributes().Get(string(attr.SkipSpanMetrics.OTEL()))
assert.True(t, ok)
assert.Equal(t, true, v.Bool())
}
}
}
}
}
})

t.Run("test with span metrics off", func(t *testing.T) {
receiver := makeTracesTestReceiver()

traces := generateTracesForSpans(t, receiver, spans)
assert.Equal(t, 10, len(traces))

for _, ts := range traces {
for i := 0; i < ts.ResourceSpans().Len(); i++ {
rs := ts.ResourceSpans().At(i)
for j := 0; j < rs.ScopeSpans().Len(); j++ {
ss := rs.ScopeSpans().At(j)
for k := 0; k < ss.Spans().Len(); k++ {
span := ss.Spans().At(k)
if strings.HasPrefix(span.Name(), "GET /test") {
_, ok := span.Attributes().Get(string(attr.SkipSpanMetrics.OTEL()))
assert.False(t, ok)
}
}
}
}
}
})
}

func makeTracesTestReceiver() *tracesReceiver {
return &tracesReceiver{
ctx: context.Background(),
Expand All @@ -65,9 +137,19 @@ func makeTracesTestReceiver() *tracesReceiver {
}
}

func makeTracesTestReceiverWithSpanMetrics() *tracesReceiver {
return &tracesReceiver{
ctx: context.Background(),
cfg: &beyla.TracesReceiverConfig{},
attributes: attributes.Selection{},
hostID: "Alloy",
spanMetricsEnabled: true,
}
}

func generateTracesForSpans(t *testing.T, tr *tracesReceiver, spans []request.Span) []ptrace.Traces {
res := []ptrace.Traces{}
traceAttrs, err := otel.GetUserSelectedAttributes(tr.attributes)
traceAttrs, err := tr.getConstantAttributes()
assert.NoError(t, err)
for i := range spans {
span := &spans[i]
Expand All @@ -79,3 +161,13 @@ func generateTracesForSpans(t *testing.T, tr *tracesReceiver, spans []request.Sp

return res
}

func randomTraceID() trace.TraceID {
t := trace.TraceID{}

for i := 0; i < len(t); i += 4 {
binary.LittleEndian.PutUint32(t[i:], rand.Uint32())
}

return t
}
1 change: 1 addition & 0 deletions pkg/export/attributes/names/attrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ const (
HostID = Name(semconv.HostIDKey)

ServiceInstanceID = Name(semconv.ServiceInstanceIDKey)
SkipSpanMetrics = Name("span.metrics.skip")
)

// traces related attributes
Expand Down
51 changes: 37 additions & 14 deletions pkg/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,27 +130,29 @@ func (m *TracesConfig) guessProtocol() Protocol {
return ProtocolHTTPProtobuf
}

func makeTracesReceiver(ctx context.Context, cfg TracesConfig, ctxInfo *global.ContextInfo, userAttribSelection attributes.Selection) *tracesOTELReceiver {
func makeTracesReceiver(ctx context.Context, cfg TracesConfig, spanMetricsEnabled bool, ctxInfo *global.ContextInfo, userAttribSelection attributes.Selection) *tracesOTELReceiver {
return &tracesOTELReceiver{
ctx: ctx,
cfg: cfg,
ctxInfo: ctxInfo,
attributes: userAttribSelection,
is: instrumentations.NewInstrumentationSelection(cfg.Instrumentations),
ctx: ctx,
cfg: cfg,
ctxInfo: ctxInfo,
attributes: userAttribSelection,
is: instrumentations.NewInstrumentationSelection(cfg.Instrumentations),
spanMetricsEnabled: spanMetricsEnabled,
}
}

// TracesReceiver creates a terminal node that consumes request.Spans and sends OpenTelemetry metrics to the configured consumers.
func TracesReceiver(ctx context.Context, cfg TracesConfig, ctxInfo *global.ContextInfo, userAttribSelection attributes.Selection) pipe.FinalProvider[[]request.Span] {
return makeTracesReceiver(ctx, cfg, ctxInfo, userAttribSelection).provideLoop
func TracesReceiver(ctx context.Context, cfg TracesConfig, spanMetricsEnabled bool, ctxInfo *global.ContextInfo, userAttribSelection attributes.Selection) pipe.FinalProvider[[]request.Span] {
return makeTracesReceiver(ctx, cfg, spanMetricsEnabled, ctxInfo, userAttribSelection).provideLoop
}

type tracesOTELReceiver struct {
ctx context.Context
cfg TracesConfig
ctxInfo *global.ContextInfo
attributes attributes.Selection
is instrumentations.InstrumentationSelection
ctx context.Context
cfg TracesConfig
ctxInfo *global.ContextInfo
attributes attributes.Selection
is instrumentations.InstrumentationSelection
spanMetricsEnabled bool
}

func GetUserSelectedAttributes(attrs attributes.Selection) (map[attr.Name]struct{}, error) {
Expand All @@ -168,6 +170,18 @@ func GetUserSelectedAttributes(attrs attributes.Selection) (map[attr.Name]struct
return traceAttrs, err
}

func (tr *tracesOTELReceiver) getConstantAttributes() (map[attr.Name]struct{}, error) {
traceAttrs, err := GetUserSelectedAttributes(tr.attributes)
if err != nil {
return nil, err
}

if tr.spanMetricsEnabled {
traceAttrs[attr.SkipSpanMetrics] = struct{}{}
}
return traceAttrs, nil
}

func (tr *tracesOTELReceiver) spanDiscarded(span *request.Span) bool {
return span.IgnoreTraces() || span.Service.ExportsOTelTraces() || !tr.acceptSpan(span)
}
Expand Down Expand Up @@ -228,12 +242,16 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err
return
}

traceAttrs, err := GetUserSelectedAttributes(tr.attributes)
traceAttrs, err := tr.getConstantAttributes()
if err != nil {
slog.Error("error selecting user trace attributes", "error", err)
return
}

if tr.spanMetricsEnabled {
traceAttrs[attr.SkipSpanMetrics] = struct{}{}
}

sampler := tr.cfg.Sampler.Implementation()

for spans := range in {
Expand Down Expand Up @@ -576,6 +594,7 @@ func (tr *tracesOTELReceiver) acceptSpan(span *request.Span) bool {

// TODO use semconv.DBSystemRedis when we update to OTEL semantic conventions library 1.30
var dbSystemRedis = attribute.String(string(attr.DBSystemName), semconv.DBSystemRedis.Value.AsString())
var spanMetricsSkip = attribute.Bool(string(attr.SkipSpanMetrics), true)

// nolint:cyclop
func traceAttributes(span *request.Span, optionalAttrs map[attr.Name]struct{}) []attribute.KeyValue {
Expand Down Expand Up @@ -674,6 +693,10 @@ func traceAttributes(span *request.Span, optionalAttrs map[attr.Name]struct{}) [
}
}

if _, ok := optionalAttrs[attr.SkipSpanMetrics]; ok {
attrs = append(attrs, spanMetricsSkip)
}

return attrs
}

Expand Down
Loading

0 comments on commit 72a37e5

Please sign in to comment.