diff --git a/processor/schemaprocessor/internal/changelist/changelist.go b/processor/schemaprocessor/internal/changelist/changelist.go index 6c2008f2eaf7..696142ad3a67 100644 --- a/processor/schemaprocessor/internal/changelist/changelist.go +++ b/processor/schemaprocessor/internal/changelist/changelist.go @@ -6,10 +6,11 @@ package changelist // import "github.com/open-telemetry/opentelemetry-collector- import ( "fmt" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator" ) @@ -47,22 +48,25 @@ func (c ChangeList) Do(ss migrate.StateSelector, signal any) error { } else { return fmt.Errorf("MetricOperator %T can't act on %T", thisMigrator, signal) } - case operator.AllOperator: - if err := thisMigrator.Do(ss, signal); err != nil { - return err + case operator.LogOperator: + if log, ok := signal.(plog.LogRecord); ok { + if err := thisMigrator.Do(ss, log); err != nil { + return err + } + } else { + return fmt.Errorf("LogOperator %T can't act on %T", thisMigrator, signal) } - - // no log operator because the only log operation is an attribute changeset - // this block is for the `resource` block, and the `log` block - // todo(ankit) switch these to specific typed ones? - case migrate.AttributeChangeSet: - switch attributeSignal := signal.(type) { - case alias.Attributed: - if err := thisMigrator.Do(ss, attributeSignal.Attributes()); err != nil { + case operator.ResourceOperator: + if resource, ok := signal.(pcommon.Resource); ok { + if err := thisMigrator.Do(ss, resource); err != nil { return err } - default: - return fmt.Errorf("unsupported signal type %T for AttributeChangeSet", attributeSignal) + } else { + return fmt.Errorf("ResourceOperator %T can't act on %T", thisMigrator, signal) + } + case operator.AllOperator: + if err := thisMigrator.Do(ss, signal); err != nil { + return err } default: return fmt.Errorf("unsupported migrator type %T", thisMigrator) diff --git a/processor/schemaprocessor/internal/translation/revision_v1.go b/processor/schemaprocessor/internal/translation/revision_v1.go index 0e44a2e7be85..ec54235b24b7 100644 --- a/processor/schemaprocessor/internal/translation/revision_v1.go +++ b/processor/schemaprocessor/internal/translation/revision_v1.go @@ -72,7 +72,8 @@ func newResourceChangeList(resource ast.Attributes) *changelist.ChangeList { for _, at := range resource.Changes { if renamed := at.RenameAttributes; renamed != nil { attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap) - values = append(values, attributeChangeSet) + resourceOperator := operator.ResourceAttributeOperator{AttributeChange: attributeChangeSet} + values = append(values, resourceOperator) } } return &changelist.ChangeList{Migrators: values} @@ -141,7 +142,8 @@ func newLogsChangelist(logs ast.Logs) *changelist.ChangeList { for _, at := range logs.Changes { if renamed := at.RenameAttributes; renamed != nil { attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap) - values = append(values, attributeChangeSet) + logOperator := operator.LogAttributeOperator{AttributeChange: attributeChangeSet} + values = append(values, logOperator) } } return &changelist.ChangeList{Migrators: values} diff --git a/processor/schemaprocessor/internal/translation/revision_v1_test.go b/processor/schemaprocessor/internal/translation/revision_v1_test.go index b7ff5b150cee..b1b4a9894fb9 100644 --- a/processor/schemaprocessor/internal/translation/revision_v1_test.go +++ b/processor/schemaprocessor/internal/translation/revision_v1_test.go @@ -171,9 +171,11 @@ func TestNewRevisionV1(t *testing.T) { "state": "status", }), }, - ResourceMigrator: migrate.NewAttributeChangeSet(map[string]string{ - "state": "status", - }), + ResourceMigrator: operator.ResourceAttributeOperator{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "state": "status", + }), + }, }, operator.AllOperator{ // initialize one of each operator with the attribute set @@ -197,16 +199,18 @@ func TestNewRevisionV1(t *testing.T) { "status": "state", }), }, - ResourceMigrator: migrate.NewAttributeChangeSet(map[string]string{ - "status": "state", - }), + ResourceMigrator: operator.ResourceAttributeOperator{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "status": "state", + }), + }, }, }, }, resources: &changelist.ChangeList{Migrators: []migrate.Migrator{ - migrate.NewAttributeChangeSet(map[string]string{ - "service_name": "service.name", - }), + operator.ResourceAttributeOperator{AttributeChange: migrate.NewAttributeChangeSet( + map[string]string{"service_name": "service.name"}, + )}, }}, spans: &changelist.ChangeList{Migrators: []migrate.Migrator{ operator.SpanConditionalAttributeOperator{Migrator: migrate.NewConditionalAttributeSet( @@ -243,9 +247,10 @@ func TestNewRevisionV1(t *testing.T) { )}, }}, logs: &changelist.ChangeList{Migrators: []migrate.Migrator{ - migrate.NewAttributeChangeSet(map[string]string{ + operator.LogAttributeOperator{AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ "ERROR": "error", }), + }, }}, }, },