From 0fff4e1f5b00dedcc42dbf0c1b5343f0a204ae50 Mon Sep 17 00:00:00 2001 From: James Luck Date: Tue, 12 Nov 2024 18:19:43 +1100 Subject: [PATCH] ## Summary Adds support for optionally recording the policy (and any composite policy) associated with an inclusive tail processor sampling decision. Resolves !35180. ## Implementation notes - This functionality lives behind a feature flag that is disabled by default - The original issue described a solution where we might attach the attribute solely to the root span. I'm not sure I agree with the commenter that we can rely on this (e.g. we might decide to sample halfway through a long-running trace), so I have attached the attributes to all present scope spans. This feels like a decent trade-off between complexity and network cost, as finding the highest non-root parent would require multiple passes over the spans and keeping all span IDs in a set ## Testing - Added automated tests to verify that enabling the flag records the expected decision without impacting existing logic - Built a custom version and ran it in our preprod environment to ensure it was stable over a 1h period (still evaluating, will update PR with any further observations) ## TODO Does this require a CHANGELOG entry? 
--- .../tailsamplingprocessor/composite_helper.go | 4 +- .../composite_helper_test.go | 4 +- processor/tailsamplingprocessor/factory.go | 8 +- .../internal/sampling/composite.go | 21 ++- .../internal/sampling/composite_test.go | 60 +++++++- .../internal/sampling/util.go | 13 ++ .../internal/telemetry/featureflag.go | 10 ++ processor/tailsamplingprocessor/processor.go | 45 ++++-- .../processor_decisions_test.go | 137 ++++++++++++++++++ 9 files changed, 275 insertions(+), 27 deletions(-) diff --git a/processor/tailsamplingprocessor/composite_helper.go b/processor/tailsamplingprocessor/composite_helper.go index 769bfcf95765..511b4747d6ab 100644 --- a/processor/tailsamplingprocessor/composite_helper.go +++ b/processor/tailsamplingprocessor/composite_helper.go @@ -4,6 +4,7 @@ package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor" import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry" "go.opentelemetry.io/collector/component" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" @@ -22,10 +23,11 @@ func getNewCompositePolicy(settings component.TelemetrySettings, config *Composi evalParams := sampling.SubPolicyEvalParams{ Evaluator: policy, MaxSpansPerSecond: int64(rateAllocationsMap[policyCfg.Name]), + Name: policyCfg.Name, } subPolicyEvalParams[i] = evalParams } - return sampling.NewComposite(settings.Logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}), nil + return sampling.NewComposite(settings.Logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}, telemetry.IsRecordPolicyEnabled()), nil } // Apply rate allocations to the sub-policies diff --git a/processor/tailsamplingprocessor/composite_helper_test.go b/processor/tailsamplingprocessor/composite_helper_test.go index 578f3be289b9..dee22721d418 100644 --- 
a/processor/tailsamplingprocessor/composite_helper_test.go +++ b/processor/tailsamplingprocessor/composite_helper_test.go @@ -52,12 +52,14 @@ func TestCompositeHelper(t *testing.T) { { Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0), MaxSpansPerSecond: 250, + Name: "test-composite-policy-1", }, { Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 200, 0), MaxSpansPerSecond: 500, + Name: "test-composite-policy-2", }, - }, sampling.MonotonicClock{}) + }, sampling.MonotonicClock{}, false) assert.Equal(t, expected, actual) }) diff --git a/processor/tailsamplingprocessor/factory.go b/processor/tailsamplingprocessor/factory.go index 662b2496dab0..979f963adea5 100644 --- a/processor/tailsamplingprocessor/factory.go +++ b/processor/tailsamplingprocessor/factory.go @@ -7,6 +7,7 @@ package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry import ( "context" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry" "time" "go.opentelemetry.io/collector/component" @@ -38,5 +39,10 @@ func createTracesProcessor( nextConsumer consumer.Traces, ) (processor.Traces, error) { tCfg := cfg.(*Config) - return newTracesProcessor(ctx, params, nextConsumer, *tCfg) + opts := []Option{} + + if telemetry.IsRecordPolicyEnabled() { + opts = append(opts, withRecordPolicy()) + } + return newTracesProcessor(ctx, params, nextConsumer, *tCfg, opts...) 
} diff --git a/processor/tailsamplingprocessor/internal/sampling/composite.go b/processor/tailsamplingprocessor/internal/sampling/composite.go index b221229d7534..e2eeba47e027 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite.go @@ -19,6 +19,8 @@ type subpolicy struct { // spans per second that each subpolicy sampled in this period sampledSPS int64 + + name string } // Composite evaluator and its internal data @@ -35,7 +37,8 @@ type Composite struct { // The time provider (can be different from clock for testing purposes) timeProvider TimeProvider - logger *zap.Logger + logger *zap.Logger + recordSubPolicy bool } var _ PolicyEvaluator = (*Composite)(nil) @@ -44,6 +47,7 @@ var _ PolicyEvaluator = (*Composite)(nil) type SubPolicyEvalParams struct { Evaluator PolicyEvaluator MaxSpansPerSecond int64 + Name string } // NewComposite creates a policy evaluator that samples all subpolicies. @@ -52,6 +56,7 @@ func NewComposite( maxTotalSpansPerSecond int64, subPolicyParams []SubPolicyEvalParams, timeProvider TimeProvider, + recordSubPolicy bool, ) PolicyEvaluator { var subpolicies []*subpolicy @@ -60,7 +65,7 @@ func NewComposite( sub := &subpolicy{} sub.evaluator = subPolicyParams[i].Evaluator sub.allocatedSPS = subPolicyParams[i].MaxSpansPerSecond - + sub.name = subPolicyParams[i].Name // We are just starting, so there is no previous input, set it to 0 sub.sampledSPS = 0 @@ -68,10 +73,11 @@ func NewComposite( } return &Composite{ - maxTotalSPS: maxTotalSpansPerSecond, - subpolicies: subpolicies, - timeProvider: timeProvider, - logger: logger, + maxTotalSPS: maxTotalSpansPerSecond, + subpolicies: subpolicies, + timeProvider: timeProvider, + logger: logger, + recordSubPolicy: recordSubPolicy, } } @@ -111,6 +117,9 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace sub.sampledSPS = spansInSecondIfSampled // Let the sampling happen + if c.recordSubPolicy 
{ + SetAttrOnScopeSpans(trace, "tailsampling.composite_policy", sub.name) + } return Sampled, nil } diff --git a/processor/tailsamplingprocessor/internal/sampling/composite_test.go b/processor/tailsamplingprocessor/internal/sampling/composite_test.go index 66a7d1606c34..a44ea934527d 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite_test.go @@ -61,7 +61,7 @@ func TestCompositeEvaluatorNotSampled(t *testing.T) { // Create 2 policies which do not match any trace n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) n2 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 200, 300, false) - c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{}) + c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, false) trace := createTrace() @@ -79,7 +79,7 @@ func TestCompositeEvaluatorSampled(t *testing.T) { // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) - c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{}) + c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, false) trace := createTrace() @@ -91,6 +91,28 @@ func TestCompositeEvaluatorSampled(t *testing.T) { assert.Equal(t, expected, decision) } +func TestCompositeEvaluatorSampled_RecordSubPolicy(t *testing.T) { + + // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. 
+ n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) + n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) + c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, true) + + trace := newTraceWithKV(traceID, "test-key", 0) + + decision, err := c.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) + + // The second policy is AlwaysSample, so the decision should be Sampled. + expected := Sampled + assert.Equal(t, expected, decision) + val, ok := trace.ReceivedBatches.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.composite_policy") + if !ok { + assert.FailNow(t, "Did not find expected key") + } + assert.Equal(t, "eval-2", val.AsString()) +} + func TestCompositeEvaluator_OverflowAlwaysSampled(t *testing.T) { timeProvider := &FakeTimeProvider{second: 0} @@ -98,7 +120,7 @@ func TestCompositeEvaluator_OverflowAlwaysSampled(t *testing.T) { // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) - c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1}, {n2, 1}}, timeProvider) + c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1, "eval-1"}, {n2, 1, "eval-2"}}, timeProvider, false) trace := newTraceWithKV(traceID, "tag", int64(10)) @@ -132,7 +154,7 @@ func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) { // Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled. 
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false) n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) - c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{}) + c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, false) for i := 1; i <= 10; i++ { trace := createTrace() @@ -151,7 +173,7 @@ func TestCompositeEvaluatorInverseSampled_AlwaysSampled(t *testing.T) { // The first policy does not match, the second matches through invert n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, false) n2 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, true) - c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{}) + c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, false) for i := 1; i <= 10; i++ { trace := createTrace() @@ -165,13 +187,37 @@ func TestCompositeEvaluatorInverseSampled_AlwaysSampled(t *testing.T) { } } +func TestCompositeEvaluatorInverseSampled_AlwaysSampled_RecordSubPolicy(t *testing.T) { + + // The first policy does not match, the second matches through invert + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, false) + n2 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, true) + c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, true) + + for i := 1; i <= 10; i++ { + trace := newTraceWithKV(traceID, "test-key", 0) + + decision, err := c.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate composite policy: %v", err) + + // The second policy is AlwaysSample, so the 
decision should be Sampled. + expected := Sampled + assert.Equal(t, expected, decision) + val, ok := trace.ReceivedBatches.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.composite_policy") + if !ok { + assert.FailNow(t, "Did not find expected key") + } + assert.Equal(t, "eval-2", val.AsString()) + } +} + func TestCompositeEvaluatorThrottling(t *testing.T) { // Create only one subpolicy, with 100% Sampled policy. n1 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) timeProvider := &FakeTimeProvider{second: 0} const totalSPS = 10 - c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS}}, timeProvider) + c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS, "eval-1"}}, timeProvider, false) trace := createTrace() @@ -212,7 +258,7 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) { n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings()) timeProvider := &FakeTimeProvider{second: 0} const totalSPS = 10 - c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider) + c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2, "eval-1"}, {n2, totalSPS / 2, "eval-2"}}, timeProvider, false) trace := createTrace() diff --git a/processor/tailsamplingprocessor/internal/sampling/util.go b/processor/tailsamplingprocessor/internal/sampling/util.go index 6b896e526d04..9669ea39efbe 100644 --- a/processor/tailsamplingprocessor/internal/sampling/util.go +++ b/processor/tailsamplingprocessor/internal/sampling/util.go @@ -93,3 +93,16 @@ func invertHasInstrumentationLibrarySpanWithCondition(ilss ptrace.ScopeSpansSlic } return true } + +func SetAttrOnScopeSpans(data *TraceData, attrName string, attrKey string) { + + rs := data.ReceivedBatches.ResourceSpans() + + for i := 0; i < rs.Len(); i++ { + rss := rs.At(i) + for j := 0; j < rss.ScopeSpans().Len(); j++ { + ss := rss.ScopeSpans().At(j) + 
ss.Scope().Attributes().PutStr(attrName, attrKey) + } + } +} diff --git a/processor/tailsamplingprocessor/internal/telemetry/featureflag.go b/processor/tailsamplingprocessor/internal/telemetry/featureflag.go index 2907ad803da0..47f382377cb1 100644 --- a/processor/tailsamplingprocessor/internal/telemetry/featureflag.go +++ b/processor/tailsamplingprocessor/internal/telemetry/featureflag.go @@ -14,3 +14,13 @@ var metricStatCountSpansSampledFeatureGate = featuregate.GlobalRegistry().MustRe func IsMetricStatCountSpansSampledEnabled() bool { return metricStatCountSpansSampledFeatureGate.IsEnabled() } + +var recordPolicyFeatureGate = featuregate.GlobalRegistry().MustRegister( + "processor.tailsamplingprocessor.recordpolicy", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, attaches the name of the policy (and if applicable, composite policy) responsible for sampling a trace in the 'tailsampling.policy'/ 'tailsampling.composite_policy' attributes."), +) + +func IsRecordPolicyEnabled() bool { + return recordPolicyFeatureGate.IsEnabled() +} diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 4515290198ac..41004e9cb50d 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -58,6 +58,7 @@ type tailSamplingSpanProcessor struct { sampledIDCache cache.Cache[bool] deleteChan chan pcommon.TraceID numTracesOnMap *atomic.Uint64 + recordPolicy bool } // spanAndScope a structure for holding information about span and its instrumentation scope. 
@@ -188,6 +189,12 @@ func withSampledDecisionCache(c cache.Cache[bool]) Option { } } +func withRecordPolicy() Option { + return func(tsp *tailSamplingSpanProcessor) { + tsp.recordPolicy = true + } +} + func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) { switch cfg.Type { case Composite: @@ -291,12 +298,12 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision { finalDecision := sampling.NotSampled - samplingDecision := map[sampling.Decision]bool{ - sampling.Error: false, - sampling.Sampled: false, - sampling.NotSampled: false, - sampling.InvertSampled: false, - sampling.InvertNotSampled: false, + samplingDecisions := map[sampling.Decision]*policy{ + sampling.Error: nil, + sampling.Sampled: nil, + sampling.NotSampled: nil, + sampling.InvertSampled: nil, + sampling.InvertNotSampled: nil, } ctx := context.Background() @@ -306,7 +313,9 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa decision, err := p.evaluator.Evaluate(ctx, id, trace) tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute) if err != nil { - samplingDecision[sampling.Error] = true + if samplingDecisions[sampling.Error] == nil { + samplingDecisions[sampling.Error] = p + } metrics.evaluateErrorCount++ tsp.logger.Debug("Sampling policy error", zap.Error(err)) } else { @@ -315,18 +324,29 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision]) } - samplingDecision[decision] = true + // We associate the first policy with the sampling decision to understand what policy sampled a span + if samplingDecisions[decision] == nil 
{ + samplingDecisions[decision] = p + } } } + var sampledPolicy *policy + // InvertNotSampled takes precedence over any other decision switch { - case samplingDecision[sampling.InvertNotSampled]: + case samplingDecisions[sampling.InvertNotSampled] != nil: finalDecision = sampling.NotSampled - case samplingDecision[sampling.Sampled]: + case samplingDecisions[sampling.Sampled] != nil: finalDecision = sampling.Sampled - case samplingDecision[sampling.InvertSampled] && !samplingDecision[sampling.NotSampled]: + sampledPolicy = samplingDecisions[sampling.Sampled] + case samplingDecisions[sampling.InvertSampled] != nil && samplingDecisions[sampling.NotSampled] == nil: finalDecision = sampling.Sampled + sampledPolicy = samplingDecisions[sampling.InvertSampled] + } + + if tsp.recordPolicy && sampledPolicy != nil { + sampling.SetAttrOnScopeSpans(trace, "tailsampling.policy", sampledPolicy.name) } return finalDecision @@ -372,6 +392,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc appendToTraces(traceTd, resourceSpans, spans) tsp.releaseSampledTrace(tsp.ctx, id, traceTd) tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans))) + continue } @@ -485,6 +506,8 @@ func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, i } } +// We know we cannot rely on order now +// Question is: how can we effeciently find things? 
func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) { rs := dest.ResourceSpans().AppendEmpty() rss.Resource().CopyTo(rs.Resource()) diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index e50ea4f7ce59..4c920ee73ae7 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -5,6 +5,7 @@ package tailsamplingprocessor import ( "context" + "github.com/stretchr/testify/assert" "testing" "time" @@ -160,6 +161,57 @@ func TestSamplingMultiplePolicies(t *testing.T) { require.EqualValues(t, 1, nextConsumer.SpanCount()) } +func TestSamplingMultiplePolicies_WithRecordPolicy(t *testing.T) { + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + } + nextConsumer := new(consumertest.TracesSink) + s := setupTestTelemetry() + ct := s.NewSettings() + idb := newSyncIDBatcher() + + mpe1 := &mockPolicyEvaluator{} + mpe2 := &mockPolicyEvaluator{} + + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, + } + + p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withRecordPolicy()) + require.NoError(t, err) + + require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // First policy takes precedence + mpe1.NextDecision = sampling.Sampled + mpe2.NextDecision = sampling.Sampled + + // Generate and deliver first span + require.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) + + tsp := p.(*tailSamplingSpanProcessor) + + // The first tick won't do 
anything + tsp.policyTicker.OnTick() + // This will cause policy evaluations on the first span + tsp.policyTicker.OnTick() + + // The final decision SHOULD be Sampled. + require.EqualValues(t, 1, nextConsumer.SpanCount()) + + // First span should have an attribute that records the policy that sampled it + policy, ok := nextConsumer.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.policy") + if !ok { + assert.FailNow(t, "Did not find expected attribute") + } + require.EqualValues(t, "mock-policy-1", policy.AsString()) +} + func TestSamplingPolicyDecisionNotSampled(t *testing.T) { cfg := Config{ DecisionWait: defaultTestDecisionWait, @@ -206,6 +258,47 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { require.EqualValues(t, 0, nextConsumer.SpanCount()) } +func TestSamplingPolicyDecisionNotSampled_WithRecordPolicy(t *testing.T) { + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + } + nextConsumer := new(consumertest.TracesSink) + s := setupTestTelemetry() + ct := s.NewSettings() + idb := newSyncIDBatcher() + + mpe1 := &mockPolicyEvaluator{} + + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + } + + p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withRecordPolicy()) + require.NoError(t, err) + + require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // InvertNotSampled takes precedence + mpe1.NextDecision = sampling.NotSampled + + // Generate and deliver first span + require.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) + + tsp := p.(*tailSamplingSpanProcessor) + + // The first tick won't do anything + tsp.policyTicker.OnTick() + // This will cause policy evaluations on the first 
span + tsp.policyTicker.OnTick() + + // The final decision SHOULD be NotSampled. + require.EqualValues(t, 0, nextConsumer.SpanCount()) +} + func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { cfg := Config{ DecisionWait: defaultTestDecisionWait, @@ -257,6 +350,50 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { require.EqualValues(t, 0, nextConsumer.SpanCount()) } +func TestSamplingPolicyDecisionInvertNotSampled_WithRecordPolicy(t *testing.T) { + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + } + nextConsumer := new(consumertest.TracesSink) + s := setupTestTelemetry() + ct := s.NewSettings() + idb := newSyncIDBatcher() + + mpe1 := &mockPolicyEvaluator{} + mpe2 := &mockPolicyEvaluator{} + + policies := []*policy{ + {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, + {name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))}, + } + + p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withRecordPolicy()) + require.NoError(t, err) + + require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // InvertNotSampled takes precedence + mpe1.NextDecision = sampling.InvertNotSampled + mpe2.NextDecision = sampling.Sampled + + // Generate and deliver first span + require.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) + + tsp := p.(*tailSamplingSpanProcessor) + + // The first tick won't do anything + tsp.policyTicker.OnTick() + // This will cause policy evaluations on the first span + tsp.policyTicker.OnTick() + + // The final decision SHOULD be NotSampled. 
+ require.EqualValues(t, 0, nextConsumer.SpanCount()) +} + func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { cfg := Config{ DecisionWait: defaultTestDecisionWait,