diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 6db874822f94..3c7b161e2547 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -55,7 +55,6 @@ type BaseOTLPDataReceiver struct { compression string retry string sendingQueue string - timeout string } func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { @@ -99,11 +98,6 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec return bor } -func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver { - bor.timeout = timeout - return bor -} - func (bor *BaseOTLPDataReceiver) Stop() error { // we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver. return bor.logReceiver.Shutdown(context.Background()) @@ -124,9 +118,8 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { endpoint: "%s" %s %s - %s tls: - insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout) + insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue) comp := "none" if bor.compression != "" { comp = bor.compression diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 727fde8141ea..8b44f83f670a 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -369,39 +369,3 @@ func TestLargeFileOnce(t *testing.T) { tc.StopAgent() tc.ValidateData() } - -func TestMemoryLimiterHit(t *testing.T) { - otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) - otlpreceiver.WithRetry(` - retry_on_failure: - enabled: true - max_interval: 5s -`) - otlpreceiver.WithQueue(` - sending_queue: - enabled: true - queue_size: 100000 - num_consumers: 20 -`) - otlpreceiver.WithTimeout(` - timeout: 0s -`) - processors := map[string]string{ - "memory_limiter": ` - memory_limiter: - check_interval: 1s - limit_mib: 300 - spike_limit_mib: 150 -`, - } - ScenarioMemoryLimiterHit( - t, - testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)), - otlpreceiver, - testbed.LoadOptions{ - DataItemsPerSecond: 100000, - ItemsPerBatch: 1000, - Parallel: 1, - }, - performanceResultsSummary, 100, processors) -} diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 8a3d5f694662..85973f89b29a 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -552,95 +552,6 @@ func ScenarioLong( tc.ValidateData() } -func ScenarioMemoryLimiterHit( - t *testing.T, - sender testbed.DataSender, - receiver testbed.DataReceiver, - loadOptions testbed.LoadOptions, - resultsSummary testbed.TestResultsSummary, - sleepTime int, - processors map[string]string, -) { - resultDir, err := filepath.Abs(path.Join("results", t.Name())) - require.NoError(t, err) - - agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2")) - - configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil) - fmt.Println(configStr) - configCleanup, err := agentProc.PrepareConfig(configStr) - require.NoError(t, err) - defer configCleanup() - dataProvider := testbed.NewPerfTestDataProvider(loadOptions) - dataChannel := make(chan bool) - tc := testbed.NewTestCase( - t, - dataProvider, - sender, - receiver, - agentProc, - &testbed.CorrectnessLogTestValidator{}, - resultsSummary, - testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }), - ) - t.Cleanup(tc.Stop) - tc.MockBackend.EnableRecording() - - tc.StartBackend() - tc.StartAgent() - - tc.StartLoad(loadOptions) - - tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") - - var timer *time.Timer - - // check for "Memory usage is above hard limit" - tc.WaitForN(func() bool { - logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.") - if !logFound { - dataChannel <- true - return false - } - // Log found. But keep the collector under stress for 10 more seconds so it starts refusing data - if timer == nil { - timer = time.NewTimer(10 * time.Second) - } - select { - case <-timer.C: - default: - return false - } - close(dataChannel) - return logFound - }, time.Second*time.Duration(sleepTime), "memory limit not hit") - - // check if data started to be received successfully - tc.WaitForN(func() bool { - return tc.MockBackend.DataItemsReceived() > 0 - }, time.Second*time.Duration(sleepTime), "data started to be successfully received") - - // stop sending any more data - tc.StopLoad() - - tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received") - - tc.WaitForN(func() bool { - // get IDs from logs to retry - logsToRetry := getLogsID(tc.MockBackend.LogsToRetry) - - // get IDs from logs received successfully - successfulLogs := getLogsID(tc.MockBackend.ReceivedLogs) - - // check if all the logs to retry were actually retried - logsWereRetried := allElementsExistInSlice(logsToRetry, successfulLogs) - return logsWereRetried - }, time.Second*time.Duration(sleepTime), "all logs were retried successfully") - - tc.StopAgent() - tc.ValidateData() -} - func constructLoadOptions(test TestCase) testbed.LoadOptions { options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10} options.Attributes = make(map[string]string)