diff --git a/generator/test_case_generator.go b/generator/test_case_generator.go
index 68453f77d..32522dad7 100644
--- a/generator/test_case_generator.go
+++ b/generator/test_case_generator.go
@@ -32,6 +32,7 @@ type matrixRow struct {
 	UseSSM          bool   `json:"useSSM"`
 	ExcludedTests   string `json:"excludedTests"`
 	MetadataEnabled string `json:"metadataEnabled"`
+	MaxAttempts     int    `json:"max_attempts"`
 }
 
 type testConfig struct {
@@ -42,6 +43,8 @@ type testConfig struct {
 	// define target matrix field as set(s)
 	// empty map means a testConfig will be created with a test entry for each entry from *_test_matrix.json
 	targets map[string]map[string]struct{}
+	// maxAttempts limits the number of times a test will be run; genMatrix copies it into matrixRow.MaxAttempts.
+	maxAttempts int
 }
 
 const (
@@ -60,6 +63,11 @@ var testTypeToTestConfig = map[string][]testConfig{
 			testDir: "./test/metrics_number_dimension",
 			targets: map[string]map[string]struct{}{"os": {"al2": {}}},
 		},
+		{
+			testDir:     "./test/emf_concurrent",
+			targets:     map[string]map[string]struct{}{"os": {"al2": {}}},
+			maxAttempts: 1,
+		},
 		{testDir: "./test/metric_value_benchmark"},
 		{testDir: "./test/run_as_user"},
 		{testDir: "./test/collection_interval"},
@@ -185,7 +193,7 @@ var testTypeToTestConfig = map[string][]testConfig{
 
 func copyAllEC2LinuxTestForOnpremTesting() {
 	/* Some tests need to be fixed in order to run in both environment, so for now for PoC, run one that works.
-	testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
+		testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
 	*/
 	testTypeToTestConfig["ec2_linux_onprem"] = []testConfig{
 		{
@@ -224,7 +232,12 @@ func genMatrix(testType string, testConfigs []testConfig) []matrixRow {
 	testMatrixComplete := make([]matrixRow, 0, len(testMatrix))
 	for _, test := range testMatrix {
 		for _, testConfig := range testConfigs {
-			row := matrixRow{TestDir: testConfig.testDir, TestType: testType, TerraformDir: testConfig.terraformDir}
+			row := matrixRow{
+				TestDir:      testConfig.testDir,
+				TestType:     testType,
+				TerraformDir: testConfig.terraformDir,
+				MaxAttempts:  testConfig.maxAttempts,
+			}
 			err = mapstructure.Decode(test, &row)
 			if err != nil {
 				log.Panicf("can't decode map test %v to metric line struct with error %v", testConfig, err)
diff --git a/test/emf_concurrent/emf_concurrent_test.go b/test/emf_concurrent/emf_concurrent_test.go
new file mode 100644
index 000000000..1ddd145eb
--- /dev/null
+++ b/test/emf_concurrent/emf_concurrent_test.go
@@ -0,0 +1,98 @@
+package emf_concurrent
+
+import (
+	"fmt"
+	"log"
+	"net"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/aws/amazon-cloudwatch-agent-test/environment"
+	"github.com/aws/amazon-cloudwatch-agent-test/util/awsservice"
+	"github.com/aws/amazon-cloudwatch-agent-test/util/common"
+)
+
+const (
+	testRuntime     = 10 * time.Minute
+	threadCount     = 15
+	connectionCount = 5
+	interval        = 500 * time.Millisecond
+	emfAddress      = "0.0.0.0:25888"
+)
+
+var (
+	// queryString matches corrupted records: both metrics are present, but
+	// their values differ or either unit is not the expected one. A healthy
+	// run therefore matches zero records.
+	queryString = fmt.Sprintf("filter ispresent(%[1]s) and ispresent(%[2]s) and (%[1]s != %[2]s or (_aws.CloudWatchMetrics.0.Metrics.0.Unit!=%[3]q) or (_aws.CloudWatchMetrics.0.Metrics.1.Unit!=%[3]q))", metricName1, metricName2, metricUnit)
+)
+
+func init() {
+	environment.RegisterEnvironmentMetaDataFlags()
+}
+
+// TestConcurrent starts the agent, emits EMF logs from threadCount goroutines
+// that share connectionCount TCP connections for testRuntime, and then checks
+// that CloudWatch Logs holds exactly one log stream and that no scanned
+// record matches queryString.
+func TestConcurrent(t *testing.T) {
+	env := environment.GetEnvironmentMetaData()
+
+	common.CopyFile(filepath.Join("testdata", "config.json"), common.ConfigOutputPath)
+	require.NoError(t, common.StartAgent(common.ConfigOutputPath, true, false))
+
+	// wait for agent to start up
+	time.Sleep(5 * time.Second)
+
+	e := &emitter{
+		interval:      interval,
+		logGroupName:  fmt.Sprintf("emf-test-group-%s", env.InstanceId),
+		logStreamName: fmt.Sprintf("emf-test-stream-%s", env.InstanceId),
+		dimension:     env.CwaCommitSha,
+		done:          make(chan struct{}),
+	}
+
+	defer awsservice.DeleteLogGroup(e.logGroupName)
+
+	// Setup failures go through require so that the test fails via t.FailNow
+	// and the deferred cleanup still runs.
+	tcpAddr, err := net.ResolveTCPAddr("tcp", emfAddress)
+	require.NoErrorf(t, err, "invalid tcp emfAddress (%s)", emfAddress)
+
+	var conns []*net.TCPConn
+	// Close every dialed connection on the way out, including early failures.
+	defer func() {
+		for _, conn := range conns {
+			_ = conn.Close()
+		}
+	}()
+	for i := 0; i < connectionCount; i++ {
+		conn, dialErr := net.DialTCP("tcp", nil, tcpAddr)
+		require.NoErrorf(t, dialErr, "unable to connect to address (%s)", emfAddress)
+		conns = append(conns, conn)
+	}
+
+	log.Printf("Starting EMF emitters for log group (%s)/stream (%s)", e.logGroupName, e.logStreamName)
+	startTime := time.Now()
+	for i := 0; i < threadCount; i++ {
+		e.wg.Add(1)
+		go e.start(conns[i%len(conns)])
+	}
+	time.Sleep(testRuntime)
+	close(e.done)
+	log.Println("Stopping EMF emitters")
+	e.wg.Wait()
+	common.StopAgent()
+	endTime := time.Now()
+
+	assert.Lenf(t, awsservice.GetLogStreamNames(e.logGroupName), 1, "Detected corruption: multiple streams found")
+	log.Printf("Starting query for log group (%s): %s", e.logGroupName, queryString)
+	got, err := awsservice.GetLogQueryStats(e.logGroupName, startTime.Unix(), endTime.Unix(), queryString)
+	require.NoError(t, err, "Unable to get log query stats")
+	assert.NotZero(t, got.RecordsScanned, "No records found in CloudWatch Logs")
+	assert.Zerof(t, got.RecordsMatched, "Detected corruption: %v/%v records matched", got.RecordsMatched, got.RecordsScanned)
+}
diff --git a/test/emf_concurrent/emitter.go b/test/emf_concurrent/emitter.go
new file mode 100644
index 000000000..77f3eb29f
--- /dev/null
+++ b/test/emf_concurrent/emitter.go
@@ -0,0 +1,126 @@
+package emf_concurrent
+
+import (
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"net"
+	"sync"
+	"time"
+)
+
+const (
+	metadataName  = "_aws"
+	namespace     = "ConcurrentEMFTest"
+	metricName1   = "ExecutionTime"
+	metricName2   = "DuplicateExecutionTime"
+	metricValue   = 1.23456789
+	metricUnit    = "Seconds"
+	dimensionName = "Dimension"
+	randomName    = "Random"
+	letters       = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+)
+
+var (
+	newLineChar = []byte("\n")
+)
+
+// Metadata is the value serialized under the "_aws" key of each EMF log.
+type Metadata struct {
+	Timestamp         int64      `json:"Timestamp"`
+	LogGroupName      string     `json:"LogGroupName"`
+	LogStreamName     string     `json:"LogStreamName"`
+	CloudWatchMetrics []CWMetric `json:"CloudWatchMetrics"`
+}
+
+// CWMetric is one entry of Metadata.CloudWatchMetrics: a namespace with its
+// dimension sets and metric definitions.
+type CWMetric struct {
+	Namespace  string     `json:"Namespace"`
+	Dimensions [][]string `json:"Dimensions"`
+	Metrics    []Metric   `json:"Metrics"`
+}
+
+// Metric names a metric and its unit.
+type Metric struct {
+	Name string `json:"Name"`
+	Unit string `json:"Unit"`
+}
+
+// emitter writes EMF logs to TCP connections on a fixed interval until done
+// is closed; wg tracks the running start goroutines.
+type emitter struct {
+	wg            sync.WaitGroup
+	done          chan struct{}
+	interval      time.Duration
+	logGroupName  string
+	logStreamName string
+	dimension     string
+}
+
+// start writes one EMF log to conn per tick of e.interval until e.done is
+// closed, then marks itself done on e.wg.
+func (e *emitter) start(conn *net.TCPConn) {
+	defer e.wg.Done()
+	ticker := time.NewTicker(e.interval)
+	defer ticker.Stop()
+	metadata := e.createMetadata()
+	for {
+		select {
+		case <-e.done:
+			return
+		case <-ticker.C:
+			metadata.Timestamp = time.Now().UnixMilli()
+			// Best effort: a failed write is not retried or reported here.
+			_, _ = conn.Write(e.createEmfLog(metadata))
+		}
+	}
+}
+
+// createMetadata builds the metadata shared by every log this emitter writes:
+// the target log group/stream and the two metric definitions.
+func (e *emitter) createMetadata() *Metadata {
+	return &Metadata{
+		Timestamp:     time.Now().UnixMilli(),
+		LogGroupName:  e.logGroupName,
+		LogStreamName: e.logStreamName,
+		CloudWatchMetrics: []CWMetric{
+			{
+				Namespace:  namespace,
+				Dimensions: [][]string{{dimensionName}},
+				Metrics: []Metric{
+					{Name: metricName1, Unit: metricUnit},
+					{Name: metricName2, Unit: metricUnit},
+				},
+			},
+		},
+	}
+}
+
+// createEmfLog serializes one newline-terminated EMF log carrying metadata,
+// the dimension value, both metrics set to metricValue, and a random-length
+// string.
+func (e *emitter) createEmfLog(metadata *Metadata) []byte {
+	r := rand.Intn(99) + 1
+	emfLog := map[string]interface{}{
+		metadataName:  metadata,
+		dimensionName: e.dimension,
+		metricName1:   metricValue,
+		metricName2:   metricValue,
+		// introduces variability in payload size
+		randomName: fmt.Sprintf("https://www.amazon.com/%s", randString(r)),
+	}
+	// The map holds only strings, a finite float and plain structs, which
+	// json.Marshal always encodes, so the error is discarded.
+	content, _ := json.Marshal(emfLog)
+	return append(content, newLineChar...)
+}
+
+// randString returns n random ASCII letters.
+func randString(n int) string {
+	b := make([]byte, n)
+	for i := range b {
+		b[i] = letters[rand.Intn(len(letters))]
+	}
+	return string(b)
+}
diff --git a/test/emf_concurrent/testdata/config.json b/test/emf_concurrent/testdata/config.json
new file mode 100644
index 000000000..0f5ba8ee3
--- /dev/null
+++ b/test/emf_concurrent/testdata/config.json
@@ -0,0 +1,11 @@
+{
+  "agent": {
+    "debug": true
+  },
+  "logs": {
+    "metrics_collected": {
+      "emf": {
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/util/awsservice/cloudwatchlogs.go b/util/awsservice/cloudwatchlogs.go
index 1b0c56e62..5eccc0ca1 100644
--- a/util/awsservice/cloudwatchlogs.go
+++ b/util/awsservice/cloudwatchlogs.go
@@ -17,7 +17,10 @@ import (
 	"github.com/qri-io/jsonschema"
 )
 
-const logStreamRetry = 20
+const (
+	logStreamRetry = 20
+	retryInterval  = 10 * time.Second
+)
 
 // catch ResourceNotFoundException when deleting the log group and log stream, as these
 // are not useful exceptions to log errors on during cleanup
@@ -144,6 +147,56 @@ func IsLogGroupExists(logGroupName string) bool {
 	return len(describeLogGroupOutput.LogGroups) > 0
 }
 
+// GetLogQueryStats runs queryString against the log group for the window
+// between startTime and endTime (in epoch seconds) and returns the statistics
+// of the completed query. While the query is scheduled, running or in an
+// unknown state it is polled every retryInterval, for at most StandardRetries
+// retries. An error is returned if the query cannot be started, its results
+// cannot be fetched, it does not complete in time, it ends in any other
+// status, or it completes without statistics.
+func GetLogQueryStats(logGroupName string, startTime, endTime int64, queryString string) (*types.QueryStatistics, error) {
+	output, err := CwlClient.StartQuery(ctx, &cloudwatchlogs.StartQueryInput{
+		LogGroupName: aws.String(logGroupName),
+		StartTime:    aws.Int64(startTime),
+		EndTime:      aws.Int64(endTime),
+		QueryString:  aws.String(queryString),
+	})
+
+	if err != nil {
+		return nil, fmt.Errorf("failed to start query for log group (%s): %w", logGroupName, err)
+	}
+
+	// Sleep a fixed amount of time after making the query to give it time to
+	// process the request.
+	time.Sleep(retryInterval)
+
+	var attempts int
+	for {
+		results, err := CwlClient.GetQueryResults(ctx, &cloudwatchlogs.GetQueryResultsInput{
+			QueryId: output.QueryId,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("failed to get query results for log group (%s): %w", logGroupName, err)
+		}
+		switch results.Status {
+		case types.QueryStatusScheduled, types.QueryStatusRunning, types.QueryStatusUnknown:
+			if attempts >= StandardRetries {
+				// attempts+1 also counts the initial sleep before the first poll.
+				return nil, fmt.Errorf("attempted get query results after %s without success. final status: %v", time.Duration(attempts+1)*retryInterval, results.Status)
+			}
+			attempts++
+			time.Sleep(retryInterval)
+		case types.QueryStatusComplete:
+			if results.Statistics == nil {
+				return nil, fmt.Errorf("query completed without statistics for log group (%s)", logGroupName)
+			}
+			return results.Statistics, nil
+		default:
+			return nil, fmt.Errorf("unexpected query status: %v", results.Status)
+		}
+	}
+}
+
 func GetLogStreams(logGroupName string) []types.LogStream {
 	for i := 0; i < logStreamRetry; i++ {
 		describeLogStreamsOutput, err := CwlClient.DescribeLogStreams(ctx, &cloudwatchlogs.DescribeLogStreamsInput{
@@ -162,12 +215,22 @@ func GetLogStreams(logGroupName string) []types.LogStream {
 			return describeLogStreamsOutput.LogStreams
 		}
 
-		time.Sleep(10 * time.Second)
+		time.Sleep(retryInterval)
 	}
 
 	return []types.LogStream{}
 }
 
+// GetLogStreamNames returns the name of every log stream that GetLogStreams
+// finds for the log group; a stream without a name yields an empty string.
+func GetLogStreamNames(logGroupName string) []string {
+	var logStreamNames []string
+	for _, stream := range GetLogStreams(logGroupName) {
+		logStreamNames = append(logStreamNames, aws.ToString(stream.LogStreamName))
+	}
+	return logStreamNames
+}
+
 type LogEventValidator func(event types.OutputLogEvent) error
 
 type LogEventsValidator func(events []types.OutputLogEvent) error