Skip to content

Commit

Permalink
operator and migrator support
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitpatel96 committed Sep 17, 2024
1 parent 2571dcd commit 4027afa
Show file tree
Hide file tree
Showing 15 changed files with 583 additions and 338 deletions.
6 changes: 5 additions & 1 deletion processor/schemaprocessor/internal/alias/alias.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package Alias is a subset of the interfaces defined by pdata and family
// Package alias is a subset of the interfaces defined by pdata and family
// package to allow for higher code reuse without using generics.
package alias // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias"

Expand Down Expand Up @@ -30,6 +30,10 @@ type NamedSignal interface {
SetName(name string)
}

type Attributed interface {
Attributes() pcommon.Map
}

var (
_ Resource = (*plog.ResourceLogs)(nil)
_ Resource = (*pmetric.ResourceMetrics)(nil)
Expand Down
55 changes: 12 additions & 43 deletions processor/schemaprocessor/internal/migrate/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,21 @@ import (
"go.uber.org/multierr"
)

// AttributeChangeSet represents an unscoped entry that can be applied.
//
// AttributeChangeSet represents a rename_attributes type operation.
// The listed changes are duplicated twice
// to allow for simplified means of transition to or from a revision.
type AttributeChangeSet struct {
updates ast.AttributeMap
// The keys are the old attribute name used in the previous version, the values are the
// new attribute name starting from this version (comment from ast.AttributeMap)
updates ast.AttributeMap
// the inverse of the updates map
rollback ast.AttributeMap
}

// AttributeChangeSetSlice allows for `AttributeChangeSet`
// to be chained together as they are defined within the schema
// and be applied sequentially to ensure deterministic behavior.
type AttributeChangeSetSlice []*AttributeChangeSet

// NewAttributeChangeSet allows for typed strings to be used as part
// of the invocation that will be converted into the default string type.
func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
attr := &AttributeChangeSet{
func NewAttributeChangeSet(mappings ast.AttributeMap) AttributeChangeSet {
attr := AttributeChangeSet{
updates: make(map[string]string, len(mappings)),
rollback: make(map[string]string, len(mappings)),
}
Expand All @@ -39,15 +36,17 @@ func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
return attr
}

func (a AttributeChangeSet) IsMigrator() {}

func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error {
return a.do(StateSelectorApply, attrs)
return a.Do(StateSelectorApply, attrs)
}

func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error {
return a.do(StateSelectorRollback, attrs)
return a.Do(StateSelectorRollback, attrs)
}

func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) {
func (a *AttributeChangeSet) Do(ss StateSelector, attrs pcommon.Map) (errs error) {
var (
updated = make(map[string]struct{})
results = pcommon.NewMap()
Expand Down Expand Up @@ -81,33 +80,3 @@ func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error
results.CopyTo(attrs)
return errs
}

// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets`
// and allows them to be executed in the provided order.
func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice {
values := new(AttributeChangeSetSlice)
for _, c := range changes {
(*values) = append((*values), c)
}
return values
}

func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error {
return slice.do(StateSelectorApply, attrs)
}

func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error {
return slice.do(StateSelectorRollback, attrs)
}

func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) {
for i := 0; i < len(*slice); i++ {
switch ss {
case StateSelectorApply:
errs = multierr.Append(errs, (*slice)[i].Apply(attrs))
case StateSelectorRollback:
errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs))
}
}
return errs
}
101 changes: 3 additions & 98 deletions processor/schemaprocessor/internal/migrate/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewAttributeChangeSet(t *testing.T) {
"hello": "world",
})

expect := &AttributeChangeSet{
expect := AttributeChangeSet{
updates: map[string]string{
"hello": "world",
},
Expand All @@ -45,7 +45,7 @@ func TestAttributeChangeSetApply(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestAttributeChangeSetRollback(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
Expand Down Expand Up @@ -189,98 +189,3 @@ func TestAttributeChangeSetRollback(t *testing.T) {
})
}
}

func TestNewAttributeChangeSetSliceApply(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Apply(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}

func TestNewAttributeChangeSetSliceApplyRollback(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")

}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Rollback(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}
64 changes: 22 additions & 42 deletions processor/schemaprocessor/internal/migrate/conditional.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package migrate // import "github.com/open-telemetry/opentelemetry-collector-con
import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/otel/schema/v1.0/ast"
"go.uber.org/multierr"
)

// ValueMatch defines the expected match type
Expand All @@ -15,74 +14,55 @@ type ValueMatch interface {
~string
}

// ConditionalAttributeSet represents a set of attributes that will
type ConditionalAttributeSet struct {
on *map[string]struct{}
attrs *AttributeChangeSet
on map[string]struct{}
attrs AttributeChangeSet
}

type ConditionalAttributeSetSlice []*ConditionalAttributeSet

func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) *ConditionalAttributeSet {
func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) ConditionalAttributeSet {
on := make(map[string]struct{})
for _, m := range matches {
on[string(m)] = struct{}{}
}
return &ConditionalAttributeSet{
on: &on,
return ConditionalAttributeSet{
on: on,
attrs: NewAttributeChangeSet(mappings),
}
}

func (ca *ConditionalAttributeSet) Apply(attrs pcommon.Map, values ...string) (errs error) {
func (ca ConditionalAttributeSet) IsMigrator() {}

func (ca *ConditionalAttributeSet) Do(ss StateSelector, attrs pcommon.Map, values ...string) (errs error) {
if ca.check(values...) {
errs = ca.attrs.Apply(attrs)
switch ss {
case StateSelectorApply:
errs = ca.attrs.Apply(attrs)
case StateSelectorRollback:
errs = ca.attrs.Rollback(attrs)
}
}
return errs
}
func (ca *ConditionalAttributeSet) Apply(attrs pcommon.Map, values ...string) (errs error) {
return ca.Do(StateSelectorApply, attrs, values...)
}

func (ca *ConditionalAttributeSet) Rollback(attrs pcommon.Map, values ...string) (errs error) {
if ca.check(values...) {
errs = ca.attrs.Rollback(attrs)
}
return errs
return ca.Do(StateSelectorRollback, attrs, values...)
}

// todo make it harder to misuse this! diff between no values and 0 values
func (ca *ConditionalAttributeSet) check(values ...string) bool {
if len(*ca.on) == 0 {
if len(ca.on) == 0 {
return true
}
for _, v := range values {
if _, ok := (*ca.on)[v]; !ok {
if _, ok := (ca.on)[v]; !ok {
return false
}
}
return true
}

func NewConditionalAttributeSetSlice(conditions ...*ConditionalAttributeSet) *ConditionalAttributeSetSlice {
values := new(ConditionalAttributeSetSlice)
for _, c := range conditions {
(*values) = append((*values), c)
}
return values
}

func (slice *ConditionalAttributeSetSlice) Apply(attrs pcommon.Map, values ...string) error {
return slice.do(StateSelectorApply, attrs, values)
}

func (slice *ConditionalAttributeSetSlice) Rollback(attrs pcommon.Map, values ...string) error {
return slice.do(StateSelectorRollback, attrs, values)
}

func (slice *ConditionalAttributeSetSlice) do(ss StateSelector, attrs pcommon.Map, values []string) (errs error) {
for i := 0; i < len((*slice)); i++ {
switch ss {
case StateSelectorApply:
errs = multierr.Append(errs, (*slice)[i].Apply(attrs, values...))
case StateSelectorRollback:
errs = multierr.Append(errs, (*slice)[len((*slice))-i-1].Rollback(attrs, values...))
}
}
return errs
}
Loading

0 comments on commit 4027afa

Please sign in to comment.