Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][testbed] - Further testbed enhancements #36287

Merged
merged 13 commits into from
Nov 13, 2024
9 changes: 8 additions & 1 deletion testbed/testbed/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type BaseOTLPDataReceiver struct {
compression string
retry string
sendingQueue string
timeout string
}

func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
Expand Down Expand Up @@ -98,6 +99,11 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec
return bor
}

func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver {
bor.timeout = timeout
return bor
}

func (bor *BaseOTLPDataReceiver) Stop() error {
// we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver.
return bor.logReceiver.Shutdown(context.Background())
Expand All @@ -118,8 +124,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string {
endpoint: "%s"
%s
%s
%s
tls:
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue)
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout)
comp := "none"
if bor.compression != "" {
comp = bor.compression
Expand Down
40 changes: 40 additions & 0 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,43 @@ func TestLargeFileOnce(t *testing.T) {
tc.StopAgent()
tc.ValidateData()
}

func TestMemoryLimiterHit(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
retry_on_failure:
enabled: true
max_interval: 5s
`)
otlpreceiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 100000
num_consumers: 20
`)
otlpreceiver.WithTimeout(`
timeout: 0s
`)
processors := []ProcessorNameAndConfigBody{
{
Name: "memory_limiter",
Body: `
memory_limiter:
check_interval: 1s
limit_mib: 300
spike_limit_mib: 150
`,
},
}
ScenarioMemoryLimiterHit(
t,
testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
MaxDelay: 20 * time.Second,
},
performanceResultsSummary, 100, processors)
}
89 changes: 89 additions & 0 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,95 @@ func ScenarioLong(
tc.ValidateData()
}

func ScenarioMemoryLimiterHit(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
loadOptions testbed.LoadOptions,
resultsSummary testbed.TestResultsSummary,
sleepTime int,
processors []ProcessorNameAndConfigBody,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
fmt.Println(configStr)
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
dataChannel := make(chan bool)
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.CorrectnessLogTestValidator{},
resultsSummary,
testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }),
)
t.Cleanup(tc.Stop)
tc.MockBackend.EnableRecording()

tc.StartBackend()
tc.StartAgent()

tc.StartLoad(loadOptions)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

var timer *time.Timer

// check for "Memory usage is above hard limit"
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
tc.WaitForN(func() bool {
logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.")
if !logFound {
dataChannel <- true
return false
}
// Log found. But keep the collector under stress for 10 more seconds so it starts refusing data
if timer == nil {
timer = time.NewTimer(10 * time.Second)
}
select {
case <-timer.C:
default:
return false
}
close(dataChannel)
return logFound
}, time.Second*time.Duration(sleepTime), "memory limit not hit")

// check if data started to be received successfully
tc.WaitForN(func() bool {
return tc.MockBackend.DataItemsReceived() > 0
}, time.Second*time.Duration(sleepTime), "data started to be successfully received")

// stop sending any more data
tc.StopLoad()

tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received")

tc.WaitForN(func() bool {
// get IDs from logs to retry
logsToRetry := getLogsID(tc.MockBackend.LogsToRetry)

// get IDs from logs received successfully
successfulLogs := getLogsID(tc.MockBackend.ReceivedLogs)

// check if all the logs to retry were actually retried
logsWereRetried := allElementsExistInSlice(logsToRetry, successfulLogs)
return logsWereRetried
}, time.Second*time.Duration(sleepTime), "all logs were retried successfully")

tc.StopAgent()
tc.ValidateData()
}

func constructLoadOptions(test TestCase) testbed.LoadOptions {
options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
options.Attributes = make(map[string]string)
Expand Down