From 20ec9bcf511d746cc4509ec3b0139d8900e83acb Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:54:43 -0400 Subject: [PATCH] operator and migrator support --- processor/schemaprocessor/DESIGN.md | 28 +++ processor/schemaprocessor/README.md | 4 +- .../schemaprocessor/internal/alias/alias.go | 6 +- .../internal/migrate/attributes.go | 55 ++---- .../internal/migrate/attributes_test.go | 101 +---------- .../internal/migrate/conditional.go | 64 +++---- .../internal/migrate/conditional_test.go | 118 +------------ .../internal/migrate/migrator.go | 16 ++ .../internal/migrate/multi_conditional.go | 86 ++++++++++ .../migrate/multi_conditional_test.go | 149 ++++++++++++++++ .../internal/migrate/signal.go | 45 +---- .../internal/migrate/signal_test.go | 4 +- .../internal/operator/attributes_operators.go | 159 ++++++++++++++++++ .../conditional_attributes_operators.go | 20 +++ .../multi_conditional_attribute_operators.go | 34 ++++ .../internal/operator/signal_interfaces.go | 27 +++ .../internal/operator/signal_name.go | 37 ++++ 17 files changed, 614 insertions(+), 339 deletions(-) create mode 100644 processor/schemaprocessor/DESIGN.md create mode 100644 processor/schemaprocessor/internal/migrate/migrator.go create mode 100644 processor/schemaprocessor/internal/migrate/multi_conditional.go create mode 100644 processor/schemaprocessor/internal/migrate/multi_conditional_test.go create mode 100644 processor/schemaprocessor/internal/operator/attributes_operators.go create mode 100644 processor/schemaprocessor/internal/operator/conditional_attributes_operators.go create mode 100644 processor/schemaprocessor/internal/operator/multi_conditional_attribute_operators.go create mode 100644 processor/schemaprocessor/internal/operator/signal_interfaces.go create mode 100644 processor/schemaprocessor/internal/operator/signal_name.go diff --git a/processor/schemaprocessor/DESIGN.md b/processor/schemaprocessor/DESIGN.md new file mode 100644 index 000000000000..5261cf204d6b --- /dev/null +++ b/processor/schemaprocessor/DESIGN.md @@ -0,0 +1,28 @@ +# Design + +The Schema Processor is split into several different components. + +Here's a general structure diagram: + +```mermaid +graph LR; + A[Previous Collector Component] --> B[Transformer] + B -- Schema URL --> C[Translation Manager] + C -- Translation --> B + B --> H[Translator] + H --> E[Revision] + E --> I[ChangeList] + subgraph Interpreter + direction RL + I --> F[Operator] + F --> G[Migrator] + end + +``` +The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory. +Data flows into the Tranformer, which uses the Schema URL to fetch the translation from the Translation Manager. +The [Translation Manager](internal/translation/manager.go) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct. + +The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator figures out what the version of the incoming data is and what Revisions to apply to the incoming data to get it to the target schema version. The Translator is also responsible for applying the Revisions to the incoming data - it iterates through these Revisions and applies them to the incoming data. + +Each Revision represents all the changes within a specific version. It consists of several [ChangeLists](internal/changelist/changelist.go) - one for each type of change block (at the time of writing - `all`, `resources`, `spans`, `spanEvents`, `metrics`, `logs`). Each ChangeList is similar to an interpreter to a programming language - in this case the programming language is the schema file! Each "change" in the transformation maps to either to an operator or migrator. [OPEN DESIGN POINT - for the all, logs, and resources type changes - the ChangeList delegates just to an Migrator. This is a little strange since otherwise it dispatches to an Operator. Should we just implement Operators for these few cases as well?]. They iterate through whatever changes they are constructed with, and call an operator[operator](internal/operator) or [migrator](internal/migrate) for each type of change. The operator accepts a typed value - a log, a metric, etc. It then, under the hood, calls one of a few Migrators. The Migrators do the fundamental work of changing attributes, changing names, etc. The Migrators generally operate on lower levels than the Operators - they operate on `Attributes`, or an `alias.NamedSignal` (a signal that implements `Name()` and `SetName()`). \ No newline at end of file diff --git a/processor/schemaprocessor/README.md b/processor/schemaprocessor/README.md index 11adf65cd22f..7d5b78f3dfc1 100644 --- a/processor/schemaprocessor/README.md +++ b/processor/schemaprocessor/README.md @@ -6,7 +6,7 @@ | Stability | [development]: traces, metrics, logs | | Distributions | [] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) | [development]: https://github.com/open-telemetry/opentelemetry-collector#development @@ -59,3 +59,5 @@ processors: ``` For more complete examples, please refer to [config.yml](./testdata/config.yml). + +There's a rough design/overview of the processor in the [DESIGN.md](./DESIGN.md) file. \ No newline at end of file diff --git a/processor/schemaprocessor/internal/alias/alias.go b/processor/schemaprocessor/internal/alias/alias.go index 918a08d44d78..7e5e01436350 100644 --- a/processor/schemaprocessor/internal/alias/alias.go +++ b/processor/schemaprocessor/internal/alias/alias.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package Alias is a subset of the interfaces defined by pdata and family +// Package alias is a subset of the interfaces defined by pdata and family // package to allow for higher code reuse without using generics. package alias // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" @@ -30,6 +30,10 @@ type NamedSignal interface { SetName(name string) } +type Attributed interface { + Attributes() pcommon.Map +} + var ( _ Resource = (*plog.ResourceLogs)(nil) _ Resource = (*pmetric.ResourceMetrics)(nil) diff --git a/processor/schemaprocessor/internal/migrate/attributes.go b/processor/schemaprocessor/internal/migrate/attributes.go index 37a020cf6123..707703ff3137 100644 --- a/processor/schemaprocessor/internal/migrate/attributes.go +++ b/processor/schemaprocessor/internal/migrate/attributes.go @@ -11,24 +11,21 @@ import ( "go.uber.org/multierr" ) -// AttributeChangeSet represents an unscoped entry that can be applied. -// +// AttributeChangeSet represents a rename_attributes type operation. // The listed changes are duplicated twice // to allow for simplified means of transition to or from a revision. type AttributeChangeSet struct { - updates ast.AttributeMap + // The keys are the old attribute name used in the previous version, the values are the + // new attribute name starting from this version (comment from ast.AttributeMap) + updates ast.AttributeMap + // the inverse of the updates map rollback ast.AttributeMap } -// AttributeChangeSetSlice allows for `AttributeChangeSet` -// to be chained together as they are defined within the schema -// and be applied sequentially to ensure deterministic behavior. -type AttributeChangeSetSlice []*AttributeChangeSet - // NewAttributeChangeSet allows for typed strings to be used as part // of the invocation that will be converted into the default string type. -func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet { - attr := &AttributeChangeSet{ +func NewAttributeChangeSet(mappings ast.AttributeMap) AttributeChangeSet { + attr := AttributeChangeSet{ updates: make(map[string]string, len(mappings)), rollback: make(map[string]string, len(mappings)), } @@ -39,15 +36,17 @@ func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet { return attr } +func (a AttributeChangeSet) IsMigrator() {} + func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error { - return a.do(StateSelectorApply, attrs) + return a.Do(StateSelectorApply, attrs) } func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error { - return a.do(StateSelectorRollback, attrs) + return a.Do(StateSelectorRollback, attrs) } -func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) { +func (a *AttributeChangeSet) Do(ss StateSelector, attrs pcommon.Map) (errs error) { var ( updated = make(map[string]struct{}) results = pcommon.NewMap() @@ -81,33 +80,3 @@ func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error results.CopyTo(attrs) return errs } - -// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets` -// and allows them to be executed in the provided order. -func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice { - values := new(AttributeChangeSetSlice) - for _, c := range changes { - (*values) = append((*values), c) - } - return values -} - -func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error { - return slice.do(StateSelectorApply, attrs) -} - -func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error { - return slice.do(StateSelectorRollback, attrs) -} - -func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) { - for i := 0; i < len(*slice); i++ { - switch ss { - case StateSelectorApply: - errs = multierr.Append(errs, (*slice)[i].Apply(attrs)) - case StateSelectorRollback: - errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs)) - } - } - return errs -} diff --git a/processor/schemaprocessor/internal/migrate/attributes_test.go b/processor/schemaprocessor/internal/migrate/attributes_test.go index f3cc5d79dc6e..d209a397ee78 100644 --- a/processor/schemaprocessor/internal/migrate/attributes_test.go +++ b/processor/schemaprocessor/internal/migrate/attributes_test.go @@ -27,7 +27,7 @@ func TestNewAttributeChangeSet(t *testing.T) { "hello": "world", }) - expect := &AttributeChangeSet{ + expect := AttributeChangeSet{ updates: map[string]string{ "hello": "world", }, @@ -45,7 +45,7 @@ func TestAttributeChangeSetApply(t *testing.T) { for _, tc := range []struct { name string - acs *AttributeChangeSet + acs AttributeChangeSet attrs pcommon.Map expect pcommon.Map errVal string @@ -120,7 +120,7 @@ func TestAttributeChangeSetRollback(t *testing.T) { for _, tc := range []struct { name string - acs *AttributeChangeSet + acs AttributeChangeSet attrs pcommon.Map expect pcommon.Map errVal string @@ -189,98 +189,3 @@ func TestAttributeChangeSetRollback(t *testing.T) { }) } } - -func TestNewAttributeChangeSetSliceApply(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - changes *AttributeChangeSetSlice - attr pcommon.Map - expect pcommon.Map - }{ - { - name: "no changes listed", - changes: NewAttributeChangeSetSlice(), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - }, - { - name: "changes defined", - changes: NewAttributeChangeSetSlice( - NewAttributeChangeSet(map[string]string{ - "service_version": "service.version", - }), - NewAttributeChangeSet(map[string]string{ - "service.version": "application.service.version", - }), - ), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("application.service.version", "v0.0.1") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.changes.Apply(tc.attr)) - assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes") - }) - } -} - -func TestNewAttributeChangeSetSliceApplyRollback(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - changes *AttributeChangeSetSlice - attr pcommon.Map - expect pcommon.Map - }{ - { - name: "no changes listed", - changes: NewAttributeChangeSetSlice(), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - }, - { - name: "changes defined", - changes: NewAttributeChangeSetSlice( - NewAttributeChangeSet(map[string]string{ - "service_version": "service.version", - }), - NewAttributeChangeSet(map[string]string{ - "service.version": "application.service.version", - }), - ), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("application.service.version", "v0.0.1") - - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.1") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.changes.Rollback(tc.attr)) - assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes") - }) - } -} diff --git a/processor/schemaprocessor/internal/migrate/conditional.go b/processor/schemaprocessor/internal/migrate/conditional.go index b3eea651f9fc..812ff46325e5 100644 --- a/processor/schemaprocessor/internal/migrate/conditional.go +++ b/processor/schemaprocessor/internal/migrate/conditional.go @@ -6,7 +6,6 @@ package migrate // import "github.com/open-telemetry/opentelemetry-collector-con import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/otel/schema/v1.0/ast" - "go.uber.org/multierr" ) // ValueMatch defines the expected match type @@ -15,74 +14,55 @@ type ValueMatch interface { ~string } +// ConditionalAttributeSet represents a set of attributes that will type ConditionalAttributeSet struct { - on *map[string]struct{} - attrs *AttributeChangeSet + on map[string]struct{} + attrs AttributeChangeSet } type ConditionalAttributeSetSlice []*ConditionalAttributeSet -func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) *ConditionalAttributeSet { +func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) ConditionalAttributeSet { on := make(map[string]struct{}) for _, m := range matches { on[string(m)] = struct{}{} } - return &ConditionalAttributeSet{ - on: &on, + return ConditionalAttributeSet{ + on: on, attrs: NewAttributeChangeSet(mappings), } } -func (ca *ConditionalAttributeSet) Apply(attrs pcommon.Map, values ...string) (errs error) { +func (ca ConditionalAttributeSet) IsMigrator() {} + +func (ca *ConditionalAttributeSet) Do(ss StateSelector, attrs pcommon.Map, values ...string) (errs error) { if ca.check(values...) { - errs = ca.attrs.Apply(attrs) + switch ss { + case StateSelectorApply: + errs = ca.attrs.Apply(attrs) + case StateSelectorRollback: + errs = ca.attrs.Rollback(attrs) + } } return errs } +func (ca *ConditionalAttributeSet) Apply(attrs pcommon.Map, values ...string) (errs error) { + return ca.Do(StateSelectorApply, attrs, values...) +} func (ca *ConditionalAttributeSet) Rollback(attrs pcommon.Map, values ...string) (errs error) { - if ca.check(values...) { - errs = ca.attrs.Rollback(attrs) - } - return errs + return ca.Do(StateSelectorRollback, attrs, values...) } +// todo make it harder to misuse this! diff between no values and 0 values func (ca *ConditionalAttributeSet) check(values ...string) bool { - if len(*ca.on) == 0 { + if len(ca.on) == 0 { return true } for _, v := range values { - if _, ok := (*ca.on)[v]; !ok { + if _, ok := (ca.on)[v]; !ok { return false } } return true } - -func NewConditionalAttributeSetSlice(conditions ...*ConditionalAttributeSet) *ConditionalAttributeSetSlice { - values := new(ConditionalAttributeSetSlice) - for _, c := range conditions { - (*values) = append((*values), c) - } - return values -} - -func (slice *ConditionalAttributeSetSlice) Apply(attrs pcommon.Map, values ...string) error { - return slice.do(StateSelectorApply, attrs, values) -} - -func (slice *ConditionalAttributeSetSlice) Rollback(attrs pcommon.Map, values ...string) error { - return slice.do(StateSelectorRollback, attrs, values) -} - -func (slice *ConditionalAttributeSetSlice) do(ss StateSelector, attrs pcommon.Map, values []string) (errs error) { - for i := 0; i < len((*slice)); i++ { - switch ss { - case StateSelectorApply: - errs = multierr.Append(errs, (*slice)[i].Apply(attrs, values...)) - case StateSelectorRollback: - errs = multierr.Append(errs, (*slice)[len((*slice))-i-1].Rollback(attrs, values...)) - } - } - return errs -} diff --git a/processor/schemaprocessor/internal/migrate/conditional_test.go b/processor/schemaprocessor/internal/migrate/conditional_test.go index 0c199c98d050..56e9b2a8ade4 100644 --- a/processor/schemaprocessor/internal/migrate/conditional_test.go +++ b/processor/schemaprocessor/internal/migrate/conditional_test.go @@ -15,7 +15,7 @@ func TestConditionalAttributeSetApply(t *testing.T) { for _, tc := range []struct { name string - cond *ConditionalAttributeSet + cond ConditionalAttributeSet check string attr pcommon.Map expect pcommon.Map @@ -95,7 +95,7 @@ func TestConditionalAttributeSetRollback(t *testing.T) { for _, tc := range []struct { name string - cond *ConditionalAttributeSet + cond ConditionalAttributeSet check string attr pcommon.Map expect pcommon.Map @@ -169,117 +169,3 @@ func TestConditionalAttributeSetRollback(t *testing.T) { }) } } - -func TestConditionalAttribueSetSliceApply(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - slice *ConditionalAttributeSetSlice - check string - attrs pcommon.Map - expect pcommon.Map - }{ - { - name: "No changes", - slice: NewConditionalAttributeSetSlice(), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - }, - { - name: "Not matched check value", - slice: NewConditionalAttributeSetSlice( - NewConditionalAttributeSet[string]( - map[string]string{ - "service_version": "service.version", - }, - ), - // intentially silly to be make it clear - // that this should not be applied - NewConditionalAttributeSet( - map[string]string{ - "service.version": "shark.attack", - }, - "shark spotted", - ), - ), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.slice.Apply(tc.attrs, tc.check)) - assert.Equal(t, tc.expect.AsRaw(), tc.attrs.AsRaw(), "Must match the expected values") - }) - } -} - -func TestConditionalAttribueSetSliceRollback(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - slice *ConditionalAttributeSetSlice - check string - attrs pcommon.Map - expect pcommon.Map - }{ - { - name: "No changes", - slice: NewConditionalAttributeSetSlice(), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - }, - { - name: "Not matched check value", - slice: NewConditionalAttributeSetSlice( - NewConditionalAttributeSet[string]( - map[string]string{ - "service_version": "service.version", - }, - ), - // intentially silly to be make it clear - // that this should not be applied - NewConditionalAttributeSet( - map[string]string{ - "service.version": "shark.attack", - }, - "shark spotted", - ), - ), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.0") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.slice.Rollback(tc.attrs, tc.check)) - assert.Equal(t, tc.expect.AsRaw(), tc.attrs.AsRaw(), "Must match the expected values") - }) - } -} diff --git a/processor/schemaprocessor/internal/migrate/migrator.go b/processor/schemaprocessor/internal/migrate/migrator.go new file mode 100644 index 000000000000..8dea3973eb08 --- /dev/null +++ b/processor/schemaprocessor/internal/migrate/migrator.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package migrate // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" + +// Migrator is an interface that all migration types must implement. It is basically a marker interface. All Operators are also Migrators +type Migrator interface { + IsMigrator() +} + +var ( + _ Migrator = (*AttributeChangeSet)(nil) + _ Migrator = (*MultiConditionalAttributeSet)(nil) + _ Migrator = (*SignalNameChange)(nil) + _ Migrator = (*ConditionalAttributeSet)(nil) +) diff --git a/processor/schemaprocessor/internal/migrate/multi_conditional.go b/processor/schemaprocessor/internal/migrate/multi_conditional.go new file mode 100644 index 000000000000..80a91667d798 --- /dev/null +++ b/processor/schemaprocessor/internal/migrate/multi_conditional.go @@ -0,0 +1,86 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package migrate // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/otel/schema/v1.0/ast" +) + +type set = map[string]struct{} + +// MultiConditionalAttributeSet maps from string keys to possible values for each of those keys. If a valid key is passed for each value, the conditional returns true +type MultiConditionalAttributeSet struct { + // map from string keys (in the intended case "event.name" and "span.name" to a set of acceptable values) + keysToPossibleValues map[string]set + attrs AttributeChangeSet +} + +type MultiConditionalAttributeSetSlice []*MultiConditionalAttributeSet + +func NewMultiConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches map[string][]Match) MultiConditionalAttributeSet { + keysToPossibleValues := make(map[string]set) + for k, values := range matches { + on := make(map[string]struct{}) + for _, val := range values { + on[string(val)] = struct{}{} + } + keysToPossibleValues[k] = on + } + return MultiConditionalAttributeSet{ + keysToPossibleValues: keysToPossibleValues, + attrs: NewAttributeChangeSet(mappings), + } +} + +func (ca MultiConditionalAttributeSet) IsMigrator() {} + +func (ca *MultiConditionalAttributeSet) Apply(attrs pcommon.Map, keyToCheckVals map[string]string) (errs error) { + return ca.Do(StateSelectorApply, attrs, keyToCheckVals) +} + +func (ca *MultiConditionalAttributeSet) Rollback(attrs pcommon.Map, keyToCheckVals map[string]string) (errs error) { + return ca.Do(StateSelectorRollback, attrs, keyToCheckVals) +} + +// Do function applies the attribute changes if the values match the expected values. Uses the Do method of the embedded AttributeChangeSet +func (ca *MultiConditionalAttributeSet) Do(ss StateSelector, attrs pcommon.Map, keyToCheckVals map[string]string) (errs error) { + match, err := ca.check(keyToCheckVals) + if err != nil { + return err + } + if match { + errs = ca.attrs.Do(ss, attrs) + } + return errs +} + +func (ca *MultiConditionalAttributeSet) check(keyToCheckVals map[string]string) (bool, error) { + if len(ca.keysToPossibleValues) == 0 { + return true, nil + } + if len(ca.keysToPossibleValues) != len(keyToCheckVals) { + return false, errors.New("passed in wrong number of matchers to MultiConditionalAttributeSet") + } + for k, inVal := range keyToCheckVals { + // We must already have a key matching the input key! If not, return an error + // indicates a programming error, should be impossible if using the class correctly + valToMatch, ok := (ca.keysToPossibleValues)[k] + if !ok { + return false, errors.New("passed in a key that doesn't exist in MultiConditionalAttributeSet") + } + // if there's nothing in here, match all values + if len(valToMatch) == 0 { + continue + } + if _, ok := valToMatch[inVal]; !ok { + return false, nil + } + + } + // if we've gone through every one of the keys, and they've all generated matches, return true + return true, nil +} diff --git a/processor/schemaprocessor/internal/migrate/multi_conditional_test.go b/processor/schemaprocessor/internal/migrate/multi_conditional_test.go new file mode 100644 index 000000000000..5e136fad3a77 --- /dev/null +++ b/processor/schemaprocessor/internal/migrate/multi_conditional_test.go @@ -0,0 +1,149 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package migrate + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestMultiConditionalAttributeSetApply(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + cond MultiConditionalAttributeSet + inCondData map[string]string + inAttr pcommon.Map + expect pcommon.Map + }{ + { + name: "No changes defined", + cond: NewMultiConditionalAttributeSet[string](map[string]string{}, map[string][]string{}), + inCondData: map[string]string{"span.name": "database operation"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + }, + { + name: "Not matched in value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{"span.name": {"application start"}}, + ), + inCondData: map[string]string{"span.name": "datatbase operation"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + }, + { + name: "No condition set, applys to all", + cond: NewMultiConditionalAttributeSet[string]( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{}, + ), + inCondData: map[string]string{"span.name": "datatbase operation"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + { + name: "Matched one condition, setting value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "span.name": {"application start", "application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application start"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + { + name: "Matched one condition, other value, setting value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "span.name": {"application start", "application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application end"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + { + name: "Matched one out of two conditions, don't set value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "trace.name": {"application start"}, + "span.name": {"application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application start", "trace.name": "application end"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + }, + { + name: "Matched both conditions, set value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "span.name": {"application start"}, + "trace.name": {"application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application start", "trace.name": "application end"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.NoError(t, tc.cond.Apply(tc.inAttr, tc.inCondData)) + assert.Equal(t, tc.expect.AsRaw(), tc.inAttr.AsRaw(), "Must match the expected value") + }) + } +} diff --git a/processor/schemaprocessor/internal/migrate/signal.go b/processor/schemaprocessor/internal/migrate/signal.go index 0570819c4a97..7a8d5417f686 100644 --- a/processor/schemaprocessor/internal/migrate/signal.go +++ b/processor/schemaprocessor/internal/migrate/signal.go @@ -12,18 +12,16 @@ type SignalType interface { } // SignalNameChange allows for migrating types that -// implement the `alias.Signal` interface. +// implement the `alias.NamedSignal` interface. type SignalNameChange struct { updates map[string]string rollback map[string]string } -type SignalNameChangeSlice []*SignalNameChange - -// NewSignalNameChange will create a `Signal` that will check the provided mappings if it can update a `alias.Signal` +// NewSignalNameChange will create a `Signal` that will check the provided mappings if it can update a `alias.NamedSignal` // and if no values are provided for `matches`, then all values will be updated. -func NewSignalNameChange[Key SignalType, Value SignalType](mappings map[Key]Value) *SignalNameChange { - sig := &SignalNameChange{ +func NewSignalNameChange[Key SignalType, Value SignalType](mappings map[Key]Value) SignalNameChange { + sig := SignalNameChange{ updates: make(map[string]string, len(mappings)), rollback: make(map[string]string, len(mappings)), } @@ -34,15 +32,17 @@ func NewSignalNameChange[Key SignalType, Value SignalType](mappings map[Key]Valu return sig } +func (s SignalNameChange) IsMigrator() {} + func (s *SignalNameChange) Apply(signal alias.NamedSignal) { - s.do(StateSelectorApply, signal) + s.Do(StateSelectorApply, signal) } func (s *SignalNameChange) Rollback(signal alias.NamedSignal) { - s.do(StateSelectorRollback, signal) + s.Do(StateSelectorRollback, signal) } -func (s *SignalNameChange) do(ss StateSelector, signal alias.NamedSignal) { +func (s *SignalNameChange) Do(ss StateSelector, signal alias.NamedSignal) { var ( name string matched bool @@ -57,30 +57,3 @@ func (s *SignalNameChange) do(ss StateSelector, signal alias.NamedSignal) { signal.SetName(name) } } - -func NewSignalNameChangeSlice(changes ...*SignalNameChange) *SignalNameChangeSlice { - values := new(SignalNameChangeSlice) - for _, c := range changes { - (*values) = append((*values), c) - } - return values -} - -func (slice *SignalNameChangeSlice) Apply(signal alias.NamedSignal) { - slice.do(StateSelectorApply, signal) -} - -func (slice *SignalNameChangeSlice) Rollback(signal alias.NamedSignal) { - slice.do(StateSelectorRollback, signal) -} - -func (slice *SignalNameChangeSlice) do(ss StateSelector, signal alias.NamedSignal) { - for i := 0; i < len((*slice)); i++ { - switch ss { - case StateSelectorApply: - (*slice)[i].Apply(signal) - case StateSelectorRollback: - (*slice)[len((*slice))-i-1].Rollback(signal) - } - } -} diff --git a/processor/schemaprocessor/internal/migrate/signal_test.go b/processor/schemaprocessor/internal/migrate/signal_test.go index 14f90951f46c..ada3be0a282e 100644 --- a/processor/schemaprocessor/internal/migrate/signal_test.go +++ b/processor/schemaprocessor/internal/migrate/signal_test.go @@ -17,7 +17,7 @@ func TestSignalApply(t *testing.T) { for _, tc := range []struct { name string - sig *SignalNameChange + sig SignalNameChange val alias.NamedSignal expect string }{ @@ -75,7 +75,7 @@ func TestSignalRollback(t *testing.T) { for _, tc := range []struct { name string - sig *SignalNameChange + sig SignalNameChange val alias.NamedSignal expect string }{ diff --git a/processor/schemaprocessor/internal/operator/attributes_operators.go b/processor/schemaprocessor/internal/operator/attributes_operators.go new file mode 100644 index 000000000000..95ac7b0bed7e --- /dev/null +++ b/processor/schemaprocessor/internal/operator/attributes_operators.go @@ -0,0 +1,159 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package operator contains various Operators that represent a high level operation - typically a single "change" block from the schema change file. They rely on Migrators to do the actual work of applying the change to the data. Operators accept and operate on a specific type of pdata (logs, metrics, etc) +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +// MetricDataPointAttributeOperator is an Operator that acts on Metric DataPoints' attributes +type MetricDataPointAttributeOperator struct { + ConditionalAttributeChange migrate.ConditionalAttributeSet +} + +func (o MetricDataPointAttributeOperator) IsMigrator() {} + +func (o MetricDataPointAttributeOperator) Do(ss migrate.StateSelector, metric pmetric.Metric) error { + // todo(ankit) handle MetricTypeEmpty + var datam alias.Attributed + switch metric.Type() { + case pmetric.MetricTypeExponentialHistogram: + for dp := 0; dp < metric.ExponentialHistogram().DataPoints().Len(); dp++ { + datam = metric.ExponentialHistogram().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeHistogram: + for dp := 0; dp < metric.Histogram().DataPoints().Len(); dp++ { + datam = metric.Histogram().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeGauge: + for dp := 0; dp < metric.Gauge().DataPoints().Len(); dp++ { + datam = metric.Gauge().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeSum: + for dp := 0; dp < metric.Sum().DataPoints().Len(); dp++ { + datam = metric.Sum().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeSummary: + for dp := 0; dp < metric.Summary().DataPoints().Len(); dp++ { + datam = metric.Summary().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + default: + return errors.New("unsupported metric type") + } + + return nil +} + +// LogAttributeOperator is an Operator that acts on LogRecords' attributes +type LogAttributeOperator struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o LogAttributeOperator) Apply(data any) error { + logs, ok := data.(plog.ScopeLogs) + if !ok { + return errors.New("invalid data type for LogAttributeOperator") + } + for l := 0; l < logs.LogRecords().Len(); l++ { + log := logs.LogRecords().At(l) + if err := o.AttributeChange.Apply(log.Attributes()); err != nil { + return err + } + } + return nil +} + +func (o LogAttributeOperator) Rollback(data any) error { + logs, ok := data.(plog.ScopeLogs) + if !ok { + return errors.New("invalid data type for LogAttributeOperator") + } + + for l := 0; l < logs.LogRecords().Len(); l++ { + log := logs.LogRecords().At(l) + if err := o.AttributeChange.Rollback(log.Attributes()); err != nil { + return err + } + } + return nil +} + +type SpanAttributeOperator struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o SpanAttributeOperator) Apply(data any) error { + traces, ok := data.(ptrace.ScopeSpans) + if !ok { + return errors.New("invalid data type for SpanAttributeOperator") + } + for l := 0; l < traces.Spans().Len(); l++ { + span := traces.Spans().At(l) + if err := o.AttributeChange.Apply(span.Attributes()); err != nil { + return err + } + } + return nil +} + +func (o SpanAttributeOperator) Rollback(data any) error { + traces, ok := data.(ptrace.ScopeSpans) + if !ok { + return errors.New("invalid data type for SpanAttributeOperator") + } + for l := 0; l < traces.Spans().Len(); l++ { + span := traces.Spans().At(l) + if err := o.AttributeChange.Rollback(span.Attributes()); err != nil { + return err + } + } + return nil +} + +type SpanEventAttributeOperator struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o SpanEventAttributeOperator) Apply(span ptrace.Span) error { + for l := 0; l < span.Events().Len(); l++ { + span := span.Events().At(l) + if err := o.AttributeChange.Apply(span.Attributes()); err != nil { + return err + } + } + return nil +} + +func (o SpanEventAttributeOperator) Rollback(span ptrace.Span) error { + for l := 0; l < span.Events().Len(); l++ { + span := span.Events().At(l) + if err := o.AttributeChange.Rollback(span.Attributes()); err != nil { + return err + } + } + return nil +} diff --git a/processor/schemaprocessor/internal/operator/conditional_attributes_operators.go b/processor/schemaprocessor/internal/operator/conditional_attributes_operators.go new file mode 100644 index 000000000000..866c52e00312 --- /dev/null +++ b/processor/schemaprocessor/internal/operator/conditional_attributes_operators.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +type SpanConditionalAttributeOperator struct { + Migrator migrate.ConditionalAttributeSet +} + +func (o SpanConditionalAttributeOperator) IsMigrator() {} + +func (o SpanConditionalAttributeOperator) Do(ss migrate.StateSelector, span ptrace.Span) error { + return o.Migrator.Do(ss, span.Attributes(), span.Name()) +} diff --git a/processor/schemaprocessor/internal/operator/multi_conditional_attribute_operators.go b/processor/schemaprocessor/internal/operator/multi_conditional_attribute_operators.go new file mode 100644 index 000000000000..9b302c6032d3 --- /dev/null +++ b/processor/schemaprocessor/internal/operator/multi_conditional_attribute_operators.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +type SpanEventConditionalAttributeOperator struct { + migrator migrate.MultiConditionalAttributeSet +} + +func NewSpanEventConditionalAttributeOperator(migrator migrate.MultiConditionalAttributeSet) SpanEventConditionalAttributeOperator { + return SpanEventConditionalAttributeOperator{migrator: migrator} +} + +func (o SpanEventConditionalAttributeOperator) IsMigrator() {} + +func (o SpanEventConditionalAttributeOperator) Do(ss migrate.StateSelector, span ptrace.Span) error { + for e := 0; e < span.Events().Len(); e++ { + event := span.Events().At(e) + if err := o.migrator.Do(ss, event.Attributes(), + map[string]string{ + "event.name": event.Name(), + "span.name": span.Name(), + }); err != nil { + return err + } + } + return nil +} diff --git a/processor/schemaprocessor/internal/operator/signal_interfaces.go b/processor/schemaprocessor/internal/operator/signal_interfaces.go new file mode 100644 index 000000000000..c51dbd4cfae3 --- /dev/null +++ b/processor/schemaprocessor/internal/operator/signal_interfaces.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +type LogOperator interface { + migrate.Migrator + Do(ss migrate.StateSelector, log plog.LogRecord) error +} + +type MetricOperator interface { + migrate.Migrator + Do(ss migrate.StateSelector, metric pmetric.Metric) error +} + +type SpanOperator interface { + migrate.Migrator + Do(ss migrate.StateSelector, signal ptrace.Span) error +} diff --git a/processor/schemaprocessor/internal/operator/signal_name.go b/processor/schemaprocessor/internal/operator/signal_name.go new file mode 100644 index 000000000000..763efb98aecf --- /dev/null +++ b/processor/schemaprocessor/internal/operator/signal_name.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +type SpanEventSignalNameChange struct { + SignalNameChange migrate.SignalNameChange +} + +func (c SpanEventSignalNameChange) IsMigrator() {} + +func (c SpanEventSignalNameChange) Do(ss migrate.StateSelector, span ptrace.Span) error { + for e := 0; e < span.Events().Len(); e++ { + event := span.Events().At(e) + c.SignalNameChange.Do(ss, event) + } + return nil +} + +// a similar type as SpanEventSignalNameChange, but for metrics +type MetricSignalNameChange struct { + SignalNameChange migrate.SignalNameChange +} + +func (c MetricSignalNameChange) IsMigrator() {} + +func (c MetricSignalNameChange) Do(ss migrate.StateSelector, metric pmetric.Metric) error { + c.SignalNameChange.Do(ss, metric) + return nil +}