diff --git a/generator/test_case_generator.go b/generator/test_case_generator.go index 45b46d16b..32522dad7 100644 --- a/generator/test_case_generator.go +++ b/generator/test_case_generator.go @@ -42,7 +42,8 @@ type testConfig struct { terraformDir string // define target matrix field as set(s) // empty map means a testConfig will be created with a test entry for each entry from *_test_matrix.json - targets map[string]map[string]struct{} + targets map[string]map[string]struct{} + // maxAttempts limits the number of times a test will be run. maxAttempts int } diff --git a/test/emf_concurrent/emf_concurrent_test.go b/test/emf_concurrent/emf_concurrent_test.go index a6b123844..1ddd145eb 100644 --- a/test/emf_concurrent/emf_concurrent_test.go +++ b/test/emf_concurrent/emf_concurrent_test.go @@ -24,6 +24,11 @@ const ( emfAddress = "0.0.0.0:25888" ) +var ( + // queryString checks that both metric values are the same and have the same expected unit. + queryString = fmt.Sprintf("filter ispresent(%[1]s) and ispresent(%[2]s) and (%[1]s != %[2]s or (_aws.CloudWatchMetrics.0.Metrics.0.Unit!=%[3]q) or (_aws.CloudWatchMetrics.0.Metrics.1.Unit!=%[3]q))", metricName1, metricName2, metricUnit) +) + func init() { environment.RegisterEnvironmentMetaDataFlags() } @@ -70,21 +75,15 @@ func TestConcurrent(t *testing.T) { } time.Sleep(testRuntime) close(e.done) + log.Println("Stopping EMF emitters") e.wg.Wait() common.StopAgent() endTime := time.Now() - log.Println("Stopping EMF emitters") assert.Lenf(t, awsservice.GetLogStreamNames(e.logGroupName), 1, "Detected corruption: multiple streams found") - qs := queryString() - log.Printf("Starting query for log group (%s): %s", e.logGroupName, qs) - got, err := awsservice.GetLogQueryStats(e.logGroupName, startTime.Unix(), endTime.Unix(), qs) + log.Printf("Starting query for log group (%s): %s", e.logGroupName, queryString) + got, err := awsservice.GetLogQueryStats(e.logGroupName, startTime.Unix(), endTime.Unix(), queryString) require.NoError(t, err, "Unable to get log query stats") assert.NotZero(t, got.RecordsScanned, "No records found in CloudWatch Logs") assert.Zerof(t, got.RecordsMatched, "Detected corruption: %v/%v records matched", got.RecordsMatched, got.RecordsScanned) } - -// queryString creates a log query string that will match corrupted logs. -func queryString() string { - return fmt.Sprintf("filter ispresent(%[1]s) and ispresent(%[2]s) and (%[1]s != %[2]s or (_aws.CloudWatchMetrics.0.Metrics.0.Unit!=%[3]q) or (_aws.CloudWatchMetrics.0.Metrics.1.Unit!=%[3]q))", metricName1, metricName2, metricUnit) -} diff --git a/util/awsservice/cloudwatchlogs.go b/util/awsservice/cloudwatchlogs.go index b7ce773e2..5eccc0ca1 100644 --- a/util/awsservice/cloudwatchlogs.go +++ b/util/awsservice/cloudwatchlogs.go @@ -161,8 +161,11 @@ func GetLogQueryStats(logGroupName string, startTime, endTime int64, queryString return nil, fmt.Errorf("failed to start query for log group (%s): %w", logGroupName, err) } + // Sleep a fixed amount of time after making the query to give it time to + // process the request. time.Sleep(retryInterval) + var attempts int for { results, err := CwlClient.GetQueryResults(ctx, &cloudwatchlogs.GetQueryResultsInput{ QueryId: output.QueryId, @@ -171,12 +174,16 @@ func GetLogQueryStats(logGroupName string, startTime, endTime int64, queryString return nil, fmt.Errorf("failed to get query results for log group (%s): %w", logGroupName, err) } switch results.Status { - case types.QueryStatusFailed, types.QueryStatusCancelled, types.QueryStatusTimeout: - return nil, fmt.Errorf("unexpected query status: %v", results.Status) case types.QueryStatusScheduled, types.QueryStatusRunning, types.QueryStatusUnknown: + if attempts >= StandardRetries { + return nil, fmt.Errorf("attempted get query results after %s without success. final status: %v", time.Duration(attempts)*retryInterval, results.Status) + } + attempts++ time.Sleep(retryInterval) case types.QueryStatusComplete: return results.Statistics, nil + default: + return nil, fmt.Errorf("unexpected query status: %v", results.Status) } } }