Revert "[chore][testbed] - Further testbed enhancements" #36266

Merged: 1 commit, merged Nov 7, 2024
9 changes: 1 addition & 8 deletions testbed/testbed/receivers.go
@@ -55,7 +55,6 @@ type BaseOTLPDataReceiver struct {
	compression string
	retry string
	sendingQueue string
-	timeout string
}

func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
@@ -99,11 +98,6 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataReceiver {
	return bor
}

-func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver {
-	bor.timeout = timeout
-	return bor
-}
-
func (bor *BaseOTLPDataReceiver) Stop() error {
	// we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver.
	return bor.logReceiver.Shutdown(context.Background())
@@ -124,9 +118,8 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string {
endpoint: "%s"
%s
%s
-%s
tls:
-insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout)
+insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue)
	comp := "none"
	if bor.compression != "" {
		comp = bor.compression
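This revert removes the timeout field, the WithTimeout builder, and the matching placeholder in the generated exporter config, so only the retry and sending-queue snippets are interpolated afterwards. A minimal usage sketch under that assumption follows; the helper names come from this diff and from the removed test below, while the package name and import paths are assumed to match the existing testbed tests and may differ.

// Hypothetical usage sketch (not part of this PR): composing the remaining
// builder options after the revert. NewOTLPDataReceiver, WithRetry, WithQueue
// and GenConfigYAMLStr all appear in this diff or in the removed test; the
// import paths are assumed to match the existing testbed test files.
package tests

import (
	"fmt"
	"testing"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
	"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

func TestGenConfigSketch(t *testing.T) {
	rcv := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
	rcv.WithRetry(`
  retry_on_failure:
    enabled: true
`)
	rcv.WithQueue(`
  sending_queue:
    enabled: true
`)
	// The rendered exporter section now carries only retry and queue settings;
	// there is no timeout placeholder left to fill.
	fmt.Println(rcv.GenConfigYAMLStr())
}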
36 changes: 0 additions & 36 deletions testbed/tests/log_test.go
@@ -369,39 +369,3 @@ func TestLargeFileOnce(t *testing.T) {
tc.StopAgent()
tc.ValidateData()
}

func TestMemoryLimiterHit(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
retry_on_failure:
enabled: true
max_interval: 5s
`)
otlpreceiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 100000
num_consumers: 20
`)
otlpreceiver.WithTimeout(`
timeout: 0s
`)
processors := map[string]string{
"memory_limiter": `
memory_limiter:
check_interval: 1s
limit_mib: 300
spike_limit_mib: 150
`,
}
ScenarioMemoryLimiterHit(
t,
testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
},
performanceResultsSummary, 100, processors)
}
89 changes: 0 additions & 89 deletions testbed/tests/scenarios.go
@@ -552,95 +552,6 @@ func ScenarioLong(
tc.ValidateData()
}

func ScenarioMemoryLimiterHit(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
loadOptions testbed.LoadOptions,
resultsSummary testbed.TestResultsSummary,
sleepTime int,
processors map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
fmt.Println(configStr)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
dataChannel := make(chan bool)
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.CorrectnessLogTestValidator{},
resultsSummary,
testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }),
)
t.Cleanup(tc.Stop)
tc.MockBackend.EnableRecording()

tc.StartBackend()
tc.StartAgent()

tc.StartLoad(loadOptions)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

var timer *time.Timer

// check for "Memory usage is above hard limit"
tc.WaitForN(func() bool {
logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.")
if !logFound {
dataChannel <- true
return false
}
// Log found. But keep the collector under stress for 10 more seconds so it starts refusing data
if timer == nil {
timer = time.NewTimer(10 * time.Second)
}
select {
case <-timer.C:
default:
return false
}
close(dataChannel)
return logFound
}, time.Second*time.Duration(sleepTime), "memory limit not hit")

// check if data started to be received successfully
tc.WaitForN(func() bool {
return tc.MockBackend.DataItemsReceived() > 0
}, time.Second*time.Duration(sleepTime), "data started to be successfully received")

// stop sending any more data
tc.StopLoad()

tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received")

tc.WaitForN(func() bool {
// get IDs from logs to retry
logsToRetry := getLogsID(tc.MockBackend.LogsToRetry)

// get IDs from logs received successfully
successfulLogs := getLogsID(tc.MockBackend.ReceivedLogs)

// check if all the logs to retry were actually retried
logsWereRetried := allElementsExistInSlice(logsToRetry, successfulLogs)
return logsWereRetried
}, time.Second*time.Duration(sleepTime), "all logs were retried successfully")

tc.StopAgent()
tc.ValidateData()
}

func constructLoadOptions(test TestCase) testbed.LoadOptions {
options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
options.Attributes = make(map[string]string)
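The removed scenario relies on two helpers that live elsewhere in scenarios.go and are not part of this diff: getLogsID, which collects per-record IDs from the mock backend's log batches, and allElementsExistInSlice, which verifies that every ID queued for retry was eventually received. A purely illustrative sketch of that subset check, assuming string IDs; the real helper is not shown here and its signature may differ.

// Illustrative sketch only: the actual allElementsExistInSlice in scenarios.go
// is not included in this diff and may be implemented differently.
func allElementsExistInSlice(needles, haystack []string) bool {
	seen := make(map[string]struct{}, len(haystack))
	for _, h := range haystack {
		seen[h] = struct{}{}
	}
	for _, n := range needles {
		if _, ok := seen[n]; !ok {
			return false
		}
	}
	return true
}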