Skip to content

Commit

Permalink
Restructed Xray Test
Browse files Browse the repository at this point in the history
  • Loading branch information
okankoAMZ committed Oct 5, 2023
1 parent 4fec4fd commit 3651eea
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 89 deletions.
1 change: 1 addition & 0 deletions mockserver/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
62 changes: 35 additions & 27 deletions mockserver/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates
// Copyright 2023 Amazon.com, Inc. or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@ package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
Expand All @@ -39,92 +38,101 @@ var (
KeyFilePath = path.Join("certificates", "private.key")
)

type transactionStore struct {
type transactionHttpServer struct {
transactions uint32
startTime time.Time
}

type TransactionPayload struct {
TransactionsPerMinute float64 `json:"tpm"`
TransactionsPerMinute float64 `json:"GetNumberOfTransactionsPerMinute"`
}

func healthCheck(w http.ResponseWriter, _ *http.Request) {
func healthCheck(w http.ResponseWriter, _ *http.Request) {
if _, err := io.WriteString(w, HealthCheckMessage); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Printf("Unable to write response: %v", err)
return
}
w.WriteHeader(http.StatusOK)
}

func (ts *transactionStore) checkData(w http.ResponseWriter, _ *http.Request) {
func (ts *transactionHttpServer) checkTransactionCount(w http.ResponseWriter, _ *http.Request) {
var message string
var t = atomic.LoadUint32(&ts.transactions)
if t > 0 {
message = SuccessMessage
}
fmt.Printf("\033[31m Time: %d | checkData msg: %s | %d\033[0m \n", time.Now().Unix(), message, t)
log.Printf("\033[31m Time: %d | checkTransactionCount msg: %s | %d\033[0m \n", time.Now().Unix(), message, t)
if _, err := io.WriteString(w, message); err != nil {
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, err.Error())
log.Printf("Unable to write response: %v", err)
return
}
w.WriteHeader(http.StatusOK)
}

func (ts *transactionStore) dataReceived(w http.ResponseWriter, _ *http.Request) {
func (ts *transactionHttpServer) incomingDataHandler(w http.ResponseWriter, _ *http.Request) {
atomic.AddUint32(&ts.transactions, 1)

// Built-in latency
fmt.Printf("\033[31m Time: %d | data Received \033[0m \n", time.Now().Unix())
log.Printf("\033[31m Time: %d | data Received \033[0m \n", time.Now().Unix())
time.Sleep(15 * time.Millisecond)
w.WriteHeader(http.StatusOK)
}

// Retrieve number of transactions per minute
func (ts *transactionStore) tpm(w http.ResponseWriter, _ *http.Request) {
func (ts *transactionHttpServer) GetNumberOfTransactionsPerMinute(w http.ResponseWriter, _ *http.Request) {
// Calculate duration in minutes
duration := time.Now().Sub(ts.startTime)
transactions := float64(atomic.LoadUint32(&ts.transactions))
tpm := transactions / duration.Minutes()

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(TransactionPayload{tpm}); err != nil {
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, err.Error())
log.Printf("Unable to write response: %v", err)
}
}

// Starts an HTTPS server that receives requests for the data handler service at the sample server port
// Starts an HTTP server that receives request from validator only to verify the data ingestion
func StartHttpServer() {
var wg sync.WaitGroup
log.Println("\033[31m Starting Server \033[0m")
store := transactionStore{startTime: time.Now()}
store := transactionHttpServer{startTime: time.Now()}
//2 servers one for receiving the data , one for verify data
dataApp := mux.NewRouter()
daemonServer := &http.Server{Addr: ":443", Handler: dataApp}
verifyApp := http.NewServeMux()
appServer := &http.Server{Addr: ":8080", Handler: verifyApp}
go func(ts *transactionStore) {
dataReceiverServer := &http.Server{Addr: ":443", Handler: dataApp}
verificationRequestServer := http.NewServeMux()
appServer := &http.Server{Addr: ":8080", Handler: verificationRequestServer}
go func(ts *transactionHttpServer) {
wg.Add(1)
defer wg.Done()
dataApp.HandleFunc("/ping", healthCheck)
dataApp.PathPrefix("/put-data").HandlerFunc(ts.dataReceived)
dataApp.HandleFunc("/trace/v1", ts.dataReceived)
dataApp.HandleFunc("/metric/v1", ts.dataReceived)
if err := daemonServer.ListenAndServeTLS(CertFilePath, KeyFilePath); err != nil {
dataApp.PathPrefix("/put-data").HandlerFunc(ts.incomingDataHandler)
dataApp.HandleFunc("/trace/v1", ts.incomingDataHandler)
dataApp.HandleFunc("/metric/v1", ts.incomingDataHandler)
if err := dataReceiverServer.ListenAndServeTLS(CertFilePath, KeyFilePath); err != nil {
log.Printf("HTTPS server error: %v", err)
err = daemonServer.Shutdown(context.TODO())
err = dataReceiverServer.Shutdown(context.TODO())
log.Fatalf("Shutdown server error: %v", err)
}
}(&store)

go func(ts *transactionStore) {
go func(ts *transactionHttpServer) {
wg.Add(1)
verifyApp.HandleFunc("/ping", healthCheck)
verifyApp.HandleFunc("/check-data", ts.checkData)
verifyApp.HandleFunc("/tpm", ts.tpm)
defer wg.Done()
verificationRequestServer.HandleFunc("/ping", healthCheck)
verificationRequestServer.HandleFunc("/check-data", ts.checkTransactionCount)
verificationRequestServer.HandleFunc("/tpm", ts.GetNumberOfTransactionsPerMinute)
if err := appServer.ListenAndServe(); err != nil {
log.Printf("Verification server error: %v", err)
err := appServer.Shutdown(context.TODO())
log.Fatalf("Shuwdown server error: %v", err)
log.Fatalf("Shutdown server error: %v", err)
}
}(&store)
wg.Done()
wg.Wait()
log.Println("\033[32m Stopping Server \033[0m")
}

Expand Down
9 changes: 4 additions & 5 deletions terraform/performance/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,11 @@ resource "null_resource" "validator_linux" {
provisioner "remote-exec" {
inline = [
#mock server dependencies getting transfered.
# todo make this optional with a env var.
"git clone https://github.com/aws/amazon-cloudwatch-agent-test.git",
"cd amazon-cloudwatch-agent-test && git checkout xray-performance-test",
"git clone --branch ${var.github_test_repo_branch} ${var.github_test_repo}",
"cd amazon-cloudwatch-agent-test && git checkout xray-performance-test", #@TODO remove before PR merge
var.run_mock_server ? "cd mockserver && sudo docker build -t mockserver . && cd .." : "echo skipping mock server build",
var.run_mock_server ? "sudo docker run --name mockserver -d -p 8080:8080 -p 443:443 mockserver" : "echo skipping mock server",
"cd ..",
var.run_mock_server ? "sudo docker run --name mockserver -d -p 8080:8080 -p 443:443 mockserver" : "echo skipping mock server run",
"cd ..", # return to root , two copy xray configs next to validator
"cp -r amazon-cloudwatch-agent-test/test/xray/resources /home/ec2-user/",
"export AWS_REGION=${var.region}",
"sudo chmod +x ./${local.install_validator}",
Expand Down
9 changes: 9 additions & 0 deletions terraform/performance/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ variable "family" {
error_message = "Valid values for family are (windows, linux)."
}
}
variable "github_test_repo" {
type = string
default = "https://github.com/aws/amazon-cloudwatch-agent-test.git"
}

variable "github_test_repo_branch" {
type = string
default = "main"
}
variable "run_mock_server" {
type = bool
default = false
Expand Down
5 changes: 3 additions & 2 deletions test/otlp/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"time"

"github.com/aws/amazon-cloudwatch-agent-test/environment"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/common"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/otlp"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestTraces(t *testing.T) {
t.Run(name, func(t *testing.T) {

OtlpTestCfg := common.TraceTestConfig{
Generator: NewLoadGenerator(testCase.generatorConfig),
Generator: otlp.NewLoadGenerator(testCase.generatorConfig),
Name: name,
AgentConfigPath: testCase.agentConfigPath,
AgentRuntime: agentRuntime,
Expand Down
5 changes: 3 additions & 2 deletions test/xray/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"time"

"github.com/aws/amazon-cloudwatch-agent-test/environment"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/common"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/xray"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -54,7 +55,7 @@ func TestTraces(t *testing.T) {

t.Run(name, func(t *testing.T) {
XrayTestCfg := common.TraceTestConfig{
Generator: NewLoadGenerator(testCase.generatorConfig),
Generator: xray.NewLoadGenerator(testCase.generatorConfig),
Name: name,
AgentConfigPath: testCase.agentConfigPath,
AgentRuntime: agentRuntime,
Expand Down
16 changes: 8 additions & 8 deletions util/common/traces.go → util/common/traces/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package common
import (
"context"
"encoding/json"
"reflect"
"testing"
"time"

"github.com/aws/amazon-cloudwatch-agent-test/util/awsservice"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
"github.com/aws/aws-sdk-go-v2/service/xray/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"reflect"
"testing"
"time"
)

const (
Expand Down Expand Up @@ -54,15 +54,15 @@ type TraceGeneratorInterface interface {
func TraceTest(t *testing.T, traceTest TraceTestConfig) error {
t.Helper()
startTime := time.Now()
CopyFile(traceTest.AgentConfigPath, ConfigOutputPath)
require.NoError(t, StartAgent(ConfigOutputPath, true, false), "Couldn't Start the agent")
common.CopyFile(traceTest.AgentConfigPath, common.ConfigOutputPath)
require.NoError(t, common.StartAgent(common.ConfigOutputPath, true, false), "Couldn't Start the agent")
go func() {
require.NoError(t, traceTest.Generator.StartSendingTraces(context.Background()), "load generator exited with error")
}()
time.Sleep(traceTest.AgentRuntime)
traceTest.Generator.StopSendingTraces()
time.Sleep(AGENT_SHUTDOWN_DELAY)
StopAgent()
common.StopAgent()
testsGenerated, testsEnded := traceTest.Generator.GetSegmentCount()
t.Logf("For %s , Test Cases Generated %d | Test Cases Ended: %d", traceTest.Name, testsGenerated, testsEnded)
endTime := time.Now()
Expand Down Expand Up @@ -113,7 +113,7 @@ func SegmentValidationTest(t *testing.T, traceTest TraceTestConfig, segments []t

}
func GenerateTraces(traceTest TraceTestConfig) error {
CopyFile(traceTest.AgentConfigPath, ConfigOutputPath)
common.CopyFile(traceTest.AgentConfigPath, common.ConfigOutputPath)
go func() {
traceTest.Generator.StartSendingTraces(context.Background())
}()
Expand Down
43 changes: 43 additions & 0 deletions util/common/traces/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package traces

import (
"fmt"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/common"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/xray"
"time"
)

func StartTraceGeneration(receiver string, agentConfigPath string, agentRuntime time.Duration, traceSendingInterval time.Duration) error {
cfg := common.TraceTestConfig{
Generator: nil,
Name: "",
AgentConfigPath: agentConfigPath,
AgentRuntime: agentRuntime,
}
xrayGenCfg := common.TraceGeneratorConfig{
Interval: traceSendingInterval,
Annotations: map[string]interface{}{
"test_type": "simple_otlp",
},
Metadata: map[string]map[string]interface{}{
"default": {
"nested": map[string]interface{}{
"key": "value",
},
},
"custom_namespace": {
"custom_key": "custom_value",
},
},
}
switch receiver {
case "xray":
cfg.Generator = xray.NewLoadGenerator(&xrayGenCfg)
cfg.Name = "xray-performance-test"
case "otlp":
default:
return fmt.Errorf("%s is not supported.", receiver)
}
err := common.GenerateTraces(cfg)
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"time"

"github.com/aws/amazon-cloudwatch-agent-test/util/common"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/common"
"go.opentelemetry.io/contrib/propagators/aws/xray"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package xray
import (
"context"
"errors"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces/common"
"github.com/aws/aws-xray-sdk-go/strategy/sampling"
"github.com/aws/aws-xray-sdk-go/xray"
"github.com/aws/aws-xray-sdk-go/xraylog"
"log"
"os"
"path"
"time"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
"github.com/aws/aws-xray-sdk-go/strategy/sampling"
"github.com/aws/aws-xray-sdk-go/xray"
"github.com/aws/aws-xray-sdk-go/xraylog"
)

var generatorError = errors.New("Generator error")
Expand Down Expand Up @@ -56,7 +56,6 @@ func NewLoadGenerator(cfg *common.TraceGeneratorConfig) *XrayTracesGenerator {
}
func (g *XrayTracesGenerator) Generate(ctx context.Context) error {
rootCtx, root := xray.BeginSegment(ctx, "load-generator")
log.Println("\033[34mGenerated Trace\033[0m")
g.SegmentsGenerationCount++
defer func() {
root.Close(nil)
Expand Down
3 changes: 2 additions & 1 deletion validator/validators/basic/basic_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/aws/amazon-cloudwatch-agent-test/util/awsservice"
"github.com/aws/amazon-cloudwatch-agent-test/util/common"
"github.com/aws/amazon-cloudwatch-agent-test/util/common/traces"
"github.com/aws/amazon-cloudwatch-agent-test/validator/models"
"github.com/aws/amazon-cloudwatch-agent-test/validator/validators/util"
)
Expand Down Expand Up @@ -50,7 +51,7 @@ func (s *BasicValidator) GenerateLoad() error {
case "logs":
return common.StartLogWrite(agentConfigFilePath, agentCollectionPeriod, metricSendingInterval, dataRate)
case "traces":
return util.StartTraceGeneration(receiver,agentConfigFilePath,agentCollectionPeriod,metricSendingInterval)
return traces.StartTraceGeneration(receiver, agentConfigFilePath, agentCollectionPeriod, metricSendingInterval)
default:
// Sending metrics based on the receivers; however, for scraping plugin (e.g prometheus), we would need to scrape it instead of sending
return common.StartSendingMetrics(receiver, agentCollectionPeriod, metricSendingInterval, dataRate, logGroup, metricNamespace)
Expand Down
Loading

0 comments on commit 3651eea

Please sign in to comment.