Skip to content

Commit

Permalink
operator and migrator support
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitpatel96 committed Sep 30, 2024
1 parent c8f2553 commit 20ec9bc
Show file tree
Hide file tree
Showing 17 changed files with 614 additions and 339 deletions.
28 changes: 28 additions & 0 deletions processor/schemaprocessor/DESIGN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Design

The Schema Processor is split into several different components.

Here's a general structure diagram:

```mermaid
graph LR;
A[Previous Collector Component] --> B[Transformer]
B -- Schema URL --> C[Translation Manager]
C -- Translation --> B
B --> H[Translator]
H --> E[Revision]
E --> I[ChangeList]
subgraph Interpreter
direction RL
I --> F[Operator]
F --> G[Migrator]
end
```
The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory.
Data flows into the Tranformer, which uses the Schema URL to fetch the translation from the Translation Manager.
The [Translation Manager](internal/translation/manager.go) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct.

The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator figures out what the version of the incoming data is and what Revisions to apply to the incoming data to get it to the target schema version. The Translator is also responsible for applying the Revisions to the incoming data - it iterates through these Revisions and applies them to the incoming data.

Each Revision represents all the changes within a specific version. It consists of several [ChangeLists](internal/changelist/changelist.go) - one for each type of change block (at the time of writing - `all`, `resources`, `spans`, `spanEvents`, `metrics`, `logs`). Each ChangeList is similar to an interpreter to a programming language - in this case the programming language is the schema file! Each "change" in the transformation maps to either to an operator or migrator. [OPEN DESIGN POINT - for the all, logs, and resources type changes - the ChangeList delegates just to an Migrator. This is a little strange since otherwise it dispatches to an Operator. Should we just implement Operators for these few cases as well?]. They iterate through whatever changes they are constructed with, and call an operator[operator](internal/operator) or [migrator](internal/migrate) for each type of change. The operator accepts a typed value - a log, a metric, etc. It then, under the hood, calls one of a few Migrators. The Migrators do the fundamental work of changing attributes, changing names, etc. The Migrators generally operate on lower levels than the Operators - they operate on `Attributes`, or an `alias.NamedSignal` (a signal that implements `Name()` and `SetName()`).
4 changes: 3 additions & 1 deletion processor/schemaprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
| Stability | [development]: traces, metrics, logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->
Expand Down Expand Up @@ -59,3 +59,5 @@ processors:
```
For more complete examples, please refer to [config.yml](./testdata/config.yml).
There's a rough design/overview of the processor in the [DESIGN.md](./DESIGN.md) file.
6 changes: 5 additions & 1 deletion processor/schemaprocessor/internal/alias/alias.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package Alias is a subset of the interfaces defined by pdata and family
// Package alias is a subset of the interfaces defined by pdata and family
// package to allow for higher code reuse without using generics.
package alias // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias"

Expand Down Expand Up @@ -30,6 +30,10 @@ type NamedSignal interface {
SetName(name string)
}

type Attributed interface {
Attributes() pcommon.Map
}

var (
_ Resource = (*plog.ResourceLogs)(nil)
_ Resource = (*pmetric.ResourceMetrics)(nil)
Expand Down
55 changes: 12 additions & 43 deletions processor/schemaprocessor/internal/migrate/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,21 @@ import (
"go.uber.org/multierr"
)

// AttributeChangeSet represents an unscoped entry that can be applied.
//
// AttributeChangeSet represents a rename_attributes type operation.
// The listed changes are duplicated twice
// to allow for simplified means of transition to or from a revision.
type AttributeChangeSet struct {
updates ast.AttributeMap
// The keys are the old attribute name used in the previous version, the values are the
// new attribute name starting from this version (comment from ast.AttributeMap)
updates ast.AttributeMap
// the inverse of the updates map
rollback ast.AttributeMap
}

// AttributeChangeSetSlice allows for `AttributeChangeSet`
// to be chained together as they are defined within the schema
// and be applied sequentially to ensure deterministic behavior.
type AttributeChangeSetSlice []*AttributeChangeSet

// NewAttributeChangeSet allows for typed strings to be used as part
// of the invocation that will be converted into the default string type.
func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
attr := &AttributeChangeSet{
func NewAttributeChangeSet(mappings ast.AttributeMap) AttributeChangeSet {
attr := AttributeChangeSet{
updates: make(map[string]string, len(mappings)),
rollback: make(map[string]string, len(mappings)),
}
Expand All @@ -39,15 +36,17 @@ func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
return attr
}

func (a AttributeChangeSet) IsMigrator() {}

func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error {
return a.do(StateSelectorApply, attrs)
return a.Do(StateSelectorApply, attrs)
}

func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error {
return a.do(StateSelectorRollback, attrs)
return a.Do(StateSelectorRollback, attrs)
}

func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) {
func (a *AttributeChangeSet) Do(ss StateSelector, attrs pcommon.Map) (errs error) {
var (
updated = make(map[string]struct{})
results = pcommon.NewMap()
Expand Down Expand Up @@ -81,33 +80,3 @@ func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error
results.CopyTo(attrs)
return errs
}

// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets`
// and allows them to be executed in the provided order.
func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice {
values := new(AttributeChangeSetSlice)
for _, c := range changes {
(*values) = append((*values), c)
}
return values
}

func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error {
return slice.do(StateSelectorApply, attrs)
}

func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error {
return slice.do(StateSelectorRollback, attrs)
}

func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) {
for i := 0; i < len(*slice); i++ {
switch ss {
case StateSelectorApply:
errs = multierr.Append(errs, (*slice)[i].Apply(attrs))
case StateSelectorRollback:
errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs))
}
}
return errs
}
101 changes: 3 additions & 98 deletions processor/schemaprocessor/internal/migrate/attributes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewAttributeChangeSet(t *testing.T) {
"hello": "world",
})

expect := &AttributeChangeSet{
expect := AttributeChangeSet{
updates: map[string]string{
"hello": "world",
},
Expand All @@ -45,7 +45,7 @@ func TestAttributeChangeSetApply(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestAttributeChangeSetRollback(t *testing.T) {

for _, tc := range []struct {
name string
acs *AttributeChangeSet
acs AttributeChangeSet
attrs pcommon.Map
expect pcommon.Map
errVal string
Expand Down Expand Up @@ -189,98 +189,3 @@ func TestAttributeChangeSetRollback(t *testing.T) {
})
}
}

func TestNewAttributeChangeSetSliceApply(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Apply(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}

func TestNewAttributeChangeSetSliceApplyRollback(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
changes *AttributeChangeSetSlice
attr pcommon.Map
expect pcommon.Map
}{
{
name: "no changes listed",
changes: NewAttributeChangeSetSlice(),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service.version", "v0.0.1")
}),
},
{
name: "changes defined",
changes: NewAttributeChangeSetSlice(
NewAttributeChangeSet(map[string]string{
"service_version": "service.version",
}),
NewAttributeChangeSet(map[string]string{
"service.version": "application.service.version",
}),
),
attr: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("application.service.version", "v0.0.1")

}),
expect: testHelperBuildMap(func(m pcommon.Map) {
m.PutStr("service_version", "v0.0.1")
}),
},
} {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

assert.NoError(t, tc.changes.Rollback(tc.attr))
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
})
}
}
64 changes: 22 additions & 42 deletions processor/schemaprocessor/internal/migrate/conditional.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package migrate // import "github.com/open-telemetry/opentelemetry-collector-con
import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/otel/schema/v1.0/ast"
"go.uber.org/multierr"
)

// ValueMatch defines the expected match type
Expand All @@ -15,74 +14,55 @@ type ValueMatch interface {
~string
}

// ConditionalAttributeSet represents a set of attributes that will
type ConditionalAttributeSet struct {
on *map[string]struct{}
attrs *AttributeChangeSet
on map[string]struct{}
attrs AttributeChangeSet
}

type ConditionalAttributeSetSlice []*ConditionalAttributeSet

func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) *ConditionalAttributeSet {
func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) ConditionalAttributeSet {
on := make(map[string]struct{})
for _, m := range matches {
on[string(m)] = struct{}{}
}
return &ConditionalAttributeSet{
on: &on,
return ConditionalAttributeSet{
on: on,
attrs: NewAttributeChangeSet(mappings),
}
}

func (ca *ConditionalAttributeSet) Apply(attrs pcommon.Map, values ...string) (errs error) {
func (ca ConditionalAttributeSet) IsMigrator() {}

func (ca *ConditionalAttributeSet) Do(ss StateSelector, attrs pcommon.Map, values ...string) (errs error) {
if ca.check(values...) {
errs = ca.attrs.Apply(attrs)
switch ss {
case StateSelectorApply:
errs = ca.attrs.Apply(attrs)
case StateSelectorRollback:
errs = ca.attrs.Rollback(attrs)
}
}
return errs
}
func (ca *ConditionalAttributeSet) Apply(attrs pcommon.Map, values ...string) (errs error) {
return ca.Do(StateSelectorApply, attrs, values...)
}

func (ca *ConditionalAttributeSet) Rollback(attrs pcommon.Map, values ...string) (errs error) {
if ca.check(values...) {
errs = ca.attrs.Rollback(attrs)
}
return errs
return ca.Do(StateSelectorRollback, attrs, values...)
}

// todo make it harder to misuse this! diff between no values and 0 values
func (ca *ConditionalAttributeSet) check(values ...string) bool {
if len(*ca.on) == 0 {
if len(ca.on) == 0 {
return true
}
for _, v := range values {
if _, ok := (*ca.on)[v]; !ok {
if _, ok := (ca.on)[v]; !ok {
return false
}
}
return true
}

func NewConditionalAttributeSetSlice(conditions ...*ConditionalAttributeSet) *ConditionalAttributeSetSlice {
values := new(ConditionalAttributeSetSlice)
for _, c := range conditions {
(*values) = append((*values), c)
}
return values
}

func (slice *ConditionalAttributeSetSlice) Apply(attrs pcommon.Map, values ...string) error {
return slice.do(StateSelectorApply, attrs, values)
}

func (slice *ConditionalAttributeSetSlice) Rollback(attrs pcommon.Map, values ...string) error {
return slice.do(StateSelectorRollback, attrs, values)
}

func (slice *ConditionalAttributeSetSlice) do(ss StateSelector, attrs pcommon.Map, values []string) (errs error) {
for i := 0; i < len((*slice)); i++ {
switch ss {
case StateSelectorApply:
errs = multierr.Append(errs, (*slice)[i].Apply(attrs, values...))
case StateSelectorRollback:
errs = multierr.Append(errs, (*slice)[len((*slice))-i-1].Rollback(attrs, values...))
}
}
return errs
}
Loading

0 comments on commit 20ec9bc

Please sign in to comment.