Skip to content

Commit

Permalink
[chore][testbed] - Further testbed enhancements (open-telemetry#36287)
Browse files Browse the repository at this point in the history
There was a merge conflict somewhere and we had to revert
open-telemetry#35209.

This PR fixes the CI failure and updates the test cases.

cc: @atoulme

---------

Co-authored-by: Evan Bradley <[email protected]>
  • Loading branch information
2 people authored and sbylica-splunk committed Dec 17, 2024
1 parent c3bd5dd commit 93a74a0
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 1 deletion.
9 changes: 8 additions & 1 deletion testbed/testbed/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type BaseOTLPDataReceiver struct {
compression string
retry string
sendingQueue string
timeout string
}

func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
Expand Down Expand Up @@ -98,6 +99,11 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec
return bor
}

func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver {
bor.timeout = timeout
return bor
}

func (bor *BaseOTLPDataReceiver) Stop() error {
// we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver.
return bor.logReceiver.Shutdown(context.Background())
Expand All @@ -118,8 +124,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string {
endpoint: "%s"
%s
%s
%s
tls:
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue)
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout)
comp := "none"
if bor.compression != "" {
comp = bor.compression
Expand Down
40 changes: 40 additions & 0 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,43 @@ func TestLargeFileOnce(t *testing.T) {
tc.StopAgent()
tc.ValidateData()
}

func TestMemoryLimiterHit(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
retry_on_failure:
enabled: true
max_interval: 5s
`)
otlpreceiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 100000
num_consumers: 20
`)
otlpreceiver.WithTimeout(`
timeout: 0s
`)
processors := []ProcessorNameAndConfigBody{
{
Name: "memory_limiter",
Body: `
memory_limiter:
check_interval: 1s
limit_mib: 300
spike_limit_mib: 150
`,
},
}
ScenarioMemoryLimiterHit(
t,
testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
MaxDelay: 20 * time.Second,
},
performanceResultsSummary, 100, processors)
}
88 changes: 88 additions & 0 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,94 @@ func ScenarioLong(
tc.ValidateData()
}

func ScenarioMemoryLimiterHit(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
loadOptions testbed.LoadOptions,
resultsSummary testbed.TestResultsSummary,
sleepTime int,
processors []ProcessorNameAndConfigBody,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
dataChannel := make(chan bool)
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.CorrectnessLogTestValidator{},
resultsSummary,
testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }),
)
t.Cleanup(tc.Stop)
tc.MockBackend.EnableRecording()

tc.StartBackend()
tc.StartAgent()

tc.StartLoad(loadOptions)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

var timer *time.Timer

// check for "Memory usage is above soft limit"
tc.WaitForN(func() bool {
logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.")
if !logFound {
dataChannel <- true
return false
}
// Log found. But keep the collector under stress for 10 more seconds so it starts refusing data
if timer == nil {
timer = time.NewTimer(10 * time.Second)
}
select {
case <-timer.C:
default:
return false
}
close(dataChannel)
return logFound
}, time.Second*time.Duration(sleepTime), "memory limit not hit")

// check if data started to be received successfully
tc.WaitForN(func() bool {
return tc.MockBackend.DataItemsReceived() > 0
}, time.Second*time.Duration(sleepTime), "data started to be successfully received")

// stop sending any more data
tc.StopLoad()

tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received")

tc.WaitForN(func() bool {
// get IDs from logs to retry
logsToRetry := getLogsID(tc.MockBackend.LogsToRetry)

// get IDs from logs received successfully
successfulLogs := getLogsID(tc.MockBackend.ReceivedLogs)

// check if all the logs to retry were actually retried
logsWereRetried := allElementsExistInSlice(logsToRetry, successfulLogs)
return logsWereRetried
}, time.Second*time.Duration(sleepTime), "all logs were retried successfully")

tc.StopAgent()
tc.ValidateData()
}

func constructLoadOptions(test TestCase) testbed.LoadOptions {
options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
options.Attributes = make(map[string]string)
Expand Down

0 comments on commit 93a74a0

Please sign in to comment.