diff --git a/plugins/outputs/cloudwatchlogs/pusher.go b/plugins/outputs/cloudwatchlogs/pusher.go index 10cc50a364..1f45dddd80 100644 --- a/plugins/outputs/cloudwatchlogs/pusher.go +++ b/plugins/outputs/cloudwatchlogs/pusher.go @@ -235,7 +235,8 @@ func (p *pusher) send() { startTime := time.Now() - retryCount := 0 + retryCountShort := 0 + retryCountLong := 0 for { input.SequenceToken = p.sequenceToken output, err := p.Service.PutLogEvents(input) @@ -304,38 +305,36 @@ func (p *pusher) send() { p.Log.Errorf("Aws error received when sending logs to %v/%v: %v", p.Group, p.Stream, awsErr) } - wait := retryWait(retryCount) + // retry wait strategy depends on the type of error returned + var wait time.Duration + if chooseRetryWaitStrategy(err) == retryLong { + wait = retryWaitLong(retryCountLong) + retryCountLong++ + } else { + wait = retryWaitShort(retryCountShort) + retryCountShort++ + } + if time.Since(startTime)+wait > p.RetryDuration { - p.Log.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCount, p.Group, p.Stream) + p.Log.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, p.Group, p.Stream) p.reset() return } - p.Log.Warnf("Retried %v time, going to sleep %v before retrying.", retryCount, wait) + p.Log.Warnf("Retried %v time, going to sleep %v before retrying.", retryCountShort+retryCountLong-1, wait) select { case <-p.stop: - p.Log.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCount, p.Group, p.Stream) + p.Log.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, p.Group, p.Stream) p.reset() return case <-time.After(wait): } - retryCount++ } } -func retryWait(n int) time.Duration { - const base = 200 * time.Millisecond - // Max wait time is 1 minute (jittered) - d := 1 * time.Minute - if n < 5 { - d = base * time.Duration(1<