Skip to content

Commit

Permalink
Revert "[chore][testbed] - Further testbed enhancements" (#36266)
Browse files Browse the repository at this point in the history
  • Loading branch information
songy23 authored Nov 7, 2024
1 parent 3458d51 commit 4b1ca00
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 133 deletions.
9 changes: 1 addition & 8 deletions testbed/testbed/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type BaseOTLPDataReceiver struct {
compression string
retry string
sendingQueue string
timeout string
}

func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
Expand Down Expand Up @@ -99,11 +98,6 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec
return bor
}

func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver {
bor.timeout = timeout
return bor
}

func (bor *BaseOTLPDataReceiver) Stop() error {
// we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver.
return bor.logReceiver.Shutdown(context.Background())
Expand All @@ -124,9 +118,8 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string {
endpoint: "%s"
%s
%s
%s
tls:
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout)
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue)
comp := "none"
if bor.compression != "" {
comp = bor.compression
Expand Down
36 changes: 0 additions & 36 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,39 +369,3 @@ func TestLargeFileOnce(t *testing.T) {
tc.StopAgent()
tc.ValidateData()
}

func TestMemoryLimiterHit(t *testing.T) {
otlpreceiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
otlpreceiver.WithRetry(`
retry_on_failure:
enabled: true
max_interval: 5s
`)
otlpreceiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 100000
num_consumers: 20
`)
otlpreceiver.WithTimeout(`
timeout: 0s
`)
processors := map[string]string{
"memory_limiter": `
memory_limiter:
check_interval: 1s
limit_mib: 300
spike_limit_mib: 150
`,
}
ScenarioMemoryLimiterHit(
t,
testbed.NewOTLPLogsDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
otlpreceiver,
testbed.LoadOptions{
DataItemsPerSecond: 100000,
ItemsPerBatch: 1000,
Parallel: 1,
},
performanceResultsSummary, 100, processors)
}
89 changes: 0 additions & 89 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,95 +552,6 @@ func ScenarioLong(
tc.ValidateData()
}

func ScenarioMemoryLimiterHit(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
loadOptions testbed.LoadOptions,
resultsSummary testbed.TestResultsSummary,
sleepTime int,
processors map[string]string,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
fmt.Println(configStr)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
dataChannel := make(chan bool)
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&testbed.CorrectnessLogTestValidator{},
resultsSummary,
testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }),
)
t.Cleanup(tc.Stop)
tc.MockBackend.EnableRecording()

tc.StartBackend()
tc.StartAgent()

tc.StartLoad(loadOptions)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")

var timer *time.Timer

// check for "Memory usage is above hard limit"
tc.WaitForN(func() bool {
logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.")
if !logFound {
dataChannel <- true
return false
}
// Log found. But keep the collector under stress for 10 more seconds so it starts refusing data
if timer == nil {
timer = time.NewTimer(10 * time.Second)
}
select {
case <-timer.C:
default:
return false
}
close(dataChannel)
return logFound
}, time.Second*time.Duration(sleepTime), "memory limit not hit")

// check if data started to be received successfully
tc.WaitForN(func() bool {
return tc.MockBackend.DataItemsReceived() > 0
}, time.Second*time.Duration(sleepTime), "data started to be successfully received")

// stop sending any more data
tc.StopLoad()

tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received")

tc.WaitForN(func() bool {
// get IDs from logs to retry
logsToRetry := getLogsID(tc.MockBackend.LogsToRetry)

// get IDs from logs received successfully
successfulLogs := getLogsID(tc.MockBackend.ReceivedLogs)

// check if all the logs to retry were actually retried
logsWereRetried := allElementsExistInSlice(logsToRetry, successfulLogs)
return logsWereRetried
}, time.Second*time.Duration(sleepTime), "all logs were retried successfully")

tc.StopAgent()
tc.ValidateData()
}

func constructLoadOptions(test TestCase) testbed.LoadOptions {
options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
options.Attributes = make(map[string]string)
Expand Down

0 comments on commit 4b1ca00

Please sign in to comment.