Skip to content

Commit

Permalink
Fix DescribeTags retry on high-frequency and delay refresh interval (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zhihonl authored Sep 10, 2024
1 parent 1e0a049 commit 44fdf6c
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 19 deletions.
42 changes: 27 additions & 15 deletions extension/entitystore/serviceprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"

Expand All @@ -39,8 +40,13 @@ const (
ServiceNameSourceUnknown = "Unknown"
ServiceNameSourceUserConfiguration = "UserConfiguration"

jitterMax = 180
jitterMin = 60
describeTagsJitterMax = 3600
describeTagsJitterMin = 3000
defaultJitterMin = 60
defaultJitterMax = 180
maxRetry = 3
infRetry = -1
RequestLimitExceeded = "RequestLimitExceeded"
)

var (
Expand Down Expand Up @@ -86,10 +92,10 @@ type serviceprovider struct {
func (s *serviceprovider) startServiceProvider() {
err := s.getEC2Client()
if err != nil {
go refreshLoop(s.done, s.getEC2Client, true)
go refreshLoop(s.done, s.getEC2Client, true, "", defaultJitterMin, defaultJitterMax, ec2tagger.BackoffSleepArray, infRetry)
}
go refreshLoop(s.done, s.getIAMRole, false)
go refreshLoop(s.done, s.getEC2TagServiceName, false)
go refreshLoop(s.done, s.getIAMRole, false, "", defaultJitterMin, defaultJitterMax, ec2tagger.BackoffSleepArray, infRetry)
go refreshLoop(s.done, s.getEC2TagServiceName, false, RequestLimitExceeded, describeTagsJitterMin, describeTagsJitterMax, ec2tagger.ThrottleBackOffArray, maxRetry)
}

// addEntryForLogFile adds an association between a log file glob and a service attribute, as configured in the
Expand Down Expand Up @@ -242,7 +248,7 @@ func (s *serviceprovider) getEC2TagServiceName() error {
}
result, err := s.ec2API.DescribeTags(input)
if err != nil {
continue
return err
}
for _, tag := range result.Tags {
key := *tag.Key
Expand Down Expand Up @@ -307,23 +313,28 @@ func newServiceProvider(mode string, region string, ec2Info *ec2Info, metadataPr
}
}

func refreshLoop(done chan struct{}, updateFunc func() error, oneTime bool) {
func refreshLoop(done chan struct{}, updateFunc func() error, oneTime bool, retryOnError string, successRetryMin int, successRetryMax int, backoffArray []time.Duration, maxRetry int) int {
// Offset retry by 1 so we can start with 1 minute wait time
// instead of immediately retrying
retry := 1
for {
if maxRetry != -1 && retry > maxRetry {
return retry
}
err := updateFunc()
if err == nil && oneTime {
return
return retry
} else if awsErr, ok := err.(awserr.Error); ok && retryOnError != "" && awsErr.Code() != retryOnError {
return retry
}

waitDuration := calculateWaitTime(retry, err)
waitDuration := calculateWaitTime(retry-1, err, successRetryMin, successRetryMax, backoffArray)
wait := time.NewTimer(waitDuration)
select {
case <-done:
log.Printf("D! serviceprovider: Shutting down now")
wait.Stop()
return
return retry
case <-wait.C:
}

Expand All @@ -339,20 +350,21 @@ func refreshLoop(done chan struct{}, updateFunc func() error, oneTime bool) {
}

}
return retry
}

// calculateWaitTime returns different time based on whether if
// a function call was returned with error. If returned with error,
// follow exponential backoff wait time, otherwise, refresh with jitter
func calculateWaitTime(retry int, err error) time.Duration {
func calculateWaitTime(retry int, err error, successRetryMin int, successRetryMax int, backoffArray []time.Duration) time.Duration {
var waitDuration time.Duration
if err == nil {
return time.Duration(rand.Intn(jitterMax-jitterMin)+jitterMin) * time.Second
return time.Duration(rand.Intn(successRetryMax-successRetryMin)+successRetryMin) * time.Second
}
if retry < len(ec2tagger.BackoffSleepArray) {
waitDuration = ec2tagger.BackoffSleepArray[retry]
if retry < len(backoffArray) {
waitDuration = backoffArray[retry]
} else {
waitDuration = ec2tagger.BackoffSleepArray[len(ec2tagger.BackoffSleepArray)-1]
waitDuration = backoffArray[len(backoffArray)-1]
}
return waitDuration
}
75 changes: 71 additions & 4 deletions extension/entitystore/serviceprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,40 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/stretchr/testify/assert"

configaws "github.com/aws/amazon-cloudwatch-agent/cfg/aws"
"github.com/aws/amazon-cloudwatch-agent/internal/ec2metadataprovider"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger"
"github.com/aws/amazon-cloudwatch-agent/translator/config"
)

type mockServiceNameEC2Client struct {
ec2iface.EC2API
throttleError bool
authError bool
}

// construct the return results for the mocked DescribeTags api
var (
tagKeyService = "service"
tagValService = "test-service"
tagDesService = ec2.TagDescription{Key: &tagKeyService, Value: &tagValService}

FastBackOffArray = []time.Duration{0, 0, 0}
)

func (m *mockServiceNameEC2Client) DescribeTags(*ec2.DescribeTagsInput) (*ec2.DescribeTagsOutput, error) {
if m.throttleError {
return nil, awserr.New(RequestLimitExceeded, "throttle limit exceeded", nil)
}
if m.authError {
return nil, awserr.New("UnauthorizedOperation", "UnauthorizedOperation occurred", nil)
}
testTags := ec2.DescribeTagsOutput{
NextToken: nil,
Tags: []*ec2.TagDescription{&tagDesService},
Expand Down Expand Up @@ -365,7 +377,6 @@ func Test_refreshLoop(t *testing.T) {
ec2API ec2iface.EC2API
iamRole string
ec2TagServiceName string
refreshInterval time.Duration
oneTime bool
}
type expectedInfo struct {
Expand All @@ -387,7 +398,6 @@ func Test_refreshLoop(t *testing.T) {
ec2API: &mockServiceNameEC2Client{},
iamRole: "original-role",
ec2TagServiceName: "original-tag-name",
refreshInterval: time.Millisecond,
},
expectedInfo: expectedInfo{
iamRole: "TestRole",
Expand All @@ -408,12 +418,69 @@ func Test_refreshLoop(t *testing.T) {
ec2TagServiceName: tt.fields.ec2TagServiceName,
done: done,
}
go refreshLoop(done, s.getEC2TagServiceName, tt.fields.oneTime)
go refreshLoop(done, s.getIAMRole, tt.fields.oneTime)
go refreshLoop(done, s.getEC2TagServiceName, tt.fields.oneTime, RequestLimitExceeded, describeTagsJitterMin, describeTagsJitterMax, ec2tagger.ThrottleBackOffArray, maxRetry)
go refreshLoop(done, s.getIAMRole, tt.fields.oneTime, "", defaultJitterMin, defaultJitterMax, ec2tagger.BackoffSleepArray, infRetry)
time.Sleep(time.Second)
close(done)
assert.Equal(t, tt.expectedInfo.iamRole, s.iamRole)
assert.Equal(t, tt.expectedInfo.ec2TagServiceName, s.ec2TagServiceName)
})
}
}

func Test_refreshLoopRetry(t *testing.T) {
type fields struct {
metadataProvider ec2metadataprovider.MetadataProvider
ec2API ec2iface.EC2API
oneTime bool
}
tests := []struct {
name string
fields fields
expectedRetry int
}{
{
name: "ThrottleLimitError",
fields: fields{
metadataProvider: &mockMetadataProvider{
InstanceIdentityDocument: &ec2metadata.EC2InstanceIdentityDocument{
InstanceID: "i-123456789"},
},
ec2API: &mockServiceNameEC2Client{
throttleError: true,
},
},
expectedRetry: 4,
},
{
name: "AuthError",
fields: fields{
metadataProvider: &mockMetadataProvider{
InstanceIdentityDocument: &ec2metadata.EC2InstanceIdentityDocument{
InstanceID: "i-123456789"},
},
ec2API: &mockServiceNameEC2Client{
authError: true,
},
},
expectedRetry: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
done := make(chan struct{})
s := &serviceprovider{
metadataProvider: tt.fields.metadataProvider,
ec2API: tt.fields.ec2API,
ec2Provider: func(s string, config *configaws.CredentialConfig) ec2iface.EC2API {
return tt.fields.ec2API
},
done: done,
}
retry := refreshLoop(done, s.getEC2TagServiceName, tt.fields.oneTime, RequestLimitExceeded, describeTagsJitterMin, describeTagsJitterMax, FastBackOffArray, maxRetry)
time.Sleep(time.Second)
close(done)
assert.Equal(t, tt.expectedRetry, retry)
})
}
}
1 change: 1 addition & 0 deletions plugins/processors/ec2tagger/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@ const (
var (
// issue with newer versions of the sdk take longer when hop limit is 1 in eks
defaultRefreshInterval = 180 * time.Second
ThrottleBackOffArray = []time.Duration{0, 1 * time.Minute, 3 * time.Minute} // backoff retry for ec2 describe instances API call. Assuming the throttle limit is 20 per second. 10 mins allow 12000 API calls.
BackoffSleepArray = []time.Duration{0, 1 * time.Minute, 1 * time.Minute, 3 * time.Minute, 3 * time.Minute, 3 * time.Minute, 10 * time.Minute} // backoff retry for ec2 describe instances API call. Assuming the throttle limit is 20 per second. 10 mins allow 12000 API calls.
)

0 comments on commit 44fdf6c

Please sign in to comment.