Skip to content

Commit

Permalink
[connector/routing] Add ability to route by span context (open-teleme…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored and sbylica-splunk committed Dec 17, 2024
1 parent cfa98dc commit d755bf0
Show file tree
Hide file tree
Showing 11 changed files with 575 additions and 14 deletions.
27 changes: 27 additions & 0 deletions .chloggen/routing-by-traces.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add ability to route by span context

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36276]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
5 changes: 2 additions & 3 deletions connector/routingconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ If you are not already familiar with connectors, you may find it helpful to firs
The following settings are available:

- `table (required)`: the routing table for this connector.
- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `metric`, `log`, and `request` are supported.
- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `span`, `metric`, `log`, and `request` are supported.
- `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided. May not be used for `request` context.
- `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided. Required for `request` context.
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
Expand All @@ -43,7 +43,7 @@ The following settings are available:

### Limitations

- The `match_once` setting is only supported when using the `resource` context. If any routes use `metric`, `log` or `request` context, `match_once` must be set to `true`.
- The `match_once` setting is only supported when using the `resource` context. If any routes use `span`, `metric`, `log` or `request` context, `match_once` must be set to `true`.
- The `request` context requires use of the `condition` setting, and relies on a very limited grammar. Conditions must be in the form of `request["key"] == "value"` or `request["key"] != "value"`. (In the future, this grammar may be expanded to support more complex conditions.)

### Supported [OTTL] functions
Expand Down Expand Up @@ -287,7 +287,6 @@ service:
## Differences between the Routing Connector and Routing Processor
- Routing on context values is only supported for logs at this time.
- The connector routes to pipelines, not exporters as the processor does.
[Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md
Expand Down
2 changes: 1 addition & 1 deletion connector/routingconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Config) Validate() error {
return err
}
fallthrough
case "metric", "log": // ok
case "span", "metric", "log": // ok
if !c.MatchOnce {
return fmt.Errorf(`%q context is not supported with "match_once: false"`, item.Context)
}
Expand Down
16 changes: 16 additions & 0 deletions connector/routingconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,22 @@ func TestValidateConfig(t *testing.T) {
},
error: "invalid context: invalid",
},
{
name: "span context with match_once false",
config: &Config{
MatchOnce: false,
Table: []RoutingTableItem{
{
Context: "span",
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: `"span" context is not supported with "match_once: false"`,
},
{
name: "metric context with match_once false",
config: &Config{
Expand Down
49 changes: 46 additions & 3 deletions connector/routingconnector/internal/ptraceutil/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,54 @@ import "go.opentelemetry.io/collector/pdata/ptrace"
// MoveResourcesIf calls f sequentially for each ResourceSpans present in the first ptrace.Traces.
// If f returns true, the element is removed from the first ptrace.Traces and added to the second ptrace.Traces.
func MoveResourcesIf(from, to ptrace.Traces, f func(ptrace.ResourceSpans) bool) {
from.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool {
if !f(rs) {
from.ResourceSpans().RemoveIf(func(resoruceSpans ptrace.ResourceSpans) bool {
if !f(resoruceSpans) {
return false
}
rs.CopyTo(to.ResourceSpans().AppendEmpty())
resoruceSpans.CopyTo(to.ResourceSpans().AppendEmpty())
return true
})
}

// MoveSpansWithContextIf calls f sequentially for each Span present in the first ptrace.Traces.
// If f returns true, the element is removed from the first ptrace.Traces and added to the second ptrace.Traces.
// Notably, the Resource and Scope associated with the Span are created in the second ptrace.Traces only once.
// Resources or Scopes are removed from the original if they become empty. All ordering is preserved.
func MoveSpansWithContextIf(from, to ptrace.Traces, f func(ptrace.ResourceSpans, ptrace.ScopeSpans, ptrace.Span) bool) {
resourceSpansSlice := from.ResourceSpans()
for i := 0; i < resourceSpansSlice.Len(); i++ {
resourceSpans := resourceSpansSlice.At(i)
scopeSpanSlice := resourceSpans.ScopeSpans()
var resourceSpansCopy *ptrace.ResourceSpans
for j := 0; j < scopeSpanSlice.Len(); j++ {
scopeSpans := scopeSpanSlice.At(j)
spanSlice := scopeSpans.Spans()
var scopeSpansCopy *ptrace.ScopeSpans
spanSlice.RemoveIf(func(span ptrace.Span) bool {
if !f(resourceSpans, scopeSpans, span) {
return false
}
if resourceSpansCopy == nil {
rmc := to.ResourceSpans().AppendEmpty()
resourceSpansCopy = &rmc
resourceSpans.Resource().CopyTo(resourceSpansCopy.Resource())
resourceSpansCopy.SetSchemaUrl(resourceSpans.SchemaUrl())
}
if scopeSpansCopy == nil {
smc := resourceSpansCopy.ScopeSpans().AppendEmpty()
scopeSpansCopy = &smc
scopeSpans.Scope().CopyTo(scopeSpansCopy.Scope())
scopeSpansCopy.SetSchemaUrl(scopeSpans.SchemaUrl())
}
span.CopyTo(scopeSpansCopy.Spans().AppendEmpty())
return true
})
}
scopeSpanSlice.RemoveIf(func(sm ptrace.ScopeSpans) bool {
return sm.Spans().Len() == 0
})
}
resourceSpansSlice.RemoveIf(func(resourceSpans ptrace.ResourceSpans) bool {
return resourceSpans.ScopeSpans().Len() == 0
})
}
144 changes: 144 additions & 0 deletions connector/routingconnector/internal/ptraceutil/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,147 @@ func TestMoveResourcesIf(t *testing.T) {
})
}
}

func TestMoveSpansWithContextIf(t *testing.T) {
testCases := []struct {
name string
moveIf func(ptrace.ResourceSpans, ptrace.ScopeSpans, ptrace.Span) bool
from ptrace.Traces
to ptrace.Traces
expectFrom ptrace.Traces
expectTo ptrace.Traces
}{
{
name: "move_none",
moveIf: func(_ ptrace.ResourceSpans, _ ptrace.ScopeSpans, _ ptrace.Span) bool {
return false
},
from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
to: ptrace.NewTraces(),
expectFrom: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
expectTo: ptrace.NewTraces(),
},
{
name: "move_all",
moveIf: func(_ ptrace.ResourceSpans, _ ptrace.ScopeSpans, _ ptrace.Span) bool {
return true
},
from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
to: ptrace.NewTraces(),
expectFrom: ptrace.NewTraces(),
expectTo: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
},
{
name: "move_all_from_one_resource",
moveIf: func(rl ptrace.ResourceSpans, _ ptrace.ScopeSpans, _ ptrace.Span) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB"
},
from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
to: ptrace.NewTraces(),
expectFrom: ptraceutiltest.NewTraces("A", "CD", "EF", "GH"),
expectTo: ptraceutiltest.NewTraces("B", "CD", "EF", "GH"),
},
{
name: "move_all_from_one_scope",
moveIf: func(rl ptrace.ResourceSpans, sl ptrace.ScopeSpans, _ ptrace.Span) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB" && sl.Scope().Name() == "scopeC"
},
from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
to: ptrace.NewTraces(),
expectFrom: ptraceutiltest.NewTracesFromOpts(
ptraceutiltest.WithResource('A',
ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")),
ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")),
),
ptraceutiltest.WithResource('B',
ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")),
),
),
expectTo: ptraceutiltest.NewTraces("B", "C", "EF", "GH"),
},
{
name: "move_all_from_one_scope_in_each_resource",
moveIf: func(_ ptrace.ResourceSpans, sl ptrace.ScopeSpans, _ ptrace.Span) bool {
return sl.Scope().Name() == "scopeD"
},
from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
to: ptrace.NewTraces(),
expectFrom: ptraceutiltest.NewTraces("AB", "C", "EF", "GH"),
expectTo: ptraceutiltest.NewTraces("AB", "D", "EF", "GH"),
},
{
name: "move_one",
moveIf: func(rl ptrace.ResourceSpans, sl ptrace.ScopeSpans, m ptrace.Span) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceA" && sl.Scope().Name() == "scopeD" && m.Name() == "spanF"
},
from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
to: ptrace.NewTraces(),
expectFrom: ptraceutiltest.NewTracesFromOpts(
ptraceutiltest.WithResource('A',
ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")),
ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH")),
),
ptraceutiltest.WithResource('B',
ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")),
ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")),
),
),
expectTo: ptraceutiltest.NewTraces("A", "D", "F", "GH"),
},
{
name: "move_one_from_each_scope",
moveIf: func(_ ptrace.ResourceSpans, _ ptrace.ScopeSpans, m ptrace.Span) bool {
return m.Name() == "spanE"
},
from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
to: ptrace.NewTraces(),
expectFrom: ptraceutiltest.NewTraces("AB", "CD", "F", "GH"),
expectTo: ptraceutiltest.NewTraces("AB", "CD", "E", "GH"),
},
{
name: "move_one_from_each_scope_in_one_resource",
moveIf: func(rl ptrace.ResourceSpans, _ ptrace.ScopeSpans, m ptrace.Span) bool {
rname, ok := rl.Resource().Attributes().Get("resourceName")
return ok && rname.AsString() == "resourceB" && m.Name() == "spanE"
},
from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
to: ptrace.NewTraces(),
expectFrom: ptraceutiltest.NewTracesFromOpts(
ptraceutiltest.WithResource('A',
ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")),
ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")),
),
ptraceutiltest.WithResource('B',
ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('F', "GH")),
ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('F', "GH")),
),
),
expectTo: ptraceutiltest.NewTraces("B", "CD", "E", "GH"),
},
{
name: "move_some_to_preexisting",
moveIf: func(_ ptrace.ResourceSpans, sl ptrace.ScopeSpans, _ ptrace.Span) bool {
return sl.Scope().Name() == "scopeD"
},
from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"),
to: ptraceutiltest.NewTraces("1", "2", "3", "4"),
expectFrom: ptraceutiltest.NewTraces("AB", "C", "EF", "GH"),
expectTo: ptraceutiltest.NewTracesFromOpts(
ptraceutiltest.WithResource('1', ptraceutiltest.WithScope('2', ptraceutiltest.WithSpan('3', "4"))),
ptraceutiltest.WithResource('A', ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH"))),
ptraceutiltest.WithResource('B', ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH"))),
),
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
ptraceutil.MoveSpansWithContextIf(tt.from, tt.to, tt.moveIf)
assert.NoError(t, ptracetest.CompareTraces(tt.expectFrom, tt.from), "from not modified as expected")
assert.NoError(t, ptracetest.CompareTraces(tt.expectTo, tt.to), "to not as expected")
})
}
}
54 changes: 54 additions & 0 deletions connector/routingconnector/internal/ptraceutiltest/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,57 @@ func NewTraces(resourceIDs, scopeIDs, spanIDs, spanEventIDs string) ptrace.Trace
}
return td
}

type Resource struct {
id byte
scopes []Scope
}

type Scope struct {
id byte
spans []Span
}

type Span struct {
id byte
spanEvents string
}

func WithResource(id byte, scopes ...Scope) Resource {
r := Resource{id: id}
r.scopes = append(r.scopes, scopes...)
return r
}

func WithScope(id byte, spans ...Span) Scope {
s := Scope{id: id}
s.spans = append(s.spans, spans...)
return s
}

func WithSpan(id byte, spanEvents string) Span {
return Span{id: id, spanEvents: spanEvents}
}

// NewTracesFromOpts creates a ptrace.Traces with the specified resources, scopes, metrics,
// and data points. The general idea is the same as NewMetrics, but this function allows for
// more flexibility in creating non-uniform structures.
func NewTracesFromOpts(resources ...Resource) ptrace.Traces {
td := ptrace.NewTraces()
for _, resource := range resources {
r := td.ResourceSpans().AppendEmpty()
r.Resource().Attributes().PutStr("resourceName", "resource"+string(resource.id))
for _, scope := range resource.scopes {
ss := r.ScopeSpans().AppendEmpty()
ss.Scope().SetName("scope" + string(scope.id))
for _, span := range scope.spans {
s := ss.Spans().AppendEmpty()
s.SetName("span" + string(span.id))
for i := 0; i < len(span.spanEvents); i++ {
s.Events().AppendEmpty().Attributes().PutStr("spanEventName", "spanEvent"+string(span.spanEvents[i]))
}
}
}
}
return td
}
Loading

0 comments on commit d755bf0

Please sign in to comment.