Skip to content

Commit

Permalink
Add registry for pipeline translators. (#222)
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien authored Jun 1, 2023
1 parent 7a011ca commit cd1f1ab
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 111 deletions.
3 changes: 1 addition & 2 deletions translator/cmdutil/translatorutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ func TranslateJsonMapToTomlConfig(jsonConfigValue interface{}) (interface{}, err
}

func TranslateJsonMapToYamlConfig(jsonConfigValue interface{}) (interface{}, error) {
t := otel.NewTranslator()
cfg, err := t.Translate(jsonConfigValue, context.CurrentContext().Os())
cfg, err := otel.Translate(jsonConfigValue, context.CurrentContext().Os())
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions translator/translate/otel/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ type Translator[C any] interface {
// TranslatorMap is a map of translators by their types.
type TranslatorMap[C any] map[component.ID]Translator[C]

// Add is a convenience method to add a translator to the map.
func (t TranslatorMap[C]) Add(translator Translator[C]) {
// Set is a convenience method to add a translator to the map.
func (t TranslatorMap[C]) Set(translator Translator[C]) {
t[translator.ID()] = translator
}

Expand All @@ -87,10 +87,10 @@ func (t TranslatorMap[C]) Get(id component.ID) (Translator[C], bool) {
return translator, ok
}

// Merge adds the translators in the input to the existing map.
// Merge sets the translators in the input to the existing map.
func (t TranslatorMap[C]) Merge(m TranslatorMap[C]) {
for _, v := range m {
t.Add(v)
t.Set(v)
}
}

Expand All @@ -107,7 +107,7 @@ func (t TranslatorMap[C]) SortedKeys() []component.ID {
func NewTranslatorMap[C any](translators ...Translator[C]) TranslatorMap[C] {
translatorMap := make(TranslatorMap[C], len(translators))
for _, translator := range translators {
translatorMap.Add(translator)
translatorMap.Set(translator)
}
return translatorMap
}
Expand Down
3 changes: 1 addition & 2 deletions translator/translate/otel/exporter/awsemf/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
package awsemf

import (
"go.opentelemetry.io/collector/confmap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
"go.opentelemetry.io/collector/confmap"

"github.com/aws/private-amazon-cloudwatch-agent-staging/translator/translate/otel/common"
)
Expand Down
5 changes: 2 additions & 3 deletions translator/translate/otel/exporter/awsemf/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ package awsemf
import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"

legacytranslator "github.com/aws/private-amazon-cloudwatch-agent-staging/translator"
)
Expand Down
8 changes: 4 additions & 4 deletions translator/translate/otel/pipeline/emf_logs/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators
}
if serviceAddress, ok := common.GetString(conf, serviceAddressEMFKey); ok {
if strings.Contains(serviceAddress, common.Udp) {
translators.Receivers.Add(udp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs))
translators.Receivers.Set(udp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs))
} else {
translators.Receivers.Add(tcp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs))
translators.Receivers.Set(tcp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs))
}
} else if serviceAddress, ok := common.GetString(conf, serviceAddressStructuredLogKey); ok {
if strings.Contains(serviceAddress, common.Udp) {
translators.Receivers.Add(udp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs))
translators.Receivers.Set(udp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs))
} else {
translators.Receivers.Add(tcp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs))
translators.Receivers.Set(tcp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs))
}
} else {
translators.Receivers = common.NewTranslatorMap(
Expand Down
6 changes: 3 additions & 3 deletions translator/translate/otel/pipeline/host/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators,
// we need to add delta processor because (only) diskio and net input plugins report delta metric
if common.PipelineNameHostDeltaMetrics == t.name {
log.Printf("D! delta processor required because metrics with diskio or net are set")
translators.Processors.Add(cumulativetodeltaprocessor.NewTranslatorWithName(t.name))
translators.Processors.Set(cumulativetodeltaprocessor.NewTranslatorWithName(t.name))
}

if conf.IsSet(common.ConfigKey(common.MetricsKey, "append_dimensions")) {
log.Printf("D! ec2tagger processor required because append_dimensions is set")
translators.Processors.Add(ec2taggerprocessor.NewTranslator())
translators.Processors.Set(ec2taggerprocessor.NewTranslator())
}

if metricsdecorator.IsSet(conf) {
log.Printf("D! metric decorator required because measurement fields are set")
translators.Processors.Add(metricsdecorator.NewTranslator())
translators.Processors.Set(metricsdecorator.NewTranslator())
}
return &translators, nil
}
12 changes: 7 additions & 5 deletions translator/translate/otel/pipeline/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@ var (
ErrNoPipelines = errors.New("no valid pipelines")
)

type Translator common.Translator[*common.ComponentTranslators]

type Translation struct {
// Pipelines is a map of component IDs to service pipelines.
Pipelines map[component.ID]*service.PipelineConfig
Translators common.ComponentTranslators
}

type translator struct {
translators []common.Translator[*common.ComponentTranslators]
translators common.TranslatorMap[*common.ComponentTranslators]
}

var _ common.Translator[*Translation] = (*translator)(nil)

func NewTranslator(translators ...common.Translator[*common.ComponentTranslators]) common.Translator[*Translation] {
return &translator{translators}
func NewTranslator(translators common.TranslatorMap[*common.ComponentTranslators]) common.Translator[*Translation] {
return &translator{translators: translators}
}

func (t *translator) ID() component.ID {
Expand All @@ -48,9 +50,9 @@ func (t *translator) Translate(conf *confmap.Conf) (*Translation, error) {
Extensions: common.NewTranslatorMap[component.Config](),
},
}
for _, pt := range t.translators {
for id, pt := range t.translators {
if pipeline, _ := pt.Translate(conf); pipeline != nil {
translation.Pipelines[pt.ID()] = &service.PipelineConfig{
translation.Pipelines[id] = &service.PipelineConfig{
Receivers: pipeline.Receivers.SortedKeys(),
Processors: pipeline.Processors.SortedKeys(),
Exporters: pipeline.Exporters.SortedKeys(),
Expand Down
10 changes: 4 additions & 6 deletions translator/translate/otel/pipeline/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@ func (t testTranslator) ID() component.ID {
}

func TestTranslator(t *testing.T) {
pt := NewTranslator()
pt := NewTranslator(common.NewTranslatorMap[*common.ComponentTranslators]())
got, err := pt.Translate(confmap.New())
require.Equal(t, ErrNoPipelines, err)
require.Nil(t, got)
pt = NewTranslator(
&testTranslator{
result: &common.ComponentTranslators{},
},
)
pt = NewTranslator(common.NewTranslatorMap[*common.ComponentTranslators](&testTranslator{
result: &common.ComponentTranslators{},
}))
got, err = pt.Translate(confmap.New())
require.NoError(t, err)
require.NotNil(t, got)
Expand Down
4 changes: 2 additions & 2 deletions translator/translate/otel/pipeline/xray/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators
Exporters: common.NewTranslatorMap(awsxrayexporter.NewTranslator()),
}
if conf.IsSet(xrayKey) {
translators.Receivers.Add(awsxrayreceiver.NewTranslator())
translators.Receivers.Set(awsxrayreceiver.NewTranslator())
}
if conf.IsSet(otlpKey) {
translators.Receivers.Add(otlp.NewTranslator(otlp.WithDataType(component.DataTypeTraces)))
translators.Receivers.Set(otlp.NewTranslator(otlp.WithDataType(component.DataTypeTraces)))
}
return translators, nil
}
30 changes: 15 additions & 15 deletions translator/translate/otel/receiver/adapter/translators.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func fromWindowsMetrics(conf *confmap.Conf) common.TranslatorMap[component.Confi
for inputName := range inputs {
if windowsInputSet.Contains(inputName) {
cfgKey := common.ConfigKey(metricKey, inputName)
translators.Add(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault(
translators.Set(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault(
defaultCollectionIntervalMap,
inputName,
defaultMetricsCollectionInterval,
Expand Down Expand Up @@ -137,7 +137,7 @@ func fromInputs(conf *confmap.Conf, baseKey string) common.TranslatorMap[compone
if multipleInputSet.Contains(inputName) {
translators.Merge(fromMultipleInput(conf, inputName, ""))
} else {
translators.Add(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault(
translators.Set(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault(
defaultCollectionIntervalMap,
inputName,
defaultMetricsCollectionInterval,
Expand Down Expand Up @@ -181,7 +181,7 @@ func fromMultipleInput(conf *confmap.Conf, inputName, os string) common.Translat
// Array type validation needs to be specific https://stackoverflow.com/a/47989212
for _, procstatMonitored := range procstatMonitoredSet {
if componentPsValue, ok := psKey[procstatMonitored]; ok {
translators.Add(NewTranslatorWithName(
translators.Set(NewTranslatorWithName(
componentPsValue.(string),
procstat.SectionKey,
cfgKey,
Expand All @@ -193,19 +193,19 @@ func fromMultipleInput(conf *confmap.Conf, inputName, os string) common.Translat
}
} else if os == translatorconfig.OS_TYPE_WINDOWS && !windowsInputSet.Contains(inputName) {
/* For customized metrics from Windows and window performance counters metrics
[[inputs.win_perf_counters.object]]
ObjectName = "Processor"
Instances = ["*"]
Counters = ["% Idle Time", "% Interrupt Time", "% Privileged Time", "% User Time", "% Processor Time"]
Measurement = "win_cpu"
[[inputs.win_perf_counters.object]]
ObjectName = "LogicalDisk"
Instances = ["*"]
Counters = ["% Idle Time", "% Disk Time","% Disk Read Time", "% Disk Write Time", "% User Time", "Current Disk Queue Length"]
Measurement = "win_disk"
[[inputs.win_perf_counters.object]]
ObjectName = "Processor"
Instances = ["*"]
Counters = ["% Idle Time", "% Interrupt Time", "% Privileged Time", "% User Time", "% Processor Time"]
Measurement = "win_cpu"
[[inputs.win_perf_counters.object]]
ObjectName = "LogicalDisk"
Instances = ["*"]
Counters = ["% Idle Time", "% Disk Time","% Disk Read Time", "% Disk Write Time", "% User Time", "Current Disk Queue Length"]
Measurement = "win_disk"
*/
translators.Add(NewTranslatorWithName(
translators.Set(NewTranslatorWithName(
inputName,
customizedmetrics.WinPerfCountersKey,
cfgKey,
Expand Down
Loading

0 comments on commit cd1f1ab

Please sign in to comment.