[chore] Schema Processor Revamp Implementation Parent PR #35248

Open
wants to merge 5 commits into
base: main
4 changes: 2 additions & 2 deletions processor/schemaprocessor/DESIGN.md
@@ -19,8 +19,8 @@ graph LR;
end

```
The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory.
Data flows into the Transformer, which uses the Schema URL to fetch the translation from the Translation Manager.
The [schemaprocessor](processor.go) is registered as a Processor in the Collector by the factory.
Data flows into the Processor, which uses the Schema URL to fetch the translation from the Translation Manager.
The Translation Manager (at internal/translation/manager.go in a future PR) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct.

The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator determines the version of the incoming data and which Revisions must be applied to bring that data to the target schema version. It then iterates through those Revisions and applies each one to the incoming data.
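The revision selection described above can be sketched roughly as follows. This is a minimal illustration, not the processor's implementation: the `version` and `revision` types, the `revisionsToApply` and `upgrade` helpers, and the attribute names in the example are all hypothetical, and only the upgrade direction is shown.

```go
package main

import "fmt"

// version is a hypothetical stand-in for the translation package's Version type.
type version struct{ major, minor, patch int }

// less reports whether v sorts strictly before o.
func (v version) less(o version) bool {
	if v.major != o.major {
		return v.major < o.major
	}
	if v.minor != o.minor {
		return v.minor < o.minor
	}
	return v.patch < o.patch
}

// revision pairs a schema version with the attribute renames it introduces.
// Real revisions carry more kinds of changes; renames keep the sketch small.
type revision struct {
	ver    version
	rename map[string]string // old attribute name -> new attribute name
}

// revisionsToApply returns the revisions strictly newer than the incoming
// version and no newer than the target. revs must be sorted ascending.
func revisionsToApply(revs []revision, incoming, target version) []revision {
	var out []revision
	for _, r := range revs {
		if incoming.less(r.ver) && !target.less(r.ver) {
			out = append(out, r)
		}
	}
	return out
}

// upgrade applies each selected revision's renames to attrs, in order.
func upgrade(attrs map[string]string, revs []revision) {
	for _, r := range revs {
		for from, to := range r.rename {
			if v, ok := attrs[from]; ok {
				attrs[to] = v
				delete(attrs, from)
			}
		}
	}
}

func main() {
	// Made-up revisions and attribute names, for illustration only.
	revs := []revision{
		{version{1, 0, 0}, nil},
		{version{1, 1, 0}, map[string]string{"old.name": "mid.name"}},
		{version{1, 2, 0}, map[string]string{"mid.name": "new.name"}},
	}
	attrs := map[string]string{"old.name": "value"}
	upgrade(attrs, revisionsToApply(revs, version{1, 0, 0}, version{1, 2, 0}))
	fmt.Println(attrs) // prints "map[new.name:value]"
}
```

Data already at the target version selects no revisions, so `upgrade` leaves it unchanged.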
2 changes: 1 addition & 1 deletion processor/schemaprocessor/README.md
@@ -6,7 +6,7 @@
| Stability | [development]: traces, metrics, logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96), [@dineshg13](https://www.github.com/dineshg13) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->
8 changes: 4 additions & 4 deletions processor/schemaprocessor/factory.go
@@ -22,7 +22,7 @@ var processorCapabilities = consumer.Capabilities{MutatesData: true}
// factory will store any of the precompiled schemas in future
type factory struct{}

// newDefaultConfiguration returns the configuration for schema transformer processor
// newDefaultConfiguration returns the configuration for schema processor
// with the default values being used throughout it
func newDefaultConfiguration() component.Config {
return &Config{
@@ -47,7 +47,7 @@ func (f factory) createLogsProcessor(
cfg component.Config,
next consumer.Logs,
) (processor.Logs, error) {
transformer, err := newTransformer(ctx, cfg, set)
transformer, err := newSchemaProcessor(ctx, cfg, set)
if err != nil {
return nil, err
}
@@ -68,7 +68,7 @@ func (f factory) createMetricsProcessor(
cfg component.Config,
next consumer.Metrics,
) (processor.Metrics, error) {
transformer, err := newTransformer(ctx, cfg, set)
transformer, err := newSchemaProcessor(ctx, cfg, set)
if err != nil {
return nil, err
}
@@ -89,7 +89,7 @@ func (f factory) createTracesProcessor(
cfg component.Config,
next consumer.Traces,
) (processor.Traces, error) {
transformer, err := newTransformer(ctx, cfg, set)
transformer, err := newSchemaProcessor(ctx, cfg, set)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions processor/schemaprocessor/go.mod
@@ -13,8 +13,10 @@ require (
go.opentelemetry.io/collector/consumer v1.25.0
go.opentelemetry.io/collector/consumer/consumertest v0.119.0
go.opentelemetry.io/collector/pdata v1.25.0
go.opentelemetry.io/collector/pipeline v0.119.0
go.opentelemetry.io/collector/processor v0.119.0
go.opentelemetry.io/collector/processor/processortest v0.119.0
go.opentelemetry.io/otel/metric v1.34.0
go.opentelemetry.io/otel/schema v0.0.12
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
@@ -59,11 +61,9 @@ require (
go.opentelemetry.io/collector/extension/auth v0.119.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.119.0 // indirect
go.opentelemetry.io/collector/pipeline v0.119.0 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.119.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
131 changes: 131 additions & 0 deletions processor/schemaprocessor/internal/translation/manager.go
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"

import (
"context"
"errors"
"fmt"
"sync"

"go.uber.org/zap"
)

var errNilValueProvided = errors.New("nil value provided")

// Manager is responsible for ensuring that schemas are kept up to date
// with the most recent version that is requested.
type Manager interface {
// RequestTranslation returns the Translation for schemaURL if its schema
// family is a known target; otherwise it returns a no-op Translation.
// When the family matches but the requested version is not yet cached,
// the call may block while the translation is fetched from a provider.
// Once cached, the translation allows concurrent reads.
RequestTranslation(ctx context.Context, schemaURL string) Translation

// SetProviders will update the list of providers used by the manager
// to look up schemaURLs
SetProviders(providers ...Provider) error
}

type manager struct {
log *zap.Logger

rw sync.RWMutex
providers []Provider
match map[string]*Version
translations map[string]*translator
}

var _ Manager = (*manager)(nil)

// NewManager creates a Manager for the given target schema URLs.
// Each target is split into its schema family and version, and that
// version is recorded as the translation target for the family.
// It returns an error if log is nil or a target URL cannot be parsed.
func NewManager(targets []string, log *zap.Logger) (Manager, error) {
if log == nil {
return nil, fmt.Errorf("logger: %w", errNilValueProvided)
}

match := make(map[string]*Version, len(targets))
for _, target := range targets {
family, version, err := GetFamilyAndVersion(target)
if err != nil {
return nil, err
}
match[family] = version
}

return &manager{
log: log,
match: match,
translations: make(map[string]*translator),
}, nil
}

func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation {
family, version, err := GetFamilyAndVersion(schemaURL)
if err != nil {
m.log.Debug("No valid schema url was provided, using no-op schema",
zap.String("schema-url", schemaURL),
)
return nopTranslation{}
}

target, match := m.match[family]
if !match {
m.log.Debug("Not a known target, providing Nop Translation",
zap.String("schema-url", schemaURL),
)
return nopTranslation{}
}

m.rw.RLock()
t, exists := m.translations[family]
m.rw.RUnlock()

if exists && t.SupportedVersion(version) {
return t
}

for _, p := range m.providers {
content, err := p.Lookup(ctx, schemaURL)
if err != nil {
m.log.Error("Failed to lookup schemaURL",
zap.Error(err),
zap.String("schemaURL", schemaURL),
)
// todo(ankit) figure out what to do when the providers don't respond with something usable
// Skip this provider: content is not usable when Lookup fails.
continue
}
t, err := newTranslatorFromReader(
m.log.Named("translator").With(
zap.String("family", family),
zap.Stringer("target", target),
),
joinSchemaFamilyAndVersion(family, target),
content,
)
if err != nil {
m.log.Error("Failed to create translator", zap.Error(err))
continue
}
m.rw.Lock()
m.translations[family] = t
m.rw.Unlock()
return t
}

return nopTranslation{}
}

func (m *manager) SetProviders(providers ...Provider) error {
if len(providers) == 0 {
return fmt.Errorf("zero providers set: %w", errNilValueProvided)
}
m.rw.Lock()
m.providers = append(m.providers[:0], providers...)
m.rw.Unlock()
return nil
}
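
The locking pattern in `RequestTranslation` (read under `RLock`, fetch outside the lock on a miss, store under `Lock`) can be sketched in isolation. This is a simplified illustration under stated assumptions: the `cache` type, `getOrLoad`, and the `load` callback are hypothetical stand-ins for the manager, its translation map, and a provider lookup.

```go
package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical stand-in for the manager's translations map.
type cache struct {
	rw      sync.RWMutex
	entries map[string]string
}

func newCache() *cache { return &cache{entries: make(map[string]string)} }

// getOrLoad returns the cached value for key, calling load on a miss.
// The load runs outside the lock, so concurrent misses for the same key
// may each call load; the last writer wins.
func (c *cache) getOrLoad(key string, load func(string) (string, error)) (string, error) {
	c.rw.RLock()
	v, ok := c.entries[key]
	c.rw.RUnlock()
	if ok {
		return v, nil
	}

	v, err := load(key)
	if err != nil {
		return "", err
	}

	c.rw.Lock()
	c.entries[key] = v
	c.rw.Unlock()
	return v, nil
}

func main() {
	c := newCache()
	loads := 0
	load := func(key string) (string, error) {
		loads++
		return "translation for " + key, nil
	}
	for i := 0; i < 3; i++ {
		if _, err := c.getOrLoad("https://example.com/schemas", load); err != nil {
			fmt.Println("load failed:", err)
			return
		}
	}
	fmt.Println("loads:", loads) // prints "loads: 1"
}
```

Keeping the fetch outside the write lock means a slow provider does not block readers of already cached entries, at the cost of possible duplicate fetches on concurrent misses.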
68 changes: 68 additions & 0 deletions processor/schemaprocessor/internal/translation/manager_test.go
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation

import (
"context"
_ "embed"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

//go:embed testdata/schema.yaml
var exampleTranslation []byte

func TranslationHandler(t *testing.T) http.Handler {
assert.NotEmpty(t, exampleTranslation, "SchemaContent MUST not be empty")
return http.HandlerFunc(func(wr http.ResponseWriter, _ *http.Request) {
_, err := wr.Write(exampleTranslation)
assert.NoError(t, err, "Must not have issues writing schema content")
})
}

func TestRequestTranslation(t *testing.T) {
t.Parallel()

s := httptest.NewServer(TranslationHandler(t))
t.Cleanup(s.Close)

schemaURL := fmt.Sprintf("%s/1.1.0", s.URL)

m, err := NewManager(
[]string{schemaURL},
zaptest.NewLogger(t),
)
require.NoError(t, err, "Must not error when created manager")
require.NoError(t, m.SetProviders(NewHTTPProvider(s.Client())), "Must have no issues trying to set providers")

nop, ok := m.RequestTranslation(context.Background(), "/not/a/valid/schema/URL").(nopTranslation)
require.True(t, ok, "Must return a NoopTranslation if no valid schema URL is provided")
require.NotNil(t, nop, "Must have a valid translation")

tn, ok := m.RequestTranslation(context.Background(), schemaURL).(*translator)
require.True(t, ok, "Can cast to the concrete type")
require.NotNil(t, tn, "Must have a valid translation")

assert.True(t, tn.SupportedVersion(&Version{1, 0, 0}), "Must have the version listed as supported")

count := 0
prevRev := &Version{1, 0, 0}
it, status := tn.iterator(prevRev)
assert.Equal(t, Update, status, "Must return a status of update")
for currRev, more := it(); more; currRev, more = it() {
assert.True(t, prevRev.LessThan(currRev.Version()))
prevRev = currRev.Version()
count++
}

tn, ok = m.RequestTranslation(context.Background(), schemaURL).(*translator)
require.True(t, ok, "Can cast to the concrete type")
require.NotNil(t, tn, "Must have a valid translation")
}
73 changes: 73 additions & 0 deletions processor/schemaprocessor/internal/translation/provider.go
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"

import (
"bytes"
"context"
"embed"
"fmt"
"io"
"net/http"
"net/url"
)

// Provider allows for collector extensions to be used to look up schemaURLs
type Provider interface {
// Lookup will check the underlying provider to see if content exists
// for the provided schemaURL; in the event that it doesn't, an error is returned.
Lookup(ctx context.Context, schemaURL string) (content io.Reader, err error)
}

type httpProvider struct {
client *http.Client
}

var _ Provider = (*httpProvider)(nil)

func NewHTTPProvider(client *http.Client) Provider {
return &httpProvider{client: client}
}

func (hp *httpProvider) Lookup(ctx context.Context, schemaURL string) (io.Reader, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody)
if err != nil {
return nil, err
}
resp, err := hp.client.Do(req)
if err != nil {
return nil, err
}
content := bytes.NewBuffer(nil)
if _, err := content.ReadFrom(resp.Body); err != nil {
_ = resp.Body.Close() // release the connection even when the read fails
return nil, err
}
if err := resp.Body.Close(); err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("invalid status code returned: %d", resp.StatusCode)
}
return content, nil
}

type testProvider struct {
fs *embed.FS
}

func NewTestProvider(fs *embed.FS) Provider {
return &testProvider{fs: fs}
}

func (tp testProvider) Lookup(_ context.Context, schemaURL string) (io.Reader, error) {
parsedPath, err := url.Parse(schemaURL)
if err != nil {
return nil, err
}
f, err := tp.fs.Open(parsedPath.Path[1:])
if err != nil {
return nil, err
}
return f, nil
}