From 26a8d281ce74295f72f69d1443458dcea73af1f9 Mon Sep 17 00:00:00 2001 From: Jeffrey Chien Date: Tue, 17 Aug 2021 14:05:59 -0400 Subject: [PATCH] Added a custom retryer implementation to log any throttling events handled by the SDK. (#243) --- internal/retryer/logthrottle.go | 93 ++++++++++++ internal/retryer/logthrottle_test.go | 139 ++++++++++++++++++ plugins/outputs/cloudwatch/cloudwatch.go | 8 + .../outputs/cloudwatchlogs/cloudwatchlogs.go | 7 +- 4 files changed, 246 insertions(+), 1 deletion(-) create mode 100644 internal/retryer/logthrottle.go create mode 100644 internal/retryer/logthrottle_test.go diff --git a/internal/retryer/logthrottle.go b/internal/retryer/logthrottle.go new file mode 100644 index 0000000000..a8b18fffc9 --- /dev/null +++ b/internal/retryer/logthrottle.go @@ -0,0 +1,93 @@ +package retryer + +import ( + "time" + + "github.com/aws/aws-sdk-go/aws/client" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/influxdata/telegraf" +) + +var ( + throttleReportTimeout = 1 * time.Minute + throttleReportCheckPeriod = 5 * time.Second +) + +type LogThrottleRetryer struct { + Log telegraf.Logger + + throttleChan chan error + done chan struct{} + + client.DefaultRetryer +} + +func NewLogThrottleRetryer(logger telegraf.Logger) *LogThrottleRetryer { + r := &LogThrottleRetryer{ + Log: logger, + throttleChan: make(chan error, 1), + done: make(chan struct{}), + DefaultRetryer: client.DefaultRetryer{NumMaxRetries: client.DefaultRetryerMaxNumRetries}, + } + + go r.watchThrottleEvents() + return r +} + +func (r *LogThrottleRetryer) ShouldRetry(req *request.Request) bool { + if req.IsErrorThrottle() { + r.throttleChan <- req.Error + } + + // Fallback to SDK's built in retry rules + return r.DefaultRetryer.ShouldRetry(req) +} + +func (r *LogThrottleRetryer) Stop() { + if r != nil { + close(r.done) + } +} + +func (r *LogThrottleRetryer) watchThrottleEvents() { + ticker := time.NewTicker(throttleReportCheckPeriod) + defer ticker.Stop() + + var start time.Time + var err error + cnt := 0 + for { + select { + case err = <-r.throttleChan: + // Log first throttle if there has not been any recent throttling events + if cnt == 0 { + if time.Since(start) > 2*throttleReportTimeout { + r.Log.Infof("aws api call throttling detected: %v", err) + } else { + r.Log.Debugf("aws api call throttling detected: %v", err) + } + start = time.Now() + } else { + r.Log.Debugf("aws api call throttling detected: %v", err) + } + cnt++ + case <-ticker.C: + if cnt == 0 { + continue + } + d := time.Since(start) + if d > throttleReportTimeout { + if cnt > 1 { + r.Log.Infof("aws api call has been throttled for %v times in the past %v, last throttle error message: %v", cnt, d, err) + } + cnt = 0 + } + case <-r.done: + if cnt > 0 { + r.Log.Infof("aws api call has been throttled for %v times in the past %v, last throttle error message: %v", cnt, time.Since(start), err) + } + r.Log.Debugf("LogThrottleRetryer watch throttle events goroutine exiting") + return + } + } +} diff --git a/internal/retryer/logthrottle_test.go b/internal/retryer/logthrottle_test.go new file mode 100644 index 0000000000..d28b33a0c4 --- /dev/null +++ b/internal/retryer/logthrottle_test.go @@ -0,0 +1,139 @@ +package retryer + +import ( + "fmt" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" +) + +type testLogger struct { + debugs, infos, warns, errors []string +} + +func (l *testLogger) Errorf(format string, args ...interface{}) { + line := fmt.Sprintf(format, args...) + l.errors = append(l.errors, line) +} + +func (l *testLogger) Error(args ...interface{}) { + line := fmt.Sprint(args...) + l.errors = append(l.errors, line) +} + +func (l *testLogger) Debugf(format string, args ...interface{}) { + line := fmt.Sprintf(format, args...) + l.debugs = append(l.debugs, line) +} + +func (l *testLogger) Debug(args ...interface{}) { + line := fmt.Sprint(args...) + l.debugs = append(l.debugs, line) +} + +func (l *testLogger) Warnf(format string, args ...interface{}) { + line := fmt.Sprintf(format, args...) + l.warns = append(l.warns, line) +} + +func (l *testLogger) Warn(args ...interface{}) { + line := fmt.Sprint(args...) + l.warns = append(l.warns, line) +} + +func (l *testLogger) Infof(format string, args ...interface{}) { + line := fmt.Sprintf(format, args...) + l.infos = append(l.infos, line) +} + +func (l *testLogger) Info(args ...interface{}) { + line := fmt.Sprint(args...) + l.infos = append(l.infos, line) +} + +func TestLogThrottleRetryerLogging(t *testing.T) { + const throttleDetectedLine = "aws api call throttling detected: RequestLimitExceeded: Test AWS Error" + const watchGoroutineExitLine = "LogThrottleRetryer watch throttle events goroutine exiting" + const throttleSummaryLinePrefix = "aws api call has been throttled for" + const throttleBatchSize = 100 + const totalThrottleCnt = throttleBatchSize * 2 // Test total 2 batches + const expectedDebugCnt = totalThrottleCnt - 2 // 2 of them are being log at info level + + setup() + defer tearDown() + + l := &testLogger{} + r := NewLogThrottleRetryer(l) + + req := &request.Request{ + Error: awserr.New("RequestLimitExceeded", "Test AWS Error", nil), + } + + // Generate 200 throttles with a time gap between + for i := 0; i < throttleBatchSize; i++ { + r.ShouldRetry(req) + time.Sleep(10 * time.Millisecond) + } + + time.Sleep(1500 * time.Millisecond) + + for i := 0; i < throttleBatchSize; i++ { + r.ShouldRetry(req) + time.Sleep(10 * time.Millisecond) + } + + r.Stop() + time.Sleep(200 * time.Millisecond) // Wait a bit to collect all logs + + // Check the debug level log messages + debugCnt := 0 + for _, d := range l.debugs { + if d == throttleDetectedLine { + debugCnt++ + } else if d != watchGoroutineExitLine { + t.Errorf("unexpected debug log found: %v", d) + } + } + if debugCnt != expectedDebugCnt { + t.Errorf("wrong number of debug logs found, expected") + } + + // Check the info level log messages + detectCnt := 0 + throttleCnt := 0 + for i, info := range l.infos { + if info == throttleDetectedLine { + if i > 0 { + if throttleCnt != throttleBatchSize { + t.Errorf("wrong number of throttle count reported, expecting %v, got %v", throttleBatchSize, throttleCnt) + } + } + detectCnt++ + throttleCnt = 0 + } else if strings.HasPrefix(info, throttleSummaryLinePrefix) { + n := 0 + fmt.Sscanf(info, throttleSummaryLinePrefix+" %d", &n) + throttleCnt += n + } + } + + if detectCnt != 2 { + t.Errorf("wrong number of throttle detected info log found, expecting 2, got %v", detectCnt) + } + if throttleCnt != throttleBatchSize { + t.Errorf("wrong number of throttle count reported, expecting %v, got %v", throttleBatchSize, throttleCnt) + } +} + +func setup() { + throttleReportTimeout = 400 * time.Millisecond + throttleReportCheckPeriod = 50 * time.Millisecond +} + +func tearDown() { + throttleReportTimeout = 1 * time.Minute + throttleReportCheckPeriod = 5 * time.Second +} diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index bd641f6244..23c86afaf9 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -15,6 +15,7 @@ import ( "time" "github.com/aws/amazon-cloudwatch-agent/internal/publisher" + "github.com/aws/amazon-cloudwatch-agent/internal/retryer" "github.com/aws/amazon-cloudwatch-agent/cfg/agentinfo" internalaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws" @@ -63,6 +64,8 @@ type CloudWatch struct { RollupDimensions [][]string `toml:"rollup_dimensions"` Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace + Log telegraf.Logger `toml:"-"` + svc cloudwatchiface.CloudWatchAPI aggregator Aggregator aggregatorShutdownChan chan struct{} @@ -76,6 +79,7 @@ type CloudWatch struct { metricDecorations *MetricDecorations retries int publisher *publisher.Publisher + retryer *retryer.LogThrottleRetryer } var sampleConfig = ` @@ -132,11 +136,13 @@ func (c *CloudWatch) Connect() error { } configProvider := credentialConfig.Credentials() + logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log) svc := cloudwatch.New( configProvider, &aws.Config{ Endpoint: aws.String(c.EndpointOverride), HTTPClient: &http.Client{Timeout: 1 * time.Minute}, + Retryer: logThrottleRetryer, }) svc.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{opPutLogEvents, opPutMetricData})) @@ -146,6 +152,7 @@ func (c *CloudWatch) Connect() error { c.RollupDimensions = GetUniqueRollupList(c.RollupDimensions) c.svc = svc + c.retryer = logThrottleRetryer c.startRoutines() return nil } @@ -190,6 +197,7 @@ func (c *CloudWatch) Close() error { } close(c.shutdownChan) c.publisher.Close() + c.retryer.Stop() log.Println("D! Stopped the CloudWatch output plugin") return nil } diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go index 8a9db3130a..82c52972b3 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go @@ -15,6 +15,7 @@ import ( configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws" "github.com/aws/amazon-cloudwatch-agent/handlers" "github.com/aws/amazon-cloudwatch-agent/internal" + "github.com/aws/amazon-cloudwatch-agent/internal/retryer" "github.com/aws/amazon-cloudwatch-agent/logs" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" @@ -109,18 +110,20 @@ func (c *CloudWatchLogs) getDest(t Target) *cwDest { Token: c.Token, } + logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log) client := cloudwatchlogs.New( credentialConfig.Credentials(), &aws.Config{ Endpoint: aws.String(c.EndpointOverride), HTTPClient: &http.Client{Timeout: 1 * time.Minute}, + Retryer: logThrottleRetryer, }, ) client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"})) client.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("User-Agent", agentinfo.UserAgent())) pusher := NewPusher(t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log) - cwd := &cwDest{pusher: pusher} + cwd := &cwDest{pusher: pusher, retryer: logThrottleRetryer} c.cwDests[t] = cwd return cwd } @@ -260,6 +263,7 @@ type cwDest struct { sync.Mutex isEMF bool stopped bool + retryer *retryer.LogThrottleRetryer } func (cd *cwDest) Publish(events []logs.LogEvent) error { @@ -280,6 +284,7 @@ func (cd *cwDest) Publish(events []logs.LogEvent) error { func (cd *cwDest) Stop() { cd.pusher.Stop() + cd.retryer.Stop() cd.stopped = true }