From f1689da4876fd106c2f59aae490c74eec81c07e7 Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 22 Jan 2025 13:43:05 -0500 Subject: [PATCH] inal processor chunk split out --- processor/schemaprocessor/DESIGN.md | 4 +- processor/schemaprocessor/factory.go | 8 +- processor/schemaprocessor/go.mod | 4 +- processor/schemaprocessor/processor.go | 157 +++++ processor/schemaprocessor/processor_test.go | 551 ++++++++++++++++++ .../testdata/schema_sections/all/1.1.0 | 17 + .../testdata/schema_sections/all/logs_in.json | 68 +++ .../schema_sections/all/logs_out.json | 68 +++ .../logs_rename_attributes/1.1.0 | 17 + .../logs_rename_attributes/logs_in.json | 68 +++ .../logs_rename_attributes/logs_out.json | 68 +++ .../metrics_rename_attributes/1.1.0 | 21 + .../metrics_rename_attributes/metrics_in.json | 111 ++++ .../metrics_out.json | 111 ++++ .../metrics_rename_metrics/1.1.0 | 15 + .../metrics_rename_metrics/metrics_in.json | 59 ++ .../metrics_rename_metrics/metrics_out.json | 59 ++ .../testdata/schema_sections/resources/1.1.0 | 17 + .../schema_sections/resources/logs_in.json | 68 +++ .../schema_sections/resources/logs_out.json | 68 +++ .../span_events_rename_attributes/1.1.0 | 35 ++ .../traces_in.json | 128 ++++ .../traces_out.json | 128 ++++ .../span_events_rename_spans/1.1.0 | 17 + .../span_events_rename_spans/traces_in.json | 123 ++++ .../span_events_rename_spans/traces_out.json | 123 ++++ .../testdata/schema_sections/spans/1.1.0 | 21 + .../schema_sections/spans/traces_in.json | 64 ++ .../schema_sections/spans/traces_out.json | 64 ++ .../testschemas/schemaprecedence/1.0.0 | 19 + .../testschemas/schemaprecedence/1.1.0 | 19 + .../testschemas/schemaprecedence/1.2.0 | 19 + processor/schemaprocessor/transformer.go | 57 -- processor/schemaprocessor/transformer_test.go | 88 --- 34 files changed, 2311 insertions(+), 153 deletions(-) create mode 100644 processor/schemaprocessor/processor.go create mode 100644 processor/schemaprocessor/processor_test.go create mode 100644 processor/schemaprocessor/testdata/schema_sections/all/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/all/logs_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/all/logs_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/resources/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/resources/logs_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/resources/logs_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/spans/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/spans/traces_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/spans/traces_out.json create mode 100644 processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.0.0 create mode 100644 processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.1.0 create mode 100644 processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.2.0 delete mode 100644 processor/schemaprocessor/transformer.go delete mode 100644 processor/schemaprocessor/transformer_test.go diff --git a/processor/schemaprocessor/DESIGN.md b/processor/schemaprocessor/DESIGN.md index ae70dce24c60..ed074a8b4ed6 100644 --- a/processor/schemaprocessor/DESIGN.md +++ b/processor/schemaprocessor/DESIGN.md @@ -19,8 +19,8 @@ graph LR; end ``` -The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory. -Data flows into the Transformer, which uses the Schema URL to fetch the translation from the Translation Manager. +The [schemaprocessor](processor.go) is registered as a Processor in the Collector by the factory. +Data flows into the Processor, which uses the Schema URL to fetch the translation from the Translation Manager. The Translation Manager (at internal/translation/manager.go in a future PR) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct. The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator figures out what the version of the incoming data is and what Revisions to apply to the incoming data to get it to the target schema version. The Translator is also responsible for applying the Revisions to the incoming data - it iterates through these Revisions and applies them to the incoming data. diff --git a/processor/schemaprocessor/factory.go b/processor/schemaprocessor/factory.go index 68525c6afb78..fb8e73fe327f 100644 --- a/processor/schemaprocessor/factory.go +++ b/processor/schemaprocessor/factory.go @@ -22,7 +22,7 @@ var processorCapabilities = consumer.Capabilities{MutatesData: true} // factory will store any of the precompiled schemas in future type factory struct{} -// newDefaultConfiguration returns the configuration for schema transformer processor +// newDefaultConfiguration returns the configuration for schema processor // with the default values being used throughout it func newDefaultConfiguration() component.Config { return &Config{ @@ -47,7 +47,7 @@ func (f factory) createLogsProcessor( cfg component.Config, next consumer.Logs, ) (processor.Logs, error) { - transformer, err := newTransformer(ctx, cfg, set) + transformer, err := newSchemaProcessor(ctx, cfg, set) if err != nil { return nil, err } @@ -68,7 +68,7 @@ func (f factory) createMetricsProcessor( cfg component.Config, next consumer.Metrics, ) (processor.Metrics, error) { - transformer, err := newTransformer(ctx, cfg, set) + transformer, err := newSchemaProcessor(ctx, cfg, set) if err != nil { return nil, err } @@ -89,7 +89,7 @@ func (f factory) createTracesProcessor( cfg component.Config, next consumer.Traces, ) (processor.Traces, error) { - transformer, err := newTransformer(ctx, cfg, set) + transformer, err := newSchemaProcessor(ctx, cfg, set) if err != nil { return nil, err } diff --git a/processor/schemaprocessor/go.mod b/processor/schemaprocessor/go.mod index 9c7dfac2a207..977054adc0f6 100644 --- a/processor/schemaprocessor/go.mod +++ b/processor/schemaprocessor/go.mod @@ -13,8 +13,10 @@ require ( go.opentelemetry.io/collector/consumer v1.25.0 go.opentelemetry.io/collector/consumer/consumertest v0.119.0 go.opentelemetry.io/collector/pdata v1.25.0 + go.opentelemetry.io/collector/pipeline v0.119.0 go.opentelemetry.io/collector/processor v0.119.0 go.opentelemetry.io/collector/processor/processortest v0.119.0 + go.opentelemetry.io/otel/metric v1.34.0 go.opentelemetry.io/otel/schema v0.0.12 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 @@ -59,11 +61,9 @@ require ( go.opentelemetry.io/collector/extension/auth v0.119.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.119.0 // indirect - go.opentelemetry.io/collector/pipeline v0.119.0 // indirect go.opentelemetry.io/collector/processor/xprocessor v0.119.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect go.opentelemetry.io/otel v1.34.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect diff --git a/processor/schemaprocessor/processor.go b/processor/schemaprocessor/processor.go new file mode 100644 index 000000000000..1146897155f5 --- /dev/null +++ b/processor/schemaprocessor/processor.go @@ -0,0 +1,157 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor" + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" +) + +type schemaprocessor struct { + telemetry component.TelemetrySettings + config *Config + + log *zap.Logger + + manager translation.Manager +} + +func newSchemaProcessor(_ context.Context, conf component.Config, set processor.Settings) (*schemaprocessor, error) { + cfg, ok := conf.(*Config) + if !ok { + return nil, errors.New("invalid configuration provided") + } + + m, err := translation.NewManager( + cfg.Targets, + set.Logger.Named("schema-manager"), + ) + if err != nil { + return nil, err + } + + return &schemaprocessor{ + config: cfg, + telemetry: set.TelemetrySettings, + log: set.Logger, + manager: m, + }, nil +} + +func (t schemaprocessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { + for rl := 0; rl < ld.ResourceLogs().Len(); rl++ { + rLog := ld.ResourceLogs().At(rl) + resourceSchemaURL := rLog.SchemaUrl() + err := t.manager. + RequestTranslation(ctx, resourceSchemaURL). + ApplyAllResourceChanges(rLog, resourceSchemaURL) + if err != nil { + return plog.Logs{}, err + } + for sl := 0; sl < rLog.ScopeLogs().Len(); sl++ { + log := rLog.ScopeLogs().At(sl) + logSchemaURL := log.SchemaUrl() + if logSchemaURL == "" { + logSchemaURL = resourceSchemaURL + } + + err := t.manager. + RequestTranslation(ctx, logSchemaURL). + ApplyScopeLogChanges(log, logSchemaURL) + if err != nil { + return plog.Logs{}, err + } + } + } + return ld, nil +} + +func (t schemaprocessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { + for rm := 0; rm < md.ResourceMetrics().Len(); rm++ { + rMetric := md.ResourceMetrics().At(rm) + resourceSchemaURL := rMetric.SchemaUrl() + err := t.manager. + RequestTranslation(ctx, resourceSchemaURL). + ApplyAllResourceChanges(rMetric, resourceSchemaURL) + if err != nil { + return pmetric.Metrics{}, err + } + for sm := 0; sm < rMetric.ScopeMetrics().Len(); sm++ { + metric := rMetric.ScopeMetrics().At(sm) + metricSchemaURL := metric.SchemaUrl() + if metricSchemaURL == "" { + metricSchemaURL = resourceSchemaURL + } + err := t.manager. + RequestTranslation(ctx, metricSchemaURL). + ApplyScopeMetricChanges(metric, metricSchemaURL) + if err != nil { + return pmetric.Metrics{}, err + } + } + } + return md, nil +} + +func (t schemaprocessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { + for rt := 0; rt < td.ResourceSpans().Len(); rt++ { + rTrace := td.ResourceSpans().At(rt) + // todo(ankit) do i need to check if this is empty? + resourceSchemaURL := rTrace.SchemaUrl() + err := t.manager. + RequestTranslation(ctx, resourceSchemaURL). + ApplyAllResourceChanges(rTrace, resourceSchemaURL) + if err != nil { + return ptrace.Traces{}, err + } + for ss := 0; ss < rTrace.ScopeSpans().Len(); ss++ { + span := rTrace.ScopeSpans().At(ss) + spanSchemaURL := span.SchemaUrl() + if spanSchemaURL == "" { + spanSchemaURL = resourceSchemaURL + } + err := t.manager. + RequestTranslation(ctx, spanSchemaURL). + ApplyScopeSpanChanges(span, spanSchemaURL) + if err != nil { + return ptrace.Traces{}, err + } + } + } + return td, nil +} + +// start will load the remote file definition if it isn't already cached +// and resolve the schema translation file +func (t *schemaprocessor) start(ctx context.Context, host component.Host) error { + var providers []translation.Provider + // Check for additional extensions that can be checked first before + // perfomring the http request + // TODO(MovieStoreGuy): Check for storage extensions + + client, err := t.config.ToClient(ctx, host, t.telemetry) + if err != nil { + return err + } + + if err := t.manager.SetProviders(append(providers, translation.NewHTTPProvider(client))...); err != nil { + return err + } + go func(ctx context.Context) { + for _, schemaURL := range t.config.Prefetch { + _ = t.manager.RequestTranslation(ctx, schemaURL) + } + }(ctx) + + return nil +} diff --git a/processor/schemaprocessor/processor_test.go b/processor/schemaprocessor/processor_test.go new file mode 100644 index 000000000000..925f6018984a --- /dev/null +++ b/processor/schemaprocessor/processor_test.go @@ -0,0 +1,551 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package schemaprocessor + +import ( + "context" + "embed" + _ "embed" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/otel/metric/noop" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" +) + +func newTestProcessor(t *testing.T) *schemaprocessor { + trans, err := newSchemaProcessor(context.Background(), newDefaultConfiguration(), processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + MeterProvider: noop.MeterProvider{}, + }, + }) + require.NoError(t, err, "Must not error when creating default schemaprocessor") + return trans +} + +func TestProcessorStart(t *testing.T) { + t.Parallel() + + trans := newTestProcessor(t) + assert.NoError(t, trans.start(context.Background(), componenttest.NewNopHost())) +} + +//go:embed testdata +var f embed.FS + +func plogsFromJSON(t *testing.T, path string) plog.Logs { + t.Helper() + unmarshaler := plog.JSONUnmarshaler{} + signalJSON, err := f.ReadFile(path) + require.NoError(t, err) + inSignals, err := unmarshaler.UnmarshalLogs(signalJSON) + require.NoError(t, err) + require.NotNil(t, inSignals.ResourceLogs()) + require.NotEqual(t, 0, inSignals.LogRecordCount()) + return inSignals +} + +func ptracesFromJSON(t *testing.T, path string) ptrace.Traces { + t.Helper() + unmarshaler := ptrace.JSONUnmarshaler{} + signalJSON, err := f.ReadFile(path) + require.NoError(t, err) + inSignals, err := unmarshaler.UnmarshalTraces(signalJSON) + require.NoError(t, err) + require.NotNil(t, inSignals.ResourceSpans()) + require.NotEqual(t, 0, inSignals.SpanCount()) + return inSignals +} + +func pmetricsFromJSON(t *testing.T, path string) pmetric.Metrics { + t.Helper() + unmarshaler := pmetric.JSONUnmarshaler{} + signalJSON, err := f.ReadFile(path) + require.NoError(t, err) + inSignals, err := unmarshaler.UnmarshalMetrics(signalJSON) + require.NoError(t, err) + require.NotNil(t, inSignals.ResourceMetrics()) + require.NotEqual(t, 0, inSignals.MetricCount()) + return inSignals +} + +func buildTestProcessor(t *testing.T, targetURL string) *schemaprocessor { + t.Helper() + defaultConfig := newDefaultConfiguration() + castedConfig := defaultConfig.(*Config) + castedConfig.Targets = []string{targetURL} + processor, err := newSchemaProcessor(context.Background(), castedConfig, processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }, + }) + require.NoError(t, err, "Must not error when creating schemaprocessor") + err = processor.manager.SetProviders(translation.NewTestProvider(&f)) + require.NoError(t, err) + return processor +} + +func TestProcessorSchemaBySections(t *testing.T) { + tests := []struct { + name string + section string + dataType pipeline.Signal + }{ + { + // todo(ankit) do i need to test all data types here? + name: "all_logs", + section: "all", + dataType: pipeline.SignalLogs, + }, + { + // todo(ankit) do i need to test all data types here? + name: "resources_logs", + section: "resources", + dataType: pipeline.SignalLogs, + }, + { + name: "spans", + section: "spans", + dataType: pipeline.SignalTraces, + }, + { + name: "span_events_rename_spans", + section: "span_events_rename_spans", + dataType: pipeline.SignalTraces, + }, + { + name: "span_events_rename_attributes", + section: "span_events_rename_attributes", + dataType: pipeline.SignalTraces, + }, + { + name: "metrics_rename_metrics", + section: "metrics_rename_metrics", + dataType: pipeline.SignalMetrics, + }, + { + name: "metrics_rename_attributes", + section: "metrics_rename_attributes", + dataType: pipeline.SignalMetrics, + }, + { + name: "logs_rename_attributes", + section: "logs_rename_attributes", + dataType: pipeline.SignalLogs, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + processorTarget := fmt.Sprintf("https://example.com/testdata/schema_sections/%s/1.0.0", tc.section) + processor := buildTestProcessor(t, processorTarget) + inDataPath := fmt.Sprintf("testdata/schema_sections/%s/%s_in.json", tc.section, tc.dataType) + outDataPath := fmt.Sprintf("testdata/schema_sections/%s/%s_out.json", tc.section, tc.dataType) + switch tc.dataType { + case pipeline.SignalLogs: + inLogs := plogsFromJSON(t, inDataPath) + expected := plogsFromJSON(t, outDataPath) + + logs, err := processor.processLogs(context.Background(), inLogs) + require.NoError(t, err) + require.NoError(t, plogtest.CompareLogs(expected, logs), "Must match the expected values") + case pipeline.SignalMetrics: + inMetrics := pmetricsFromJSON(t, inDataPath) + expected := pmetricsFromJSON(t, outDataPath) + + metrics, err := processor.processMetrics(context.Background(), inMetrics) + require.NoError(t, err) + require.NoError(t, pmetrictest.CompareMetrics(expected, metrics), "Must match the expected values") + case pipeline.SignalTraces: + inTraces := ptracesFromJSON(t, inDataPath) + expected := ptracesFromJSON(t, outDataPath) + + traces, err := processor.processTraces(context.Background(), inTraces) + require.NoError(t, err) + require.NoError(t, ptracetest.CompareTraces(expected, traces), "Must match the expected values") + default: + require.FailNow(t, "unrecognized data type") + return + } + }) + } +} + +func TestProcessorProcessing(t *testing.T) { + t.Parallel() + + trans := newTestProcessor(t) + t.Run("metrics", func(t *testing.T) { + in := pmetric.NewMetrics() + in.ResourceMetrics().AppendEmpty() + in.ResourceMetrics().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") + in.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() + m := in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + m.SetName("test-data") + m.SetDescription("Only used throughout tests") + m.SetUnit("seconds") + m.CopyTo(in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)) + + out, err := trans.processMetrics(context.Background(), in) + assert.NoError(t, err, "Must not error when processing metrics") + assert.Equal(t, in, out, "Must return the same data (subject to change)") + }) + + t.Run("traces", func(t *testing.T) { + in := ptrace.NewTraces() + in.ResourceSpans().AppendEmpty() + in.ResourceSpans().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") + in.ResourceSpans().At(0).ScopeSpans().AppendEmpty() + s := in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty() + s.SetName("http.request") + s.SetKind(ptrace.SpanKindConsumer) + s.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) + s.CopyTo(in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)) + + out, err := trans.processTraces(context.Background(), in) + assert.NoError(t, err, "Must not error when processing metrics") + assert.Equal(t, in, out, "Must return the same data (subject to change)") + }) + + t.Run("logs", func(t *testing.T) { + in := plog.NewLogs() + in.ResourceLogs().AppendEmpty() + in.ResourceLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") + in.ResourceLogs().At(0).ScopeLogs().AppendEmpty() + l := in.ResourceLogs().At(0).ScopeLogs().At(0).Scope() + l.SetName("magical-logs") + l.SetVersion("alpha") + l.CopyTo(in.ResourceLogs().At(0).ScopeLogs().At(0).Scope()) + + out, err := trans.processLogs(context.Background(), in) + assert.NoError(t, err, "Must not error when processing metrics") + assert.Equal(t, in, out, "Must return the same data (subject to change)") + }) +} + +type SchemaUsed int + +const ( + ResourceSchemaVersionUsed = iota + 1 + ScopeSchemaVersionUsed + NoopSchemaUsed +) + +// returns a test log with no schema versions set +func generateLogForTest() plog.Logs { + in := plog.NewLogs() + in.ResourceLogs().AppendEmpty() + in.ResourceLogs().At(0).ScopeLogs().AppendEmpty() + l := in.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty() + l.Attributes().PutStr("input", "test") + return in +} + +//go:embed testdata/testschemas +var testdataFiles embed.FS + +// case 1: resource schema set, scope schema not set, use resource schema +// case 2: resource schema not set, scope schema set, use scope schema inside +// case 3: resource schema set, scope schema set, use scope schema +// case 4: resource schema not set, scope schema not set, noop translation +func TestProcessorScopeLogSchemaPrecedence(t *testing.T) { + t.Parallel() + tests := []struct { + name string + input func() plog.Logs + whichSchemaUsed SchemaUsed + wantErr assert.ErrorAssertionFunc + }{ + { + name: "resourcesetscopeunset", + input: func() plog.Logs { + log := generateLogForTest() + log.ResourceLogs().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + return log + }, + whichSchemaUsed: ResourceSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeset", + input: func() plog.Logs { + log := generateLogForTest() + log.ResourceLogs().At(0).ScopeLogs().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + return log + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourcesetscopeset", + input: func() plog.Logs { + log := generateLogForTest() + log.ResourceLogs().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + log.ResourceLogs().At(0).ScopeLogs().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + + return log + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeunset", + input: func() plog.Logs { + log := generateLogForTest() + return log + }, + // want: "https://example.com/testdata/testschemas/schemaprecedence/1.0.0", + whichSchemaUsed: NoopSchemaUsed, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defaultConfig := newDefaultConfiguration() + castedConfig := defaultConfig.(*Config) + castedConfig.Targets = []string{"https://example.com/testdata/testschemas/schemaprecedence/1.2.0"} + processor, err := newSchemaProcessor(context.Background(), defaultConfig, processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }, + }) + require.NoError(t, err, "Must not error when creating schemaprocessor") + + err = processor.manager.SetProviders(translation.NewTestProvider(&testdataFiles)) + require.NoError(t, err) + got, err := processor.processLogs(context.Background(), tt.input()) + if !tt.wantErr(t, err, fmt.Sprintf("processLogs(%v)", tt.input())) { + return + } + targetLog := got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + assert.Equal(t, 1, targetLog.Attributes().Len()) + _, usedResource := targetLog.Attributes().Get("one_one_zero_output") + _, usedScope := targetLog.Attributes().Get("one_two_zero_output") + _, usedNoop := targetLog.Attributes().Get("input") + switch tt.whichSchemaUsed { + case ResourceSchemaVersionUsed: + assert.True(t, usedResource, "processLogs(%v) not using correct schema,, attributes present: %v", tt.name, targetLog.Attributes().AsRaw()) + case ScopeSchemaVersionUsed: + assert.True(t, usedScope, "processLogs(%v) not using correct schema, attributes present: %v", tt.name, targetLog.Attributes().AsRaw()) + case NoopSchemaUsed: + assert.True(t, usedNoop, "processLogs(%v) not using correct schema,, attributes present: %v", tt.name, targetLog.Attributes().AsRaw()) + } + }) + } +} + +// returns a test trace with no schema versions set +func generateTraceForTest() ptrace.Traces { + in := ptrace.NewTraces() + in.ResourceSpans().AppendEmpty() + in.ResourceSpans().At(0).ScopeSpans().AppendEmpty() + l := in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty() + l.Attributes().PutStr("input", "test") + return in +} + +// literally the exact same tests as above +// case 1: resource schema set, scope schema not set, use resource schema +// case 2: resource schema not set, scope schema set, use scope schema inside +// case 3: resource schema set, scope schema set, use scope schema +// case 4: resource schema not set, scope schema not set, noop translation +func TestProcessorScopeTraceSchemaPrecedence(t *testing.T) { + t.Parallel() + tests := []struct { + name string + input func() ptrace.Traces + whichSchemaUsed SchemaUsed + wantErr assert.ErrorAssertionFunc + }{ + { + name: "resourcesetscopeunset", + input: func() ptrace.Traces { + trace := generateTraceForTest() + trace.ResourceSpans().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + return trace + }, + whichSchemaUsed: ResourceSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeset", + input: func() ptrace.Traces { + trace := generateTraceForTest() + trace.ResourceSpans().At(0).ScopeSpans().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + return trace + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourcesetscopeset", + input: func() ptrace.Traces { + trace := generateTraceForTest() + trace.ResourceSpans().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + trace.ResourceSpans().At(0).ScopeSpans().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + + return trace + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeunset", + input: func() ptrace.Traces { + trace := generateTraceForTest() + return trace + }, + // want: "https://example.com/testdata/testschemas/schemaprecedence/1.0.0", + whichSchemaUsed: NoopSchemaUsed, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defaultConfig := newDefaultConfiguration() + castedConfig := defaultConfig.(*Config) + castedConfig.Targets = []string{"https://example.com/testdata/testschemas/schemaprecedence/1.2.0"} + processor, err := newSchemaProcessor(context.Background(), defaultConfig, processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }, + }) + require.NoError(t, err, "Must not error when creating schemaprocessor") + + err = processor.manager.SetProviders(translation.NewTestProvider(&testdataFiles)) + require.NoError(t, err) + got, err := processor.processTraces(context.Background(), tt.input()) + if !tt.wantErr(t, err, fmt.Sprintf("processTraces(%v)", tt.input())) { + return + } + targetTrace := got.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + assert.Equal(t, 1, targetTrace.Attributes().Len()) + _, usedResource := targetTrace.Attributes().Get("one_one_zero_output") + _, usedScope := targetTrace.Attributes().Get("one_two_zero_output") + _, usedNoop := targetTrace.Attributes().Get("input") + switch tt.whichSchemaUsed { + case ResourceSchemaVersionUsed: + assert.True(t, usedResource, "processTraces(%v) not using correct schema,, attributes present: %v", tt.name, targetTrace.Attributes().AsRaw()) + case ScopeSchemaVersionUsed: + assert.True(t, usedScope, "processTraces(%v) not using correct schema, attributes present: %v", tt.name, targetTrace.Attributes().AsRaw()) + case NoopSchemaUsed: + assert.True(t, usedNoop, "processTraces(%v) not using correct schema,, attributes present: %v", tt.name, targetTrace.Attributes().AsRaw()) + } + }) + } +} + +// returns a test metric with no schema versions set +func generateMetricForTest() pmetric.Metrics { + in := pmetric.NewMetrics() + in.ResourceMetrics().AppendEmpty() + in.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() + l := in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + l.SetEmptySum().DataPoints().AppendEmpty().Attributes().PutStr("input", "test") + return in +} + +// case 1: resource schema set, scope schema not set, use resource schema +// case 2: resource schema not set, scope schema set, use scope schema inside +// case 3: resource schema set, scope schema set, use scope schema +// case 4: resource schema not set, scope schema not set, noop translation +func TestProcessorScopeMetricSchemaPrecedence(t *testing.T) { + t.Parallel() + tests := []struct { + name string + input func() pmetric.Metrics + whichSchemaUsed SchemaUsed + wantErr assert.ErrorAssertionFunc + }{ + { + name: "resourcesetscopeunset", + input: func() pmetric.Metrics { + metric := generateMetricForTest() + metric.ResourceMetrics().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + return metric + }, + whichSchemaUsed: ResourceSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeset", + input: func() pmetric.Metrics { + metric := generateMetricForTest() + metric.ResourceMetrics().At(0).ScopeMetrics().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + return metric + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourcesetscopeset", + input: func() pmetric.Metrics { + metric := generateMetricForTest() + metric.ResourceMetrics().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + metric.ResourceMetrics().At(0).ScopeMetrics().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + + return metric + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeunset", + input: func() pmetric.Metrics { + metric := generateMetricForTest() + return metric + }, + // want: "https://example.com/testdata/testschemas/schemaprecedence/1.0.0", + whichSchemaUsed: NoopSchemaUsed, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defaultConfig := newDefaultConfiguration() + castedConfig := defaultConfig.(*Config) + castedConfig.Targets = []string{"https://example.com/testdata/testschemas/schemaprecedence/1.2.0"} + processor, err := newSchemaProcessor(context.Background(), defaultConfig, processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }, + }) + require.NoError(t, err, "Must not error when creating schemaprocessor") + + err = processor.manager.SetProviders(translation.NewTestProvider(&testdataFiles)) + require.NoError(t, err) + got, err := processor.processMetrics(context.Background(), tt.input()) + if !tt.wantErr(t, err, fmt.Sprintf("processMetrics(%v)", tt.input())) { + return + } + targetMetric := got.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0) + assert.Equal(t, 1, targetMetric.Attributes().Len()) + _, usedResource := targetMetric.Attributes().Get("one_one_zero_output") + _, usedScope := targetMetric.Attributes().Get("one_two_zero_output") + _, usedNoop := targetMetric.Attributes().Get("input") + switch tt.whichSchemaUsed { + case ResourceSchemaVersionUsed: + assert.True(t, usedResource, "processMetrics(%v) not using correct schema,, attributes present: %v", tt.name, targetMetric.Attributes().AsRaw()) + case ScopeSchemaVersionUsed: + assert.True(t, usedScope, "processMetrics(%v) not using correct schema, attributes present: %v", tt.name, targetMetric.Attributes().AsRaw()) + case NoopSchemaUsed: + assert.True(t, usedNoop, "processMetrics(%v) not using correct schema,, attributes present: %v", tt.name, targetMetric.Attributes().AsRaw()) + } + }) + } +} diff --git a/processor/schemaprocessor/testdata/schema_sections/all/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/all/1.1.0 new file mode 100644 index 000000000000..91ba999037f8 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/all/1.1.0 @@ -0,0 +1,17 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/all/1.1.0 + +versions: + 1.1.0: + all: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + docker.image.name: d4r.image.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/all/logs_in.json b/processor/schemaprocessor/testdata/schema_sections/all/logs_in.json new file mode 100644 index 000000000000..43d9efd7a8f3 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/all/logs_in.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "d4r.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.1.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.1.0" + }, + { + "resource": { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/all/logs_out.json b/processor/schemaprocessor/testdata/schema_sections/all/logs_out.json new file mode 100644 index 000000000000..dd2c9d5da44b --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/all/logs_out.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "docker.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.0.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.0.0" + }, + { + "resource": { + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/1.1.0 new file mode 100644 index 000000000000..d80d34f4342b --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/1.1.0 @@ -0,0 +1,17 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0 + +versions: + 1.1.0: + logs: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + docker.image.name: d4r.image.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_in.json b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_in.json new file mode 100644 index 000000000000..89530320e976 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_in.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "d4r.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0" + }, + { + "resource": { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_out.json b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_out.json new file mode 100644 index 000000000000..8752bb6789c0 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_out.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "docker.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.0.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.0.0" + }, + { + "resource": { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/1.1.0 new file mode 100644 index 000000000000..e60af6d44ff7 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/1.1.0 @@ -0,0 +1,21 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/metrics_rename_attributes/1.1.0 + +versions: + 1.1.0: + metrics: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + - rename_attributes: + attribute_map: + docker.image.name: d4r.image.name + apply_to_metrics: + - testMetricTwo + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_in.json b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_in.json new file mode 100644 index 000000000000..8f83cb22e689 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_in.json @@ -0,0 +1,111 @@ +{ + "resourceMetrics": [ + { + "scopeMetrics": [ + { + "metrics": [ + { + "name": "testMetricOne", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "timeUnixNano": "1725146753842860000", + "asInt": "120", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ] + } + }, + { + "name": "testMetricTwo", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "timeUnixNano": "1725146753842860000", + "asInt": "120", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ] + } + }, + { + "name": "testMetricThree", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_attributes/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_attributes/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_out.json b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_out.json new file mode 100644 index 000000000000..9343fb88c4a7 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_out.json @@ -0,0 +1,111 @@ +{ + "resourceMetrics": [ + { + "scopeMetrics": [ + { + "metrics": [ + { + "name": "testMetricOne", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "timeUnixNano": "1725146753842860000", + "asInt": "120", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ] + } + }, + { + "name": "testMetricTwo", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "docker.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "timeUnixNano": "1725146753842860000", + "asInt": "120", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "docker.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ] + } + }, + { + "name": "testMetricThree", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_attributes/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_attributes/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/1.1.0 new file mode 100644 index 000000000000..fe60e6b71a1a --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/1.1.0 @@ -0,0 +1,15 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/metrics_rename_metrics/1.1.0 + +versions: + 1.1.0: + metrics: + changes: + - rename_metrics: + testMetric1: testMetricUno + testMetric2: testMetricTwo + - rename_metrics: + testMetricUno: testMetricOne + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_in.json b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_in.json new file mode 100644 index 000000000000..d6f816085d21 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_in.json @@ -0,0 +1,59 @@ +{ + "resourceMetrics": [ + { + "scopeMetrics": [ + { + "scope": { + "name": "instrumentation name", + "version": "instrumentation version", + "attributes": [ + { + "key": "instrumentation.attribute", + "value": { + "stringValue": "test" + } + } + ] + }, + "metrics": [ + { + "name": "testMetricOne", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + }, + { + "name": "testMetricTwo", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + }, + { + "name": "testMetricThree", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_metrics/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_metrics/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_out.json b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_out.json new file mode 100644 index 000000000000..0ad7d4bd25c5 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_out.json @@ -0,0 +1,59 @@ +{ + "resourceMetrics": [ + { + "scopeMetrics": [ + { + "scope": { + "name": "instrumentation name", + "version": "instrumentation version", + "attributes": [ + { + "key": "instrumentation.attribute", + "value": { + "stringValue": "test" + } + } + ] + }, + "metrics": [ + { + "name": "testMetric1", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + }, + { + "name": "testMetric2", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + }, + { + "name": "testMetricThree", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_metrics/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_metrics/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/resources/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/resources/1.1.0 new file mode 100644 index 000000000000..8a862b1ee750 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/resources/1.1.0 @@ -0,0 +1,17 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/resources/1.1.0 + +versions: + 1.1.0: + resources: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + docker.image.name: d4r.image.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/resources/logs_in.json b/processor/schemaprocessor/testdata/schema_sections/resources/logs_in.json new file mode 100644 index 000000000000..e2ad6adc1065 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/resources/logs_in.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "d4r.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.1.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.1.0" + }, + { + "resource": { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/resources/logs_out.json b/processor/schemaprocessor/testdata/schema_sections/resources/logs_out.json new file mode 100644 index 000000000000..dfa9bb90cfde --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/resources/logs_out.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "d4r.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.0.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.0.0" + }, + { + "resource": { + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/1.1.0 new file mode 100644 index 000000000000..3b7edd3f9795 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/1.1.0 @@ -0,0 +1,35 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/spans/1.1.0 + +# rename attributes with no conditions, with apply to spans, with apply to events, with BOTH apply to spans and apply to events + +versions: + 1.1.0: + span_events: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + - rename_attributes: + attribute_map: + docker.image.name: d4r.image.name + apply_to_spans: + - testSpan1 + - rename_attributes: + attribute_map: + cloud.provider: provider + apply_to_events: + - testEventOne + - rename_attributes: + attribute_map: + db.cassandra.keyspace: db.name + apply_to_spans: + - testSpan2 + apply_to_events: + - testEventTwo + # # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_in.json b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_in.json new file mode 100644 index 000000000000..848abedb5a30 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_in.json @@ -0,0 +1,128 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "events": [ + { + "name": "testEventOne", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + }, + { + "key": "provider", + "value": { + "stringValue": "aws" + } + } + ] + }, + { + "name": "testEventTwo", + "attributes": [ + { + "key": "db.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "provider", + "value": { + "stringValue": "gcp" + } + } + ] + }, + { + "name": "testEventThree", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ] + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ], + "events": [ + { + "name": "testEventTwo", + "attributes": [ + { + "key": "db.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "provider", + "value": { + "stringValue": "aws" + } + } + ] + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_attributes/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_attributes/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_out.json b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_out.json new file mode 100644 index 000000000000..3ef8df0a88b5 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_out.json @@ -0,0 +1,128 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "events": [ + { + "name": "testEventOne", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "docker.image.name", + "value": { + "doubleValue": 1.1 + } + }, + { + "key": "cloud.provider", + "value": { + "stringValue": "aws" + } + } + ] + }, + { + "name": "testEventTwo", + "attributes": [ + { + "key": "db.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "provider", + "value": { + "stringValue": "gcp" + } + } + ] + }, + { + "name": "testEventThree", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ] + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ], + "events": [ + { + "name": "testEventTwo", + "attributes": [ + { + "key": "db.cassandra.keyspace", + "value": { + "stringValue": "value" + } + }, + { + "key": "provider", + "value": { + "stringValue": "aws" + } + } + ] + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_attributes/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_attributes/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/1.1.0 new file mode 100644 index 000000000000..1655e562af86 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/1.1.0 @@ -0,0 +1,17 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/span_events_rename_spans/1.1.0 + +versions: + 1.1.0: + span_events: + changes: + - rename_events: + name_map: + testEvent1: testEventUno + testEvent2: testEventTwo + - rename_events: + name_map: + testEventUno: testEventOne + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_in.json b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_in.json new file mode 100644 index 000000000000..041558c66829 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_in.json @@ -0,0 +1,123 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "double", + "value": { + "doubleValue": 1.1 + } + } + ], + "events": [ + { + "timeUnixNano": "1684620382541971000", + "name": "testEventOne", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + }, + { + "timeUnixNano": "1684620382541971000", + "name": "testEventTwo", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + }, + { + "timeUnixNano": "1684620382541971000", + "name": "testEventThree", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "double", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_spans/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_spans/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_out.json b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_out.json new file mode 100644 index 000000000000..4c0c852c668e --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_out.json @@ -0,0 +1,123 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "double", + "value": { + "doubleValue": 1.1 + } + } + ], + "events": [ + { + "timeUnixNano": "1684620382541971000", + "name": "testEvent1", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + }, + { + "timeUnixNano": "1684620382541971000", + "name": "testEvent2", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + }, + { + "timeUnixNano": "1684620382541971000", + "name": "testEventThree", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "double", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_spans/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_spans/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/spans/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/spans/1.1.0 new file mode 100644 index 000000000000..d8d3e3d76ddd --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/spans/1.1.0 @@ -0,0 +1,21 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/spans/1.1.0 + +versions: + 1.1.0: + spans: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + - rename_attributes: + attribute_map: + docker.image.name: d4r.image.name + apply_to_spans: + - testSpan2 + # # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/spans/traces_in.json b/processor/schemaprocessor/testdata/schema_sections/spans/traces_in.json new file mode 100644 index 000000000000..ef832fb92666 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/spans/traces_in.json @@ -0,0 +1,64 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/spans/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/spans/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/spans/traces_out.json b/processor/schemaprocessor/testdata/schema_sections/spans/traces_out.json new file mode 100644 index 000000000000..be31af92ad3a --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/spans/traces_out.json @@ -0,0 +1,64 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "docker.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/spans/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/spans/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.0.0 b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.0.0 new file mode 100644 index 000000000000..0d403cb3f6cc --- /dev/null +++ b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.0.0 @@ -0,0 +1,19 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/testschemas/schemaprecedence/1.0.0 + +versions: + 1.2.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_two_zero_output + 1.1.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_one_zero_output + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.1.0 b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.1.0 new file mode 100644 index 000000000000..24b79450cda2 --- /dev/null +++ b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.1.0 @@ -0,0 +1,19 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/testschemas/schemaprecedence/1.1.0 + +versions: + 1.2.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_two_zero_output + 1.1.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_one_zero_output + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.2.0 b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.2.0 new file mode 100644 index 000000000000..02ccbcaa6a8e --- /dev/null +++ b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.2.0 @@ -0,0 +1,19 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/testschemas/schemaprecedence/1.2.0 + +versions: + 1.2.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_two_zero_output + 1.1.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_one_zero_output + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/transformer.go b/processor/schemaprocessor/transformer.go deleted file mode 100644 index cbe5f2bd0a0a..000000000000 --- a/processor/schemaprocessor/transformer.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor" - -import ( - "context" - "errors" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/processor" - "go.uber.org/zap" -) - -type transformer struct { - targets []string - log *zap.Logger -} - -func newTransformer( - _ context.Context, - conf component.Config, - set processor.Settings, -) (*transformer, error) { - cfg, ok := conf.(*Config) - if !ok { - return nil, errors.New("invalid configuration provided") - } - return &transformer{ - log: set.Logger, - targets: cfg.Targets, - }, nil -} - -func (t transformer) processLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) { - return ld, nil -} - -func (t transformer) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { - return md, nil -} - -func (t transformer) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { - return td, nil -} - -// start will load the remote file definition if it isn't already cached -// and resolve the schema translation file -func (t *transformer) start(_ context.Context, _ component.Host) error { - for _, target := range t.targets { - t.log.Info("Fetching remote schema url", zap.String("schema-url", target)) - } - return nil -} diff --git a/processor/schemaprocessor/transformer_test.go b/processor/schemaprocessor/transformer_test.go deleted file mode 100644 index ad1a6efc462f..000000000000 --- a/processor/schemaprocessor/transformer_test.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package schemaprocessor - -import ( - "context" - _ "embed" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/processor" - "go.uber.org/zap/zaptest" -) - -func newTestTransformer(t *testing.T) *transformer { - trans, err := newTransformer(context.Background(), newDefaultConfiguration(), processor.Settings{ - TelemetrySettings: component.TelemetrySettings{ - Logger: zaptest.NewLogger(t), - }, - }) - require.NoError(t, err, "Must not error when creating default transformer") - return trans -} - -func TestTransformerStart(t *testing.T) { - t.Parallel() - - trans := newTestTransformer(t) - assert.NoError(t, trans.start(context.Background(), nil)) -} - -func TestTransformerProcessing(t *testing.T) { - t.Parallel() - - trans := newTestTransformer(t) - t.Run("metrics", func(t *testing.T) { - in := pmetric.NewMetrics() - in.ResourceMetrics().AppendEmpty() - in.ResourceMetrics().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") - in.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() - m := in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() - m.SetName("test-data") - m.SetDescription("Only used throughout tests") - m.SetUnit("seconds") - m.CopyTo(in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)) - - out, err := trans.processMetrics(context.Background(), in) - assert.NoError(t, err, "Must not error when processing metrics") - assert.Equal(t, in, out, "Must return the same data (subject to change)") - }) - - t.Run("traces", func(t *testing.T) { - in := ptrace.NewTraces() - in.ResourceSpans().AppendEmpty() - in.ResourceSpans().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") - in.ResourceSpans().At(0).ScopeSpans().AppendEmpty() - s := in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty() - s.SetName("http.request") - s.SetKind(ptrace.SpanKindConsumer) - s.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) - s.CopyTo(in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)) - - out, err := trans.processTraces(context.Background(), in) - assert.NoError(t, err, "Must not error when processing metrics") - assert.Equal(t, in, out, "Must return the same data (subject to change)") - }) - - t.Run("logs", func(t *testing.T) { - in := plog.NewLogs() - in.ResourceLogs().AppendEmpty() - in.ResourceLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") - in.ResourceLogs().At(0).ScopeLogs().AppendEmpty() - l := in.ResourceLogs().At(0).ScopeLogs().At(0).Scope() - l.SetName("magical-logs") - l.SetVersion("alpha") - l.CopyTo(in.ResourceLogs().At(0).ScopeLogs().At(0).Scope()) - - out, err := trans.processLogs(context.Background(), in) - assert.NoError(t, err, "Must not error when processing metrics") - assert.Equal(t, in, out, "Must return the same data (subject to change)") - }) -}