diff --git a/extension/agenthealth/config.go b/extension/agenthealth/config.go index d7a963ab66..dd1f94c06c 100644 --- a/extension/agenthealth/config.go +++ b/extension/agenthealth/config.go @@ -3,10 +3,15 @@ package agenthealth -import "go.opentelemetry.io/collector/component" +import ( + "go.opentelemetry.io/collector/component" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" +) type Config struct { - IsUsageDataEnabled bool `mapstructure:"is_usage_data_enabled"` + IsUsageDataEnabled bool `mapstructure:"is_usage_data_enabled"` + Stats agent.StatsConfig `mapstructure:"stats"` } var _ component.Config = (*Config)(nil) diff --git a/extension/agenthealth/config_test.go b/extension/agenthealth/config_test.go index bdd95803c2..d1bb85b8f8 100644 --- a/extension/agenthealth/config_test.go +++ b/extension/agenthealth/config_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap/confmaptest" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) func TestLoadConfig(t *testing.T) { @@ -24,7 +26,11 @@ func TestLoadConfig(t *testing.T) { }, { id: component.NewIDWithName(TypeStr, "1"), - want: &Config{IsUsageDataEnabled: false}, + want: &Config{IsUsageDataEnabled: false, Stats: agent.StatsConfig{Operations: []string{agent.AllowAllOperations}}}, + }, + { + id: component.NewIDWithName(TypeStr, "2"), + want: &Config{IsUsageDataEnabled: true, Stats: agent.StatsConfig{Operations: []string{"ListBuckets"}}}, }, } for _, testCase := range testCases { diff --git a/extension/agenthealth/extension.go b/extension/agenthealth/extension.go index 7f650648d8..26cacc6443 100644 --- a/extension/agenthealth/extension.go +++ b/extension/agenthealth/extension.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.uber.org/zap" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/useragent" ) @@ -23,6 +24,11 @@ var _ awsmiddleware.Extension = (*agentHealth)(nil) func (ah *agentHealth) Handlers() ([]awsmiddleware.RequestHandler, []awsmiddleware.ResponseHandler) { var responseHandlers []awsmiddleware.ResponseHandler requestHandlers := []awsmiddleware.RequestHandler{useragent.NewHandler(ah.cfg.IsUsageDataEnabled)} + if ah.cfg.IsUsageDataEnabled { + req, res := stats.NewHandlers(ah.logger, ah.cfg.Stats) + requestHandlers = append(requestHandlers, req...) + responseHandlers = append(responseHandlers, res...) + } return requestHandlers, responseHandlers } diff --git a/extension/agenthealth/extension_test.go b/extension/agenthealth/extension_test.go index b243954626..9cd98ddde1 100644 --- a/extension/agenthealth/extension_test.go +++ b/extension/agenthealth/extension_test.go @@ -18,13 +18,15 @@ func TestExtension(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, extension) assert.NoError(t, extension.Start(ctx, componenttest.NewNopHost())) - requests, responses := extension.Handlers() - assert.Len(t, requests, 1) - assert.Len(t, responses, 0) + requestHandlers, responseHandlers := extension.Handlers() + // user agent, client stats, stats + assert.Len(t, requestHandlers, 3) + // client stats + assert.Len(t, responseHandlers, 1) extension.cfg.IsUsageDataEnabled = false - extension.Handlers() - requests, responses = extension.Handlers() - assert.Len(t, requests, 1) - assert.Len(t, responses, 0) + requestHandlers, responseHandlers = extension.Handlers() + // user agent + assert.Len(t, requestHandlers, 1) + assert.Len(t, responseHandlers, 0) assert.NoError(t, extension.Shutdown(ctx)) } diff --git a/extension/agenthealth/factory.go b/extension/agenthealth/factory.go index 19ad7aa2ae..47bca97016 100644 --- a/extension/agenthealth/factory.go +++ b/extension/agenthealth/factory.go @@ -8,6 +8,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) const ( @@ -26,6 +28,9 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ IsUsageDataEnabled: true, + Stats: agent.StatsConfig{ + Operations: []string{agent.AllowAllOperations}, + }, } } diff --git a/extension/agenthealth/factory_test.go b/extension/agenthealth/factory_test.go index a805f59c0c..4899dfb425 100644 --- a/extension/agenthealth/factory_test.go +++ b/extension/agenthealth/factory_test.go @@ -10,11 +10,13 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/extension/extensiontest" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) func TestCreateDefaultConfig(t *testing.T) { cfg := NewFactory().CreateDefaultConfig() - assert.Equal(t, &Config{IsUsageDataEnabled: true}, cfg) + assert.Equal(t, &Config{IsUsageDataEnabled: true, Stats: agent.StatsConfig{Operations: []string{agent.AllowAllOperations}}}, cfg) assert.NoError(t, componenttest.CheckConfigStruct(cfg)) } diff --git a/extension/agenthealth/handler/stats/agent/agent.go b/extension/agenthealth/handler/stats/agent/agent.go new file mode 100644 index 0000000000..d6982777b6 --- /dev/null +++ b/extension/agenthealth/handler/stats/agent/agent.go @@ -0,0 +1,102 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package agent + +import ( + "encoding/json" + "strings" + + "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" +) + +const ( + AllowAllOperations = "*" +) + +type Stats struct { + CpuPercent *float64 `json:"cpu,omitempty"` + MemoryBytes *uint64 `json:"mem,omitempty"` + FileDescriptorCount *int32 `json:"fd,omitempty"` + ThreadCount *int32 `json:"th,omitempty"` + LatencyMillis *int64 `json:"lat,omitempty"` + PayloadBytes *int `json:"load,omitempty"` + StatusCode *int `json:"code,omitempty"` + SharedConfigFallback *int `json:"scfb,omitempty"` + ImdsFallbackSucceed *int `json:"ifs,omitempty"` + AppSignals *int `json:"as,omitempty"` + EnhancedContainerInsights *int `json:"eci,omitempty"` +} + +// Merge the other Stats into the current. If the field is not nil, +// then it'll overwrite the existing one. +func (s *Stats) Merge(other Stats) { + if other.CpuPercent != nil { + s.CpuPercent = other.CpuPercent + } + if other.MemoryBytes != nil { + s.MemoryBytes = other.MemoryBytes + } + if other.FileDescriptorCount != nil { + s.FileDescriptorCount = other.FileDescriptorCount + } + if other.ThreadCount != nil { + s.ThreadCount = other.ThreadCount + } + if other.LatencyMillis != nil { + s.LatencyMillis = other.LatencyMillis + } + if other.PayloadBytes != nil { + s.PayloadBytes = other.PayloadBytes + } + if other.StatusCode != nil { + s.StatusCode = other.StatusCode + } + if other.SharedConfigFallback != nil { + s.SharedConfigFallback = other.SharedConfigFallback + } + if other.ImdsFallbackSucceed != nil { + s.ImdsFallbackSucceed = other.ImdsFallbackSucceed + } + if other.AppSignals != nil { + s.AppSignals = other.AppSignals + } + if other.EnhancedContainerInsights != nil { + s.EnhancedContainerInsights = other.EnhancedContainerInsights + } +} + +func (s *Stats) Marshal() (string, error) { + raw, err := json.Marshal(s) + if err != nil { + return "", err + } + content := strings.TrimPrefix(string(raw), "{") + return strings.TrimSuffix(content, "}"), nil +} + +type StatsProvider interface { + Stats(operation string) Stats +} + +type OperationsFilter struct { + operations collections.Set[string] + allowAll bool +} + +func (of OperationsFilter) IsAllowed(operationName string) bool { + return of.allowAll || of.operations.Contains(operationName) +} + +func NewOperationsFilter(operations ...string) OperationsFilter { + allowed := collections.NewSet[string](operations...) + return OperationsFilter{ + operations: allowed, + allowAll: allowed.Contains(AllowAllOperations), + } +} + +type StatsConfig struct { + // Operations are the allowed operation names to gather stats for. + Operations []string `mapstructure:"operations,omitempty"` +} diff --git a/extension/agenthealth/handler/stats/agent/agent_test.go b/extension/agenthealth/handler/stats/agent/agent_test.go new file mode 100644 index 0000000000..8322179dfc --- /dev/null +++ b/extension/agenthealth/handler/stats/agent/agent_test.go @@ -0,0 +1,106 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package agent + +import ( + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/stretchr/testify/assert" +) + +func TestMerge(t *testing.T) { + stats := &Stats{CpuPercent: aws.Float64(1.2)} + assert.EqualValues(t, 1.2, *stats.CpuPercent) + assert.Nil(t, stats.MemoryBytes) + stats.Merge(Stats{ + CpuPercent: aws.Float64(1.3), + MemoryBytes: aws.Uint64(123), + }) + assert.EqualValues(t, 1.3, *stats.CpuPercent) + assert.EqualValues(t, 123, *stats.MemoryBytes) + stats.Merge(Stats{ + FileDescriptorCount: aws.Int32(456), + ThreadCount: aws.Int32(789), + LatencyMillis: aws.Int64(1234), + PayloadBytes: aws.Int(5678), + StatusCode: aws.Int(200), + ImdsFallbackSucceed: aws.Int(1), + SharedConfigFallback: aws.Int(1), + AppSignals: aws.Int(1), + EnhancedContainerInsights: aws.Int(1), + }) + assert.EqualValues(t, 1.3, *stats.CpuPercent) + assert.EqualValues(t, 123, *stats.MemoryBytes) + assert.EqualValues(t, 456, *stats.FileDescriptorCount) + assert.EqualValues(t, 789, *stats.ThreadCount) + assert.EqualValues(t, 1234, *stats.LatencyMillis) + assert.EqualValues(t, 5678, *stats.PayloadBytes) + assert.EqualValues(t, 200, *stats.StatusCode) + assert.EqualValues(t, 1, *stats.ImdsFallbackSucceed) + assert.EqualValues(t, 1, *stats.SharedConfigFallback) + assert.EqualValues(t, 1, *stats.AppSignals) + assert.EqualValues(t, 1, *stats.EnhancedContainerInsights) +} + +func TestMarshal(t *testing.T) { + testCases := map[string]struct { + stats *Stats + want string + }{ + "WithEmpty": { + stats: &Stats{}, + want: "", + }, + "WithPartial": { + stats: &Stats{ + CpuPercent: aws.Float64(1.2), + MemoryBytes: aws.Uint64(123), + ThreadCount: aws.Int32(789), + PayloadBytes: aws.Int(5678), + }, + want: `"cpu":1.2,"mem":123,"th":789,"load":5678`, + }, + "WithFull": { + stats: &Stats{ + CpuPercent: aws.Float64(1.2), + MemoryBytes: aws.Uint64(123), + FileDescriptorCount: aws.Int32(456), + ThreadCount: aws.Int32(789), + LatencyMillis: aws.Int64(1234), + PayloadBytes: aws.Int(5678), + StatusCode: aws.Int(200), + ImdsFallbackSucceed: aws.Int(1), + }, + want: `"cpu":1.2,"mem":123,"fd":456,"th":789,"lat":1234,"load":5678,"code":200,"ifs":1`, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + got, err := testCase.stats.Marshal() + assert.NoError(t, err) + assert.Equal(t, testCase.want, got) + }) + } +} + +func TestOperationFilter(t *testing.T) { + testCases := map[string]struct { + allowedOperations []string + testOperations []string + want []bool + }{ + "WithNoneAllowed": {allowedOperations: nil, testOperations: []string{"nothing", "is", "allowed"}, want: []bool{false, false, false}}, + "WithSomeAllowed": {allowedOperations: []string{"are"}, testOperations: []string{"some", "are", "allowed"}, want: []bool{false, true, false}}, + "WithAllAllowed": {allowedOperations: []string{"*"}, testOperations: []string{"all", "are", "allowed"}, want: []bool{true, true, true}}, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + filter := NewOperationsFilter(testCase.allowedOperations...) + for index, testOperation := range testCase.testOperations { + assert.Equal(t, testCase.want[index], filter.IsAllowed(testOperation)) + } + }) + } +} diff --git a/extension/agenthealth/handler/stats/client/client.go b/extension/agenthealth/handler/stats/client/client.go new file mode 100644 index 0000000000..9b87d2533b --- /dev/null +++ b/extension/agenthealth/handler/stats/client/client.go @@ -0,0 +1,115 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package client + +import ( + "context" + "io" + "net/http" + "sync" + "time" + + "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" + "github.com/aws/aws-sdk-go/aws" + "github.com/jellydator/ttlcache/v3" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" +) + +const ( + handlerID = "cloudwatchagent.ClientStatsHandler" + ttlDuration = 10 * time.Second + cacheSize = 1000 +) + +type Stats interface { + awsmiddleware.RequestHandler + awsmiddleware.ResponseHandler + agent.StatsProvider +} + +type requestRecorder struct { + start time.Time + payloadBytes int64 +} + +type clientStatsHandler struct { + mu sync.Mutex + + filter agent.OperationsFilter + getOperationName func(ctx context.Context) string + getRequestID func(ctx context.Context) string + + statsByOperation map[string]agent.Stats + requestCache *ttlcache.Cache[string, *requestRecorder] +} + +var _ Stats = (*clientStatsHandler)(nil) + +func NewHandler(filter agent.OperationsFilter) Stats { + requestCache := ttlcache.New[string, *requestRecorder]( + ttlcache.WithTTL[string, *requestRecorder](ttlDuration), + ttlcache.WithCapacity[string, *requestRecorder](cacheSize), + ttlcache.WithDisableTouchOnHit[string, *requestRecorder](), + ) + go requestCache.Start() + return &clientStatsHandler{ + filter: filter, + getOperationName: awsmiddleware.GetOperationName, + getRequestID: awsmiddleware.GetRequestID, + requestCache: requestCache, + statsByOperation: make(map[string]agent.Stats), + } +} + +func (csh *clientStatsHandler) ID() string { + return handlerID +} + +func (csh *clientStatsHandler) Position() awsmiddleware.HandlerPosition { + return awsmiddleware.Before +} + +func (csh *clientStatsHandler) HandleRequest(ctx context.Context, r *http.Request) { + operation := csh.getOperationName(ctx) + if !csh.filter.IsAllowed(operation) { + return + } + csh.mu.Lock() + defer csh.mu.Unlock() + requestID := csh.getRequestID(ctx) + recorder := &requestRecorder{start: time.Now()} + recorder.payloadBytes, _ = io.Copy(io.Discard, r.Body) + csh.requestCache.Set(requestID, recorder, ttlcache.DefaultTTL) +} + +func (csh *clientStatsHandler) HandleResponse(ctx context.Context, r *http.Response) { + operation := csh.getOperationName(ctx) + if !csh.filter.IsAllowed(operation) { + return + } + csh.mu.Lock() + defer csh.mu.Unlock() + requestID := csh.getRequestID(ctx) + item, ok := csh.requestCache.GetAndDelete(requestID) + if !ok { + return + } + recorder := item.Value() + stats := agent.Stats{ + PayloadBytes: aws.Int(int(recorder.payloadBytes)), + StatusCode: aws.Int(r.StatusCode), + } + latency := time.Since(recorder.start).Milliseconds() + stats.LatencyMillis = aws.Int64(latency) + csh.statsByOperation[operation] = stats +} + +func (csh *clientStatsHandler) Stats(operation string) agent.Stats { + csh.mu.Lock() + defer csh.mu.Unlock() + stats := csh.statsByOperation[operation] + csh.statsByOperation[operation] = agent.Stats{} + return stats +} diff --git a/extension/agenthealth/handler/stats/client/client_test.go b/extension/agenthealth/handler/stats/client/client_test.go new file mode 100644 index 0000000000..3f8a9704cf --- /dev/null +++ b/extension/agenthealth/handler/stats/client/client_test.go @@ -0,0 +1,46 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package client + +import ( + "bytes" + "context" + "net/http" + "testing" + "time" + + "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" +) + +func TestHandle(t *testing.T) { + operation := "test" + handler := NewHandler(agent.NewOperationsFilter("test")) + handler.(*clientStatsHandler).getOperationName = func(context.Context) string { + return operation + } + assert.Equal(t, awsmiddleware.Before, handler.Position()) + assert.Equal(t, handlerID, handler.ID()) + body := []byte("test payload size") + req, err := http.NewRequest("", "localhost", bytes.NewBuffer(body)) + require.NoError(t, err) + ctx := context.Background() + handler.HandleRequest(ctx, req) + got := handler.Stats(operation) + assert.Nil(t, got.LatencyMillis) + assert.Nil(t, got.PayloadBytes) + assert.Nil(t, got.StatusCode) + time.Sleep(time.Millisecond) + handler.HandleResponse(ctx, &http.Response{StatusCode: http.StatusOK}) + got = handler.Stats(operation) + assert.NotNil(t, got.LatencyMillis) + assert.NotNil(t, got.PayloadBytes) + assert.NotNil(t, got.StatusCode) + assert.Equal(t, http.StatusOK, *got.StatusCode) + assert.Equal(t, 17, *got.PayloadBytes) + assert.GreaterOrEqual(t, *got.LatencyMillis, int64(1)) +} diff --git a/extension/agenthealth/handler/stats/handler.go b/extension/agenthealth/handler/stats/handler.go new file mode 100644 index 0000000000..77ccb597ad --- /dev/null +++ b/extension/agenthealth/handler/stats/handler.go @@ -0,0 +1,79 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package stats + +import ( + "context" + "net/http" + "sync" + + "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" + "go.uber.org/zap" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/client" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/provider" +) + +const ( + handlerID = "cloudwatchagent.StatsHandler" + headerKeyAgentStats = "X-Amz-Agent-Stats" +) + +func NewHandlers(logger *zap.Logger, cfg agent.StatsConfig) ([]awsmiddleware.RequestHandler, []awsmiddleware.ResponseHandler) { + filter := agent.NewOperationsFilter(cfg.Operations...) + clientStats := client.NewHandler(filter) + stats := newStatsHandler(logger, filter, []agent.StatsProvider{clientStats, provider.GetProcessStats(), provider.GetFlagsStats()}) + return []awsmiddleware.RequestHandler{clientStats, stats}, []awsmiddleware.ResponseHandler{clientStats} +} + +type statsHandler struct { + mu sync.Mutex + + logger *zap.Logger + filter agent.OperationsFilter + providers []agent.StatsProvider +} + +func newStatsHandler(logger *zap.Logger, filter agent.OperationsFilter, providers []agent.StatsProvider) *statsHandler { + sh := &statsHandler{ + logger: logger, + filter: filter, + providers: providers, + } + return sh +} + +var _ awsmiddleware.RequestHandler = (*statsHandler)(nil) + +func (sh *statsHandler) ID() string { + return handlerID +} + +func (sh *statsHandler) Position() awsmiddleware.HandlerPosition { + return awsmiddleware.After +} + +func (sh *statsHandler) HandleRequest(ctx context.Context, r *http.Request) { + operation := awsmiddleware.GetOperationName(ctx) + if !sh.filter.IsAllowed(operation) { + return + } + header := sh.Header(operation) + if header != "" { + r.Header.Set(headerKeyAgentStats, header) + } +} + +func (sh *statsHandler) Header(operation string) string { + stats := &agent.Stats{} + for _, p := range sh.providers { + stats.Merge(p.Stats(operation)) + } + header, err := stats.Marshal() + if err != nil { + sh.logger.Warn("Failed to serialize agent stats", zap.Error(err)) + } + return header +} diff --git a/extension/agenthealth/handler/stats/handler_test.go b/extension/agenthealth/handler/stats/handler_test.go new file mode 100644 index 0000000000..f40bebd481 --- /dev/null +++ b/extension/agenthealth/handler/stats/handler_test.go @@ -0,0 +1,73 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package stats + +import ( + "context" + "net/http" + "testing" + + "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" + "github.com/aws/aws-sdk-go/aws" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" +) + +type mockStatsProvider struct { + stats *agent.Stats +} + +var _ agent.StatsProvider = (*mockStatsProvider)(nil) + +func (m *mockStatsProvider) Stats(string) agent.Stats { + return *m.stats +} + +func newMockStatsProvider(stats *agent.Stats) agent.StatsProvider { + return &mockStatsProvider{stats: stats} +} + +func TestStatsHandler(t *testing.T) { + stats := &agent.Stats{ + FileDescriptorCount: aws.Int32(456), + ThreadCount: aws.Int32(789), + LatencyMillis: aws.Int64(1234), + PayloadBytes: aws.Int(5678), + StatusCode: aws.Int(200), + ImdsFallbackSucceed: aws.Int(1), + SharedConfigFallback: aws.Int(1), + } + handler := newStatsHandler( + zap.NewNop(), + agent.NewOperationsFilter(), + []agent.StatsProvider{ + newMockStatsProvider(&agent.Stats{CpuPercent: aws.Float64(1.2)}), + newMockStatsProvider(&agent.Stats{MemoryBytes: aws.Uint64(123)}), + newMockStatsProvider(stats), + }, + ) + ctx := context.Background() + assert.Equal(t, awsmiddleware.After, handler.Position()) + assert.Equal(t, handlerID, handler.ID()) + req, err := http.NewRequest("", "localhost", nil) + require.NoError(t, err) + handler.HandleRequest(ctx, req) + assert.Equal(t, "", req.Header.Get(headerKeyAgentStats)) + handler.filter = agent.NewOperationsFilter(agent.AllowAllOperations) + handler.HandleRequest(ctx, req) + assert.Equal(t, `"cpu":1.2,"mem":123,"fd":456,"th":789,"lat":1234,"load":5678,"code":200,"scfb":1,"ifs":1`, req.Header.Get(headerKeyAgentStats)) + stats.StatusCode = aws.Int(404) + stats.LatencyMillis = nil + handler.HandleRequest(ctx, req) + assert.Equal(t, `"cpu":1.2,"mem":123,"fd":456,"th":789,"load":5678,"code":404,"scfb":1,"ifs":1`, req.Header.Get(headerKeyAgentStats)) +} + +func TestNewHandlers(t *testing.T) { + requestHandlers, responseHandlers := NewHandlers(zap.NewNop(), agent.StatsConfig{}) + assert.Len(t, requestHandlers, 2) + assert.Len(t, responseHandlers, 1) +} diff --git a/extension/agenthealth/handler/stats/provider/flag.go b/extension/agenthealth/handler/stats/provider/flag.go new file mode 100644 index 0000000000..30152373a3 --- /dev/null +++ b/extension/agenthealth/handler/stats/provider/flag.go @@ -0,0 +1,82 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package provider + +import ( + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" +) + +const ( + flagGetInterval = 5 * time.Minute +) + +type Flag int + +const ( + FlagIMDSFallbackSucceed = iota + FlagSharedConfigFallback + FlagAppSignal + FlagEnhancedContainerInsights +) + +var ( + flagSingleton FlagStats + flagOnce sync.Once +) + +type FlagStats interface { + agent.StatsProvider + SetFlag(flag Flag) +} + +type flagStats struct { + *intervalStats + + flags sync.Map +} + +var _ FlagStats = (*flagStats)(nil) + +func (p *flagStats) update() { + p.mu.Lock() + defer p.mu.Unlock() + p.stats = agent.Stats{ + ImdsFallbackSucceed: p.getFlag(FlagIMDSFallbackSucceed), + SharedConfigFallback: p.getFlag(FlagSharedConfigFallback), + AppSignals: p.getFlag(FlagAppSignal), + EnhancedContainerInsights: p.getFlag(FlagEnhancedContainerInsights), + } +} + +func (p *flagStats) getFlag(flag Flag) *int { + if _, ok := p.flags.Load(flag); ok { + return aws.Int(1) + } + return nil +} + +func (p *flagStats) SetFlag(flag Flag) { + if _, ok := p.flags.Load(flag); !ok { + p.flags.Store(flag, true) + p.update() + } +} + +func newFlagStats(interval time.Duration) *flagStats { + return &flagStats{ + intervalStats: newIntervalStats(interval), + } +} + +func GetFlagsStats() FlagStats { + flagOnce.Do(func() { + flagSingleton = newFlagStats(flagGetInterval) + }) + return flagSingleton +} diff --git a/extension/agenthealth/handler/stats/provider/flag_test.go b/extension/agenthealth/handler/stats/provider/flag_test.go new file mode 100644 index 0000000000..b3044c0780 --- /dev/null +++ b/extension/agenthealth/handler/stats/provider/flag_test.go @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package provider + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestFlagStats(t *testing.T) { + provider := newFlagStats(time.Microsecond) + got := provider.stats + assert.Nil(t, got.ImdsFallbackSucceed) + assert.Nil(t, got.SharedConfigFallback) + provider.SetFlag(FlagIMDSFallbackSucceed) + assert.Nil(t, got.ImdsFallbackSucceed) + got = provider.stats + assert.NotNil(t, got.ImdsFallbackSucceed) + assert.Equal(t, 1, *got.ImdsFallbackSucceed) + assert.Nil(t, got.SharedConfigFallback) + provider.SetFlag(FlagSharedConfigFallback) + got = provider.stats + assert.NotNil(t, got.SharedConfigFallback) + assert.Equal(t, 1, *got.SharedConfigFallback) +} diff --git a/extension/agenthealth/handler/stats/provider/interval.go b/extension/agenthealth/handler/stats/provider/interval.go new file mode 100644 index 0000000000..ce0c885ce7 --- /dev/null +++ b/extension/agenthealth/handler/stats/provider/interval.go @@ -0,0 +1,51 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package provider + +import ( + "sync" + "time" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" +) + +// intervalStats restricts the Stats get function to once +// per interval. +type intervalStats struct { + mu sync.Mutex + interval time.Duration + + getOnce *sync.Once + lastGet time.Time + + stats agent.Stats +} + +var _ agent.StatsProvider = (*intervalStats)(nil) + +func (p *intervalStats) Stats(string) agent.Stats { + p.mu.Lock() + defer p.mu.Unlock() + var stats agent.Stats + p.getOnce.Do(func() { + p.lastGet = time.Now() + stats = p.stats + go p.allowNextGetAfter(p.interval) + }) + return stats +} + +func (p *intervalStats) allowNextGetAfter(interval time.Duration) { + time.Sleep(interval) + p.mu.Lock() + defer p.mu.Unlock() + p.getOnce = new(sync.Once) +} + +func newIntervalStats(interval time.Duration) *intervalStats { + return &intervalStats{ + getOnce: new(sync.Once), + interval: interval, + } +} diff --git a/extension/agenthealth/handler/stats/provider/interval_test.go b/extension/agenthealth/handler/stats/provider/interval_test.go new file mode 100644 index 0000000000..3e54c57637 --- /dev/null +++ b/extension/agenthealth/handler/stats/provider/interval_test.go @@ -0,0 +1,26 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package provider + +import ( + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/stretchr/testify/assert" +) + +func TestIntervalStats(t *testing.T) { + s := newIntervalStats(time.Millisecond) + s.stats.ThreadCount = aws.Int32(2) + got := s.Stats("") + assert.NotNil(t, got.ThreadCount) + got = s.Stats("") + assert.Nil(t, got.ThreadCount) + time.Sleep(2 * time.Millisecond) + got = s.Stats("") + assert.NotNil(t, got.ThreadCount) + got = s.Stats("") + assert.Nil(t, got.ThreadCount) +} diff --git a/extension/agenthealth/handler/stats/provider/process.go b/extension/agenthealth/handler/stats/provider/process.go new file mode 100644 index 0000000000..48ca9d9d83 --- /dev/null +++ b/extension/agenthealth/handler/stats/provider/process.go @@ -0,0 +1,103 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package provider + +import ( + "os" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/shirou/gopsutil/v3/process" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" +) + +const ( + processGetInterval = time.Minute +) + +var ( + processSingleton *processStats + processOnce sync.Once +) + +type processMetrics interface { + CPUPercent() (float64, error) + MemoryInfo() (*process.MemoryInfoStat, error) + NumFDs() (int32, error) + NumThreads() (int32, error) +} + +type processStats struct { + *intervalStats + + proc processMetrics +} + +var _ agent.StatsProvider = (*processStats)(nil) + +func (p *processStats) cpuPercent() *float64 { + if cpuPercent, err := p.proc.CPUPercent(); err == nil { + return aws.Float64(float64(int64(cpuPercent*10)) / 10) // truncate to 10th decimal place + } + return nil +} + +func (p *processStats) memoryBytes() *uint64 { + if memInfo, err := p.proc.MemoryInfo(); err == nil { + return aws.Uint64(memInfo.RSS) + } + return nil +} + +func (p *processStats) fileDescriptorCount() *int32 { + if fdCount, err := p.proc.NumFDs(); err == nil { + return aws.Int32(fdCount) + } + return nil +} + +func (p *processStats) threadCount() *int32 { + if thCount, err := p.proc.NumThreads(); err == nil { + return aws.Int32(thCount) + } + return nil +} + +func (p *processStats) updateLoop() { + ticker := time.NewTicker(p.interval) + for range ticker.C { + p.refresh() + } +} + +func (p *processStats) refresh() { + p.mu.Lock() + defer p.mu.Unlock() + p.stats = agent.Stats{ + CpuPercent: p.cpuPercent(), + MemoryBytes: p.memoryBytes(), + FileDescriptorCount: p.fileDescriptorCount(), + ThreadCount: p.threadCount(), + } +} + +func newProcessStats(proc processMetrics, interval time.Duration) *processStats { + ps := &processStats{ + intervalStats: newIntervalStats(interval), + proc: proc, + } + ps.refresh() + go ps.updateLoop() + return ps +} + +func GetProcessStats() agent.StatsProvider { + processOnce.Do(func() { + proc, _ := process.NewProcess(int32(os.Getpid())) + processSingleton = newProcessStats(proc, processGetInterval) + }) + return processSingleton +} diff --git a/extension/agenthealth/handler/stats/provider/process_test.go b/extension/agenthealth/handler/stats/provider/process_test.go new file mode 100644 index 0000000000..f1368a883d --- /dev/null +++ b/extension/agenthealth/handler/stats/provider/process_test.go @@ -0,0 +1,69 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package provider + +import ( + "errors" + "testing" + "time" + + "github.com/shirou/gopsutil/v3/process" + "github.com/stretchr/testify/assert" +) + +type mockProcessMetrics struct { + err error +} + +var _ processMetrics = (*mockProcessMetrics)(nil) + +func (m mockProcessMetrics) CPUPercent() (float64, error) { + if m.err != nil { + return -1, m.err + } + return 1, nil +} + +func (m mockProcessMetrics) MemoryInfo() (*process.MemoryInfoStat, error) { + if m.err != nil { + return nil, m.err + } + return &process.MemoryInfoStat{RSS: uint64(2)}, nil +} + +func (m mockProcessMetrics) NumFDs() (int32, error) { + if m.err != nil { + return -1, m.err + } + return 3, nil +} + +func (m mockProcessMetrics) NumThreads() (int32, error) { + if m.err != nil { + return -1, m.err + } + return 4, nil +} + +func TestProcessStats(t *testing.T) { + testErr := errors.New("test error") + mock := &mockProcessMetrics{} + provider := newProcessStats(mock, time.Millisecond) + got := provider.stats + assert.NotNil(t, got.CpuPercent) + assert.NotNil(t, got.MemoryBytes) + assert.NotNil(t, got.FileDescriptorCount) + assert.NotNil(t, got.ThreadCount) + assert.EqualValues(t, 1, *got.CpuPercent) + assert.EqualValues(t, 2, *got.MemoryBytes) + assert.EqualValues(t, 3, *got.FileDescriptorCount) + assert.EqualValues(t, 4, *got.ThreadCount) + mock.err = testErr + time.Sleep(2 * time.Millisecond) + got = provider.stats + assert.Nil(t, got.CpuPercent) + assert.Nil(t, got.MemoryBytes) + assert.Nil(t, got.FileDescriptorCount) + assert.Nil(t, got.ThreadCount) +} diff --git a/extension/agenthealth/handler/useragent/useragent.go b/extension/agenthealth/handler/useragent/useragent.go index fbb7cb61b9..0ab0a3e5cd 100644 --- a/extension/agenthealth/handler/useragent/useragent.go +++ b/extension/agenthealth/handler/useragent/useragent.go @@ -18,6 +18,7 @@ import ( "golang.org/x/exp/maps" "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/provider" "github.com/aws/amazon-cloudwatch-agent/internal/util/collections" "github.com/aws/amazon-cloudwatch-agent/internal/version" "github.com/aws/amazon-cloudwatch-agent/receiver/adapter" @@ -29,9 +30,10 @@ const ( flagRunAsUser = "run_as_user" flagContainerInsights = "container_insights" - flagPulse = "pulse" + flagAppSignals = "app_signals" flagEnhancedContainerInsights = "enhanced_container_insights" - separator = " " + + separator = " " typeInputs = "inputs" typeProcessors = "processors" @@ -91,10 +93,12 @@ func (ua *userAgent) SetComponents(otelCfg *otelcol.Config, telegrafCfg *telegra if exporter.Type() == "awsemf" { cfg := otelCfg.Exporters[exporter].(*awsemfexporter.Config) if cfg.IsPulseApmEnabled() { - ua.outputs.Add(flagPulse) + ua.outputs.Add(flagAppSignals) + provider.GetFlagsStats().SetFlag(provider.FlagAppSignal) } if cfg.IsEnhancedContainerInsights() { ua.outputs.Add(flagEnhancedContainerInsights) + provider.GetFlagsStats().SetFlag(provider.FlagEnhancedContainerInsights) } } } @@ -173,14 +177,13 @@ func isRunningAsRoot() bool { } func newUserAgent() *userAgent { - uah := &userAgent{ + return &userAgent{ id: uuid.NewString(), isRoot: isRunningAsRoot(), inputs: collections.NewSet[string](), processors: collections.NewSet[string](), outputs: collections.NewSet[string](), } - return uah } func Get() UserAgent { diff --git a/extension/agenthealth/handler/useragent/useragent_test.go b/extension/agenthealth/handler/useragent/useragent_test.go index 36dd9aaa8d..255ccd21d3 100644 --- a/extension/agenthealth/handler/useragent/useragent_test.go +++ b/extension/agenthealth/handler/useragent/useragent_test.go @@ -126,7 +126,7 @@ func TestEmf(t *testing.T) { assert.Equal(t, "inputs:(nop run_as_user)", ua.inputsStr.Load()) assert.Equal(t, "", ua.processorsStr.Load()) - assert.Equal(t, "outputs:(awsemf pulse)", ua.outputsStr.Load()) + assert.Equal(t, "outputs:(app_signals awsemf)", ua.outputsStr.Load()) } func TestSingleton(t *testing.T) { diff --git a/extension/agenthealth/testdata/config.yaml b/extension/agenthealth/testdata/config.yaml index 4bd8b55d92..57fdbd02fe 100644 --- a/extension/agenthealth/testdata/config.yaml +++ b/extension/agenthealth/testdata/config.yaml @@ -1,3 +1,8 @@ agenthealth: agenthealth/1: is_usage_data_enabled: false +agenthealth/2: + is_usage_data_enabled: true + stats: + operations: + - 'ListBuckets' diff --git a/go.mod b/go.mod index 398a4a1ee1..026eb11c18 100644 --- a/go.mod +++ b/go.mod @@ -106,6 +106,7 @@ require ( github.com/hashicorp/golang-lru v1.0.2 github.com/influxdata/telegraf v0.0.0-00010101000000-000000000000 github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 + github.com/jellydator/ttlcache/v3 v3.1.0 github.com/kardianos/service v1.2.1 // Keep this pinned to v1.2.1. v1.2.2 causes the agent to not register as a service on Windows github.com/kr/pretty v0.3.1 github.com/oklog/run v1.1.0 diff --git a/go.sum b/go.sum index 0499fcf766..2f1fd27cd5 100644 --- a/go.sum +++ b/go.sum @@ -768,6 +768,8 @@ github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8 github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jellydator/ttlcache/v3 v3.1.0 h1:0gPFG0IHHP6xyUyXq+JaD8fwkDCqgqwohXNJBcYE71g= +github.com/jellydator/ttlcache/v3 v3.1.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= github.com/jhump/protoreflect v1.8.3-0.20210616212123-6cc1efa697ca h1:a0GZUdb+qnutF8shJxr2qs2qT3fnF+ptxTxPB8+oIvk= github.com/jhump/protoreflect v1.8.3-0.20210616212123-6cc1efa697ca/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=