Add EMF concurrency test to check for log corruption. (#351)
jefchien authored Sep 26, 2023
1 parent 0ec07d2 commit bca3359
Showing 5 changed files with 278 additions and 4 deletions.
17 changes: 15 additions & 2 deletions generator/test_case_generator.go
@@ -32,6 +32,7 @@ type matrixRow struct {
UseSSM bool `json:"useSSM"`
ExcludedTests string `json:"excludedTests"`
MetadataEnabled string `json:"metadataEnabled"`
MaxAttempts int `json:"max_attempts"`
}

type testConfig struct {
@@ -42,6 +43,8 @@ type testConfig struct {
// define target matrix field as set(s)
// empty map means a testConfig will be created with a test entry for each entry from *_test_matrix.json
targets map[string]map[string]struct{}
// maxAttempts limits the number of times a test will be run.
maxAttempts int
}

const (
@@ -60,6 +63,11 @@ var testTypeToTestConfig = map[string][]testConfig{
testDir: "./test/metrics_number_dimension",
targets: map[string]map[string]struct{}{"os": {"al2": {}}},
},
{
testDir: "./test/emf_concurrent",
targets: map[string]map[string]struct{}{"os": {"al2": {}}},
maxAttempts: 1,
},
{testDir: "./test/metric_value_benchmark"},
{testDir: "./test/run_as_user"},
{testDir: "./test/collection_interval"},
@@ -185,7 +193,7 @@ var testTypeToTestConfig = map[string][]testConfig{

func copyAllEC2LinuxTestForOnpremTesting() {
/* Some tests need to be fixed in order to run in both environments, so for now, for the PoC, run one that works.
testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
*/
testTypeToTestConfig["ec2_linux_onprem"] = []testConfig{
{
@@ -224,7 +232,12 @@ func genMatrix(testType string, testConfigs []testConfig) []matrixRow {
testMatrixComplete := make([]matrixRow, 0, len(testMatrix))
for _, test := range testMatrix {
for _, testConfig := range testConfigs {
row := matrixRow{TestDir: testConfig.testDir, TestType: testType, TerraformDir: testConfig.terraformDir}
row := matrixRow{
TestDir: testConfig.testDir,
TestType: testType,
TerraformDir: testConfig.terraformDir,
MaxAttempts: testConfig.maxAttempts,
}
err = mapstructure.Decode(test, &row)
if err != nil {
log.Panicf("can't decode map test %v to metric line struct with error %v", testConfig, err)
89 changes: 89 additions & 0 deletions test/emf_concurrent/emf_concurrent_test.go
@@ -0,0 +1,89 @@
package emf_concurrent

import (
"fmt"
"log"
"net"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent-test/environment"
"github.com/aws/amazon-cloudwatch-agent-test/util/awsservice"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
)

const (
testRuntime = 10 * time.Minute
threadCount = 15
connectionCount = 5
interval = 500 * time.Millisecond
emfAddress = "0.0.0.0:25888"
)

var (
// queryString matches records where the two metric values differ or either metric's unit is not the expected one, i.e. corrupted records.
queryString = fmt.Sprintf("filter ispresent(%[1]s) and ispresent(%[2]s) and (%[1]s != %[2]s or (_aws.CloudWatchMetrics.0.Metrics.0.Unit!=%[3]q) or (_aws.CloudWatchMetrics.0.Metrics.1.Unit!=%[3]q))", metricName1, metricName2, metricUnit)
)
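// For reference, with the constants defined in emitter.go (metricName1 = "ExecutionTime",
// metricName2 = "DuplicateExecutionTime", metricUnit = "Seconds"), queryString expands to:
//
//	filter ispresent(ExecutionTime) and ispresent(DuplicateExecutionTime) and (ExecutionTime != DuplicateExecutionTime or (_aws.CloudWatchMetrics.0.Metrics.0.Unit!="Seconds") or (_aws.CloudWatchMetrics.0.Metrics.1.Unit!="Seconds"))
//
// A record matches only if the two values diverge or either unit is wrong, so the test below expects zero matches.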

func init() {
environment.RegisterEnvironmentMetaDataFlags()
}

func TestConcurrent(t *testing.T) {
env := environment.GetEnvironmentMetaData()

common.CopyFile(filepath.Join("testdata", "config.json"), common.ConfigOutputPath)
require.NoError(t, common.StartAgent(common.ConfigOutputPath, true, false))

// wait for agent to start up
time.Sleep(5 * time.Second)

e := &emitter{
interval: interval,
logGroupName: fmt.Sprintf("emf-test-group-%s", env.InstanceId),
logStreamName: fmt.Sprintf("emf-test-stream-%s", env.InstanceId),
dimension: env.CwaCommitSha,
done: make(chan struct{}),
}

defer awsservice.DeleteLogGroup(e.logGroupName)

tcpAddr, err := net.ResolveTCPAddr("tcp", emfAddress)
if err != nil {
log.Fatalf("invalid tcp emfAddress (%s): %v", emfAddress, err)
}

var conns []*net.TCPConn
for i := 0; i < connectionCount; i++ {
var conn *net.TCPConn
conn, err = net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatalf("unable to connect to address (%s): %v", emfAddress, err)
}
conns = append(conns, conn)
}

log.Printf("Starting EMF emitters for log group (%s)/stream (%s)", e.logGroupName, e.logStreamName)
startTime := time.Now()
for i := 0; i < threadCount; i++ {
e.wg.Add(1)
go e.start(conns[i%len(conns)])
}
time.Sleep(testRuntime)
close(e.done)
log.Println("Stopping EMF emitters")
e.wg.Wait()
common.StopAgent()
endTime := time.Now()

assert.Lenf(t, awsservice.GetLogStreamNames(e.logGroupName), 1, "Detected corruption: multiple streams found")
log.Printf("Starting query for log group (%s): %s", e.logGroupName, queryString)
got, err := awsservice.GetLogQueryStats(e.logGroupName, startTime.Unix(), endTime.Unix(), queryString)
require.NoError(t, err, "Unable to get log query stats")
assert.NotZero(t, got.RecordsScanned, "No records found in CloudWatch Logs")
assert.Zerof(t, got.RecordsMatched, "Detected corruption: %v/%v records matched", got.RecordsMatched, got.RecordsScanned)
}
109 changes: 109 additions & 0 deletions test/emf_concurrent/emitter.go
@@ -0,0 +1,109 @@
package emf_concurrent

import (
"encoding/json"
"fmt"
"math/rand"
"net"
"sync"
"time"
)

const (
metadataName = "_aws"
namespace = "ConcurrentEMFTest"
metricName1 = "ExecutionTime"
metricName2 = "DuplicateExecutionTime"
metricValue = 1.23456789
metricUnit = "Seconds"
dimensionName = "Dimension"
randomName = "Random"
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
)

var (
newLineChar = []byte("\n")
)

type Metadata struct {
Timestamp int64 `json:"Timestamp"`
LogGroupName string `json:"LogGroupName"`
LogStreamName string `json:"LogStreamName"`
CloudWatchMetrics []CWMetric `json:"CloudWatchMetrics"`
}

type CWMetric struct {
Namespace string `json:"Namespace"`
Dimensions [][]string `json:"Dimensions"`
Metrics []Metric `json:"Metrics"`
}

type Metric struct {
Name string `json:"Name"`
Unit string `json:"Unit"`
}

type emitter struct {
wg sync.WaitGroup
done chan struct{}
interval time.Duration
logGroupName string
logStreamName string
dimension string
}

func (e *emitter) start(conn *net.TCPConn) {
defer e.wg.Done()
ticker := time.NewTicker(e.interval)
metadata := e.createMetadata()
for {
select {
case <-e.done:
ticker.Stop()
return
case <-ticker.C:
metadata.Timestamp = time.Now().UnixMilli()
_, _ = conn.Write(e.createEmfLog(metadata))
}
}
}

func (e *emitter) createMetadata() *Metadata {
return &Metadata{
Timestamp: time.Now().UnixMilli(),
LogGroupName: e.logGroupName,
LogStreamName: e.logStreamName,
CloudWatchMetrics: []CWMetric{
{
Namespace: namespace,
Dimensions: [][]string{{dimensionName}},
Metrics: []Metric{
{Name: metricName1, Unit: metricUnit},
{Name: metricName2, Unit: metricUnit},
},
},
},
}
}

func (e *emitter) createEmfLog(metadata *Metadata) []byte {
r := rand.Intn(99) + 1
emfLog := map[string]interface{}{
metadataName: metadata,
dimensionName: e.dimension,
metricName1: metricValue,
metricName2: metricValue,
// introduces variability in payload size
randomName: fmt.Sprintf("https://www.amazon.com/%s", randString(r)),
}
content, _ := json.Marshal(emfLog)
return append(content, newLineChar...)
}
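// For illustration, a single emitted EMF log line looks roughly like the following
// (placeholder commit SHA, instance ID, and timestamp; the Random URL suffix varies
// between 1 and 99 characters to change the payload size):
//
//	{"Dimension":"<commit-sha>","DuplicateExecutionTime":1.23456789,"ExecutionTime":1.23456789,"Random":"https://www.amazon.com/<random-suffix>","_aws":{"Timestamp":1695700000000,"LogGroupName":"emf-test-group-<instance-id>","LogStreamName":"emf-test-stream-<instance-id>","CloudWatchMetrics":[{"Namespace":"ConcurrentEMFTest","Dimensions":[["Dimension"]],"Metrics":[{"Name":"ExecutionTime","Unit":"Seconds"},{"Name":"DuplicateExecutionTime","Unit":"Seconds"}]}]}}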

func randString(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
11 changes: 11 additions & 0 deletions test/emf_concurrent/testdata/config.json
@@ -0,0 +1,11 @@
{
"agent": {
"debug": true
},
"logs": {
"metrics_collected": {
"emf": {
}
}
}
}
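This minimal configuration enables agent debug logging and the EMF listener with its default settings; by default the agent accepts EMF over TCP and UDP on port 25888, which is the port the test's emfAddress (0.0.0.0:25888) connects to.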
56 changes: 54 additions & 2 deletions util/awsservice/cloudwatchlogs.go
@@ -17,7 +17,10 @@ import (
"github.com/qri-io/jsonschema"
)

const logStreamRetry = 20
const (
logStreamRetry = 20
retryInterval = 10 * time.Second
)

// catch ResourceNotFoundException when deleting the log group and log stream, as these
// are not useful exceptions to log errors on during cleanup
@@ -144,6 +147,47 @@ func IsLogGroupExists(logGroupName string) bool {
return len(describeLogGroupOutput.LogGroups) > 0
}

// GetLogQueryStats runs the given query string against the log group between the start and
// end times (in epoch seconds) and returns the resulting query statistics.
func GetLogQueryStats(logGroupName string, startTime, endTime int64, queryString string) (*types.QueryStatistics, error) {
output, err := CwlClient.StartQuery(ctx, &cloudwatchlogs.StartQueryInput{
LogGroupName: aws.String(logGroupName),
StartTime: aws.Int64(startTime),
EndTime: aws.Int64(endTime),
QueryString: aws.String(queryString),
})

if err != nil {
return nil, fmt.Errorf("failed to start query for log group (%s): %w", logGroupName, err)
}

// Sleep a fixed amount of time after making the query to give it time to
// process the request.
time.Sleep(retryInterval)

var attempts int
for {
results, err := CwlClient.GetQueryResults(ctx, &cloudwatchlogs.GetQueryResultsInput{
QueryId: output.QueryId,
})
if err != nil {
return nil, fmt.Errorf("failed to get query results for log group (%s): %w", logGroupName, err)
}
switch results.Status {
case types.QueryStatusScheduled, types.QueryStatusRunning, types.QueryStatusUnknown:
if attempts >= StandardRetries {
return nil, fmt.Errorf("attempted get query results after %s without success. final status: %v", time.Duration(attempts)*retryInterval, results.Status)
}
attempts++
time.Sleep(retryInterval)
case types.QueryStatusComplete:
return results.Statistics, nil
default:
return nil, fmt.Errorf("unexpected query status: %v", results.Status)
}
}
}

func GetLogStreams(logGroupName string) []types.LogStream {
for i := 0; i < logStreamRetry; i++ {
describeLogStreamsOutput, err := CwlClient.DescribeLogStreams(ctx, &cloudwatchlogs.DescribeLogStreamsInput{
@@ -162,12 +206,20 @@ func GetLogStreams(logGroupName string) []types.LogStream {
return describeLogStreamsOutput.LogStreams
}

time.Sleep(10 * time.Second)
time.Sleep(retryInterval)
}

return []types.LogStream{}
}

func GetLogStreamNames(logGroupName string) []string {
var logStreamNames []string
for _, stream := range GetLogStreams(logGroupName) {
logStreamNames = append(logStreamNames, *stream.LogStreamName)
}
return logStreamNames
}

type LogEventValidator func(event types.OutputLogEvent) error

type LogEventsValidator func(events []types.OutputLogEvent) error
