From 4027afa9028e7e3baf6a21e703e7978b6e8d083c Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Mon, 16 Sep 2024 11:54:43 -0400 Subject: [PATCH] operator and migrator support --- .../schemaprocessor/internal/alias/alias.go | 6 +- .../internal/migrate/attributes.go | 55 ++---- .../internal/migrate/attributes_test.go | 101 +---------- .../internal/migrate/conditional.go | 64 +++---- .../internal/migrate/conditional_test.go | 118 +------------ .../internal/migrate/migrator.go | 16 ++ .../internal/migrate/multi_conditional.go | 86 ++++++++++ .../migrate/multi_conditional_test.go | 149 ++++++++++++++++ .../internal/migrate/signal.go | 45 +---- .../internal/migrate/signal_test.go | 4 +- .../internal/operator/attributes_operators.go | 159 ++++++++++++++++++ .../conditional_attributes_operators.go | 20 +++ .../multi_conditional_attribute_operators.go | 34 ++++ .../internal/operator/signal_interfaces.go | 27 +++ .../internal/operator/signal_name.go | 37 ++++ 15 files changed, 583 insertions(+), 338 deletions(-) create mode 100644 processor/schemaprocessor/internal/migrate/migrator.go create mode 100644 processor/schemaprocessor/internal/migrate/multi_conditional.go create mode 100644 processor/schemaprocessor/internal/migrate/multi_conditional_test.go create mode 100644 processor/schemaprocessor/internal/operator/attributes_operators.go create mode 100644 processor/schemaprocessor/internal/operator/conditional_attributes_operators.go create mode 100644 processor/schemaprocessor/internal/operator/multi_conditional_attribute_operators.go create mode 100644 processor/schemaprocessor/internal/operator/signal_interfaces.go create mode 100644 processor/schemaprocessor/internal/operator/signal_name.go diff --git a/processor/schemaprocessor/internal/alias/alias.go b/processor/schemaprocessor/internal/alias/alias.go index 918a08d44d78..7e5e01436350 100644 --- a/processor/schemaprocessor/internal/alias/alias.go +++ b/processor/schemaprocessor/internal/alias/alias.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package Alias is a subset of the interfaces defined by pdata and family +// Package alias is a subset of the interfaces defined by pdata and family // package to allow for higher code reuse without using generics. package alias // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" @@ -30,6 +30,10 @@ type NamedSignal interface { SetName(name string) } +type Attributed interface { + Attributes() pcommon.Map +} + var ( _ Resource = (*plog.ResourceLogs)(nil) _ Resource = (*pmetric.ResourceMetrics)(nil) diff --git a/processor/schemaprocessor/internal/migrate/attributes.go b/processor/schemaprocessor/internal/migrate/attributes.go index 37a020cf6123..707703ff3137 100644 --- a/processor/schemaprocessor/internal/migrate/attributes.go +++ b/processor/schemaprocessor/internal/migrate/attributes.go @@ -11,24 +11,21 @@ import ( "go.uber.org/multierr" ) -// AttributeChangeSet represents an unscoped entry that can be applied. -// +// AttributeChangeSet represents a rename_attributes type operation. // The listed changes are duplicated twice // to allow for simplified means of transition to or from a revision. type AttributeChangeSet struct { - updates ast.AttributeMap + // The keys are the old attribute name used in the previous version, the values are the + // new attribute name starting from this version (comment from ast.AttributeMap) + updates ast.AttributeMap + // the inverse of the updates map rollback ast.AttributeMap } -// AttributeChangeSetSlice allows for `AttributeChangeSet` -// to be chained together as they are defined within the schema -// and be applied sequentially to ensure deterministic behavior. -type AttributeChangeSetSlice []*AttributeChangeSet - // NewAttributeChangeSet allows for typed strings to be used as part // of the invocation that will be converted into the default string type. -func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet { - attr := &AttributeChangeSet{ +func NewAttributeChangeSet(mappings ast.AttributeMap) AttributeChangeSet { + attr := AttributeChangeSet{ updates: make(map[string]string, len(mappings)), rollback: make(map[string]string, len(mappings)), } @@ -39,15 +36,17 @@ func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet { return attr } +func (a AttributeChangeSet) IsMigrator() {} + func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error { - return a.do(StateSelectorApply, attrs) + return a.Do(StateSelectorApply, attrs) } func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error { - return a.do(StateSelectorRollback, attrs) + return a.Do(StateSelectorRollback, attrs) } -func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) { +func (a *AttributeChangeSet) Do(ss StateSelector, attrs pcommon.Map) (errs error) { var ( updated = make(map[string]struct{}) results = pcommon.NewMap() @@ -81,33 +80,3 @@ func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error results.CopyTo(attrs) return errs } - -// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets` -// and allows them to be executed in the provided order. -func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice { - values := new(AttributeChangeSetSlice) - for _, c := range changes { - (*values) = append((*values), c) - } - return values -} - -func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error { - return slice.do(StateSelectorApply, attrs) -} - -func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error { - return slice.do(StateSelectorRollback, attrs) -} - -func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) { - for i := 0; i < len(*slice); i++ { - switch ss { - case StateSelectorApply: - errs = multierr.Append(errs, (*slice)[i].Apply(attrs)) - case StateSelectorRollback: - errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs)) - } - } - return errs -} diff --git a/processor/schemaprocessor/internal/migrate/attributes_test.go b/processor/schemaprocessor/internal/migrate/attributes_test.go index f3cc5d79dc6e..d209a397ee78 100644 --- a/processor/schemaprocessor/internal/migrate/attributes_test.go +++ b/processor/schemaprocessor/internal/migrate/attributes_test.go @@ -27,7 +27,7 @@ func TestNewAttributeChangeSet(t *testing.T) { "hello": "world", }) - expect := &AttributeChangeSet{ + expect := AttributeChangeSet{ updates: map[string]string{ "hello": "world", }, @@ -45,7 +45,7 @@ func TestAttributeChangeSetApply(t *testing.T) { for _, tc := range []struct { name string - acs *AttributeChangeSet + acs AttributeChangeSet attrs pcommon.Map expect pcommon.Map errVal string @@ -120,7 +120,7 @@ func TestAttributeChangeSetRollback(t *testing.T) { for _, tc := range []struct { name string - acs *AttributeChangeSet + acs AttributeChangeSet attrs pcommon.Map expect pcommon.Map errVal string @@ -189,98 +189,3 @@ func TestAttributeChangeSetRollback(t *testing.T) { }) } } - -func TestNewAttributeChangeSetSliceApply(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - changes *AttributeChangeSetSlice - attr pcommon.Map - expect pcommon.Map - }{ - { - name: "no changes listed", - changes: NewAttributeChangeSetSlice(), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - }, - { - name: "changes defined", - changes: NewAttributeChangeSetSlice( - NewAttributeChangeSet(map[string]string{ - "service_version": "service.version", - }), - NewAttributeChangeSet(map[string]string{ - "service.version": "application.service.version", - }), - ), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("application.service.version", "v0.0.1") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.changes.Apply(tc.attr)) - assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes") - }) - } -} - -func TestNewAttributeChangeSetSliceApplyRollback(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - changes *AttributeChangeSetSlice - attr pcommon.Map - expect pcommon.Map - }{ - { - name: "no changes listed", - changes: NewAttributeChangeSetSlice(), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - }, - { - name: "changes defined", - changes: NewAttributeChangeSetSlice( - NewAttributeChangeSet(map[string]string{ - "service_version": "service.version", - }), - NewAttributeChangeSet(map[string]string{ - "service.version": "application.service.version", - }), - ), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("application.service.version", "v0.0.1") - - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.1") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.changes.Rollback(tc.attr)) - assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes") - }) - } -} diff --git a/processor/schemaprocessor/internal/migrate/conditional.go b/processor/schemaprocessor/internal/migrate/conditional.go index b3eea651f9fc..812ff46325e5 100644 --- a/processor/schemaprocessor/internal/migrate/conditional.go +++ b/processor/schemaprocessor/internal/migrate/conditional.go @@ -6,7 +6,6 @@ package migrate // import "github.com/open-telemetry/opentelemetry-collector-con import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/otel/schema/v1.0/ast" - "go.uber.org/multierr" ) // ValueMatch defines the expected match type @@ -15,74 +14,55 @@ type ValueMatch interface { ~string } +// ConditionalAttributeSet represents a set of attributes that will type ConditionalAttributeSet struct { - on *map[string]struct{} - attrs *AttributeChangeSet + on map[string]struct{} + attrs AttributeChangeSet } type ConditionalAttributeSetSlice []*ConditionalAttributeSet -func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) *ConditionalAttributeSet { +func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) ConditionalAttributeSet { on := make(map[string]struct{}) for _, m := range matches { on[string(m)] = struct{}{} } - return &ConditionalAttributeSet{ - on: &on, + return ConditionalAttributeSet{ + on: on, attrs: NewAttributeChangeSet(mappings), } } -func (ca *ConditionalAttributeSet) Apply(attrs pcommon.Map, values ...string) (errs error) { +func (ca ConditionalAttributeSet) IsMigrator() {} + +func (ca *ConditionalAttributeSet) Do(ss StateSelector, attrs pcommon.Map, values ...string) (errs error) { if ca.check(values...) { - errs = ca.attrs.Apply(attrs) + switch ss { + case StateSelectorApply: + errs = ca.attrs.Apply(attrs) + case StateSelectorRollback: + errs = ca.attrs.Rollback(attrs) + } } return errs } +func (ca *ConditionalAttributeSet) Apply(attrs pcommon.Map, values ...string) (errs error) { + return ca.Do(StateSelectorApply, attrs, values...) +} func (ca *ConditionalAttributeSet) Rollback(attrs pcommon.Map, values ...string) (errs error) { - if ca.check(values...) { - errs = ca.attrs.Rollback(attrs) - } - return errs + return ca.Do(StateSelectorRollback, attrs, values...) } +// todo make it harder to misuse this! diff between no values and 0 values func (ca *ConditionalAttributeSet) check(values ...string) bool { - if len(*ca.on) == 0 { + if len(ca.on) == 0 { return true } for _, v := range values { - if _, ok := (*ca.on)[v]; !ok { + if _, ok := (ca.on)[v]; !ok { return false } } return true } - -func NewConditionalAttributeSetSlice(conditions ...*ConditionalAttributeSet) *ConditionalAttributeSetSlice { - values := new(ConditionalAttributeSetSlice) - for _, c := range conditions { - (*values) = append((*values), c) - } - return values -} - -func (slice *ConditionalAttributeSetSlice) Apply(attrs pcommon.Map, values ...string) error { - return slice.do(StateSelectorApply, attrs, values) -} - -func (slice *ConditionalAttributeSetSlice) Rollback(attrs pcommon.Map, values ...string) error { - return slice.do(StateSelectorRollback, attrs, values) -} - -func (slice *ConditionalAttributeSetSlice) do(ss StateSelector, attrs pcommon.Map, values []string) (errs error) { - for i := 0; i < len((*slice)); i++ { - switch ss { - case StateSelectorApply: - errs = multierr.Append(errs, (*slice)[i].Apply(attrs, values...)) - case StateSelectorRollback: - errs = multierr.Append(errs, (*slice)[len((*slice))-i-1].Rollback(attrs, values...)) - } - } - return errs -} diff --git a/processor/schemaprocessor/internal/migrate/conditional_test.go b/processor/schemaprocessor/internal/migrate/conditional_test.go index 0c199c98d050..56e9b2a8ade4 100644 --- a/processor/schemaprocessor/internal/migrate/conditional_test.go +++ b/processor/schemaprocessor/internal/migrate/conditional_test.go @@ -15,7 +15,7 @@ func TestConditionalAttributeSetApply(t *testing.T) { for _, tc := range []struct { name string - cond *ConditionalAttributeSet + cond ConditionalAttributeSet check string attr pcommon.Map expect pcommon.Map @@ -95,7 +95,7 @@ func TestConditionalAttributeSetRollback(t *testing.T) { for _, tc := range []struct { name string - cond *ConditionalAttributeSet + cond ConditionalAttributeSet check string attr pcommon.Map expect pcommon.Map @@ -169,117 +169,3 @@ func TestConditionalAttributeSetRollback(t *testing.T) { }) } } - -func TestConditionalAttribueSetSliceApply(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - slice *ConditionalAttributeSetSlice - check string - attrs pcommon.Map - expect pcommon.Map - }{ - { - name: "No changes", - slice: NewConditionalAttributeSetSlice(), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - }, - { - name: "Not matched check value", - slice: NewConditionalAttributeSetSlice( - NewConditionalAttributeSet[string]( - map[string]string{ - "service_version": "service.version", - }, - ), - // intentially silly to be make it clear - // that this should not be applied - NewConditionalAttributeSet( - map[string]string{ - "service.version": "shark.attack", - }, - "shark spotted", - ), - ), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.slice.Apply(tc.attrs, tc.check)) - assert.Equal(t, tc.expect.AsRaw(), tc.attrs.AsRaw(), "Must match the expected values") - }) - } -} - -func TestConditionalAttribueSetSliceRollback(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - slice *ConditionalAttributeSetSlice - check string - attrs pcommon.Map - expect pcommon.Map - }{ - { - name: "No changes", - slice: NewConditionalAttributeSetSlice(), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - }, - { - name: "Not matched check value", - slice: NewConditionalAttributeSetSlice( - NewConditionalAttributeSet[string]( - map[string]string{ - "service_version": "service.version", - }, - ), - // intentially silly to be make it clear - // that this should not be applied - NewConditionalAttributeSet( - map[string]string{ - "service.version": "shark.attack", - }, - "shark spotted", - ), - ), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.0") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.slice.Rollback(tc.attrs, tc.check)) - assert.Equal(t, tc.expect.AsRaw(), tc.attrs.AsRaw(), "Must match the expected values") - }) - } -} diff --git a/processor/schemaprocessor/internal/migrate/migrator.go b/processor/schemaprocessor/internal/migrate/migrator.go new file mode 100644 index 000000000000..8dea3973eb08 --- /dev/null +++ b/processor/schemaprocessor/internal/migrate/migrator.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package migrate // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" + +// Migrator is an interface that all migration types must implement. It is basically a marker interface. All Operators are also Migrators +type Migrator interface { + IsMigrator() +} + +var ( + _ Migrator = (*AttributeChangeSet)(nil) + _ Migrator = (*MultiConditionalAttributeSet)(nil) + _ Migrator = (*SignalNameChange)(nil) + _ Migrator = (*ConditionalAttributeSet)(nil) +) diff --git a/processor/schemaprocessor/internal/migrate/multi_conditional.go b/processor/schemaprocessor/internal/migrate/multi_conditional.go new file mode 100644 index 000000000000..80a91667d798 --- /dev/null +++ b/processor/schemaprocessor/internal/migrate/multi_conditional.go @@ -0,0 +1,86 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package migrate // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/otel/schema/v1.0/ast" +) + +type set = map[string]struct{} + +// MultiConditionalAttributeSet maps from string keys to possible values for each of those keys. If a valid key is passed for each value, the conditional returns true +type MultiConditionalAttributeSet struct { + // map from string keys (in the intended case "event.name" and "span.name" to a set of acceptable values) + keysToPossibleValues map[string]set + attrs AttributeChangeSet +} + +type MultiConditionalAttributeSetSlice []*MultiConditionalAttributeSet + +func NewMultiConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches map[string][]Match) MultiConditionalAttributeSet { + keysToPossibleValues := make(map[string]set) + for k, values := range matches { + on := make(map[string]struct{}) + for _, val := range values { + on[string(val)] = struct{}{} + } + keysToPossibleValues[k] = on + } + return MultiConditionalAttributeSet{ + keysToPossibleValues: keysToPossibleValues, + attrs: NewAttributeChangeSet(mappings), + } +} + +func (ca MultiConditionalAttributeSet) IsMigrator() {} + +func (ca *MultiConditionalAttributeSet) Apply(attrs pcommon.Map, keyToCheckVals map[string]string) (errs error) { + return ca.Do(StateSelectorApply, attrs, keyToCheckVals) +} + +func (ca *MultiConditionalAttributeSet) Rollback(attrs pcommon.Map, keyToCheckVals map[string]string) (errs error) { + return ca.Do(StateSelectorRollback, attrs, keyToCheckVals) +} + +// Do function applies the attribute changes if the values match the expected values. Uses the Do method of the embedded AttributeChangeSet +func (ca *MultiConditionalAttributeSet) Do(ss StateSelector, attrs pcommon.Map, keyToCheckVals map[string]string) (errs error) { + match, err := ca.check(keyToCheckVals) + if err != nil { + return err + } + if match { + errs = ca.attrs.Do(ss, attrs) + } + return errs +} + +func (ca *MultiConditionalAttributeSet) check(keyToCheckVals map[string]string) (bool, error) { + if len(ca.keysToPossibleValues) == 0 { + return true, nil + } + if len(ca.keysToPossibleValues) != len(keyToCheckVals) { + return false, errors.New("passed in wrong number of matchers to MultiConditionalAttributeSet") + } + for k, inVal := range keyToCheckVals { + // We must already have a key matching the input key! If not, return an error + // indicates a programming error, should be impossible if using the class correctly + valToMatch, ok := (ca.keysToPossibleValues)[k] + if !ok { + return false, errors.New("passed in a key that doesn't exist in MultiConditionalAttributeSet") + } + // if there's nothing in here, match all values + if len(valToMatch) == 0 { + continue + } + if _, ok := valToMatch[inVal]; !ok { + return false, nil + } + + } + // if we've gone through every one of the keys, and they've all generated matches, return true + return true, nil +} diff --git a/processor/schemaprocessor/internal/migrate/multi_conditional_test.go b/processor/schemaprocessor/internal/migrate/multi_conditional_test.go new file mode 100644 index 000000000000..5e136fad3a77 --- /dev/null +++ b/processor/schemaprocessor/internal/migrate/multi_conditional_test.go @@ -0,0 +1,149 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package migrate + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestMultiConditionalAttributeSetApply(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + cond MultiConditionalAttributeSet + inCondData map[string]string + inAttr pcommon.Map + expect pcommon.Map + }{ + { + name: "No changes defined", + cond: NewMultiConditionalAttributeSet[string](map[string]string{}, map[string][]string{}), + inCondData: map[string]string{"span.name": "database operation"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + }, + { + name: "Not matched in value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{"span.name": {"application start"}}, + ), + inCondData: map[string]string{"span.name": "datatbase operation"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + }, + { + name: "No condition set, applys to all", + cond: NewMultiConditionalAttributeSet[string]( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{}, + ), + inCondData: map[string]string{"span.name": "datatbase operation"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + { + name: "Matched one condition, setting value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "span.name": {"application start", "application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application start"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + { + name: "Matched one condition, other value, setting value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "span.name": {"application start", "application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application end"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + { + name: "Matched one out of two conditions, don't set value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "trace.name": {"application start"}, + "span.name": {"application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application start", "trace.name": "application end"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + }, + { + name: "Matched both conditions, set value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "span.name": {"application start"}, + "trace.name": {"application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application start", "trace.name": "application end"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.NoError(t, tc.cond.Apply(tc.inAttr, tc.inCondData)) + assert.Equal(t, tc.expect.AsRaw(), tc.inAttr.AsRaw(), "Must match the expected value") + }) + } +} diff --git a/processor/schemaprocessor/internal/migrate/signal.go b/processor/schemaprocessor/internal/migrate/signal.go index 0570819c4a97..7a8d5417f686 100644 --- a/processor/schemaprocessor/internal/migrate/signal.go +++ b/processor/schemaprocessor/internal/migrate/signal.go @@ -12,18 +12,16 @@ type SignalType interface { } // SignalNameChange allows for migrating types that -// implement the `alias.Signal` interface. +// implement the `alias.NamedSignal` interface. type SignalNameChange struct { updates map[string]string rollback map[string]string } -type SignalNameChangeSlice []*SignalNameChange - -// NewSignalNameChange will create a `Signal` that will check the provided mappings if it can update a `alias.Signal` +// NewSignalNameChange will create a `Signal` that will check the provided mappings if it can update a `alias.NamedSignal` // and if no values are provided for `matches`, then all values will be updated. -func NewSignalNameChange[Key SignalType, Value SignalType](mappings map[Key]Value) *SignalNameChange { - sig := &SignalNameChange{ +func NewSignalNameChange[Key SignalType, Value SignalType](mappings map[Key]Value) SignalNameChange { + sig := SignalNameChange{ updates: make(map[string]string, len(mappings)), rollback: make(map[string]string, len(mappings)), } @@ -34,15 +32,17 @@ func NewSignalNameChange[Key SignalType, Value SignalType](mappings map[Key]Valu return sig } +func (s SignalNameChange) IsMigrator() {} + func (s *SignalNameChange) Apply(signal alias.NamedSignal) { - s.do(StateSelectorApply, signal) + s.Do(StateSelectorApply, signal) } func (s *SignalNameChange) Rollback(signal alias.NamedSignal) { - s.do(StateSelectorRollback, signal) + s.Do(StateSelectorRollback, signal) } -func (s *SignalNameChange) do(ss StateSelector, signal alias.NamedSignal) { +func (s *SignalNameChange) Do(ss StateSelector, signal alias.NamedSignal) { var ( name string matched bool @@ -57,30 +57,3 @@ func (s *SignalNameChange) do(ss StateSelector, signal alias.NamedSignal) { signal.SetName(name) } } - -func NewSignalNameChangeSlice(changes ...*SignalNameChange) *SignalNameChangeSlice { - values := new(SignalNameChangeSlice) - for _, c := range changes { - (*values) = append((*values), c) - } - return values -} - -func (slice *SignalNameChangeSlice) Apply(signal alias.NamedSignal) { - slice.do(StateSelectorApply, signal) -} - -func (slice *SignalNameChangeSlice) Rollback(signal alias.NamedSignal) { - slice.do(StateSelectorRollback, signal) -} - -func (slice *SignalNameChangeSlice) do(ss StateSelector, signal alias.NamedSignal) { - for i := 0; i < len((*slice)); i++ { - switch ss { - case StateSelectorApply: - (*slice)[i].Apply(signal) - case StateSelectorRollback: - (*slice)[len((*slice))-i-1].Rollback(signal) - } - } -} diff --git a/processor/schemaprocessor/internal/migrate/signal_test.go b/processor/schemaprocessor/internal/migrate/signal_test.go index 14f90951f46c..ada3be0a282e 100644 --- a/processor/schemaprocessor/internal/migrate/signal_test.go +++ b/processor/schemaprocessor/internal/migrate/signal_test.go @@ -17,7 +17,7 @@ func TestSignalApply(t *testing.T) { for _, tc := range []struct { name string - sig *SignalNameChange + sig SignalNameChange val alias.NamedSignal expect string }{ @@ -75,7 +75,7 @@ func TestSignalRollback(t *testing.T) { for _, tc := range []struct { name string - sig *SignalNameChange + sig SignalNameChange val alias.NamedSignal expect string }{ diff --git a/processor/schemaprocessor/internal/operator/attributes_operators.go b/processor/schemaprocessor/internal/operator/attributes_operators.go new file mode 100644 index 000000000000..95ac7b0bed7e --- /dev/null +++ b/processor/schemaprocessor/internal/operator/attributes_operators.go @@ -0,0 +1,159 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package operator contains various Operators that represent a high level operation - typically a single "change" block from the schema change file. They rely on Migrators to do the actual work of applying the change to the data. Operators accept and operate on a specific type of pdata (logs, metrics, etc) +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +// MetricDataPointAttributeOperator is an Operator that acts on Metric DataPoints' attributes +type MetricDataPointAttributeOperator struct { + ConditionalAttributeChange migrate.ConditionalAttributeSet +} + +func (o MetricDataPointAttributeOperator) IsMigrator() {} + +func (o MetricDataPointAttributeOperator) Do(ss migrate.StateSelector, metric pmetric.Metric) error { + // todo(ankit) handle MetricTypeEmpty + var datam alias.Attributed + switch metric.Type() { + case pmetric.MetricTypeExponentialHistogram: + for dp := 0; dp < metric.ExponentialHistogram().DataPoints().Len(); dp++ { + datam = metric.ExponentialHistogram().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeHistogram: + for dp := 0; dp < metric.Histogram().DataPoints().Len(); dp++ { + datam = metric.Histogram().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeGauge: + for dp := 0; dp < metric.Gauge().DataPoints().Len(); dp++ { + datam = metric.Gauge().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeSum: + for dp := 0; dp < metric.Sum().DataPoints().Len(); dp++ { + datam = metric.Sum().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeSummary: + for dp := 0; dp < metric.Summary().DataPoints().Len(); dp++ { + datam = metric.Summary().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + default: + return errors.New("unsupported metric type") + } + + return nil +} + +// LogAttributeOperator is an Operator that acts on LogRecords' attributes +type LogAttributeOperator struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o LogAttributeOperator) Apply(data any) error { + logs, ok := data.(plog.ScopeLogs) + if !ok { + return errors.New("invalid data type for LogAttributeOperator") + } + for l := 0; l < logs.LogRecords().Len(); l++ { + log := logs.LogRecords().At(l) + if err := o.AttributeChange.Apply(log.Attributes()); err != nil { + return err + } + } + return nil +} + +func (o LogAttributeOperator) Rollback(data any) error { + logs, ok := data.(plog.ScopeLogs) + if !ok { + return errors.New("invalid data type for LogAttributeOperator") + } + + for l := 0; l < logs.LogRecords().Len(); l++ { + log := logs.LogRecords().At(l) + if err := o.AttributeChange.Rollback(log.Attributes()); err != nil { + return err + } + } + return nil +} + +type SpanAttributeOperator struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o SpanAttributeOperator) Apply(data any) error { + traces, ok := data.(ptrace.ScopeSpans) + if !ok { + return errors.New("invalid data type for SpanAttributeOperator") + } + for l := 0; l < traces.Spans().Len(); l++ { + span := traces.Spans().At(l) + if err := o.AttributeChange.Apply(span.Attributes()); err != nil { + return err + } + } + return nil +} + +func (o SpanAttributeOperator) Rollback(data any) error { + traces, ok := data.(ptrace.ScopeSpans) + if !ok { + return errors.New("invalid data type for SpanAttributeOperator") + } + for l := 0; l < traces.Spans().Len(); l++ { + span := traces.Spans().At(l) + if err := o.AttributeChange.Rollback(span.Attributes()); err != nil { + return err + } + } + return nil +} + +type SpanEventAttributeOperator struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o SpanEventAttributeOperator) Apply(span ptrace.Span) error { + for l := 0; l < span.Events().Len(); l++ { + span := span.Events().At(l) + if err := o.AttributeChange.Apply(span.Attributes()); err != nil { + return err + } + } + return nil +} + +func (o SpanEventAttributeOperator) Rollback(span ptrace.Span) error { + for l := 0; l < span.Events().Len(); l++ { + span := span.Events().At(l) + if err := o.AttributeChange.Rollback(span.Attributes()); err != nil { + return err + } + } + return nil +} diff --git a/processor/schemaprocessor/internal/operator/conditional_attributes_operators.go b/processor/schemaprocessor/internal/operator/conditional_attributes_operators.go new file mode 100644 index 000000000000..866c52e00312 --- /dev/null +++ b/processor/schemaprocessor/internal/operator/conditional_attributes_operators.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +type SpanConditionalAttributeOperator struct { + Migrator migrate.ConditionalAttributeSet +} + +func (o SpanConditionalAttributeOperator) IsMigrator() {} + +func (o SpanConditionalAttributeOperator) Do(ss migrate.StateSelector, span ptrace.Span) error { + return o.Migrator.Do(ss, span.Attributes(), span.Name()) +} diff --git a/processor/schemaprocessor/internal/operator/multi_conditional_attribute_operators.go b/processor/schemaprocessor/internal/operator/multi_conditional_attribute_operators.go new file mode 100644 index 000000000000..9b302c6032d3 --- /dev/null +++ b/processor/schemaprocessor/internal/operator/multi_conditional_attribute_operators.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +type SpanEventConditionalAttributeOperator struct { + migrator migrate.MultiConditionalAttributeSet +} + +func NewSpanEventConditionalAttributeOperator(migrator migrate.MultiConditionalAttributeSet) SpanEventConditionalAttributeOperator { + return SpanEventConditionalAttributeOperator{migrator: migrator} +} + +func (o SpanEventConditionalAttributeOperator) IsMigrator() {} + +func (o SpanEventConditionalAttributeOperator) Do(ss migrate.StateSelector, span ptrace.Span) error { + for e := 0; e < span.Events().Len(); e++ { + event := span.Events().At(e) + if err := o.migrator.Do(ss, event.Attributes(), + map[string]string{ + "event.name": event.Name(), + "span.name": span.Name(), + }); err != nil { + return err + } + } + return nil +} diff --git a/processor/schemaprocessor/internal/operator/signal_interfaces.go b/processor/schemaprocessor/internal/operator/signal_interfaces.go new file mode 100644 index 000000000000..c51dbd4cfae3 --- /dev/null +++ b/processor/schemaprocessor/internal/operator/signal_interfaces.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +type LogOperator interface { + migrate.Migrator + Do(ss migrate.StateSelector, log plog.LogRecord) error +} + +type MetricOperator interface { + migrate.Migrator + Do(ss migrate.StateSelector, metric pmetric.Metric) error +} + +type SpanOperator interface { + migrate.Migrator + Do(ss migrate.StateSelector, signal ptrace.Span) error +} diff --git a/processor/schemaprocessor/internal/operator/signal_name.go b/processor/schemaprocessor/internal/operator/signal_name.go new file mode 100644 index 000000000000..763efb98aecf --- /dev/null +++ b/processor/schemaprocessor/internal/operator/signal_name.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package operator // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +type SpanEventSignalNameChange struct { + SignalNameChange migrate.SignalNameChange +} + +func (c SpanEventSignalNameChange) IsMigrator() {} + +func (c SpanEventSignalNameChange) Do(ss migrate.StateSelector, span ptrace.Span) error { + for e := 0; e < span.Events().Len(); e++ { + event := span.Events().At(e) + c.SignalNameChange.Do(ss, event) + } + return nil +} + +// a similar type as SpanEventSignalNameChange, but for metrics +type MetricSignalNameChange struct { + SignalNameChange migrate.SignalNameChange +} + +func (c MetricSignalNameChange) IsMigrator() {} + +func (c MetricSignalNameChange) Do(ss migrate.StateSelector, metric pmetric.Metric) error { + c.SignalNameChange.Do(ss, metric) + return nil +}