Skip to content

Commit

Permalink
[connector/routing] Add ability to route by datapoint context (open-t…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Dec 2, 2024
1 parent 4b45285 commit bc7d967
Show file tree
Hide file tree
Showing 11 changed files with 2,455 additions and 222 deletions.
27 changes: 27 additions & 0 deletions .chloggen/routing-by-datapoints-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: routingconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add abiilty to route by 'datapoint' context

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36523]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 2 additions & 2 deletions connector/routingconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ If you are not already familiar with connectors, you may find it helpful to firs
The following settings are available:

- `table (required)`: the routing table for this connector.
- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `span`, `metric`, `log`, and `request` are supported.
- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `span`, `metric`, `datapoint`, `log`, and `request` are supported.
- `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided. May not be used for `request` context.
- `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided. Required for `request` context.
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
Expand All @@ -43,7 +43,7 @@ The following settings are available:

### Limitations

- The `match_once` setting is only supported when using the `resource` context. If any routes use `span`, `metric`, `log` or `request` context, `match_once` must be set to `true`.
- The `match_once` setting is only supported when using the `resource` context. If any routes use `span`, `metric`, `datapoint`, `log` or `request` context, `match_once` must be set to `true`.
- The `request` context requires use of the `condition` setting, and relies on a very limited grammar. Conditions must be in the form of `request["key"] == "value"` or `request["key"] != "value"`. (In the future, this grammar may be expanded to support more complex conditions.)

### Supported [OTTL] functions
Expand Down
2 changes: 1 addition & 1 deletion connector/routingconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Config) Validate() error {
return err
}
fallthrough
case "span", "metric", "log": // ok
case "span", "metric", "datapoint", "log": // ok
if !c.MatchOnce {
return fmt.Errorf(`%q context is not supported with "match_once: false"`, item.Context)
}
Expand Down
16 changes: 16 additions & 0 deletions connector/routingconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,22 @@ func TestValidateConfig(t *testing.T) {
},
error: `"metric" context is not supported with "match_once: false"`,
},
{
name: "datapoint context with match_once false",
config: &Config{
MatchOnce: false,
Table: []RoutingTableItem{
{
Context: "datapoint",
Statement: `route() where attributes["attr"] == "acme"`,
Pipelines: []pipeline.ID{
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
},
},
},
},
error: `"datapoint" context is not supported with "match_once: false"`,
},
{
name: "log context with match_once false",
config: &Config{
Expand Down
193 changes: 193 additions & 0 deletions connector/routingconnector/internal/pmetricutil/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,196 @@ func MoveMetricsWithContextIf(from, to pmetric.Metrics, f func(pmetric.ResourceM
return rm.ScopeMetrics().Len() == 0
})
}

// MoveDataPointsWithContextIf calls f sequentially for each DataPoint present in the first pmetric.Metrics.
// If f returns true, the element is removed from the first pmetric.Metrics and added to the second pmetric.Metrics.
// Notably, the Resource, Scope, and Metric associated with the DataPoint are created in the second pmetric.Metrics only once.
// Resources, Scopes, or Metrics are removed from the original if they become empty. All ordering is preserved.
func MoveDataPointsWithContextIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics, pmetric.ScopeMetrics, pmetric.Metric, any) bool) {
rms := from.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
sms := rm.ScopeMetrics()
var rmCopy *pmetric.ResourceMetrics
for j := 0; j < sms.Len(); j++ {
sm := sms.At(j)
ms := sm.Metrics()
var smCopy *pmetric.ScopeMetrics
for k := 0; k < ms.Len(); k++ {
m := ms.At(k)
var mCopy *pmetric.Metric

// TODO condense this code
switch m.Type() {
case pmetric.MetricTypeGauge:
dps := m.Gauge().DataPoints()
dps.RemoveIf(func(dp pmetric.NumberDataPoint) bool {
if !f(rm, sm, m, dp) {
return false
}
if rmCopy == nil {
rmc := to.ResourceMetrics().AppendEmpty()
rmCopy = &rmc
rm.Resource().CopyTo(rmCopy.Resource())
rmCopy.SetSchemaUrl(rm.SchemaUrl())
}
if smCopy == nil {
smc := rmCopy.ScopeMetrics().AppendEmpty()
smCopy = &smc
sm.Scope().CopyTo(smCopy.Scope())
smCopy.SetSchemaUrl(sm.SchemaUrl())
}
if mCopy == nil {
mc := smCopy.Metrics().AppendEmpty()
mCopy = &mc
mCopy.SetName(m.Name())
mCopy.SetDescription(m.Description())
mCopy.SetUnit(m.Unit())
mCopy.SetEmptyGauge()
}
dp.CopyTo(mCopy.Gauge().DataPoints().AppendEmpty())
return true
})
case pmetric.MetricTypeSum:
dps := m.Sum().DataPoints()
dps.RemoveIf(func(dp pmetric.NumberDataPoint) bool {
if !f(rm, sm, m, dp) {
return false
}
if rmCopy == nil {
rmc := to.ResourceMetrics().AppendEmpty()
rmCopy = &rmc
rm.Resource().CopyTo(rmCopy.Resource())
rmCopy.SetSchemaUrl(rm.SchemaUrl())
}
if smCopy == nil {
smc := rmCopy.ScopeMetrics().AppendEmpty()
smCopy = &smc
sm.Scope().CopyTo(smCopy.Scope())
smCopy.SetSchemaUrl(sm.SchemaUrl())
}
if mCopy == nil {
mc := smCopy.Metrics().AppendEmpty()
mCopy = &mc
mCopy.SetName(m.Name())
mCopy.SetDescription(m.Description())
mCopy.SetUnit(m.Unit())
mCopy.SetEmptySum()
}
dp.CopyTo(mCopy.Sum().DataPoints().AppendEmpty())
return true
})
case pmetric.MetricTypeHistogram:
dps := m.Histogram().DataPoints()
dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
if !f(rm, sm, m, dp) {
return false
}
if rmCopy == nil {
rmc := to.ResourceMetrics().AppendEmpty()
rmCopy = &rmc
rm.Resource().CopyTo(rmCopy.Resource())
rmCopy.SetSchemaUrl(rm.SchemaUrl())
}
if smCopy == nil {
smc := rmCopy.ScopeMetrics().AppendEmpty()
smCopy = &smc
sm.Scope().CopyTo(smCopy.Scope())
smCopy.SetSchemaUrl(sm.SchemaUrl())
}
if mCopy == nil {
mc := smCopy.Metrics().AppendEmpty()
mCopy = &mc
mCopy.SetName(m.Name())
mCopy.SetDescription(m.Description())
mCopy.SetUnit(m.Unit())
mCopy.SetEmptyHistogram()
}
dp.CopyTo(mCopy.Histogram().DataPoints().AppendEmpty())
return true
})
case pmetric.MetricTypeExponentialHistogram:
dps := m.ExponentialHistogram().DataPoints()
dps.RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
if !f(rm, sm, m, dp) {
return false
}
if rmCopy == nil {
rmc := to.ResourceMetrics().AppendEmpty()
rmCopy = &rmc
rm.Resource().CopyTo(rmCopy.Resource())
rmCopy.SetSchemaUrl(rm.SchemaUrl())
}
if smCopy == nil {
smc := rmCopy.ScopeMetrics().AppendEmpty()
smCopy = &smc
sm.Scope().CopyTo(smCopy.Scope())
smCopy.SetSchemaUrl(sm.SchemaUrl())
}
if mCopy == nil {
mc := smCopy.Metrics().AppendEmpty()
mCopy = &mc
mCopy.SetName(m.Name())
mCopy.SetDescription(m.Description())
mCopy.SetUnit(m.Unit())
mCopy.SetEmptyExponentialHistogram()
}
dp.CopyTo(mCopy.ExponentialHistogram().DataPoints().AppendEmpty())
return true
})
case pmetric.MetricTypeSummary:
dps := m.Summary().DataPoints()
dps.RemoveIf(func(dp pmetric.SummaryDataPoint) bool {
if !f(rm, sm, m, dp) {
return false
}
if rmCopy == nil {
rmc := to.ResourceMetrics().AppendEmpty()
rmCopy = &rmc
rm.Resource().CopyTo(rmCopy.Resource())
rmCopy.SetSchemaUrl(rm.SchemaUrl())
}
if smCopy == nil {
smc := rmCopy.ScopeMetrics().AppendEmpty()
smCopy = &smc
sm.Scope().CopyTo(smCopy.Scope())
smCopy.SetSchemaUrl(sm.SchemaUrl())
}
if mCopy == nil {
mc := smCopy.Metrics().AppendEmpty()
mCopy = &mc
mCopy.SetName(m.Name())
mCopy.SetDescription(m.Description())
mCopy.SetUnit(m.Unit())
mCopy.SetEmptySummary()
}
dp.CopyTo(mCopy.Summary().DataPoints().AppendEmpty())
return true
})
}
}
ms.RemoveIf(func(m pmetric.Metric) bool {
var numDPs int
switch m.Type() {
case pmetric.MetricTypeGauge:
numDPs = m.Gauge().DataPoints().Len()
case pmetric.MetricTypeSum:
numDPs = m.Sum().DataPoints().Len()
case pmetric.MetricTypeHistogram:
numDPs = m.Histogram().DataPoints().Len()
case pmetric.MetricTypeExponentialHistogram:
numDPs = m.ExponentialHistogram().DataPoints().Len()
case pmetric.MetricTypeSummary:
numDPs = m.Summary().DataPoints().Len()
}
return numDPs == 0
})
}
sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool {
return sm.Metrics().Len() == 0
})
}
rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool {
return rm.ScopeMetrics().Len() == 0
})
}
Loading

0 comments on commit bc7d967

Please sign in to comment.