diff --git a/.chloggen/tailsampling-record-policy.yaml b/.chloggen/tailsampling-record-policy.yaml new file mode 100644 index 000000000000..06f78576d1d2 --- /dev/null +++ b/.chloggen/tailsampling-record-policy.yaml @@ -0,0 +1,15 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/tailsampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: | + Adds support for optionally recording the policy (and any composite policy) associated with an inclusive tail processor sampling decision. + This functionality is disabled by default, you can enable it by passing the following feature flag to the collector: `+processor.tailsamplingprocessor.recordpolicy` + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35180] \ No newline at end of file diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index 0c859105c8a8..84cc0a2f0e9d 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -528,6 +528,16 @@ sum (otelcol_processor_tail_sampling_count_traces_sampled) by (policy) As a reminder, a policy voting to sample the trace does not guarantee sampling; an "inverted not" decision from another policy would still discard the trace. +### Tracking sampling policy +To better understand _which_ sampling policy made the decision to include a trace, you can enable tracking the policy responsible for sampling a trace via the `processor.tailsamplingprocessor.recordpolicy` feature gate. + +When this feature gate is set, this will add additional attributes on each sampled span: + +| Attribute | Description | Present? | +|---------------------------------|---------------------------------------------------------------------------|----------------------------| +| `tailsampling.policy` | Records the configured name of the policy that sampled a trace | Always | +| `tailsampling.composite_policy` | Records the configured name of a composite subpolicy that sampled a trace | When composite policy used | + ### Policy Evaluation Errors ``` diff --git a/processor/tailsamplingprocessor/composite_helper.go b/processor/tailsamplingprocessor/composite_helper.go index 769bfcf95765..9ca587f17a62 100644 --- a/processor/tailsamplingprocessor/composite_helper.go +++ b/processor/tailsamplingprocessor/composite_helper.go @@ -7,6 +7,7 @@ import ( "go.opentelemetry.io/collector/component" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry" ) func getNewCompositePolicy(settings component.TelemetrySettings, config *CompositeCfg) (sampling.PolicyEvaluator, error) { @@ -22,10 +23,11 @@ func getNewCompositePolicy(settings component.TelemetrySettings, config *Composi evalParams := sampling.SubPolicyEvalParams{ Evaluator: policy, MaxSpansPerSecond: int64(rateAllocationsMap[policyCfg.Name]), + Name: policyCfg.Name, } subPolicyEvalParams[i] = evalParams } - return sampling.NewComposite(settings.Logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}), nil + return sampling.NewComposite(settings.Logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}, telemetry.IsRecordPolicyEnabled()), nil } // Apply rate allocations to the sub-policies diff --git a/processor/tailsamplingprocessor/composite_helper_test.go b/processor/tailsamplingprocessor/composite_helper_test.go index 578f3be289b9..dee22721d418 100644 --- a/processor/tailsamplingprocessor/composite_helper_test.go +++ b/processor/tailsamplingprocessor/composite_helper_test.go @@ -52,12 +52,14 @@ func TestCompositeHelper(t *testing.T) { { Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0), MaxSpansPerSecond: 250, + Name: "test-composite-policy-1", }, { Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 200, 0), MaxSpansPerSecond: 500, + Name: "test-composite-policy-2", }, - }, sampling.MonotonicClock{}) + }, sampling.MonotonicClock{}, false) assert.Equal(t, expected, actual) }) diff --git a/processor/tailsamplingprocessor/factory.go b/processor/tailsamplingprocessor/factory.go index 662b2496dab0..8f9a382bade3 100644 --- a/processor/tailsamplingprocessor/factory.go +++ b/processor/tailsamplingprocessor/factory.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/processor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry" ) // NewFactory returns a new factory for the Tail Sampling processor. @@ -38,5 +39,10 @@ func createTracesProcessor( nextConsumer consumer.Traces, ) (processor.Traces, error) { tCfg := cfg.(*Config) - return newTracesProcessor(ctx, params, nextConsumer, *tCfg) + opts := []Option{} + + if telemetry.IsRecordPolicyEnabled() { + opts = append(opts, withRecordPolicy()) + } + return newTracesProcessor(ctx, params, nextConsumer, *tCfg, opts...) } diff --git a/processor/tailsamplingprocessor/generated_component_telemetry_test.go b/processor/tailsamplingprocessor/generated_component_telemetry_test.go index 11979c48d9f0..c6a05907843d 100644 --- a/processor/tailsamplingprocessor/generated_component_telemetry_test.go +++ b/processor/tailsamplingprocessor/generated_component_telemetry_test.go @@ -7,15 +7,14 @@ import ( "testing" "github.com/stretchr/testify/require" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) type componentTestTelemetry struct { diff --git a/processor/tailsamplingprocessor/internal/metadata/generated_telemetry.go b/processor/tailsamplingprocessor/internal/metadata/generated_telemetry.go index f8fdf605ad19..ddd40e2a1af1 100644 --- a/processor/tailsamplingprocessor/internal/metadata/generated_telemetry.go +++ b/processor/tailsamplingprocessor/internal/metadata/generated_telemetry.go @@ -5,12 +5,11 @@ package metadata import ( "errors" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { diff --git a/processor/tailsamplingprocessor/internal/metadata/generated_telemetry_test.go b/processor/tailsamplingprocessor/internal/metadata/generated_telemetry_test.go index 919da43ebd7f..00a7a44ed926 100644 --- a/processor/tailsamplingprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/tailsamplingprocessor/internal/metadata/generated_telemetry_test.go @@ -6,15 +6,14 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/otel/metric" embeddedmetric "go.opentelemetry.io/otel/metric/embedded" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" embeddedtrace "go.opentelemetry.io/otel/trace/embedded" nooptrace "go.opentelemetry.io/otel/trace/noop" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" ) type mockMeter struct { diff --git a/processor/tailsamplingprocessor/internal/sampling/composite.go b/processor/tailsamplingprocessor/internal/sampling/composite.go index 0c98c5a1f195..a88d32e3df31 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite.go @@ -19,6 +19,8 @@ type subpolicy struct { // spans per second that each subpolicy sampled in this period sampledSPS int64 + + name string } // Composite evaluator and its internal data @@ -35,7 +37,8 @@ type Composite struct { // The time provider (can be different from clock for testing purposes) timeProvider TimeProvider - logger *zap.Logger + logger *zap.Logger + recordSubPolicy bool } var _ PolicyEvaluator = (*Composite)(nil) @@ -44,6 +47,7 @@ var _ PolicyEvaluator = (*Composite)(nil) type SubPolicyEvalParams struct { Evaluator PolicyEvaluator MaxSpansPerSecond int64 + Name string } // NewComposite creates a policy evaluator that samples all subpolicies. @@ -52,6 +56,7 @@ func NewComposite( maxTotalSpansPerSecond int64, subPolicyParams []SubPolicyEvalParams, timeProvider TimeProvider, + recordSubPolicy bool, ) PolicyEvaluator { var subpolicies []*subpolicy @@ -59,7 +64,7 @@ func NewComposite( sub := &subpolicy{} sub.evaluator = subPolicyParams[i].Evaluator sub.allocatedSPS = subPolicyParams[i].MaxSpansPerSecond - + sub.name = subPolicyParams[i].Name // We are just starting, so there is no previous input, set it to 0 sub.sampledSPS = 0 @@ -67,10 +72,11 @@ func NewComposite( } return &Composite{ - maxTotalSPS: maxTotalSpansPerSecond, - subpolicies: subpolicies, - timeProvider: timeProvider, - logger: logger, + maxTotalSPS: maxTotalSpansPerSecond, + subpolicies: subpolicies, + timeProvider: timeProvider, + logger: logger, + recordSubPolicy: recordSubPolicy, } } @@ -110,6 +116,9 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace sub.sampledSPS = spansInSecondIfSampled // Let the sampling happen + if c.recordSubPolicy { + SetAttrOnScopeSpans(trace, "tailsampling.composite_policy", sub.name) + } return Sampled, nil } diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go index c323fe849946..e422f2e7290b 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -60,7 +60,7 @@ func TestCompositeEvaluatorNotSampled(t *testing.T) { // Create 2 policies which do not match any trace n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) n2 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 200, 300, false) - c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{}) + c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, false) trace := createTrace() @@ -77,7 +77,7 @@ func TestCompositeEvaluatorSampled(t *testing.T) { // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) - c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{}) + c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, false) trace := createTrace() @@ -89,13 +89,32 @@ func TestCompositeEvaluatorSampled(t *testing.T) { assert.Equal(t, expected, decision) } +func TestCompositeEvaluatorSampled_RecordSubPolicy(t *testing.T) { + // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. + n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) + n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) + c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, true) + + trace := newTraceWithKV(traceID, "test-key", 0) + + decision, err := c.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) + + // The second policy is AlwaysSample, so the decision should be Sampled. + expected := Sampled + assert.Equal(t, expected, decision) + val, ok := trace.ReceivedBatches.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.composite_policy") + assert.True(t, ok, "Did not find expected key") + assert.Equal(t, "eval-2", val.AsString()) +} + func TestCompositeEvaluator_OverflowAlwaysSampled(t *testing.T) { timeProvider := &FakeTimeProvider{second: 0} // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) - c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1}, {n2, 1}}, timeProvider) + c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1, "eval-1"}, {n2, 1, "eval-2"}}, timeProvider, false) trace := newTraceWithKV(traceID, "tag", int64(10)) @@ -128,7 +147,7 @@ func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) { // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) - c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{}) + c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, false) for i := 1; i <= 10; i++ { trace := createTrace() @@ -146,7 +165,7 @@ func TestCompositeEvaluatorInverseSampled_AlwaysSampled(t *testing.T) { // The first policy does not match, the second matches through invert n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, false) n2 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, true) - c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{}) + c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, false) for i := 1; i <= 10; i++ { trace := createTrace() @@ -160,12 +179,33 @@ func TestCompositeEvaluatorInverseSampled_AlwaysSampled(t *testing.T) { } } +func TestCompositeEvaluatorInverseSampled_AlwaysSampled_RecordSubPolicy(t *testing.T) { + // The first policy does not match, the second matches through invert + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, false) + n2 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, true) + c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, true) + + for i := 1; i <= 10; i++ { + trace := newTraceWithKV(traceID, "test-key", 0) + + decision, err := c.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) + + // The second policy is AlwaysSample, so the decision should be Sampled. + expected := Sampled + assert.Equal(t, expected, decision) + val, ok := trace.ReceivedBatches.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.composite_policy") + assert.True(t, ok, "Did not find expected key") + assert.Equal(t, "eval-2", val.AsString()) + } +} + func TestCompositeEvaluatorThrottling(t *testing.T) { // Create only one subpolicy, with 100% Sampled policy. n1 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) timeProvider := &FakeTimeProvider{second: 0} const totalSPS = 10 - c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS}}, timeProvider) + c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS, "eval-1"}}, timeProvider, false) trace := createTrace() @@ -205,7 +245,7 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) timeProvider := &FakeTimeProvider{second: 0} const totalSPS = 10 - c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider) + c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2, "eval-1"}, {n2, totalSPS / 2, "eval-2"}}, timeProvider, false) trace := createTrace() diff --git a/processor/tailsamplingprocessor/internal/sampling/util.go b/processor/tailsamplingprocessor/internal/sampling/util.go index 6b896e526d04..d8aedc686d3b 100644 --- a/processor/tailsamplingprocessor/internal/sampling/util.go +++ b/processor/tailsamplingprocessor/internal/sampling/util.go @@ -93,3 +93,17 @@ func invertHasInstrumentationLibrarySpanWithCondition(ilss ptrace.ScopeSpansSlic } return true } + +func SetAttrOnScopeSpans(data *TraceData, attrName string, attrKey string) { + data.Mutex.Lock() + defer data.Mutex.Unlock() + + rs := data.ReceivedBatches.ResourceSpans() + for i := 0; i < rs.Len(); i++ { + rss := rs.At(i) + for j := 0; j < rss.ScopeSpans().Len(); j++ { + ss := rss.ScopeSpans().At(j) + ss.Scope().Attributes().PutStr(attrName, attrKey) + } + } +} diff --git a/processor/tailsamplingprocessor/internal/sampling/util_test.go b/processor/tailsamplingprocessor/internal/sampling/util_test.go new file mode 100644 index 000000000000..270336fad5c1 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/util_test.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestSetAttrOnScopeSpans_Empty(_ *testing.T) { + traces := ptrace.NewTraces() + traceData := &TraceData{ + ReceivedBatches: traces, + } + + SetAttrOnScopeSpans(traceData, "test.attr", "value") +} + +func TestSetAttrOnScopeSpans_Many(t *testing.T) { + assertAttrExists := func(t *testing.T, attrs pcommon.Map, key string, value string) { + v, ok := attrs.Get(key) + assert.True(t, ok) + assert.Equal(t, value, v.AsString()) + } + + traces := ptrace.NewTraces() + + rs1 := traces.ResourceSpans().AppendEmpty() + ss1 := rs1.ScopeSpans().AppendEmpty() + span1 := ss1.Spans().AppendEmpty() + span2 := ss1.Spans().AppendEmpty() + ss2 := rs1.ScopeSpans().AppendEmpty() + span3 := ss2.Spans().AppendEmpty() + rs2 := traces.ResourceSpans().AppendEmpty() + ss3 := rs2.ScopeSpans().AppendEmpty() + span4 := ss3.Spans().AppendEmpty() + + traceData := &TraceData{ + ReceivedBatches: traces, + } + + SetAttrOnScopeSpans(traceData, "test.attr", "value") + + assertAttrExists(t, ss1.Scope().Attributes(), "test.attr", "value") + assertAttrExists(t, ss2.Scope().Attributes(), "test.attr", "value") + assertAttrExists(t, ss3.Scope().Attributes(), "test.attr", "value") + + _, ok := span1.Attributes().Get("test.attr") + assert.False(t, ok) + _, ok = span2.Attributes().Get("test.attr") + assert.False(t, ok) + _, ok = span3.Attributes().Get("test.attr") + assert.False(t, ok) + _, ok = span4.Attributes().Get("test.attr") + assert.False(t, ok) +} + +func BenchmarkSetAttrOnScopeSpans(b *testing.B) { + for n := 0; n < b.N; n++ { + traces := ptrace.NewTraces() + + for i := 0; i < 5; i++ { + rs := traces.ResourceSpans().AppendEmpty() + ss1 := rs.ScopeSpans().AppendEmpty() + ss1.Spans().AppendEmpty() + ss1.Spans().AppendEmpty() + ss1.Spans().AppendEmpty() + + ss2 := rs.ScopeSpans().AppendEmpty() + ss2.Spans().AppendEmpty() + ss2.Spans().AppendEmpty() + + ss3 := rs.ScopeSpans().AppendEmpty() + ss3.Spans().AppendEmpty() + } + + traceData := &TraceData{ + ReceivedBatches: traces, + } + + b.StartTimer() + SetAttrOnScopeSpans(traceData, "test.attr", "value") + b.StopTimer() + } +} diff --git a/processor/tailsamplingprocessor/internal/telemetry/featureflag.go b/processor/tailsamplingprocessor/internal/telemetry/featureflag.go index 2907ad803da0..47f382377cb1 100644 --- a/processor/tailsamplingprocessor/internal/telemetry/featureflag.go +++ b/processor/tailsamplingprocessor/internal/telemetry/featureflag.go @@ -14,3 +14,13 @@ var metricStatCountSpansSampledFeatureGate = featuregate.GlobalRegistry().MustRe func IsMetricStatCountSpansSampledEnabled() bool { return metricStatCountSpansSampledFeatureGate.IsEnabled() } + +var recordPolicyFeatureGate = featuregate.GlobalRegistry().MustRegister( + "processor.tailsamplingprocessor.recordpolicy", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, attaches the name of the policy (and if applicable, composite policy) responsible for sampling a trace in the 'tailsampling.policy'/ 'tailsampling.composite_policy' attributes."), +) + +func IsRecordPolicyEnabled() bool { + return recordPolicyFeatureGate.IsEnabled() +} diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 02883fbc4778..189d4be39141 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -59,6 +59,7 @@ type tailSamplingSpanProcessor struct { nonSampledIDCache cache.Cache[bool] deleteChan chan pcommon.TraceID numTracesOnMap *atomic.Uint64 + recordPolicy bool } // spanAndScope a structure for holding information about span and its instrumentation scope. @@ -205,6 +206,12 @@ func withNonSampledDecisionCache(c cache.Cache[bool]) Option { } } +func withRecordPolicy() Option { + return func(tsp *tailSamplingSpanProcessor) { + tsp.recordPolicy = true + } +} + func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) { switch cfg.Type { case Composite: @@ -308,12 +315,12 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision { finalDecision := sampling.NotSampled - samplingDecision := map[sampling.Decision]bool{ - sampling.Error: false, - sampling.Sampled: false, - sampling.NotSampled: false, - sampling.InvertSampled: false, - sampling.InvertNotSampled: false, + samplingDecisions := map[sampling.Decision]*policy{ + sampling.Error: nil, + sampling.Sampled: nil, + sampling.NotSampled: nil, + sampling.InvertSampled: nil, + sampling.InvertNotSampled: nil, } ctx := context.Background() @@ -323,7 +330,9 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa decision, err := p.evaluator.Evaluate(ctx, id, trace) tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute) if err != nil { - samplingDecision[sampling.Error] = true + if samplingDecisions[sampling.Error] == nil { + samplingDecisions[sampling.Error] = p + } metrics.evaluateErrorCount++ tsp.logger.Debug("Sampling policy error", zap.Error(err)) } else { @@ -332,18 +341,29 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision]) } - samplingDecision[decision] = true + // We associate the first policy with the sampling decision to understand what policy sampled a span + if samplingDecisions[decision] == nil { + samplingDecisions[decision] = p + } } } + var sampledPolicy *policy + // InvertNotSampled takes precedence over any other decision switch { - case samplingDecision[sampling.InvertNotSampled]: + case samplingDecisions[sampling.InvertNotSampled] != nil: finalDecision = sampling.NotSampled - case samplingDecision[sampling.Sampled]: + case samplingDecisions[sampling.Sampled] != nil: finalDecision = sampling.Sampled - case samplingDecision[sampling.InvertSampled] && !samplingDecision[sampling.NotSampled]: + sampledPolicy = samplingDecisions[sampling.Sampled] + case samplingDecisions[sampling.InvertSampled] != nil && samplingDecisions[sampling.NotSampled] == nil: finalDecision = sampling.Sampled + sampledPolicy = samplingDecisions[sampling.InvertSampled] + } + + if tsp.recordPolicy && sampledPolicy != nil { + sampling.SetAttrOnScopeSpans(trace, "tailsampling.policy", sampledPolicy.name) } return finalDecision diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index f6e9d1730ed1..6106e7b732a3 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" @@ -160,6 +161,57 @@ func TestSamplingMultiplePolicies(t *testing.T) { require.NoError(t, tel.Shutdown(context.Background())) } +func TestSamplingMultiplePolicies_WithRecordPolicy(t *testing.T) { + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + } + nextConsumer := new(consumertest.TracesSink) + s := setupTestTelemetry() + ct := s.NewSettings() + idb := newSyncIDBatcher() + + mpe1 := &mockPolicyEvaluator{} + mpe2 := &mockPolicyEvaluator{} + + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, + } + + p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withRecordPolicy()) + require.NoError(t, err) + + require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // First policy takes precedence + mpe1.NextDecision = sampling.Sampled + mpe2.NextDecision = sampling.Sampled + + // Generate and deliver first span + require.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) + + tsp := p.(*tailSamplingSpanProcessor) + + // The first tick won't do anything + tsp.policyTicker.OnTick() + // This will cause policy evaluations on the first span + tsp.policyTicker.OnTick() + + // The final decision SHOULD be Sampled. + require.EqualValues(t, 1, nextConsumer.SpanCount()) + + // First span should have an attribute that records the policy that sampled it + policy, ok := nextConsumer.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.policy") + if !ok { + assert.FailNow(t, "Did not find expected attribute") + } + require.EqualValues(t, "mock-policy-1", policy.AsString()) +} + func TestSamplingPolicyDecisionNotSampled(t *testing.T) { cfg := Config{ DecisionWait: defaultTestDecisionWait, @@ -206,6 +258,47 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { require.NoError(t, tel.Shutdown(context.Background())) } +func TestSamplingPolicyDecisionNotSampled_WithRecordPolicy(t *testing.T) { + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + } + nextConsumer := new(consumertest.TracesSink) + s := setupTestTelemetry() + ct := s.NewSettings() + idb := newSyncIDBatcher() + + mpe1 := &mockPolicyEvaluator{} + + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withRecordPolicy()) + require.NoError(t, err) + + require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // InvertNotSampled takes precedence + mpe1.NextDecision = sampling.NotSampled + + // Generate and deliver first span + require.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) + + tsp := p.(*tailSamplingSpanProcessor) + + // The first tick won't do anything + tsp.policyTicker.OnTick() + // This will cause policy evaluations on the first span + tsp.policyTicker.OnTick() + + // The final decision SHOULD be NotSampled. + require.EqualValues(t, 0, nextConsumer.SpanCount()) +} + func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { cfg := Config{ DecisionWait: defaultTestDecisionWait, @@ -257,6 +350,50 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { require.NoError(t, tel.Shutdown(context.Background())) } +func TestSamplingPolicyDecisionInvertNotSampled_WithRecordPolicy(t *testing.T) { + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + } + nextConsumer := new(consumertest.TracesSink) + s := setupTestTelemetry() + ct := s.NewSettings() + idb := newSyncIDBatcher() + + mpe1 := &mockPolicyEvaluator{} + mpe2 := &mockPolicyEvaluator{} + + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, + } + + p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withRecordPolicy()) + require.NoError(t, err) + + require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // InvertNotSampled takes precedence + mpe1.NextDecision = sampling.InvertNotSampled + mpe2.NextDecision = sampling.Sampled + + // Generate and deliver first span + require.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) + + tsp := p.(*tailSamplingSpanProcessor) + + // The first tick won't do anything + tsp.policyTicker.OnTick() + // This will cause policy evaluations on the first span + tsp.policyTicker.OnTick() + + // The final decision SHOULD be NotSampled. + require.EqualValues(t, 0, nextConsumer.SpanCount()) +} + func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { cfg := Config{ DecisionWait: defaultTestDecisionWait,