Commit
Increase retry delay on PLE for certain exceptions (#1312)
dricross authored Sep 4, 2024
1 parent f0a39d6 commit 8271f62
Showing 3 changed files with 353 additions and 16 deletions.
31 changes: 15 additions & 16 deletions plugins/outputs/cloudwatchlogs/pusher.go
@@ -235,7 +235,8 @@ func (p *pusher) send() {

    startTime := time.Now()

-   retryCount := 0
+   retryCountShort := 0
+   retryCountLong := 0
    for {
        input.SequenceToken = p.sequenceToken
        output, err := p.Service.PutLogEvents(input)
@@ -304,38 +305,36 @@ func (p *pusher) send() {
p.Log.Errorf("Aws error received when sending logs to %v/%v: %v", p.Group, p.Stream, awsErr)
}

wait := retryWait(retryCount)
// retry wait strategy depends on the type of error returned
var wait time.Duration
if chooseRetryWaitStrategy(err) == retryLong {
wait = retryWaitLong(retryCountLong)
retryCountLong++
} else {
wait = retryWaitShort(retryCountShort)
retryCountShort++
}

if time.Since(startTime)+wait > p.RetryDuration {
p.Log.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCount, p.Group, p.Stream)
p.Log.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, p.Group, p.Stream)
p.reset()
return
}

p.Log.Warnf("Retried %v time, going to sleep %v before retrying.", retryCount, wait)
p.Log.Warnf("Retried %v time, going to sleep %v before retrying.", retryCountShort+retryCountLong-1, wait)

select {
case <-p.stop:
p.Log.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCount, p.Group, p.Stream)
p.Log.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, p.Group, p.Stream)
p.reset()
return
case <-time.After(wait):
}

retryCount++
}

}

func retryWait(n int) time.Duration {
const base = 200 * time.Millisecond
// Max wait time is 1 minute (jittered)
d := 1 * time.Minute
if n < 5 {
d = base * time.Duration(1<<int64(n))
}
return time.Duration(seededRand.Int63n(int64(d/2)) + int64(d/2))
}

func (p *pusher) createLogGroupAndStream() error {
_, err := p.Service.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
LogGroupName: &p.Group,
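The hunk above keeps two independent counters so that a run of throttled or 5xx responses escalates the long backoff without also inflating the short one, while the log lines report the combined attempt count. Below is a minimal standalone sketch of just that counter bookkeeping; waitShort, waitLong, and the hard-coded error sequence are illustrative stubs, not agent code, and the jitter and one-minute cap are omitted for brevity.

package main

import (
    "fmt"
    "time"
)

// Stubs mirroring only the exponential shape of retryWaitShort/retryWaitLong.
func waitShort(n int) time.Duration { return 200 * time.Millisecond << uint(n) }
func waitLong(n int) time.Duration  { return 2 * time.Second << uint(n) }

func main() {
    // Hypothetical sequence: two throttles (long), then a generic error (short).
    longRetry := []bool{true, true, false}

    retryCountShort, retryCountLong := 0, 0
    for _, isLong := range longRetry {
        var wait time.Duration
        if isLong {
            wait = waitLong(retryCountLong)
            retryCountLong++
        } else {
            wait = waitShort(retryCountShort)
            retryCountShort++
        }
        // Matches the pusher's "Retried %v time" accounting.
        fmt.Printf("retried %d time(s), sleeping %v\n",
            retryCountShort+retryCountLong-1, wait)
    }
    // Output:
    // retried 0 time(s), sleeping 2s
    // retried 1 time(s), sleeping 4s
    // retried 2 time(s), sleeping 200ms
}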
125 changes: 125 additions & 0 deletions plugins/outputs/cloudwatchlogs/retry.go
@@ -0,0 +1,125 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package cloudwatchlogs

import (
    "net"
    "strings"
    "time"

    "github.com/aws/aws-sdk-go/aws/awserr"
    "github.com/aws/aws-sdk-go/aws/request"

    "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
)

const (
    // baseRetryDelayShort is the base retry delay for the short retry strategy
    baseRetryDelayShort = 200 * time.Millisecond

    // baseRetryDelayLong is the base retry delay for the long retry strategy
    baseRetryDelayLong = 2 * time.Second

    // numBackoffRetriesShort is the maximum number of consecutive retries using the short retry strategy before using
    // the maxRetryDelay
    numBackoffRetriesShort = 5

    // numBackoffRetriesLong is the maximum number of consecutive retries using the long retry strategy before using
    // the maxRetryDelay
    numBackoffRetriesLong = 2

    // maxRetryDelay is the maximum retry delay for either retry strategy
    maxRetryDelay = 1 * time.Minute
)

type retryWaitStrategy int

const (
    retryShort retryWaitStrategy = iota
    retryLong
)

// retryWaitShort returns a duration to wait before retrying a request using the short retry strategy
func retryWaitShort(retryCount int) time.Duration {
    return retryWait(baseRetryDelayShort, numBackoffRetriesShort, retryCount)
}

// retryWaitLong returns a duration to wait before retrying a request using the long retry strategy.
// This strategy is used for errors that should not be retried too quickly.
func retryWaitLong(retryCount int) time.Duration {
    return retryWait(baseRetryDelayLong, numBackoffRetriesLong, retryCount)
}

func retryWait(baseRetryDelay time.Duration, maxBackoffRetries int, retryCount int) time.Duration {
    d := maxRetryDelay
    if retryCount < maxBackoffRetries {
        d = baseRetryDelay * time.Duration(1<<int64(retryCount))
    }
    return time.Duration(seededRand.Int63n(int64(d/2)) + int64(d/2))
}

// chooseRetryWaitStrategy decides if a "long" or "short" retry strategy should be used when the PutLogEvents API call
// returns an error. A short retry strategy should be used for most errors, while a long retry strategy is used for
// errors where retrying too quickly could cause excessive strain on the backend servers.
//
// Specifically, use the long retry strategy for the following PutLogEvents errors:
// - 500 (InternalFailure)
// - 503 (ServiceUnavailable)
// - Connection Refused
// - Connection Reset by Peer
// - Connection Timeout
// - Throttling
func chooseRetryWaitStrategy(err error) retryWaitStrategy {
    if isErrConnectionTimeout(err) || isErrConnectionReset(err) || isErrConnectionRefused(err) || request.IsErrorThrottle(err) {
        return retryLong
    }

    // Check AWS Error codes if available
    if awsErr, ok := err.(awserr.Error); ok {
        switch awsErr.Code() {
        case
            cloudwatchlogs.ErrCodeServiceUnavailableException,
            cloudwatchlogs.ErrCodeThrottlingException,
            "RequestTimeout",
            request.ErrCodeResponseTimeout:
            return retryLong
        }

        // Check HTTP status codes if available
        if requestFailure, ok := err.(awserr.RequestFailure); ok {
            switch requestFailure.StatusCode() {
            case
                500, // internal failure
                503: // service unavailable
                return retryLong
            }
        }
    }

    // Otherwise, default to short retry strategy
    return retryShort
}

func isErrConnectionTimeout(err error) bool {
    netErr, ok := err.(net.Error)
    return ok && netErr.Timeout()
}

func isErrConnectionReset(err error) bool {
    if strings.Contains(err.Error(), "read: connection reset") {
        return false
    }

    if strings.Contains(err.Error(), "use of closed network connection") ||
        strings.Contains(err.Error(), "connection reset") ||
        strings.Contains(err.Error(), "broken pipe") {
        return true
    }

    return false
}

func isErrConnectionRefused(err error) bool {
    return strings.Contains(err.Error(), "connection refused")
}
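Taken together with the constants above, retryWait draws each sleep uniformly from the half-open window [d/2, d), where d doubles per retry until the backoff-retry cap and then pins at maxRetryDelay. A standalone sketch that reimplements that window arithmetic (the window helper here is hypothetical, not part of this file) and prints the long strategy's schedule:

package main

import (
    "fmt"
    "time"
)

// window reproduces retryWait's bounds: the jittered sleep falls in
// [d/2, d), where d doubles per retry until maxBackoffRetries and
// then pins at the 1-minute maxRetryDelay.
func window(base time.Duration, maxBackoffRetries, retryCount int) (lo, hi time.Duration) {
    d := time.Minute
    if retryCount < maxBackoffRetries {
        d = base * time.Duration(1<<retryCount)
    }
    return d / 2, d
}

func main() {
    for n := 0; n < 4; n++ {
        lo, hi := window(2*time.Second, 2, n) // long strategy constants
        fmt.Printf("long retry %d: wait in [%v, %v)\n", n, lo, hi)
    }
    // long retry 0: wait in [1s, 2s)
    // long retry 1: wait in [2s, 4s)
    // long retry 2: wait in [30s, 1m0s)
    // long retry 3: wait in [30s, 1m0s)
}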
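And a hypothetical table-driven sketch of how chooseRetryWaitStrategy classifies a few representative errors, assuming it compiles in the same package as retry.go (the classifyExamples helper and its sample errors are illustrative, not part of the commit). Note the deliberate carve-out in isErrConnectionReset: messages containing "read: connection reset" return false, matching the same exclusion made by the AWS SDK's connection-reset helper, so those errors fall through to the code and status checks.

package cloudwatchlogs

import (
    "errors"
    "fmt"

    "github.com/aws/aws-sdk-go/aws/awserr"
)

// classifyExamples is an illustrative helper, assumed to sit next to
// chooseRetryWaitStrategy in this package.
func classifyExamples() {
    cases := []struct {
        name string
        err  error
    }{
        {"throttled", awserr.New("ThrottlingException", "rate exceeded", nil)},
        {"internal failure", awserr.NewRequestFailure(awserr.New("InternalFailure", "boom", nil), 500, "req-id")},
        {"connection refused", errors.New("dial tcp 10.0.0.1:443: connect: connection refused")},
        {"bad input", awserr.New("InvalidParameterException", "malformed event", nil)},
    }
    for _, c := range cases {
        strategy := "short"
        if chooseRetryWaitStrategy(c.err) == retryLong {
            strategy = "long"
        }
        fmt.Printf("%-18s -> %s retry\n", c.name, strategy)
    }
    // throttled          -> long retry
    // internal failure   -> long retry
    // connection refused -> long retry
    // bad input          -> short retry
}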
