Skip to content

Commit

Permalink
Merge branch 'aws:main' into xray-performance-test
Browse files Browse the repository at this point in the history
  • Loading branch information
okankoAMZ committed Oct 5, 2023
2 parents 0d6c843 + bca3359 commit 4fec4fd
Show file tree
Hide file tree
Showing 13 changed files with 533 additions and 30 deletions.
18 changes: 16 additions & 2 deletions generator/test_case_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type matrixRow struct {
UseSSM bool `json:"useSSM"`
ExcludedTests string `json:"excludedTests"`
MetadataEnabled string `json:"metadataEnabled"`
MaxAttempts int `json:"max_attempts"`
}

type testConfig struct {
Expand All @@ -43,6 +44,8 @@ type testConfig struct {
// define target matrix field as set(s)
// empty map means a testConfig will be created with a test entry for each entry from *_test_matrix.json
targets map[string]map[string]struct{}
// maxAttempts limits the number of times a test will be run.
maxAttempts int
}

const (
Expand All @@ -61,6 +64,11 @@ var testTypeToTestConfig = map[string][]testConfig{
testDir: "./test/metrics_number_dimension",
targets: map[string]map[string]struct{}{"os": {"al2": {}}},
},
{
testDir: "./test/emf_concurrent",
targets: map[string]map[string]struct{}{"os": {"al2": {}}},
maxAttempts: 1,
},
{testDir: "./test/metric_value_benchmark"},
{testDir: "./test/run_as_user"},
{testDir: "./test/collection_interval"},
Expand Down Expand Up @@ -113,6 +121,7 @@ var testTypeToTestConfig = map[string][]testConfig{
{testDir: "../../../test/feature/windows"},
{testDir: "../../../test/restart"},
{testDir: "../../../test/acceptance"},
{testDir: "../../../test/feature/windows/event_logs"},
// assume role test doesn't add much value, and it is already being tested with linux
//{testDir: "../../../test/assume_role"},
},
Expand Down Expand Up @@ -186,7 +195,7 @@ var testTypeToTestConfig = map[string][]testConfig{

func copyAllEC2LinuxTestForOnpremTesting() {
/* Some tests need to be fixed in order to run in both environments, so for now, for the PoC, run one that works.
testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
testTypeToTestConfig["ec2_linux_onprem"] = testTypeToTestConfig[testTypeKeyEc2Linux]
*/
testTypeToTestConfig["ec2_linux_onprem"] = []testConfig{
{
Expand Down Expand Up @@ -225,7 +234,12 @@ func genMatrix(testType string, testConfigs []testConfig) []matrixRow {
testMatrixComplete := make([]matrixRow, 0, len(testMatrix))
for _, test := range testMatrix {
for _, testConfig := range testConfigs {
row := matrixRow{TestDir: testConfig.testDir, TestType: testType, TerraformDir: testConfig.terraformDir}
row := matrixRow{
TestDir: testConfig.testDir,
TestType: testType,
TerraformDir: testConfig.terraformDir,
MaxAttempts: testConfig.maxAttempts,
}
err = mapstructure.Decode(test, &row)
if err != nil {
log.Panicf("can't decode map test %v to metric line struct with error %v", testConfig, err)
Expand Down
2 changes: 1 addition & 1 deletion terraform/ec2/win/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,4 @@ data "aws_ami" "latest" {
name = "name"
values = [var.ami]
}
}
}
9 changes: 4 additions & 5 deletions terraform/setup/vpc.tf
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,19 @@ resource "aws_security_group" "ec2_security_group" {
}

egress {
from_port = 6379
to_port = 6379
from_port = 9121
to_port = 9121
protocol = "TCP"
cidr_blocks = ["0.0.0.0/0"]
}

egress {
from_port = 9121
to_port = 9121
from_port = 4566
to_port = 4566
protocol = "TCP"
cidr_blocks = ["0.0.0.0/0"]
}


ingress {
from_port = 6379
to_port = 6379
Expand Down
110 changes: 92 additions & 18 deletions test/cloudwatchlogs/publish_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -22,11 +24,12 @@ import (
)

const (
configOutputPath = "/opt/aws/amazon-cloudwatch-agent/bin/config.json"
logLineId1 = "foo"
logLineId2 = "bar"
logFilePath = "/tmp/test.log" // TODO: not sure how well this will work on Windows
agentRuntime = 20 * time.Second // default flush interval is 5 seconds
configOutputPath = "/opt/aws/amazon-cloudwatch-agent/bin/config.json"
logLineId1 = "foo"
logLineId2 = "bar"
logFilePath = "/tmp/cwagent_log_test.log" // TODO: not sure how well this will work on Windows
sleepForFlush = 20 * time.Second // default flush interval is 5 seconds
configPathAutoRemoval = "resources/config_auto_removal.json"
)

var logLineIds = []string{logLineId1, logLineId2}
Expand Down Expand Up @@ -86,17 +89,10 @@ func TestWriteLogsToCloudWatch(t *testing.T) {

// ensure that there is enough time from the "start" time and the first log line,
// so we don't miss it in the GetLogEvents call
time.Sleep(agentRuntime)
writeLogs(t, f, param.iterations)
time.Sleep(agentRuntime)
time.Sleep(sleepForFlush)
writeLogLines(t, f, param.iterations)
time.Sleep(sleepForFlush)
common.StopAgent()

agentLog, err := common.RunCommand(common.CatCommand + common.AgentLogFile)
if err != nil {
return
}
t.Logf("Agent logs %s", agentLog)

end := time.Now()

// check CWL to ensure we got the expected number of logs in the log stream
Expand All @@ -113,6 +109,84 @@ func TestWriteLogsToCloudWatch(t *testing.T) {
}
}

// autoRemovalTestCleanup tears down what the auto-removal tests leave behind:
// it calls awsservice.DeleteLogGroupAndStream with the instance ID for both
// arguments, then removes every local file matching logFilePath + "*".
// Glob and removal errors are intentionally ignored; cleanup is best-effort.
func autoRemovalTestCleanup() {
	id := awsservice.GetInstanceId()
	awsservice.DeleteLogGroupAndStream(id, id)

	matches, _ := filepath.Glob(logFilePath + "*")
	for i := range matches {
		_ = os.Remove(matches[i])
	}
}

// checkData waits for CloudWatch Logs to store the uploaded events and then
// validates the log group/stream named after this instance's ID.
//
// start is the lower bound of the validated time window; the upper bound is
// captured on entry, before the wait. lineCount is the total number of log
// events expected in that window and is used as-is, so callers must pass the
// final total. The validation also asserts that no log event is duplicated.
// A validation error is reported through t without aborting the test.
func checkData(t *testing.T, start time.Time, lineCount int) {
	end := time.Now()

	// Give the backend time to store the logs before querying.
	time.Sleep(60 * time.Second)

	id := awsservice.GetInstanceId()
	assert.NoError(t, awsservice.ValidateLogs(
		id,
		id,
		&start,
		&end,
		awsservice.AssertLogsCount(lineCount),
		awsservice.AssertNoDuplicateLogs(),
	))
}

// writeSleepRestart writes one batch of log lines to f and gives the agent
// time to pick them up.
//
// When doRestart is true the agent is started with configPath before the
// batch is written and stopped again afterwards; when false the caller is
// responsible for having an agent running. linesPerLoop is forwarded to
// writeLogLines as its iteration count.
//
// After the batch, exactly one file must match logFilePath + "*"; any other
// count is reported as a test failure.
func writeSleepRestart(t *testing.T, f *os.File, configPath string, linesPerLoop int, doRestart bool) {
	t.Helper()

	if doRestart {
		// Nothing below can succeed without a running agent, so fail fast
		// instead of sleeping through the rest of the iteration.
		if err := common.StartAgent(configPath, true, false); err != nil {
			t.Fatalf("unable to start agent with config %s: %v", configPath, err)
		}
	}

	// Sleep to ensure agent detects file before it is written.
	time.Sleep(sleepForFlush)
	writeLogLines(t, f, linesPerLoop)
	// Sleep again so the written lines are flushed before the agent is
	// stopped or the next file is created.
	time.Sleep(sleepForFlush)

	if doRestart {
		common.StopAgent()
	}

	matches, err := filepath.Glob(logFilePath + "*")
	assert.NoError(t, err)
	assert.Equal(t, 1, len(matches))
}

// TestAutoRemovalStopAgent configures the agent to monitor a file with auto
// removal on, then starts and stops the agent several times while appending
// to that single file.
// It verifies that the file is NOT removed across restarts and that every
// written line reaches CloudWatch Logs exactly once.
func TestAutoRemovalStopAgent(t *testing.T) {
	defer autoRemovalTestCleanup()

	f, err := os.Create(logFilePath + "1")
	if err != nil {
		// Without the log file the rest of the test is meaningless.
		t.Fatalf("unable to create log file %s: %v", logFilePath+"1", err)
	}
	defer f.Close()

	// Restart the agent multiple times.
	loopCount := 5
	linesPerLoop := 1000
	start := time.Now()
	for i := 0; i < loopCount; i++ {
		writeSleepRestart(t, f, configPathAutoRemoval, linesPerLoop, true)
	}

	// writeLogLines emits one line per entry of logLineIds on every iteration.
	checkData(t, start, loopCount*linesPerLoop*len(logLineIds))
}

// TestAutoRemovalFileRotation repeatedly creates files matching the monitored pattern.
// After creating each file it writes some log lines, sleeps, and verifies that
// only one matching file remains (i.e. the previous file was auto removed).
// Finally it retrieves the log events from CWL and verifies all log lines were uploaded.
func TestAutoRemovalFileRotation(t *testing.T) {
	defer autoRemovalTestCleanup()

	if err := common.StartAgent(configPathAutoRemoval, true, false); err != nil {
		t.Fatalf("unable to start agent with config %s: %v", configPathAutoRemoval, err)
	}

	loopCount := 5
	linesPerLoop := 1000
	start := time.Now()
	for i := 0; i < loopCount; i++ {
		// Each iteration creates a new file and then spends two
		// sleepForFlush periods inside writeSleepRestart.
		// The closure scopes the deferred Close to the iteration, so file
		// handles are not held open until the whole test returns.
		func() {
			name := logFilePath + strconv.Itoa(i)
			f, err := os.Create(name)
			if err != nil {
				t.Fatalf("unable to create log file %s: %v", name, err)
			}
			defer f.Close()
			writeSleepRestart(t, f, configPathAutoRemoval, linesPerLoop, false)
		}()
	}

	// writeLogLines emits one line per entry of logLineIds on every iteration.
	checkData(t, start, loopCount*linesPerLoop*len(logLineIds))
}

// TestRotatingLogsDoesNotSkipLines validates https://github.com/aws/amazon-cloudwatch-agent/issues/447
// The following should happen in the test:
// 1. A log line of size N should be written
Expand All @@ -136,11 +210,11 @@ func TestRotatingLogsDoesNotSkipLines(t *testing.T) {

// ensure that there is enough time from the "start" time and the first log line,
// so we don't miss it in the GetLogEvents call
time.Sleep(agentRuntime)
time.Sleep(sleepForFlush)
t.Log("Writing logs and rotating")
// execute the script used in the repro case
common.RunCommand("/usr/bin/python3 resources/write_and_rotate_logs.py")
time.Sleep(agentRuntime)
time.Sleep(sleepForFlush)
common.StopAgent()

// These expected log lines are created using resources/write_and_rotate_logs.py,
Expand Down Expand Up @@ -176,7 +250,7 @@ func TestRotatingLogsDoesNotSkipLines(t *testing.T) {
assert.NoError(t, err)
}

func writeLogs(t *testing.T, f *os.File, iterations int) {
func writeLogLines(t *testing.T, f *os.File, iterations int) {
log.Printf("Writing %d lines to %s", iterations*len(logLineIds), f.Name())

for i := 0; i < iterations; i++ {
Expand Down
20 changes: 20 additions & 0 deletions test/cloudwatchlogs/resources/config_auto_removal.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"agent": {
"debug": true
},
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/tmp/cwagent_log_test.log*",
"log_group_name": "{instance_id}",
"log_stream_name": "{instance_id}",
"timezone": "UTC",
"auto_removal": true
}
]
}
}
}
}
2 changes: 1 addition & 1 deletion test/cloudwatchlogs/resources/config_log.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"files": {
"collect_list": [
{
"file_path": "/tmp/test.log",
"file_path": "/tmp/cwagent_log_test.log",
"log_group_name": "{instance_id}",
"log_stream_name": "{instance_id}",
"timezone": "UTC"
Expand Down
2 changes: 1 addition & 1 deletion test/cloudwatchlogs/resources/config_log_filter.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"files": {
"collect_list": [
{
"file_path": "/tmp/test.log",
"file_path": "/tmp/cwagent_log_test.log",
"log_group_name": "{instance_id}",
"log_stream_name": "{instance_id}",
"timezone": "UTC",
Expand Down
89 changes: 89 additions & 0 deletions test/emf_concurrent/emf_concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package emf_concurrent

import (
"fmt"
"log"
"net"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent-test/environment"
"github.com/aws/amazon-cloudwatch-agent-test/util/awsservice"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
)

// Tunables for TestConcurrent.
const (
	// emfAddress is the TCP address the test dials to send EMF payloads.
	emfAddress = "0.0.0.0:25888"

	// connectionCount is the number of TCP connections opened to emfAddress.
	connectionCount = 5
	// threadCount is the number of emitter goroutines started; they are
	// assigned to the connections round-robin.
	threadCount = 15

	// interval is handed to the emitter as its interval setting.
	interval = 500 * time.Millisecond
	// testRuntime is how long the emitters run before they are told to stop.
	testRuntime = 10 * time.Minute
)

// queryString is the Logs Insights filter used to detect corrupted records.
// It matches records in which both metrics are present but their values
// differ, or in which either metric's unit is not metricUnit. TestConcurrent
// expects zero records to match.
var queryString = fmt.Sprintf(
	"filter ispresent(%[1]s) and ispresent(%[2]s) and (%[1]s != %[2]s or (_aws.CloudWatchMetrics.0.Metrics.0.Unit!=%[3]q) or (_aws.CloudWatchMetrics.0.Metrics.1.Unit!=%[3]q))",
	metricName1,
	metricName2,
	metricUnit,
)

// init calls environment.RegisterEnvironmentMetaDataFlags when the package is
// loaded; TestConcurrent later reads the resulting values through
// environment.GetEnvironmentMetaData.
func init() {
	environment.RegisterEnvironmentMetaDataFlags()
}

// TestConcurrent starts the agent, opens connectionCount TCP connections to
// emfAddress and runs threadCount emitters over them for testRuntime.
// Afterwards it checks CloudWatch Logs: the log group must contain exactly one
// log stream, the query must have scanned at least one record, and no record
// may match queryString (a match indicates a corrupted record).
func TestConcurrent(t *testing.T) {
	env := environment.GetEnvironmentMetaData()

	common.CopyFile(filepath.Join("testdata", "config.json"), common.ConfigOutputPath)
	require.NoError(t, common.StartAgent(common.ConfigOutputPath, true, false))

	// wait for agent to start up
	time.Sleep(5 * time.Second)

	e := &emitter{
		interval:      interval,
		logGroupName:  fmt.Sprintf("emf-test-group-%s", env.InstanceId),
		logStreamName: fmt.Sprintf("emf-test-stream-%s", env.InstanceId),
		dimension:     env.CwaCommitSha,
		done:          make(chan struct{}),
	}

	defer awsservice.DeleteLogGroup(e.logGroupName)

	// Setup failures go through require rather than log.Fatalf: log.Fatalf
	// exits the process, which would skip the deferred log group deletion
	// and abort the test binary without a proper test failure.
	tcpAddr, err := net.ResolveTCPAddr("tcp", emfAddress)
	require.NoErrorf(t, err, "invalid tcp emfAddress (%s)", emfAddress)

	conns := make([]*net.TCPConn, 0, connectionCount)
	// Release every connection that was opened, including on early failure.
	// Close errors are ignored: by then the agent has usually been stopped.
	defer func() {
		for _, c := range conns {
			_ = c.Close()
		}
	}()
	for i := 0; i < connectionCount; i++ {
		conn, dialErr := net.DialTCP("tcp", nil, tcpAddr)
		require.NoErrorf(t, dialErr, "unable to connect to address (%s)", emfAddress)
		conns = append(conns, conn)
	}

	log.Printf("Starting EMF emitters for log group (%s)/stream (%s)", e.logGroupName, e.logStreamName)
	startTime := time.Now()
	for i := 0; i < threadCount; i++ {
		e.wg.Add(1)
		// Spread the emitters round-robin over the available connections.
		go e.start(conns[i%len(conns)])
	}
	time.Sleep(testRuntime)
	close(e.done)
	log.Println("Stopping EMF emitters")
	e.wg.Wait()
	common.StopAgent()
	endTime := time.Now()

	assert.Lenf(t, awsservice.GetLogStreamNames(e.logGroupName), 1, "Detected corruption: multiple streams found")
	log.Printf("Starting query for log group (%s): %s", e.logGroupName, queryString)
	got, err := awsservice.GetLogQueryStats(e.logGroupName, startTime.Unix(), endTime.Unix(), queryString)
	require.NoError(t, err, "Unable to get log query stats")
	assert.NotZero(t, got.RecordsScanned, "No records found in CloudWatch Logs")
	assert.Zerof(t, got.RecordsMatched, "Detected corruption: %v/%v records matched", got.RecordsMatched, got.RecordsScanned)
}
Loading

0 comments on commit 4fec4fd

Please sign in to comment.