From 96a23636512204bb133a758a3d3d65c580e688f2 Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 22 Jan 2025 00:23:43 -0500 Subject: [PATCH 1/5] split out manager provider --- .../internal/translation/manager.go | 131 ++++++++++++++++++ .../internal/translation/manager_test.go | 68 +++++++++ .../internal/translation/provider.go | 73 ++++++++++ .../internal/translation/provider_test.go | 52 +++++++ .../internal/translation/testdata/schema.yaml | 112 +++++++++++++++ 5 files changed, 436 insertions(+) create mode 100644 processor/schemaprocessor/internal/translation/manager.go create mode 100644 processor/schemaprocessor/internal/translation/manager_test.go create mode 100644 processor/schemaprocessor/internal/translation/provider.go create mode 100644 processor/schemaprocessor/internal/translation/provider_test.go create mode 100644 processor/schemaprocessor/internal/translation/testdata/schema.yaml diff --git a/processor/schemaprocessor/internal/translation/manager.go b/processor/schemaprocessor/internal/translation/manager.go new file mode 100644 index 000000000000..5d8431b89b51 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/manager.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" + +import ( + "context" + "errors" + "fmt" + "sync" + + "go.uber.org/zap" +) + +var errNilValueProvided = errors.New("nil value provided") + +// Manager is responsible for ensuring that schemas are kept up to date +// with the most recent version that are requested. +type Manager interface { + // RequestTranslation will provide either the defined Translation + // if it is a known target, or, return a noop variation. 
+ // In the event that a matched Translation, on a missed version + // there is a potential to block during this process. + // Otherwise, the translation will allow concurrent reads. + RequestTranslation(ctx context.Context, schemaURL string) Translation + + // SetProviders will update the list of providers used by the manager + // to look up schemaURLs + SetProviders(providers ...Provider) error +} + +type manager struct { + log *zap.Logger + + rw sync.RWMutex + providers []Provider + match map[string]*Version + translations map[string]*translator +} + +var _ Manager = (*manager)(nil) + +// NewManager creates a manager that will allow for management +// of schema, the options allow for additional properties to be +// added to manager to enable additional locations of where to check +// for translations file. +func NewManager(targets []string, log *zap.Logger) (Manager, error) { + if log == nil { + return nil, fmt.Errorf("logger: %w", errNilValueProvided) + } + + match := make(map[string]*Version, len(targets)) + for _, target := range targets { + family, version, err := GetFamilyAndVersion(target) + if err != nil { + return nil, err + } + match[family] = version + } + + return &manager{ + log: log, + match: match, + translations: make(map[string]*translator), + }, nil +} + +func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation { + family, version, err := GetFamilyAndVersion(schemaURL) + if err != nil { + m.log.Debug("No valid schema url was provided, using no-op schema", + zap.String("schema-url", schemaURL), + ) + return nopTranslation{} + } + + target, match := m.match[family] + if !match { + m.log.Debug("Not a known target, providing Nop Translation", + zap.String("schema-url", schemaURL), + ) + return nopTranslation{} + } + + m.rw.RLock() + t, exists := m.translations[family] + m.rw.RUnlock() + + if exists && t.SupportedVersion(version) { + return t + } + + for _, p := range m.providers { + content, err := p.Lookup(ctx, 
schemaURL) + if err != nil { + m.log.Error("Failed to lookup schemaURL", + zap.Error(err), + zap.String("schemaURL", schemaURL), + ) + // todo(ankit) figure out what to do when the providers dont respond something good + } + t, err := newTranslatorFromReader( + m.log.Named("translator").With( + zap.String("family", family), + zap.Stringer("target", target), + ), + joinSchemaFamilyAndVersion(family, target), + content, + ) + if err != nil { + m.log.Error("Failed to create translator", zap.Error(err)) + continue + } + m.rw.Lock() + m.translations[family] = t + m.rw.Unlock() + return t + } + + return nopTranslation{} +} + +func (m *manager) SetProviders(providers ...Provider) error { + if len(providers) == 0 { + return fmt.Errorf("zero providers set: %w", errNilValueProvided) + } + m.rw.Lock() + m.providers = append(m.providers[:0], providers...) + m.rw.Unlock() + return nil +} diff --git a/processor/schemaprocessor/internal/translation/manager_test.go b/processor/schemaprocessor/internal/translation/manager_test.go new file mode 100644 index 000000000000..a6326d5a7679 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/manager_test.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation + +import ( + "context" + _ "embed" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +//go:embed testdata/schema.yaml +var exampleTranslation []byte + +func TranslationHandler(t *testing.T) http.Handler { + assert.NotEmpty(t, exampleTranslation, "SchemaContent MUST not be empty") + return http.HandlerFunc(func(wr http.ResponseWriter, _ *http.Request) { + _, err := wr.Write(exampleTranslation) + assert.NoError(t, err, "Must not have issues writing schema content") + }) +} + +func TestRequestTranslation(t *testing.T) { + t.Parallel() + + s := httptest.NewServer(TranslationHandler(t)) + 
t.Cleanup(s.Close) + + schemaURL := fmt.Sprintf("%s/1.1.0", s.URL) + + m, err := NewManager( + []string{schemaURL}, + zaptest.NewLogger(t), + ) + require.NoError(t, err, "Must not error when created manager") + require.NoError(t, m.SetProviders(NewHTTPProvider(s.Client())), "Must have no issues trying to set providers") + + nop, ok := m.RequestTranslation(context.Background(), "/not/a/valid/schema/URL").(nopTranslation) + require.True(t, ok, "Must return a NoopTranslation if no valid schema URL is provided") + require.NotNil(t, nop, "Must have a valid translation") + + tn, ok := m.RequestTranslation(context.Background(), schemaURL).(*translator) + require.True(t, ok, "Can cast to the concrete type") + require.NotNil(t, tn, "Must have a valid translation") + + assert.True(t, tn.SupportedVersion(&Version{1, 0, 0}), "Must have the version listed as supported") + + count := 0 + prevRev := &Version{1, 0, 0} + it, status := tn.iterator(prevRev) + assert.Equal(t, Update, status, "Must return a status of update") + for currRev, more := it(); more; currRev, more = it() { + assert.True(t, prevRev.LessThan(currRev.Version())) + prevRev = currRev.Version() + count++ + } + + tn, ok = m.RequestTranslation(context.Background(), schemaURL).(*translator) + require.True(t, ok, "Can cast to the concrete type") + require.NotNil(t, tn, "Must have a valid translation") +} diff --git a/processor/schemaprocessor/internal/translation/provider.go b/processor/schemaprocessor/internal/translation/provider.go new file mode 100644 index 000000000000..d82eb19d3414 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/provider.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" + +import ( + "bytes" + "context" + "embed" + "fmt" + "io" + "net/http" + "net/url" +) + +// Provider allows for collector 
extensions to be used to look up schemaURLs +type Provider interface { + // Lookup will check the underlying provider to see if content exists + // for the provided schemaURL, in the event that it doesn't an error is returned. + Lookup(ctx context.Context, schemaURL string) (content io.Reader, err error) +} + +type httpProvider struct { + client *http.Client +} + +var _ Provider = (*httpProvider)(nil) + +func NewHTTPProvider(client *http.Client) Provider { + return &httpProvider{client: client} +} + +func (hp *httpProvider) Lookup(ctx context.Context, schemaURL string) (io.Reader, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody) + if err != nil { + return nil, err + } + resp, err := hp.client.Do(req) + if err != nil { + return nil, err + } + content := bytes.NewBuffer(nil) + if _, err := content.ReadFrom(resp.Body); err != nil { + return nil, err + } + if err := resp.Body.Close(); err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("invalid status code returned: %d", resp.StatusCode) + } + return content, nil +} + +type testProvider struct { + fs *embed.FS +} + +func NewTestProvider(fs *embed.FS) Provider { + return &testProvider{fs: fs} +} + +func (tp testProvider) Lookup(_ context.Context, schemaURL string) (io.Reader, error) { + parsedPath, err := url.Parse(schemaURL) + if err != nil { + return nil, err + } + f, err := tp.fs.Open(parsedPath.Path[1:]) + if err != nil { + return nil, err + } + return f, nil +} diff --git a/processor/schemaprocessor/internal/translation/provider_test.go b/processor/schemaprocessor/internal/translation/provider_test.go new file mode 100644 index 000000000000..407cf25a8579 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/provider_test.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation + +import ( + "context" + "fmt" + "io" + "net/http" + 
"net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInvalidHTTPProviderTests(t *testing.T) { + t.Parallel() + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.RequestURI != "/1.7.0" { + w.WriteHeader(http.StatusBadRequest) + return + } + _, err := io.Copy(w, LoadTranslationVersion(t, "complex_changeset.yml")) + assert.NoError(t, err, "Must not error when trying load dataset") + })) + t.Cleanup(s.Close) + + tests := []struct { + scenario string + url string + }{ + { + scenario: "A failed request happens", + url: fmt.Sprint(s.URL, "/not/a/valid/path/1.7.0"), + }, + { + scenario: "invalid url", + url: "unix:///localhost", + }, + } + + for _, tc := range tests { + t.Run(tc.scenario, func(t *testing.T) { + p := NewHTTPProvider(s.Client()) + content, err := p.Lookup(context.Background(), tc.url) + assert.Nil(t, content, "Expected to be nil") + assert.Error(t, err, "Must have errored processing request") + }) + } +} diff --git a/processor/schemaprocessor/internal/translation/testdata/schema.yaml b/processor/schemaprocessor/internal/translation/testdata/schema.yaml new file mode 100644 index 000000000000..ae7c1665bc78 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/testdata/schema.yaml @@ -0,0 +1,112 @@ +# Defines the file format. MUST be set to 1.0.0. +file_format: 1.0.0 + +# The Schema URL that this file is published at. The version number in the URL +# MUST match the highest version number in the "versions" section below. +# Note: the schema version number in the URL is not related in any way to +# the file_format setting above. +schema_url: https://opentelemetry.io/schemas/1.1.0 + +# Definitions for each schema version in this family. +# Note: the ordering of versions is defined according to semver +# version number ordering rules. +versions: + 1.1.0: + # Definitions for version 1.1.0. + all: + # Definitions that apply to all data types. 
+ changes: + # Transformations to apply when converting from version 1.0.0 to 1.1.0. + - rename_attributes: + attribute_map: + # map of key/values. The keys are the old attribute name used + # the previous version, the values are the new attribute name + # starting from this version. + # Rename k8s.* to kubernetes.* + k8s.cluster.name: kubernetes.cluster.name + k8s.namespace.name: kubernetes.namespace.name + k8s.node.name: kubernetes.node.name + k8s.node.uid: kubernetes.node.uid + k8s.pod.name: kubernetes.pod.name + k8s.pod.uid: kubernetes.pod.uid + k8s.container.name: kubernetes.container.name + k8s.replicaset.name: kubernetes.replicaset.name + k8s.replicaset.uid: kubernetes.replicaset.uid + k8s.cronjob.name: kubernetes.cronjob.name + k8s.cronjob.uid: kubernetes.cronjob.uid + k8s.job.name: kubernetes.job.name + k8s.job.uid: kubernetes.job.uid + k8s.statefulset.name: kubernetes.statefulset.name + k8s.statefulset.uid: kubernetes.statefulset.uid + k8s.daemonset.name: kubernetes.daemonset.name + k8s.daemonset.uid: kubernetes.daemonset.uid + k8s.deployment.name: kubernetes.deployment.name + k8s.deployment.uid: kubernetes.deployment.uid + + resources: + # Definitions that apply to Resource data type. + changes: + - rename_attributes: + attribute_map: + telemetry.auto.version: telemetry.auto_instr.version + + spans: + # Definitions that apply to Span data type. + changes: + - rename_attributes: + attribute_map: + # map of key/values. The keys are the old attribute name used + # in the previous version, the values are the new attribute name + # starting from this version. + peer.service: peer.service.name + apply_to_spans: + # apply only to spans named "HTTP GET" + - "HTTP GET" + + span_events: + # Definitions that apply to Span Event data type. + changes: + - rename_events: + # The keys are old event name used in the previous version, the + # values are the new event name starting from this version. 
+ name_map: {stacktrace: stack_trace} + + - rename_attributes: + attribute_map: + peer.service: peer.service.name + apply_to_events: + # Optional event names to apply to. If empty applies to all events. + - exception.stack_trace + + metrics: + # Definitions that apply to Metric data type. + changes: + - rename_metrics: + # map of key/values. The keys are the old metric name used + # in the previous version, the values are the new metric name + # starting from this version. + container.cpu.usage.total: cpu.usage.total + container.memory.usage.max: memory.usage.max + + - rename_attributes: + attribute_map: + status: state + apply_to_metrics: + # Optional. If it is missing the transformation is applied + # to all metrics. If it is present the transformation is applied + # only to the metrics with the name that is found in the sequence + # specified below. + - system.cpu.utilization + - system.memory.usage + - system.memory.utilization + - system.paging.usage + + logs: + # Definitions that apply to LogRecord data type. + changes: + - rename_attributes: + attribute_map: + process.executable_name: process.executable.name + + 1.0.0: + # First version of this schema family. 
From 5fe87d3d9f0cfa5589da39629571cbb890769e4e Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 22 Jan 2025 13:53:51 -0500 Subject: [PATCH 2/5] remove nolint --- processor/schemaprocessor/internal/translation/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/schemaprocessor/internal/translation/version.go b/processor/schemaprocessor/internal/translation/version.go index f9d1888140ce..ec8048afa7e2 100644 --- a/processor/schemaprocessor/internal/translation/version.go +++ b/processor/schemaprocessor/internal/translation/version.go @@ -72,7 +72,7 @@ func GetFamilyAndVersion(schemaURL string) (family string, version *Version, err return u.String(), version, err } -func joinSchemaFamilyAndVersion(family string, version *Version) string { //nolint: unparam +func joinSchemaFamilyAndVersion(family string, version *Version) string { u, err := url.Parse(family) if err != nil { return "" From b5b947715e31c5137cd4cd8cfe61e6cea002ad74 Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 5 Feb 2025 12:52:11 -0500 Subject: [PATCH 3/5] add dinesh as codeowner --- processor/schemaprocessor/README.md | 2 +- processor/schemaprocessor/generated_package_test.go | 3 +-- processor/schemaprocessor/metadata.yaml | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/processor/schemaprocessor/README.md b/processor/schemaprocessor/README.md index 08e322121d71..be88823c1954 100644 --- a/processor/schemaprocessor/README.md +++ b/processor/schemaprocessor/README.md @@ -6,7 +6,7 @@ | Stability | [development]: traces, metrics, logs | | Distributions | [] | | Issues | [![Open 
issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96), [@dineshg13](https://www.github.com/dineshg13) | [development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development diff --git a/processor/schemaprocessor/generated_package_test.go b/processor/schemaprocessor/generated_package_test.go index 8593af713ccf..b4b9a973793d 100644 --- a/processor/schemaprocessor/generated_package_test.go +++ b/processor/schemaprocessor/generated_package_test.go @@ -3,9 +3,8 @@ package schemaprocessor import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/processor/schemaprocessor/metadata.yaml b/processor/schemaprocessor/metadata.yaml index 731ef9a26e34..dc0962ce34b3 100644 --- a/processor/schemaprocessor/metadata.yaml +++ b/processor/schemaprocessor/metadata.yaml @@ -6,7 +6,7 @@ status: 
development: [traces, metrics, logs] distributions: [] codeowners: - active: [MovieStoreGuy, ankitpatel96] + active: [MovieStoreGuy, ankitpatel96, dineshg13] tests: config: From ef63c0b9a34836346bd09c5470229b814514a31f Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 5 Feb 2025 16:51:12 -0500 Subject: [PATCH 4/5] go generate --- processor/schemaprocessor/generated_package_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processor/schemaprocessor/generated_package_test.go b/processor/schemaprocessor/generated_package_test.go index b4b9a973793d..8593af713ccf 100644 --- a/processor/schemaprocessor/generated_package_test.go +++ b/processor/schemaprocessor/generated_package_test.go @@ -3,8 +3,9 @@ package schemaprocessor import ( - "go.uber.org/goleak" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { From f1689da4876fd106c2f59aae490c74eec81c07e7 Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 22 Jan 2025 13:43:05 -0500 Subject: [PATCH 5/5] inal processor chunk split out --- processor/schemaprocessor/DESIGN.md | 4 +- processor/schemaprocessor/factory.go | 8 +- processor/schemaprocessor/go.mod | 4 +- processor/schemaprocessor/processor.go | 157 +++++ processor/schemaprocessor/processor_test.go | 551 ++++++++++++++++++ .../testdata/schema_sections/all/1.1.0 | 17 + .../testdata/schema_sections/all/logs_in.json | 68 +++ .../schema_sections/all/logs_out.json | 68 +++ .../logs_rename_attributes/1.1.0 | 17 + .../logs_rename_attributes/logs_in.json | 68 +++ .../logs_rename_attributes/logs_out.json | 68 +++ .../metrics_rename_attributes/1.1.0 | 21 + .../metrics_rename_attributes/metrics_in.json | 111 ++++ .../metrics_out.json | 111 ++++ .../metrics_rename_metrics/1.1.0 | 15 + .../metrics_rename_metrics/metrics_in.json | 59 ++ .../metrics_rename_metrics/metrics_out.json | 59 ++ .../testdata/schema_sections/resources/1.1.0 | 17 + 
.../schema_sections/resources/logs_in.json | 68 +++ .../schema_sections/resources/logs_out.json | 68 +++ .../span_events_rename_attributes/1.1.0 | 35 ++ .../traces_in.json | 128 ++++ .../traces_out.json | 128 ++++ .../span_events_rename_spans/1.1.0 | 17 + .../span_events_rename_spans/traces_in.json | 123 ++++ .../span_events_rename_spans/traces_out.json | 123 ++++ .../testdata/schema_sections/spans/1.1.0 | 21 + .../schema_sections/spans/traces_in.json | 64 ++ .../schema_sections/spans/traces_out.json | 64 ++ .../testschemas/schemaprecedence/1.0.0 | 19 + .../testschemas/schemaprecedence/1.1.0 | 19 + .../testschemas/schemaprecedence/1.2.0 | 19 + processor/schemaprocessor/transformer.go | 57 -- processor/schemaprocessor/transformer_test.go | 88 --- 34 files changed, 2311 insertions(+), 153 deletions(-) create mode 100644 processor/schemaprocessor/processor.go create mode 100644 processor/schemaprocessor/processor_test.go create mode 100644 processor/schemaprocessor/testdata/schema_sections/all/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/all/logs_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/all/logs_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/1.1.0 create mode 100644 
processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/resources/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/resources/logs_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/resources/logs_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_out.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/spans/1.1.0 create mode 100644 processor/schemaprocessor/testdata/schema_sections/spans/traces_in.json create mode 100644 processor/schemaprocessor/testdata/schema_sections/spans/traces_out.json create mode 100644 processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.0.0 create mode 100644 processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.1.0 create mode 100644 processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.2.0 delete mode 100644 processor/schemaprocessor/transformer.go delete mode 100644 processor/schemaprocessor/transformer_test.go diff --git a/processor/schemaprocessor/DESIGN.md b/processor/schemaprocessor/DESIGN.md index ae70dce24c60..ed074a8b4ed6 100644 --- a/processor/schemaprocessor/DESIGN.md +++ b/processor/schemaprocessor/DESIGN.md 
@@ -19,8 +19,8 @@ graph LR; end ``` -The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory. -Data flows into the Transformer, which uses the Schema URL to fetch the translation from the Translation Manager. +The [schemaprocessor](processor.go) is registered as a Processor in the Collector by the factory. +Data flows into the Processor, which uses the Schema URL to fetch the translation from the Translation Manager. The Translation Manager (at internal/translation/manager.go in a future PR) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct. The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator figures out what the version of the incoming data is and what Revisions to apply to the incoming data to get it to the target schema version. The Translator is also responsible for applying the Revisions to the incoming data - it iterates through these Revisions and applies them to the incoming data. 
diff --git a/processor/schemaprocessor/factory.go b/processor/schemaprocessor/factory.go index 68525c6afb78..fb8e73fe327f 100644 --- a/processor/schemaprocessor/factory.go +++ b/processor/schemaprocessor/factory.go @@ -22,7 +22,7 @@ var processorCapabilities = consumer.Capabilities{MutatesData: true} // factory will store any of the precompiled schemas in future type factory struct{} -// newDefaultConfiguration returns the configuration for schema transformer processor +// newDefaultConfiguration returns the configuration for schema processor // with the default values being used throughout it func newDefaultConfiguration() component.Config { return &Config{ @@ -47,7 +47,7 @@ func (f factory) createLogsProcessor( cfg component.Config, next consumer.Logs, ) (processor.Logs, error) { - transformer, err := newTransformer(ctx, cfg, set) + transformer, err := newSchemaProcessor(ctx, cfg, set) if err != nil { return nil, err } @@ -68,7 +68,7 @@ func (f factory) createMetricsProcessor( cfg component.Config, next consumer.Metrics, ) (processor.Metrics, error) { - transformer, err := newTransformer(ctx, cfg, set) + transformer, err := newSchemaProcessor(ctx, cfg, set) if err != nil { return nil, err } @@ -89,7 +89,7 @@ func (f factory) createTracesProcessor( cfg component.Config, next consumer.Traces, ) (processor.Traces, error) { - transformer, err := newTransformer(ctx, cfg, set) + transformer, err := newSchemaProcessor(ctx, cfg, set) if err != nil { return nil, err } diff --git a/processor/schemaprocessor/go.mod b/processor/schemaprocessor/go.mod index 9c7dfac2a207..977054adc0f6 100644 --- a/processor/schemaprocessor/go.mod +++ b/processor/schemaprocessor/go.mod @@ -13,8 +13,10 @@ require ( go.opentelemetry.io/collector/consumer v1.25.0 go.opentelemetry.io/collector/consumer/consumertest v0.119.0 go.opentelemetry.io/collector/pdata v1.25.0 + go.opentelemetry.io/collector/pipeline v0.119.0 go.opentelemetry.io/collector/processor v0.119.0 
go.opentelemetry.io/collector/processor/processortest v0.119.0 + go.opentelemetry.io/otel/metric v1.34.0 go.opentelemetry.io/otel/schema v0.0.12 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 @@ -59,11 +61,9 @@ require ( go.opentelemetry.io/collector/extension/auth v0.119.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.119.0 // indirect - go.opentelemetry.io/collector/pipeline v0.119.0 // indirect go.opentelemetry.io/collector/processor/xprocessor v0.119.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect go.opentelemetry.io/otel v1.34.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect diff --git a/processor/schemaprocessor/processor.go b/processor/schemaprocessor/processor.go new file mode 100644 index 000000000000..1146897155f5 --- /dev/null +++ b/processor/schemaprocessor/processor.go @@ -0,0 +1,157 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor" + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" +) + +type schemaprocessor struct { + telemetry component.TelemetrySettings + config *Config + + log *zap.Logger + + manager translation.Manager +} + +func newSchemaProcessor(_ context.Context, conf component.Config, set processor.Settings) (*schemaprocessor, error) { + cfg, ok 
:= conf.(*Config) + if !ok { + return nil, errors.New("invalid configuration provided") + } + + m, err := translation.NewManager( + cfg.Targets, + set.Logger.Named("schema-manager"), + ) + if err != nil { + return nil, err + } + + return &schemaprocessor{ + config: cfg, + telemetry: set.TelemetrySettings, + log: set.Logger, + manager: m, + }, nil +} + +func (t schemaprocessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { + for rl := 0; rl < ld.ResourceLogs().Len(); rl++ { + rLog := ld.ResourceLogs().At(rl) + resourceSchemaURL := rLog.SchemaUrl() + err := t.manager. + RequestTranslation(ctx, resourceSchemaURL). + ApplyAllResourceChanges(rLog, resourceSchemaURL) + if err != nil { + return plog.Logs{}, err + } + for sl := 0; sl < rLog.ScopeLogs().Len(); sl++ { + log := rLog.ScopeLogs().At(sl) + logSchemaURL := log.SchemaUrl() + if logSchemaURL == "" { + logSchemaURL = resourceSchemaURL + } + + err := t.manager. + RequestTranslation(ctx, logSchemaURL). + ApplyScopeLogChanges(log, logSchemaURL) + if err != nil { + return plog.Logs{}, err + } + } + } + return ld, nil +} + +func (t schemaprocessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { + for rm := 0; rm < md.ResourceMetrics().Len(); rm++ { + rMetric := md.ResourceMetrics().At(rm) + resourceSchemaURL := rMetric.SchemaUrl() + err := t.manager. + RequestTranslation(ctx, resourceSchemaURL). + ApplyAllResourceChanges(rMetric, resourceSchemaURL) + if err != nil { + return pmetric.Metrics{}, err + } + for sm := 0; sm < rMetric.ScopeMetrics().Len(); sm++ { + metric := rMetric.ScopeMetrics().At(sm) + metricSchemaURL := metric.SchemaUrl() + if metricSchemaURL == "" { + metricSchemaURL = resourceSchemaURL + } + err := t.manager. + RequestTranslation(ctx, metricSchemaURL). 
+ ApplyScopeMetricChanges(metric, metricSchemaURL) + if err != nil { + return pmetric.Metrics{}, err + } + } + } + return md, nil +} + +func (t schemaprocessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { + for rt := 0; rt < td.ResourceSpans().Len(); rt++ { + rTrace := td.ResourceSpans().At(rt) + // todo(ankit) do i need to check if this is empty? + resourceSchemaURL := rTrace.SchemaUrl() + err := t.manager. + RequestTranslation(ctx, resourceSchemaURL). + ApplyAllResourceChanges(rTrace, resourceSchemaURL) + if err != nil { + return ptrace.Traces{}, err + } + for ss := 0; ss < rTrace.ScopeSpans().Len(); ss++ { + span := rTrace.ScopeSpans().At(ss) + spanSchemaURL := span.SchemaUrl() + if spanSchemaURL == "" { + spanSchemaURL = resourceSchemaURL + } + err := t.manager. + RequestTranslation(ctx, spanSchemaURL). + ApplyScopeSpanChanges(span, spanSchemaURL) + if err != nil { + return ptrace.Traces{}, err + } + } + } + return td, nil +} + +// start will load the remote file definition if it isn't already cached +// and resolve the schema translation file +func (t *schemaprocessor) start(ctx context.Context, host component.Host) error { + var providers []translation.Provider + // Check for additional extensions that can be checked first before + // performing the http request + // TODO(MovieStoreGuy): Check for storage extensions + + client, err := t.config.ToClient(ctx, host, t.telemetry) + if err != nil { + return err + } + + if err := t.manager.SetProviders(append(providers, translation.NewHTTPProvider(client))...); err != nil { + return err + } + go func(ctx context.Context) { + for _, schemaURL := range t.config.Prefetch { + _ = t.manager.RequestTranslation(ctx, schemaURL) + } + }(ctx) + + return nil +} diff --git a/processor/schemaprocessor/processor_test.go b/processor/schemaprocessor/processor_test.go new file mode 100644 index 000000000000..925f6018984a --- /dev/null +++ b/processor/schemaprocessor/processor_test.go @@ -0,0 +1,551 
@@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package schemaprocessor + +import ( + "context" + "embed" + _ "embed" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/otel/metric/noop" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" +) + +func newTestProcessor(t *testing.T) *schemaprocessor { + trans, err := newSchemaProcessor(context.Background(), newDefaultConfiguration(), processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + MeterProvider: noop.MeterProvider{}, + }, + }) + require.NoError(t, err, "Must not error when creating default schemaprocessor") + return trans +} + +func TestProcessorStart(t *testing.T) { + t.Parallel() + + trans := newTestProcessor(t) + assert.NoError(t, trans.start(context.Background(), componenttest.NewNopHost())) +} + +//go:embed testdata +var f embed.FS + +func plogsFromJSON(t *testing.T, path string) plog.Logs { + t.Helper() + unmarshaler := plog.JSONUnmarshaler{} + signalJSON, err := f.ReadFile(path) + require.NoError(t, err) + inSignals, err := unmarshaler.UnmarshalLogs(signalJSON) + require.NoError(t, err) + require.NotNil(t, inSignals.ResourceLogs()) + require.NotEqual(t, 0, inSignals.LogRecordCount()) 
+ return inSignals +} + +func ptracesFromJSON(t *testing.T, path string) ptrace.Traces { + t.Helper() + unmarshaler := ptrace.JSONUnmarshaler{} + signalJSON, err := f.ReadFile(path) + require.NoError(t, err) + inSignals, err := unmarshaler.UnmarshalTraces(signalJSON) + require.NoError(t, err) + require.NotNil(t, inSignals.ResourceSpans()) + require.NotEqual(t, 0, inSignals.SpanCount()) + return inSignals +} + +func pmetricsFromJSON(t *testing.T, path string) pmetric.Metrics { + t.Helper() + unmarshaler := pmetric.JSONUnmarshaler{} + signalJSON, err := f.ReadFile(path) + require.NoError(t, err) + inSignals, err := unmarshaler.UnmarshalMetrics(signalJSON) + require.NoError(t, err) + require.NotNil(t, inSignals.ResourceMetrics()) + require.NotEqual(t, 0, inSignals.MetricCount()) + return inSignals +} + +func buildTestProcessor(t *testing.T, targetURL string) *schemaprocessor { + t.Helper() + defaultConfig := newDefaultConfiguration() + castedConfig := defaultConfig.(*Config) + castedConfig.Targets = []string{targetURL} + processor, err := newSchemaProcessor(context.Background(), castedConfig, processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }, + }) + require.NoError(t, err, "Must not error when creating schemaprocessor") + err = processor.manager.SetProviders(translation.NewTestProvider(&f)) + require.NoError(t, err) + return processor +} + +func TestProcessorSchemaBySections(t *testing.T) { + tests := []struct { + name string + section string + dataType pipeline.Signal + }{ + { + // todo(ankit) do i need to test all data types here? + name: "all_logs", + section: "all", + dataType: pipeline.SignalLogs, + }, + { + // todo(ankit) do i need to test all data types here? 
+ name: "resources_logs", + section: "resources", + dataType: pipeline.SignalLogs, + }, + { + name: "spans", + section: "spans", + dataType: pipeline.SignalTraces, + }, + { + name: "span_events_rename_spans", + section: "span_events_rename_spans", + dataType: pipeline.SignalTraces, + }, + { + name: "span_events_rename_attributes", + section: "span_events_rename_attributes", + dataType: pipeline.SignalTraces, + }, + { + name: "metrics_rename_metrics", + section: "metrics_rename_metrics", + dataType: pipeline.SignalMetrics, + }, + { + name: "metrics_rename_attributes", + section: "metrics_rename_attributes", + dataType: pipeline.SignalMetrics, + }, + { + name: "logs_rename_attributes", + section: "logs_rename_attributes", + dataType: pipeline.SignalLogs, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + processorTarget := fmt.Sprintf("https://example.com/testdata/schema_sections/%s/1.0.0", tc.section) + processor := buildTestProcessor(t, processorTarget) + inDataPath := fmt.Sprintf("testdata/schema_sections/%s/%s_in.json", tc.section, tc.dataType) + outDataPath := fmt.Sprintf("testdata/schema_sections/%s/%s_out.json", tc.section, tc.dataType) + switch tc.dataType { + case pipeline.SignalLogs: + inLogs := plogsFromJSON(t, inDataPath) + expected := plogsFromJSON(t, outDataPath) + + logs, err := processor.processLogs(context.Background(), inLogs) + require.NoError(t, err) + require.NoError(t, plogtest.CompareLogs(expected, logs), "Must match the expected values") + case pipeline.SignalMetrics: + inMetrics := pmetricsFromJSON(t, inDataPath) + expected := pmetricsFromJSON(t, outDataPath) + + metrics, err := processor.processMetrics(context.Background(), inMetrics) + require.NoError(t, err) + require.NoError(t, pmetrictest.CompareMetrics(expected, metrics), "Must match the expected values") + case pipeline.SignalTraces: + inTraces := ptracesFromJSON(t, inDataPath) + expected := ptracesFromJSON(t, outDataPath) + + traces, err := 
processor.processTraces(context.Background(), inTraces) + require.NoError(t, err) + require.NoError(t, ptracetest.CompareTraces(expected, traces), "Must match the expected values") + default: + require.FailNow(t, "unrecognized data type") + return + } + }) + } +} + +func TestProcessorProcessing(t *testing.T) { + t.Parallel() + + trans := newTestProcessor(t) + t.Run("metrics", func(t *testing.T) { + in := pmetric.NewMetrics() + in.ResourceMetrics().AppendEmpty() + in.ResourceMetrics().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") + in.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() + m := in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + m.SetName("test-data") + m.SetDescription("Only used throughout tests") + m.SetUnit("seconds") + m.CopyTo(in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)) + + out, err := trans.processMetrics(context.Background(), in) + assert.NoError(t, err, "Must not error when processing metrics") + assert.Equal(t, in, out, "Must return the same data (subject to change)") + }) + + t.Run("traces", func(t *testing.T) { + in := ptrace.NewTraces() + in.ResourceSpans().AppendEmpty() + in.ResourceSpans().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") + in.ResourceSpans().At(0).ScopeSpans().AppendEmpty() + s := in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty() + s.SetName("http.request") + s.SetKind(ptrace.SpanKindConsumer) + s.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) + s.CopyTo(in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)) + + out, err := trans.processTraces(context.Background(), in) + assert.NoError(t, err, "Must not error when processing metrics") + assert.Equal(t, in, out, "Must return the same data (subject to change)") + }) + + t.Run("logs", func(t *testing.T) { + in := plog.NewLogs() + in.ResourceLogs().AppendEmpty() + in.ResourceLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") + 
in.ResourceLogs().At(0).ScopeLogs().AppendEmpty() + l := in.ResourceLogs().At(0).ScopeLogs().At(0).Scope() + l.SetName("magical-logs") + l.SetVersion("alpha") + l.CopyTo(in.ResourceLogs().At(0).ScopeLogs().At(0).Scope()) + + out, err := trans.processLogs(context.Background(), in) + assert.NoError(t, err, "Must not error when processing metrics") + assert.Equal(t, in, out, "Must return the same data (subject to change)") + }) +} + +type SchemaUsed int + +const ( + ResourceSchemaVersionUsed = iota + 1 + ScopeSchemaVersionUsed + NoopSchemaUsed +) + +// returns a test log with no schema versions set +func generateLogForTest() plog.Logs { + in := plog.NewLogs() + in.ResourceLogs().AppendEmpty() + in.ResourceLogs().At(0).ScopeLogs().AppendEmpty() + l := in.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty() + l.Attributes().PutStr("input", "test") + return in +} + +//go:embed testdata/testschemas +var testdataFiles embed.FS + +// case 1: resource schema set, scope schema not set, use resource schema +// case 2: resource schema not set, scope schema set, use scope schema inside +// case 3: resource schema set, scope schema set, use scope schema +// case 4: resource schema not set, scope schema not set, noop translation +func TestProcessorScopeLogSchemaPrecedence(t *testing.T) { + t.Parallel() + tests := []struct { + name string + input func() plog.Logs + whichSchemaUsed SchemaUsed + wantErr assert.ErrorAssertionFunc + }{ + { + name: "resourcesetscopeunset", + input: func() plog.Logs { + log := generateLogForTest() + log.ResourceLogs().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + return log + }, + whichSchemaUsed: ResourceSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeset", + input: func() plog.Logs { + log := generateLogForTest() + log.ResourceLogs().At(0).ScopeLogs().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + return log + }, + 
whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourcesetscopeset", + input: func() plog.Logs { + log := generateLogForTest() + log.ResourceLogs().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + log.ResourceLogs().At(0).ScopeLogs().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + + return log + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeunset", + input: func() plog.Logs { + log := generateLogForTest() + return log + }, + // want: "https://example.com/testdata/testschemas/schemaprecedence/1.0.0", + whichSchemaUsed: NoopSchemaUsed, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defaultConfig := newDefaultConfiguration() + castedConfig := defaultConfig.(*Config) + castedConfig.Targets = []string{"https://example.com/testdata/testschemas/schemaprecedence/1.2.0"} + processor, err := newSchemaProcessor(context.Background(), defaultConfig, processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }, + }) + require.NoError(t, err, "Must not error when creating schemaprocessor") + + err = processor.manager.SetProviders(translation.NewTestProvider(&testdataFiles)) + require.NoError(t, err) + got, err := processor.processLogs(context.Background(), tt.input()) + if !tt.wantErr(t, err, fmt.Sprintf("processLogs(%v)", tt.input())) { + return + } + targetLog := got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + assert.Equal(t, 1, targetLog.Attributes().Len()) + _, usedResource := targetLog.Attributes().Get("one_one_zero_output") + _, usedScope := targetLog.Attributes().Get("one_two_zero_output") + _, usedNoop := targetLog.Attributes().Get("input") + switch tt.whichSchemaUsed { + case ResourceSchemaVersionUsed: + assert.True(t, usedResource, "processLogs(%v) not using correct schema,, 
attributes present: %v", tt.name, targetLog.Attributes().AsRaw()) + case ScopeSchemaVersionUsed: + assert.True(t, usedScope, "processLogs(%v) not using correct schema, attributes present: %v", tt.name, targetLog.Attributes().AsRaw()) + case NoopSchemaUsed: + assert.True(t, usedNoop, "processLogs(%v) not using correct schema,, attributes present: %v", tt.name, targetLog.Attributes().AsRaw()) + } + }) + } +} + +// returns a test trace with no schema versions set +func generateTraceForTest() ptrace.Traces { + in := ptrace.NewTraces() + in.ResourceSpans().AppendEmpty() + in.ResourceSpans().At(0).ScopeSpans().AppendEmpty() + l := in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty() + l.Attributes().PutStr("input", "test") + return in +} + +// literally the exact same tests as above +// case 1: resource schema set, scope schema not set, use resource schema +// case 2: resource schema not set, scope schema set, use scope schema inside +// case 3: resource schema set, scope schema set, use scope schema +// case 4: resource schema not set, scope schema not set, noop translation +func TestProcessorScopeTraceSchemaPrecedence(t *testing.T) { + t.Parallel() + tests := []struct { + name string + input func() ptrace.Traces + whichSchemaUsed SchemaUsed + wantErr assert.ErrorAssertionFunc + }{ + { + name: "resourcesetscopeunset", + input: func() ptrace.Traces { + trace := generateTraceForTest() + trace.ResourceSpans().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + return trace + }, + whichSchemaUsed: ResourceSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeset", + input: func() ptrace.Traces { + trace := generateTraceForTest() + trace.ResourceSpans().At(0).ScopeSpans().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + return trace + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourcesetscopeset", + input: func() 
ptrace.Traces { + trace := generateTraceForTest() + trace.ResourceSpans().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + trace.ResourceSpans().At(0).ScopeSpans().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + + return trace + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeunset", + input: func() ptrace.Traces { + trace := generateTraceForTest() + return trace + }, + // want: "https://example.com/testdata/testschemas/schemaprecedence/1.0.0", + whichSchemaUsed: NoopSchemaUsed, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defaultConfig := newDefaultConfiguration() + castedConfig := defaultConfig.(*Config) + castedConfig.Targets = []string{"https://example.com/testdata/testschemas/schemaprecedence/1.2.0"} + processor, err := newSchemaProcessor(context.Background(), defaultConfig, processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }, + }) + require.NoError(t, err, "Must not error when creating schemaprocessor") + + err = processor.manager.SetProviders(translation.NewTestProvider(&testdataFiles)) + require.NoError(t, err) + got, err := processor.processTraces(context.Background(), tt.input()) + if !tt.wantErr(t, err, fmt.Sprintf("processTraces(%v)", tt.input())) { + return + } + targetTrace := got.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + assert.Equal(t, 1, targetTrace.Attributes().Len()) + _, usedResource := targetTrace.Attributes().Get("one_one_zero_output") + _, usedScope := targetTrace.Attributes().Get("one_two_zero_output") + _, usedNoop := targetTrace.Attributes().Get("input") + switch tt.whichSchemaUsed { + case ResourceSchemaVersionUsed: + assert.True(t, usedResource, "processTraces(%v) not using correct schema,, attributes present: %v", tt.name, targetTrace.Attributes().AsRaw()) + case 
ScopeSchemaVersionUsed: + assert.True(t, usedScope, "processTraces(%v) not using correct schema, attributes present: %v", tt.name, targetTrace.Attributes().AsRaw()) + case NoopSchemaUsed: + assert.True(t, usedNoop, "processTraces(%v) not using correct schema,, attributes present: %v", tt.name, targetTrace.Attributes().AsRaw()) + } + }) + } +} + +// returns a test metric with no schema versions set +func generateMetricForTest() pmetric.Metrics { + in := pmetric.NewMetrics() + in.ResourceMetrics().AppendEmpty() + in.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() + l := in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() + l.SetEmptySum().DataPoints().AppendEmpty().Attributes().PutStr("input", "test") + return in +} + +// case 1: resource schema set, scope schema not set, use resource schema +// case 2: resource schema not set, scope schema set, use scope schema inside +// case 3: resource schema set, scope schema set, use scope schema +// case 4: resource schema not set, scope schema not set, noop translation +func TestProcessorScopeMetricSchemaPrecedence(t *testing.T) { + t.Parallel() + tests := []struct { + name string + input func() pmetric.Metrics + whichSchemaUsed SchemaUsed + wantErr assert.ErrorAssertionFunc + }{ + { + name: "resourcesetscopeunset", + input: func() pmetric.Metrics { + metric := generateMetricForTest() + metric.ResourceMetrics().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + return metric + }, + whichSchemaUsed: ResourceSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeset", + input: func() pmetric.Metrics { + metric := generateMetricForTest() + metric.ResourceMetrics().At(0).ScopeMetrics().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + return metric + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourcesetscopeset", + input: func() pmetric.Metrics { + metric 
:= generateMetricForTest() + metric.ResourceMetrics().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.0.0") + metric.ResourceMetrics().At(0).ScopeMetrics().At(0).SetSchemaUrl("https://example.com/testdata/testschemas/schemaprecedence/1.1.0") + + return metric + }, + whichSchemaUsed: ScopeSchemaVersionUsed, + wantErr: assert.NoError, + }, + { + name: "resourceunsetscopeunset", + input: func() pmetric.Metrics { + metric := generateMetricForTest() + return metric + }, + // want: "https://example.com/testdata/testschemas/schemaprecedence/1.0.0", + whichSchemaUsed: NoopSchemaUsed, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defaultConfig := newDefaultConfiguration() + castedConfig := defaultConfig.(*Config) + castedConfig.Targets = []string{"https://example.com/testdata/testschemas/schemaprecedence/1.2.0"} + processor, err := newSchemaProcessor(context.Background(), defaultConfig, processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + }, + }) + require.NoError(t, err, "Must not error when creating schemaprocessor") + + err = processor.manager.SetProviders(translation.NewTestProvider(&testdataFiles)) + require.NoError(t, err) + got, err := processor.processMetrics(context.Background(), tt.input()) + if !tt.wantErr(t, err, fmt.Sprintf("processMetrics(%v)", tt.input())) { + return + } + targetMetric := got.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0) + assert.Equal(t, 1, targetMetric.Attributes().Len()) + _, usedResource := targetMetric.Attributes().Get("one_one_zero_output") + _, usedScope := targetMetric.Attributes().Get("one_two_zero_output") + _, usedNoop := targetMetric.Attributes().Get("input") + switch tt.whichSchemaUsed { + case ResourceSchemaVersionUsed: + assert.True(t, usedResource, "processMetrics(%v) not using correct schema,, attributes present: %v", tt.name, 
targetMetric.Attributes().AsRaw()) + case ScopeSchemaVersionUsed: + assert.True(t, usedScope, "processMetrics(%v) not using correct schema, attributes present: %v", tt.name, targetMetric.Attributes().AsRaw()) + case NoopSchemaUsed: + assert.True(t, usedNoop, "processMetrics(%v) not using correct schema,, attributes present: %v", tt.name, targetMetric.Attributes().AsRaw()) + } + }) + } +} diff --git a/processor/schemaprocessor/testdata/schema_sections/all/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/all/1.1.0 new file mode 100644 index 000000000000..91ba999037f8 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/all/1.1.0 @@ -0,0 +1,17 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/all/1.1.0 + +versions: + 1.1.0: + all: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + docker.image.name: d4r.image.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/all/logs_in.json b/processor/schemaprocessor/testdata/schema_sections/all/logs_in.json new file mode 100644 index 000000000000..43d9efd7a8f3 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/all/logs_in.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "d4r.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.1.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": 
"https://example.com/testdata/schema_sections/all/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.1.0" + }, + { + "resource": { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/all/logs_out.json b/processor/schemaprocessor/testdata/schema_sections/all/logs_out.json new file mode 100644 index 000000000000..dd2c9d5da44b --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/all/logs_out.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "docker.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.0.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.0.0" + }, + { + "resource": { + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/all/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/1.1.0 new file mode 100644 index 000000000000..d80d34f4342b --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/1.1.0 @@ -0,0 +1,17 @@ +file_format: 1.0.0 + 
+schema_url: https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0 + +versions: + 1.1.0: + logs: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + docker.image.name: d4r.image.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_in.json b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_in.json new file mode 100644 index 000000000000..89530320e976 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_in.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "d4r.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0" + }, + { + "resource": { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_out.json b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_out.json new file mode 100644 
index 000000000000..8752bb6789c0 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/logs_rename_attributes/logs_out.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "docker.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.0.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.0.0" + }, + { + "resource": { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/logs_rename_attributes/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/1.1.0 new file mode 100644 index 000000000000..e60af6d44ff7 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/1.1.0 @@ -0,0 +1,21 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/metrics_rename_attributes/1.1.0 + +versions: + 1.1.0: + metrics: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + - rename_attributes: + attribute_map: + docker.image.name: d4r.image.name + apply_to_metrics: + - testMetricTwo + # this has 
to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_in.json b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_in.json new file mode 100644 index 000000000000..8f83cb22e689 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_in.json @@ -0,0 +1,111 @@ +{ + "resourceMetrics": [ + { + "scopeMetrics": [ + { + "metrics": [ + { + "name": "testMetricOne", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "timeUnixNano": "1725146753842860000", + "asInt": "120", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ] + } + }, + { + "name": "testMetricTwo", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "timeUnixNano": "1725146753842860000", + "asInt": "120", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ] + } + }, + { + "name": "testMetricThree", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_attributes/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_attributes/1.1.0" + } + ] +} diff --git 
a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_out.json b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_out.json new file mode 100644 index 000000000000..9343fb88c4a7 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_attributes/metrics_out.json @@ -0,0 +1,111 @@ +{ + "resourceMetrics": [ + { + "scopeMetrics": [ + { + "metrics": [ + { + "name": "testMetricOne", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "timeUnixNano": "1725146753842860000", + "asInt": "120", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ] + } + }, + { + "name": "testMetricTwo", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "docker.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "timeUnixNano": "1725146753842860000", + "asInt": "120", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "docker.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ] + } + }, + { + "name": "testMetricThree", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_attributes/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_attributes/1.0.0" + } + ] +} diff --git 
a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/1.1.0 new file mode 100644 index 000000000000..fe60e6b71a1a --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/1.1.0 @@ -0,0 +1,15 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/metrics_rename_metrics/1.1.0 + +versions: + 1.1.0: + metrics: + changes: + - rename_metrics: + testMetric1: testMetricUno + testMetric2: testMetricTwo + - rename_metrics: + testMetricUno: testMetricOne + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_in.json b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_in.json new file mode 100644 index 000000000000..d6f816085d21 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_in.json @@ -0,0 +1,59 @@ +{ + "resourceMetrics": [ + { + "scopeMetrics": [ + { + "scope": { + "name": "instrumentation name", + "version": "instrumentation version", + "attributes": [ + { + "key": "instrumentation.attribute", + "value": { + "stringValue": "test" + } + } + ] + }, + "metrics": [ + { + "name": "testMetricOne", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + }, + { + "name": "testMetricTwo", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + }, + { + "name": "testMetricThree", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_metrics/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_metrics/1.1.0" + } + ] +} diff --git 
a/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_out.json b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_out.json new file mode 100644 index 000000000000..0ad7d4bd25c5 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/metrics_rename_metrics/metrics_out.json @@ -0,0 +1,59 @@ +{ + "resourceMetrics": [ + { + "scopeMetrics": [ + { + "scope": { + "name": "instrumentation name", + "version": "instrumentation version", + "attributes": [ + { + "key": "instrumentation.attribute", + "value": { + "stringValue": "test" + } + } + ] + }, + "metrics": [ + { + "name": "testMetric1", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + }, + { + "name": "testMetric2", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + }, + { + "name": "testMetricThree", + "sum": { + "dataPoints": [ + { + "timeUnixNano": "1725146753842860000", + "asInt": "100" + } + ] + } + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_metrics/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/metrics_rename_metrics/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/resources/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/resources/1.1.0 new file mode 100644 index 000000000000..8a862b1ee750 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/resources/1.1.0 @@ -0,0 +1,17 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/resources/1.1.0 + +versions: + 1.1.0: + resources: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + docker.image.name: d4r.image.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + # this has to be here + 1.0.0: \ No newline at end of file diff --git 
a/processor/schemaprocessor/testdata/schema_sections/resources/logs_in.json b/processor/schemaprocessor/testdata/schema_sections/resources/logs_in.json new file mode 100644 index 000000000000..e2ad6adc1065 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/resources/logs_in.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "d4r.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.1.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.1.0" + }, + { + "resource": { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/resources/logs_out.json b/processor/schemaprocessor/testdata/schema_sections/resources/logs_out.json new file mode 100644 index 000000000000..dfa9bb90cfde --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/resources/logs_out.json @@ -0,0 +1,68 @@ +{ + "resourceLogs": [ + { + "resource": { + "attributes": [ + { + "key": "asdf", + "value": { + "stringValue": "1" + } + } + ] + }, + "scopeLogs": [ + { + "logRecords": [ + { + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "sancia" + } + }, + { + "key": "d4r.image.name", + "value": { + "stringValue": "flooentd" + } + } + ] + } + ], + "schemaUrl": 
"https://example.com/testdata/schema_sections/resources/1.0.0" + }, + { + "logRecords": [ + { + "attributes": [ + { + "key": "don't", + "value": { + "stringValue": "change" + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.0.0" + }, + { + "resource": { + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "tevanne" + } + } + ] + }, + "schemaUrl": "https://example.com/testdata/schema_sections/resources/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/1.1.0 new file mode 100644 index 000000000000..3b7edd3f9795 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/1.1.0 @@ -0,0 +1,35 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/spans/1.1.0 + +# rename attributes with no conditions, with apply to spans, with apply to events, with BOTH apply to spans and apply to events + +versions: + 1.1.0: + span_events: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + - rename_attributes: + attribute_map: + docker.image.name: d4r.image.name + apply_to_spans: + - testSpan1 + - rename_attributes: + attribute_map: + cloud.provider: provider + apply_to_events: + - testEventOne + - rename_attributes: + attribute_map: + db.cassandra.keyspace: db.name + apply_to_spans: + - testSpan2 + apply_to_events: + - testEventTwo + # # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_in.json 
b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_in.json new file mode 100644 index 000000000000..848abedb5a30 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_in.json @@ -0,0 +1,128 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "events": [ + { + "name": "testEventOne", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + }, + { + "key": "provider", + "value": { + "stringValue": "aws" + } + } + ] + }, + { + "name": "testEventTwo", + "attributes": [ + { + "key": "db.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "provider", + "value": { + "stringValue": "gcp" + } + } + ] + }, + { + "name": "testEventThree", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ] + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ], + "events": [ + { + "name": "testEventTwo", + "attributes": [ + { + "key": "db.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "provider", + "value": { + "stringValue": "aws" + } + } + ] + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_attributes/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_attributes/1.1.0" + } + ] +} diff --git 
a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_out.json b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_out.json new file mode 100644 index 000000000000..3ef8df0a88b5 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_attributes/traces_out.json @@ -0,0 +1,128 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "events": [ + { + "name": "testEventOne", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "docker.image.name", + "value": { + "doubleValue": 1.1 + } + }, + { + "key": "cloud.provider", + "value": { + "stringValue": "aws" + } + } + ] + }, + { + "name": "testEventTwo", + "attributes": [ + { + "key": "db.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "provider", + "value": { + "stringValue": "gcp" + } + } + ] + }, + { + "name": "testEventThree", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ] + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ], + "events": [ + { + "name": "testEventTwo", + "attributes": [ + { + "key": "db.cassandra.keyspace", + "value": { + "stringValue": "value" + } + }, + { + "key": "provider", + "value": { + "stringValue": "aws" + } + } + ] + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_attributes/1.0.0" + } + ], + "schemaUrl": 
"https://example.com/testdata/schema_sections/span_events_rename_attributes/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/1.1.0 new file mode 100644 index 000000000000..1655e562af86 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/1.1.0 @@ -0,0 +1,17 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/span_events_rename_spans/1.1.0 + +versions: + 1.1.0: + span_events: + changes: + - rename_events: + name_map: + testEvent1: testEventUno + testEvent2: testEventTwo + - rename_events: + name_map: + testEventUno: testEventOne + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_in.json b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_in.json new file mode 100644 index 000000000000..041558c66829 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_in.json @@ -0,0 +1,123 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "double", + "value": { + "doubleValue": 1.1 + } + } + ], + "events": [ + { + "timeUnixNano": "1684620382541971000", + "name": "testEventOne", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + }, + { + "timeUnixNano": "1684620382541971000", + "name": "testEventTwo", + 
"attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + }, + { + "timeUnixNano": "1684620382541971000", + "name": "testEventThree", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "double", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_spans/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_spans/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_out.json b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_out.json new file mode 100644 index 000000000000..4c0c852c668e --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/span_events_rename_spans/traces_out.json @@ -0,0 +1,123 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "double", + "value": { + "doubleValue": 1.1 + } + } + ], + "events": [ + { + "timeUnixNano": "1684620382541971000", + "name": "testEvent1", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + }, + { + "timeUnixNano": 
"1684620382541971000", + "name": "testEvent2", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + }, + { + "timeUnixNano": "1684620382541971000", + "name": "testEventThree", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "bool", + "value": { + "boolValue": true + } + } + ], + "droppedAttributesCount": 1 + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "string", + "value": { + "stringValue": "value" + } + }, + { + "key": "double", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_spans/1.0.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/span_events_rename_spans/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/spans/1.1.0 b/processor/schemaprocessor/testdata/schema_sections/spans/1.1.0 new file mode 100644 index 000000000000..d8d3e3d76ddd --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/spans/1.1.0 @@ -0,0 +1,21 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/schema_sections/spans/1.1.0 + +versions: + 1.1.0: + spans: + changes: + - rename_attributes: + attribute_map: + kubernetes.cluster.name: koobernetes.cluster.name + - rename_attributes: + attribute_map: + koobernetes.cluster.name: k8s.cluster.name + - rename_attributes: + attribute_map: + docker.image.name: d4r.image.name + apply_to_spans: + - testSpan2 + # # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/schema_sections/spans/traces_in.json b/processor/schemaprocessor/testdata/schema_sections/spans/traces_in.json new file mode 100644 index 000000000000..ef832fb92666 --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/spans/traces_in.json @@ -0,0 
+1,64 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "k8s.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/spans/1.1.0" + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/spans/1.1.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/schema_sections/spans/traces_out.json b/processor/schemaprocessor/testdata/schema_sections/spans/traces_out.json new file mode 100644 index 000000000000..be31af92ad3a --- /dev/null +++ b/processor/schemaprocessor/testdata/schema_sections/spans/traces_out.json @@ -0,0 +1,64 @@ +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { + "key": "host.name", + "value": { + "stringValue": "testHost" + } + }, + { + "key": "service.name", + "value": { + "stringValue": "testService" + } + } + ] + }, + "scopeSpans": [ + { + "spans": [ + { + "name": "testSpan1", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "d4r.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + }, + { + "name": "testSpan2", + "attributes": [ + { + "key": "kubernetes.cluster.name", + "value": { + "stringValue": "value" + } + }, + { + "key": "docker.image.name", + "value": { + "doubleValue": 1.1 + } + } + ] + } + ], + "schemaUrl": "https://example.com/testdata/schema_sections/spans/1.0.0" + } + ], + "schemaUrl": 
"https://example.com/testdata/schema_sections/spans/1.0.0" + } + ] +} diff --git a/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.0.0 b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.0.0 new file mode 100644 index 000000000000..0d403cb3f6cc --- /dev/null +++ b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.0.0 @@ -0,0 +1,19 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/testschemas/schemaprecedence/1.0.0 + +versions: + 1.2.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_two_zero_output + 1.1.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_one_zero_output + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.1.0 b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.1.0 new file mode 100644 index 000000000000..24b79450cda2 --- /dev/null +++ b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.1.0 @@ -0,0 +1,19 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/testschemas/schemaprecedence/1.1.0 + +versions: + 1.2.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_two_zero_output + 1.1.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_one_zero_output + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.2.0 b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.2.0 new file mode 100644 index 000000000000..02ccbcaa6a8e --- /dev/null +++ b/processor/schemaprocessor/testdata/testschemas/schemaprecedence/1.2.0 @@ -0,0 +1,19 @@ +file_format: 1.0.0 + +schema_url: https://example.com/testdata/testschemas/schemaprecedence/1.2.0 + +versions: + 1.2.0: + all: + changes: + - rename_attributes: + attribute_map: + input: one_two_zero_output + 1.1.0: + all: + changes: + - 
rename_attributes: + attribute_map: + input: one_one_zero_output + # this has to be here + 1.0.0: \ No newline at end of file diff --git a/processor/schemaprocessor/transformer.go b/processor/schemaprocessor/transformer.go deleted file mode 100644 index cbe5f2bd0a0a..000000000000 --- a/processor/schemaprocessor/transformer.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor" - -import ( - "context" - "errors" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/processor" - "go.uber.org/zap" -) - -type transformer struct { - targets []string - log *zap.Logger -} - -func newTransformer( - _ context.Context, - conf component.Config, - set processor.Settings, -) (*transformer, error) { - cfg, ok := conf.(*Config) - if !ok { - return nil, errors.New("invalid configuration provided") - } - return &transformer{ - log: set.Logger, - targets: cfg.Targets, - }, nil -} - -func (t transformer) processLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) { - return ld, nil -} - -func (t transformer) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { - return md, nil -} - -func (t transformer) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { - return td, nil -} - -// start will load the remote file definition if it isn't already cached -// and resolve the schema translation file -func (t *transformer) start(_ context.Context, _ component.Host) error { - for _, target := range t.targets { - t.log.Info("Fetching remote schema url", zap.String("schema-url", target)) - } - return nil -} diff --git a/processor/schemaprocessor/transformer_test.go 
b/processor/schemaprocessor/transformer_test.go deleted file mode 100644 index ad1a6efc462f..000000000000 --- a/processor/schemaprocessor/transformer_test.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package schemaprocessor - -import ( - "context" - _ "embed" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/processor" - "go.uber.org/zap/zaptest" -) - -func newTestTransformer(t *testing.T) *transformer { - trans, err := newTransformer(context.Background(), newDefaultConfiguration(), processor.Settings{ - TelemetrySettings: component.TelemetrySettings{ - Logger: zaptest.NewLogger(t), - }, - }) - require.NoError(t, err, "Must not error when creating default transformer") - return trans -} - -func TestTransformerStart(t *testing.T) { - t.Parallel() - - trans := newTestTransformer(t) - assert.NoError(t, trans.start(context.Background(), nil)) -} - -func TestTransformerProcessing(t *testing.T) { - t.Parallel() - - trans := newTestTransformer(t) - t.Run("metrics", func(t *testing.T) { - in := pmetric.NewMetrics() - in.ResourceMetrics().AppendEmpty() - in.ResourceMetrics().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") - in.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() - m := in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().AppendEmpty() - m.SetName("test-data") - m.SetDescription("Only used throughout tests") - m.SetUnit("seconds") - m.CopyTo(in.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)) - - out, err := trans.processMetrics(context.Background(), in) - assert.NoError(t, err, "Must not error when processing metrics") - assert.Equal(t, in, out, "Must return the same data (subject to change)") - }) - - 
t.Run("traces", func(t *testing.T) { - in := ptrace.NewTraces() - in.ResourceSpans().AppendEmpty() - in.ResourceSpans().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") - in.ResourceSpans().At(0).ScopeSpans().AppendEmpty() - s := in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().AppendEmpty() - s.SetName("http.request") - s.SetKind(ptrace.SpanKindConsumer) - s.SetSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}) - s.CopyTo(in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)) - - out, err := trans.processTraces(context.Background(), in) - assert.NoError(t, err, "Must not error when processing metrics") - assert.Equal(t, in, out, "Must return the same data (subject to change)") - }) - - t.Run("logs", func(t *testing.T) { - in := plog.NewLogs() - in.ResourceLogs().AppendEmpty() - in.ResourceLogs().At(0).SetSchemaUrl("http://opentelemetry.io/schemas/1.9.0") - in.ResourceLogs().At(0).ScopeLogs().AppendEmpty() - l := in.ResourceLogs().At(0).ScopeLogs().At(0).Scope() - l.SetName("magical-logs") - l.SetVersion("alpha") - l.CopyTo(in.ResourceLogs().At(0).ScopeLogs().At(0).Scope()) - - out, err := trans.processLogs(context.Background(), in) - assert.NoError(t, err, "Must not error when processing metrics") - assert.Equal(t, in, out, "Must return the same data (subject to change)") - }) -}