diff --git a/translator/cmdutil/translatorutil.go b/translator/cmdutil/translatorutil.go index 7e65aedd9d..6e7c1f7f59 100644 --- a/translator/cmdutil/translatorutil.go +++ b/translator/cmdutil/translatorutil.go @@ -216,8 +216,7 @@ func TranslateJsonMapToTomlConfig(jsonConfigValue interface{}) (interface{}, err } func TranslateJsonMapToYamlConfig(jsonConfigValue interface{}) (interface{}, error) { - t := otel.NewTranslator() - cfg, err := t.Translate(jsonConfigValue, context.CurrentContext().Os()) + cfg, err := otel.Translate(jsonConfigValue, context.CurrentContext().Os()) if err != nil { return nil, err } diff --git a/translator/translate/otel/common/common.go b/translator/translate/otel/common/common.go index a930e75e21..e99ff50e68 100644 --- a/translator/translate/otel/common/common.go +++ b/translator/translate/otel/common/common.go @@ -76,8 +76,8 @@ type Translator[C any] interface { // TranslatorMap is a map of translators by their types. type TranslatorMap[C any] map[component.ID]Translator[C] -// Add is a convenience method to add a translator to the map. -func (t TranslatorMap[C]) Add(translator Translator[C]) { +// Set is a convenience method to add a translator to the map. +func (t TranslatorMap[C]) Set(translator Translator[C]) { t[translator.ID()] = translator } @@ -87,10 +87,10 @@ func (t TranslatorMap[C]) Get(id component.ID) (Translator[C], bool) { return translator, ok } -// Merge adds the translators in the input to the existing map. +// Merge sets the translators in the input to the existing map. func (t TranslatorMap[C]) Merge(m TranslatorMap[C]) { for _, v := range m { - t.Add(v) + t.Set(v) } } @@ -107,7 +107,7 @@ func (t TranslatorMap[C]) SortedKeys() []component.ID { func NewTranslatorMap[C any](translators ...Translator[C]) TranslatorMap[C] { translatorMap := make(TranslatorMap[C], len(translators)) for _, translator := range translators { - translatorMap.Add(translator) + translatorMap.Set(translator) } return translatorMap } diff --git a/translator/translate/otel/exporter/awsemf/kubernetes.go b/translator/translate/otel/exporter/awsemf/kubernetes.go index 40d88f7908..ca0353ed23 100644 --- a/translator/translate/otel/exporter/awsemf/kubernetes.go +++ b/translator/translate/otel/exporter/awsemf/kubernetes.go @@ -4,9 +4,8 @@ package awsemf import ( - "go.opentelemetry.io/collector/confmap" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" + "go.opentelemetry.io/collector/confmap" "github.com/aws/private-amazon-cloudwatch-agent-staging/translator/translate/otel/common" ) diff --git a/translator/translate/otel/exporter/awsemf/translator_test.go b/translator/translate/otel/exporter/awsemf/translator_test.go index faae777c83..22d38e0ee6 100644 --- a/translator/translate/otel/exporter/awsemf/translator_test.go +++ b/translator/translate/otel/exporter/awsemf/translator_test.go @@ -6,11 +6,10 @@ package awsemf import ( "testing" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/confmap" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" legacytranslator "github.com/aws/private-amazon-cloudwatch-agent-staging/translator" ) diff --git a/translator/translate/otel/pipeline/emf_logs/translator.go b/translator/translate/otel/pipeline/emf_logs/translator.go index f8f377d245..83245ca9d6 100644 --- a/translator/translate/otel/pipeline/emf_logs/translator.go +++ b/translator/translate/otel/pipeline/emf_logs/translator.go @@ -53,15 +53,15 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators } if serviceAddress, ok := common.GetString(conf, serviceAddressEMFKey); ok { if strings.Contains(serviceAddress, common.Udp) { - translators.Receivers.Add(udp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs)) + translators.Receivers.Set(udp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs)) } else { - translators.Receivers.Add(tcp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs)) + translators.Receivers.Set(tcp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs)) } } else if serviceAddress, ok := common.GetString(conf, serviceAddressStructuredLogKey); ok { if strings.Contains(serviceAddress, common.Udp) { - translators.Receivers.Add(udp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs)) + translators.Receivers.Set(udp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs)) } else { - translators.Receivers.Add(tcp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs)) + translators.Receivers.Set(tcp_logs.NewTranslatorWithName(common.PipelineNameEmfLogs)) } } else { translators.Receivers = common.NewTranslatorMap( diff --git a/translator/translate/otel/pipeline/host/translator.go b/translator/translate/otel/pipeline/host/translator.go index acb7984967..ab8658c6ad 100644 --- a/translator/translate/otel/pipeline/host/translator.go +++ b/translator/translate/otel/pipeline/host/translator.go @@ -55,17 +55,17 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, // we need to add delta processor because (only) diskio and net input plugins report delta metric if common.PipelineNameHostDeltaMetrics == t.name { log.Printf("D! delta processor required because metrics with diskio or net are set") - translators.Processors.Add(cumulativetodeltaprocessor.NewTranslatorWithName(t.name)) + translators.Processors.Set(cumulativetodeltaprocessor.NewTranslatorWithName(t.name)) } if conf.IsSet(common.ConfigKey(common.MetricsKey, "append_dimensions")) { log.Printf("D! ec2tagger processor required because append_dimensions is set") - translators.Processors.Add(ec2taggerprocessor.NewTranslator()) + translators.Processors.Set(ec2taggerprocessor.NewTranslator()) } if metricsdecorator.IsSet(conf) { log.Printf("D! metric decorator required because measurement fields are set") - translators.Processors.Add(metricsdecorator.NewTranslator()) + translators.Processors.Set(metricsdecorator.NewTranslator()) } return &translators, nil } diff --git a/translator/translate/otel/pipeline/translator.go b/translator/translate/otel/pipeline/translator.go index 6657ec11aa..71e51d87c7 100644 --- a/translator/translate/otel/pipeline/translator.go +++ b/translator/translate/otel/pipeline/translator.go @@ -17,6 +17,8 @@ var ( ErrNoPipelines = errors.New("no valid pipelines") ) +type Translator common.Translator[*common.ComponentTranslators] + type Translation struct { // Pipelines is a map of component IDs to service pipelines. Pipelines map[component.ID]*service.PipelineConfig @@ -24,13 +26,13 @@ type Translation struct { } type translator struct { - translators []common.Translator[*common.ComponentTranslators] + translators common.TranslatorMap[*common.ComponentTranslators] } var _ common.Translator[*Translation] = (*translator)(nil) -func NewTranslator(translators ...common.Translator[*common.ComponentTranslators]) common.Translator[*Translation] { - return &translator{translators} +func NewTranslator(translators common.TranslatorMap[*common.ComponentTranslators]) common.Translator[*Translation] { + return &translator{translators: translators} } func (t *translator) ID() component.ID { @@ -48,9 +50,9 @@ func (t *translator) Translate(conf *confmap.Conf) (*Translation, error) { Extensions: common.NewTranslatorMap[component.Config](), }, } - for _, pt := range t.translators { + for id, pt := range t.translators { if pipeline, _ := pt.Translate(conf); pipeline != nil { - translation.Pipelines[pt.ID()] = &service.PipelineConfig{ + translation.Pipelines[id] = &service.PipelineConfig{ Receivers: pipeline.Receivers.SortedKeys(), Processors: pipeline.Processors.SortedKeys(), Exporters: pipeline.Exporters.SortedKeys(), diff --git a/translator/translate/otel/pipeline/translator_test.go b/translator/translate/otel/pipeline/translator_test.go index d94cdbd09f..623de20a83 100644 --- a/translator/translate/otel/pipeline/translator_test.go +++ b/translator/translate/otel/pipeline/translator_test.go @@ -28,15 +28,13 @@ func (t testTranslator) ID() component.ID { } func TestTranslator(t *testing.T) { - pt := NewTranslator() + pt := NewTranslator(common.NewTranslatorMap[*common.ComponentTranslators]()) got, err := pt.Translate(confmap.New()) require.Equal(t, ErrNoPipelines, err) require.Nil(t, got) - pt = NewTranslator( - &testTranslator{ - result: &common.ComponentTranslators{}, - }, - ) + pt = NewTranslator(common.NewTranslatorMap[*common.ComponentTranslators](&testTranslator{ + result: &common.ComponentTranslators{}, + })) got, err = pt.Translate(confmap.New()) require.NoError(t, err) require.NotNil(t, got) diff --git a/translator/translate/otel/pipeline/xray/translator.go b/translator/translate/otel/pipeline/xray/translator.go index df82dbd288..8576c37b76 100644 --- a/translator/translate/otel/pipeline/xray/translator.go +++ b/translator/translate/otel/pipeline/xray/translator.go @@ -49,10 +49,10 @@ func (t *translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators Exporters: common.NewTranslatorMap(awsxrayexporter.NewTranslator()), } if conf.IsSet(xrayKey) { - translators.Receivers.Add(awsxrayreceiver.NewTranslator()) + translators.Receivers.Set(awsxrayreceiver.NewTranslator()) } if conf.IsSet(otlpKey) { - translators.Receivers.Add(otlp.NewTranslator(otlp.WithDataType(component.DataTypeTraces))) + translators.Receivers.Set(otlp.NewTranslator(otlp.WithDataType(component.DataTypeTraces))) } return translators, nil } diff --git a/translator/translate/otel/receiver/adapter/translators.go b/translator/translate/otel/receiver/adapter/translators.go index 7f5d69f193..3a57e26781 100644 --- a/translator/translate/otel/receiver/adapter/translators.go +++ b/translator/translate/otel/receiver/adapter/translators.go @@ -108,7 +108,7 @@ func fromWindowsMetrics(conf *confmap.Conf) common.TranslatorMap[component.Confi for inputName := range inputs { if windowsInputSet.Contains(inputName) { cfgKey := common.ConfigKey(metricKey, inputName) - translators.Add(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault( + translators.Set(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault( defaultCollectionIntervalMap, inputName, defaultMetricsCollectionInterval, @@ -137,7 +137,7 @@ func fromInputs(conf *confmap.Conf, baseKey string) common.TranslatorMap[compone if multipleInputSet.Contains(inputName) { translators.Merge(fromMultipleInput(conf, inputName, "")) } else { - translators.Add(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault( + translators.Set(NewTranslator(toAlias(inputName), cfgKey, collections.GetOrDefault( defaultCollectionIntervalMap, inputName, defaultMetricsCollectionInterval, @@ -181,7 +181,7 @@ func fromMultipleInput(conf *confmap.Conf, inputName, os string) common.Translat // Array type validation needs to be specific https://stackoverflow.com/a/47989212 for _, procstatMonitored := range procstatMonitoredSet { if componentPsValue, ok := psKey[procstatMonitored]; ok { - translators.Add(NewTranslatorWithName( + translators.Set(NewTranslatorWithName( componentPsValue.(string), procstat.SectionKey, cfgKey, @@ -193,19 +193,19 @@ func fromMultipleInput(conf *confmap.Conf, inputName, os string) common.Translat } } else if os == translatorconfig.OS_TYPE_WINDOWS && !windowsInputSet.Contains(inputName) { /* For customized metrics from Windows and window performance counters metrics - [[inputs.win_perf_counters.object]] - ObjectName = "Processor" - Instances = ["*"] - Counters = ["% Idle Time", "% Interrupt Time", "% Privileged Time", "% User Time", "% Processor Time"] - Measurement = "win_cpu" - - [[inputs.win_perf_counters.object]] - ObjectName = "LogicalDisk" - Instances = ["*"] - Counters = ["% Idle Time", "% Disk Time","% Disk Read Time", "% Disk Write Time", "% User Time", "Current Disk Queue Length"] - Measurement = "win_disk" + [[inputs.win_perf_counters.object]] + ObjectName = "Processor" + Instances = ["*"] + Counters = ["% Idle Time", "% Interrupt Time", "% Privileged Time", "% User Time", "% Processor Time"] + Measurement = "win_cpu" + + [[inputs.win_perf_counters.object]] + ObjectName = "LogicalDisk" + Instances = ["*"] + Counters = ["% Idle Time", "% Disk Time","% Disk Read Time", "% Disk Write Time", "% User Time", "Current Disk Queue Length"] + Measurement = "win_disk" */ - translators.Add(NewTranslatorWithName( + translators.Set(NewTranslatorWithName( inputName, customizedmetrics.WinPerfCountersKey, cfgKey, diff --git a/translator/translate/otel/translate_otel.go b/translator/translate/otel/translate_otel.go index 7f454452e4..6c2dafcd7f 100644 --- a/translator/translate/otel/translate_otel.go +++ b/translator/translate/otel/translate_otel.go @@ -29,63 +29,16 @@ import ( "github.com/aws/private-amazon-cloudwatch-agent-staging/translator/translate/otel/receiver/adapter" ) -// Translator is used to create an OTEL config. -type Translator struct { -} - -// NewTranslator creates a new Translator. -func NewTranslator() *Translator { - return &Translator{} -} - -// parseAgentLogFile returns the log file path form the JSON config, or the -// default value. -func parseAgentLogFile(conf *confmap.Conf) string { - v, ok := common.GetString(conf, common.ConfigKey("agent", "logfile")) - if !ok { - return agent.GetDefaultValue() - } - return v -} - -// parseAgentLogLevel returns the logging level from the JSON config, or the -// default value. -func parseAgentLogLevel(conf *confmap.Conf) zapcore.Level { - // "quiet" takes precedence over "debug" in Telegraf. - v, _ := common.GetBool(conf, common.ConfigKey("agent", "quiet")) - if v { - return zapcore.ErrorLevel - } - v, _ = common.GetBool(conf, common.ConfigKey("agent", "debug")) - if v { - return zapcore.DebugLevel - } - return zapcore.InfoLevel -} +var registry = common.NewTranslatorMap[*common.ComponentTranslators]() -// getLoggingConfig uses the given JSON config to determine the correct -// logging configuration that should go in the YAML. -func getLoggingConfig(conf *confmap.Conf) telemetry.LogsConfig { - var outputPaths []string - filename := parseAgentLogFile(conf) - // A slice with an empty string causes OTEL issues, so avoid it. - if filename != "" { - outputPaths = []string{filename} - } - logLevel := parseAgentLogLevel(conf) - return telemetry.LogsConfig{ - OutputPaths: outputPaths, - Level: logLevel, - Encoding: common.Console, - Sampling: &telemetry.LogsSamplingConfig{ - Initial: 2, - Thereafter: 500, - }, +func RegisterPipeline(translators ...pipeline.Translator) { + for _, translator := range translators { + registry.Set(translator) } } // Translate converts a JSON config into an OTEL config. -func (t *Translator) Translate(jsonConfig interface{}, os string) (*otelcol.Config, error) { +func Translate(jsonConfig interface{}, os string) (*otelcol.Config, error) { m, ok := jsonConfig.(map[string]interface{}) if !ok { return nil, errors.New("invalid json config") @@ -106,20 +59,22 @@ func (t *Translator) Translate(jsonConfig interface{}, os string) (*otelcol.Conf hostReceivers := common.NewTranslatorMap[component.Config]() for k, v := range adapterReceivers { if k.Type() == receiverAdapter.Type(common.DiskIOKey) || k.Type() == receiverAdapter.Type(common.NetKey) { - deltaMetricsReceivers.Add(v) + deltaMetricsReceivers.Set(v) } else { - hostReceivers.Add(v) + hostReceivers.Set(v) } } - pipelines, err := pipeline.NewTranslator( + translators := common.NewTranslatorMap( host.NewTranslator(common.PipelineNameHost, hostReceivers), host.NewTranslator(common.PipelineNameHostDeltaMetrics, deltaMetricsReceivers), containerinsights.NewTranslator(), prometheus.NewTranslator(), emf_logs.NewTranslator(), xray.NewTranslator(), - ).Translate(conf) + ) + translators.Merge(registry) + pipelines, err := pipeline.NewTranslator(translators).Translate(conf) if err != nil { return nil, err } @@ -137,7 +92,7 @@ func (t *Translator) Translate(jsonConfig interface{}, os string) (*otelcol.Conf Extensions: pipelines.Translators.Extensions.SortedKeys(), }, } - if err = t.buildComponents(conf, cfg, pipelines.Translators); err != nil { + if err = build(conf, cfg, pipelines.Translators); err != nil { return nil, fmt.Errorf("unable to build components in pipeline: %w", err) } if err = cfg.Validate(); err != nil { @@ -146,8 +101,54 @@ func (t *Translator) Translate(jsonConfig interface{}, os string) (*otelcol.Conf return cfg, nil } -// buildComponents uses the pipelines and extensions defined in the config to build the components. -func (t *Translator) buildComponents(conf *confmap.Conf, cfg *otelcol.Config, translators common.ComponentTranslators) error { +// parseAgentLogFile returns the log file path form the JSON config, or the +// default value. +func parseAgentLogFile(conf *confmap.Conf) string { + v, ok := common.GetString(conf, common.ConfigKey("agent", "logfile")) + if !ok { + return agent.GetDefaultValue() + } + return v +} + +// parseAgentLogLevel returns the logging level from the JSON config, or the +// default value. +func parseAgentLogLevel(conf *confmap.Conf) zapcore.Level { + // "quiet" takes precedence over "debug" in Telegraf. + v, _ := common.GetBool(conf, common.ConfigKey("agent", "quiet")) + if v { + return zapcore.ErrorLevel + } + v, _ = common.GetBool(conf, common.ConfigKey("agent", "debug")) + if v { + return zapcore.DebugLevel + } + return zapcore.InfoLevel +} + +// getLoggingConfig uses the given JSON config to determine the correct +// logging configuration that should go in the YAML. +func getLoggingConfig(conf *confmap.Conf) telemetry.LogsConfig { + var outputPaths []string + filename := parseAgentLogFile(conf) + // A slice with an empty string causes OTEL issues, so avoid it. + if filename != "" { + outputPaths = []string{filename} + } + logLevel := parseAgentLogLevel(conf) + return telemetry.LogsConfig{ + OutputPaths: outputPaths, + Level: logLevel, + Encoding: common.Console, + Sampling: &telemetry.LogsSamplingConfig{ + Initial: 2, + Thereafter: 500, + }, + } +} + +// build uses the pipelines and extensions defined in the config to build the components. +func build(conf *confmap.Conf, cfg *otelcol.Config, translators common.ComponentTranslators) error { errs := buildComponents(conf, cfg.Service.Extensions, cfg.Extensions, translators.Extensions.Get) for _, p := range cfg.Service.Pipelines { errs = multierr.Append(errs, buildComponents(conf, p.Receivers, cfg.Receivers, translators.Receivers.Get)) diff --git a/translator/translate/otel/translate_otel_test.go b/translator/translate/otel/translate_otel_test.go index ddfefece62..8a59486ae6 100644 --- a/translator/translate/otel/translate_otel_test.go +++ b/translator/translate/otel/translate_otel_test.go @@ -6,15 +6,18 @@ package otel import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" "github.com/aws/private-amazon-cloudwatch-agent-staging/translator" "github.com/aws/private-amazon-cloudwatch-agent-staging/translator/translate/agent" + "github.com/aws/private-amazon-cloudwatch-agent-staging/translator/translate/otel/common" ) func TestTranslator(t *testing.T) { agent.Global_Config.Region = "us-east-1" - ot := NewTranslator() testCases := map[string]struct { input interface{} wantErrContains string @@ -46,16 +49,45 @@ func TestTranslator(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { translator.SetTargetPlatform("linux") - got, err := ot.Translate(testCase.input, "linux") + got, err := Translate(testCase.input, "linux") if testCase.wantErrContains != "" { require.Error(t, err) - require.Nil(t, got) + assert.Nil(t, got) t.Log(err) - require.ErrorContains(t, err, testCase.wantErrContains) + assert.ErrorContains(t, err, testCase.wantErrContains) } else { require.NoError(t, err) - require.NotNil(t, got) + assert.NotNil(t, got) } }) } } + +type testTranslator struct { + id component.ID + version int +} + +func (t testTranslator) Translate(_ *confmap.Conf) (*common.ComponentTranslators, error) { + return nil, nil +} + +func (t testTranslator) ID() component.ID { + return t.id +} + +var _ common.Translator[*common.ComponentTranslators] = (*testTranslator)(nil) + +func TestRegisterPipeline(t *testing.T) { + original := &testTranslator{id: component.NewID("test"), version: 1} + tm := common.NewTranslatorMap[*common.ComponentTranslators](original) + assert.Len(t, registry, 0) + newTranslator := &testTranslator{id: component.NewID("test"), version: 2} + RegisterPipeline(newTranslator) + assert.Len(t, registry, 1) + tm.Merge(registry) + got, ok := tm.Get(component.NewID("test")) + assert.True(t, ok) + assert.Equal(t, newTranslator, got) + assert.NotEqual(t, original, got) +}