Skip to content

Commit

Permalink
Separate default components. (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
jefchien authored Jun 1, 2023
1 parent 614759b commit 7a011ca
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 69 deletions.
78 changes: 9 additions & 69 deletions cmd/amazon-cloudwatch-agent/amazon-cloudwatch-agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,38 +28,20 @@ import (
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/wlog"
"github.com/kardianos/service"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/udplogreceiver"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"golang.org/x/exp/maps"

configaws "github.com/aws/private-amazon-cloudwatch-agent-staging/cfg/aws"
"github.com/aws/private-amazon-cloudwatch-agent-staging/cfg/envconfig"
"github.com/aws/private-amazon-cloudwatch-agent-staging/cmd/amazon-cloudwatch-agent/internal"
"github.com/aws/private-amazon-cloudwatch-agent-staging/handlers/agentinfo"
"github.com/aws/private-amazon-cloudwatch-agent-staging/internal/util/collections"
"github.com/aws/private-amazon-cloudwatch-agent-staging/logs"
_ "github.com/aws/private-amazon-cloudwatch-agent-staging/plugins"
"github.com/aws/private-amazon-cloudwatch-agent-staging/plugins/outputs/cloudwatch"
"github.com/aws/private-amazon-cloudwatch-agent-staging/plugins/processors/ec2tagger"
"github.com/aws/private-amazon-cloudwatch-agent-staging/profiler"
"github.com/aws/private-amazon-cloudwatch-agent-staging/receiver/adapter"
"github.com/aws/private-amazon-cloudwatch-agent-staging/service/defaultcomponents"
"github.com/aws/private-amazon-cloudwatch-agent-staging/service/registry"
)

const (
Expand Down Expand Up @@ -375,63 +357,21 @@ func runAgent(ctx context.Context,
func components(telegrafConfig *config.Config) (otelcol.Factories, error) {
telegrafAdapter := adapter.NewAdapter(telegrafConfig)

factories := otelcol.Factories{}

receiverFactories := []receiver.Factory{
// OTel native receivers
awscontainerinsightreceiver.NewFactory(),
awsxrayreceiver.NewFactory(),
otlpreceiver.NewFactory(),
tcplogreceiver.NewFactory(),
udplogreceiver.NewFactory(),
}
adapterReceiverSet := collections.NewSet[string]()
// Adapted receivers from telegraf
for _, input := range telegrafConfig.Inputs {
adapterReceiverSet.Add(input.Config.Name)
}

for _, adapterReceiver := range maps.Keys(adapterReceiverSet) {
receiverFactories = append(receiverFactories, telegrafAdapter.NewReceiverFactory(adapterReceiver))
}

receivers, err := receiver.MakeFactoryMap(receiverFactories...)
factories, err := defaultcomponents.Factories()
if err != nil {
return factories, err
}

processors, err := processor.MakeFactoryMap(
batchprocessor.NewFactory(),
cumulativetodeltaprocessor.NewFactory(),
ec2tagger.NewFactory(),
transformprocessor.NewFactory(),
)
if err != nil {
return factories, err
}

exporters, err := exporter.MakeFactoryMap(
awsemfexporter.NewFactory(),
loggingexporter.NewFactory(),
cloudwatch.NewFactory(),
awscloudwatchlogsexporter.NewFactory(),
awsxrayexporter.NewFactory(),
)
if err != nil {
return factories, err
// Adapted receivers from telegraf
for _, input := range telegrafConfig.Inputs {
registry.Register(registry.WithReceiver(telegrafAdapter.NewReceiverFactory(input.Config.Name)))
}

extensions, err := extension.MakeFactoryMap()
if err != nil {
return factories, err
for _, apply := range registry.Options() {
apply(&factories)
}

factories = otelcol.Factories{
Receivers: receivers,
Processors: processors,
Exporters: exporters,
Extensions: extensions,
}
registry.Reset()

return factories, nil
}
Expand Down
67 changes: 67 additions & 0 deletions service/defaultcomponents/components.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package defaultcomponents

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awscloudwatchlogsexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsxrayreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcplogreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/udplogreceiver"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"

"github.com/aws/private-amazon-cloudwatch-agent-staging/plugins/outputs/cloudwatch"
"github.com/aws/private-amazon-cloudwatch-agent-staging/plugins/processors/ec2tagger"
)

func Factories() (otelcol.Factories, error) {
var factories otelcol.Factories
var err error

if factories.Receivers, err = receiver.MakeFactoryMap(
awscontainerinsightreceiver.NewFactory(),
awsxrayreceiver.NewFactory(),
otlpreceiver.NewFactory(),
tcplogreceiver.NewFactory(),
udplogreceiver.NewFactory(),
); err != nil {
return otelcol.Factories{}, err
}

if factories.Processors, err = processor.MakeFactoryMap(
batchprocessor.NewFactory(),
cumulativetodeltaprocessor.NewFactory(),
ec2tagger.NewFactory(),
transformprocessor.NewFactory(),
); err != nil {
return otelcol.Factories{}, err
}

if factories.Exporters, err = exporter.MakeFactoryMap(
awscloudwatchlogsexporter.NewFactory(),
awsemfexporter.NewFactory(),
awsxrayexporter.NewFactory(),
cloudwatch.NewFactory(),
loggingexporter.NewFactory(),
); err != nil {
return otelcol.Factories{}, err
}

if factories.Extensions, err = extension.MakeFactoryMap(); err != nil {
return otelcol.Factories{}, err
}

return factories, nil
}
47 changes: 47 additions & 0 deletions service/defaultcomponents/components_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package defaultcomponents

import (
"testing"

"github.com/stretchr/testify/assert"
)

const (
receiversCount = 5
processorCount = 4
exportersCount = 5
extensionsCount = 0
)

func TestComponents(t *testing.T) {
factories, err := Factories()
assert.NoError(t, err)
receivers := factories.Receivers
assert.Len(t, receivers, receiversCount)
assert.NotNil(t, receivers["awscontainerinsightreceiver"])
assert.NotNil(t, receivers["awsxray"])
assert.NotNil(t, receivers["otlp"])
assert.NotNil(t, receivers["tcplog"])
assert.NotNil(t, receivers["udplog"])

processors := factories.Processors
assert.Len(t, processors, processorCount)
assert.NotNil(t, processors["batch"])
assert.NotNil(t, processors["cumulativetodelta"])
assert.NotNil(t, processors["ec2tagger"])
assert.NotNil(t, processors["transform"])

exporters := factories.Exporters
assert.Len(t, exporters, exportersCount)
assert.NotNil(t, exporters["awscloudwatchlogs"])
assert.NotNil(t, exporters["awsemf"])
assert.NotNil(t, exporters["awsxray"])
assert.NotNil(t, exporters["awscloudwatch"])
assert.NotNil(t, exporters["logging"])

extensions := factories.Extensions
assert.Len(t, extensions, extensionsCount)
}
72 changes: 72 additions & 0 deletions service/registry/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package registry

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
)

type Option func(factories *otelcol.Factories)

var registry []Option

// Options getter for registry.
func Options() []Option {
return registry
}

// Reset sets registry to nil.
func Reset() {
registry = nil
}

// Register adds the options to the registry.
func Register(options ...Option) {
registry = append(registry, options...)
}

// WithReceiver sets the receiver factory in the factories. Will overwrite duplicate types.
func WithReceiver(factory receiver.Factory) Option {
return func(factories *otelcol.Factories) {
if factories.Receivers == nil {
factories.Receivers = make(map[component.Type]receiver.Factory)
}
factories.Receivers[factory.Type()] = factory
}
}

// WithProcessor sets the processor factory in the factories. Will overwrite duplicate types.
func WithProcessor(factory processor.Factory) Option {
return func(factories *otelcol.Factories) {
if factories.Processors == nil {
factories.Processors = make(map[component.Type]processor.Factory)
}
factories.Processors[factory.Type()] = factory
}
}

// WithExporter sets the exporter factory in the factories. Will overwrite duplicate types.
func WithExporter(factory exporter.Factory) Option {
return func(factories *otelcol.Factories) {
if factories.Exporters == nil {
factories.Exporters = make(map[component.Type]exporter.Factory)
}
factories.Exporters[factory.Type()] = factory
}
}

// WithExtension sets the extension factory in the factories. Will overwrite duplicate types.
func WithExtension(factory extension.Factory) Option {
return func(factories *otelcol.Factories) {
if factories.Extensions == nil {
factories.Extensions = make(map[component.Type]extension.Factory)
}
factories.Extensions[factory.Type()] = factory
}
}
40 changes: 40 additions & 0 deletions service/registry/registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package registry

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/receiver/receivertest"
)

func TestRegistry(t *testing.T) {
Register(WithReceiver(receivertest.NewNopFactory()), WithProcessor(processortest.NewNopFactory()))
Register(WithExporter(exportertest.NewNopFactory()), WithExtension(extensiontest.NewNopFactory()))
assert.Len(t, Options(), 4)
got := otelcol.Factories{}
for _, apply := range Options() {
apply(&got)
}
assert.NotNil(t, got.Receivers["nop"])
assert.NotNil(t, got.Processors["nop"])
assert.NotNil(t, got.Exporters["nop"])
assert.NotNil(t, got.Extensions["nop"])
assert.Len(t, got.Receivers, 1)
origReceiver := got.Receivers["nop"]
Register(WithReceiver(receivertest.NewNopFactory()))
for _, apply := range Options() {
apply(&got)
}
newReceiver := got.Receivers["nop"]
assert.NotEqual(t, origReceiver, newReceiver)
assert.Len(t, got.Receivers, 1)
Reset()
assert.Nil(t, Options())
}

0 comments on commit 7a011ca

Please sign in to comment.