From 381bd530ef7a33f6719f056cf0feae33d97bc12f Mon Sep 17 00:00:00 2001 From: Paramadon Date: Wed, 11 Dec 2024 18:45:45 -0500 Subject: [PATCH] Improved performance --- .../handler/stats/provider/statuscode.go | 129 ++++++++++-------- .../handler/stats/provider/statuscode_test.go | 37 +++-- 2 files changed, 93 insertions(+), 73 deletions(-) diff --git a/extension/agenthealth/handler/stats/provider/statuscode.go b/extension/agenthealth/handler/stats/provider/statuscode.go index dd8845591d..6b228e0e6c 100644 --- a/extension/agenthealth/handler/stats/provider/statuscode.go +++ b/extension/agenthealth/handler/stats/provider/statuscode.go @@ -1,6 +1,3 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: MIT - package provider import ( @@ -10,7 +7,6 @@ import ( "time" "github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware" - "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) @@ -26,37 +22,28 @@ var ( // StatusCodeProvider handles processing of status codes and maintains stats. type StatusCodeProvider struct { - StatsByOperation map[string]*[5]int - ResetTimer *time.Timer - statusCodeChan chan statusCodeEntry - stopChan chan struct{} - ShouldResetStats bool - Mu sync.RWMutex + currentStats map[string]*[5]int + mu sync.RWMutex + statusCodeChan chan statusCodeEntry + stopChan chan struct{} + resetTicker *time.Ticker + completedStats chan map[string][5]int // unbuffered channel for completed stats } -// statusCodeEntry represents a status code and its associated operation. type statusCodeEntry struct { operation string statusCode int } -// StatusCodeHandler is the handler that uses the StatusCodeProvider for processing. -type StatusCodeHandler struct { - StatusCodeProvider *StatusCodeProvider - filter agent.OperationsFilter -} - -// StatusCodeProvider methods - func GetStatusCodeStatsProvider() *StatusCodeProvider { StatusCodeProviderOnce.Do(func() { provider := &StatusCodeProvider{ - StatsByOperation: make(map[string]*[5]int), - statusCodeChan: make(chan statusCodeEntry, 1000), - stopChan: make(chan struct{}), + currentStats: make(map[string]*[5]int), + statusCodeChan: make(chan statusCodeEntry, 1000), + stopChan: make(chan struct{}), + resetTicker: time.NewTicker(statusResetInterval), + completedStats: make(chan map[string][5]int), } - - provider.startResetTimer() provider.startProcessing() statusCodeProviderSingleton = provider }) @@ -69,7 +56,10 @@ func (sp *StatusCodeProvider) startProcessing() { select { case entry := <-sp.statusCodeChan: sp.processStatusCode(entry) + case <-sp.resetTicker.C: + sp.RotateStats() case <-sp.stopChan: + sp.resetTicker.Stop() return } } @@ -77,27 +67,20 @@ func (sp *StatusCodeProvider) startProcessing() { } func (sp *StatusCodeProvider) EnqueueStatusCode(operation string, statusCode int) { - select { - case sp.statusCodeChan <- statusCodeEntry{operation: operation, statusCode: statusCode}: - default: - return - } + sp.statusCodeChan <- statusCodeEntry{operation: operation, statusCode: statusCode} } func (sp *StatusCodeProvider) processStatusCode(entry statusCodeEntry) { - sp.Mu.Lock() - defer sp.Mu.Unlock() + sp.mu.Lock() + defer sp.mu.Unlock() - stats, exists := sp.StatsByOperation[entry.operation] + stats, exists := sp.currentStats[entry.operation] if !exists { stats = &[5]int{} - sp.StatsByOperation[entry.operation] = stats + sp.currentStats[entry.operation] = stats } - sp.updateStatusCodeCount(stats, entry.statusCode) -} -func (sp *StatusCodeProvider) updateStatusCodeCount(stats *[5]int, statusCode int) { - switch statusCode { + switch entry.statusCode { case 200: stats[0]++ case 400: @@ -108,41 +91,67 @@ func (sp *StatusCodeProvider) updateStatusCodeCount(stats *[5]int, statusCode in stats[3]++ case 429: stats[4]++ - default: - return } } -func (sp *StatusCodeProvider) startResetTimer() { - sp.ResetTimer = time.AfterFunc(statusResetInterval, func() { - sp.Mu.Lock() - sp.ShouldResetStats = true - sp.Mu.Unlock() - }) -} - -func (sp *StatusCodeProvider) Stats(_ string) agent.Stats { - sp.Mu.Lock() - defer sp.Mu.Unlock() +func (sp *StatusCodeProvider) RotateStats() { + sp.mu.Lock() + newStats := make(map[string][5]int, len(sp.currentStats)) + for op, stats := range sp.currentStats { + newStats[op] = *stats + } + sp.currentStats = make(map[string]*[5]int) + sp.mu.Unlock() - statusCodeMap := make(map[string][5]int) - if sp.ShouldResetStats { - for op, stats := range sp.StatsByOperation { - statusCodeMap[op] = *stats + // Try to merge with existing stats with a timeout + select { + case existingStats := <-sp.completedStats: + // Merge existing stats with new stats + for op, stats := range existingStats { + if currentStats, exists := newStats[op]; !exists { + newStats[op] = stats + } else { + var mergedStats [5]int + for i := range stats { + mergedStats[i] = currentStats[i] + stats[i] + } + newStats[op] = mergedStats + } } - sp.StatsByOperation = make(map[string]*[5]int) - sp.ShouldResetStats = false + case <-time.After(100 * time.Millisecond): + // Timeout if can't read from channel } - return agent.Stats{ - StatusCodes: statusCodeMap, + // Try to write with timeout + select { + case sp.completedStats <- newStats: + case <-time.After(100 * time.Millisecond): + // If we can't write, log error or handle appropriately + } +} + +func (sp *StatusCodeProvider) Stats(_ string) agent.Stats { + select { + case stats := <-sp.completedStats: + return agent.Stats{ + StatusCodes: stats, + } + default: + return agent.Stats{} } } -// StatusCodeHandler methods +// StatusCodeHandler implementation remains the same +type StatusCodeHandler struct { + StatusCodeProvider *StatusCodeProvider + filter agent.OperationsFilter +} func NewStatusCodeHandler(provider *StatusCodeProvider, filter agent.OperationsFilter) *StatusCodeHandler { - return &StatusCodeHandler{StatusCodeProvider: provider, filter: filter} + return &StatusCodeHandler{ + StatusCodeProvider: provider, + filter: filter, + } } func (h *StatusCodeHandler) HandleResponse(ctx context.Context, r *http.Response) { diff --git a/extension/agenthealth/handler/stats/provider/statuscode_test.go b/extension/agenthealth/handler/stats/provider/statuscode_test.go index 9cdefd123d..51c3781525 100644 --- a/extension/agenthealth/handler/stats/provider/statuscode_test.go +++ b/extension/agenthealth/handler/stats/provider/statuscode_test.go @@ -6,6 +6,7 @@ package provider_test import ( "sync" "testing" + "time" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -75,34 +76,44 @@ func TestSingleton(t *testing.T) { } } -func TestStatsResetRace(_ *testing.T) { +func TestStatsResetRace(t *testing.T) { sp := provider.GetStatusCodeStatsProvider() - // Initialize the map in a thread-safe manner - sp.Mu.Lock() - sp.StatsByOperation = map[string]*[5]int{ - "op1": {1, 2, 3, 4, 5}, - "op2": {6, 7, 8, 9, 10}, - } - sp.Mu.Unlock() + // Pre-populate some initial stats and rotate + sp.EnqueueStatusCode("op1", 200) + sp.EnqueueStatusCode("op2", 400) + sp.RotateStats() var wg sync.WaitGroup - wg.Add(2) + wg.Add(3) - // Goroutine 1: Continuously call the Stats method + // Goroutine 1: Call Stats method go func() { defer wg.Done() - for i := 0; i < 1000; i++ { - _ = sp.Stats("") + for i := 0; i < 100; i++ { + stats := sp.Stats("") + if stats.StatusCodes != nil { + assert.Greater(t, len(stats.StatusCodes), 0) + } } }() + // Goroutine 2: Add new status codes go func() { defer wg.Done() - for i := 0; i < 1000; i++ { + for i := 0; i < 100; i++ { sp.EnqueueStatusCode("op3", 200) } }() + // Goroutine 3: Trigger rotations + go func() { + defer wg.Done() + for i := 0; i < 3; i++ { + time.Sleep(1 * time.Millisecond) + sp.RotateStats() + } + }() + wg.Wait() }