From 12d6aaeb822f58e7039827a919b1c9468f75592a Mon Sep 17 00:00:00 2001 From: edmocosta <11836452+edmocosta@users.noreply.github.com> Date: Thu, 16 Jan 2025 15:43:29 +0100 Subject: [PATCH 1/3] Replace ParserCollection and add initial support for context inference --- processor/transformprocessor/config_test.go | 56 +- .../internal/common/config.go | 16 + .../internal/common/logs.go | 101 ++- .../internal/common/metrics.go | 165 ++-- .../internal/common/processor.go | 147 ++-- .../internal/common/traces.go | 137 ++-- .../internal/logs/processor.go | 9 +- .../internal/logs/processor_test.go | 428 ++++++++++- .../internal/metrics/processor.go | 9 +- .../internal/metrics/processor_test.go | 712 +++++++++++++++++- .../internal/traces/processor.go | 9 +- .../internal/traces/processor_test.go | 414 +++++++++- .../transformprocessor/testdata/config.yaml | 28 + 13 files changed, 1920 insertions(+), 311 deletions(-) diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index fe30c4c58ac1..e736707cd706 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -147,9 +147,63 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "bad_syntax_multi_signal"), errorLen: 3, }, + { + id: component.NewIDWithName(metadata.Type, "structured_configuration_with_path_context"), + expected: &Config{ + ErrorMode: ottl.PropagateError, + TraceStatements: []common.ContextStatements{ + { + Context: "span", + Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`}, + }, + }, + MetricStatements: []common.ContextStatements{ + { + Context: "metric", + Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`}, + }, + }, + LogStatements: []common.ContextStatements{ + { + Context: "log", + Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`}, + }, + }, + }, 
+ }, + { + id: component.NewIDWithName(metadata.Type, "structured_configuration_with_inferred_context"), + expected: &Config{ + ErrorMode: ottl.PropagateError, + TraceStatements: []common.ContextStatements{ + { + Statements: []string{ + `set(span.name, "bear") where span.attributes["http.path"] == "/animal"`, + `set(resource.attributes["name"], "bear")`, + }, + }, + }, + MetricStatements: []common.ContextStatements{ + { + Statements: []string{ + `set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`, + `set(resource.attributes["name"], "bear")`, + }, + }, + }, + LogStatements: []common.ContextStatements{ + { + Statements: []string{ + `set(log.body, "bear") where log.attributes["http.path"] == "/animal"`, + `set(resource.attributes["name"], "bear")`, + }, + }, + }, + }, + }, } for _, tt := range tests { - t.Run(tt.id.String(), func(t *testing.T) { + t.Run(tt.id.Name(), func(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) assert.NoError(t, err) diff --git a/processor/transformprocessor/internal/common/config.go b/processor/transformprocessor/internal/common/config.go index c0f293457329..79087389d644 100644 --- a/processor/transformprocessor/internal/common/config.go +++ b/processor/transformprocessor/internal/common/config.go @@ -6,8 +6,12 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "fmt" "strings" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" ) +var _ ottl.StatementsGetter = (*ContextStatements)(nil) + type ContextID string const ( @@ -36,3 +40,15 @@ type ContextStatements struct { Conditions []string `mapstructure:"conditions"` Statements []string `mapstructure:"statements"` } + +func (c ContextStatements) GetStatements() []string { + return c.Statements +} + +func toContextStatements(statements any) (*ContextStatements, error) { + contextStatements, ok := statements.(ContextStatements) + if !ok { + return nil, 
fmt.Errorf("invalid context statements type, expected: common.ContextStatements, got: %T", statements) + } + return &contextStatements, nil +} diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index 4d9726c38260..60fd8be5ac75 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -7,38 +7,37 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" ) -var _ consumer.Logs = &logStatements{} +type LogsConsumer interface { + Context() ContextID + ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error +} type logStatements struct { ottl.StatementSequence[ottllog.TransformContext] expr.BoolExpr[ottllog.TransformContext] } -func (l logStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (l logStatements) Context() ContextID { + return Log } -func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error { for i := 0; i < ld.ResourceLogs().Len(); i++ { rlogs := ld.ResourceLogs().At(i) for j := 0; j < rlogs.ScopeLogs().Len(); j++ { slogs := rlogs.ScopeLogs().At(j) logs := slogs.LogRecords() for k := 0; k < 
logs.Len(); k++ { - tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource(), slogs, rlogs) + tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource(), slogs, rlogs, ottllog.WithCache(cache)) condition, err := l.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -55,76 +54,60 @@ func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return nil } -type LogParserCollection struct { - parserCollection - logParser ottl.Parser[ottllog.TransformContext] -} +type LogParserCollection ottl.ParserCollection[LogsConsumer] -type LogParserCollectionOption func(*LogParserCollection) error +type LogParserCollectionOption ottl.ParserCollectionOption[LogsConsumer] func WithLogParser(functions map[string]ottl.Factory[ottllog.TransformContext]) LogParserCollectionOption { - return func(lp *LogParserCollection) error { - logParser, err := ottllog.NewParser(functions, lp.settings) + return func(pc *ottl.ParserCollection[LogsConsumer]) error { + logParser, err := ottllog.NewParser(functions, pc.Settings, ottllog.EnablePathContextNames()) if err != nil { return err } - lp.logParser = logParser - return nil + return ottl.WithParserCollectionContext(ottllog.ContextName, &logParser, convertLogStatements)(pc) } } func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption { - return func(lp *LogParserCollection) error { - lp.errorMode = errorMode - return nil - } + return LogParserCollectionOption(ottl.WithParserCollectionErrorMode[LogsConsumer](errorMode)) } func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) { - rp, err := ottlresource.NewParser(ResourceFunctions(), settings) + pcOptions := []ottl.ParserCollectionOption[LogsConsumer]{ + withCommonContextParsers[LogsConsumer](), + ottl.EnableParserCollectionModifiedStatementLogging[LogsConsumer](true), + } + + for _, option := range options { + pcOptions = append(pcOptions, 
ottl.ParserCollectionOption[LogsConsumer](option)) + } + + pc, err := ottl.NewParserCollection(settings, pcOptions...) if err != nil { return nil, err } - sp, err := ottlscope.NewParser(ScopeFunctions(), settings) + + lpc := LogParserCollection(*pc) + return &lpc, nil +} + +func convertLogStatements(pc *ottl.ParserCollection[LogsConsumer], _ *ottl.Parser[ottllog.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottllog.TransformContext]) (LogsConsumer, error) { + contextStatements, err := toContextStatements(statements) if err != nil { return nil, err } - lpc := &LogParserCollection{ - parserCollection: parserCollection{ - settings: settings, - resourceParser: rp, - scopeParser: sp, - }, + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardLogFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr } - - for _, op := range options { - err := op(lpc) - if err != nil { - return nil, err - } - } - - return lpc, nil + lStatements := ottllog.NewStatementSequence(parsedStatements, pc.Settings, ottllog.WithStatementSequenceErrorMode(pc.ErrorMode)) + return logStatements{lStatements, globalExpr}, nil } -func (pc LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) { - switch contextStatements.Context { - case Log: - parsedStatements, err := pc.logParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.parserCollection, filterottl.StandardLogFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - lStatements := ottllog.NewStatementSequence(parsedStatements, pc.settings, ottllog.WithStatementSequenceErrorMode(pc.errorMode)) - return logStatements{lStatements, globalExpr}, nil - default: - 
statements, err := pc.parseCommonContextStatements(contextStatements) - if err != nil { - return nil, err - } - return statements, nil +func (lpc *LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (LogsConsumer, error) { + pc := ottl.ParserCollection[LogsConsumer](*lpc) + if contextStatements.Context != "" { + return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true) } + return pc.ParseStatements(contextStatements) } diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index f4cec79cd15e..fdc9f455f99c 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -7,7 +7,6 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -16,31 +15,30 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" ) -var _ consumer.Metrics = &metricStatements{} +type MetricsConsumer interface { + Context() ContextID + ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error +} type metricStatements struct { ottl.StatementSequence[ottlmetric.TransformContext] expr.BoolExpr[ottlmetric.TransformContext] } -func (m metricStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (m metricStatements) Context() ContextID { + return Metric } -func (m metricStatements) 
ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error { for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { smetrics := rmetrics.ScopeMetrics().At(j) metrics := smetrics.Metrics() for k := 0; k < metrics.Len(); k++ { - tCtx := ottlmetric.NewTransformContext(metrics.At(k), smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + tCtx := ottlmetric.NewTransformContext(metrics.At(k), smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, ottlmetric.WithCache(cache)) condition, err := m.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -57,20 +55,16 @@ func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics return nil } -var _ consumer.Metrics = &dataPointStatements{} - type dataPointStatements struct { ottl.StatementSequence[ottldatapoint.TransformContext] expr.BoolExpr[ottldatapoint.TransformContext] } -func (d dataPointStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (d dataPointStatements) Context() ContextID { + return DataPoint } -func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error { for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { @@ -78,19 +72,20 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr metrics := smetrics.Metrics() for k := 0; k < metrics.Len(); k++ { metric := metrics.At(k) + transformContextOptions := []ottldatapoint.TransformContextOption{ottldatapoint.WithCache(cache)} var err error //exhaustive:enforce switch metric.Type() { 
case pmetric.MetricTypeSum: - err = d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) case pmetric.MetricTypeGauge: - err = d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) case pmetric.MetricTypeHistogram: - err = d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) case pmetric.MetricTypeExponentialHistogram: - err = d.handleExponentialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleExponentialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) case pmetric.MetricTypeSummary: - err = d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) } if err != nil { return err @@ -101,9 +96,9 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr 
return nil } -func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error { +func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options []ottldatapoint.TransformContextOption) error { for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics) + tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, options...) condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -118,9 +113,9 @@ func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pme return nil } -func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error { +func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options []ottldatapoint.TransformContextOption) error { for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics) + tCtx := 
ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, options...) condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -135,9 +130,9 @@ func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps return nil } -func (d dataPointStatements) handleExponentialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error { +func (d dataPointStatements) handleExponentialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options []ottldatapoint.TransformContextOption) error { for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics) + tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, options...) 
condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -152,9 +147,9 @@ func (d dataPointStatements) handleExponentialHistogramDataPoints(ctx context.Co return nil } -func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error { +func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options []ottldatapoint.TransformContextOption) error { for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics) + tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, options...) 
condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -169,99 +164,83 @@ func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pm return nil } -type MetricParserCollection struct { - parserCollection - metricParser ottl.Parser[ottlmetric.TransformContext] - dataPointParser ottl.Parser[ottldatapoint.TransformContext] -} +type MetricParserCollection ottl.ParserCollection[MetricsConsumer] -type MetricParserCollectionOption func(*MetricParserCollection) error +type MetricParserCollectionOption ottl.ParserCollectionOption[MetricsConsumer] func WithMetricParser(functions map[string]ottl.Factory[ottlmetric.TransformContext]) MetricParserCollectionOption { - return func(mp *MetricParserCollection) error { - metricParser, err := ottlmetric.NewParser(functions, mp.settings) + return func(pc *ottl.ParserCollection[MetricsConsumer]) error { + metricParser, err := ottlmetric.NewParser(functions, pc.Settings, ottlmetric.EnablePathContextNames()) if err != nil { return err } - mp.metricParser = metricParser - return nil + return ottl.WithParserCollectionContext(ottlmetric.ContextName, &metricParser, convertMetricStatements)(pc) } } func WithDataPointParser(functions map[string]ottl.Factory[ottldatapoint.TransformContext]) MetricParserCollectionOption { - return func(mp *MetricParserCollection) error { - dataPointParser, err := ottldatapoint.NewParser(functions, mp.settings) + return func(pc *ottl.ParserCollection[MetricsConsumer]) error { + dataPointParser, err := ottldatapoint.NewParser(functions, pc.Settings, ottldatapoint.EnablePathContextNames()) if err != nil { return err } - mp.dataPointParser = dataPointParser - return nil + return ottl.WithParserCollectionContext(ottldatapoint.ContextName, &dataPointParser, convertDataPointStatements)(pc) } } func WithMetricErrorMode(errorMode ottl.ErrorMode) MetricParserCollectionOption { - return func(mp *MetricParserCollection) error { - mp.errorMode = errorMode - return nil - } + return 
MetricParserCollectionOption(ottl.WithParserCollectionErrorMode[MetricsConsumer](errorMode)) } func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) { - rp, err := ottlresource.NewParser(ResourceFunctions(), settings) + pcOptions := []ottl.ParserCollectionOption[MetricsConsumer]{ + withCommonContextParsers[MetricsConsumer](), + ottl.EnableParserCollectionModifiedStatementLogging[MetricsConsumer](true), + } + + for _, option := range options { + pcOptions = append(pcOptions, ottl.ParserCollectionOption[MetricsConsumer](option)) + } + + pc, err := ottl.NewParserCollection(settings, pcOptions...) if err != nil { return nil, err } - sp, err := ottlscope.NewParser(ScopeFunctions(), settings) + + mpc := MetricParserCollection(*pc) + return &mpc, nil +} + +func convertMetricStatements(pc *ottl.ParserCollection[MetricsConsumer], _ *ottl.Parser[ottlmetric.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottlmetric.TransformContext]) (MetricsConsumer, error) { + contextStatements, err := toContextStatements(statements) if err != nil { return nil, err } - mpc := &MetricParserCollection{ - parserCollection: parserCollection{ - settings: settings, - resourceParser: rp, - scopeParser: sp, - }, + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetric, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardMetricFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr } + mStatements := ottlmetric.NewStatementSequence(parsedStatements, pc.Settings, ottlmetric.WithStatementSequenceErrorMode(pc.ErrorMode)) + return metricStatements{mStatements, globalExpr}, nil +} - for _, op := range options { - err := op(mpc) - if err != nil { - return nil, err - } +func convertDataPointStatements(pc *ottl.ParserCollection[MetricsConsumer], _ *ottl.Parser[ottldatapoint.TransformContext], _ string, 
statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottldatapoint.TransformContext]) (MetricsConsumer, error) { + contextStatements, err := toContextStatements(statements) + if err != nil { + return nil, err } - - return mpc, nil + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForDataPoint, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardDataPointFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr + } + dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.Settings, ottldatapoint.WithStatementSequenceErrorMode(pc.ErrorMode)) + return dataPointStatements{dpStatements, globalExpr}, nil } -func (pc MetricParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Metrics, error) { - switch contextStatements.Context { - case Metric: - parseStatements, err := pc.metricParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetric, contextStatements.Conditions, pc.parserCollection, filterottl.StandardMetricFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - mStatements := ottlmetric.NewStatementSequence(parseStatements, pc.settings, ottlmetric.WithStatementSequenceErrorMode(pc.errorMode)) - return metricStatements{mStatements, globalExpr}, nil - case DataPoint: - parsedStatements, err := pc.dataPointParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForDataPoint, contextStatements.Conditions, pc.parserCollection, filterottl.StandardDataPointFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.settings, ottldatapoint.WithStatementSequenceErrorMode(pc.errorMode)) - return 
dataPointStatements{dpStatements, globalExpr}, nil - default: - statements, err := pc.parseCommonContextStatements(contextStatements) - if err != nil { - return nil, err - } - return statements, nil +func (mpc *MetricParserCollection) ParseContextStatements(contextStatements ContextStatements) (MetricsConsumer, error) { + pc := ottl.ParserCollection[MetricsConsumer](*mpc) + if contextStatements.Context != "" { + return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true) } + return pc.ParseStatements(contextStatements) } diff --git a/processor/transformprocessor/internal/common/processor.go b/processor/transformprocessor/internal/common/processor.go index 40b984ed4ee5..4bc86eb518d8 100644 --- a/processor/transformprocessor/internal/common/processor.go +++ b/processor/transformprocessor/internal/common/processor.go @@ -5,10 +5,9 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "context" - "fmt" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -20,28 +19,21 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" ) -var ( - _ consumer.Traces = &resourceStatements{} - _ consumer.Metrics = &resourceStatements{} - _ consumer.Logs = &resourceStatements{} - _ baseContext = &resourceStatements{} -) +var _ baseContext = &resourceStatements{} type resourceStatements struct { ottl.StatementSequence[ottlresource.TransformContext] expr.BoolExpr[ottlresource.TransformContext] } -func (r resourceStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (r resourceStatements) Context() ContextID { + return Resource } -func (r resourceStatements) ConsumeTraces(ctx context.Context, td 
ptrace.Traces) error { +func (r resourceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error { for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) - tCtx := ottlresource.NewTransformContext(rspans.Resource(), rspans) + tCtx := ottlresource.NewTransformContext(rspans.Resource(), rspans, ottlresource.WithCache(cache)) condition, err := r.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -56,10 +48,10 @@ func (r resourceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) return nil } -func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error { for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) - tCtx := ottlresource.NewTransformContext(rmetrics.Resource(), rmetrics) + tCtx := ottlresource.NewTransformContext(rmetrics.Resource(), rmetrics, ottlresource.WithCache(cache)) condition, err := r.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -74,10 +66,10 @@ func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metri return nil } -func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error { for i := 0; i < ld.ResourceLogs().Len(); i++ { rlogs := ld.ResourceLogs().At(i) - tCtx := ottlresource.NewTransformContext(rlogs.Resource(), rlogs) + tCtx := ottlresource.NewTransformContext(rlogs.Resource(), rlogs, ottlresource.WithCache(cache)) condition, err := r.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -92,30 +84,23 @@ func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error return nil } -var ( - _ consumer.Traces = &scopeStatements{} - _ consumer.Metrics = &scopeStatements{} - _ consumer.Logs = &scopeStatements{} - _ baseContext 
= &scopeStatements{} -) +var _ baseContext = &scopeStatements{} type scopeStatements struct { ottl.StatementSequence[ottlscope.TransformContext] expr.BoolExpr[ottlscope.TransformContext] } -func (s scopeStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (s scopeStatements) Context() ContextID { + return Scope } -func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { +func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error { for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) for j := 0; j < rspans.ScopeSpans().Len(); j++ { sspans := rspans.ScopeSpans().At(j) - tCtx := ottlscope.NewTransformContext(sspans.Scope(), rspans.Resource(), sspans) + tCtx := ottlscope.NewTransformContext(sspans.Scope(), rspans.Resource(), sspans, ottlscope.WithCache(cache)) condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -131,12 +116,12 @@ func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) er return nil } -func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error { for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { smetrics := rmetrics.ScopeMetrics().At(j) - tCtx := ottlscope.NewTransformContext(smetrics.Scope(), rmetrics.Resource(), smetrics) + tCtx := ottlscope.NewTransformContext(smetrics.Scope(), rmetrics.Resource(), smetrics, ottlscope.WithCache(cache)) condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -152,12 +137,12 @@ func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) return nil } -func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (s 
scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error { for i := 0; i < ld.ResourceLogs().Len(); i++ { rlogs := ld.ResourceLogs().At(i) for j := 0; j < rlogs.ScopeLogs().Len(); j++ { slogs := rlogs.ScopeLogs().At(j) - tCtx := ottlscope.NewTransformContext(slogs.Scope(), rlogs.Resource(), slogs) + tCtx := ottlscope.NewTransformContext(slogs.Scope(), rlogs.Resource(), slogs, ottlscope.WithCache(cache)) condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -173,56 +158,86 @@ func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return nil } -type parserCollection struct { - settings component.TelemetrySettings - resourceParser ottl.Parser[ottlresource.TransformContext] - scopeParser ottl.Parser[ottlscope.TransformContext] - errorMode ottl.ErrorMode -} - type baseContext interface { - consumer.Traces - consumer.Metrics - consumer.Logs + TracesConsumer + MetricsConsumer + LogsConsumer } -func (pc parserCollection) parseCommonContextStatements(contextStatement ContextStatements) (baseContext, error) { - switch contextStatement.Context { - case Resource: - parsedStatements, err := pc.resourceParser.ParseStatements(contextStatement.Statements) +func withCommonContextParsers[R any]() ottl.ParserCollectionOption[R] { + return func(pc *ottl.ParserCollection[R]) error { + rp, err := ottlresource.NewParser(ResourceFunctions(), pc.Settings, ottlresource.EnablePathContextNames()) if err != nil { - return nil, err + return err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForResource, contextStatement.Conditions, pc, filterottl.StandardResourceFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr + sp, err := ottlscope.NewParser(ScopeFunctions(), pc.Settings, ottlscope.EnablePathContextNames()) + if err != nil { + return err } - rStatements := ottlresource.NewStatementSequence(parsedStatements, pc.settings, 
ottlresource.WithStatementSequenceErrorMode(pc.errorMode)) - return resourceStatements{rStatements, globalExpr}, nil - case Scope: - parsedStatements, err := pc.scopeParser.ParseStatements(contextStatement.Statements) + + err = ottl.WithParserCollectionContext[ottlresource.TransformContext, R](ottlresource.ContextName, &rp, parseResourceContextStatements)(pc) if err != nil { - return nil, err + return err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForScope, contextStatement.Conditions, pc, filterottl.StandardScopeFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr + + err = ottl.WithParserCollectionContext[ottlscope.TransformContext, R](ottlscope.ContextName, &sp, parseScopeContextStatements)(pc) + if err != nil { + return err } - sStatements := ottlscope.NewStatementSequence(parsedStatements, pc.settings, ottlscope.WithStatementSequenceErrorMode(pc.errorMode)) - return scopeStatements{sStatements, globalExpr}, nil - default: - return nil, fmt.Errorf("unknown context %v", contextStatement.Context) + + return nil + } +} + +func parseResourceContextStatements[R any]( + pc *ottl.ParserCollection[R], + _ *ottl.Parser[ottlresource.TransformContext], + _ string, + statements ottl.StatementsGetter, + parsedStatements []*ottl.Statement[ottlresource.TransformContext], +) (R, error) { + contextStatements, err := toContextStatements(statements) + if err != nil { + return *new(R), err + } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForResource, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardResourceFuncs()) + if errGlobalBoolExpr != nil { + return *new(R), errGlobalBoolExpr + } + rStatements := ottlresource.NewStatementSequence(parsedStatements, pc.Settings, ottlresource.WithStatementSequenceErrorMode(pc.ErrorMode)) + result := (baseContext)(resourceStatements{rStatements, globalExpr}) + return result.(R), nil +} + +func parseScopeContextStatements[R any]( + pc 
*ottl.ParserCollection[R], + _ *ottl.Parser[ottlscope.TransformContext], + _ string, + statements ottl.StatementsGetter, + parsedStatements []*ottl.Statement[ottlscope.TransformContext], +) (R, error) { + contextStatements, err := toContextStatements(statements) + if err != nil { + return *new(R), err + } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForScope, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardScopeFuncs()) + if errGlobalBoolExpr != nil { + return *new(R), errGlobalBoolExpr } + sStatements := ottlscope.NewStatementSequence(parsedStatements, pc.Settings, ottlscope.WithStatementSequenceErrorMode(pc.ErrorMode)) + result := (baseContext)(scopeStatements{sStatements, globalExpr}) + return result.(R), nil } func parseGlobalExpr[K any]( boolExprFunc func([]string, map[string]ottl.Factory[K], ottl.ErrorMode, component.TelemetrySettings) (*ottl.ConditionSequence[K], error), conditions []string, - pc parserCollection, + errorMode ottl.ErrorMode, + settings component.TelemetrySettings, standardFuncs map[string]ottl.Factory[K], ) (expr.BoolExpr[K], error) { if len(conditions) > 0 { - return boolExprFunc(conditions, standardFuncs, pc.errorMode, pc.settings) + return boolExprFunc(conditions, standardFuncs, errorMode, settings) } // By default, set the global expression to always true unless conditions are specified. 
return expr.AlwaysTrue[K](), nil diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index de03b8afe917..84cf139a8309 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -7,39 +7,38 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" ) -var _ consumer.Traces = &traceStatements{} +type TracesConsumer interface { + Context() ContextID + ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error +} type traceStatements struct { ottl.StatementSequence[ottlspan.TransformContext] expr.BoolExpr[ottlspan.TransformContext] } -func (t traceStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (t traceStatements) Context() ContextID { + return Span } -func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { +func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error { for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) for j := 0; j < rspans.ScopeSpans().Len(); j++ { sspans := rspans.ScopeSpans().At(j) spans := 
sspans.Spans() for k := 0; k < spans.Len(); k++ { - tCtx := ottlspan.NewTransformContext(spans.At(k), sspans.Scope(), rspans.Resource(), sspans, rspans) + tCtx := ottlspan.NewTransformContext(spans.At(k), sspans.Scope(), rspans.Resource(), sspans, rspans, ottlspan.WithCache(cache)) condition, err := t.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -56,20 +55,16 @@ func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) er return nil } -var _ consumer.Traces = &spanEventStatements{} - type spanEventStatements struct { ottl.StatementSequence[ottlspanevent.TransformContext] expr.BoolExpr[ottlspanevent.TransformContext] } -func (s spanEventStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (s spanEventStatements) Context() ContextID { + return SpanEvent } -func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { +func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error { for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) for j := 0; j < rspans.ScopeSpans().Len(); j++ { @@ -79,7 +74,7 @@ func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces span := spans.At(k) spanEvents := span.Events() for n := 0; n < spanEvents.Len(); n++ { - tCtx := ottlspanevent.NewTransformContext(spanEvents.At(n), span, sspans.Scope(), rspans.Resource(), sspans, rspans) + tCtx := ottlspanevent.NewTransformContext(spanEvents.At(n), span, sspans.Scope(), rspans.Resource(), sspans, rspans, ottlspanevent.WithCache(cache)) condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -97,95 +92,83 @@ func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces return nil } -type TraceParserCollection struct { - parserCollection - spanParser ottl.Parser[ottlspan.TransformContext] - spanEventParser 
ottl.Parser[ottlspanevent.TransformContext] -} +type TraceParserCollection ottl.ParserCollection[TracesConsumer] -type TraceParserCollectionOption func(*TraceParserCollection) error +type TraceParserCollectionOption ottl.ParserCollectionOption[TracesConsumer] func WithSpanParser(functions map[string]ottl.Factory[ottlspan.TransformContext]) TraceParserCollectionOption { - return func(tp *TraceParserCollection) error { - spanParser, err := ottlspan.NewParser(functions, tp.settings) + return func(pc *ottl.ParserCollection[TracesConsumer]) error { + parser, err := ottlspan.NewParser(functions, pc.Settings, ottlspan.EnablePathContextNames()) if err != nil { return err } - tp.spanParser = spanParser - return nil + return ottl.WithParserCollectionContext(ottlspan.ContextName, &parser, convertSpanStatements)(pc) } } func WithSpanEventParser(functions map[string]ottl.Factory[ottlspanevent.TransformContext]) TraceParserCollectionOption { - return func(tp *TraceParserCollection) error { - spanEventParser, err := ottlspanevent.NewParser(functions, tp.settings) + return func(pc *ottl.ParserCollection[TracesConsumer]) error { + parser, err := ottlspanevent.NewParser(functions, pc.Settings, ottlspanevent.EnablePathContextNames()) if err != nil { return err } - tp.spanEventParser = spanEventParser - return nil + return ottl.WithParserCollectionContext(ottlspanevent.ContextName, &parser, convertSpanEventStatements)(pc) } } func WithTraceErrorMode(errorMode ottl.ErrorMode) TraceParserCollectionOption { - return func(tp *TraceParserCollection) error { - tp.errorMode = errorMode - return nil - } + return TraceParserCollectionOption(ottl.WithParserCollectionErrorMode[TracesConsumer](errorMode)) } func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) { - rp, err := ottlresource.NewParser(ResourceFunctions(), settings) + pcOptions := []ottl.ParserCollectionOption[TracesConsumer]{ + 
withCommonContextParsers[TracesConsumer](), + ottl.EnableParserCollectionModifiedStatementLogging[TracesConsumer](true), + } + + for _, option := range options { + pcOptions = append(pcOptions, ottl.ParserCollectionOption[TracesConsumer](option)) + } + + pc, err := ottl.NewParserCollection(settings, pcOptions...) if err != nil { return nil, err } - sp, err := ottlscope.NewParser(ScopeFunctions(), settings) + + tpc := TraceParserCollection(*pc) + return &tpc, nil +} + +func convertSpanStatements(pc *ottl.ParserCollection[TracesConsumer], _ *ottl.Parser[ottlspan.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottlspan.TransformContext]) (TracesConsumer, error) { + contextStatements, err := toContextStatements(statements) if err != nil { return nil, err } - tpc := &TraceParserCollection{ - parserCollection: parserCollection{ - settings: settings, - resourceParser: rp, - scopeParser: sp, - }, + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpan, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardSpanFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr } + sStatements := ottlspan.NewStatementSequence(parsedStatements, pc.Settings, ottlspan.WithStatementSequenceErrorMode(pc.ErrorMode)) + return traceStatements{sStatements, globalExpr}, nil +} - for _, op := range options { - err := op(tpc) - if err != nil { - return nil, err - } +func convertSpanEventStatements(pc *ottl.ParserCollection[TracesConsumer], _ *ottl.Parser[ottlspanevent.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottlspanevent.TransformContext]) (TracesConsumer, error) { + contextStatements, err := toContextStatements(statements) + if err != nil { + return nil, err } - - return tpc, nil + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEvent, contextStatements.Conditions, pc.ErrorMode, pc.Settings, 
filterottl.StandardSpanEventFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr + } + seStatements := ottlspanevent.NewStatementSequence(parsedStatements, pc.Settings, ottlspanevent.WithStatementSequenceErrorMode(pc.ErrorMode)) + return spanEventStatements{seStatements, globalExpr}, nil } -func (pc TraceParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Traces, error) { - switch contextStatements.Context { - case Span: - parsedStatements, err := pc.spanParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpan, contextStatements.Conditions, pc.parserCollection, filterottl.StandardSpanFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - sStatements := ottlspan.NewStatementSequence(parsedStatements, pc.settings, ottlspan.WithStatementSequenceErrorMode(pc.errorMode)) - return traceStatements{sStatements, globalExpr}, nil - case SpanEvent: - parsedStatements, err := pc.spanEventParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEvent, contextStatements.Conditions, pc.parserCollection, filterottl.StandardSpanEventFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - seStatements := ottlspanevent.NewStatementSequence(parsedStatements, pc.settings, ottlspanevent.WithStatementSequenceErrorMode(pc.errorMode)) - return spanEventStatements{seStatements, globalExpr}, nil - default: - return pc.parseCommonContextStatements(contextStatements) +func (tpc *TraceParserCollection) ParseContextStatements(contextStatements ContextStatements) (TracesConsumer, error) { + pc := ottl.ParserCollection[TracesConsumer](*tpc) + if contextStatements.Context != "" { + return pc.ParseStatementsWithContext(string(contextStatements.Context), 
contextStatements, true) } + return pc.ParseStatements(contextStatements) } diff --git a/processor/transformprocessor/internal/logs/processor.go b/processor/transformprocessor/internal/logs/processor.go index e2b184f3c8d7..011a6fa40114 100644 --- a/processor/transformprocessor/internal/logs/processor.go +++ b/processor/transformprocessor/internal/logs/processor.go @@ -7,7 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/multierr" "go.uber.org/zap" @@ -18,7 +18,7 @@ import ( ) type Processor struct { - contexts []consumer.Logs + contexts []common.LogsConsumer logger *zap.Logger flatMode bool } @@ -29,7 +29,7 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return nil, err } - contexts := make([]consumer.Logs, len(contextStatements)) + contexts := make([]common.LogsConsumer, len(contextStatements)) var errors error for i, cs := range contextStatements { context, err := pc.ParseContextStatements(cs) @@ -56,7 +56,8 @@ func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, e defer pdatautil.GroupByResourceLogs(ld.ResourceLogs()) } for _, c := range p.contexts { - err := c.ConsumeLogs(ctx, ld) + cache := pcommon.NewMap() + err := c.ConsumeLogs(ctx, ld, &cache) if err != nil { p.logger.Error("failed processing logs", zap.Error(err)) return ld, err diff --git a/processor/transformprocessor/internal/logs/processor_test.go b/processor/transformprocessor/internal/logs/processor_test.go index 448328138c21..f3aee564b923 100644 --- a/processor/transformprocessor/internal/logs/processor_test.go +++ b/processor/transformprocessor/internal/logs/processor_test.go @@ -69,6 +69,47 @@ func Test_ProcessLogs_ResourceContext(t *testing.T) { } } +func Test_ProcessLogs_InferredResourceContext(t *testing.T) { + tests := []struct { + statement string + want func(td 
plog.Logs) + }{ + { + statement: `set(resource.attributes["test"], "pass")`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(resource.attributes["test"], "pass") where resource.attributes["host.name"] == "wrong"`, + want: func(_ plog.Logs) { + }, + }, + { + statement: `set(resource.schema_url, "test_schema_url")`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).SetSchemaUrl("test_schema_url") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor([]common.ContextStatements{{Context: "", Statements: []string{tt.statement}}}, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessLogs(context.Background(), td) + assert.NoError(t, err) + + exTd := constructLogs() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessLogs_ScopeContext(t *testing.T) { tests := []struct { statement string @@ -110,6 +151,47 @@ func Test_ProcessLogs_ScopeContext(t *testing.T) { } } +func Test_ProcessLogs_InferredScopeContext(t *testing.T) { + tests := []struct { + statement string + want func(td plog.Logs) + }{ + { + statement: `set(scope.attributes["test"], "pass") where scope.name == "scope"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(scope.attributes["test"], "pass") where scope.version == 2`, + want: func(_ plog.Logs) { + }, + }, + { + statement: `set(scope.schema_url, "test_schema_url")`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).SetSchemaUrl("test_schema_url") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor([]common.ContextStatements{{Context: "", Statements: []string{tt.statement}}}, 
ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessLogs(context.Background(), td) + assert.NoError(t, err) + + exTd := constructLogs() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessLogs_LogContext(t *testing.T) { tests := []struct { statement string @@ -364,6 +446,260 @@ func Test_ProcessLogs_LogContext(t *testing.T) { } } +func Test_ProcessLogs_InferredLogContext(t *testing.T) { + tests := []struct { + statement string + want func(td plog.Logs) + }{ + { + statement: `set(log.attributes["test"], "pass") where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(log.attributes["test"], "pass") where resource.attributes["host.name"] == "localhost"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `keep_keys(log.attributes, ["http.method"]) where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Clear() + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("http.method", + "get") + }, + }, + { + statement: `set(log.severity_text, "ok") where log.attributes["http.path"] == "/health"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetSeverityText("ok") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).SetSeverityText("ok") + }, + }, + { + statement: `replace_pattern(log.attributes["http.method"], "get", "post")`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("http.method", "post") + 
td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("http.method", "post") + }, + }, + { + statement: `replace_all_patterns(log.attributes, "value", "get", "post")`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("http.method", "post") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("http.method", "post") + }, + }, + { + statement: `replace_all_patterns(log.attributes, "key", "http.url", "url")`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Clear() + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("http.method", "get") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("http.path", "/health") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("url", "http://localhost/health") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("flags", "A|B|C") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("total.string", "123456789") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().Clear() + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("http.method", "get") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("http.path", "/health") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("url", "http://localhost/health") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("flags", "C|D") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("total.string", "345678") + }, + }, + { + statement: `set(log.attributes["test"], "pass") where log.dropped_attributes_count == 1`, + want: func(td plog.Logs) { + 
td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(log.attributes["test"], "pass") where log.flags == 1`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(log.attributes["test"], "pass") where log.severity_number == SEVERITY_NUMBER_TRACE`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(log.severity_number, SEVERITY_NUMBER_TRACE2) where log.severity_number == 1`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetSeverityNumber(2) + }, + }, + { + statement: `set(log.attributes["test"], "pass") where log.trace_id == TraceID(0x0102030405060708090a0b0c0d0e0f10)`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(log.attributes["test"], "pass") where log.span_id == SpanID(0x0102030405060708)`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(log.attributes["test"], "pass") where IsMatch(log.body, "operation[AC]")`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `delete_key(log.attributes, "http.url") where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Clear() + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("http.method", + "get") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("http.path", + "/health") + 
td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("total.string", + "123456789") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("flags", + "A|B|C") + }, + }, + { + statement: `delete_matching_keys(log.attributes, "http.*t.*") where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Clear() + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("http.url", + "http://localhost/health") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("flags", + "A|B|C") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("total.string", + "123456789") + }, + }, + { + statement: `set(log.attributes["test"], Concat([log.attributes["http.method"], log.attributes["http.url"]], ": ")) where log.body == Concat(["operation", "A"], "")`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "get: http://localhost/health") + }, + }, + { + statement: `set(log.attributes["test"], Split(log.attributes["flags"], "|"))`, + want: func(td plog.Logs) { + v1 := td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutEmptySlice("test") + v1.AppendEmpty().SetStr("A") + v1.AppendEmpty().SetStr("B") + v1.AppendEmpty().SetStr("C") + v2 := td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutEmptySlice("test") + v2.AppendEmpty().SetStr("C") + v2.AppendEmpty().SetStr("D") + }, + }, + { + statement: `set(log.attributes["test"], Split(log.attributes["flags"], "|")) where log.body == "operationA"`, + want: func(td plog.Logs) { + newValue := td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutEmptySlice("test") + newValue.AppendEmpty().SetStr("A") + newValue.AppendEmpty().SetStr("B") + newValue.AppendEmpty().SetStr("C") + }, + }, + { + 
statement: `set(log.attributes["test"], Split(log.attributes["not_exist"], "|"))`, + want: func(_ plog.Logs) {}, + }, + { + statement: `set(log.attributes["test"], Substring(log.attributes["total.string"], 3, 3))`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "456") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "678") + }, + }, + { + statement: `set(log.attributes["test"], Substring(log.attributes["total.string"], 3, 3)) where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "456") + }, + }, + { + statement: `set(log.attributes["test"], Substring(log.attributes["not_exist"], 3, 3))`, + want: func(_ plog.Logs) {}, + }, + { + statement: `set(log.attributes["test"], ["A", "B", "C"]) where log.body == "operationA"`, + want: func(td plog.Logs) { + v1 := td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutEmptySlice("test") + v1.AppendEmpty().SetStr("A") + v1.AppendEmpty().SetStr("B") + v1.AppendEmpty().SetStr("C") + }, + }, + { + statement: `set(log.attributes["test"], ConvertCase(log.body, "lower")) where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "operationa") + }, + }, + { + statement: `set(log.attributes["test"], ConvertCase(log.body, "upper")) where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "OPERATIONA") + }, + }, + { + statement: `set(log.attributes["test"], ConvertCase(log.body, "snake")) where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "operation_a") + }, + }, + { + statement: `set(log.attributes["test"], 
ConvertCase(log.body, "camel")) where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "OperationA") + }, + }, + { + statement: `merge_maps(log.attributes, ParseJSON("{\"json_test\":\"pass\"}"), "insert") where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("json_test", "pass") + }, + }, + { + statement: `limit(log.attributes, 0, []) where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().RemoveIf(func(_ string, _ pcommon.Value) bool { return true }) + }, + }, + { + statement: `set(log.attributes["test"], Log(1)) where log.body == "operationA"`, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutDouble("test", 0.0) + }, + }, + { + statement: `replace_match(log.body["metadata"]["uid"], "*", "12345")`, + want: func(_ plog.Logs) {}, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor([]common.ContextStatements{{Context: "", Statements: []string{tt.statement}}}, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessLogs(context.Background(), td) + assert.NoError(t, err) + + exTd := constructLogs() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessLogs_MixContext(t *testing.T) { tests := []struct { name string @@ -481,7 +817,97 @@ func Test_ProcessLogs_MixContext(t *testing.T) { } } -func Test_ProcessTraces_Error(t *testing.T) { +func Test_ProcessLogs_InferredMixContext(t *testing.T) { + tests := []struct { + name string + contextStatements []common.ContextStatements + want func(td plog.Logs) + }{ + { + name: "set resource and then use", + contextStatements: 
[]common.ContextStatements{ + { + Statements: []string{`set(resource.attributes["test"], "pass")`}, + }, + { + Statements: []string{`set(log.attributes["test"], "pass") where resource.attributes["test"] == "pass"`}, + }, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "set scope and then use", + contextStatements: []common.ContextStatements{ + { + Statements: []string{`set(scope.attributes["test"], "pass")`}, + }, + { + Statements: []string{`set(log.attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`}, + }, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "order matters", + contextStatements: []common.ContextStatements{ + { + Statements: []string{`set(log.attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`}, + }, + { + Statements: []string{`set(scope.attributes["test"], "pass")`}, + }, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "reuse context", + contextStatements: []common.ContextStatements{ + { + Statements: []string{`set(scope.attributes["test"], "pass")`}, + }, + { + Statements: []string{`set(log.attributes["test"], "pass") where instrumentation_scope.attributes["test"] == "pass"`}, + }, + { + Statements: []string{`set(scope.attributes["test"], "fail")`}, + }, + }, + want: func(td plog.Logs) { + 
td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "fail") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor(tt.contextStatements, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessLogs(context.Background(), td) + assert.NoError(t, err) + + exTd := constructLogs() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessLogs_ErrorMode(t *testing.T) { tests := []struct { statement string context common.ContextID diff --git a/processor/transformprocessor/internal/metrics/processor.go b/processor/transformprocessor/internal/metrics/processor.go index 135b1bcbad59..bfab0bb48313 100644 --- a/processor/transformprocessor/internal/metrics/processor.go +++ b/processor/transformprocessor/internal/metrics/processor.go @@ -7,7 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" "go.uber.org/zap" @@ -17,7 +17,7 @@ import ( ) type Processor struct { - contexts []consumer.Metrics + contexts []common.MetricsConsumer logger *zap.Logger } @@ -27,7 +27,7 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return nil, err } - contexts := make([]consumer.Metrics, len(contextStatements)) + contexts := make([]common.MetricsConsumer, len(contextStatements)) var errors error for i, cs := range contextStatements { context, err := pc.ParseContextStatements(cs) @@ -49,7 +49,8 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E func (p *Processor) 
ProcessMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { for _, c := range p.contexts { - err := c.ConsumeMetrics(ctx, md) + cache := pcommon.NewMap() + err := c.ConsumeMetrics(ctx, md, &cache) if err != nil { p.logger.Error("failed processing metrics", zap.Error(err)) return md, err diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 128a9d00ced0..10132713998b 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -65,6 +65,47 @@ func Test_ProcessMetrics_ResourceContext(t *testing.T) { } } +func Test_ProcessMetrics_InferredResourceContext(t *testing.T) { + tests := []struct { + statement string + want func(td pmetric.Metrics) + }{ + { + statement: `set(resource.attributes["test"], "pass")`, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(resource.attributes["test"], "pass") where resource.attributes["host.name"] == "wrong"`, + want: func(_ pmetric.Metrics) { + }, + }, + { + statement: `set(resource.schema_url, "test_schema_url")`, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).SetSchemaUrl("test_schema_url") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructMetrics() + processor, err := NewProcessor([]common.ContextStatements{{Context: "", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := constructMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessMetrics_ScopeContext(t *testing.T) { tests := []struct { statement string @@ -106,6 +147,47 @@ func Test_ProcessMetrics_ScopeContext(t 
*testing.T) { } } +func Test_ProcessMetrics_InferredScopeContext(t *testing.T) { + tests := []struct { + statement string + want func(td pmetric.Metrics) + }{ + { + statement: `set(scope.attributes["test"], "pass") where scope.name == "scope"`, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(scope.attributes["test"], "pass") where scope.version == 2`, + want: func(_ pmetric.Metrics) { + }, + }, + { + statement: `set(scope.schema_url, "test_schema_url")`, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).SetSchemaUrl("test_schema_url") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructMetrics() + processor, err := NewProcessor([]common.ContextStatements{{Context: "", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := constructMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessMetrics_MetricContext(t *testing.T) { tests := []struct { statements []string @@ -281,6 +363,186 @@ func Test_ProcessMetrics_MetricContext(t *testing.T) { } } +func Test_ProcessMetrics_InferredMetricContext(t *testing.T) { + tests := []struct { + statements []string + want func(pmetric.Metrics) + }{ + { + statements: []string{`extract_sum_metric(true) where metric.name == "operationB"`}, + want: func(td pmetric.Metrics) { + sumMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + sumDp := sumMetric.SetEmptySum().DataPoints().AppendEmpty() + + histogramMetric := pmetric.NewMetric() + fillMetricTwo(histogramMetric) + histogramDp := histogramMetric.Histogram().DataPoints().At(0) + + sumMetric.SetDescription(histogramMetric.Description()) + 
sumMetric.SetName(histogramMetric.Name() + "_sum") + sumMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + sumMetric.Sum().SetIsMonotonic(true) + sumMetric.SetUnit(histogramMetric.Unit()) + + histogramDp.Attributes().CopyTo(sumDp.Attributes()) + sumDp.SetDoubleValue(histogramDp.Sum()) + sumDp.SetStartTimestamp(StartTimestamp) + + // we have two histogram datapoints, but only one of them has the Sum set + // so we should only have one Sum datapoint + }, + }, + { // this checks if subsequent statements apply to the newly created metric + statements: []string{ + `extract_sum_metric(true) where metric.name == "operationB"`, + `set(metric.name, "new_name") where metric.name == "operationB_sum"`, + }, + want: func(td pmetric.Metrics) { + sumMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + sumDp := sumMetric.SetEmptySum().DataPoints().AppendEmpty() + + histogramMetric := pmetric.NewMetric() + fillMetricTwo(histogramMetric) + histogramDp := histogramMetric.Histogram().DataPoints().At(0) + + sumMetric.SetDescription(histogramMetric.Description()) + sumMetric.SetName("new_name") + sumMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + sumMetric.Sum().SetIsMonotonic(true) + sumMetric.SetUnit(histogramMetric.Unit()) + + histogramDp.Attributes().CopyTo(sumDp.Attributes()) + sumDp.SetDoubleValue(histogramDp.Sum()) + sumDp.SetStartTimestamp(StartTimestamp) + + // we have two histogram datapoints, but only one of them has the Sum set + // so we should only have one Sum datapoint + }, + }, + { + statements: []string{`extract_count_metric(true) where metric.name == "operationB"`}, + want: func(td pmetric.Metrics) { + countMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + countMetric.SetEmptySum() + + histogramMetric := pmetric.NewMetric() + fillMetricTwo(histogramMetric) + + countMetric.SetDescription(histogramMetric.Description()) + 
countMetric.SetName(histogramMetric.Name() + "_count") + countMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + countMetric.Sum().SetIsMonotonic(true) + countMetric.SetUnit("1") + + histogramDp0 := histogramMetric.Histogram().DataPoints().At(0) + countDp0 := countMetric.Sum().DataPoints().AppendEmpty() + histogramDp0.Attributes().CopyTo(countDp0.Attributes()) + countDp0.SetIntValue(int64(histogramDp0.Count())) + countDp0.SetStartTimestamp(StartTimestamp) + + // we have two histogram datapoints + histogramDp1 := histogramMetric.Histogram().DataPoints().At(1) + countDp1 := countMetric.Sum().DataPoints().AppendEmpty() + histogramDp1.Attributes().CopyTo(countDp1.Attributes()) + countDp1.SetIntValue(int64(histogramDp1.Count())) + countDp1.SetStartTimestamp(StartTimestamp) + }, + }, + { + statements: []string{`copy_metric(name="http.request.status_code", unit="s") where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + newMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).CopyTo(newMetric) + newMetric.SetName("http.request.status_code") + newMetric.SetUnit("s") + }, + }, + { + statements: []string{`scale_metric(10.0,"s") where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).SetDoubleValue(10.0) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).SetDoubleValue(37.0) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetUnit("s") + }, + }, + { + statements: []string{`scale_metric(10.0) where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).SetDoubleValue(10.0) + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).SetDoubleValue(37.0) + }, + }, + { + statements: []string{`aggregate_on_attributes("sum", ["attr1", "attr2"]) where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + m := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + + dataPoints := pmetric.NewNumberDataPointSlice() + dataPoint1 := dataPoints.AppendEmpty() + dataPoint1.SetStartTimestamp(StartTimestamp) + dataPoint1.SetDoubleValue(4.7) + dataPoint1.Attributes().PutStr("attr1", "test1") + dataPoint1.Attributes().PutStr("attr2", "test2") + + dataPoints.CopyTo(m.Sum().DataPoints()) + }, + }, + { + statements: []string{`aggregate_on_attributes("min") where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + m := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + + dataPoints := pmetric.NewNumberDataPointSlice() + dataPoint1 := dataPoints.AppendEmpty() + dataPoint1.SetStartTimestamp(StartTimestamp) + dataPoint1.SetDoubleValue(1.0) + dataPoint1.Attributes().PutStr("attr1", "test1") + dataPoint1.Attributes().PutStr("attr2", "test2") + dataPoint1.Attributes().PutStr("attr3", "test3") + dataPoint1.Attributes().PutStr("flags", "A|B|C") + dataPoint1.Attributes().PutStr("total.string", "123456789") + + dataPoints.CopyTo(m.Sum().DataPoints()) + }, + }, + { + statements: []string{`aggregate_on_attribute_value("sum", "attr1", ["test1", "test2"], "test") where metric.name == "operationE"`}, + want: func(td pmetric.Metrics) { + m := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4) + + dataPoints := pmetric.NewNumberDataPointSlice() + dataPoint1 := dataPoints.AppendEmpty() + dataPoint1.SetStartTimestamp(StartTimestamp) + dataPoint1.SetDoubleValue(4.7) + dataPoint1.Attributes().PutStr("attr1", "test") + + dataPoints.CopyTo(m.Sum().DataPoints()) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statements[0], func(t *testing.T) { + var contextStatements 
[]common.ContextStatements + for _, statement := range tt.statements { + contextStatements = append(contextStatements, common.ContextStatements{Context: "", Statements: []string{statement}}) + } + + td := constructMetrics() + processor, err := NewProcessor(contextStatements, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := constructMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessMetrics_DataPointContext(t *testing.T) { tests := []struct { statements []string @@ -724,6 +986,454 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { } } +func Test_ProcessMetrics_InferredDataPointContext(t *testing.T) { + tests := []struct { + statements []string + want func(pmetric.Metrics) + }{ + { + statements: []string{`set(datapoint.attributes["test"], "pass") where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], "pass") where resource.attributes["host.name"] == "myhost"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("test", "pass") + 
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass")
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass")
+				// NOTE(review): a verbatim duplicate of the At(1) exponential-histogram expectation above was replaced by this comment (PutStr is idempotent, so the test's expected data is unchanged; the comment keeps the hunk's added-line count intact).
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("test", "pass")
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass")
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass")
+			},
+		},
+		{
+			statements: []string{`set(datapoint.attributes["int_value"], Int("2")) where metric.name == "operationA"`},
+			want: func(td pmetric.Metrics) {
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutInt("int_value", 2)
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutInt("int_value", 2)
+			},
+		},
+		{
+			statements: []string{`set(datapoint.attributes["int_value"], Int(datapoint.value_double)) where metric.name == "operationA"`},
+			want: func(td pmetric.Metrics) {
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutInt("int_value", 1)
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutInt("int_value", 3)
+			},
+		},
+		{
+			statements: []string{`keep_keys(datapoint.attributes, ["attr2"]) where metric.name == "operationA"`},
+			want: func(td pmetric.Metrics) {
+				td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().Clear()
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("attr2", "test2") + }, + }, + { + statements: []string{`set(metric.description, "test") where datapoint.attributes["attr1"] == "test1"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetDescription("test") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetDescription("test") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetDescription("test") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetDescription("test") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetDescription("test") + }, + }, + { + statements: []string{`set(metric.unit, "new unit")`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetUnit("new unit") + }, + }, + { + statements: []string{`set(metric.description, "Sum") where metric.type == METRIC_DATA_TYPE_SUM`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetDescription("Sum") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetDescription("Sum") + }, + }, + { + statements: []string{`set(metric.aggregation_temporality, AGGREGATION_TEMPORALITY_DELTA) where metric.aggregation_temporality == 0`}, + want: func(td 
pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + }, + }, + { + statements: []string{`set(metric.is_monotonic, true) where metric.is_monotonic == false`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().SetIsMonotonic(true) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().SetIsMonotonic(true) + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], "pass") where datapoint.count == 1`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], "pass") where datapoint.scale == 1`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], "pass") where datapoint.zero_count == 1`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], "pass") 
where datapoint.positive.offset == 1`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], "pass") where datapoint.negative.offset == 1`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statements: []string{`replace_pattern(datapoint.attributes["attr1"], "test1", "pass")`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + }, + }, + { + statements: []string{`replace_all_patterns(datapoint.attributes, "value", "test1", "pass")`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("attr1", 
"pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "pass") + }, + }, + { + statements: []string{`replace_all_patterns(datapoint.attributes, "key", "attr3", "attr4")`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("attr4", "test3") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("flags", "A|B|C") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("total.string", "123456789") + + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().Clear() + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("attr4", "test3") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("flags", "A|B|C") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("total.string", "123456789") + + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("attr4", "test3") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("flags", "C|D") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("total.string", "345678") + + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("attr4", "test3") + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("flags", "C|D") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("total.string", "345678") + + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("attr4", "test3") + + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("attr4", "test3") + + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).Summary().DataPoints().At(0).Attributes().PutStr("attr4", "test3") + }, + }, + { + statements: 
[]string{`convert_summary_count_val_to_sum("delta", true) where metric.name == "operationD"`}, + want: func(td pmetric.Metrics) { + sumMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + sumDp := sumMetric.SetEmptySum().DataPoints().AppendEmpty() + + summaryMetric := pmetric.NewMetric() + fillMetricFour(summaryMetric) + summaryDp := summaryMetric.Summary().DataPoints().At(0) + + sumMetric.SetDescription(summaryMetric.Description()) + sumMetric.SetName(summaryMetric.Name() + "_count") + sumMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + sumMetric.Sum().SetIsMonotonic(true) + sumMetric.SetUnit(summaryMetric.Unit()) + + summaryDp.Attributes().CopyTo(sumDp.Attributes()) + sumDp.SetIntValue(int64(summaryDp.Count())) + sumDp.SetStartTimestamp(StartTimestamp) + sumDp.SetTimestamp(TestTimeStamp) + }, + }, + { + statements: []string{`convert_summary_sum_val_to_sum("delta", true) where metric.name == "operationD"`}, + want: func(td pmetric.Metrics) { + sumMetric := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + sumDp := sumMetric.SetEmptySum().DataPoints().AppendEmpty() + + summaryMetric := pmetric.NewMetric() + fillMetricFour(summaryMetric) + summaryDp := summaryMetric.Summary().DataPoints().At(0) + + sumMetric.SetDescription(summaryMetric.Description()) + sumMetric.SetName(summaryMetric.Name() + "_sum") + sumMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + sumMetric.Sum().SetIsMonotonic(true) + sumMetric.SetUnit(summaryMetric.Unit()) + + summaryDp.Attributes().CopyTo(sumDp.Attributes()) + sumDp.SetDoubleValue(summaryDp.Sum()) + sumDp.SetStartTimestamp(StartTimestamp) + sumDp.SetTimestamp(TestTimeStamp) + }, + }, + { + statements: []string{ + `convert_summary_sum_val_to_sum("delta", true) where metric.name == "operationD"`, + `set(metric.unit, "new unit")`, + }, + want: func(td pmetric.Metrics) { + sumMetric := 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + sumDp := sumMetric.SetEmptySum().DataPoints().AppendEmpty() + + summaryMetric := pmetric.NewMetric() + fillMetricFour(summaryMetric) + summaryDp := summaryMetric.Summary().DataPoints().At(0) + + sumMetric.SetDescription(summaryMetric.Description()) + sumMetric.SetName(summaryMetric.Name() + "_sum") + sumMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + sumMetric.Sum().SetIsMonotonic(true) + sumMetric.SetUnit("new unit") + + summaryDp.Attributes().CopyTo(sumDp.Attributes()) + sumDp.SetDoubleValue(summaryDp.Sum()) + sumDp.SetStartTimestamp(StartTimestamp) + sumDp.SetTimestamp(TestTimeStamp) + + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(3).SetUnit("new unit") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(4).SetUnit("new unit") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], "pass") where IsMatch(metric.name, "operation[AC]")`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(2).ExponentialHistogram().DataPoints().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + statements: []string{`delete_key(datapoint.attributes, "attr3") where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("total.string", "123456789") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("flags", "A|B|C") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("attr2", "test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("total.string", "123456789") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("flags", "A|B|C") + }, + }, + { + statements: []string{`delete_matching_keys(datapoint.attributes, "[23]") where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("flags", "A|B|C") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("total.string", "123456789") + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().Clear() + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("attr1", "test1") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("flags", "A|B|C") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("total.string", "123456789") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], Concat([datapoint.attributes["attr1"], datapoint.attributes["attr2"]], "-")) where metric.name == Concat(["operation", "A"], "")`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "test1-test2") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "test1-test2") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], Split(datapoint.attributes["flags"], "|"))`}, + want: func(td pmetric.Metrics) { + v00 := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutEmptySlice("test") + v00.AppendEmpty().SetStr("A") + v00.AppendEmpty().SetStr("B") + v00.AppendEmpty().SetStr("C") + v01 := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutEmptySlice("test") + v01.AppendEmpty().SetStr("A") + v01.AppendEmpty().SetStr("B") + v01.AppendEmpty().SetStr("C") + v10 := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutEmptySlice("test") + v10.AppendEmpty().SetStr("C") + v10.AppendEmpty().SetStr("D") + v11 := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutEmptySlice("test") + v11.AppendEmpty().SetStr("C") 
+ v11.AppendEmpty().SetStr("D") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], Split(datapoint.attributes["flags"], "|")) where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + v00 := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutEmptySlice("test") + v00.AppendEmpty().SetStr("A") + v00.AppendEmpty().SetStr("B") + v00.AppendEmpty().SetStr("C") + v01 := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutEmptySlice("test") + v01.AppendEmpty().SetStr("A") + v01.AppendEmpty().SetStr("B") + v01.AppendEmpty().SetStr("C") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], Split(datapoint.attributes["not_exist"], "|"))`}, + want: func(_ pmetric.Metrics) {}, + }, + { + statements: []string{`set(datapoint.attributes["test"], Substring(datapoint.attributes["total.string"], 3, 3))`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "456") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "456") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(0).Attributes().PutStr("test", "678") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Histogram().DataPoints().At(1).Attributes().PutStr("test", "678") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], Substring(datapoint.attributes["total.string"], 3, 3)) where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "456") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "456") + }, + }, + { + statements: 
[]string{`set(datapoint.attributes["test"], Substring(datapoint.attributes["not_exist"], 3, 3))`}, + want: func(_ pmetric.Metrics) {}, + }, + { + statements: []string{ + `set(datapoint.attributes["test_lower"], ConvertCase(metric.name, "lower")) where metric.name == "operationA"`, + `set(datapoint.attributes["test_upper"], ConvertCase(metric.name, "upper")) where metric.name == "operationA"`, + `set(datapoint.attributes["test_snake"], ConvertCase(metric.name, "snake")) where metric.name == "operationA"`, + `set(datapoint.attributes["test_camel"], ConvertCase(metric.name, "camel")) where metric.name == "operationA"`, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test_lower", "operationa") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test_lower", "operationa") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test_upper", "OPERATIONA") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test_upper", "OPERATIONA") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test_snake", "operation_a") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test_snake", "operation_a") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test_camel", "OperationA") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test_camel", "OperationA") + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], ["A", "B", "C"]) where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + v00 := 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutEmptySlice("test") + v00.AppendEmpty().SetStr("A") + v00.AppendEmpty().SetStr("B") + v00.AppendEmpty().SetStr("C") + v01 := td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutEmptySlice("test") + v01.AppendEmpty().SetStr("A") + v01.AppendEmpty().SetStr("B") + v01.AppendEmpty().SetStr("C") + }, + }, + { + statements: []string{`merge_maps(datapoint.attributes, ParseJSON("{\"json_test\":\"pass\"}"), "insert") where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("json_test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("json_test", "pass") + }, + }, + { + statements: []string{`limit(datapoint.attributes, 0, []) where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().RemoveIf(func(_ string, _ pcommon.Value) bool { return true }) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().RemoveIf(func(_ string, _ pcommon.Value) bool { return true }) + }, + }, + { + statements: []string{`set(datapoint.attributes["test"], Log(1)) where metric.name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutDouble("test", 0.0) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutDouble("test", 0.0) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statements[0], func(t *testing.T) { + td := constructMetrics() + var contextStatements []common.ContextStatements + for _, statement := range 
tt.statements { + contextStatements = append(contextStatements, common.ContextStatements{Context: "", Statements: []string{statement}}) + } + + processor, err := NewProcessor(contextStatements, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := constructMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessMetrics_MixContext(t *testing.T) { tests := []struct { name string @@ -862,7 +1572,7 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { } } -func Test_ProcessMetrics_Error(t *testing.T) { +func Test_ProcessMetrics_ErrorMode(t *testing.T) { tests := []struct { statement string context common.ContextID diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index e20c87880ce3..bc46f6d0819c 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -7,7 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/multierr" "go.uber.org/zap" @@ -17,7 +17,7 @@ import ( ) type Processor struct { - contexts []consumer.Traces + contexts []common.TracesConsumer logger *zap.Logger } @@ -27,7 +27,7 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return nil, err } - contexts := make([]consumer.Traces, len(contextStatements)) + contexts := make([]common.TracesConsumer, len(contextStatements)) var errors error for i, cs := range contextStatements { context, err := pc.ParseContextStatements(cs) @@ -49,7 +49,8 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E func (p *Processor) ProcessTraces(ctx context.Context, td ptrace.Traces) 
(ptrace.Traces, error) { for _, c := range p.contexts { - err := c.ConsumeTraces(ctx, td) + cache := pcommon.NewMap() + err := c.ConsumeTraces(ctx, td, &cache) if err != nil { p.logger.Error("failed processing traces", zap.Error(err)) return td, err diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index 0da86dfeb262..9ac79c8a2e4b 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -70,6 +70,47 @@ func Test_ProcessTraces_ResourceContext(t *testing.T) { } } +func Test_ProcessTraces_InferredResourceContext(t *testing.T) { + tests := []struct { + statement string + want func(td ptrace.Traces) + }{ + { + statement: `set(resource.attributes["test"], "pass")`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(resource.attributes["test"], "pass") where resource.attributes["host.name"] == "wrong"`, + want: func(_ ptrace.Traces) { + }, + }, + { + statement: `set(resource.schema_url, "test_schema_url")`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).SetSchemaUrl("test_schema_url") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor([]common.ContextStatements{{Context: "", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := constructTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessTraces_ScopeContext(t *testing.T) { tests := []struct { statement string @@ -111,6 +152,47 @@ func Test_ProcessTraces_ScopeContext(t *testing.T) { } } +func Test_ProcessTraces_InferredScopeContext(t *testing.T) { + 
tests := []struct { + statement string + want func(td ptrace.Traces) + }{ + { + statement: `set(scope.attributes["test"], "pass") where scope.name == "scope"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(scope.attributes["test"], "pass") where scope.version == 2`, + want: func(_ ptrace.Traces) { + }, + }, + { + statement: `set(scope.schema_url, "test_schema_url")`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).SetSchemaUrl("test_schema_url") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor([]common.ContextStatements{{Context: "", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := constructTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessTraces_TraceContext(t *testing.T) { tests := []struct { statement string @@ -411,6 +493,306 @@ func Test_ProcessTraces_TraceContext(t *testing.T) { } } +func Test_ProcessTraces_InferredTraceContext(t *testing.T) { + tests := []struct { + statement string + want func(td ptrace.Traces) + }{ + { + statement: `set(span.attributes["test"], "pass") where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where resource.attributes["host.name"] == "localhost"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + statement: 
`keep_keys(span.attributes, ["http.method"]) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().Clear() + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("http.method", "get") + }, + }, + { + statement: `set(span.status.code, 1) where span.attributes["http.path"] == "/health"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Status().SetCode(ptrace.StatusCodeOk) + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Status().SetCode(ptrace.StatusCodeOk) + }, + }, + { + statement: `set(span.attributes["test"], "pass") where span.dropped_attributes_count == 1`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where span.dropped_events_count == 1`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where span.dropped_links_count == 1`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where span.span_id == SpanID(0x0102030405060708)`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where span.parent_span_id == SpanID(0x0807060504030201)`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where span.trace_id == TraceID(0x0102030405060708090a0b0c0d0e0f10)`, + want: func(td ptrace.Traces) { + 
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where span.trace_state == "new"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `replace_pattern(span.attributes["http.method"], "get", "post")`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("http.method", "post") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("http.method", "post") + }, + }, + { + statement: `replace_all_patterns(span.attributes, "value", "get", "post")`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("http.method", "post") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("http.method", "post") + }, + }, + { + statement: `replace_all_patterns(span.attributes, "key", "http.url", "url")`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().Clear() + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("http.method", "get") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("http.path", "/health") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("url", "http://localhost/health") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("flags", "A|B|C") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("total.string", "123456789") + + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().Clear() + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("http.method", "get") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("http.path", 
"/health") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("url", "http://localhost/health") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("flags", "C|D") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("total.string", "345678") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where IsMatch(span.name, "operation[AC]")`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where span.attributes["doesnt exist"] == nil`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `delete_key(span.attributes, "http.url") where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().Clear() + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("http.method", "get") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("http.path", "/health") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("total.string", "123456789") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("flags", "A|B|C") + }, + }, + { + statement: `delete_matching_keys(span.attributes, "http.*t.*") where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().Clear() + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("http.url", "http://localhost/health") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("flags", "A|B|C") + 
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("total.string", "123456789") + }, + }, + { + statement: `set(span.attributes["test"], "pass") where span.kind == SPAN_KIND_INTERNAL`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + statement: `set(span.kind, SPAN_KIND_SERVER) where span.kind == 1`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetKind(2) + }, + }, + { + statement: `set(span.attributes["test"], Concat([span.attributes["http.method"], span.attributes["http.url"]], ": "))`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "get: http://localhost/health") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "get: http://localhost/health") + }, + }, + { + statement: `set(span.attributes["test"], Concat([span.attributes["http.method"], ": ", span.attributes["http.url"]], ""))`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "get: http://localhost/health") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "get: http://localhost/health") + }, + }, + { + statement: `set(span.attributes["test"], Concat([span.attributes["http.method"], span.attributes["http.url"]], ": ")) where span.name == Concat(["operation", "A"], "")`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "get: http://localhost/health") + }, + }, + { + statement: `set(span.attributes["kind"], Concat(["kind", ": ", span.kind], "")) where span.kind == 1`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("kind", "kind: 1") + }, + }, + { + statement: `set(span.attributes["test"], 
Split(span.attributes["flags"], "|"))`, + want: func(td ptrace.Traces) { + v1 := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutEmptySlice("test") + v1.AppendEmpty().SetStr("A") + v1.AppendEmpty().SetStr("B") + v1.AppendEmpty().SetStr("C") + v2 := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutEmptySlice("test") + v2.AppendEmpty().SetStr("C") + v2.AppendEmpty().SetStr("D") + }, + }, + { + statement: `set(span.attributes["test"], Split(span.attributes["flags"], "|")) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + v1 := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutEmptySlice("test") + v1.AppendEmpty().SetStr("A") + v1.AppendEmpty().SetStr("B") + v1.AppendEmpty().SetStr("C") + }, + }, + { + statement: `set(span.attributes["test"], Split(span.attributes["not_exist"], "|"))`, + want: func(_ ptrace.Traces) {}, + }, + { + statement: `set(span.attributes["test"], Substring(span.attributes["total.string"], 3, 3))`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "456") + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("test", "678") + }, + }, + { + statement: `set(span.attributes["test"], Substring(span.attributes["total.string"], 3, 3)) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "456") + }, + }, + { + statement: `set(span.attributes["test"], Substring(span.attributes["not_exist"], 3, 3))`, + want: func(_ ptrace.Traces) {}, + }, + { + statement: `set(span.attributes["test"], ["A", "B", "C"]) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + v1 := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutEmptySlice("test") + v1.AppendEmpty().SetStr("A") + v1.AppendEmpty().SetStr("B") + v1.AppendEmpty().SetStr("C") + }, + }, + 
{ + statement: `set(span.attributes["entrypoint"], span.name) where span.parent_span_id == SpanID(0x0000000000000000)`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("entrypoint", "operationB") + }, + }, + { + statement: `set(span.attributes["entrypoint-root"], span.name) where IsRootSpan()`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1).Attributes().PutStr("entrypoint-root", "operationB") + }, + }, + { + statement: `set(span.attributes["test"], ConvertCase(span.name, "lower")) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "operationa") + }, + }, + { + statement: `set(span.attributes["test"], ConvertCase(span.name, "upper")) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "OPERATIONA") + }, + }, + { + statement: `set(span.attributes["test"], ConvertCase(span.name, "snake")) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "operation_a") + }, + }, + { + statement: `set(span.attributes["test"], ConvertCase(span.name, "camel")) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "OperationA") + }, + }, + { + statement: `merge_maps(span.attributes, ParseJSON("{\"json_test\":\"pass\"}"), "insert") where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("json_test", "pass") + }, + }, + { + statement: `limit(span.attributes, 0, []) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + 
td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().RemoveIf(func(_ string, _ pcommon.Value) bool { return true }) + }, + }, + { + statement: `set(span.attributes["test"], Log(1)) where span.name == "operationA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutDouble("test", 0.0) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor([]common.ContextStatements{{Context: "", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := constructTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessTraces_SpanEventContext(t *testing.T) { tests := []struct { statement string @@ -441,6 +823,36 @@ func Test_ProcessTraces_SpanEventContext(t *testing.T) { } } +func Test_ProcessTraces_InferredSpanEventContext(t *testing.T) { + tests := []struct { + statement string + want func(td ptrace.Traces) + }{ + { + statement: `set(spanevent.attributes["test"], "pass") where spanevent.name == "eventA"`, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().At(0).Attributes().PutStr("test", "pass") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor([]common.ContextStatements{{Context: "", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := constructTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessTraces_MixContext(t *testing.T) { tests := []struct { name string @@ -558,7 +970,7 @@ 
func Test_ProcessTraces_MixContext(t *testing.T) { } } -func Test_ProcessTraces_Error(t *testing.T) { +func Test_ProcessTraces_ErrorMode(t *testing.T) { tests := []struct { statement string context common.ContextID diff --git a/processor/transformprocessor/testdata/config.yaml b/processor/transformprocessor/testdata/config.yaml index 8cf295298e54..c327eeb3a2ef 100644 --- a/processor/transformprocessor/testdata/config.yaml +++ b/processor/transformprocessor/testdata/config.yaml @@ -118,3 +118,31 @@ transform/unknown_context: transform/unknown_error_mode: error_mode: test + +transform/structured_configuration_with_path_context: + trace_statements: + - context: span + statements: + - set(span.name, "bear") where span.attributes["http.path"] == "/animal" + metric_statements: + - context: metric + statements: + - set(metric.name, "bear") where resource.attributes["http.path"] == "/animal" + log_statements: + - context: log + statements: + - set(log.body, "bear") where log.attributes["http.path"] == "/animal" + +transform/structured_configuration_with_inferred_context: + trace_statements: + - statements: + - set(span.name, "bear") where span.attributes["http.path"] == "/animal" + - set(resource.attributes["name"], "bear") + metric_statements: + - statements: + - set(metric.name, "bear") where resource.attributes["http.path"] == "/animal" + - set(resource.attributes["name"], "bear") + log_statements: + - statements: + - set(log.body, "bear") where log.attributes["http.path"] == "/animal" + - set(resource.attributes["name"], "bear") From af89c24a8b621e779f630e92adc76ee1a886b1d4 Mon Sep 17 00:00:00 2001 From: edmocosta <11836452+edmocosta@users.noreply.github.com> Date: Thu, 16 Jan 2025 15:57:16 +0100 Subject: [PATCH 2/3] Remove EnableParserCollectionModifiedStatementLogging from configured options --- processor/transformprocessor/internal/common/logs.go | 1 - processor/transformprocessor/internal/common/metrics.go | 1 - 
processor/transformprocessor/internal/common/traces.go | 1 - 3 files changed, 3 deletions(-) diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index 60fd8be5ac75..711cbc418396 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -75,7 +75,6 @@ func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption { func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) { pcOptions := []ottl.ParserCollectionOption[LogsConsumer]{ withCommonContextParsers[LogsConsumer](), - ottl.EnableParserCollectionModifiedStatementLogging[LogsConsumer](true), } for _, option := range options { diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index fdc9f455f99c..82f5434d18e4 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -195,7 +195,6 @@ func WithMetricErrorMode(errorMode ottl.ErrorMode) MetricParserCollectionOption func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) { pcOptions := []ottl.ParserCollectionOption[MetricsConsumer]{ withCommonContextParsers[MetricsConsumer](), - ottl.EnableParserCollectionModifiedStatementLogging[MetricsConsumer](true), } for _, option := range options { diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index 84cf139a8309..4b3dd117b1f0 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -123,7 +123,6 @@ func WithTraceErrorMode(errorMode ottl.ErrorMode) TraceParserCollectionOption { func NewTraceParserCollection(settings 
component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) { pcOptions := []ottl.ParserCollectionOption[TracesConsumer]{ withCommonContextParsers[TracesConsumer](), - ottl.EnableParserCollectionModifiedStatementLogging[TracesConsumer](true), } for _, option := range options { From e08b878fe36f0bed495384b73a492237cc3072ae Mon Sep 17 00:00:00 2001 From: edmocosta <11836452+edmocosta@users.noreply.github.com> Date: Thu, 16 Jan 2025 21:40:23 +0100 Subject: [PATCH 3/3] Remove default cache value and add changelog --- ...d-add-initial-contextinference-support.yaml | 27 +++++++++++++++++++ .../internal/logs/processor.go | 4 +-- .../internal/metrics/processor.go | 4 +-- .../internal/traces/processor.go | 4 +-- 4 files changed, 30 insertions(+), 9 deletions(-) create mode 100644 .chloggen/replace-pc-and-add-initial-contextinference-support.yaml diff --git a/.chloggen/replace-pc-and-add-initial-contextinference-support.yaml b/.chloggen/replace-pc-and-add-initial-contextinference-support.yaml new file mode 100644 index 000000000000..9299d503395c --- /dev/null +++ b/.chloggen/replace-pc-and-add-initial-contextinference-support.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Replace parser collection implementations with `ottl.ParserCollection` and add initial support for expressing statements' context via path names. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
+issues: [29017] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/transformprocessor/internal/logs/processor.go b/processor/transformprocessor/internal/logs/processor.go index 011a6fa40114..23037fe847ba 100644 --- a/processor/transformprocessor/internal/logs/processor.go +++ b/processor/transformprocessor/internal/logs/processor.go @@ -7,7 +7,6 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/multierr" "go.uber.org/zap" @@ -56,8 +55,7 @@ func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, e defer pdatautil.GroupByResourceLogs(ld.ResourceLogs()) } for _, c := range p.contexts { - cache := pcommon.NewMap() - err := c.ConsumeLogs(ctx, ld, &cache) + err := c.ConsumeLogs(ctx, ld, nil) if err != nil { p.logger.Error("failed processing logs", zap.Error(err)) return ld, err diff --git a/processor/transformprocessor/internal/metrics/processor.go b/processor/transformprocessor/internal/metrics/processor.go index bfab0bb48313..cc134d3d138f 100644 --- a/processor/transformprocessor/internal/metrics/processor.go +++ b/processor/transformprocessor/internal/metrics/processor.go @@ -7,7 +7,6 @@ import ( "context" "go.opentelemetry.io/collector/component" - 
"go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" "go.uber.org/zap" @@ -49,8 +48,7 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E func (p *Processor) ProcessMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { for _, c := range p.contexts { - cache := pcommon.NewMap() - err := c.ConsumeMetrics(ctx, md, &cache) + err := c.ConsumeMetrics(ctx, md, nil) if err != nil { p.logger.Error("failed processing metrics", zap.Error(err)) return md, err diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index bc46f6d0819c..6af07a4a942e 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -7,7 +7,6 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/multierr" "go.uber.org/zap" @@ -49,8 +48,7 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E func (p *Processor) ProcessTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { for _, c := range p.contexts { - cache := pcommon.NewMap() - err := c.ConsumeTraces(ctx, td, &cache) + err := c.ConsumeTraces(ctx, td, nil) if err != nil { p.logger.Error("failed processing traces", zap.Error(err)) return td, err