From f3d2e3307ae8b2bc8c8c304922cedc63a4837295 Mon Sep 17 00:00:00 2001 From: Parampreet Singh <50599809+Paramadon@users.noreply.github.com> Date: Thu, 12 Dec 2024 09:56:44 -0500 Subject: [PATCH] Adding Statuscode Handler to Agent Health Extension (#1423) --- .golangci.yml | 2 +- extension/agenthealth/config.go | 5 +- extension/agenthealth/config_test.go | 4 +- extension/agenthealth/extension.go | 29 +++- extension/agenthealth/extension_test.go | 23 ++- extension/agenthealth/factory.go | 6 +- extension/agenthealth/factory_test.go | 4 +- .../agenthealth/handler/stats/agent/agent.go | 120 ++++++++++--- .../handler/stats/agent/agent_test.go | 86 ++++++++- .../agenthealth/handler/stats/handler.go | 32 +++- .../agenthealth/handler/stats/handler_test.go | 30 +++- .../handler/stats/provider/process.go | 2 +- .../handler/stats/provider/process_test.go | 25 ++- .../handler/stats/provider/statuscode.go | 163 ++++++++++++++++++ .../handler/stats/provider/statuscode_test.go | 130 ++++++++++++++ .../outputs/cloudwatchlogs/cloudwatchlogs.go | 2 +- .../otel/extension/agenthealth/translator.go | 22 ++- .../extension/agenthealth/translator_test.go | 8 +- 18 files changed, 621 insertions(+), 72 deletions(-) create mode 100644 extension/agenthealth/handler/stats/provider/statuscode.go create mode 100644 extension/agenthealth/handler/stats/provider/statuscode_test.go diff --git a/.golangci.yml b/.golangci.yml index f4fbed2f51..88f10bb240 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -59,4 +59,4 @@ linters: - nonamedreturns issues: - new-from-rev: 3221f76 \ No newline at end of file + new-from-rev: 9af4477 \ No newline at end of file diff --git a/extension/agenthealth/config.go b/extension/agenthealth/config.go index dd1f94c06c..53c6901fff 100644 --- a/extension/agenthealth/config.go +++ b/extension/agenthealth/config.go @@ -10,8 +10,9 @@ import ( ) type Config struct { - IsUsageDataEnabled bool `mapstructure:"is_usage_data_enabled"` - Stats agent.StatsConfig `mapstructure:"stats"` 
+ IsUsageDataEnabled bool `mapstructure:"is_usage_data_enabled"` + Stats *agent.StatsConfig `mapstructure:"stats,omitempty"` + IsStatusCodeEnabled bool `mapstructure:"is_status_code_enabled,omitempty"` } var _ component.Config = (*Config)(nil) diff --git a/extension/agenthealth/config_test.go b/extension/agenthealth/config_test.go index ee0ef301f6..967c681619 100644 --- a/extension/agenthealth/config_test.go +++ b/extension/agenthealth/config_test.go @@ -26,11 +26,11 @@ func TestLoadConfig(t *testing.T) { }, { id: component.NewIDWithName(TypeStr, "1"), - want: &Config{IsUsageDataEnabled: false, Stats: agent.StatsConfig{Operations: []string{agent.AllowAllOperations}}}, + want: &Config{IsUsageDataEnabled: false, Stats: nil}, }, { id: component.NewIDWithName(TypeStr, "2"), - want: &Config{IsUsageDataEnabled: true, Stats: agent.StatsConfig{Operations: []string{"ListBuckets"}}}, + want: &Config{IsUsageDataEnabled: true, Stats: &agent.StatsConfig{Operations: []string{"ListBuckets"}}}, }, } for _, testCase := range testCases { diff --git a/extension/agenthealth/extension.go b/extension/agenthealth/extension.go index 14ab08eb57..213d252e91 100644 --- a/extension/agenthealth/extension.go +++ b/extension/agenthealth/extension.go @@ -9,6 +9,7 @@ import ( "go.uber.org/zap" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/useragent" ) @@ -24,11 +25,31 @@ var _ awsmiddleware.Extension = (*agentHealth)(nil) func (ah *agentHealth) Handlers() ([]awsmiddleware.RequestHandler, []awsmiddleware.ResponseHandler) { var responseHandlers []awsmiddleware.ResponseHandler requestHandlers := []awsmiddleware.RequestHandler{useragent.NewHandler(ah.cfg.IsUsageDataEnabled)} - if ah.cfg.IsUsageDataEnabled { - req, res := stats.NewHandlers(ah.logger, ah.cfg.Stats) - requestHandlers = append(requestHandlers, req...) 
- responseHandlers = append(responseHandlers, res...) + + if !ah.cfg.IsUsageDataEnabled { + ah.logger.Debug("Usage data is disabled, skipping stats handlers") + return requestHandlers, responseHandlers + } + + statusCodeEnabled := ah.cfg.IsStatusCodeEnabled + + var statsResponseHandlers []awsmiddleware.ResponseHandler + var statsRequestHandlers []awsmiddleware.RequestHandler + var statsConfig agent.StatsConfig + var agentStatsEnabled bool + + if ah.cfg.Stats != nil { + statsConfig = *ah.cfg.Stats + agentStatsEnabled = true + } else { + agentStatsEnabled = false } + + statsRequestHandlers, statsResponseHandlers = stats.NewHandlers(ah.logger, statsConfig, statusCodeEnabled, agentStatsEnabled) + + requestHandlers = append(requestHandlers, statsRequestHandlers...) + responseHandlers = append(responseHandlers, statsResponseHandlers...) + return requestHandlers, responseHandlers } diff --git a/extension/agenthealth/extension_test.go b/extension/agenthealth/extension_test.go index 504dc8c50e..d29be58aac 100644 --- a/extension/agenthealth/extension_test.go +++ b/extension/agenthealth/extension_test.go @@ -10,11 +10,13 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.uber.org/zap" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) func TestExtension(t *testing.T) { ctx := context.Background() - cfg := &Config{IsUsageDataEnabled: true} + cfg := &Config{IsUsageDataEnabled: true, IsStatusCodeEnabled: true, Stats: &agent.StatsConfig{Operations: []string{"ListBuckets"}}} extension := NewAgentHealth(zap.NewNop(), cfg) assert.NotNil(t, extension) assert.NoError(t, extension.Start(ctx, componenttest.NewNopHost())) @@ -22,6 +24,25 @@ func TestExtension(t *testing.T) { // user agent, client stats, stats assert.Len(t, requestHandlers, 3) // client stats + assert.Len(t, responseHandlers, 2) + cfg.IsUsageDataEnabled = false + requestHandlers, responseHandlers = extension.Handlers() + // 
user agent + assert.Len(t, requestHandlers, 1) + assert.Len(t, responseHandlers, 0) + assert.NoError(t, extension.Shutdown(ctx)) +} + +func TestExtensionStatusCodeOnly(t *testing.T) { + ctx := context.Background() + cfg := &Config{IsUsageDataEnabled: true, IsStatusCodeEnabled: true} + extension := NewAgentHealth(zap.NewNop(), cfg) + assert.NotNil(t, extension) + assert.NoError(t, extension.Start(ctx, componenttest.NewNopHost())) + requestHandlers, responseHandlers := extension.Handlers() + // user agent, client stats, stats + assert.Len(t, requestHandlers, 1) + // client stats assert.Len(t, responseHandlers, 1) cfg.IsUsageDataEnabled = false requestHandlers, responseHandlers = extension.Handlers() diff --git a/extension/agenthealth/factory.go b/extension/agenthealth/factory.go index e8e97587a5..e075846c08 100644 --- a/extension/agenthealth/factory.go +++ b/extension/agenthealth/factory.go @@ -8,8 +8,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" - - "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) var ( @@ -28,9 +26,7 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ IsUsageDataEnabled: true, - Stats: agent.StatsConfig{ - Operations: []string{agent.AllowAllOperations}, - }, + Stats: nil, } } diff --git a/extension/agenthealth/factory_test.go b/extension/agenthealth/factory_test.go index 4899dfb425..a36a81f246 100644 --- a/extension/agenthealth/factory_test.go +++ b/extension/agenthealth/factory_test.go @@ -10,13 +10,11 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/extension/extensiontest" - - "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) func TestCreateDefaultConfig(t *testing.T) { cfg := NewFactory().CreateDefaultConfig() - assert.Equal(t, &Config{IsUsageDataEnabled: true, Stats: 
agent.StatsConfig{Operations: []string{agent.AllowAllOperations}}}, cfg) + assert.Equal(t, &Config{IsUsageDataEnabled: true, Stats: nil}, cfg) assert.NoError(t, componenttest.CheckConfigStruct(cfg)) } diff --git a/extension/agenthealth/handler/stats/agent/agent.go b/extension/agenthealth/handler/stats/agent/agent.go index 83237d54e6..d80303510d 100644 --- a/extension/agenthealth/handler/stats/agent/agent.go +++ b/extension/agenthealth/handler/stats/agent/agent.go @@ -15,28 +15,29 @@ const ( ) type Stats struct { - CpuPercent *float64 `json:"cpu,omitempty"` - MemoryBytes *uint64 `json:"mem,omitempty"` - FileDescriptorCount *int32 `json:"fd,omitempty"` - ThreadCount *int32 `json:"th,omitempty"` - LatencyMillis *int64 `json:"lat,omitempty"` - PayloadBytes *int `json:"load,omitempty"` - StatusCode *int `json:"code,omitempty"` - SharedConfigFallback *int `json:"scfb,omitempty"` - ImdsFallbackSucceed *int `json:"ifs,omitempty"` - AppSignals *int `json:"as,omitempty"` - EnhancedContainerInsights *int `json:"eci,omitempty"` - RunningInContainer *int `json:"ric,omitempty"` - RegionType *string `json:"rt,omitempty"` - Mode *string `json:"m,omitempty"` - EntityRejected *int `json:"ent,omitempty"` + CPUPercent *float64 `json:"cpu,omitempty"` + MemoryBytes *uint64 `json:"mem,omitempty"` + FileDescriptorCount *int32 `json:"fd,omitempty"` + ThreadCount *int32 `json:"th,omitempty"` + LatencyMillis *int64 `json:"lat,omitempty"` + PayloadBytes *int `json:"load,omitempty"` + StatusCode *int `json:"code,omitempty"` + SharedConfigFallback *int `json:"scfb,omitempty"` + ImdsFallbackSucceed *int `json:"ifs,omitempty"` + AppSignals *int `json:"as,omitempty"` + EnhancedContainerInsights *int `json:"eci,omitempty"` + RunningInContainer *int `json:"ric,omitempty"` + RegionType *string `json:"rt,omitempty"` + Mode *string `json:"m,omitempty"` + EntityRejected *int `json:"ent,omitempty"` + StatusCodes map[string][5]int `json:"codes,omitempty"` //represents status codes 200,400,408,413,429, } 
// Merge the other Stats into the current. If the field is not nil, // then it'll overwrite the existing one. func (s *Stats) Merge(other Stats) { - if other.CpuPercent != nil { - s.CpuPercent = other.CpuPercent + if other.CPUPercent != nil { + s.CPUPercent = other.CPUPercent } if other.MemoryBytes != nil { s.MemoryBytes = other.MemoryBytes @@ -80,6 +81,26 @@ func (s *Stats) Merge(other Stats) { if other.EntityRejected != nil { s.EntityRejected = other.EntityRejected } + if other.StatusCodes != nil { + if s.StatusCodes == nil { + s.StatusCodes = make(map[string][5]int) + } + + for key, value := range other.StatusCodes { + if existing, ok := s.StatusCodes[key]; ok { + s.StatusCodes[key] = [5]int{ + existing[0] + value[0], // 200 + existing[1] + value[1], // 400 + existing[2] + value[2], // 408 + existing[3] + value[3], // 413 + existing[4] + value[4], // 429 + } + } else { + s.StatusCodes[key] = value + } + } + } + } func (s *Stats) Marshal() (string, error) { @@ -104,6 +125,29 @@ func (of OperationsFilter) IsAllowed(operationName string) bool { return of.allowAll || of.operations.Contains(operationName) } +type StatsConfig struct { + // Operations are the allowed operation names to gather stats for. + Operations []string `mapstructure:"operations,omitempty"` + // UsageFlags are the usage flags to set on start up. + UsageFlags map[Flag]any `mapstructure:"usage_flags,omitempty"` +} + +var StatusCodeOperations = []string{ // all the operations that are allowed + "PutRetentionPolicy", + "DescribeInstances", + "DescribeTags", + "DescribeVolumes", + "DescribeContainerInstances", + "DescribeServices", + "DescribeTaskDefinition", + "ListServices", + "ListTasks", + "DescribeTasks", + "CreateLogGroup", + "CreateLogStream", + "AssumeRole", +} + func NewOperationsFilter(operations ...string) OperationsFilter { allowed := collections.NewSet[string](operations...) 
return OperationsFilter{ @@ -112,9 +156,41 @@ func NewOperationsFilter(operations ...string) OperationsFilter { } } -type StatsConfig struct { - // Operations are the allowed operation names to gather stats for. - Operations []string `mapstructure:"operations,omitempty"` - // UsageFlags are the usage flags to set on start up. - UsageFlags map[Flag]any `mapstructure:"usage_flags,omitempty"` +// NewStatusCodeOperationsFilter creates a new filter for allowed operations and status codes. +func NewStatusCodeOperationsFilter() OperationsFilter { + return NewOperationsFilter(StatusCodeOperations...) +} + +// GetShortOperationName maps long operation names to short ones. +func GetShortOperationName(operation string) string { + switch operation { + case "PutRetentionPolicy": + return "prp" + case "DescribeInstances": + return "di" + case "DescribeTags": + return "dt" + case "DescribeTasks": + return "dts" + case "DescribeVolumes": + return "dv" + case "DescribeContainerInstances": + return "dci" + case "DescribeServices": + return "ds" + case "DescribeTaskDefinition": + return "dtd" + case "ListServices": + return "ls" + case "ListTasks": + return "lt" + case "CreateLogGroup": + return "clg" + case "CreateLogStream": + return "cls" + case "AssumeRole": + return "ar" + default: + return "" + } } diff --git a/extension/agenthealth/handler/stats/agent/agent_test.go b/extension/agenthealth/handler/stats/agent/agent_test.go index c379facc19..7a82904fa4 100644 --- a/extension/agenthealth/handler/stats/agent/agent_test.go +++ b/extension/agenthealth/handler/stats/agent/agent_test.go @@ -11,17 +11,17 @@ import ( ) func TestMerge(t *testing.T) { - stats := &Stats{CpuPercent: aws.Float64(1.2)} - assert.EqualValues(t, 1.2, *stats.CpuPercent) + stats := &Stats{CPUPercent: aws.Float64(1.2)} + assert.EqualValues(t, 1.2, *stats.CPUPercent) assert.Nil(t, stats.MemoryBytes) stats.Merge(Stats{ - CpuPercent: aws.Float64(1.3), + CPUPercent: aws.Float64(1.3), MemoryBytes: aws.Uint64(123), }) - 
assert.EqualValues(t, 1.3, *stats.CpuPercent) + assert.EqualValues(t, 1.3, *stats.CPUPercent) assert.EqualValues(t, 123, *stats.MemoryBytes) stats.Merge(Stats{ - CpuPercent: aws.Float64(1.5), + CPUPercent: aws.Float64(1.5), MemoryBytes: aws.Uint64(133), FileDescriptorCount: aws.Int32(456), ThreadCount: aws.Int32(789), @@ -36,7 +36,7 @@ func TestMerge(t *testing.T) { RegionType: aws.String("RegionType"), Mode: aws.String("Mode"), }) - assert.EqualValues(t, 1.5, *stats.CpuPercent) + assert.EqualValues(t, 1.5, *stats.CPUPercent) assert.EqualValues(t, 133, *stats.MemoryBytes) assert.EqualValues(t, 456, *stats.FileDescriptorCount) assert.EqualValues(t, 789, *stats.ThreadCount) @@ -52,6 +52,76 @@ func TestMerge(t *testing.T) { assert.EqualValues(t, "Mode", *stats.Mode) } +func TestMergeWithStatusCodes(t *testing.T) { + stats := &Stats{ + StatusCodes: map[string][5]int{ + "operation1": {1, 2, 3, 4, 5}, + }, + } + + stats.Merge(Stats{ + StatusCodes: map[string][5]int{ + "operation1": {2, 3, 4, 5, 6}, // Existing operation with new values + "operation2": {0, 1, 2, 3, 4}, // New operation + }, + }) + + assert.Equal(t, [5]int{3, 5, 7, 9, 11}, stats.StatusCodes["operation1"]) // Values should sum + assert.Equal(t, [5]int{0, 1, 2, 3, 4}, stats.StatusCodes["operation2"]) // New operation added + + stats.Merge(Stats{ + StatusCodes: nil, + }) + + assert.Equal(t, [5]int{3, 5, 7, 9, 11}, stats.StatusCodes["operation1"]) + assert.Equal(t, [5]int{0, 1, 2, 3, 4}, stats.StatusCodes["operation2"]) +} + +func TestMarshalWithStatusCodes(t *testing.T) { + testCases := map[string]struct { + stats *Stats + want string + }{ + "WithEmptyStatusCodes": { + stats: &Stats{ + StatusCodes: map[string][5]int{}, + }, + want: "", + }, + "WithStatusCodes": { + stats: &Stats{ + StatusCodes: map[string][5]int{ + "operation1": {1, 2, 3, 4, 5}, + "operation2": {0, 1, 2, 3, 4}, + }, + }, + want: `"codes":{"operation1":[1,2,3,4,5],"operation2":[0,1,2,3,4]}`, + }, + } + for name, testCase := range testCases { + 
t.Run(name, func(t *testing.T) { + got, err := testCase.stats.Marshal() + assert.NoError(t, err) + assert.Contains(t, testCase.want, got) + }) + } +} + +func TestMergeFullWithStatusCodes(t *testing.T) { + stats := &Stats{ + CPUPercent: aws.Float64(1.0), + StatusCodes: map[string][5]int{"operation1": {1, 0, 0, 0, 0}}, + } + stats.Merge(Stats{ + CPUPercent: aws.Float64(2.0), + StatusCodes: map[string][5]int{"operation1": {0, 1, 0, 0, 0}, "operation2": {1, 1, 1, 1, 1}}, + }) + + assert.Equal(t, 2.0, *stats.CPUPercent) + assert.Equal(t, [5]int{1, 1, 0, 0, 0}, stats.StatusCodes["operation1"]) + assert.Equal(t, [5]int{1, 1, 1, 1, 1}, stats.StatusCodes["operation2"]) +} + func TestMarshal(t *testing.T) { testCases := map[string]struct { stats *Stats @@ -63,7 +133,7 @@ func TestMarshal(t *testing.T) { }, "WithPartial": { stats: &Stats{ - CpuPercent: aws.Float64(1.2), + CPUPercent: aws.Float64(1.2), MemoryBytes: aws.Uint64(123), ThreadCount: aws.Int32(789), PayloadBytes: aws.Int(5678), @@ -72,7 +142,7 @@ func TestMarshal(t *testing.T) { }, "WithFull": { stats: &Stats{ - CpuPercent: aws.Float64(1.2), + CPUPercent: aws.Float64(1.2), MemoryBytes: aws.Uint64(123), FileDescriptorCount: aws.Int32(456), ThreadCount: aws.Int32(789), diff --git a/extension/agenthealth/handler/stats/handler.go b/extension/agenthealth/handler/stats/handler.go index 7e12f12b5c..a755f6f140 100644 --- a/extension/agenthealth/handler/stats/handler.go +++ b/extension/agenthealth/handler/stats/handler.go @@ -21,12 +21,34 @@ const ( headerKeyAgentStats = "X-Amz-Agent-Stats" ) -func NewHandlers(logger *zap.Logger, cfg agent.StatsConfig) ([]awsmiddleware.RequestHandler, []awsmiddleware.ResponseHandler) { - filter := agent.NewOperationsFilter(cfg.Operations...) 
- clientStats := client.NewHandler(filter) - stats := newStatsHandler(logger, filter, []agent.StatsProvider{clientStats, provider.GetProcessStats(), provider.GetFlagsStats()}) +func NewHandlers(logger *zap.Logger, cfg agent.StatsConfig, statusCodeEnabled bool, agentStatsEnabled bool) ([]awsmiddleware.RequestHandler, []awsmiddleware.ResponseHandler) { + var requestHandlers []awsmiddleware.RequestHandler + var responseHandlers []awsmiddleware.ResponseHandler + var statsProviders []agent.StatsProvider + + if !statusCodeEnabled && !agentStatsEnabled { + return nil, nil + } + + if statusCodeEnabled { + statusCodeFilter := agent.NewStatusCodeOperationsFilter() + statusCodeStatsProvider := provider.GetStatusCodeStatsProvider() + statusCodeHandler := provider.NewStatusCodeHandler(statusCodeStatsProvider, statusCodeFilter) + responseHandlers = append(responseHandlers, statusCodeHandler) + statsProviders = append(statsProviders, statusCodeStatsProvider) + } + + if agentStatsEnabled { + filter := agent.NewOperationsFilter(cfg.Operations...) 
+ clientStats := client.NewHandler(filter) + statsProviders = append(statsProviders, clientStats, provider.GetProcessStats(), provider.GetFlagsStats()) + responseHandlers = append(responseHandlers, clientStats) + stats := newStatsHandler(logger, filter, statsProviders) + requestHandlers = append(requestHandlers, clientStats, stats) + } + agent.UsageFlags().SetValues(cfg.UsageFlags) - return []awsmiddleware.RequestHandler{stats, clientStats}, []awsmiddleware.ResponseHandler{clientStats} + return requestHandlers, responseHandlers } type statsHandler struct { diff --git a/extension/agenthealth/handler/stats/handler_test.go b/extension/agenthealth/handler/stats/handler_test.go index f40bebd481..30e603c02a 100644 --- a/extension/agenthealth/handler/stats/handler_test.go +++ b/extension/agenthealth/handler/stats/handler_test.go @@ -40,12 +40,16 @@ func TestStatsHandler(t *testing.T) { StatusCode: aws.Int(200), ImdsFallbackSucceed: aws.Int(1), SharedConfigFallback: aws.Int(1), + StatusCodes: map[string][5]int{ + "pmd": {1, 0, 0, 0, 0}, + "di": {0, 1, 0, 0, 0}, + }, } handler := newStatsHandler( zap.NewNop(), agent.NewOperationsFilter(), []agent.StatsProvider{ - newMockStatsProvider(&agent.Stats{CpuPercent: aws.Float64(1.2)}), + newMockStatsProvider(&agent.Stats{CPUPercent: aws.Float64(1.2)}), newMockStatsProvider(&agent.Stats{MemoryBytes: aws.Uint64(123)}), newMockStatsProvider(stats), }, @@ -59,15 +63,31 @@ func TestStatsHandler(t *testing.T) { assert.Equal(t, "", req.Header.Get(headerKeyAgentStats)) handler.filter = agent.NewOperationsFilter(agent.AllowAllOperations) handler.HandleRequest(ctx, req) - assert.Equal(t, `"cpu":1.2,"mem":123,"fd":456,"th":789,"lat":1234,"load":5678,"code":200,"scfb":1,"ifs":1`, req.Header.Get(headerKeyAgentStats)) + assert.Equal(t, `"cpu":1.2,"mem":123,"fd":456,"th":789,"lat":1234,"load":5678,"code":200,"scfb":1,"ifs":1,"codes":{"di":[0,1,0,0,0],"pmd":[1,0,0,0,0]}`, req.Header.Get(headerKeyAgentStats)) stats.StatusCode = aws.Int(404) 
stats.LatencyMillis = nil handler.HandleRequest(ctx, req) - assert.Equal(t, `"cpu":1.2,"mem":123,"fd":456,"th":789,"load":5678,"code":404,"scfb":1,"ifs":1`, req.Header.Get(headerKeyAgentStats)) + assert.Equal(t, `"cpu":1.2,"mem":123,"fd":456,"th":789,"load":5678,"code":404,"scfb":1,"ifs":1,"codes":{"di":[0,1,0,0,0],"pmd":[1,0,0,0,0]}`, req.Header.Get(headerKeyAgentStats)) } -func TestNewHandlers(t *testing.T) { - requestHandlers, responseHandlers := NewHandlers(zap.NewNop(), agent.StatsConfig{}) +func TestNewHandlersWithStatusCodeOnly(t *testing.T) { + requestHandlers, responseHandlers := NewHandlers(zap.NewNop(), agent.StatsConfig{}, true, false) + assert.Len(t, requestHandlers, 0) + assert.Len(t, responseHandlers, 1) +} +func TestNewHandlersWithAgentStatsOnly(t *testing.T) { + requestHandlers, responseHandlers := NewHandlers(zap.NewNop(), agent.StatsConfig{}, false, true) assert.Len(t, requestHandlers, 2) assert.Len(t, responseHandlers, 1) } + +func TestNewHandlersWithStatusCodeAndAgenthStats(t *testing.T) { + requestHandlers, responseHandlers := NewHandlers(zap.NewNop(), agent.StatsConfig{}, true, true) + assert.Len(t, requestHandlers, 2) + assert.Len(t, responseHandlers, 2) +} +func TestNewHandlersWithoutStatusCodeAndAgenthStats(t *testing.T) { + requestHandlers, responseHandlers := NewHandlers(zap.NewNop(), agent.StatsConfig{}, false, false) + assert.Len(t, requestHandlers, 0) + assert.Len(t, responseHandlers, 0) +} diff --git a/extension/agenthealth/handler/stats/provider/process.go b/extension/agenthealth/handler/stats/provider/process.go index 4e88ebbee5..e6687c9188 100644 --- a/extension/agenthealth/handler/stats/provider/process.go +++ b/extension/agenthealth/handler/stats/provider/process.go @@ -75,7 +75,7 @@ func (p *processStats) updateLoop() { func (p *processStats) refresh() { p.stats.Store(agent.Stats{ - CpuPercent: p.cpuPercent(), + CPUPercent: p.cpuPercent(), MemoryBytes: p.memoryBytes(), FileDescriptorCount: p.fileDescriptorCount(), ThreadCount: 
p.threadCount(), diff --git a/extension/agenthealth/handler/stats/provider/process_test.go b/extension/agenthealth/handler/stats/provider/process_test.go index 19fac625fb..7a350265eb 100644 --- a/extension/agenthealth/handler/stats/provider/process_test.go +++ b/extension/agenthealth/handler/stats/provider/process_test.go @@ -64,11 +64,11 @@ func TestProcessStats(t *testing.T) { mock := &mockProcessMetrics{} provider := newProcessStats(mock, time.Millisecond) got := provider.getStats() - assert.NotNil(t, got.CpuPercent) + assert.NotNil(t, got.CPUPercent) assert.NotNil(t, got.MemoryBytes) assert.NotNil(t, got.FileDescriptorCount) assert.NotNil(t, got.ThreadCount) - assert.EqualValues(t, 1, *got.CpuPercent) + assert.EqualValues(t, 1, *got.CPUPercent) assert.EqualValues(t, 2, *got.MemoryBytes) assert.EqualValues(t, 3, *got.FileDescriptorCount) assert.EqualValues(t, 4, *got.ThreadCount) @@ -77,6 +77,25 @@ func TestProcessStats(t *testing.T) { mock.mu.Unlock() provider.refresh() assert.Eventually(t, func() bool { - return provider.getStats() == agent.Stats{} + return isAgentStatsReset(provider.getStats()) }, 5*time.Millisecond, time.Millisecond) } + +func isAgentStatsReset(stats agent.Stats) bool { + return stats.CPUPercent == nil && + stats.MemoryBytes == nil && + stats.FileDescriptorCount == nil && + stats.ThreadCount == nil && + stats.LatencyMillis == nil && + stats.PayloadBytes == nil && + stats.StatusCode == nil && + stats.SharedConfigFallback == nil && + stats.ImdsFallbackSucceed == nil && + stats.AppSignals == nil && + stats.EnhancedContainerInsights == nil && + stats.RunningInContainer == nil && + stats.RegionType == nil && + stats.Mode == nil && + stats.EntityRejected == nil && + len(stats.StatusCodes) == 0 +} diff --git a/extension/agenthealth/handler/stats/provider/statuscode.go b/extension/agenthealth/handler/stats/provider/statuscode.go new file mode 100644 index 0000000000..17daaee6b0 --- /dev/null +++ 
b/extension/agenthealth/handler/stats/provider/statuscode.go @@ -0,0 +1,163 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package provider + +import ( + "context" + "net/http" + "sync" + "time" + + "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" +) + +const ( + statusResetInterval = 5 * time.Minute + statusHandlerID = "cloudwatchagent.StatusCodeHandler" +) + +var ( + statusCodeProviderSingleton *StatusCodeProvider + StatusCodeProviderOnce sync.Once +) + +// StatusCodeProvider handles processing of status codes and maintains stats. +type StatusCodeProvider struct { + currentStats map[string]*[5]int + mu sync.RWMutex + statusCodeChan chan statusCodeEntry + stopChan chan struct{} + resetTicker *time.Ticker + completedStats chan agent.Stats +} + +type statusCodeEntry struct { + operation string + statusCode int +} + +func GetStatusCodeStatsProvider() *StatusCodeProvider { + StatusCodeProviderOnce.Do(func() { + provider := &StatusCodeProvider{ + currentStats: make(map[string]*[5]int), + statusCodeChan: make(chan statusCodeEntry, 1000), + stopChan: make(chan struct{}), + resetTicker: time.NewTicker(statusResetInterval), + completedStats: make(chan agent.Stats, 1), // buffered channel + } + provider.startProcessing() + statusCodeProviderSingleton = provider + }) + return statusCodeProviderSingleton +} + +func (sp *StatusCodeProvider) startProcessing() { + go func() { + for { + select { + case entry := <-sp.statusCodeChan: + sp.processStatusCode(entry) + case <-sp.resetTicker.C: + sp.RotateStats() + case <-sp.stopChan: + sp.resetTicker.Stop() + return + } + } + }() +} + +func (sp *StatusCodeProvider) EnqueueStatusCode(operation string, statusCode int) { + sp.statusCodeChan <- statusCodeEntry{operation: operation, statusCode: statusCode} +} + +func (sp *StatusCodeProvider) 
processStatusCode(entry statusCodeEntry) { + sp.mu.Lock() + defer sp.mu.Unlock() + + stats, exists := sp.currentStats[entry.operation] + if !exists { + stats = &[5]int{} + sp.currentStats[entry.operation] = stats + } + + switch entry.statusCode { + case 200: + stats[0]++ + case 400: + stats[1]++ + case 408: + stats[2]++ + case 413: + stats[3]++ + case 429: + stats[4]++ + } +} + +func (sp *StatusCodeProvider) RotateStats() { + sp.mu.Lock() + newStats := agent.Stats{ + StatusCodes: make(map[string][5]int, len(sp.currentStats)), + } + for op, stats := range sp.currentStats { + newStats.StatusCodes[op] = *stats + } + sp.currentStats = make(map[string]*[5]int) + sp.mu.Unlock() + + select { + case existingStats := <-sp.completedStats: + existingStats.Merge(newStats) + newStats = existingStats + default: + } + + sp.completedStats <- newStats +} + +func (sp *StatusCodeProvider) Stats(_ string) agent.Stats { + select { + case stats := <-sp.completedStats: + return stats + default: + return agent.Stats{} + } +} + +type StatusCodeHandler struct { + StatusCodeProvider *StatusCodeProvider + filter agent.OperationsFilter +} + +func NewStatusCodeHandler(provider *StatusCodeProvider, filter agent.OperationsFilter) *StatusCodeHandler { + return &StatusCodeHandler{ + StatusCodeProvider: provider, + filter: filter, + } +} + +func (h *StatusCodeHandler) HandleResponse(ctx context.Context, r *http.Response) { + operation := awsmiddleware.GetOperationName(ctx) + if !h.filter.IsAllowed(operation) { + return + } + + operation = agent.GetShortOperationName(operation) + if operation == "" { + return + } + + h.StatusCodeProvider.EnqueueStatusCode(operation, r.StatusCode) +} + +func (h *StatusCodeHandler) ID() string { + return statusHandlerID +} + +func (h *StatusCodeHandler) Position() awsmiddleware.HandlerPosition { + return awsmiddleware.After +} diff --git a/extension/agenthealth/handler/stats/provider/statuscode_test.go b/extension/agenthealth/handler/stats/provider/statuscode_test.go 
new file mode 100644
index 0000000000..247fecda15
--- /dev/null
+++ b/extension/agenthealth/handler/stats/provider/statuscode_test.go
@@ -0,0 +1,138 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: MIT
+
+package provider_test
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"go.uber.org/zap"
+
+	"github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats"
+	"github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent"
+	"github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/provider"
+)
+
+// TestNewHandlers checks the request and response handler slices that
+// stats.NewHandlers returns for every combination of its two boolean flags
+// (status-code handling and agent stats, going by the subtest names).
+func TestNewHandlers(t *testing.T) {
+	logger := zap.NewNop() // Use a no-op logger for testing
+	cfg := agent.StatsConfig{
+		Operations: []string{"TestOperation"},
+	}
+
+	t.Run("Only StatusCodeEnabled", func(t *testing.T) {
+		requestHandlers, responseHandlers := stats.NewHandlers(logger, cfg, true, false)
+
+		assert.Nil(t, requestHandlers, "Request handlers should be nil")
+		assert.NotNil(t, responseHandlers, "Response handlers should not be nil")
+		assert.Len(t, requestHandlers, 0, "There should be 0 request handlers")
+		assert.Len(t, responseHandlers, 1, "There should be 1 response handler")
+
+		assert.IsType(t, &provider.StatusCodeHandler{}, responseHandlers[0], "First response handler should be StatusCodeHandler")
+	})
+
+	t.Run("Only AgentStatsEnabled", func(t *testing.T) {
+		requestHandlers, responseHandlers := stats.NewHandlers(logger, cfg, false, true)
+
+		assert.NotNil(t, requestHandlers, "Request handlers should not be nil")
+		assert.NotNil(t, responseHandlers, "Response handlers should not be nil")
+		assert.GreaterOrEqual(t, len(requestHandlers), 2, "There should be at least 2 request handlers")
+		assert.GreaterOrEqual(t, len(responseHandlers), 1, "There should be at least 1 response handler")
+	})
+
+	t.Run("Both Enabled", func(t *testing.T) {
+		requestHandlers, responseHandlers := stats.NewHandlers(logger, cfg, true, true)
+
+		assert.NotNil(t, requestHandlers, "Request handlers should not be nil")
+		assert.NotNil(t, responseHandlers, "Response handlers should not be nil")
+		assert.GreaterOrEqual(t, len(requestHandlers), 2, "There should be at least 2 request handlers")
+		assert.GreaterOrEqual(t, len(responseHandlers), 2, "There should be at least 2 response handlers")
+	})
+
+	t.Run("Neither Enabled", func(t *testing.T) {
+		requestHandlers, responseHandlers := stats.NewHandlers(logger, cfg, false, false)
+
+		assert.Nil(t, requestHandlers, "Request handlers should be nil")
+		assert.Nil(t, responseHandlers, "Response handlers should be nil")
+	})
+}
+
+// TestSingleton checks that GetStatusCodeStatsProvider hands back the same
+// instance on every call and that both handles report the same stats.
+func TestSingleton(t *testing.T) {
+	instance1 := provider.GetStatusCodeStatsProvider()
+	instance2 := provider.GetStatusCodeStatsProvider()
+
+	if instance1 != instance2 {
+		t.Errorf("Expected both instances to be the same, but they are different")
+	}
+
+	instance1.EnqueueStatusCode("DescribeInstances", 200)
+	stats1 := instance1.Stats("")
+	stats2 := instance2.Stats("")
+
+	if stats1.StatusCodes["DescribeInstances"][0] != stats2.StatusCodes["DescribeInstances"][0] {
+		t.Errorf("Expected the state to be the same across instances, but it differs")
+	}
+}
+
+// TestStatsResetRace calls Stats, EnqueueStatusCode and RotateStats from three
+// goroutines at once so that unsynchronized access can surface (for example
+// under the race detector).
+func TestStatsResetRace(t *testing.T) {
+	sp := provider.GetStatusCodeStatsProvider()
+
+	// Pre-populate some stats through the normal channel
+	sp.EnqueueStatusCode("op1", 200)
+	sp.EnqueueStatusCode("op2", 400)
+
+	// Give time for the stats to be processed
+	time.Sleep(10 * time.Millisecond)
+
+	// Trigger a rotation to get some stats in the completedStats channel
+	sp.RotateStats()
+
+	var wg sync.WaitGroup
+	wg.Add(3)
+
+	// Goroutine 1: Continuously call the Stats method
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			stats := sp.Stats("")
+			if stats.StatusCodes != nil {
+				total := 0
+				for _, counts := range stats.StatusCodes {
+					for _, count := range counts {
+						total += count
+					}
+				}
+				assert.Greater(t, total, 0, "Should have some status codes counted")
+			}
+		}
+	}()
+
+	// Goroutine 2: Add new status codes
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 100; i++ {
+			sp.EnqueueStatusCode("op3", 200)
+		}
+	}()
+
+	// Goroutine 3: Trigger rotations
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 3; i++ {
+			time.Sleep(1 * time.Millisecond)
+			sp.RotateStats()
+		}
+	}()
+
+	wg.Wait()
+}
diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
index e1b0f4f457..bbec34db90 100644
--- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
+++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
@@ -403,7 +403,7 @@ func init() {
 				zap.NewNop(),
 				&agenthealth.Config{
 					IsUsageDataEnabled: envconfig.IsUsageDataEnabled(),
-					Stats:              agent.StatsConfig{Operations: []string{"PutLogEvents"}},
+					Stats:              &agent.StatsConfig{Operations: []string{"PutLogEvents"}},
 				},
 			),
 		}
diff --git a/translator/translate/otel/extension/agenthealth/translator.go b/translator/translate/otel/extension/agenthealth/translator.go
index ef39f9390c..5e1d4c0b9b 100644
--- a/translator/translate/otel/extension/agenthealth/translator.go
+++ b/translator/translate/otel/extension/agenthealth/translator.go
@@ -31,14 +31,25 @@ var (
 )
 
 type translator struct {
-	name               string
-	operations         []string
-	isUsageDataEnabled bool
-	factory            extension.Factory
+	name                string
+	operations          []string
+	isUsageDataEnabled  bool
+	factory             extension.Factory
+	isStatusCodeEnabled bool
 }
 
 var _ common.Translator[component.Config] = (*translator)(nil)
 
+func NewTranslatorWithStatusCode(name component.DataType, operations []string, isStatusCodeEnabled bool) common.Translator[component.Config] {
+	return &translator{
+		name:                name.String(),
+		operations:          operations,
+		factory:             agenthealth.NewFactory(),
+		isUsageDataEnabled:  envconfig.IsUsageDataEnabled(),
+		isStatusCodeEnabled: isStatusCodeEnabled,
+	}
+}
+
 func NewTranslator(name component.DataType, operations []string) common.Translator[component.Config] {
 	return &translator{
 		name:               name.String(),
@@ -59,7 +70,8 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) {
 	if usageData, ok := common.GetBool(conf, common.ConfigKey(common.AgentKey, usageDataKey)); ok {
 		cfg.IsUsageDataEnabled = cfg.IsUsageDataEnabled && usageData
 	}
-	cfg.Stats = agent.StatsConfig{
+	cfg.IsStatusCodeEnabled = t.isStatusCodeEnabled
+	cfg.Stats = &agent.StatsConfig{
 		Operations: t.operations,
 		UsageFlags: map[agent.Flag]any{
 			agent.FlagMode: context.CurrentContext().ShortMode(),
diff --git a/translator/translate/otel/extension/agenthealth/translator_test.go b/translator/translate/otel/extension/agenthealth/translator_test.go
index 989372e04a..41501ab0bf 100644
--- a/translator/translate/otel/extension/agenthealth/translator_test.go
+++ b/translator/translate/otel/extension/agenthealth/translator_test.go
@@ -35,7 +35,7 @@ func TestTranslate(t *testing.T) {
 			isEnvUsageData: true,
 			want: &agenthealth.Config{
 				IsUsageDataEnabled: true,
-				Stats: agent.StatsConfig{
+				Stats: &agent.StatsConfig{
 					Operations: operations,
 					UsageFlags: usageFlags,
 				},
@@ -46,7 +46,7 @@ func TestTranslate(t *testing.T) {
 			isEnvUsageData: true,
 			want: &agenthealth.Config{
 				IsUsageDataEnabled: false,
-				Stats: agent.StatsConfig{
+				Stats: &agent.StatsConfig{
 					Operations: operations,
 					UsageFlags: usageFlags,
 				},
@@ -57,7 +57,7 @@ func TestTranslate(t *testing.T) {
 			isEnvUsageData: false,
 			want: &agenthealth.Config{
 				IsUsageDataEnabled: false,
-				Stats: agent.StatsConfig{
+				Stats: &agent.StatsConfig{
 					Operations: operations,
 					UsageFlags: usageFlags,
 				},
@@ -68,7 +68,7 @@ func TestTranslate(t *testing.T) {
 			isEnvUsageData: true,
 			want: &agenthealth.Config{
 				IsUsageDataEnabled: true,
-				Stats: agent.StatsConfig{
+				Stats: &agent.StatsConfig{
 					Operations: operations,
 					UsageFlags: usageFlags,
 				},