Skip to content

Commit

Permalink
Final processor chunk split out
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitpatel96 committed Feb 12, 2025
1 parent ef63c0b commit f1689da
Show file tree
Hide file tree
Showing 34 changed files with 2,311 additions and 153 deletions.
4 changes: 2 additions & 2 deletions processor/schemaprocessor/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ graph LR;
end
```
The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory.
Data flows into the Transformer, which uses the Schema URL to fetch the translation from the Translation Manager.
The [schemaprocessor](processor.go) is registered as a Processor in the Collector by the factory.
Data flows into the Processor, which uses the Schema URL to fetch the translation from the Translation Manager.
The Translation Manager (at internal/translation/manager.go in a future PR) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct.

The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator figures out what the version of the incoming data is and what Revisions to apply to the incoming data to get it to the target schema version. The Translator is also responsible for applying the Revisions to the incoming data - it iterates through these Revisions and applies them to the incoming data.
Expand Down
8 changes: 4 additions & 4 deletions processor/schemaprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var processorCapabilities = consumer.Capabilities{MutatesData: true}
// factory will store any of the precompiled schemas in future
type factory struct{}

// newDefaultConfiguration returns the configuration for schema transformer processor
// newDefaultConfiguration returns the configuration for schema processor
// with the default values being used throughout it
func newDefaultConfiguration() component.Config {
return &Config{
Expand All @@ -47,7 +47,7 @@ func (f factory) createLogsProcessor(
cfg component.Config,
next consumer.Logs,
) (processor.Logs, error) {
transformer, err := newTransformer(ctx, cfg, set)
transformer, err := newSchemaProcessor(ctx, cfg, set)
if err != nil {
return nil, err
}
Expand All @@ -68,7 +68,7 @@ func (f factory) createMetricsProcessor(
cfg component.Config,
next consumer.Metrics,
) (processor.Metrics, error) {
transformer, err := newTransformer(ctx, cfg, set)
transformer, err := newSchemaProcessor(ctx, cfg, set)
if err != nil {
return nil, err
}
Expand All @@ -89,7 +89,7 @@ func (f factory) createTracesProcessor(
cfg component.Config,
next consumer.Traces,
) (processor.Traces, error) {
transformer, err := newTransformer(ctx, cfg, set)
transformer, err := newSchemaProcessor(ctx, cfg, set)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions processor/schemaprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ require (
go.opentelemetry.io/collector/consumer v1.25.0
go.opentelemetry.io/collector/consumer/consumertest v0.119.0
go.opentelemetry.io/collector/pdata v1.25.0
go.opentelemetry.io/collector/pipeline v0.119.0
go.opentelemetry.io/collector/processor v0.119.0
go.opentelemetry.io/collector/processor/processortest v0.119.0
go.opentelemetry.io/otel/metric v1.34.0
go.opentelemetry.io/otel/schema v0.0.12
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
Expand Down Expand Up @@ -59,11 +61,9 @@ require (
go.opentelemetry.io/collector/extension/auth v0.119.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.119.0 // indirect
go.opentelemetry.io/collector/pipeline v0.119.0 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.119.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
Expand Down
157 changes: 157 additions & 0 deletions processor/schemaprocessor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"
)

type schemaprocessor struct {
telemetry component.TelemetrySettings
config *Config

log *zap.Logger

manager translation.Manager
}

// newSchemaProcessor builds a schemaprocessor from the supplied configuration
// and processor settings.
//
// conf must be a *Config; any other type is rejected with an error. A
// translation manager is created from the configured targets, using a child
// logger named "schema-manager". If the manager cannot be created its error
// is returned unchanged.
func newSchemaProcessor(_ context.Context, conf component.Config, set processor.Settings) (*schemaprocessor, error) {
	schemaCfg, isSchemaCfg := conf.(*Config)
	if !isSchemaCfg {
		return nil, errors.New("invalid configuration provided")
	}

	targets := schemaCfg.Targets
	managerLog := set.Logger.Named("schema-manager")
	mgr, err := translation.NewManager(targets, managerLog)
	if err != nil {
		return nil, err
	}

	sp := &schemaprocessor{
		manager:   mgr,
		log:       set.Logger,
		telemetry: set.TelemetrySettings,
		config:    schemaCfg,
	}
	return sp, nil
}

// processLogs applies schema translations to every resource and scope in ld.
//
// For each resource, the translation requested for the resource's schema URL
// is applied to that resource. Each scope beneath it is then translated using
// the scope's own schema URL, falling back to the resource's URL when the
// scope has none.
//
// The first error encountered stops processing and is returned together with
// an empty plog.Logs; otherwise ld itself is returned.
func (t schemaprocessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
	resources := ld.ResourceLogs()
	for i := 0; i < resources.Len(); i++ {
		resource := resources.At(i)
		resourceURL := resource.SchemaUrl()

		resourceTranslation := t.manager.RequestTranslation(ctx, resourceURL)
		if err := resourceTranslation.ApplyAllResourceChanges(resource, resourceURL); err != nil {
			return plog.Logs{}, err
		}

		scopes := resource.ScopeLogs()
		for j := 0; j < scopes.Len(); j++ {
			scope := scopes.At(j)

			// A scope without its own schema URL inherits the resource's.
			scopeURL := scope.SchemaUrl()
			if scopeURL == "" {
				scopeURL = resourceURL
			}

			scopeTranslation := t.manager.RequestTranslation(ctx, scopeURL)
			if err := scopeTranslation.ApplyScopeLogChanges(scope, scopeURL); err != nil {
				return plog.Logs{}, err
			}
		}
	}
	return ld, nil
}

// processMetrics applies schema translations to every resource and scope in md.
//
// For each resource, the translation requested for the resource's schema URL
// is applied to that resource. Each scope beneath it is then translated using
// the scope's own schema URL, falling back to the resource's URL when the
// scope has none.
//
// The first error encountered stops processing and is returned together with
// an empty pmetric.Metrics; otherwise md itself is returned.
func (t schemaprocessor) processMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
	resources := md.ResourceMetrics()
	for i := 0; i < resources.Len(); i++ {
		resource := resources.At(i)
		resourceURL := resource.SchemaUrl()

		resourceTranslation := t.manager.RequestTranslation(ctx, resourceURL)
		if err := resourceTranslation.ApplyAllResourceChanges(resource, resourceURL); err != nil {
			return pmetric.Metrics{}, err
		}

		scopes := resource.ScopeMetrics()
		for j := 0; j < scopes.Len(); j++ {
			scope := scopes.At(j)

			// A scope without its own schema URL inherits the resource's.
			scopeURL := scope.SchemaUrl()
			if scopeURL == "" {
				scopeURL = resourceURL
			}

			scopeTranslation := t.manager.RequestTranslation(ctx, scopeURL)
			if err := scopeTranslation.ApplyScopeMetricChanges(scope, scopeURL); err != nil {
				return pmetric.Metrics{}, err
			}
		}
	}
	return md, nil
}

// processTraces applies schema translations to every resource and scope in td.
//
// For each resource, the translation requested for the resource's schema URL
// is applied to that resource. Each scope beneath it is then translated using
// the scope's own schema URL, falling back to the resource's URL when the
// scope has none.
//
// The first error encountered stops processing and is returned together with
// an empty ptrace.Traces; otherwise td itself is returned.
func (t schemaprocessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
	resources := td.ResourceSpans()
	for i := 0; i < resources.Len(); i++ {
		resource := resources.At(i)

		// TODO(ankit): decide whether an empty resource schema URL needs
		// special handling before a translation is requested for it.
		resourceURL := resource.SchemaUrl()

		resourceTranslation := t.manager.RequestTranslation(ctx, resourceURL)
		if err := resourceTranslation.ApplyAllResourceChanges(resource, resourceURL); err != nil {
			return ptrace.Traces{}, err
		}

		scopes := resource.ScopeSpans()
		for j := 0; j < scopes.Len(); j++ {
			scope := scopes.At(j)

			// A scope without its own schema URL inherits the resource's.
			scopeURL := scope.SchemaUrl()
			if scopeURL == "" {
				scopeURL = resourceURL
			}

			scopeTranslation := t.manager.RequestTranslation(ctx, scopeURL)
			if err := scopeTranslation.ApplyScopeSpanChanges(scope, scopeURL); err != nil {
				return ptrace.Traces{}, err
			}
		}
	}
	return td, nil
}

// start connects the translation manager to its schema providers and kicks
// off a background prefetch of the configured schema URLs.
//
// An HTTP client is built from the processor configuration and registered
// with the manager as an HTTP provider. An error from building the client or
// from registering the providers is returned to the caller.
//
// Prefetching runs in its own goroutine so that start returns promptly. It is
// best effort: each URL in config.Prefetch is requested and the result is
// discarded.
//
// NOTE(review): start has the shape of a collector component start hook. The
// collector's component.Component.Start documentation warns that the context
// passed to Start may be cancelled soon after Start returns and must not be
// used for background work, so the prefetch goroutine runs on
// context.Background() instead of ctx. Confirm that the HTTP client carries
// its own timeout, since nothing else bounds the prefetch.
func (t *schemaprocessor) start(ctx context.Context, host component.Host) error {
	var providers []translation.Provider
	// Check for additional extensions that can be consulted first, before
	// performing the HTTP request.
	// TODO(MovieStoreGuy): Check for storage extensions

	client, err := t.config.ToClient(ctx, host, t.telemetry)
	if err != nil {
		return err
	}

	if err := t.manager.SetProviders(append(providers, translation.NewHTTPProvider(client))...); err != nil {
		return err
	}

	// The prefetch outlives start, so it must not inherit start's context.
	go func(ctx context.Context) {
		for _, schemaURL := range t.config.Prefetch {
			// The result is deliberately dropped: this call is made only
			// to request the translation ahead of time.
			_ = t.manager.RequestTranslation(ctx, schemaURL)
		}
	}(context.Background())

	return nil
}
Loading

0 comments on commit f1689da

Please sign in to comment.