Skip to content

Commit

Permalink
correctness validator
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Sep 16, 2024
1 parent 8eff3b1 commit 59df2b6
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 11 deletions.
9 changes: 8 additions & 1 deletion testbed/testbed/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type BaseOTLPDataReceiver struct {
compression string
retry string
sendingQueue string
timeout string
}

func (bor *BaseOTLPDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error {
Expand Down Expand Up @@ -98,6 +99,11 @@ func (bor *BaseOTLPDataReceiver) WithQueue(sendingQueue string) *BaseOTLPDataRec
return bor
}

func (bor *BaseOTLPDataReceiver) WithTimeout(timeout string) *BaseOTLPDataReceiver {
bor.timeout = timeout
return bor
}

func (bor *BaseOTLPDataReceiver) Stop() error {
// we reuse the receiver across signals. Shutting down the log receiver shuts down the metrics and traces receiver.
return bor.logReceiver.Shutdown(context.Background())
Expand All @@ -118,8 +124,9 @@ func (bor *BaseOTLPDataReceiver) GenConfigYAMLStr() string {
endpoint: "%s"
%s
%s
%s
tls:
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue)
insecure: true`, bor.exporterType, addr, bor.retry, bor.sendingQueue, bor.timeout)
comp := "none"
if bor.compression != "" {
comp = bor.compression
Expand Down
1 change: 0 additions & 1 deletion testbed/testbed/senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ func (ods *otlpDataSender) fillConfig(cfg *otlpexporter.Config) *otlpexporter.Co
cfg.TLSSetting = configtls.ClientConfig{
Insecure: true,
}
cfg.Timeout = 0
return cfg
}

Expand Down
9 changes: 7 additions & 2 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,17 +356,22 @@ func TestMemoryLimiterHit(t *testing.T) {
otlpreceiver.WithRetry(`
retry_on_failure:
enabled: true
max_interval: 5s
`)
otlpreceiver.WithQueue(`
sending_queue:
enabled: true
queue_size: 10000
queue_size: 100000
num_consumers: 20
`)
otlpreceiver.WithTimeout(`
timeout: 0s
`)
processors := map[string]string{
"memory_limiter": `
memory_limiter:
check_interval: 1s
limit_mib: 500
limit_mib: 200
spike_limit_mib: 100
`,
}
Expand Down
27 changes: 20 additions & 7 deletions testbed/tests/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,22 +579,34 @@ func ScenarioMemoryLimiterHit(
testbed.WithDecisionFunc(func() error { return testbed.GenerateNonPernamentErrorUntil(dataChannel) }),
)
t.Cleanup(tc.Stop)
tc.MockBackend.EnableRecording()

tc.StartBackend()
tc.StartAgent()

tc.StartLoad(loadOptions)

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")
// searchFunc checks for "sending queue is full" communicate and sends the signal to GenerateNonPernamentErrorUntil
// to generate only successes from that time on

var timer *time.Timer

// check for "Memory usage is above hard limit"
tc.WaitForN(func() bool {
logFound := tc.AgentLogsContains("Memory usage is above hard limit. Forcing a GC.")
logFound := tc.AgentLogsContains("Memory usage is above soft limit. Refusing data.")
if !logFound {
dataChannel <- true
return false
} else {

Check failure on line 599 in testbed/tests/scenarios.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, other)

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)

Check failure on line 599 in testbed/tests/scenarios.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, other)

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
// Log found. But keep the collector under stress for 10 more seconds so it starts refusing data
if timer == nil {
timer = time.NewTimer(10 * time.Second)
}
select {
case <-timer.C:
default:
return false
}
}
tc.WaitFor(func() bool { return tc.MockBackend.DataItemsReceived() == 0 }, "no data successfully received before an error")
close(dataChannel)
return logFound
}, time.Second*time.Duration(sleepTime), "memory limit not hit")
Expand All @@ -604,7 +616,10 @@ func ScenarioMemoryLimiterHit(
return tc.MockBackend.DataItemsReceived() > 0
}, time.Second*time.Duration(sleepTime), "data started to be successfully received")

tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, "all logs received")
// stop sending any more data
tc.StopLoad()

tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, time.Second*time.Duration(sleepTime), "all logs received")

tc.WaitForN(func() bool {
// get IDs from logs to retry
Expand All @@ -618,9 +633,7 @@ func ScenarioMemoryLimiterHit(
return logsWereRetried
}, time.Second*time.Duration(sleepTime), "all logs were retried successfully")

tc.StopLoad()
tc.StopAgent()

tc.ValidateData()
}

Expand Down

0 comments on commit 59df2b6

Please sign in to comment.