Skip to content

Commit

Permalink
Restructed Xray Test
Browse files Browse the repository at this point in the history
  • Loading branch information
okankoAMZ committed Oct 6, 2023
1 parent 4fec4fd commit 8a81870
Show file tree
Hide file tree
Showing 16 changed files with 175 additions and 115 deletions.
2 changes: 1 addition & 1 deletion generator/test_case_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var testTypeToTestConfig = map[string][]testConfig{
{testDir: "../../test/performance/system"},
{testDir: "../../test/performance/statsd"},
{testDir: "../../test/performance/collectd"},
{testDir: "../../test/performance/trace/xray",runMockServer: true},
{testDir: "../../test/performance/trace/xray", runMockServer: true},
},
"ec2_windows_performance": {
{testDir: "../../test/performance/windows/logs"},
Expand Down
3 changes: 0 additions & 3 deletions mockserver/README

This file was deleted.

41 changes: 41 additions & 0 deletions mockserver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# The Mock Server

## Overview

The Mock Server is a simple server designed for receiving metric and trace data, providing a simulated endpoint for testing purposes. It listens on two separate ports: 8080 and 443.
## Running the server
This server is runs as a docker container to run this server:
1. First build the docker container with
```sh
sudo docker build -t mockserver .
```
2. Run the container by mapping the ports you would like to use, for example:
```sh
sudo docker run --name mockserver -d -p 8080:8080 -p 443:443 mockserver
```

## How it Works
### The Receiver

The receiver component of the Mock Server operates on port 443. It is responsible for receiving messages and incrementing the transaction count. To simulate real-world conditions, there is a built-in 15ms latency between each received message. The data received can be sent to three possible routes:

- **Check Receiver Status:** You can check if the receiver is alive by making a request to `/ping`.

- **Send Data:** Use the `/put-data` route to send data. This route supports two sub-routes:
- `/put-data/trace/v1`: Use this sub-route for sending trace data.
- `/put-data/metrics`: Use this sub-route for sending metrics data.

> [!Important]
> Currently, both traces and metrics are handled in the same way.
### The Verifier

The verifier component can be accessed via a listener on port 8080. It provides information about the transactions, including:

- **Transactions per Minute:** You can obtain the transactions per minute by making a request to `/tpm`.

- **Transaction Count:** To check the total transaction count, use the `/check-data` route.

- **Verifier Status:** Determine if the verification server is alive by sending a request to `/ping`.


1 change: 1 addition & 0 deletions mockserver/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
65 changes: 36 additions & 29 deletions mockserver/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates
// Copyright 2023 Amazon.com, Inc. or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@ package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
Expand All @@ -39,92 +38,100 @@ var (
KeyFilePath = path.Join("certificates", "private.key")
)

type transactionStore struct {
type transactionHttpServer struct {
transactions uint32
startTime time.Time
}

type TransactionPayload struct {
TransactionsPerMinute float64 `json:"tpm"`
TransactionsPerMinute float64 `json:"GetNumberOfTransactionsPerMinute"`
}

func healthCheck(w http.ResponseWriter, _ *http.Request) {
func healthCheck(w http.ResponseWriter, _ *http.Request) {
if _, err := io.WriteString(w, HealthCheckMessage); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Printf("Unable to write response: %v", err)
return
}
w.WriteHeader(http.StatusOK)
}

func (ts *transactionStore) checkData(w http.ResponseWriter, _ *http.Request) {
func (ts *transactionHttpServer) checkTransactionCount(w http.ResponseWriter, _ *http.Request) {
var message string
var t = atomic.LoadUint32(&ts.transactions)
if t > 0 {
message = SuccessMessage
}
fmt.Printf("\033[31m Time: %d | checkData msg: %s | %d\033[0m \n", time.Now().Unix(), message, t)
log.Printf("\033[31m Time: %d | checkTransactionCount msg: %s | %d\033[0m \n", time.Now().Unix(), message, t)
if _, err := io.WriteString(w, message); err != nil {
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, err.Error())
log.Printf("Unable to write response: %v", err)
return
}
w.WriteHeader(http.StatusOK)
}

func (ts *transactionStore) dataReceived(w http.ResponseWriter, _ *http.Request) {
func (ts *transactionHttpServer) recordTransaction(w http.ResponseWriter, _ *http.Request) {
atomic.AddUint32(&ts.transactions, 1)

// Built-in latency
fmt.Printf("\033[31m Time: %d | data Received \033[0m \n", time.Now().Unix())
log.Printf("\033[31m Time: %s | transaction received \033[0m \n", time.Now().String())
time.Sleep(15 * time.Millisecond)
w.WriteHeader(http.StatusOK)
}

// Retrieve number of transactions per minute
func (ts *transactionStore) tpm(w http.ResponseWriter, _ *http.Request) {
func (ts *transactionHttpServer) GetNumberOfTransactionsPerMinute(w http.ResponseWriter, _ *http.Request) {
// Calculate duration in minutes
duration := time.Now().Sub(ts.startTime)
transactions := float64(atomic.LoadUint32(&ts.transactions))
tpm := transactions / duration.Minutes()

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(TransactionPayload{tpm}); err != nil {
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, err.Error())
log.Printf("Unable to write response: %v", err)
}
}

// Starts an HTTPS server that receives requests for the data handler service at the sample server port
// Starts an HTTP server that receives request from validator only to verify the data ingestion
func StartHttpServer() {
var wg sync.WaitGroup
log.Println("\033[31m Starting Server \033[0m")
store := transactionStore{startTime: time.Now()}
store := transactionHttpServer{startTime: time.Now()}
//2 servers one for receiving the data , one for verify data
dataApp := mux.NewRouter()
daemonServer := &http.Server{Addr: ":443", Handler: dataApp}
verifyApp := http.NewServeMux()
appServer := &http.Server{Addr: ":8080", Handler: verifyApp}
go func(ts *transactionStore) {
wg.Add(1)
dataReceiverServer := &http.Server{Addr: ":443", Handler: dataApp}
verificationRequestServer := http.NewServeMux()
appServer := &http.Server{Addr: ":8080", Handler: verificationRequestServer}
wg.Add(2)
go func(ts *transactionHttpServer) {
defer wg.Done()
dataApp.HandleFunc("/ping", healthCheck)
dataApp.PathPrefix("/put-data").HandlerFunc(ts.dataReceived)
dataApp.HandleFunc("/trace/v1", ts.dataReceived)
dataApp.HandleFunc("/metric/v1", ts.dataReceived)
if err := daemonServer.ListenAndServeTLS(CertFilePath, KeyFilePath); err != nil {
dataApp.PathPrefix("/put-data").HandlerFunc(ts.recordTransaction)
dataApp.HandleFunc("/trace/v1", ts.recordTransaction)
dataApp.HandleFunc("/metric/v1", ts.recordTransaction)
if err := dataReceiverServer.ListenAndServeTLS(CertFilePath, KeyFilePath); err != nil {
log.Printf("HTTPS server error: %v", err)
err = daemonServer.Shutdown(context.TODO())
err = dataReceiverServer.Shutdown(context.TODO())
log.Fatalf("Shutdown server error: %v", err)
}
}(&store)

go func(ts *transactionStore) {
wg.Add(1)
verifyApp.HandleFunc("/ping", healthCheck)
verifyApp.HandleFunc("/check-data", ts.checkData)
verifyApp.HandleFunc("/tpm", ts.tpm)
go func(ts *transactionHttpServer) {
defer wg.Done()
verificationRequestServer.HandleFunc("/ping", healthCheck)
verificationRequestServer.HandleFunc("/check-data", ts.checkTransactionCount)
verificationRequestServer.HandleFunc("/tpm", ts.GetNumberOfTransactionsPerMinute)
if err := appServer.ListenAndServe(); err != nil {
log.Printf("Verification server error: %v", err)
err := appServer.Shutdown(context.TODO())
log.Fatalf("Shuwdown server error: %v", err)
log.Fatalf("Shutdown server error: %v", err)
}
}(&store)
wg.Done()
wg.Wait()
log.Println("\033[32m Stopping Server \033[0m")
}

Expand Down
8 changes: 3 additions & 5 deletions terraform/performance/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,10 @@ resource "null_resource" "validator_linux" {
provisioner "remote-exec" {
inline = [
#mock server dependencies getting transfered.
# todo make this optional with a env var.
"git clone https://github.com/aws/amazon-cloudwatch-agent-test.git",
"cd amazon-cloudwatch-agent-test && git checkout xray-performance-test",
"git clone --branch ${var.github_test_repo_branch} ${var.github_test_repo}",
var.run_mock_server ? "cd mockserver && sudo docker build -t mockserver . && cd .." : "echo skipping mock server build",
var.run_mock_server ? "sudo docker run --name mockserver -d -p 8080:8080 -p 443:443 mockserver" : "echo skipping mock server",
"cd ..",
var.run_mock_server ? "sudo docker run --name mockserver -d -p 8080:8080 -p 443:443 mockserver" : "echo skipping mock server run",
"cd ..", # return to root , two copy xray configs next to validator
"cp -r amazon-cloudwatch-agent-test/test/xray/resources /home/ec2-user/",
"export AWS_REGION=${var.region}",
"sudo chmod +x ./${local.install_validator}",
Expand Down
9 changes: 9 additions & 0 deletions terraform/performance/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ variable "family" {
error_message = "Valid values for family are (windows, linux)."
}
}
variable "github_test_repo" {
type = string
default = "https://github.com/aws/amazon-cloudwatch-agent-test.git"
}

variable "github_test_repo_branch" {
type = string
default = "main"
}
variable "run_mock_server" {
type = bool
default = false
Expand Down
13 changes: 7 additions & 6 deletions test/otlp/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"time"

"github.com/aws/amazon-cloudwatch-agent-test/environment"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/base"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/otlp"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
)
Expand All @@ -25,11 +26,11 @@ func TestTraces(t *testing.T) {
env := environment.GetEnvironmentMetaData()
testCases := map[string]struct {
agentConfigPath string
generatorConfig *common.TraceGeneratorConfig
generatorConfig *base.TraceGeneratorConfig
}{
"WithOTLP/Simple": {
agentConfigPath: filepath.Join("resources", "otlp-config.json"),
generatorConfig: &common.TraceGeneratorConfig{
generatorConfig: &base.TraceGeneratorConfig{
Interval: loadGeneratorInterval,
Annotations: map[string]interface{}{
"test_type": "simple_otlp",
Expand All @@ -55,13 +56,13 @@ func TestTraces(t *testing.T) {

t.Run(name, func(t *testing.T) {

OtlpTestCfg := common.TraceTestConfig{
Generator: NewLoadGenerator(testCase.generatorConfig),
OtlpTestCfg := base.TraceTestConfig{
Generator: otlp.NewLoadGenerator(testCase.generatorConfig),
Name: name,
AgentConfigPath: testCase.agentConfigPath,
AgentRuntime: agentRuntime,
}
err := common.TraceTest(t, OtlpTestCfg)
err := base.TraceTest(t, OtlpTestCfg)
require.NoError(t, err, "TraceTest failed because %s", err)

})
Expand Down
13 changes: 7 additions & 6 deletions test/xray/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"time"

"github.com/aws/amazon-cloudwatch-agent-test/environment"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/base"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/xray"
"github.com/stretchr/testify/require"
)

Expand All @@ -26,11 +27,11 @@ func TestTraces(t *testing.T) {
env := environment.GetEnvironmentMetaData()
testCases := map[string]struct {
agentConfigPath string
generatorConfig *common.TraceGeneratorConfig
generatorConfig *base.TraceGeneratorConfig
}{
"WithXray/Simple": {
agentConfigPath: filepath.Join("resources", "xray-config.json"),
generatorConfig: &common.TraceGeneratorConfig{
generatorConfig: &base.TraceGeneratorConfig{
Interval: loadGeneratorInterval,
Annotations: map[string]interface{}{
"test_type": "simple_xray",
Expand All @@ -53,13 +54,13 @@ func TestTraces(t *testing.T) {
for name, testCase := range testCases {

t.Run(name, func(t *testing.T) {
XrayTestCfg := common.TraceTestConfig{
Generator: NewLoadGenerator(testCase.generatorConfig),
XrayTestCfg := base.TraceTestConfig{
Generator: xray.NewLoadGenerator(testCase.generatorConfig),
Name: name,
AgentConfigPath: testCase.agentConfigPath,
AgentRuntime: agentRuntime,
}
err := common.TraceTest(t, XrayTestCfg)
err := base.TraceTest(t, XrayTestCfg)
require.NoError(t, err, "TraceTest failed because %s", err)

})
Expand Down
18 changes: 9 additions & 9 deletions util/common/traces.go → util/common/traces/base/base.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package common
package base

import (
"context"
"encoding/json"
"reflect"
"testing"
"time"

"github.com/aws/amazon-cloudwatch-agent-test/util/awsservice"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
"github.com/aws/aws-sdk-go-v2/service/xray/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"reflect"
"testing"
"time"
)

const (
Expand Down Expand Up @@ -54,15 +54,15 @@ type TraceGeneratorInterface interface {
func TraceTest(t *testing.T, traceTest TraceTestConfig) error {
t.Helper()
startTime := time.Now()
CopyFile(traceTest.AgentConfigPath, ConfigOutputPath)
require.NoError(t, StartAgent(ConfigOutputPath, true, false), "Couldn't Start the agent")
common.CopyFile(traceTest.AgentConfigPath, common.ConfigOutputPath)
require.NoError(t, common.StartAgent(common.ConfigOutputPath, true, false), "Couldn't Start the agent")
go func() {
require.NoError(t, traceTest.Generator.StartSendingTraces(context.Background()), "load generator exited with error")
}()
time.Sleep(traceTest.AgentRuntime)
traceTest.Generator.StopSendingTraces()
time.Sleep(AGENT_SHUTDOWN_DELAY)
StopAgent()
common.StopAgent()
testsGenerated, testsEnded := traceTest.Generator.GetSegmentCount()
t.Logf("For %s , Test Cases Generated %d | Test Cases Ended: %d", traceTest.Name, testsGenerated, testsEnded)
endTime := time.Now()
Expand Down Expand Up @@ -113,7 +113,7 @@ func SegmentValidationTest(t *testing.T, traceTest TraceTestConfig, segments []t

}
func GenerateTraces(traceTest TraceTestConfig) error {
CopyFile(traceTest.AgentConfigPath, ConfigOutputPath)
common.CopyFile(traceTest.AgentConfigPath, common.ConfigOutputPath)
go func() {
traceTest.Generator.StartSendingTraces(context.Background())
}()
Expand Down
Loading

0 comments on commit 8a81870

Please sign in to comment.