From 7a011ca8f25ca8ebb089b3c65752d1fe68208ac4 Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Thu, 1 Jun 2023 12:39:31 -0400 Subject: [PATCH] Separate default components. (#220) --- .../amazon-cloudwatch-agent.go | 78 +++---------------- service/defaultcomponents/components.go | 67 ++++++++++++++++ service/defaultcomponents/components_test.go | 47 +++++++++++ service/registry/registry.go | 72 +++++++++++++++++ service/registry/registry_test.go | 40 ++++++++++ 5 files changed, 235 insertions(+), 69 deletions(-) create mode 100644 service/defaultcomponents/components.go create mode 100644 service/defaultcomponents/components_test.go create mode 100644 service/registry/registry.go create mode 100644 service/registry/registry_test.go diff --git a/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go b/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go index be37f64d22..51f94379c5 100644 --- a/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go +++ b/cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go @@ -28,38 +28,20 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/wlog" "github.com/kardianos/service" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/udplogreceiver" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/fileprovider" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/loggingexporter" - "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/otelcol" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/batchprocessor" - "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/otlpreceiver" - "golang.org/x/exp/maps" configaws "github.com/aws/private-amazon-cloudwatch-agent-staging/cfg/aws" "github.com/aws/private-amazon-cloudwatch-agent-staging/cfg/envconfig" "github.com/aws/private-amazon-cloudwatch-agent-staging/cmd/amazon-cloudwatch-agent/internal" "github.com/aws/private-amazon-cloudwatch-agent-staging/handlers/agentinfo" - "github.com/aws/private-amazon-cloudwatch-agent-staging/internal/util/collections" "github.com/aws/private-amazon-cloudwatch-agent-staging/logs" _ "github.com/aws/private-amazon-cloudwatch-agent-staging/plugins" - "github.com/aws/private-amazon-cloudwatch-agent-staging/plugins/outputs/cloudwatch" - "github.com/aws/private-amazon-cloudwatch-agent-staging/plugins/processors/ec2tagger" "github.com/aws/private-amazon-cloudwatch-agent-staging/profiler" "github.com/aws/private-amazon-cloudwatch-agent-staging/receiver/adapter" + "github.com/aws/private-amazon-cloudwatch-agent-staging/service/defaultcomponents" + "github.com/aws/private-amazon-cloudwatch-agent-staging/service/registry" ) const ( @@ -375,63 +357,21 @@ func runAgent(ctx context.Context, func components(telegrafConfig *config.Config) (otelcol.Factories, error) { telegrafAdapter := adapter.NewAdapter(telegrafConfig) - factories := otelcol.Factories{} - - receiverFactories := []receiver.Factory{ - // OTel native receivers - awscontainerinsightreceiver.NewFactory(), - awsxrayreceiver.NewFactory(), - otlpreceiver.NewFactory(), - tcplogreceiver.NewFactory(), - udplogreceiver.NewFactory(), - } - adapterReceiverSet := collections.NewSet[string]() - // Adapted receivers from telegraf - for _, input := range telegrafConfig.Inputs { - adapterReceiverSet.Add(input.Config.Name) - } - - for _, adapterReceiver := range maps.Keys(adapterReceiverSet) { - receiverFactories = append(receiverFactories, telegrafAdapter.NewReceiverFactory(adapterReceiver)) - } - - receivers, err := receiver.MakeFactoryMap(receiverFactories...) + factories, err := defaultcomponents.Factories() if err != nil { return factories, err } - processors, err := processor.MakeFactoryMap( - batchprocessor.NewFactory(), - cumulativetodeltaprocessor.NewFactory(), - ec2tagger.NewFactory(), - transformprocessor.NewFactory(), - ) - if err != nil { - return factories, err - } - - exporters, err := exporter.MakeFactoryMap( - awsemfexporter.NewFactory(), - loggingexporter.NewFactory(), - cloudwatch.NewFactory(), - awscloudwatchlogsexporter.NewFactory(), - awsxrayexporter.NewFactory(), - ) - if err != nil { - return factories, err + // Adapted receivers from telegraf + for _, input := range telegrafConfig.Inputs { + registry.Register(registry.WithReceiver(telegrafAdapter.NewReceiverFactory(input.Config.Name))) } - extensions, err := extension.MakeFactoryMap() - if err != nil { - return factories, err + for _, apply := range registry.Options() { + apply(&factories) } - factories = otelcol.Factories{ - Receivers: receivers, - Processors: processors, - Exporters: exporters, - Extensions: extensions, - } + registry.Reset() return factories, nil } diff --git a/service/defaultcomponents/components.go b/service/defaultcomponents/components.go new file mode 100644 index 0000000000..6fc22d257a --- /dev/null +++ b/service/defaultcomponents/components.go @@ -0,0 +1,67 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package defaultcomponents + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/udplogreceiver" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/loggingexporter" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/otelcol" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/batchprocessor" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/otlpreceiver" + + "github.com/aws/private-amazon-cloudwatch-agent-staging/plugins/outputs/cloudwatch" + "github.com/aws/private-amazon-cloudwatch-agent-staging/plugins/processors/ec2tagger" +) + +func Factories() (otelcol.Factories, error) { + var factories otelcol.Factories + var err error + + if factories.Receivers, err = receiver.MakeFactoryMap( + awscontainerinsightreceiver.NewFactory(), + awsxrayreceiver.NewFactory(), + otlpreceiver.NewFactory(), + tcplogreceiver.NewFactory(), + udplogreceiver.NewFactory(), + ); err != nil { + return otelcol.Factories{}, err + } + + if factories.Processors, err = processor.MakeFactoryMap( + batchprocessor.NewFactory(), + cumulativetodeltaprocessor.NewFactory(), + ec2tagger.NewFactory(), + transformprocessor.NewFactory(), + ); err != nil { + return otelcol.Factories{}, err + } + + if factories.Exporters, err = exporter.MakeFactoryMap( + awscloudwatchlogsexporter.NewFactory(), + awsemfexporter.NewFactory(), + awsxrayexporter.NewFactory(), + cloudwatch.NewFactory(), + loggingexporter.NewFactory(), + ); err != nil { + return otelcol.Factories{}, err + } + + if factories.Extensions, err = extension.MakeFactoryMap(); err != nil { + return otelcol.Factories{}, err + } + + return factories, nil +} diff --git a/service/defaultcomponents/components_test.go b/service/defaultcomponents/components_test.go new file mode 100644 index 0000000000..29eaa7955f --- /dev/null +++ b/service/defaultcomponents/components_test.go @@ -0,0 +1,47 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package defaultcomponents + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + receiversCount = 5 + processorCount = 4 + exportersCount = 5 + extensionsCount = 0 +) + +func TestComponents(t *testing.T) { + factories, err := Factories() + assert.NoError(t, err) + receivers := factories.Receivers + assert.Len(t, receivers, receiversCount) + assert.NotNil(t, receivers["awscontainerinsightreceiver"]) + assert.NotNil(t, receivers["awsxray"]) + assert.NotNil(t, receivers["otlp"]) + assert.NotNil(t, receivers["tcplog"]) + assert.NotNil(t, receivers["udplog"]) + + processors := factories.Processors + assert.Len(t, processors, processorCount) + assert.NotNil(t, processors["batch"]) + assert.NotNil(t, processors["cumulativetodelta"]) + assert.NotNil(t, processors["ec2tagger"]) + assert.NotNil(t, processors["transform"]) + + exporters := factories.Exporters + assert.Len(t, exporters, exportersCount) + assert.NotNil(t, exporters["awscloudwatchlogs"]) + assert.NotNil(t, exporters["awsemf"]) + assert.NotNil(t, exporters["awsxray"]) + assert.NotNil(t, exporters["awscloudwatch"]) + assert.NotNil(t, exporters["logging"]) + + extensions := factories.Extensions + assert.Len(t, extensions, extensionsCount) +} diff --git a/service/registry/registry.go b/service/registry/registry.go new file mode 100644 index 0000000000..3b6aeadc8a --- /dev/null +++ b/service/registry/registry.go @@ -0,0 +1,72 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package registry + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/otelcol" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/receiver" +) + +type Option func(factories *otelcol.Factories) + +var registry []Option + +// Options getter for registry. +func Options() []Option { + return registry +} + +// Reset sets registry to nil. +func Reset() { + registry = nil +} + +// Register adds the options to the registry. +func Register(options ...Option) { + registry = append(registry, options...) +} + +// WithReceiver sets the receiver factory in the factories. Will overwrite duplicate types. +func WithReceiver(factory receiver.Factory) Option { + return func(factories *otelcol.Factories) { + if factories.Receivers == nil { + factories.Receivers = make(map[component.Type]receiver.Factory) + } + factories.Receivers[factory.Type()] = factory + } +} + +// WithProcessor sets the processor factory in the factories. Will overwrite duplicate types. +func WithProcessor(factory processor.Factory) Option { + return func(factories *otelcol.Factories) { + if factories.Processors == nil { + factories.Processors = make(map[component.Type]processor.Factory) + } + factories.Processors[factory.Type()] = factory + } +} + +// WithExporter sets the exporter factory in the factories. Will overwrite duplicate types. +func WithExporter(factory exporter.Factory) Option { + return func(factories *otelcol.Factories) { + if factories.Exporters == nil { + factories.Exporters = make(map[component.Type]exporter.Factory) + } + factories.Exporters[factory.Type()] = factory + } +} + +// WithExtension sets the extension factory in the factories. Will overwrite duplicate types. +func WithExtension(factory extension.Factory) Option { + return func(factories *otelcol.Factories) { + if factories.Extensions == nil { + factories.Extensions = make(map[component.Type]extension.Factory) + } + factories.Extensions[factory.Type()] = factory + } +} diff --git a/service/registry/registry_test.go b/service/registry/registry_test.go new file mode 100644 index 0000000000..0a8f7317c0 --- /dev/null +++ b/service/registry/registry_test.go @@ -0,0 +1,40 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package registry + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/otelcol" + "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestRegistry(t *testing.T) { + Register(WithReceiver(receivertest.NewNopFactory()), WithProcessor(processortest.NewNopFactory())) + Register(WithExporter(exportertest.NewNopFactory()), WithExtension(extensiontest.NewNopFactory())) + assert.Len(t, Options(), 4) + got := otelcol.Factories{} + for _, apply := range Options() { + apply(&got) + } + assert.NotNil(t, got.Receivers["nop"]) + assert.NotNil(t, got.Processors["nop"]) + assert.NotNil(t, got.Exporters["nop"]) + assert.NotNil(t, got.Extensions["nop"]) + assert.Len(t, got.Receivers, 1) + origReceiver := got.Receivers["nop"] + Register(WithReceiver(receivertest.NewNopFactory())) + for _, apply := range Options() { + apply(&got) + } + newReceiver := got.Receivers["nop"] + assert.NotEqual(t, origReceiver, newReceiver) + assert.Len(t, got.Receivers, 1) + Reset() + assert.Nil(t, Options()) +}