Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EMF concurrency test to check for log corruption. #351

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions generator/test_case_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type matrixRow struct {
UseSSM bool `json:"useSSM"`
ExcludedTests string `json:"excludedTests"`
MetadataEnabled string `json:"metadataEnabled"`
MaxAttempts int `json:"max_attempts"`
}

type testConfig struct {
Expand All @@ -41,7 +42,8 @@ type testConfig struct {
terraformDir string
// define target matrix field as set(s)
// empty map means a testConfig will be created with a test entry for each entry from *_test_matrix.json
targets map[string]map[string]struct{}
targets map[string]map[string]struct{}
maxAttempts int
}

const (
Expand All @@ -60,6 +62,11 @@ var testTypeToTestConfig = map[string][]testConfig{
testDir: "./test/metrics_number_dimension",
targets: map[string]map[string]struct{}{"os": {"al2": {}}},
},
{
testDir: "./test/emf_concurrent",
targets: map[string]map[string]struct{}{"os": {"al2": {}}},
maxAttempts: 1,
},
{testDir: "./test/metric_value_benchmark"},
{testDir: "./test/run_as_user"},
{testDir: "./test/collection_interval"},
Expand Down Expand Up @@ -185,7 +192,7 @@ var testTypeToTestConfig = map[string][]testConfig{

func copyAllEC2LinuxTestForOnpremTesting() {
/* Some tests need to be fixed in order to run in both environment, so for now for PoC, run one that works.
testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
*/
testTypeToTestConfig["ec2_linux_onprem"] = []testConfig{
{
Expand Down Expand Up @@ -224,7 +231,12 @@ func genMatrix(testType string, testConfigs []testConfig) []matrixRow {
testMatrixComplete := make([]matrixRow, 0, len(testMatrix))
for _, test := range testMatrix {
for _, testConfig := range testConfigs {
row := matrixRow{TestDir: testConfig.testDir, TestType: testType, TerraformDir: testConfig.terraformDir}
row := matrixRow{
TestDir: testConfig.testDir,
TestType: testType,
TerraformDir: testConfig.terraformDir,
MaxAttempts: testConfig.maxAttempts,
}
err = mapstructure.Decode(test, &row)
if err != nil {
log.Panicf("can't decode map test %v to metric line struct with error %v", testConfig, err)
Expand Down
93 changes: 93 additions & 0 deletions test/emf_concurrent/emf_concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package emf_concurrent

import (
"fmt"
"log"
"net"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent-test/environment"
"github.com/aws/amazon-cloudwatch-agent-test/util/awsservice"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
)

const (
testRuntime = 10 * time.Minute
threadCount = 10
connectionCount = 10
interval = 500 * time.Millisecond
emfAddress = "0.0.0.0:25888"
)

func init() {
environment.RegisterEnvironmentMetaDataFlags()
}

func TestConcurrent(t *testing.T) {
env := environment.GetEnvironmentMetaData()

common.CopyFile(filepath.Join("testdata", "config.json"), common.ConfigOutputPath)
require.NoError(t, common.StartAgent(common.ConfigOutputPath, true, false))

// wait for agent to start up
time.Sleep(10 * interval)

e := &emitter{
interval: interval,
logGroupName: fmt.Sprintf("emf-test-group-%s", env.InstanceId),
logStreamName: fmt.Sprintf("emf-test-stream-%s", env.InstanceId),
dimension: env.CwaCommitSha,
done: make(chan struct{}),
}

defer awsservice.DeleteLogGroup(e.logGroupName)

tcpAddr, err := net.ResolveTCPAddr("tcp", emfAddress)
if err != nil {
log.Fatalf("invalid tcp emfAddress (%s): %v", emfAddress, err)
}

var conns []*net.TCPConn
for i := 0; i < connectionCount; i++ {
var conn *net.TCPConn
conn, err = net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatalf("unable to connect to address (%s): %v", emfAddress, err)
}
conns = append(conns, conn)
}

log.Printf("Starting EMF emitters for log group (%s)/stream (%s)", e.logGroupName, e.logStreamName)
startTime := time.Now()
for i := 0; i < threadCount; i++ {
e.wg.Add(1)
go e.start(conns[i%len(conns)])
}
time.Sleep(testRuntime)
close(e.done)
e.wg.Wait()
common.StopAgent()
endTime := time.Now()
log.Println("Stopping EMF emitters")

var gotStreamNames []string
for _, stream := range awsservice.GetLogStreams(e.logGroupName) {
gotStreamNames = append(gotStreamNames, *stream.LogStreamName)
}
assert.Lenf(t, gotStreamNames, 1, "Detected corruption: multiple streams found")
qs := queryString()
log.Printf("Starting query for log group (%s): %s", e.logGroupName, qs)
gotLogQueryStats, err := awsservice.GetLogQueryStats(e.logGroupName, startTime.Unix(), endTime.Unix(), qs)
require.NoError(t, err, "Unable to get log query stats")
assert.NotZero(t, gotLogQueryStats.RecordsScanned, "No records found in CloudWatch Logs")
assert.Zerof(t, gotLogQueryStats.RecordsMatched, "Detected corruption: %v/%v records matched", gotLogQueryStats.RecordsMatched, gotLogQueryStats.RecordsScanned)
}

func queryString() string {
return fmt.Sprintf("filter ispresent(%[1]s) and ispresent(%[2]s) and (%[1]s != %[2]s or (_aws.CloudWatchMetrics.0.Metrics.0.Unit!=%[3]q) or (_aws.CloudWatchMetrics.0.Metrics.1.Unit!=%[3]q))", metricName1, metricName2, metricUnit)
}
109 changes: 109 additions & 0 deletions test/emf_concurrent/emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package emf_concurrent

import (
"encoding/json"
"fmt"
"math/rand"
"net"
"sync"
"time"
)

const (
metadataName = "_aws"
namespace = "ConcurrentEMFTest"
metricName1 = "ExecutionTime"
metricName2 = "DuplicateExecutionTime"
metricValue = 1.23456789
metricUnit = "Seconds"
dimensionName = "Dimension"
randomName = "Random"
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
)

var (
newLineChar = []byte("\n")
)

type Metadata struct {
Timestamp int64 `json:"Timestamp"`
LogGroupName string `json:"LogGroupName"`
LogStreamName string `json:"LogStreamName"`
CloudWatchMetrics []CWMetric `json:"CloudWatchMetrics"`
}

type CWMetric struct {
Namespace string `json:"Namespace"`
Dimensions [][]string `json:"Dimensions"`
Metrics []Metric `json:"Metrics"`
}

type Metric struct {
Name string `json:"Name"`
Unit string `json:"Unit"`
}

type emitter struct {
wg sync.WaitGroup
done chan struct{}
interval time.Duration
logGroupName string
logStreamName string
dimension string
}

func (e *emitter) start(conn *net.TCPConn) {
defer e.wg.Done()
ticker := time.NewTicker(e.interval)
metadata := e.createMetadata()
for {
select {
case <-e.done:
ticker.Stop()
return
case <-ticker.C:
metadata.Timestamp = time.Now().UnixMilli()
_, _ = conn.Write(e.createEmfLog(metadata))
}
}
}

func (e *emitter) createMetadata() *Metadata {
return &Metadata{
Timestamp: time.Now().UnixMilli(),
LogGroupName: e.logGroupName,
LogStreamName: e.logStreamName,
CloudWatchMetrics: []CWMetric{
{
Namespace: namespace,
Dimensions: [][]string{{dimensionName}},
Metrics: []Metric{
{Name: metricName1, Unit: metricUnit},
{Name: metricName2, Unit: metricUnit},
},
},
},
}
}

func (e *emitter) createEmfLog(metadata *Metadata) []byte {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't need to done now, but I can see this code to send EMF metrics being useful outside this test case.
Might want to move it to a UTIL at some point. I saw you mentioned there was no EMF package in go to do this... yikes.

r := rand.Intn(99) + 1
emfLog := map[string]interface{}{
metadataName: metadata,
dimensionName: e.dimension,
metricName1: metricValue,
metricName2: metricValue,
// introduces variability in payload size
randomName: fmt.Sprintf("https://www.amazon.com/%s", randString(r)),
}
content, _ := json.Marshal(emfLog)
return append(content, newLineChar...)
}

func randString(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
11 changes: 11 additions & 0 deletions test/emf_concurrent/testdata/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"agent": {
"debug": true
},
"logs": {
"metrics_collected": {
"emf": {
}
}
}
}
41 changes: 39 additions & 2 deletions util/awsservice/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (
"github.com/qri-io/jsonschema"
)

const logStreamRetry = 20
const (
logStreamRetry = 20
retryInterval = 10 * time.Second
)

// catch ResourceNotFoundException when deleting the log group and log stream, as these
// are not useful exceptions to log errors on during cleanup
Expand Down Expand Up @@ -144,6 +147,40 @@ func IsLogGroupExists(logGroupName string) bool {
return len(describeLogGroupOutput.LogGroups) > 0
}

// GetLogQueryStats for the log group between start/end (in epoch seconds) for the
// query string.
func GetLogQueryStats(logGroupName string, startTime, endTime int64, queryString string) (*types.QueryStatistics, error) {
output, err := CwlClient.StartQuery(ctx, &cloudwatchlogs.StartQueryInput{
LogGroupName: aws.String(logGroupName),
StartTime: aws.Int64(startTime),
EndTime: aws.Int64(endTime),
QueryString: aws.String(queryString),
})

if err != nil {
return nil, fmt.Errorf("failed to start query for log group (%s): %w", logGroupName, err)
}

time.Sleep(retryInterval)

for {
results, err := CwlClient.GetQueryResults(ctx, &cloudwatchlogs.GetQueryResultsInput{
QueryId: output.QueryId,
})
if err != nil {
return nil, fmt.Errorf("failed to get query results for log group (%s): %w", logGroupName, err)
}
switch results.Status {
case types.QueryStatusFailed, types.QueryStatusCancelled, types.QueryStatusTimeout:
return nil, fmt.Errorf("unexpected query status: %v", results.Status)
case types.QueryStatusScheduled, types.QueryStatusRunning, types.QueryStatusUnknown:
time.Sleep(retryInterval)
case types.QueryStatusComplete:
return results.Statistics, nil
}
}
}

func GetLogStreams(logGroupName string) []types.LogStream {
for i := 0; i < logStreamRetry; i++ {
describeLogStreamsOutput, err := CwlClient.DescribeLogStreams(ctx, &cloudwatchlogs.DescribeLogStreamsInput{
Expand All @@ -162,7 +199,7 @@ func GetLogStreams(logGroupName string) []types.LogStream {
return describeLogStreamsOutput.LogStreams
}

time.Sleep(10 * time.Second)
time.Sleep(retryInterval)
}

return []types.LogStream{}
Expand Down