Skip to content

Commit

Permalink
Adds support for optionally recording the policy (and any composite p…
Browse files Browse the repository at this point in the history
…olicy) associated with an inclusive tail processor sampling decision.

Resolves !35180.

- This functionality lives behind a feature flag that is disabled by default
- The original issue described a solution where we might attach the attribute solely to the root span. I'm not sure I agree with the commenter that we can rely on this (e.g. we might decide to sample halfway through a long-running trace) so I have attached the attributes to all present scope spans. This feels like a decent trade off between complexity + network cost, as finding the highest non-root parent would require multiple passes of the spans and keeping all span ids in a set

- Added automated tests to verify enabling the flag both records the expected decision while not impacting existing logic
- Built a custom version and ran it in our preprod environment to ensure it was stable over a 1h period (still evaluating, will update PR with any further observations)

Does this require a CHANGELOG entry?
  • Loading branch information
djluck committed Dec 6, 2024
1 parent dec4a7b commit 8e0239a
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 27 deletions.
4 changes: 3 additions & 1 deletion processor/tailsamplingprocessor/composite_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry"
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
Expand All @@ -22,10 +23,11 @@ func getNewCompositePolicy(settings component.TelemetrySettings, config *Composi
evalParams := sampling.SubPolicyEvalParams{
Evaluator: policy,
MaxSpansPerSecond: int64(rateAllocationsMap[policyCfg.Name]),
Name: policyCfg.Name,
}
subPolicyEvalParams[i] = evalParams
}
return sampling.NewComposite(settings.Logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}), nil
return sampling.NewComposite(settings.Logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}, telemetry.IsRecordPolicyEnabled()), nil
}

// Apply rate allocations to the sub-policies
Expand Down
4 changes: 3 additions & 1 deletion processor/tailsamplingprocessor/composite_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ func TestCompositeHelper(t *testing.T) {
{
Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0),
MaxSpansPerSecond: 250,
Name: "test-composite-policy-1",
},
{
Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 200, 0),
MaxSpansPerSecond: 500,
Name: "test-composite-policy-2",
},
}, sampling.MonotonicClock{})
}, sampling.MonotonicClock{}, false)
assert.Equal(t, expected, actual)
})

Expand Down
8 changes: 7 additions & 1 deletion processor/tailsamplingprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry

import (
"context"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -38,5 +39,10 @@ func createTracesProcessor(
nextConsumer consumer.Traces,
) (processor.Traces, error) {
tCfg := cfg.(*Config)
return newTracesProcessor(ctx, params, nextConsumer, *tCfg)
opts := []Option{}

if telemetry.IsRecordPolicyEnabled() {
opts = append(opts, withRecordPolicy())
}
return newTracesProcessor(ctx, params, nextConsumer, *tCfg, opts...)
}
21 changes: 15 additions & 6 deletions processor/tailsamplingprocessor/internal/sampling/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type subpolicy struct {

// spans per second that each subpolicy sampled in this period
sampledSPS int64

name string
}

// Composite evaluator and its internal data
Expand All @@ -35,7 +37,8 @@ type Composite struct {
// The time provider (can be different from clock for testing purposes)
timeProvider TimeProvider

logger *zap.Logger
logger *zap.Logger
recordSubPolicy bool
}

var _ PolicyEvaluator = (*Composite)(nil)
Expand All @@ -44,6 +47,7 @@ var _ PolicyEvaluator = (*Composite)(nil)
type SubPolicyEvalParams struct {
Evaluator PolicyEvaluator
MaxSpansPerSecond int64
Name string
}

// NewComposite creates a policy evaluator that samples all subpolicies.
Expand All @@ -52,25 +56,27 @@ func NewComposite(
maxTotalSpansPerSecond int64,
subPolicyParams []SubPolicyEvalParams,
timeProvider TimeProvider,
recordSubPolicy bool,
) PolicyEvaluator {
var subpolicies []*subpolicy

for i := 0; i < len(subPolicyParams); i++ {
sub := &subpolicy{}
sub.evaluator = subPolicyParams[i].Evaluator
sub.allocatedSPS = subPolicyParams[i].MaxSpansPerSecond

sub.name = subPolicyParams[i].Name
// We are just starting, so there is no previous input, set it to 0
sub.sampledSPS = 0

subpolicies = append(subpolicies, sub)
}

return &Composite{
maxTotalSPS: maxTotalSpansPerSecond,
subpolicies: subpolicies,
timeProvider: timeProvider,
logger: logger,
maxTotalSPS: maxTotalSpansPerSecond,
subpolicies: subpolicies,
timeProvider: timeProvider,
logger: logger,
recordSubPolicy: recordSubPolicy,
}
}

Expand Down Expand Up @@ -110,6 +116,9 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace
sub.sampledSPS = spansInSecondIfSampled

// Let the sampling happen
if c.recordSubPolicy {
SetAttrOnScopeSpans(trace, "tailsampling.composite_policy", sub.name)
}
return Sampled, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestCompositeEvaluatorNotSampled(t *testing.T) {
// Create 2 policies which do not match any trace
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 200, 300, false)
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{})
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, false)

trace := createTrace()

Expand All @@ -77,7 +77,7 @@ func TestCompositeEvaluatorSampled(t *testing.T) {
// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{})
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, false)

trace := createTrace()

Expand All @@ -89,13 +89,35 @@ func TestCompositeEvaluatorSampled(t *testing.T) {
assert.Equal(t, expected, decision)
}

func TestCompositeEvaluatorSampled_RecordSubPolicy(t *testing.T) {

// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, true)

trace := newTraceWithKV(traceID, "test-key", 0)

decision, err := c.Evaluate(context.Background(), traceID, trace)
require.NoError(t, err, "Failed to evaluate composite policy: %v", err)

// The second policy is AlwaysSample, so the decision should be Sampled.
expected := Sampled
assert.Equal(t, expected, decision)
val, ok := trace.ReceivedBatches.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.composite_policy")
if !ok {
assert.FailNow(t, "Did not find expected key")
}
assert.Equal(t, "eval-2", val.AsString())
}

func TestCompositeEvaluator_OverflowAlwaysSampled(t *testing.T) {
timeProvider := &FakeTimeProvider{second: 0}

// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1}, {n2, 1}}, timeProvider)
c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1, "eval-1"}, {n2, 1, "eval-2"}}, timeProvider, false)

trace := newTraceWithKV(traceID, "tag", int64(10))

Expand Down Expand Up @@ -128,7 +150,7 @@ func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) {
// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{})
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, false)

for i := 1; i <= 10; i++ {
trace := createTrace()
Expand All @@ -146,7 +168,7 @@ func TestCompositeEvaluatorInverseSampled_AlwaysSampled(t *testing.T) {
// The first policy does not match, the second matches through invert
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, false)
n2 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, true)
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{})
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, false)

for i := 1; i <= 10; i++ {
trace := createTrace()
Expand All @@ -160,12 +182,36 @@ func TestCompositeEvaluatorInverseSampled_AlwaysSampled(t *testing.T) {
}
}

func TestCompositeEvaluatorInverseSampled_AlwaysSampled_RecordSubPolicy(t *testing.T) {

// The first policy does not match, the second matches through invert
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, false)
n2 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, true)
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, true)

for i := 1; i <= 10; i++ {
trace := newTraceWithKV(traceID, "test-key", 0)

decision, err := c.Evaluate(context.Background(), traceID, trace)
require.NoError(t, err, "Failed to evaluate composite policy: %v", err)

// The second policy is AlwaysSample, so the decision should be Sampled.
expected := Sampled
assert.Equal(t, expected, decision)
val, ok := trace.ReceivedBatches.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.composite_policy")
if !ok {
assert.FailNow(t, "Did not find expected key")
}
assert.Equal(t, "eval-2", val.AsString())
}
}

func TestCompositeEvaluatorThrottling(t *testing.T) {
// Create only one subpolicy, with 100% Sampled policy.
n1 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
timeProvider := &FakeTimeProvider{second: 0}
const totalSPS = 10
c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS}}, timeProvider)
c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS, "eval-1"}}, timeProvider, false)

trace := createTrace()

Expand Down Expand Up @@ -205,7 +251,7 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) {
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
timeProvider := &FakeTimeProvider{second: 0}
const totalSPS = 10
c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider)
c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2, "eval-1"}, {n2, totalSPS / 2, "eval-2"}}, timeProvider, false)

trace := createTrace()

Expand Down
13 changes: 13 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,16 @@ func invertHasInstrumentationLibrarySpanWithCondition(ilss ptrace.ScopeSpansSlic
}
return true
}

func SetAttrOnScopeSpans(data *TraceData, attrName string, attrKey string) {

rs := data.ReceivedBatches.ResourceSpans()

for i := 0; i < rs.Len(); i++ {
rss := rs.At(i)
for j := 0; j < rss.ScopeSpans().Len(); j++ {
ss := rss.ScopeSpans().At(j)
ss.Scope().Attributes().PutStr(attrName, attrKey)
}
}
}
10 changes: 10 additions & 0 deletions processor/tailsamplingprocessor/internal/telemetry/featureflag.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ var metricStatCountSpansSampledFeatureGate = featuregate.GlobalRegistry().MustRe
func IsMetricStatCountSpansSampledEnabled() bool {
return metricStatCountSpansSampledFeatureGate.IsEnabled()
}

var recordPolicyFeatureGate = featuregate.GlobalRegistry().MustRegister(
"processor.tailsamplingprocessor.recordpolicy",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, attaches the name of the policy (and if applicable, composite policy) responsible for sampling a trace in the 'tailsampling.policy'/ 'tailsampling.composite_policy' attributes."),
)

func IsRecordPolicyEnabled() bool {
return recordPolicyFeatureGate.IsEnabled()
}
42 changes: 31 additions & 11 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type tailSamplingSpanProcessor struct {
nonSampledIDCache cache.Cache[bool]
deleteChan chan pcommon.TraceID
numTracesOnMap *atomic.Uint64
recordPolicy bool
}

// spanAndScope a structure for holding information about span and its instrumentation scope.
Expand Down Expand Up @@ -205,6 +206,12 @@ func withNonSampledDecisionCache(c cache.Cache[bool]) Option {
}
}

func withRecordPolicy() Option {
return func(tsp *tailSamplingSpanProcessor) {
tsp.recordPolicy = true
}
}

func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) {
switch cfg.Type {
case Composite:
Expand Down Expand Up @@ -308,12 +315,12 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {

func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision {
finalDecision := sampling.NotSampled
samplingDecision := map[sampling.Decision]bool{
sampling.Error: false,
sampling.Sampled: false,
sampling.NotSampled: false,
sampling.InvertSampled: false,
sampling.InvertNotSampled: false,
samplingDecisions := map[sampling.Decision]*policy{
sampling.Error: nil,
sampling.Sampled: nil,
sampling.NotSampled: nil,
sampling.InvertSampled: nil,
sampling.InvertNotSampled: nil,
}

ctx := context.Background()
Expand All @@ -323,7 +330,9 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
decision, err := p.evaluator.Evaluate(ctx, id, trace)
tsp.telemetry.ProcessorTailSamplingSamplingDecisionLatency.Record(ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond), p.attribute)
if err != nil {
samplingDecision[sampling.Error] = true
if samplingDecisions[sampling.Error] == nil {
samplingDecisions[sampling.Error] = p
}
metrics.evaluateErrorCount++
tsp.logger.Debug("Sampling policy error", zap.Error(err))
} else {
Expand All @@ -332,18 +341,29 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
tsp.telemetry.ProcessorTailSamplingCountSpansSampled.Add(ctx, trace.SpanCount.Load(), p.attribute, decisionToAttribute[decision])
}

samplingDecision[decision] = true
// We associate the first policy with the sampling decision to understand what policy sampled a span
if samplingDecisions[decision] == nil {
samplingDecisions[decision] = p
}
}
}

var sampledPolicy *policy

// InvertNotSampled takes precedence over any other decision
switch {
case samplingDecision[sampling.InvertNotSampled]:
case samplingDecisions[sampling.InvertNotSampled] != nil:
finalDecision = sampling.NotSampled
case samplingDecision[sampling.Sampled]:
case samplingDecisions[sampling.Sampled] != nil:
finalDecision = sampling.Sampled
case samplingDecision[sampling.InvertSampled] && !samplingDecision[sampling.NotSampled]:
sampledPolicy = samplingDecisions[sampling.Sampled]
case samplingDecisions[sampling.InvertSampled] != nil && samplingDecisions[sampling.NotSampled] == nil:
finalDecision = sampling.Sampled
sampledPolicy = samplingDecisions[sampling.InvertSampled]
}

if tsp.recordPolicy && sampledPolicy != nil {
sampling.SetAttrOnScopeSpans(trace, "tailsampling.policy", sampledPolicy.name)
}

return finalDecision
Expand Down
Loading

0 comments on commit 8e0239a

Please sign in to comment.