Skip to content

Commit

Permalink
[Schema Processor] complete implementation squashed
Browse files Browse the repository at this point in the history
  • Loading branch information
MovieStoreGuy authored and ankitpatel96 committed Oct 16, 2024
1 parent 9589ec9 commit af3a4fd
Show file tree
Hide file tree
Showing 45 changed files with 4,229 additions and 18 deletions.
8 changes: 6 additions & 2 deletions processor/schemaprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,26 @@ go 1.22.0

require (
github.com/google/go-cmp v0.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.110.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.110.0
go.opentelemetry.io/collector/config/confighttp v0.110.0
go.opentelemetry.io/collector/config/configtelemetry v0.110.0
go.opentelemetry.io/collector/confmap v1.16.0
go.opentelemetry.io/collector/consumer v0.110.0
go.opentelemetry.io/collector/consumer/consumertest v0.110.0
go.opentelemetry.io/collector/pdata v1.16.0
go.opentelemetry.io/collector/processor v0.110.0
go.opentelemetry.io/otel/metric v1.30.0
go.opentelemetry.io/otel/schema v0.0.9
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
)

require (
github.com/Masterminds/semver/v3 v3.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
Expand All @@ -37,14 +42,14 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.110.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.11.1 // indirect
go.opentelemetry.io/collector/client v1.16.0 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.110.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.110.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.16.0 // indirect
go.opentelemetry.io/collector/config/configopaque v1.16.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.110.0 // indirect
go.opentelemetry.io/collector/config/configtls v1.16.0 // indirect
go.opentelemetry.io/collector/config/internal v0.110.0 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 // indirect
Expand All @@ -57,7 +62,6 @@ require (
go.opentelemetry.io/collector/processor/processorprofiles v0.110.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/metric v1.30.0 // indirect
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions processor/schemaprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 135 additions & 0 deletions processor/schemaprocessor/internal/translation/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"

import (
"context"
"errors"
"fmt"
"sync"

"go.uber.org/zap"
)

// errNilValueProvided is the sentinel wrapped by the errors this package
// returns when a required argument is nil or empty.
var errNilValueProvided = errors.New("nil value provided")

// Manager is responsible for ensuring that schemas are kept up to date
// with the most recent version that are requested.
type Manager interface {
	// RequestTranslation returns the Translation to use for schemaURL.
	// A no-op Translation is returned when schemaURL cannot be parsed,
	// when its schema family is not one of the configured targets, or
	// when no provider yields a usable translation.
	// When no cached translation supports the requested version, the
	// providers are consulted, so the call may block during that lookup.
	// Otherwise, the cached translation is returned and reads may proceed
	// concurrently.
	RequestTranslation(ctx context.Context, schemaURL string) Translation

	// SetProviders will update the list of providers used by the manager
	// to look up schemaURLs
	SetProviders(providers ...Provider) error
}

// manager is the Manager implementation returned by NewManager.
type manager struct {
	log *zap.Logger

	// rw is held when the translations cache is read or written and when
	// the providers list is replaced.
	rw sync.RWMutex
	// providers are consulted, in order, when a translation has to be fetched.
	providers []Provider
	// match maps a schema family to its target version; it is filled in by
	// NewManager.
	match map[string]*Version
	// translations caches the translator created for each schema family.
	translations map[string]*translator
}

// Compile-time check that *manager satisfies Manager.
var (
	_ Manager = (*manager)(nil)
)

// NewManager builds a Manager whose translation targets are taken from
// targets. Each entry is split into a schema family and a version with
// GetFamilyAndVersion; when several entries share a family the last one wins.
//
// An error wrapping errNilValueProvided is returned when log is nil, and any
// error from parsing a target is returned unchanged.
func NewManager(targets []string, log *zap.Logger) (Manager, error) {
	if log == nil {
		return nil, fmt.Errorf("logger: %w", errNilValueProvided)
	}

	mgr := &manager{
		log:          log,
		match:        make(map[string]*Version, len(targets)),
		translations: make(map[string]*translator),
	}
	for i := range targets {
		family, version, err := GetFamilyAndVersion(targets[i])
		if err != nil {
			return nil, err
		}
		mgr.match[family] = version
	}
	return mgr, nil
}

// RequestTranslation returns the Translation to apply to data tagged with
// schemaURL.
//
// A nopTranslation is returned when schemaURL cannot be split into a family
// and version, when the family is not a configured target, or when none of
// the providers produces content that can be turned into a translator.
// A cached translator is returned directly when it supports the requested
// version; otherwise the providers are tried in order and the first
// translator successfully built is cached for the family and returned.
func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation {
	family, version, err := GetFamilyAndVersion(schemaURL)
	if err != nil {
		m.log.Debug("No valid schema url was provided, using no-op schema",
			zap.String("schema-url", schemaURL),
		)
		return nopTranslation{}
	}

	// match is only written by NewManager, so it is read without the lock.
	target, match := m.match[family]
	if !match {
		m.log.Debug("Not a known target, providing Nop Translation",
			zap.String("schema-url", schemaURL),
		)
		return nopTranslation{}
	}

	m.rw.RLock()
	t, exists := m.translations[family]
	m.rw.RUnlock()

	if exists && t.SupportedVersion(version) {
		return t
	}

	// Take a private copy of the providers under the read lock so that a
	// concurrent SetProviders cannot change the list while lookups are in
	// flight. The copy is only made on the slow (cache miss) path.
	m.rw.RLock()
	providers := append([]Provider(nil), m.providers...)
	m.rw.RUnlock()

	for _, p := range providers {
		content, err := p.Lookup(ctx, schemaURL)
		if err != nil {
			m.log.Error("Failed to lookup schemaURL",
				zap.Error(err),
				zap.String("schemaURL", schemaURL),
			)
			// There is no content to parse; move on to the next provider
			// instead of handing a nil reader to the translator.
			continue
		}
		created, err := newTranslatorFromReader(
			m.log.Named("translator").With(
				zap.String("family", family),
				zap.Stringer("target", target),
			),
			joinSchemaFamilyAndVersion(family, target),
			content,
		)
		if err != nil {
			m.log.Error("Failed to create translator", zap.Error(err))
			continue
		}
		m.rw.Lock()
		m.translations[family] = created
		m.rw.Unlock()
		return created
	}

	return nopTranslation{}
}

// SetProviders replaces the providers consulted by RequestTranslation with
// the given list. An error wrapping errNilValueProvided is returned, and the
// current providers are left untouched, when no providers are supplied.
func (m *manager) SetProviders(providers ...Provider) error {
	if len(providers) == 0 {
		return fmt.Errorf("zero providers set: %w", errNilValueProvided)
	}

	// Install a freshly allocated slice rather than overwriting the existing
	// backing array in place: a goroutine that is still iterating over the
	// previous list must not observe its elements changing underneath it.
	fresh := make([]Provider, len(providers))
	copy(fresh, providers)

	m.rw.Lock()
	defer m.rw.Unlock()
	m.providers = fresh
	return nil
}
71 changes: 71 additions & 0 deletions processor/schemaprocessor/internal/translation/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation

import (
"context"
_ "embed"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

// exampleTranslation holds the bytes of testdata/schema.yaml, embedded at
// build time; TranslationHandler serves it as the response body.
//
//go:embed testdata/schema.yaml
var exampleTranslation []byte

// TranslationHandler returns an http.Handler that answers every request with
// the embedded example schema. It records a test failure when the embedded
// content is empty or when writing the response fails.
func TranslationHandler(t *testing.T) http.Handler {
	assert.NotEmpty(t, exampleTranslation, "SchemaContent MUST not be empty")

	serveSchema := func(wr http.ResponseWriter, _ *http.Request) {
		_, writeErr := wr.Write(exampleTranslation)
		assert.NoError(t, writeErr, "Must not have issues writing schema content")
	}
	return http.HandlerFunc(serveSchema)
}

// TestRequestTranslation drives a manager against a local HTTP server that
// serves the example schema. It checks that an invalid schema URL yields a
// no-op translation, that a target URL yields a concrete translator, and that
// the revisions walked from 1.0.0 are strictly increasing.
func TestRequestTranslation(t *testing.T) {
	t.Parallel()

	srv := httptest.NewServer(TranslationHandler(t))
	t.Cleanup(srv.Close)

	schemaURL := fmt.Sprintf("%s/1.1.0", srv.URL)

	m, err := NewManager([]string{schemaURL}, zaptest.NewLogger(t))
	require.NoError(t, err, "Must not error when created manager")
	require.NoError(t, m.SetProviders(NewHTTPProvider(srv.Client())), "Must have no issues trying to set providers")

	// An unparsable schema URL must fall back to the no-op translation.
	nop, ok := m.RequestTranslation(context.Background(), "/not/a/valid/schema/URL").(nopTranslation)
	require.True(t, ok, "Must return a NoopTranslation if no valid schema URL is provided")
	require.NotNil(t, nop, "Must have a valid translation")

	// The first request for the target URL has to go through the provider.
	tn, ok := m.RequestTranslation(context.Background(), schemaURL).(*translator)
	require.True(t, ok, "Can cast to the concrete type")
	require.NotNil(t, tn, "Must have a valid translation")

	assert.True(t, tn.SupportedVersion(&Version{1, 0, 0}), "Must have the version listed as supported")

	count := 0
	prevRev := &Version{1, 0, 0}
	it, status := tn.iterator(context.Background(), prevRev)
	assert.Equal(t, Update, status, "Must return a status of update")
	for {
		currRev, more := it()
		if !more {
			break
		}
		assert.True(t, prevRev.LessThan(currRev.Version()))
		prevRev = currRev.Version()
		count++
	}

	// Requesting the same URL again must still produce a concrete translator.
	tn, ok = m.RequestTranslation(context.Background(), schemaURL).(*translator)
	require.True(t, ok, "Can cast to the concrete type")
	require.NotNil(t, tn, "Must have a valid translation")
}
75 changes: 75 additions & 0 deletions processor/schemaprocessor/internal/translation/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"

import (
"bytes"
"context"
"embed"
"fmt"
"io"
"net/http"
"net/url"
)

// Provider allows for collector extensions to be used to look up schemaURLs.
type Provider interface {
	// Lookup will check the underlying provider to see if content exists
	// for the provided schemaURL; in the event that it doesn't, an error
	// is returned.
	Lookup(ctx context.Context, schemaURL string) (content io.Reader, err error)
}

// httpProvider is a Provider that fetches schema content with HTTP GET
// requests issued through client.
type httpProvider struct {
	client *http.Client
}

// Compile-time check that *httpProvider satisfies Provider.
var (
	_ Provider = (*httpProvider)(nil)
)

// NewHTTPProvider returns a Provider that resolves schema URLs by performing
// an HTTP GET with the given client. A nil client falls back to
// http.DefaultClient so the provider never dereferences a nil client.
func NewHTTPProvider(client *http.Client) Provider {
	if client == nil {
		client = http.DefaultClient
	}
	return &httpProvider{client: client}
}

// Lookup issues a GET request for schemaURL and returns the fully buffered
// response body.
//
// An error is returned when the request cannot be built or sent, when the
// body cannot be read or closed, or when the response status is not 2xx.
// The response body is always closed once a response has been received.
func (hp *httpProvider) Lookup(ctx context.Context, schemaURL string) (io.Reader, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody)
	if err != nil {
		return nil, err
	}
	resp, err := hp.client.Do(req)
	if err != nil {
		return nil, err
	}

	// Read first, then close unconditionally: closing even after a failed
	// read releases the underlying connection instead of leaking it.
	content := bytes.NewBuffer(nil)
	_, readErr := content.ReadFrom(resp.Body)
	closeErr := resp.Body.Close()
	if readErr != nil {
		return nil, readErr
	}
	if closeErr != nil {
		return nil, closeErr
	}
	if resp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("invalid status code returned: %d", resp.StatusCode)
	}
	return content, nil
}

// testProvider is a Provider backed by an embedded file system.
type testProvider struct {
	fs *embed.FS
}

// NewTestProvider returns a Provider that resolves the path component of a
// schema URL to a file inside fs.
func NewTestProvider(fs *embed.FS) Provider {
	return &testProvider{fs: fs}
}

// Lookup parses schemaURL and opens the file named by its path, with a single
// leading "/" removed, from the embedded file system.
//
// An error is returned when schemaURL cannot be parsed or the file cannot be
// opened; a URL without a path results in an open error rather than a panic.
func (tp testProvider) Lookup(_ context.Context, schemaURL string) (io.Reader, error) {
	parsedPath, err := url.Parse(schemaURL)
	if err != nil {
		return nil, err
	}
	// Strip only a leading slash: blindly slicing off the first byte panics
	// on an empty path and corrupts a relative one.
	name := parsedPath.Path
	if len(name) > 0 && name[0] == '/' {
		name = name[1:]
	}
	f, err := tp.fs.Open(name)
	if err != nil {
		return nil, err
	}
	return f, nil
}
Loading

0 comments on commit af3a4fd

Please sign in to comment.