Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EMF concurrency test to check for log corruption. #351

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions generator/test_case_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type matrixRow struct {
UseSSM bool `json:"useSSM"`
ExcludedTests string `json:"excludedTests"`
MetadataEnabled string `json:"metadataEnabled"`
MaxAttempts int `json:"max_attempts"`
jefchien marked this conversation as resolved.
Show resolved Hide resolved
}

type testConfig struct {
Expand All @@ -42,6 +43,8 @@ type testConfig struct {
// define target matrix field as set(s)
// empty map means a testConfig will be created with a test entry for each entry from *_test_matrix.json
targets map[string]map[string]struct{}
// maxAttempts limits the number of times a test will be run.
maxAttempts int
}

const (
Expand All @@ -60,6 +63,11 @@ var testTypeToTestConfig = map[string][]testConfig{
testDir: "./test/metrics_number_dimension",
targets: map[string]map[string]struct{}{"os": {"al2": {}}},
},
{
testDir: "./test/emf_concurrent",
targets: map[string]map[string]struct{}{"os": {"al2": {}}},
maxAttempts: 1,
},
{testDir: "./test/metric_value_benchmark"},
{testDir: "./test/run_as_user"},
{testDir: "./test/collection_interval"},
Expand Down Expand Up @@ -185,7 +193,7 @@ var testTypeToTestConfig = map[string][]testConfig{

func copyAllEC2LinuxTestForOnpremTesting() {
/* Some tests need to be fixed in order to run in both environment, so for now for PoC, run one that works.
testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
*/
testTypeToTestConfig["ec2_linux_onprem"] = []testConfig{
{
Expand Down Expand Up @@ -224,7 +232,12 @@ func genMatrix(testType string, testConfigs []testConfig) []matrixRow {
testMatrixComplete := make([]matrixRow, 0, len(testMatrix))
for _, test := range testMatrix {
for _, testConfig := range testConfigs {
row := matrixRow{TestDir: testConfig.testDir, TestType: testType, TerraformDir: testConfig.terraformDir}
row := matrixRow{
TestDir: testConfig.testDir,
TestType: testType,
TerraformDir: testConfig.terraformDir,
MaxAttempts: testConfig.maxAttempts,
}
err = mapstructure.Decode(test, &row)
if err != nil {
log.Panicf("can't decode map test %v to metric line struct with error %v", testConfig, err)
Expand Down
89 changes: 89 additions & 0 deletions test/emf_concurrent/emf_concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package emf_concurrent

import (
"fmt"
"log"
"net"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent-test/environment"
"github.com/aws/amazon-cloudwatch-agent-test/util/awsservice"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
)

const (
testRuntime = 10 * time.Minute
threadCount = 15
connectionCount = 5
interval = 500 * time.Millisecond
emfAddress = "0.0.0.0:25888"
)

var (
// queryString checks that both metric values are the same and have the same expected unit.
queryString = fmt.Sprintf("filter ispresent(%[1]s) and ispresent(%[2]s) and (%[1]s != %[2]s or (_aws.CloudWatchMetrics.0.Metrics.0.Unit!=%[3]q) or (_aws.CloudWatchMetrics.0.Metrics.1.Unit!=%[3]q))", metricName1, metricName2, metricUnit)
)

func init() {
environment.RegisterEnvironmentMetaDataFlags()
}

func TestConcurrent(t *testing.T) {
env := environment.GetEnvironmentMetaData()

common.CopyFile(filepath.Join("testdata", "config.json"), common.ConfigOutputPath)
require.NoError(t, common.StartAgent(common.ConfigOutputPath, true, false))

// wait for agent to start up
time.Sleep(5 * time.Second)

e := &emitter{
interval: interval,
logGroupName: fmt.Sprintf("emf-test-group-%s", env.InstanceId),
logStreamName: fmt.Sprintf("emf-test-stream-%s", env.InstanceId),
dimension: env.CwaCommitSha,
done: make(chan struct{}),
}

defer awsservice.DeleteLogGroup(e.logGroupName)

tcpAddr, err := net.ResolveTCPAddr("tcp", emfAddress)
if err != nil {
log.Fatalf("invalid tcp emfAddress (%s): %v", emfAddress, err)
}

var conns []*net.TCPConn
for i := 0; i < connectionCount; i++ {
jefchien marked this conversation as resolved.
Show resolved Hide resolved
var conn *net.TCPConn
conn, err = net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatalf("unable to connect to address (%s): %v", emfAddress, err)
}
conns = append(conns, conn)
}

log.Printf("Starting EMF emitters for log group (%s)/stream (%s)", e.logGroupName, e.logStreamName)
startTime := time.Now()
for i := 0; i < threadCount; i++ {
e.wg.Add(1)
go e.start(conns[i%len(conns)])
jefchien marked this conversation as resolved.
Show resolved Hide resolved
}
time.Sleep(testRuntime)
close(e.done)
log.Println("Stopping EMF emitters")
e.wg.Wait()
common.StopAgent()
endTime := time.Now()

assert.Lenf(t, awsservice.GetLogStreamNames(e.logGroupName), 1, "Detected corruption: multiple streams found")
log.Printf("Starting query for log group (%s): %s", e.logGroupName, queryString)
got, err := awsservice.GetLogQueryStats(e.logGroupName, startTime.Unix(), endTime.Unix(), queryString)
require.NoError(t, err, "Unable to get log query stats")
assert.NotZero(t, got.RecordsScanned, "No records found in CloudWatch Logs")
assert.Zerof(t, got.RecordsMatched, "Detected corruption: %v/%v records matched", got.RecordsMatched, got.RecordsScanned)
}
109 changes: 109 additions & 0 deletions test/emf_concurrent/emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package emf_concurrent

import (
"encoding/json"
"fmt"
"math/rand"
"net"
"sync"
"time"
)

const (
metadataName = "_aws"
namespace = "ConcurrentEMFTest"
metricName1 = "ExecutionTime"
metricName2 = "DuplicateExecutionTime"
metricValue = 1.23456789
metricUnit = "Seconds"
dimensionName = "Dimension"
randomName = "Random"
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
)

var (
newLineChar = []byte("\n")
)

type Metadata struct {
Timestamp int64 `json:"Timestamp"`
LogGroupName string `json:"LogGroupName"`
LogStreamName string `json:"LogStreamName"`
CloudWatchMetrics []CWMetric `json:"CloudWatchMetrics"`
}

type CWMetric struct {
Namespace string `json:"Namespace"`
Dimensions [][]string `json:"Dimensions"`
Metrics []Metric `json:"Metrics"`
}

type Metric struct {
Name string `json:"Name"`
Unit string `json:"Unit"`
}

type emitter struct {
wg sync.WaitGroup
done chan struct{}
interval time.Duration
logGroupName string
logStreamName string
dimension string
}

func (e *emitter) start(conn *net.TCPConn) {
defer e.wg.Done()
ticker := time.NewTicker(e.interval)
metadata := e.createMetadata()
for {
select {
case <-e.done:
ticker.Stop()
return
case <-ticker.C:
metadata.Timestamp = time.Now().UnixMilli()
_, _ = conn.Write(e.createEmfLog(metadata))
}
}
}

func (e *emitter) createMetadata() *Metadata {
return &Metadata{
Timestamp: time.Now().UnixMilli(),
LogGroupName: e.logGroupName,
LogStreamName: e.logStreamName,
CloudWatchMetrics: []CWMetric{
{
Namespace: namespace,
Dimensions: [][]string{{dimensionName}},
Metrics: []Metric{
{Name: metricName1, Unit: metricUnit},
{Name: metricName2, Unit: metricUnit},
},
},
},
}
}

func (e *emitter) createEmfLog(metadata *Metadata) []byte {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't need to done now, but I can see this code to send EMF metrics being useful outside this test case.
Might want to move it to a UTIL at some point. I saw you mentioned there was no EMF package in go to do this... yikes.

r := rand.Intn(99) + 1
emfLog := map[string]interface{}{
metadataName: metadata,
dimensionName: e.dimension,
metricName1: metricValue,
metricName2: metricValue,
// introduces variability in payload size
randomName: fmt.Sprintf("https://www.amazon.com/%s", randString(r)),
}
content, _ := json.Marshal(emfLog)
return append(content, newLineChar...)
}

func randString(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
11 changes: 11 additions & 0 deletions test/emf_concurrent/testdata/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"agent": {
"debug": true
},
"logs": {
"metrics_collected": {
"emf": {
}
}
}
}
56 changes: 54 additions & 2 deletions util/awsservice/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (
"github.com/qri-io/jsonschema"
)

const logStreamRetry = 20
const (
logStreamRetry = 20
retryInterval = 10 * time.Second
)

// catch ResourceNotFoundException when deleting the log group and log stream, as these
// are not useful exceptions to log errors on during cleanup
Expand Down Expand Up @@ -144,6 +147,47 @@ func IsLogGroupExists(logGroupName string) bool {
return len(describeLogGroupOutput.LogGroups) > 0
}

// GetLogQueryStats for the log group between start/end (in epoch seconds) for the
// query string.
func GetLogQueryStats(logGroupName string, startTime, endTime int64, queryString string) (*types.QueryStatistics, error) {
output, err := CwlClient.StartQuery(ctx, &cloudwatchlogs.StartQueryInput{
LogGroupName: aws.String(logGroupName),
StartTime: aws.Int64(startTime),
EndTime: aws.Int64(endTime),
QueryString: aws.String(queryString),
})

if err != nil {
return nil, fmt.Errorf("failed to start query for log group (%s): %w", logGroupName, err)
}

// Sleep a fixed amount of time after making the query to give it time to
// process the request.
time.Sleep(retryInterval)

var attempts int
for {
results, err := CwlClient.GetQueryResults(ctx, &cloudwatchlogs.GetQueryResultsInput{
QueryId: output.QueryId,
})
if err != nil {
return nil, fmt.Errorf("failed to get query results for log group (%s): %w", logGroupName, err)
}
switch results.Status {
case types.QueryStatusScheduled, types.QueryStatusRunning, types.QueryStatusUnknown:
if attempts >= StandardRetries {
return nil, fmt.Errorf("attempted get query results after %s without success. final status: %v", time.Duration(attempts)*retryInterval, results.Status)
}
attempts++
time.Sleep(retryInterval)
jefchien marked this conversation as resolved.
Show resolved Hide resolved
case types.QueryStatusComplete:
return results.Statistics, nil
default:
return nil, fmt.Errorf("unexpected query status: %v", results.Status)
}
}
}

func GetLogStreams(logGroupName string) []types.LogStream {
for i := 0; i < logStreamRetry; i++ {
describeLogStreamsOutput, err := CwlClient.DescribeLogStreams(ctx, &cloudwatchlogs.DescribeLogStreamsInput{
Expand All @@ -162,12 +206,20 @@ func GetLogStreams(logGroupName string) []types.LogStream {
return describeLogStreamsOutput.LogStreams
}

time.Sleep(10 * time.Second)
time.Sleep(retryInterval)
}

return []types.LogStream{}
}

func GetLogStreamNames(logGroupName string) []string {
var logStreamNames []string
for _, stream := range GetLogStreams(logGroupName) {
logStreamNames = append(logStreamNames, *stream.LogStreamName)
}
return logStreamNames
}

type LogEventValidator func(event types.OutputLogEvent) error

type LogEventsValidator func(events []types.OutputLogEvent) error
Expand Down