diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 3c7b161e2547..6db874822f94 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -55,6 +55,7 @@ type BaseOTLPDataReceiver struct { compression string retry string sendingQueue string + timeout string } func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { @@ -98,6 +99,11 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec return bor } +func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver { + bor.timeout = timeout + return bor +} + func (bor *BaseOTLPDataReceiver) Stop() error { // we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver. return bor.logReceiver.Shutdown(context.Background()) @@ -118,8 +124,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { endpoint: "%s" %s %s + %s tls: - insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue) + insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout) comp := "none" if bor.compression != "" { comp = bor.compression diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 41cb83ed0797..2756ce2ffe97 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -368,3 +368,43 @@ func TestLargeFileOnce(t *testing.T) { tc.StopAgent() tc.ValidateData() } + +func TestMemoryLimiterHit(t *testing.T) { + otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) + otlpreceiver.WithRetry(` + retry_on_failure: + enabled: true + max_interval: 5s +`) + otlpreceiver.WithQueue(` + sending_queue: + enabled: true + queue_size: 100000 + num_consumers: 20 +`) + otlpreceiver.WithTimeout(` + timeout: 0s +`) + processors := []ProcessorNameAndConfigBody{ + { + Name: "memory_limiter", + Body: ` + memory_limiter: + check_interval: 1s + limit_mib: 300 + spike_limit_mib: 150 +`, + }, + } + ScenarioMemoryLimiterHit( + t, + testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)), + otlpreceiver, + testbed.LoadOptions{ + DataItemsPerSecond: 100000, + ItemsPerBatch: 1000, + Parallel: 1, + MaxDelay: 20 * time.Second, + }, + performanceResultsSummary, 100, processors) +} diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 6e8efe229a4e..a2e3ea868a5c 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -548,6 +548,94 @@ func ScenarioLong( tc.ValidateData() } +func ScenarioMemoryLimiterHit( + t *testing.T, + sender testbed.DataSender, + receiver testbed.DataReceiver, + loadOptions testbed.LoadOptions, + resultsSummary testbed.TestResultsSummary, + sleepTime int, + processors []ProcessorNameAndConfigBody, +) { + resultDir, err := filepath.Abs(path.Join("results", t.Name())) + require.NoError(t, err) + + agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2")) + + configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil) + configCleanup, err := agentProc.PrepareConfig(configStr) + require.NoError(t, err) + defer configCleanup() + dataProvider := testbed.NewPerfTestDataProvider(loadOptions) + dataChannel := make(chan bool) + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + agentProc, + &testbed.CorrectnessLogTestValidator{}, + resultsSummary, + testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }), + ) + t.Cleanup(tc.Stop) + tc.MockBackend.EnableRecording() + + tc.StartBackend() + tc.StartAgent() + + tc.StartLoad(loadOptions) + + tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") + + var timer *time.Timer + + // check for "Memory usage is above soft limit" + tc.WaitForN(func() bool { + logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.") + if !logFound { + dataChannel <- true + return false + } + // Log found. But keep the collector under stress for 10 more seconds so it starts refusing data + if timer == nil { + timer = time.NewTimer(10 * time.Second) + } + select { + case <-timer.C: + default: + return false + } + close(dataChannel) + return logFound + }, time.Second*time.Duration(sleepTime), "memory limit not hit") + + // check if data started to be received successfully + tc.WaitForN(func() bool { + return tc.MockBackend.DataItemsReceived() > 0 + }, time.Second*time.Duration(sleepTime), "data started to be successfully received") + + // stop sending any more data + tc.StopLoad() + + tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received") + + tc.WaitForN(func() bool { + // get IDs from logs to retry + logsToRetry := getLogsID(tc.MockBackend.LogsToRetry) + + // get IDs from logs received successfully + successfulLogs := getLogsID(tc.MockBackend.ReceivedLogs) + + // check if all the logs to retry were actually retried + logsWereRetried := allElementsExistInSlice(logsToRetry, successfulLogs) + return logsWereRetried + }, time.Second*time.Duration(sleepTime), "all logs were retried successfully") + + tc.StopAgent() + tc.ValidateData() +} + func constructLoadOptions(test TestCase) testbed.LoadOptions { options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10} options.Attributes = make(map[string]string)