From 8eff3b11cba984b0bc9a58acf88385651054e0df Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Mon, 16 Sep 2024 12:20:45 +0530 Subject: [PATCH 1/6] initial commit --- testbed/testbed/senders.go | 1 + testbed/tests/log_test.go | 31 +++++++++++++++ testbed/tests/scenarios.go | 77 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+) diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index abe930baf645..bc534ebc57c6 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -222,6 +222,7 @@ func (ods *otlpDataSender) fillConfig(cfg *otlpexporter.Config) *otlpexporter.Co cfg.TLSSetting = configtls.ClientConfig{ Insecure: true, } + cfg.Timeout = 0 return cfg } diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 2d1a0ae1b481..210b6ef8928d 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -350,3 +350,34 @@ func TestLargeFileOnce(t *testing.T) { tc.StopAgent() tc.ValidateData() } + +func TestMemoryLimiterHit(t *testing.T) { + otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)) + otlpreceiver.WithRetry(` + retry_on_failure: + enabled: true +`) + otlpreceiver.WithQueue(` + sending_queue: + enabled: true + queue_size: 10000 +`) + processors := map[string]string{ + "memory_limiter": ` + memory_limiter: + check_interval: 1s + limit_mib: 500 + spike_limit_mib: 100 +`, + } + ScenarioMemoryLimiterHit( + t, + testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)), + otlpreceiver, + testbed.LoadOptions{ + DataItemsPerSecond: 100000, + ItemsPerBatch: 1000, + Parallel: 1, + }, + performanceResultsSummary, 100, processors) +} diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 78128d612d7e..0ada849e99db 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -547,6 +547,83 @@ func ScenarioLong( tc.ValidateData() } +func ScenarioMemoryLimiterHit( + t *testing.T, + sender testbed.DataSender, + receiver testbed.DataReceiver, + loadOptions testbed.LoadOptions, + resultsSummary testbed.TestResultsSummary, + sleepTime int, + processors map[string]string, +) { + resultDir, err := filepath.Abs(path.Join("results", t.Name())) + require.NoError(t, err) + + agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2")) + + configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil) + fmt.Println(configStr) + configCleanup, err := agentProc.PrepareConfig(configStr) + require.NoError(t, err) + defer configCleanup() + dataProvider := testbed.NewPerfTestDataProvider(loadOptions) + dataChannel := make(chan bool) + tc := testbed.NewTestCase( + t, + dataProvider, + sender, + receiver, + agentProc, + &testbed.CorrectnessLogTestValidator{}, + resultsSummary, + testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }), + ) + t.Cleanup(tc.Stop) + + tc.StartBackend() + tc.StartAgent() + + tc.StartLoad(loadOptions) + + tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") + // searchFunc checks for "sending queue is full" communicate and sends the signal to GenerateNonPernamentErrorUntil + // to generate only successes from that time on + tc.WaitForN(func() bool { + logFound := tc.AgentLogsContains("Memory usage is above hard limit. Forcing a GC.") + if !logFound { + dataChannel <- true + return false + } + tc.WaitFor(func() bool { return tc.MockBackend.DataItemsReceived() == 0 }, "no data successfully received before an error") + close(dataChannel) + return logFound + }, time.Second*time.Duration(sleepTime), "memory limit not hit") + + // check if data started to be received successfully + tc.WaitForN(func() bool { + return tc.MockBackend.DataItemsReceived() > 0 + }, time.Second*time.Duration(sleepTime), "data started to be successfully received") + + tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, "all logs received") + + tc.WaitForN(func() bool { + // get IDs from logs to retry + logsToRetry := getLogsID(tc.MockBackend.LogsToRetry) + + // get IDs from logs received successfully + successfulLogs := getLogsID(tc.MockBackend.ReceivedLogs) + + // check if all the logs to retry were actually retried + logsWereRetried := allElementsExistInSlice(logsToRetry, successfulLogs) + return logsWereRetried + }, time.Second*time.Duration(sleepTime), "all logs were retried successfully") + + tc.StopLoad() + tc.StopAgent() + + tc.ValidateData() +} + func constructLoadOptions(test TestCase) testbed.LoadOptions { options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10} options.Attributes = make(map[string]string) From 27fad227f4ba483ca02b27591cd3d30b44c6d950 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Mon, 16 Sep 2024 12:21:13 +0530 Subject: [PATCH 2/6] correctness validator --- testbed/testbed/receivers.go | 9 ++++++++- testbed/testbed/senders.go | 1 - testbed/tests/log_test.go | 11 ++++++++--- testbed/tests/scenarios.go | 27 ++++++++++++++++++++------- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index a928a0826116..bd4f52cdc187 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -55,6 +55,7 @@ type BaseOTLPDataReceiver struct { compression string retry string sendingQueue string + timeout string } func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { @@ -98,6 +99,11 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec return bor } +func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver { + bor.timeout = timeout + return bor +} + func (bor *BaseOTLPDataReceiver) Stop() error { // we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver. return bor.logReceiver.Shutdown(context.Background()) @@ -118,8 +124,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string { endpoint: "%s" %s %s + %s tls: - insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue) + insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout) comp := "none" if bor.compression != "" { comp = bor.compression diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index bc534ebc57c6..abe930baf645 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -222,7 +222,6 @@ func (ods *otlpDataSender) fillConfig(cfg *otlpexporter.Config) *otlpexporter.Co cfg.TLSSetting = configtls.ClientConfig{ Insecure: true, } - cfg.Timeout = 0 return cfg } diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 210b6ef8928d..4834df3c714f 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -356,18 +356,23 @@ func TestMemoryLimiterHit(t *testing.T) { otlpreceiver.WithRetry(` retry_on_failure: enabled: true + max_interval: 5s `) otlpreceiver.WithQueue(` sending_queue: enabled: true - queue_size: 10000 + queue_size: 100000 + num_consumers: 20 +`) + otlpreceiver.WithTimeout(` + timeout: 0s `) processors := map[string]string{ "memory_limiter": ` memory_limiter: check_interval: 1s - limit_mib: 500 - spike_limit_mib: 100 + limit_mib: 300 + spike_limit_mib: 150 `, } ScenarioMemoryLimiterHit( diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 0ada849e99db..3aa3fe3e979e 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -579,6 +579,7 @@ func ScenarioMemoryLimiterHit( testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }), ) t.Cleanup(tc.Stop) + tc.MockBackend.EnableRecording() tc.StartBackend() tc.StartAgent() @@ -586,15 +587,26 @@ func ScenarioMemoryLimiterHit( tc.StartLoad(loadOptions) tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started") - // searchFunc checks for "sending queue is full" communicate and sends the signal to GenerateNonPernamentErrorUntil - // to generate only successes from that time on + + var timer *time.Timer + + // check for "Memory usage is above hard limit" tc.WaitForN(func() bool { - logFound := tc.AgentLogsContains("Memory usage is above hard limit. Forcing a GC.") + logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.") if !logFound { dataChannel <- true return false + } else { + // Log found. But keep the collector under stress for 10 more seconds so it starts refusing data + if timer == nil { + timer = time.NewTimer(10 * time.Second) + } + select { + case <-timer.C: + default: + return false + } } - tc.WaitFor(func() bool { return tc.MockBackend.DataItemsReceived() == 0 }, "no data successfully received before an error") close(dataChannel) return logFound }, time.Second*time.Duration(sleepTime), "memory limit not hit") @@ -604,7 +616,10 @@ func ScenarioMemoryLimiterHit( return tc.MockBackend.DataItemsReceived() > 0 }, time.Second*time.Duration(sleepTime), "data started to be successfully received") - tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, "all logs received") + // stop sending any more data + tc.StopLoad() + + tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received") tc.WaitForN(func() bool { // get IDs from logs to retry @@ -618,9 +633,7 @@ func ScenarioMemoryLimiterHit( return logsWereRetried }, time.Second*time.Duration(sleepTime), "all logs were retried successfully") - tc.StopLoad() tc.StopAgent() - tc.ValidateData() } From ba5f3b79e887faa0f164e99751ce1ec42ba2ef4e Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Sun, 6 Oct 2024 20:27:57 +0530 Subject: [PATCH 3/6] chore: lint --- testbed/tests/scenarios.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 14a2c7120693..a9bddd02799c 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -596,16 +596,15 @@ func ScenarioMemoryLimiterHit( if !logFound { dataChannel <- true return false - } else { - // Log found. But keep the collector under stress for 10 more seconds so it starts refusing data - if timer == nil { - timer = time.NewTimer(10 * time.Second) - } - select { - case <-timer.C: - default: - return false - } + } + // Log found. But keep the collector under stress for 10 more seconds so it starts refusing data + if timer == nil { + timer = time.NewTimer(10 * time.Second) + } + select { + case <-timer.C: + default: + return false } close(dataChannel) return logFound From 713c655cafd033108668f84c066a0d3fb9cc92c9 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 8 Nov 2024 18:24:19 +0530 Subject: [PATCH 4/6] chore: max delay and timer --- testbed/tests/log_test.go | 8 ++++++-- testbed/tests/scenarios.go | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 727fde8141ea..fc99c79d8588 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -386,13 +386,16 @@ func TestMemoryLimiterHit(t *testing.T) { otlpreceiver.WithTimeout(` timeout: 0s `) - processors := map[string]string{ - "memory_limiter": ` + processors := []ProcessorNameAndConfigBody{ + { + Name: "memory_limiter", + Body: ` memory_limiter: check_interval: 1s limit_mib: 300 spike_limit_mib: 150 `, + }, } ScenarioMemoryLimiterHit( t, @@ -402,6 +405,7 @@ func TestMemoryLimiterHit(t *testing.T) { DataItemsPerSecond: 100000, ItemsPerBatch: 1000, Parallel: 1, + MaxDelay: 20 * time.Second, }, performanceResultsSummary, 100, processors) } diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 8a3d5f694662..600effce5e6a 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -559,7 +559,7 @@ func ScenarioMemoryLimiterHit( loadOptions testbed.LoadOptions, resultsSummary testbed.TestResultsSummary, sleepTime int, - processors map[string]string, + processors []ProcessorNameAndConfigBody, ) { resultDir, err := filepath.Abs(path.Join("results", t.Name())) require.NoError(t, err) From 7570ec24d22cc9b6a81802aa96b2ec54b4ce4c40 Mon Sep 17 00:00:00 2001 From: Vihas Makwana <121151420+VihasMakwana@users.noreply.github.com> Date: Wed, 13 Nov 2024 23:36:28 +0530 Subject: [PATCH 5/6] Update testbed/tests/scenarios.go Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- testbed/tests/scenarios.go | 1 - 1 file changed, 1 deletion(-) diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 60db1e9c663b..7de3dc0e1e52 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -563,7 +563,6 @@ func ScenarioMemoryLimiterHit( agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2")) configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil) - fmt.Println(configStr) configCleanup, err := agentProc.PrepareConfig(configStr) require.NoError(t, err) defer configCleanup() From ed64554554100a55105a56b2534ee8d0732e99f9 Mon Sep 17 00:00:00 2001 From: Vihas Makwana <121151420+VihasMakwana@users.noreply.github.com> Date: Wed, 13 Nov 2024 23:36:34 +0530 Subject: [PATCH 6/6] Update testbed/tests/scenarios.go Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- testbed/tests/scenarios.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testbed/tests/scenarios.go b/testbed/tests/scenarios.go index 7de3dc0e1e52..a2e3ea868a5c 100644 --- a/testbed/tests/scenarios.go +++ b/testbed/tests/scenarios.go @@ -590,7 +590,7 @@ func ScenarioMemoryLimiterHit( var timer *time.Timer - // check for "Memory usage is above hard limit" + // check for "Memory usage is above soft limit" tc.WaitForN(func() bool { logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.") if !logFound {