diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index a928a08261161..bd4f52cdc1872 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -55,6 +55,7 @@ type BaseOTLPDataReceiver struct { compression string retry string sendingQueue string + timeout string } func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { @@ -98,6 +99,11 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec return bor } +func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver { + bor.timeout = timeout + return bor +} + func (bor *BaseOTLPDataReceiver) Stop() error { // we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver. return bor.logReceiver.Shutdown(context.Background()) @@ -118,8 +124,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { endpoint: "%s" %s %s + %s tls: - insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue) + insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout) comp := "none" if bor.compression != "" { comp = bor.compression diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index bc534ebc57c63..abe930baf645a 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -222,7 +222,6 @@ func (ods *otlpDataSender) fillConfig(cfg *otlpexporter.Config) *otlpexporter.Co cfg.TLSSetting = configtls.ClientConfig{ Insecure: true, } - cfg.Timeout = 0 return cfg } diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 210b6ef8928d5..0c6ccf44165ba 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -356,17 +356,22 @@ func TestMemoryLimiterHit(t *testing.T) { otlpreceiver.WithRetry(` retry_on_failure: enabled: true + max_interval: 5s `) otlpreceiver.WithQueue(` sending_queue: enabled: true - queue_size: 10000 + queue_size: 100000 + 
num_consumers: 20 +`) + otlpreceiver.WithTimeout(` + timeout: 0s `) processors := map[string]string{ "memory_limiter": ` memory_limiter: check_interval: 1s - limit_mib: 500 + limit_mib: 200 spike_limit_mib: 100 `, } diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 0ada849e99dbc..3aa3fe3e979e5 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -579,6 +579,7 @@ func ScenarioMemoryLimiterHit( testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }), ) t.Cleanup(tc.Stop) + tc.MockBackend.EnableRecording() tc.StartBackend() tc.StartAgent() @@ -586,15 +587,26 @@ func ScenarioMemoryLimiterHit( tc.StartLoad(loadOptions) tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") - // searchFunc checks for "sending queue is full" communicate and sends the signal to GenerateNonPernamentErrorUntil - // to generate only successes from that time on + + var timer *time.Timer + + // check for "Memory usage is above soft limit. Refusing data." tc.WaitForN(func() bool { - logFound := tc.AgentLogsContains("Memory usage is above hard limit. Forcing a GC.") + logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.") if !logFound { dataChannel <- true return false + } else { + // Log found. 
But keep the collector under stress for 10 more seconds so it starts refusing data + if timer == nil { + timer = time.NewTimer(10 * time.Second) + } + select { + case <-timer.C: + default: + return false + } } - tc.WaitFor(func() bool { return tc.MockBackend.DataItemsReceived() == 0 }, "no data successfully received before an error") close(dataChannel) return logFound }, time.Second*time.Duration(sleepTime), "memory limit not hit") @@ -604,7 +616,10 @@ func ScenarioMemoryLimiterHit( return tc.MockBackend.DataItemsReceived() > 0 }, time.Second*time.Duration(sleepTime), "data started to be successfully received") - tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, "all logs received") + // stop sending any more data + tc.StopLoad() + + tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received") tc.WaitForN(func() bool { // get IDs from logs to retry @@ -618,9 +633,7 @@ func ScenarioMemoryLimiterHit( return logsWereRetried }, time.Second*time.Duration(sleepTime), "all logs were retried successfully") - tc.StopLoad() tc.StopAgent() - tc.ValidateData() }