From 52a4af3c17c01d6a81ac111d0e3c22dac01e2723 Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Thu, 26 Oct 2023 15:35:02 -0400 Subject: [PATCH] Fix client stats payload measurement (#931) --- .../agenthealth/handler/stats/agent/agent.go | 9 +++++ .../handler/stats/agent/agent_test.go | 14 +++++-- .../handler/stats/client/client.go | 38 ++++++++++--------- .../handler/stats/client/client_test.go | 2 +- .../agenthealth/handler/stats/handler.go | 4 +- .../handler/stats/provider/flag.go | 29 ++++++-------- .../handler/stats/provider/flag_test.go | 8 ++-- .../handler/stats/provider/interval.go | 19 +++++++--- .../handler/stats/provider/interval_test.go | 6 ++- .../handler/stats/provider/process.go | 6 +-- .../handler/stats/provider/process_test.go | 4 +- .../agenthealth/handler/useragent/handler.go | 5 +++ .../handler/useragent/useragent.go | 3 -- .../outputs/cloudwatchlogs/cloudwatchlogs.go | 8 +++- 14 files changed, 93 insertions(+), 62 deletions(-) diff --git a/extension/agenthealth/handler/stats/agent/agent.go b/extension/agenthealth/handler/stats/agent/agent.go index abe77f5df0..6eaf97d4bd 100644 --- a/extension/agenthealth/handler/stats/agent/agent.go +++ b/extension/agenthealth/handler/stats/agent/agent.go @@ -67,6 +67,15 @@ func (s *Stats) Merge(other Stats) { if other.EnhancedContainerInsights != nil { s.EnhancedContainerInsights = other.EnhancedContainerInsights } + if other.RunningInContainer != nil { + s.RunningInContainer = other.RunningInContainer + } + if other.RegionType != nil { + s.RegionType = other.RegionType + } + if other.Mode != nil { + s.Mode = other.Mode + } } func (s *Stats) Marshal() (string, error) { diff --git a/extension/agenthealth/handler/stats/agent/agent_test.go b/extension/agenthealth/handler/stats/agent/agent_test.go index 8322179dfc..c379facc19 100644 --- a/extension/agenthealth/handler/stats/agent/agent_test.go +++ b/extension/agenthealth/handler/stats/agent/agent_test.go @@ -21,18 +21,23 @@ func TestMerge(t *testing.T) { assert.EqualValues(t, 1.3, *stats.CpuPercent) assert.EqualValues(t, 123, *stats.MemoryBytes) stats.Merge(Stats{ + CpuPercent: aws.Float64(1.5), + MemoryBytes: aws.Uint64(133), FileDescriptorCount: aws.Int32(456), ThreadCount: aws.Int32(789), LatencyMillis: aws.Int64(1234), PayloadBytes: aws.Int(5678), StatusCode: aws.Int(200), - ImdsFallbackSucceed: aws.Int(1), SharedConfigFallback: aws.Int(1), + ImdsFallbackSucceed: aws.Int(1), AppSignals: aws.Int(1), EnhancedContainerInsights: aws.Int(1), + RunningInContainer: aws.Int(0), + RegionType: aws.String("RegionType"), + Mode: aws.String("Mode"), }) - assert.EqualValues(t, 1.3, *stats.CpuPercent) - assert.EqualValues(t, 123, *stats.MemoryBytes) + assert.EqualValues(t, 1.5, *stats.CpuPercent) + assert.EqualValues(t, 133, *stats.MemoryBytes) assert.EqualValues(t, 456, *stats.FileDescriptorCount) assert.EqualValues(t, 789, *stats.ThreadCount) assert.EqualValues(t, 1234, *stats.LatencyMillis) @@ -42,6 +47,9 @@ func TestMerge(t *testing.T) { assert.EqualValues(t, 1, *stats.SharedConfigFallback) assert.EqualValues(t, 1, *stats.AppSignals) assert.EqualValues(t, 1, *stats.EnhancedContainerInsights) + assert.EqualValues(t, 0, *stats.RunningInContainer) + assert.EqualValues(t, "RegionType", *stats.RegionType) + assert.EqualValues(t, "Mode", *stats.Mode) } func TestMarshal(t *testing.T) { diff --git a/extension/agenthealth/handler/stats/client/client.go b/extension/agenthealth/handler/stats/client/client.go index 9b87d2533b..40567d929e 100644 --- a/extension/agenthealth/handler/stats/client/client.go +++ b/extension/agenthealth/handler/stats/client/client.go @@ -18,7 +18,7 @@ import ( ) const ( - handlerID = "cloudwatchagent.ClientStatsHandler" + handlerID = "cloudwatchagent.ClientStats" ttlDuration = 10 * time.Second cacheSize = 1000 ) @@ -35,13 +35,11 @@ type requestRecorder struct { } type clientStatsHandler struct { - mu sync.Mutex - filter agent.OperationsFilter getOperationName func(ctx context.Context) string getRequestID func(ctx context.Context) string - statsByOperation map[string]agent.Stats + statsByOperation sync.Map requestCache *ttlcache.Cache[string, *requestRecorder] } @@ -59,7 +57,6 @@ func NewHandler(filter agent.OperationsFilter) Stats { getOperationName: awsmiddleware.GetOperationName, getRequestID: awsmiddleware.GetRequestID, requestCache: requestCache, - statsByOperation: make(map[string]agent.Stats), } } @@ -68,7 +65,7 @@ func (csh *clientStatsHandler) ID() string { } func (csh *clientStatsHandler) Position() awsmiddleware.HandlerPosition { - return awsmiddleware.Before + return awsmiddleware.After } func (csh *clientStatsHandler) HandleRequest(ctx context.Context, r *http.Request) { @@ -76,11 +73,14 @@ func (csh *clientStatsHandler) HandleRequest(ctx context.Context, r *http.Reques if !csh.filter.IsAllowed(operation) { return } - csh.mu.Lock() - defer csh.mu.Unlock() requestID := csh.getRequestID(ctx) recorder := &requestRecorder{start: time.Now()} - recorder.payloadBytes, _ = io.Copy(io.Discard, r.Body) + if r.GetBody != nil { + body, err := r.GetBody() + if err == nil { + recorder.payloadBytes, err = io.Copy(io.Discard, body) + } + } csh.requestCache.Set(requestID, recorder, ttlcache.DefaultTTL) } @@ -89,8 +89,6 @@ func (csh *clientStatsHandler) HandleResponse(ctx context.Context, r *http.Respo if !csh.filter.IsAllowed(operation) { return } - csh.mu.Lock() - defer csh.mu.Unlock() requestID := csh.getRequestID(ctx) item, ok := csh.requestCache.GetAndDelete(requestID) if !ok { @@ -101,15 +99,19 @@ func (csh *clientStatsHandler) HandleResponse(ctx context.Context, r *http.Respo PayloadBytes: aws.Int(int(recorder.payloadBytes)), StatusCode: aws.Int(r.StatusCode), } - latency := time.Since(recorder.start).Milliseconds() - stats.LatencyMillis = aws.Int64(latency) - csh.statsByOperation[operation] = stats + latency := time.Since(recorder.start) + stats.LatencyMillis = aws.Int64(latency.Milliseconds()) + csh.statsByOperation.Store(operation, stats) } func (csh *clientStatsHandler) Stats(operation string) agent.Stats { - csh.mu.Lock() - defer csh.mu.Unlock() - stats := csh.statsByOperation[operation] - csh.statsByOperation[operation] = agent.Stats{} + value, ok := csh.statsByOperation.Load(operation) + if !ok { + return agent.Stats{} + } + stats, ok := value.(agent.Stats) + if !ok { + return agent.Stats{} + } return stats } diff --git a/extension/agenthealth/handler/stats/client/client_test.go b/extension/agenthealth/handler/stats/client/client_test.go index 3f8a9704cf..649d4b340a 100644 --- a/extension/agenthealth/handler/stats/client/client_test.go +++ b/extension/agenthealth/handler/stats/client/client_test.go @@ -23,7 +23,7 @@ func TestHandle(t *testing.T) { handler.(*clientStatsHandler).getOperationName = func(context.Context) string { return operation } - assert.Equal(t, awsmiddleware.Before, handler.Position()) + assert.Equal(t, awsmiddleware.After, handler.Position()) assert.Equal(t, handlerID, handler.ID()) body := []byte("test payload size") req, err := http.NewRequest("", "localhost", bytes.NewBuffer(body)) diff --git a/extension/agenthealth/handler/stats/handler.go b/extension/agenthealth/handler/stats/handler.go index 77ccb597ad..f5fe991125 100644 --- a/extension/agenthealth/handler/stats/handler.go +++ b/extension/agenthealth/handler/stats/handler.go @@ -17,7 +17,7 @@ import ( ) const ( - handlerID = "cloudwatchagent.StatsHandler" + handlerID = "cloudwatchagent.AgentStats" headerKeyAgentStats = "X-Amz-Agent-Stats" ) @@ -25,7 +25,7 @@ func NewHandlers(logger *zap.Logger, cfg agent.StatsConfig) ([]awsmiddleware.Req filter := agent.NewOperationsFilter(cfg.Operations...) clientStats := client.NewHandler(filter) stats := newStatsHandler(logger, filter, []agent.StatsProvider{clientStats, provider.GetProcessStats(), provider.GetFlagsStats()}) - return []awsmiddleware.RequestHandler{clientStats, stats}, []awsmiddleware.ResponseHandler{clientStats} + return []awsmiddleware.RequestHandler{stats, clientStats}, []awsmiddleware.ResponseHandler{clientStats} } type statsHandler struct { diff --git a/extension/agenthealth/handler/stats/provider/flag.go b/extension/agenthealth/handler/stats/provider/flag.go index c90df8fd1a..de684e1f86 100644 --- a/extension/agenthealth/handler/stats/provider/flag.go +++ b/extension/agenthealth/handler/stats/provider/flag.go @@ -17,20 +17,15 @@ const ( flagGetInterval = 5 * time.Minute ) -type BoolFlag int +type Flag int const ( - FlagIMDSFallbackSucceed BoolFlag = iota + FlagIMDSFallbackSucceed Flag = iota FlagSharedConfigFallback FlagAppSignal FlagEnhancedContainerInsights FlagRunningInContainer -) - -type StringFlag int - -const ( - FlagMode StringFlag = iota + FlagMode FlagRegionType ) @@ -41,8 +36,8 @@ var ( type FlagStats interface { agent.StatsProvider - SetFlag(flag BoolFlag) - SetFlagWithValue(flag StringFlag, value string) + SetFlag(flag Flag) + SetFlagWithValue(flag Flag, value string) } type flagStats struct { @@ -54,9 +49,7 @@ type flagStats struct { var _ FlagStats = (*flagStats)(nil) func (p *flagStats) update() { - p.mu.Lock() - defer p.mu.Unlock() - p.stats = agent.Stats{ + p.stats.Store(agent.Stats{ ImdsFallbackSucceed: p.getIntFlag(FlagIMDSFallbackSucceed, false), SharedConfigFallback: p.getIntFlag(FlagSharedConfigFallback, false), AppSignals: p.getIntFlag(FlagAppSignal, false), @@ -64,10 +57,10 @@ func (p *flagStats) update() { RunningInContainer: p.getIntFlag(FlagRunningInContainer, true), Mode: p.getStringFlag(FlagMode), RegionType: p.getStringFlag(FlagRegionType), - } + }) } -func (p *flagStats) getIntFlag(flag BoolFlag, missingAsZero bool) *int { +func (p *flagStats) getIntFlag(flag Flag, missingAsZero bool) *int { if _, ok := p.flags.Load(flag); ok { return aws.Int(1) } @@ -77,7 +70,7 @@ func (p *flagStats) getIntFlag(flag BoolFlag, missingAsZero bool) *int { return nil } -func (p *flagStats) getStringFlag(flag StringFlag) *string { +func (p *flagStats) getStringFlag(flag Flag) *string { value, ok := p.flags.Load(flag) if !ok { return nil @@ -90,14 +83,14 @@ func (p *flagStats) getStringFlag(flag StringFlag) *string { return aws.String(str) } -func (p *flagStats) SetFlag(flag BoolFlag) { +func (p *flagStats) SetFlag(flag Flag) { if _, ok := p.flags.Load(flag); !ok { p.flags.Store(flag, true) p.update() } } -func (p *flagStats) SetFlagWithValue(flag StringFlag, value string) { +func (p *flagStats) SetFlagWithValue(flag Flag, value string) { if _, ok := p.flags.Load(flag); !ok { p.flags.Store(flag, value) p.update() diff --git a/extension/agenthealth/handler/stats/provider/flag_test.go b/extension/agenthealth/handler/stats/provider/flag_test.go index f4890a7b9e..cbc42c094a 100644 --- a/extension/agenthealth/handler/stats/provider/flag_test.go +++ b/extension/agenthealth/handler/stats/provider/flag_test.go @@ -15,23 +15,23 @@ import ( func TestFlagStats(t *testing.T) { t.Setenv(envconfig.RunInContainer, envconfig.TrueValue) provider := newFlagStats(time.Microsecond) - got := provider.stats + got := provider.getStats() assert.Nil(t, got.ImdsFallbackSucceed) assert.Nil(t, got.SharedConfigFallback) assert.NotNil(t, got.RunningInContainer) assert.Equal(t, 1, *got.RunningInContainer) provider.SetFlag(FlagIMDSFallbackSucceed) assert.Nil(t, got.ImdsFallbackSucceed) - got = provider.stats + got = provider.getStats() assert.NotNil(t, got.ImdsFallbackSucceed) assert.Equal(t, 1, *got.ImdsFallbackSucceed) assert.Nil(t, got.SharedConfigFallback) provider.SetFlag(FlagSharedConfigFallback) - got = provider.stats + got = provider.getStats() assert.NotNil(t, got.SharedConfigFallback) assert.Equal(t, 1, *got.SharedConfigFallback) provider.SetFlagWithValue(FlagMode, "test") - got = provider.stats + got = provider.getStats() assert.NotNil(t, got.Mode) assert.Equal(t, "test", *got.Mode) } diff --git a/extension/agenthealth/handler/stats/provider/interval.go b/extension/agenthealth/handler/stats/provider/interval.go index ce0c885ce7..bc5dc8e24e 100644 --- a/extension/agenthealth/handler/stats/provider/interval.go +++ b/extension/agenthealth/handler/stats/provider/interval.go @@ -5,6 +5,7 @@ package provider import ( "sync" + "sync/atomic" "time" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" @@ -13,29 +14,37 @@ import ( // intervalStats restricts the Stats get function to once // per interval. type intervalStats struct { - mu sync.Mutex + mu sync.RWMutex interval time.Duration getOnce *sync.Once lastGet time.Time - stats agent.Stats + stats atomic.Value } var _ agent.StatsProvider = (*intervalStats)(nil) func (p *intervalStats) Stats(string) agent.Stats { - p.mu.Lock() - defer p.mu.Unlock() + p.mu.RLock() + defer p.mu.RUnlock() var stats agent.Stats p.getOnce.Do(func() { p.lastGet = time.Now() - stats = p.stats + stats = p.getStats() go p.allowNextGetAfter(p.interval) }) return stats } +func (p *intervalStats) getStats() agent.Stats { + var stats agent.Stats + if value := p.stats.Load(); value != nil { + stats = value.(agent.Stats) + } + return stats +} + func (p *intervalStats) allowNextGetAfter(interval time.Duration) { time.Sleep(interval) p.mu.Lock() diff --git a/extension/agenthealth/handler/stats/provider/interval_test.go b/extension/agenthealth/handler/stats/provider/interval_test.go index 3e54c57637..cd2be8899b 100644 --- a/extension/agenthealth/handler/stats/provider/interval_test.go +++ b/extension/agenthealth/handler/stats/provider/interval_test.go @@ -9,11 +9,15 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) func TestIntervalStats(t *testing.T) { s := newIntervalStats(time.Millisecond) - s.stats.ThreadCount = aws.Int32(2) + s.stats.Store(agent.Stats{ + ThreadCount: aws.Int32(2), + }) got := s.Stats("") assert.NotNil(t, got.ThreadCount) got = s.Stats("") diff --git a/extension/agenthealth/handler/stats/provider/process.go b/extension/agenthealth/handler/stats/provider/process.go index 48ca9d9d83..4e88ebbee5 100644 --- a/extension/agenthealth/handler/stats/provider/process.go +++ b/extension/agenthealth/handler/stats/provider/process.go @@ -74,14 +74,12 @@ func (p *processStats) updateLoop() { } func (p *processStats) refresh() { - p.mu.Lock() - defer p.mu.Unlock() - p.stats = agent.Stats{ + p.stats.Store(agent.Stats{ CpuPercent: p.cpuPercent(), MemoryBytes: p.memoryBytes(), FileDescriptorCount: p.fileDescriptorCount(), ThreadCount: p.threadCount(), - } + }) } func newProcessStats(proc processMetrics, interval time.Duration) *processStats { diff --git a/extension/agenthealth/handler/stats/provider/process_test.go b/extension/agenthealth/handler/stats/provider/process_test.go index f1368a883d..3448ad2fcd 100644 --- a/extension/agenthealth/handler/stats/provider/process_test.go +++ b/extension/agenthealth/handler/stats/provider/process_test.go @@ -50,7 +50,7 @@ func TestProcessStats(t *testing.T) { testErr := errors.New("test error") mock := &mockProcessMetrics{} provider := newProcessStats(mock, time.Millisecond) - got := provider.stats + got := provider.getStats() assert.NotNil(t, got.CpuPercent) assert.NotNil(t, got.MemoryBytes) assert.NotNil(t, got.FileDescriptorCount) @@ -61,7 +61,7 @@ func TestProcessStats(t *testing.T) { assert.EqualValues(t, 4, *got.ThreadCount) mock.err = testErr time.Sleep(2 * time.Millisecond) - got = provider.stats + got = provider.getStats() assert.Nil(t, got.CpuPercent) assert.Nil(t, got.MemoryBytes) assert.Nil(t, got.FileDescriptorCount) diff --git a/extension/agenthealth/handler/useragent/handler.go b/extension/agenthealth/handler/useragent/handler.go index 1c2f41d2d6..c5933fa026 100644 --- a/extension/agenthealth/handler/useragent/handler.go +++ b/extension/agenthealth/handler/useragent/handler.go @@ -11,6 +11,11 @@ import ( "go.uber.org/atomic" ) +const ( + handlerID = "cloudwatchagent.UserAgent" + headerKeyUserAgent = "User-Agent" +) + type userAgentHandler struct { userAgent UserAgent isUsageDataEnabled bool diff --git a/extension/agenthealth/handler/useragent/useragent.go b/extension/agenthealth/handler/useragent/useragent.go index 0ab0a3e5cd..976058d9ba 100644 --- a/extension/agenthealth/handler/useragent/useragent.go +++ b/extension/agenthealth/handler/useragent/useragent.go @@ -25,9 +25,6 @@ import ( ) const ( - handlerID = "cloudwatchagent.UserAgentHandler" - headerKeyUserAgent = "User-Agent" - flagRunAsUser = "run_as_user" flagContainerInsights = "container_insights" flagAppSignals = "app_signals" diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go index 071e96af98..fb04338dc7 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go @@ -22,6 +22,7 @@ import ( configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws" "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth" + "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/provider" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/useragent" "github.com/aws/amazon-cloudwatch-agent/handlers" @@ -155,6 +156,8 @@ func (c *CloudWatchLogs) getDest(t Target) *cwDest { if c.middleware != nil { if err := awsmiddleware.NewConfigurer(c.middleware.Handlers()).Configure(awsmiddleware.SDKv1(&client.Handlers)); err != nil { c.Log.Errorf("Unable to configure middleware on cloudwatch logs client: %v", err) + } else { + c.Log.Info("Configured middleware on AWS client") } } pusher := NewPusher(t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log, c.pusherStopChan, &c.pusherWaitGroup) @@ -396,7 +399,10 @@ func init() { cwDests: make(map[Target]*cwDest), middleware: agenthealth.NewAgentHealth( zap.NewNop(), - &agenthealth.Config{IsUsageDataEnabled: envconfig.IsUsageDataEnabled()}, + &agenthealth.Config{ + IsUsageDataEnabled: envconfig.IsUsageDataEnabled(), + Stats: agent.StatsConfig{Operations: []string{"PutLogEvents"}}, + }, ), } })