From f3389998889b4da04d9616153d0d12250233b6f2 Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Tue, 24 Oct 2023 16:19:04 -0400 Subject: [PATCH] Replace uses of agentinfo with agenthealth. (#925) --- cfg/aws/credentials.go | 4 +- cfg/envconfig/envconfig.go | 17 ++++++ cfg/envconfig/envconfig_test.go | 10 ++++ extension/agenthealth/extension.go | 4 +- extension/agenthealth/extension_test.go | 6 +- extension/agenthealth/factory.go | 2 +- .../agenthealth/handler/stats/agent/agent.go | 3 + .../handler/stats/provider/flag.go | 60 +++++++++++++++---- .../handler/stats/provider/flag_test.go | 9 +++ go.mod | 2 +- go.sum | 4 +- plugins/outputs/cloudwatch/cloudwatch.go | 18 +++--- plugins/outputs/cloudwatch/cloudwatch_test.go | 46 +++++++++++++- plugins/outputs/cloudwatch/config.go | 2 + plugins/outputs/cloudwatch/factory.go | 13 ++-- .../outputs/cloudwatchlogs/cloudwatchlogs.go | 31 ++++++++-- plugins/outputs/cloudwatchlogs/pusher.go | 7 +-- plugins/outputs/cloudwatchlogs/pusher_test.go | 4 +- .../ec2tagger/ec2metadataprovider.go | 8 +-- translator/config/envconst.go | 26 ++++---- translator/util/ec2util/ec2util.go | 6 +- 21 files changed, 211 insertions(+), 71 deletions(-) diff --git a/cfg/aws/credentials.go b/cfg/aws/credentials.go index 34ab40f54a..34ca454904 100644 --- a/cfg/aws/credentials.go +++ b/cfg/aws/credentials.go @@ -19,7 +19,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sts" - "github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/provider" ) const ( @@ -116,7 +116,7 @@ func getSession(config *aws.Config) *session.Session { if len(found) > 0 { log.Printf("W! Unused shared config file(s) found: %v. If you would like to use them, "+ "please update your common-config.toml.", found) - agentinfo.RecordSharedConfigFallback() + provider.GetFlagsStats().SetFlag(provider.FlagSharedConfigFallback) } } return ses diff --git a/cfg/envconfig/envconfig.go b/cfg/envconfig/envconfig.go index 555b5ac1a4..67e85a8021 100644 --- a/cfg/envconfig/envconfig.go +++ b/cfg/envconfig/envconfig.go @@ -20,6 +20,19 @@ const ( CWAGENT_LOG_LEVEL = "CWAGENT_LOG_LEVEL" CWAGENT_USAGE_DATA = "CWAGENT_USAGE_DATA" IMDS_NUMBER_RETRY = "IMDS_NUMBER_RETRY" + RunInContainer = "RUN_IN_CONTAINER" + RunInAWS = "RUN_IN_AWS" + RunWithIRSA = "RUN_WITH_IRSA" + UseDefaultConfig = "USE_DEFAULT_CONFIG" + HostName = "HOST_NAME" + PodName = "POD_NAME" + HostIP = "HOST_IP" + CWConfigContent = "CW_CONFIG_CONTENT" +) + +const ( + // TrueValue is the expected string set on an environment variable to indicate true. + TrueValue = "True" ) var ( @@ -40,3 +53,7 @@ func IsUsageDataEnabled() bool { }) return usageDataEnabled } + +func IsRunningInContainer() bool { + return os.Getenv(RunInContainer) == TrueValue +} diff --git a/cfg/envconfig/envconfig_test.go b/cfg/envconfig/envconfig_test.go index d136bbb114..a69b4bf74b 100644 --- a/cfg/envconfig/envconfig_test.go +++ b/cfg/envconfig/envconfig_test.go @@ -21,3 +21,13 @@ func TestIsUsageDataEnabled(t *testing.T) { t.Setenv(CWAGENT_USAGE_DATA, "FALSE") assert.False(t, getUsageDataEnabled()) } + +func TestIsRunningInContainer(t *testing.T) { + assert.False(t, IsRunningInContainer()) + + t.Setenv(RunInContainer, "TRUE") + assert.False(t, IsRunningInContainer()) + + t.Setenv(RunInContainer, TrueValue) + assert.True(t, IsRunningInContainer()) +} diff --git a/extension/agenthealth/extension.go b/extension/agenthealth/extension.go index 26cacc6443..14ab08eb57 100644 --- a/extension/agenthealth/extension.go +++ b/extension/agenthealth/extension.go @@ -32,6 +32,6 @@ func (ah *agentHealth) Handlers() ([]awsmiddleware.RequestHandler, []awsmiddlewa return requestHandlers, responseHandlers } -func newAgentHealth(logger *zap.Logger, cfg *Config) (*agentHealth, error) { - return &agentHealth{logger: logger, cfg: cfg}, nil +func NewAgentHealth(logger *zap.Logger, cfg *Config) awsmiddleware.Extension { + return &agentHealth{logger: logger, cfg: cfg} } diff --git a/extension/agenthealth/extension_test.go b/extension/agenthealth/extension_test.go index 9cd98ddde1..504dc8c50e 100644 --- a/extension/agenthealth/extension_test.go +++ b/extension/agenthealth/extension_test.go @@ -14,8 +14,8 @@ import ( func TestExtension(t *testing.T) { ctx := context.Background() - extension, err := newAgentHealth(zap.NewNop(), &Config{IsUsageDataEnabled: true}) - assert.NoError(t, err) + cfg := &Config{IsUsageDataEnabled: true} + extension := NewAgentHealth(zap.NewNop(), cfg) assert.NotNil(t, extension) assert.NoError(t, extension.Start(ctx, componenttest.NewNopHost())) requestHandlers, responseHandlers := extension.Handlers() @@ -23,7 +23,7 @@ func TestExtension(t *testing.T) { assert.Len(t, requestHandlers, 3) // client stats assert.Len(t, responseHandlers, 1) - extension.cfg.IsUsageDataEnabled = false + cfg.IsUsageDataEnabled = false requestHandlers, responseHandlers = extension.Handlers() // user agent assert.Len(t, requestHandlers, 1) diff --git a/extension/agenthealth/factory.go b/extension/agenthealth/factory.go index 47bca97016..fe60efec6f 100644 --- a/extension/agenthealth/factory.go +++ b/extension/agenthealth/factory.go @@ -35,5 +35,5 @@ func createDefaultConfig() component.Config { } func createExtension(_ context.Context, settings extension.CreateSettings, cfg component.Config) (extension.Extension, error) { - return newAgentHealth(settings.Logger, cfg.(*Config)) + return NewAgentHealth(settings.Logger, cfg.(*Config)), nil } diff --git a/extension/agenthealth/handler/stats/agent/agent.go b/extension/agenthealth/handler/stats/agent/agent.go index d6982777b6..abe77f5df0 100644 --- a/extension/agenthealth/handler/stats/agent/agent.go +++ b/extension/agenthealth/handler/stats/agent/agent.go @@ -26,6 +26,9 @@ type Stats struct { ImdsFallbackSucceed *int `json:"ifs,omitempty"` AppSignals *int `json:"as,omitempty"` EnhancedContainerInsights *int `json:"eci,omitempty"` + RunningInContainer *int `json:"ric,omitempty"` + RegionType *string `json:"rt,omitempty"` + Mode *string `json:"m,omitempty"` } // Merge the other Stats into the current. If the field is not nil, diff --git a/extension/agenthealth/handler/stats/provider/flag.go b/extension/agenthealth/handler/stats/provider/flag.go index 30152373a3..c90df8fd1a 100644 --- a/extension/agenthealth/handler/stats/provider/flag.go +++ b/extension/agenthealth/handler/stats/provider/flag.go @@ -9,6 +9,7 @@ import ( "github.com/aws/aws-sdk-go/aws" + "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) @@ -16,13 +17,21 @@ const ( flagGetInterval = 5 * time.Minute ) -type Flag int +type BoolFlag int const ( - FlagIMDSFallbackSucceed = iota + FlagIMDSFallbackSucceed BoolFlag = iota FlagSharedConfigFallback FlagAppSignal FlagEnhancedContainerInsights + FlagRunningInContainer +) + +type StringFlag int + +const ( + FlagMode StringFlag = iota + FlagRegionType ) var ( @@ -32,7 +41,8 @@ var ( type FlagStats interface { agent.StatsProvider - SetFlag(flag Flag) + SetFlag(flag BoolFlag) + SetFlagWithValue(flag StringFlag, value string) } type flagStats struct { @@ -47,31 +57,61 @@ func (p *flagStats) update() { p.mu.Lock() defer p.mu.Unlock() p.stats = agent.Stats{ - ImdsFallbackSucceed: p.getFlag(FlagIMDSFallbackSucceed), - SharedConfigFallback: p.getFlag(FlagSharedConfigFallback), - AppSignals: p.getFlag(FlagAppSignal), - EnhancedContainerInsights: p.getFlag(FlagEnhancedContainerInsights), + ImdsFallbackSucceed: p.getIntFlag(FlagIMDSFallbackSucceed, false), + SharedConfigFallback: p.getIntFlag(FlagSharedConfigFallback, false), + AppSignals: p.getIntFlag(FlagAppSignal, false), + EnhancedContainerInsights: p.getIntFlag(FlagEnhancedContainerInsights, false), + RunningInContainer: p.getIntFlag(FlagRunningInContainer, true), + Mode: p.getStringFlag(FlagMode), + RegionType: p.getStringFlag(FlagRegionType), } } -func (p *flagStats) getFlag(flag Flag) *int { +func (p *flagStats) getIntFlag(flag BoolFlag, missingAsZero bool) *int { if _, ok := p.flags.Load(flag); ok { return aws.Int(1) } + if missingAsZero { + return aws.Int(0) + } return nil } -func (p *flagStats) SetFlag(flag Flag) { +func (p *flagStats) getStringFlag(flag StringFlag) *string { + value, ok := p.flags.Load(flag) + if !ok { + return nil + } + var str string + str, ok = value.(string) + if !ok { + return nil + } + return aws.String(str) +} + +func (p *flagStats) SetFlag(flag BoolFlag) { if _, ok := p.flags.Load(flag); !ok { p.flags.Store(flag, true) p.update() } } +func (p *flagStats) SetFlagWithValue(flag StringFlag, value string) { + if _, ok := p.flags.Load(flag); !ok { + p.flags.Store(flag, value) + p.update() + } +} + func newFlagStats(interval time.Duration) *flagStats { - return &flagStats{ + stats := &flagStats{ intervalStats: newIntervalStats(interval), } + if envconfig.IsRunningInContainer() { + stats.SetFlag(FlagRunningInContainer) + } + return stats } func GetFlagsStats() FlagStats { diff --git a/extension/agenthealth/handler/stats/provider/flag_test.go b/extension/agenthealth/handler/stats/provider/flag_test.go index b3044c0780..f4890a7b9e 100644 --- a/extension/agenthealth/handler/stats/provider/flag_test.go +++ b/extension/agenthealth/handler/stats/provider/flag_test.go @@ -8,13 +8,18 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" ) func TestFlagStats(t *testing.T) { + t.Setenv(envconfig.RunInContainer, envconfig.TrueValue) provider := newFlagStats(time.Microsecond) got := provider.stats assert.Nil(t, got.ImdsFallbackSucceed) assert.Nil(t, got.SharedConfigFallback) + assert.NotNil(t, got.RunningInContainer) + assert.Equal(t, 1, *got.RunningInContainer) provider.SetFlag(FlagIMDSFallbackSucceed) assert.Nil(t, got.ImdsFallbackSucceed) got = provider.stats @@ -25,4 +30,8 @@ func TestFlagStats(t *testing.T) { got = provider.stats assert.NotNil(t, got.SharedConfigFallback) assert.Equal(t, 1, *got.SharedConfigFallback) + provider.SetFlagWithValue(FlagMode, "test") + got = provider.stats + assert.NotNil(t, got.Mode) + assert.Equal(t, "test", *got.Mode) } diff --git a/go.mod b/go.mod index 026eb11c18..9249abc62b 100644 --- a/go.mod +++ b/go.mod @@ -85,7 +85,7 @@ replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api require ( github.com/BurntSushi/toml v1.3.2 github.com/Jeffail/gabs v1.4.0 - github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20231023161526-9bd8785e9c2e + github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20231023230448-f645697bf350 github.com/aws/aws-sdk-go v1.45.24 github.com/aws/aws-sdk-go-v2 v1.21.2 github.com/aws/aws-sdk-go-v2/config v1.18.25 diff --git a/go.sum b/go.sum index 2f1fd27cd5..9eb766c274 100644 --- a/go.sum +++ b/go.sum @@ -146,8 +146,8 @@ github.com/amazon-contributing/opentelemetry-collector-contrib/exporter/awsemfex github.com/amazon-contributing/opentelemetry-collector-contrib/exporter/awsemfexporter v0.0.0-20231023161526-9bd8785e9c2e/go.mod h1:UAXcRSojI8I0Kb9iS9a2v7J/iPrQ1loJIsBprSaVdFo= github.com/amazon-contributing/opentelemetry-collector-contrib/exporter/awsxrayexporter v0.0.0-20231023161526-9bd8785e9c2e h1:wwkcWoKzZM1S92tSxSRxraQcUsJk+CS8UsKUR5Wcgow= github.com/amazon-contributing/opentelemetry-collector-contrib/exporter/awsxrayexporter v0.0.0-20231023161526-9bd8785e9c2e/go.mod h1:cr4dmBlfnMVYT+gyKUAKh39zQu5u/UAukxQj15MdZ18= -github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20231023161526-9bd8785e9c2e h1:tlA6NWgE+cKvBErHLl+oDAYG+oJ3TJV+N6jeJnYr5iI= -github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20231023161526-9bd8785e9c2e/go.mod h1:uOQa5/9Jle9VADEdWCXL4AbJr35NJQil30tapcTHQlw= +github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20231023230448-f645697bf350 h1:+75XAqf0Og8cshAdekRcqWf3v38Uw34XJRFbul6jbv0= +github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware v0.0.0-20231023230448-f645697bf350/go.mod h1:uOQa5/9Jle9VADEdWCXL4AbJr35NJQil30tapcTHQlw= github.com/amazon-contributing/opentelemetry-collector-contrib/internal/aws/awsutil v0.0.0-20231023161526-9bd8785e9c2e h1:uQk5BvFVNMNCQswMx3gilWwPiqikvWx3BBFwMs85Stw= github.com/amazon-contributing/opentelemetry-collector-contrib/internal/aws/awsutil v0.0.0-20231023161526-9bd8785e9c2e/go.mod h1:9iAsO2SC8NIsa8/xCmC2Pj4MZPmYdvm+1/n89M74JS4= github.com/amazon-contributing/opentelemetry-collector-contrib/internal/aws/containerinsight v0.0.0-20231023161526-9bd8785e9c2e h1:FvMVzM0uAQmE1lPdKdmSCPpZnE0O3yg14J2oMwrBXt0= diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index 69cb5b96c9..e936a72622 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/cloudwatch" @@ -22,11 +23,12 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" "golang.org/x/exp/maps" configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/provider" "github.com/aws/amazon-cloudwatch-agent/handlers" - "github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo" "github.com/aws/amazon-cloudwatch-agent/internal/publisher" "github.com/aws/amazon-cloudwatch-agent/internal/retryer" "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" @@ -54,6 +56,7 @@ const ( type CloudWatch struct { config *Config + logger *zap.Logger svc cloudwatchiface.CloudWatchAPI // todo: may want to increase the size of the chan since the type changed. // 1 telegraf Metric could have many Fields. @@ -69,7 +72,6 @@ type CloudWatch struct { aggregator Aggregator aggregatorShutdownChan chan struct{} aggregatorWaitGroup sync.WaitGroup - agentInfo agentinfo.AgentInfo lastRequestBytes int } @@ -81,7 +83,6 @@ func (c *CloudWatch) Capabilities() consumer.Capabilities { } func (c *CloudWatch) Start(_ context.Context, host component.Host) error { - c.agentInfo = agentinfo.New("", c.config.RegionType, c.config.Mode) c.publisher, _ = publisher.NewPublisher( publisher.NewNonBlockingFifoQueue(metricChanBufferSize), maxConcurrentPublisher, @@ -96,6 +97,8 @@ func (c *CloudWatch) Start(_ context.Context, host component.Host) error { Filename: c.config.SharedCredentialFilename, Token: c.config.Token, } + provider.GetFlagsStats().SetFlagWithValue(provider.FlagRegionType, c.config.RegionType) + provider.GetFlagsStats().SetFlagWithValue(provider.FlagMode, c.config.Mode) configProvider := credentialConfig.Credentials() logger := models.NewLogger("outputs", "cloudwatch", "") logThrottleRetryer := retryer.NewLogThrottleRetryer(logger) @@ -108,8 +111,9 @@ func (c *CloudWatch) Start(_ context.Context, host component.Host) error { Logger: configaws.SDKLogger{}, }) svc.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{opPutLogEvents, opPutMetricData})) - svc.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("User-Agent", c.agentInfo.UserAgent())) - svc.Handlers.Build.PushBackNamed(handlers.NewDynamicCustomHeaderHandler("X-Amz-Agent-Stats", c.agentInfo.StatsHeader)) + if c.config.MiddlewareID != nil { + awsmiddleware.TryConfigure(c.logger, host, *c.config.MiddlewareID, awsmiddleware.SDKv1(&svc.Handlers)) + } //Format unique roll up list c.config.RollupDimensions = GetUniqueRollupList(c.config.RollupDimensions) c.svc = svc @@ -267,7 +271,7 @@ func (c *CloudWatch) publish() { if !bufferFullOccurred { // Set to true so this only happens once per push. bufferFullOccurred = true - // Keep interval above above 1 second. + // Keep interval above 1 second. if currentInterval.Seconds() > 1 { currentInterval /= 2 if currentInterval.Seconds() < 1 { @@ -341,9 +345,7 @@ func (c *CloudWatch) WriteToCloudWatch(req interface{}) { } var err error for i := 0; i < defaultRetryCount; i++ { - startTime := time.Now() _, err = c.svc.PutMetricData(params) - c.agentInfo.RecordOpData(time.Since(startTime), c.lastRequestBytes, err) if err != nil { awsErr, ok := err.(awserr.Error) if !ok { diff --git a/plugins/outputs/cloudwatch/cloudwatch_test.go b/plugins/outputs/cloudwatch/cloudwatch_test.go index d95d68cef8..79aa8a5c25 100644 --- a/plugins/outputs/cloudwatch/cloudwatch_test.go +++ b/plugins/outputs/cloudwatch/cloudwatch_test.go @@ -7,11 +7,14 @@ import ( "context" "log" "math" + "net/http" + "net/http/httptest" "strconv" "strings" "testing" "time" + "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/cloudwatch" @@ -21,8 +24,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" - "github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo" "github.com/aws/amazon-cloudwatch-agent/internal/publisher" "github.com/aws/amazon-cloudwatch-agent/metric/distribution" ) @@ -382,7 +386,6 @@ func newCloudWatchClient( MaxDatumsPerCall: defaultMaxDatumsPerCall, MaxValuesPerDatum: defaultMaxValuesPerDatum, }, - agentInfo: agentinfo.New("", "", ""), } cloudwatch.startRoutines() return cloudwatch @@ -495,6 +498,45 @@ func TestPublish(t *testing.T) { cw.Shutdown(ctx) } +func TestMiddleware(t *testing.T) { + t.Setenv("AWS_ACCESS_KEY_ID", "test") + t.Setenv("AWS_SECRET_ACCESS_KEY", "test") + id := component.NewID("test") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + cw := &CloudWatch{ + config: &Config{ + Region: "test-region", + Namespace: "test-namespace", + ForceFlushInterval: time.Second, + EndpointOverride: server.URL, + MiddlewareID: &id, + }, + logger: zap.NewNop(), + } + ctx := context.Background() + handler := new(awsmiddleware.MockHandler) + handler.On("ID").Return("test") + handler.On("Position").Return(awsmiddleware.After) + handler.On("HandleRequest", mock.Anything, mock.Anything) + handler.On("HandleResponse", mock.Anything, mock.Anything) + middleware := new(awsmiddleware.MockMiddlewareExtension) + middleware.On("Handlers").Return([]awsmiddleware.RequestHandler{handler}, []awsmiddleware.ResponseHandler{handler}) + extensions := map[component.ID]component.Component{id: middleware} + host := new(awsmiddleware.MockExtensionsHost) + host.On("GetExtensions").Return(extensions) + assert.NoError(t, cw.Start(ctx, host)) + // Expect 1500 metrics batched in 2 API calls. + pmetrics := createTestMetrics(1500, 1, 1, "B/s") + assert.NoError(t, cw.ConsumeMetrics(ctx, pmetrics)) + time.Sleep(2*time.Second + 2*cw.config.ForceFlushInterval) + handler.AssertCalled(t, "HandleRequest", mock.Anything, mock.Anything) + handler.AssertCalled(t, "HandleResponse", mock.Anything, mock.Anything) + require.NoError(t, cw.Shutdown(ctx)) +} + func TestBackoffRetries(t *testing.T) { c := &CloudWatch{} sleeps := []time.Duration{ diff --git a/plugins/outputs/cloudwatch/config.go b/plugins/outputs/cloudwatch/config.go index 9c324529bb..43469bc9d5 100644 --- a/plugins/outputs/cloudwatch/config.go +++ b/plugins/outputs/cloudwatch/config.go @@ -35,6 +35,8 @@ type Config struct { // "Enabled" - A boolean field to enable/disable this option. Default is `false`. // If enabled, all the resource attributes will be converted to metric labels by default. ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"` + // MiddlewareID is an ID for an extension that can be used to configure the AWS client. + MiddlewareID *component.ID `mapstructure:"middleware,omitempty"` } var _ component.Config = (*Config)(nil) diff --git a/plugins/outputs/cloudwatch/factory.go b/plugins/outputs/cloudwatch/factory.go index 1e5048ed70..f1a554b703 100644 --- a/plugins/outputs/cloudwatch/factory.go +++ b/plugins/outputs/cloudwatch/factory.go @@ -43,20 +43,21 @@ func createMetricsExporter( settings exporter.CreateSettings, config component.Config, ) (exporter.Metrics, error) { - exp := &CloudWatch{ + cw := &CloudWatch{ config: config.(*Config), + logger: settings.Logger, } - exporter, err := exporterhelper.NewMetricsExporter( + exp, err := exporterhelper.NewMetricsExporter( ctx, settings, config, - exp.ConsumeMetrics, - exporterhelper.WithStart(exp.Start), - exporterhelper.WithShutdown(exp.Shutdown), + cw.ConsumeMetrics, + exporterhelper.WithStart(cw.Start), + exporterhelper.WithShutdown(cw.Shutdown), ) if err != nil { return nil, err } return resourcetotelemetry.WrapMetricsExporter( - config.(*Config).ResourceToTelemetrySettings, exporter), nil + config.(*Config).ResourceToTelemetrySettings, exp), nil } diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go index 6c6794ce0c..071e96af98 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go @@ -6,21 +6,25 @@ package cloudwatchlogs import ( "encoding/json" "fmt" + "regexp" "strings" "sync" "time" + "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" + "go.uber.org/zap" configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws" "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/provider" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/useragent" "github.com/aws/amazon-cloudwatch-agent/handlers" - "github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo" "github.com/aws/amazon-cloudwatch-agent/internal" "github.com/aws/amazon-cloudwatch-agent/internal/retryer" "github.com/aws/amazon-cloudwatch-agent/logs" @@ -43,6 +47,10 @@ const ( attributesInFields = "attributesInFields" ) +var ( + containerInsightsRegexp = regexp.MustCompile("^/aws/.*containerinsights/.*/(performance|prometheus)$") +) + type CloudWatchLogs struct { Region string `toml:"region"` RegionType string `toml:"region_type"` @@ -69,6 +77,7 @@ type CloudWatchLogs struct { pusherStopChan chan struct{} pusherWaitGroup sync.WaitGroup cwDests map[Target]*cwDest + middleware awsmiddleware.Middleware } func (c *CloudWatchLogs) Connect() error { @@ -137,12 +146,18 @@ func (c *CloudWatchLogs) getDest(t Target) *cwDest { Logger: configaws.SDKLogger{}, }, ) - agentInfo := agentinfo.New(t.Group, c.RegionType, c.Mode) + provider.GetFlagsStats().SetFlagWithValue(provider.FlagRegionType, c.RegionType) + provider.GetFlagsStats().SetFlagWithValue(provider.FlagMode, c.Mode) + if containerInsightsRegexp.MatchString(t.Group) { + useragent.Get().SetContainerInsightsFlag() + } client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"})) - client.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("User-Agent", useragent.Get().Header(envconfig.IsUsageDataEnabled()))) - client.Handlers.Build.PushBackNamed(handlers.NewDynamicCustomHeaderHandler("X-Amz-Agent-Stats", agentInfo.StatsHeader)) - - pusher := NewPusher(t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log, c.pusherStopChan, &c.pusherWaitGroup, agentInfo) + if c.middleware != nil { + if err := awsmiddleware.NewConfigurer(c.middleware.Handlers()).Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil { + c.Log.Errorf("Unable to configure middleware on cloudwatch logs client: %v", err) + } + } + pusher := NewPusher(t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log, c.pusherStopChan, &c.pusherWaitGroup) cwd := &cwDest{pusher: pusher, retryer: logThrottleRetryer} c.cwDests[t] = cwd return cwd @@ -379,6 +394,10 @@ func init() { ForceFlushInterval: internal.Duration{Duration: defaultFlushTimeout}, pusherStopChan: make(chan struct{}), cwDests: make(map[Target]*cwDest), + middleware: agenthealth.NewAgentHealth( + zap.NewNop(), + &agenthealth.Config{IsUsageDataEnabled: envconfig.IsUsageDataEnabled()}, + ), } }) } diff --git a/plugins/outputs/cloudwatchlogs/pusher.go b/plugins/outputs/cloudwatchlogs/pusher.go index 83b8f55282..1651f18cdb 100644 --- a/plugins/outputs/cloudwatchlogs/pusher.go +++ b/plugins/outputs/cloudwatchlogs/pusher.go @@ -14,7 +14,6 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/influxdata/telegraf" - "github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo" "github.com/aws/amazon-cloudwatch-agent/logs" "github.com/aws/amazon-cloudwatch-agent/profiler" ) @@ -60,10 +59,9 @@ type pusher struct { initNonBlockingChOnce sync.Once startNonBlockCh chan struct{} wg *sync.WaitGroup - agentInfo agentinfo.AgentInfo } -func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.Duration, retryDuration time.Duration, logger telegraf.Logger, stop <-chan struct{}, wg *sync.WaitGroup, agentInfo agentinfo.AgentInfo) *pusher { +func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.Duration, retryDuration time.Duration, logger telegraf.Logger, stop <-chan struct{}, wg *sync.WaitGroup) *pusher { p := &pusher{ Target: target, Service: service, @@ -76,7 +74,6 @@ func NewPusher(target Target, service CloudWatchLogsService, flushTimeout time.D stop: stop, startNonBlockCh: make(chan struct{}), wg: wg, - agentInfo: agentInfo, } p.putRetentionPolicy() p.wg.Add(1) @@ -232,9 +229,7 @@ func (p *pusher) send() { retryCount := 0 for { input.SequenceToken = p.sequenceToken - opStartTime := time.Now() output, err := p.Service.PutLogEvents(input) - p.agentInfo.RecordOpData(time.Since(opStartTime), p.bufferredSize, err) if err == nil { if output.NextSequenceToken != nil { p.sequenceToken = output.NextSequenceToken diff --git a/plugins/outputs/cloudwatchlogs/pusher_test.go b/plugins/outputs/cloudwatchlogs/pusher_test.go index 7695a98fbe..4fc93c8ca2 100644 --- a/plugins/outputs/cloudwatchlogs/pusher_test.go +++ b/plugins/outputs/cloudwatchlogs/pusher_test.go @@ -20,8 +20,6 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/influxdata/telegraf/models" "github.com/stretchr/testify/require" - - "github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo" ) var wg sync.WaitGroup @@ -765,6 +763,6 @@ func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { func testPreparation(retention int, s *svcMock, flushTimeout time.Duration, retryDuration time.Duration) (chan struct{}, *pusher) { stop := make(chan struct{}) - p := NewPusher(Target{"G", "S", retention}, s, flushTimeout, retryDuration, models.NewLogger("cloudwatchlogs", "test", ""), stop, &wg, agentinfo.New("", "", "")) + p := NewPusher(Target{"G", "S", retention}, s, flushTimeout, retryDuration, models.NewLogger("cloudwatchlogs", "test", ""), stop, &wg) return stop, p } diff --git a/plugins/processors/ec2tagger/ec2metadataprovider.go b/plugins/processors/ec2tagger/ec2metadataprovider.go index 04f66a1c9e..b90258605a 100644 --- a/plugins/processors/ec2tagger/ec2metadataprovider.go +++ b/plugins/processors/ec2tagger/ec2metadataprovider.go @@ -12,7 +12,7 @@ import ( "github.com/aws/aws-sdk-go/aws/ec2metadata" configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws" - "github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/provider" "github.com/aws/amazon-cloudwatch-agent/internal/retryer" ) @@ -52,7 +52,7 @@ func (c *metadataClient) InstanceID(ctx context.Context) (string, error) { log.Printf("D! could not get instance id without imds v1 fallback enable thus enable fallback") instanceInner, errorInner := c.metadataFallbackEnabled.GetMetadataWithContext(ctx, "instance-id") if errorInner == nil { - agentinfo.SetImdsFallbackSucceed() + provider.GetFlagsStats().SetFlag(provider.FlagIMDSFallbackSucceed) } return instanceInner, errorInner } @@ -65,7 +65,7 @@ func (c *metadataClient) Hostname(ctx context.Context) (string, error) { log.Printf("D! could not get hostname without imds v1 fallback enable thus enable fallback") hostnameInner, errorInner := c.metadataFallbackEnabled.GetMetadataWithContext(ctx, "hostname") if errorInner == nil { - agentinfo.SetImdsFallbackSucceed() + provider.GetFlagsStats().SetFlag(provider.FlagIMDSFallbackSucceed) } return hostnameInner, errorInner } @@ -78,7 +78,7 @@ func (c *metadataClient) Get(ctx context.Context) (ec2metadata.EC2InstanceIdenti log.Printf("D! could not get instance document without imds v1 fallback enable thus enable fallback") instanceDocumentInner, errorInner := c.metadataFallbackEnabled.GetInstanceIdentityDocumentWithContext(ctx) if errorInner == nil { - agentinfo.SetImdsFallbackSucceed() + provider.GetFlagsStats().SetFlag(provider.FlagIMDSFallbackSucceed) } return instanceDocumentInner, errorInner } diff --git a/translator/config/envconst.go b/translator/config/envconst.go index dd757b301b..ed1785c138 100644 --- a/translator/config/envconst.go +++ b/translator/config/envconst.go @@ -3,17 +3,19 @@ package config +import "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" + const ( - RUN_IN_CONTAINER = "RUN_IN_CONTAINER" - RUN_IN_CONTAINER_TRUE = "True" - RUN_IN_AWS = "RUN_IN_AWS" - RUN_IN_AWS_TRUE = "True" - RUN_WITH_IRSA = "RUN_WITH_IRSA" - RUN_WITH_IRSA_TRUE = "True" - USE_DEFAULT_CONFIG = "USE_DEFAULT_CONFIG" - USE_DEFAULT_CONFIG_TRUE = "True" - HOST_NAME = "HOST_NAME" - POD_NAME = "POD_NAME" - HOST_IP = "HOST_IP" - CWConfigContent = "CW_CONFIG_CONTENT" + RUN_IN_CONTAINER = envconfig.RunInContainer + RUN_IN_CONTAINER_TRUE = envconfig.TrueValue + RUN_IN_AWS = envconfig.RunInAWS + RUN_IN_AWS_TRUE = envconfig.TrueValue + RUN_WITH_IRSA = envconfig.RunWithIRSA + RUN_WITH_IRSA_TRUE = envconfig.TrueValue + USE_DEFAULT_CONFIG = envconfig.UseDefaultConfig + USE_DEFAULT_CONFIG_TRUE = envconfig.TrueValue + HOST_NAME = envconfig.HostName + POD_NAME = envconfig.PodName + HOST_IP = envconfig.HostIP + CWConfigContent = envconfig.CWConfigContent ) diff --git a/translator/util/ec2util/ec2util.go b/translator/util/ec2util/ec2util.go index 6b3ce5e37b..5da0790d23 100644 --- a/translator/util/ec2util/ec2util.go +++ b/translator/util/ec2util/ec2util.go @@ -14,7 +14,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws" - "github.com/aws/amazon-cloudwatch-agent/handlers/agentinfo" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/provider" "github.com/aws/amazon-cloudwatch-agent/internal/retryer" "github.com/aws/amazon-cloudwatch-agent/translator/config" "github.com/aws/amazon-cloudwatch-agent/translator/context" @@ -116,7 +116,7 @@ func (e *ec2Util) deriveEC2MetadataFromIMDS() error { hostnameInner, errInner := mdEnableFallback.GetMetadata("hostname") if errInner == nil { e.Hostname = hostnameInner - agentinfo.SetImdsFallbackSucceed() + provider.GetFlagsStats().SetFlag(provider.FlagIMDSFallbackSucceed) } else { fmt.Println("E! [EC2] Fetch hostname from EC2 metadata fail:", errInner) } @@ -136,7 +136,7 @@ func (e *ec2Util) deriveEC2MetadataFromIMDS() error { e.AccountID = instanceIdentityDocumentInner.AccountID e.PrivateIP = instanceIdentityDocumentInner.PrivateIP e.InstanceID = instanceIdentityDocumentInner.InstanceID - agentinfo.SetImdsFallbackSucceed() + provider.GetFlagsStats().SetFlag(provider.FlagIMDSFallbackSucceed) } else { fmt.Println("E! [EC2] Fetch identity document from EC2 metadata fail:", errInner) }