Skip to content

Commit

Permalink
Added a custom retryer implementation to log any throttling events ha…
Browse files Browse the repository at this point in the history
…ndled by the SDK. (#243)
  • Loading branch information
jefchien authored Aug 17, 2021
1 parent 653375b commit 26a8d28
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 1 deletion.
93 changes: 93 additions & 0 deletions internal/retryer/logthrottle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package retryer

import (
"time"

"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/influxdata/telegraf"
)

// Timing knobs for throttle reporting. They are variables rather than
// constants so that tests in this package can shorten them.
var (
	// throttleReportTimeout is the length of the window over which throttle
	// events are counted before a summary is logged.
	throttleReportTimeout = time.Minute
	// throttleReportCheckPeriod is how often the watcher checks whether the
	// current counting window has expired.
	throttleReportCheckPeriod = 5 * time.Second
)

// LogThrottleRetryer is an SDK retryer that logs throttled requests and then
// defers to the embedded client.DefaultRetryer for the actual retry decision.
type LogThrottleRetryer struct {
// Log receives the throttle detection and summary messages.
Log telegraf.Logger

// throttleChan hands throttle errors from ShouldRetry to watchThrottleEvents.
throttleChan chan error
// done is closed by Stop to make watchThrottleEvents return.
done chan struct{}

// DefaultRetryer supplies the SDK's built-in retry rules.
client.DefaultRetryer
}

// NewLogThrottleRetryer returns a LogThrottleRetryer that logs to logger and
// uses the SDK default maximum retry count. It also starts the background
// goroutine that aggregates throttle events; call Stop to end that goroutine.
func NewLogThrottleRetryer(logger telegraf.Logger) *LogThrottleRetryer {
	ltr := new(LogThrottleRetryer)
	ltr.Log = logger
	// A one-slot buffer lets a single throttle event be queued without
	// waiting for the watcher to pick it up.
	ltr.throttleChan = make(chan error, 1)
	ltr.done = make(chan struct{})
	ltr.DefaultRetryer = client.DefaultRetryer{NumMaxRetries: client.DefaultRetryerMaxNumRetries}

	go ltr.watchThrottleEvents()
	return ltr
}

// ShouldRetry reports whether req should be retried. When the request failed
// because of throttling, the error is first handed to the watcher goroutine so
// it can be logged; the retry decision itself always comes from the embedded
// SDK DefaultRetryer.
func (r *LogThrottleRetryer) ShouldRetry(req *request.Request) bool {
	if req.IsErrorThrottle() {
		// The watcher goroutine is the only receiver on throttleChan and it
		// exits once done is closed. Selecting on done as well keeps a request
		// that is retried after Stop from blocking forever on a channel that
		// nobody drains.
		select {
		case r.throttleChan <- req.Error:
		case <-r.done:
		}
	}

	// Fallback to SDK's built in retry rules
	return r.DefaultRetryer.ShouldRetry(req)
}

// Stop tells the watcher goroutine to exit by closing the done channel.
//
// It is a no-op on a nil receiver, on a retryer that was not created through
// NewLogThrottleRetryer (nil done channel), and on repeated sequential calls.
// The already-closed check is not synchronized, so Stop must not be called
// from several goroutines at the same time.
func (r *LogThrottleRetryer) Stop() {
	if r == nil || r.done == nil {
		return
	}
	select {
	case <-r.done:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(r.done)
	}
}

// watchThrottleEvents runs in its own goroutine until done is closed. It
// counts the throttle errors received on throttleChan in windows of
// throttleReportTimeout and keeps the log quiet by:
//   - logging the first throttle of a window at info level only when the
//     previous window started more than 2*throttleReportTimeout ago, and at
//     debug level otherwise;
//   - logging every further throttle of the window at debug level;
//   - emitting one info-level summary when a window holding more than one
//     throttle expires, and a final summary on shutdown if any are pending.
func (r *LogThrottleRetryer) watchThrottleEvents() {
	const (
		detectedFormat = "aws api call throttling detected: %v"
		summaryFormat  = "aws api call has been throttled for %v times in the past %v, last throttle error message: %v"
	)

	checkTicker := time.NewTicker(throttleReportCheckPeriod)
	defer checkTicker.Stop()

	var (
		windowStart time.Time // zero until the first throttle is seen
		lastErr     error
		throttles   int
	)
	for {
		select {
		case lastErr = <-r.throttleChan:
			opensWindow := throttles == 0
			// Only a throttle that opens a window after a long quiet period is
			// worth an info line; everything else is debug noise.
			if opensWindow && time.Since(windowStart) > 2*throttleReportTimeout {
				r.Log.Infof(detectedFormat, lastErr)
			} else {
				r.Log.Debugf(detectedFormat, lastErr)
			}
			if opensWindow {
				windowStart = time.Now()
			}
			throttles++
		case <-checkTicker.C:
			if throttles == 0 {
				continue
			}
			elapsed := time.Since(windowStart)
			if elapsed <= throttleReportTimeout {
				continue
			}
			// A window with a single throttle was already reported when it
			// was detected, so only summarize when there were more.
			if throttles > 1 {
				r.Log.Infof(summaryFormat, throttles, elapsed, lastErr)
			}
			throttles = 0
		case <-r.done:
			if throttles > 0 {
				r.Log.Infof(summaryFormat, throttles, time.Since(windowStart), lastErr)
			}
			r.Log.Debugf("LogThrottleRetryer watch throttle events goroutine exiting")
			return
		}
	}
}
139 changes: 139 additions & 0 deletions internal/retryer/logthrottle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package retryer

import (
"fmt"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
)

// testLogger is an in-memory logger for tests: every call renders its message
// and appends it to the slice for that level so the test can inspect it later.
type testLogger struct {
	debugs, infos, warns, errors []string
}

// Errorf records a formatted error-level message.
func (l *testLogger) Errorf(format string, args ...interface{}) {
	l.errors = append(l.errors, fmt.Sprintf(format, args...))
}

// Error records an error-level message rendered with fmt.Sprint.
func (l *testLogger) Error(args ...interface{}) {
	l.errors = append(l.errors, fmt.Sprint(args...))
}

// Debugf records a formatted debug-level message.
func (l *testLogger) Debugf(format string, args ...interface{}) {
	l.debugs = append(l.debugs, fmt.Sprintf(format, args...))
}

// Debug records a debug-level message rendered with fmt.Sprint.
func (l *testLogger) Debug(args ...interface{}) {
	l.debugs = append(l.debugs, fmt.Sprint(args...))
}

// Warnf records a formatted warn-level message.
func (l *testLogger) Warnf(format string, args ...interface{}) {
	l.warns = append(l.warns, fmt.Sprintf(format, args...))
}

// Warn records a warn-level message rendered with fmt.Sprint.
func (l *testLogger) Warn(args ...interface{}) {
	l.warns = append(l.warns, fmt.Sprint(args...))
}

// Infof records a formatted info-level message.
func (l *testLogger) Infof(format string, args ...interface{}) {
	l.infos = append(l.infos, fmt.Sprintf(format, args...))
}

// Info records an info-level message rendered with fmt.Sprint.
func (l *testLogger) Info(args ...interface{}) {
	l.infos = append(l.infos, fmt.Sprint(args...))
}

// TestLogThrottleRetryerLogging sends two bursts of throttle errors through a
// LogThrottleRetryer, separated by a quiet gap longer than twice the report
// timeout, and checks that:
//   - the first throttle of each burst is logged at info level and all the
//     others at debug level;
//   - the info-level summaries following each "detected" line add up to the
//     burst size.
func TestLogThrottleRetryerLogging(t *testing.T) {
	const throttleDetectedLine = "aws api call throttling detected: RequestLimitExceeded: Test AWS Error"
	const watchGoroutineExitLine = "LogThrottleRetryer watch throttle events goroutine exiting"
	const throttleSummaryLinePrefix = "aws api call has been throttled for"
	const throttleBatchSize = 100
	const totalThrottleCnt = throttleBatchSize * 2 // Test total 2 batches
	const expectedDebugCnt = totalThrottleCnt - 2  // 2 of them are logged at info level

	setup()
	defer tearDown()

	l := &testLogger{}
	r := NewLogThrottleRetryer(l)

	req := &request.Request{
		Error: awserr.New("RequestLimitExceeded", "Test AWS Error", nil),
	}

	// Generate 200 throttles in two batches of 100, pausing briefly between
	// individual throttles and for a longer quiet period between the batches.
	for i := 0; i < throttleBatchSize; i++ {
		r.ShouldRetry(req)
		time.Sleep(10 * time.Millisecond)
	}

	time.Sleep(1500 * time.Millisecond)

	for i := 0; i < throttleBatchSize; i++ {
		r.ShouldRetry(req)
		time.Sleep(10 * time.Millisecond)
	}

	r.Stop()
	time.Sleep(200 * time.Millisecond) // Wait a bit to collect all logs

	// Check the debug level log messages
	debugCnt := 0
	for _, d := range l.debugs {
		if d == throttleDetectedLine {
			debugCnt++
		} else if d != watchGoroutineExitLine {
			t.Errorf("unexpected debug log found: %v", d)
		}
	}
	if debugCnt != expectedDebugCnt {
		t.Errorf("wrong number of debug logs found, expecting %v, got %v", expectedDebugCnt, debugCnt)
	}

	// Check the info level log messages
	detectCnt := 0
	throttleCnt := 0
	for i, info := range l.infos {
		if info == throttleDetectedLine {
			// A new "detected" line closes the previous batch, whose summaries
			// must have accounted for every throttle in it.
			if i > 0 {
				if throttleCnt != throttleBatchSize {
					t.Errorf("wrong number of throttle count reported, expecting %v, got %v", throttleBatchSize, throttleCnt)
				}
			}
			detectCnt++
			throttleCnt = 0
		} else if strings.HasPrefix(info, throttleSummaryLinePrefix) {
			n := 0
			if _, err := fmt.Sscanf(info, throttleSummaryLinePrefix+" %d", &n); err != nil {
				t.Errorf("unable to parse throttle count from summary line %q: %v", info, err)
			}
			throttleCnt += n
		}
	}

	if detectCnt != 2 {
		t.Errorf("wrong number of throttle detected info log found, expecting 2, got %v", detectCnt)
	}
	if throttleCnt != throttleBatchSize {
		t.Errorf("wrong number of throttle count reported, expecting %v, got %v", throttleBatchSize, throttleCnt)
	}
}

// setup shortens the throttle report window and the watcher's check period so
// the test completes in a couple of seconds.
func setup() {
	throttleReportTimeout, throttleReportCheckPeriod = 400*time.Millisecond, 50*time.Millisecond
}

// tearDown puts the throttle report window and check period back to the
// one-minute / five-second values used outside of tests.
func tearDown() {
	throttleReportTimeout, throttleReportCheckPeriod = time.Minute, 5*time.Second
}
8 changes: 8 additions & 0 deletions plugins/outputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/aws/amazon-cloudwatch-agent/internal/publisher"
"github.com/aws/amazon-cloudwatch-agent/internal/retryer"

"github.com/aws/amazon-cloudwatch-agent/cfg/agentinfo"
internalaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
Expand Down Expand Up @@ -63,6 +64,8 @@ type CloudWatch struct {
RollupDimensions [][]string `toml:"rollup_dimensions"`
Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace

Log telegraf.Logger `toml:"-"`

svc cloudwatchiface.CloudWatchAPI
aggregator Aggregator
aggregatorShutdownChan chan struct{}
Expand All @@ -76,6 +79,7 @@ type CloudWatch struct {
metricDecorations *MetricDecorations
retries int
publisher *publisher.Publisher
retryer *retryer.LogThrottleRetryer
}

var sampleConfig = `
Expand Down Expand Up @@ -132,11 +136,13 @@ func (c *CloudWatch) Connect() error {
}
configProvider := credentialConfig.Credentials()

logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
svc := cloudwatch.New(
configProvider,
&aws.Config{
Endpoint: aws.String(c.EndpointOverride),
HTTPClient: &http.Client{Timeout: 1 * time.Minute},
Retryer: logThrottleRetryer,
})

svc.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{opPutLogEvents, opPutMetricData}))
Expand All @@ -146,6 +152,7 @@ func (c *CloudWatch) Connect() error {
c.RollupDimensions = GetUniqueRollupList(c.RollupDimensions)

c.svc = svc
c.retryer = logThrottleRetryer
c.startRoutines()
return nil
}
Expand Down Expand Up @@ -190,6 +197,7 @@ func (c *CloudWatch) Close() error {
}
close(c.shutdownChan)
c.publisher.Close()
c.retryer.Stop()
log.Println("D! Stopped the CloudWatch output plugin")
return nil
}
Expand Down
7 changes: 6 additions & 1 deletion plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/handlers"
"github.com/aws/amazon-cloudwatch-agent/internal"
"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
Expand Down Expand Up @@ -109,18 +110,20 @@ func (c *CloudWatchLogs) getDest(t Target) *cwDest {
Token: c.Token,
}

logThrottleRetryer := retryer.NewLogThrottleRetryer(c.Log)
client := cloudwatchlogs.New(
credentialConfig.Credentials(),
&aws.Config{
Endpoint: aws.String(c.EndpointOverride),
HTTPClient: &http.Client{Timeout: 1 * time.Minute},
Retryer: logThrottleRetryer,
},
)
client.Handlers.Build.PushBackNamed(handlers.NewRequestCompressionHandler([]string{"PutLogEvents"}))
client.Handlers.Build.PushBackNamed(handlers.NewCustomHeaderHandler("User-Agent", agentinfo.UserAgent()))

pusher := NewPusher(t, client, c.ForceFlushInterval.Duration, maxRetryTimeout, c.Log)
cwd := &cwDest{pusher: pusher}
cwd := &cwDest{pusher: pusher, retryer: logThrottleRetryer}
c.cwDests[t] = cwd
return cwd
}
Expand Down Expand Up @@ -260,6 +263,7 @@ type cwDest struct {
sync.Mutex
isEMF bool
stopped bool
retryer *retryer.LogThrottleRetryer
}

func (cd *cwDest) Publish(events []logs.LogEvent) error {
Expand All @@ -280,6 +284,7 @@ func (cd *cwDest) Publish(events []logs.LogEvent) error {

// Stop shuts down the destination's pusher and its throttle-logging retryer,
// then records that the destination has been stopped.
func (cd *cwDest) Stop() {
cd.pusher.Stop()
cd.retryer.Stop()
cd.stopped = true
}

Expand Down

0 comments on commit 26a8d28

Please sign in to comment.