Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Schema Processor Revamp Implementation Parent PR #35248

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion processor/schemaprocessor/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ graph LR;
end
```
The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory.
The [schemaprocessor](schemaprocessor.go) is registered as a Processor in the Collector by the factory.
Data flows into the Transformer, which uses the Schema URL to fetch the translation from the Translation Manager.
The Translation Manager (at internal/translation/manager.go in a future PR) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct.

Expand Down
8 changes: 4 additions & 4 deletions processor/schemaprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var processorCapabilities = consumer.Capabilities{MutatesData: true}
// factory will store any of the precompiled schemas in future
type factory struct{}

// newDefaultConfiguration returns the configuration for schema transformer processor
// newDefaultConfiguration returns the configuration for schema processor
// with the default values being used throughout it
func newDefaultConfiguration() component.Config {
return &Config{
Expand All @@ -47,7 +47,7 @@ func (f factory) createLogsProcessor(
cfg component.Config,
next consumer.Logs,
) (processor.Logs, error) {
transformer, err := newTransformer(ctx, cfg, set)
transformer, err := newSchemaProcessor(ctx, cfg, set)
if err != nil {
return nil, err
}
Expand All @@ -68,7 +68,7 @@ func (f factory) createMetricsProcessor(
cfg component.Config,
next consumer.Metrics,
) (processor.Metrics, error) {
transformer, err := newTransformer(ctx, cfg, set)
transformer, err := newSchemaProcessor(ctx, cfg, set)
if err != nil {
return nil, err
}
Expand All @@ -89,7 +89,7 @@ func (f factory) createTracesProcessor(
cfg component.Config,
next consumer.Traces,
) (processor.Traces, error) {
transformer, err := newTransformer(ctx, cfg, set)
transformer, err := newSchemaProcessor(ctx, cfg, set)
if err != nil {
return nil, err
}
Expand Down
14 changes: 12 additions & 2 deletions processor/schemaprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/google/go-cmp v0.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.116.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.116.0
go.opentelemetry.io/collector/component/componenttest v0.116.0
Expand All @@ -12,15 +13,19 @@ require (
go.opentelemetry.io/collector/consumer v1.22.0
go.opentelemetry.io/collector/consumer/consumertest v0.116.0
go.opentelemetry.io/collector/pdata v1.22.0
go.opentelemetry.io/collector/pipeline v0.116.0
go.opentelemetry.io/collector/processor v0.116.0
go.opentelemetry.io/collector/processor/processortest v0.116.0
go.opentelemetry.io/otel/metric v1.32.0
go.opentelemetry.io/otel/schema v0.0.11
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
)

require (
github.com/Masterminds/semver/v3 v3.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
Expand All @@ -39,6 +44,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.116.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.11.1 // indirect
Expand All @@ -55,11 +61,9 @@ require (
go.opentelemetry.io/collector/extension/auth v0.116.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.116.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.116.0 // indirect
go.opentelemetry.io/collector/pipeline v0.116.0 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.116.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
Expand All @@ -77,3 +81,9 @@ retract (
v0.76.1
v0.65.0
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
4 changes: 4 additions & 0 deletions processor/schemaprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

131 changes: 131 additions & 0 deletions processor/schemaprocessor/internal/translation/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"

import (
"context"
"errors"
"fmt"
"sync"

"go.uber.org/zap"
)

// errNilValueProvided is the sentinel wrapped into the errors returned when a
// required argument is nil or empty (a nil logger, an empty provider list).
var errNilValueProvided = errors.New("nil value provided")

// Manager is responsible for ensuring that schemas are kept up to date
// with the most recent version that is requested.
type Manager interface {
// RequestTranslation returns the defined Translation for schemaURL
// if it is a known target, or a no-op variation otherwise.
// When a translation matches but does not yet cover the requested
// version, the call may block while that version is being resolved.
// Otherwise, the translation will allow concurrent reads.
RequestTranslation(ctx context.Context, schemaURL string) Translation

// SetProviders updates the list of providers used by the manager
// to look up schemaURLs.
SetProviders(providers ...Provider) error
}

// manager is the Manager implementation provided by this package.
type manager struct {
log *zap.Logger

// rw is held while translations is read or written and while providers
// is replaced.
rw sync.RWMutex
// providers are consulted, in order, to fetch schema content.
providers []Provider
// match maps a schema family to the target version configured in NewManager.
match map[string]*Version
// translations holds the translator built for each schema family.
translations map[string]*translator
}

// Compile-time check that *manager satisfies Manager.
var _ Manager = (*manager)(nil)

// NewManager builds a Manager for the given target schema URLs.
//
// Every entry of targets is split into its schema family and version; the
// version is recorded as the translation target for that family. The first
// target that cannot be parsed aborts construction with the parse error.
// A nil log is rejected with an error wrapping errNilValueProvided.
//
// The returned manager has no providers; call SetProviders before requesting
// translations.
func NewManager(targets []string, log *zap.Logger) (Manager, error) {
	if log == nil {
		return nil, fmt.Errorf("logger: %w", errNilValueProvided)
	}

	mgr := &manager{
		log:          log,
		match:        make(map[string]*Version, len(targets)),
		translations: make(map[string]*translator),
	}

	for i := range targets {
		family, version, err := GetFamilyAndVersion(targets[i])
		if err != nil {
			return nil, err
		}
		mgr.match[family] = version
	}

	return mgr, nil
}

// RequestTranslation returns the Translation to apply to data that carries
// schemaURL.
//
// A no-op translation is returned when schemaURL cannot be parsed, when its
// schema family is not one of the manager's targets, or when no provider
// yields content from which a translator can be built. When a translator for
// the family is already stored and supports the requested version it is
// returned directly. Otherwise the providers are queried in order (which may
// block on the provider's I/O) and the first translator that can be built is
// stored for the family and returned.
func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation {
	family, version, err := GetFamilyAndVersion(schemaURL)
	if err != nil {
		m.log.Debug("No valid schema url was provided, using no-op schema",
			zap.String("schema-url", schemaURL),
		)
		return nopTranslation{}
	}

	target, match := m.match[family]
	if !match {
		m.log.Debug("Not a known target, providing Nop Translation",
			zap.String("schema-url", schemaURL),
		)
		return nopTranslation{}
	}

	m.rw.RLock()
	t, exists := m.translations[family]
	m.rw.RUnlock()

	if exists && t.SupportedVersion(version) {
		return t
	}

	// Snapshot the provider list under the read lock so that a concurrent
	// SetProviders cannot race with the iteration below. The copy is only
	// taken on this slow path to keep the stored-translator path allocation free.
	m.rw.RLock()
	providers := append([]Provider(nil), m.providers...)
	m.rw.RUnlock()

	for _, p := range providers {
		content, err := p.Lookup(ctx, schemaURL)
		if err != nil {
			m.log.Error("Failed to lookup schemaURL",
				zap.Error(err),
				zap.String("schemaURL", schemaURL),
			)
			// There is no content to parse when the lookup failed; handing a
			// nil reader to the translator would be a bug, so move on to the
			// next provider instead.
			continue
		}
		t, err := newTranslatorFromReader(
			m.log.Named("translator").With(
				zap.String("family", family),
				zap.Stringer("target", target),
			),
			joinSchemaFamilyAndVersion(family, target),
			content,
		)
		if err != nil {
			m.log.Error("Failed to create translator", zap.Error(err))
			continue
		}
		m.rw.Lock()
		m.translations[family] = t
		m.rw.Unlock()
		return t
	}

	return nopTranslation{}
}

// SetProviders replaces the manager's provider list with providers.
//
// At least one provider is required; an empty call returns an error wrapping
// errNilValueProvided and leaves the current list untouched.
func (m *manager) SetProviders(providers ...Provider) error {
	if len(providers) == 0 {
		return fmt.Errorf("zero providers set: %w", errNilValueProvided)
	}

	// Install a fresh copy rather than appending into the old backing array:
	// reusing it would overwrite elements that a RequestTranslation call may
	// still be iterating over, and the copy also detaches the stored list
	// from the caller's slice.
	fresh := append([]Provider(nil), providers...)

	m.rw.Lock()
	m.providers = fresh
	m.rw.Unlock()
	return nil
}
68 changes: 68 additions & 0 deletions processor/schemaprocessor/internal/translation/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation

import (
"context"
_ "embed"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

//go:embed testdata/schema.yaml
var exampleTranslation []byte

// TranslationHandler returns an http.Handler that answers every request with
// the embedded example schema. It asserts up front that the embedded content
// is not empty, and asserts on each request that writing it succeeded.
func TranslationHandler(t *testing.T) http.Handler {
	assert.NotEmpty(t, exampleTranslation, "SchemaContent MUST not be empty")

	serveSchema := func(wr http.ResponseWriter, _ *http.Request) {
		_, err := wr.Write(exampleTranslation)
		assert.NoError(t, err, "Must not have issues writing schema content")
	}
	return http.HandlerFunc(serveSchema)
}

// TestRequestTranslation runs a manager against a local HTTP server that
// serves the example schema and checks that an invalid schema URL yields the
// no-op translation, that the target schema URL yields a concrete translator
// whose revisions iterate in increasing version order, and that a repeated
// request for the same URL still yields a concrete translator.
func TestRequestTranslation(t *testing.T) {
	t.Parallel()

	srv := httptest.NewServer(TranslationHandler(t))
	t.Cleanup(srv.Close)

	schemaURL := fmt.Sprintf("%s/1.1.0", srv.URL)

	mgr, err := NewManager([]string{schemaURL}, zaptest.NewLogger(t))
	require.NoError(t, err, "Must not error when created manager")
	require.NoError(t, mgr.SetProviders(NewHTTPProvider(srv.Client())), "Must have no issues trying to set providers")

	// A schema URL that cannot be parsed falls back to the no-op translation.
	nop, ok := mgr.RequestTranslation(context.Background(), "/not/a/valid/schema/URL").(nopTranslation)
	require.True(t, ok, "Must return a NoopTranslation if no valid schema URL is provided")
	require.NotNil(t, nop, "Must have a valid translation")

	// The configured target is resolved through the HTTP provider.
	tn, ok := mgr.RequestTranslation(context.Background(), schemaURL).(*translator)
	require.True(t, ok, "Can cast to the concrete type")
	require.NotNil(t, tn, "Must have a valid translation")

	assert.True(t, tn.SupportedVersion(&Version{1, 0, 0}), "Must have the version listed as supported")

	// Walk the revisions starting at 1.0.0; each one must be newer than the last.
	// NOTE(review): count is tallied but never asserted on.
	count := 0
	prevRev := &Version{1, 0, 0}
	it, status := tn.iterator(context.Background(), prevRev)
	assert.Equal(t, Update, status, "Must return a status of update")
	for {
		currRev, more := it()
		if !more {
			break
		}
		assert.True(t, prevRev.LessThan(currRev.Version()))
		prevRev = currRev.Version()
		count++
	}

	// Ask again for the same schema URL; presumably this exercises the
	// already-stored translator path — confirm against SupportedVersion.
	tn, ok = mgr.RequestTranslation(context.Background(), schemaURL).(*translator)
	require.True(t, ok, "Can cast to the concrete type")
	require.NotNil(t, tn, "Must have a valid translation")
}
73 changes: 73 additions & 0 deletions processor/schemaprocessor/internal/translation/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"

import (
"bytes"
"context"
"embed"
"fmt"
"io"
"net/http"
"net/url"
)

// Provider allows for collector extensions to be used to look up schemaURLs
type Provider interface {
// Lookup will check the underlying provider to see if content exists
// for the provided schemaURL; in the event that it doesn't, an error is returned.
Lookup(ctx context.Context, schemaURL string) (content io.Reader, err error)
}

// httpProvider is a Provider that fetches schema content over HTTP.
type httpProvider struct {
// client issues the GET request made by Lookup.
client *http.Client
}

// Compile-time check that *httpProvider satisfies Provider.
var _ Provider = (*httpProvider)(nil)

// NewHTTPProvider returns a Provider that resolves schema URLs by issuing
// HTTP GET requests through client. A nil client falls back to
// http.DefaultClient so that Lookup never calls through a nil pointer.
func NewHTTPProvider(client *http.Client) Provider {
	if client == nil {
		client = http.DefaultClient
	}
	return &httpProvider{client: client}
}

// Lookup fetches schemaURL with an HTTP GET bound to ctx and returns the
// response body, fully buffered in memory.
//
// An error is returned when the request cannot be built or sent, when the
// server answers with anything other than 200 OK, or when reading the body
// fails. The response body is closed on every path.
func (hp *httpProvider) Lookup(ctx context.Context, schemaURL string) (io.Reader, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody)
	if err != nil {
		return nil, err
	}
	resp, err := hp.client.Do(req)
	if err != nil {
		return nil, err
	}
	// Deferred so the body is released even when reading it fails.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best-effort drain so the transport can reuse the connection; the
		// status error is what the caller needs to see.
		_, _ = io.Copy(io.Discard, resp.Body)
		return nil, fmt.Errorf("invalid status code returned: %d", resp.StatusCode)
	}

	content := bytes.NewBuffer(nil)
	if _, err := content.ReadFrom(resp.Body); err != nil {
		return nil, err
	}
	return content, nil
}

// testProvider is a Provider that serves schema content from an embedded
// file system.
type testProvider struct {
// fs is the embedded file system that Lookup opens files from.
fs *embed.FS
}

// NewTestProvider returns a Provider whose Lookup opens files from fs, using
// the path component of the requested schema URL as the file name.
func NewTestProvider(fs *embed.FS) Provider {
	tp := testProvider{fs: fs}
	return &tp
}

// Lookup opens the file in the embedded file system whose name is the path
// component of schemaURL with its leading slash removed, and returns it as
// the schema content.
//
// An error is returned when schemaURL cannot be parsed or when the file
// system has no such file, which includes a schemaURL with an empty path.
func (tp testProvider) Lookup(_ context.Context, schemaURL string) (io.Reader, error) {
	parsedPath, err := url.Parse(schemaURL)
	if err != nil {
		return nil, err
	}

	// Strip only a leading "/". Unconditionally slicing [1:] would panic on
	// an empty path and would drop the first character of a relative one.
	name := parsedPath.Path
	if len(name) > 0 && name[0] == '/' {
		name = name[1:]
	}

	f, err := tp.fs.Open(name)
	if err != nil {
		return nil, err
	}
	return f, nil
}
Loading
Loading