Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate VU races, unify execution loop, remove cpu check loop #1505

Merged
merged 18 commits into from
Dec 19, 2024
2 changes: 1 addition & 1 deletion .github/workflows/wasp-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
defaults:
run:
working-directory: wasp
runs-on: ubuntu-latest
runs-on: ubuntu22.04-16cores-64GB

Check failure on line 11 in .github/workflows/wasp-test.yml

View workflow job for this annotation

GitHub Actions / actionlint

[actionlint] .github/workflows/wasp-test.yml#L11

label "ubuntu22.04-16cores-64GB" is unknown. available labels are "windows-latest", "windows-latest-8-cores", "windows-2022", "windows-2019", "ubuntu-latest", "ubuntu-latest-4-cores", "ubuntu-latest-8-cores", "ubuntu-latest-16-cores", "ubuntu-24.04", "ubuntu-22.04", "ubuntu-20.04", "macos-latest", "macos-latest-xl", "macos-latest-xlarge", "macos-latest-large", "macos-15-xlarge", "macos-15-large", "macos-15", "macos-14-xl", "macos-14-xlarge", "macos-14-large", "macos-14", "macos-14.0", "macos-13-xl", "macos-13-xlarge", "macos-13-large", "macos-13", "macos-13.0", "macos-12-xl", "macos-12-xlarge", "macos-12-large", "macos-12", "macos-12.0", "self-hosted", "x64", "arm", "arm64", "linux", "macos", "windows". if it is a custom label for self-hosted runner, set list of labels in actionlint.yaml config file [runner-label]
Raw output
.github/workflows/wasp-test.yml:11:14: label "ubuntu22.04-16cores-64GB" is unknown. available labels are "windows-latest", "windows-latest-8-cores", "windows-2022", "windows-2019", "ubuntu-latest", "ubuntu-latest-4-cores", "ubuntu-latest-8-cores", "ubuntu-latest-16-cores", "ubuntu-24.04", "ubuntu-22.04", "ubuntu-20.04", "macos-latest", "macos-latest-xl", "macos-latest-xlarge", "macos-latest-large", "macos-15-xlarge", "macos-15-large", "macos-15", "macos-14-xl", "macos-14-xlarge", "macos-14-large", "macos-14", "macos-14.0", "macos-13-xl", "macos-13-xlarge", "macos-13-large", "macos-13", "macos-13.0", "macos-12-xl", "macos-12-xlarge", "macos-12-large", "macos-12", "macos-12.0", "self-hosted", "x64", "arm", "arm64", "linux", "macos", "windows". if it is a custom label for self-hosted runner, set list of labels in actionlint.yaml config file [runner-label]
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v3
Expand Down
48 changes: 0 additions & 48 deletions wasp/stat.go

This file was deleted.

67 changes: 25 additions & 42 deletions wasp/wasp.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ type Stats struct {
CurrentSegment atomic.Int64 `json:"current_schedule_segment"`
SamplesRecorded atomic.Int64 `json:"samples_recorded"`
SamplesSkipped atomic.Int64 `json:"samples_skipped"`
RunStarted atomic.Bool `json:"runStarted"`
RunPaused atomic.Bool `json:"runPaused"`
RunStopped atomic.Bool `json:"runStopped"`
RunFailed atomic.Bool `json:"runFailed"`
Expand Down Expand Up @@ -246,6 +245,7 @@ type Generator struct {
Log zerolog.Logger
labels model.LabelSet
rl atomic.Pointer[ratelimit.Limiter]
rpsLoopOnce *sync.Once
scheduleSegments []*Segment
currentSegmentMu *sync.Mutex
currentSegment *Segment
Expand Down Expand Up @@ -320,6 +320,7 @@ func NewGenerator(cfg *Config) (*Generator, error) {
dataCancel: dataCancel,
gun: cfg.Gun,
vu: cfg.VU,
rpsLoopOnce: &sync.Once{},
Responses: NewResponses(rch),
ResponsesChan: rch,
labels: ls,
Expand All @@ -345,43 +346,26 @@ func NewGenerator(cfg *Config) (*Generator, error) {
return nil, err
}
}
CPUCheckLoop()
return g, nil
}

// runExecuteLoop initiates the generator's execution loop based on the configured load type.
// It manages request pacing for RPS or handles virtual users for load testing scenarios.
func (g *Generator) runExecuteLoop() {
g.currentSegment = g.scheduleSegments[0]
g.stats.LastSegment.Store(int64(len(g.scheduleSegments)))
switch g.Cfg.LoadType {
case RPS:
g.ResponsesWaitGroup.Add(1)
// we run pacedCall controlled by stats.CurrentRPS
go func() {
for {
select {
case <-g.ResponsesCtx.Done():
g.ResponsesWaitGroup.Done()
g.Log.Info().Msg("RPS generator has stopped")
return
default:
g.pacedCall()
}
// runGunLoop runs the generator's Gun loop
// It manages request pacing for RPS after the first segment is loaded.
func (g *Generator) runGunLoop() {
g.ResponsesWaitGroup.Add(1)
// we run pacedCall controlled by stats.CurrentRPS
go func() {
for {
select {
case <-g.ResponsesCtx.Done():
g.ResponsesWaitGroup.Done()
g.Log.Info().Msg("RPS generator has stopped")
return
default:
g.pacedCall()
}
}()
case VU:
g.currentSegmentMu.Lock()
g.stats.CurrentVUs.Store(g.currentSegment.From)
g.currentSegmentMu.Unlock()
// we start all vus once
vus := g.stats.CurrentVUs.Load()
for i := 0; i < int(vus); i++ {
inst := g.vu.Clone(g)
g.runVU(inst)
g.vus = append(g.vus, inst)
}
}
}()
}

// runSetupWithTimeout executes the VirtualUser's setup within the configured timeout.
Expand Down Expand Up @@ -478,7 +462,6 @@ func (g *Generator) runVU(vu VirtualUser) {
// It returns true when all segments have been handled, signaling the scheduler to terminate.
func (g *Generator) processSegment() bool {
defer func() {
g.stats.RunStarted.Store(true)
g.Log.Info().
Int64("Segment", g.stats.CurrentSegment.Load()).
Int64("VUs", g.stats.CurrentVUs.Load()).
Expand All @@ -490,14 +473,18 @@ func (g *Generator) processSegment() bool {
}
g.currentSegmentMu.Lock()
g.currentSegment = g.scheduleSegments[g.stats.CurrentSegment.Load()]
g.currentSegment.StartTime = time.Now()
g.currentSegmentMu.Unlock()
g.stats.CurrentSegment.Add(1)
g.currentSegment.StartTime = time.Now()
switch g.Cfg.LoadType {
case RPS:
newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration), ratelimit.WithoutSlack)
g.rl.Store(&newRateLimit)
g.stats.CurrentRPS.Store(g.currentSegment.From)
// start Gun loop once, in next segments we control it using g.rl ratelimiter
g.rpsLoopOnce.Do(func() {
g.runGunLoop()
})
case VU:
oldVUs := g.stats.CurrentVUs.Load()
newVUs := g.currentSegment.From
Expand Down Expand Up @@ -527,6 +514,8 @@ func (g *Generator) processSegment() bool {
// runScheduleLoop initiates an asynchronous loop that processes scheduling segments and monitors for completion signals.
// It enables the generator to handle load distribution seamlessly in the background.
func (g *Generator) runScheduleLoop() {
g.currentSegment = g.scheduleSegments[0]
g.stats.LastSegment.Store(int64(len(g.scheduleSegments)))
go func() {
for {
select {
Expand Down Expand Up @@ -621,11 +610,7 @@ func (g *Generator) collectVUResults() {
// handling timeouts and storing the response.
// It ensures requests adhere to the generator's configuration and execution state.
func (g *Generator) pacedCall() {
if !g.Stats().RunStarted.Load() {
return
}
l := *g.rl.Load()
l.Take()
(*g.rl.Load()).Take()
if g.stats.RunPaused.Load() {
return
}
Expand Down Expand Up @@ -668,7 +653,6 @@ func (g *Generator) Run(wait bool) (interface{}, bool) {
g.sendStatsToLoki()
}
g.runScheduleLoop()
g.runExecuteLoop()
g.collectVUResults()
if wait {
return g.Wait()
Expand Down Expand Up @@ -696,7 +680,6 @@ func (g *Generator) Stop() (interface{}, bool) {
if g.stats.RunStopped.Load() {
return nil, true
}
g.stats.RunStarted.Store(false)
g.stats.RunStopped.Store(true)
g.stats.RunFailed.Store(true)
g.Log.Warn().Msg("Graceful stop")
Expand Down
2 changes: 1 addition & 1 deletion wasp/wasp_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func BenchmarkPacedCall(b *testing.B) {
Gun: NewMockGun(&MockGunConfig{}),
})
require.NoError(b, err)
gen.runExecuteLoop()
gen.runGunLoop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
gen.pacedCall()
Expand Down
24 changes: 12 additions & 12 deletions wasp/wasp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,13 +449,13 @@ func TestSmokeStaticRPSSchedulePrecision(t *testing.T) {
require.NoError(t, err)
_, failed := gen.Run(true)
require.Equal(t, false, failed)
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980))
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(950))
require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010))
require.Equal(t, gen.Stats().Failed.Load(), int64(0))
require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0))

okData, _, failResponses := convertResponsesData(gen)
require.GreaterOrEqual(t, len(okData), 980)
require.GreaterOrEqual(t, len(okData), 950)
require.LessOrEqual(t, len(okData), 1010)
require.Empty(t, failResponses)
require.Empty(t, gen.Errors())
Expand All @@ -475,14 +475,14 @@ func TestSmokeCustomUnitPrecision(t *testing.T) {
_, failed := gen.Run(true)
require.Equal(t, false, failed)
stats := gen.Stats()
require.GreaterOrEqual(t, stats.Success.Load(), int64(4970))
require.GreaterOrEqual(t, stats.Success.Load(), int64(4950))
require.LessOrEqual(t, stats.Success.Load(), int64(5010))
require.Equal(t, stats.Failed.Load(), int64(0))
require.Equal(t, stats.CallTimeout.Load(), int64(0))
require.Equal(t, stats.CurrentTimeUnit, gen.Cfg.RateLimitUnitDuration.Nanoseconds())

okData, _, failResponses := convertResponsesData(gen)
require.GreaterOrEqual(t, len(okData), 4970)
require.GreaterOrEqual(t, len(okData), 4950)
require.LessOrEqual(t, len(okData), 5010)
require.Empty(t, failResponses)
require.Empty(t, gen.Errors())
Expand All @@ -501,13 +501,13 @@ func TestSmokeStaticRPSScheduleIsNotBlocking(t *testing.T) {
require.NoError(t, err)
_, failed := gen.Run(true)
require.Equal(t, false, failed)
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980))
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(950))
require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010))
require.Equal(t, gen.Stats().Failed.Load(), int64(0))
require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0))

okData, _, failResponses := convertResponsesData(gen)
require.GreaterOrEqual(t, len(okData), 980)
require.GreaterOrEqual(t, len(okData), 950)
require.LessOrEqual(t, len(okData), 1010)
require.Empty(t, failResponses)
require.Empty(t, gen.Errors())
Expand Down Expand Up @@ -676,10 +676,10 @@ func TestSmokeVUsIncrease(t *testing.T) {

okData, okResponses, failResponses := convertResponsesData(gen)
require.GreaterOrEqual(t, okResponses[0].Duration, 50*time.Millisecond)
require.GreaterOrEqual(t, len(okResponses), 147)
require.GreaterOrEqual(t, len(okData), 147)
require.GreaterOrEqual(t, len(okResponses), 140)
require.GreaterOrEqual(t, len(okData), 140)
require.Equal(t, okResponses[0].Data.(string), "successCallData")
require.Equal(t, okResponses[147].Data.(string), "successCallData")
require.Equal(t, okResponses[140].Data.(string), "successCallData")
require.Empty(t, failResponses)
require.Empty(t, gen.Errors())
}
Expand All @@ -706,10 +706,10 @@ func TestSmokeVUsDecrease(t *testing.T) {

okData, okResponses, failResponses := convertResponsesData(gen)
require.GreaterOrEqual(t, okResponses[0].Duration, 50*time.Millisecond)
require.GreaterOrEqual(t, len(okResponses), 147)
require.GreaterOrEqual(t, len(okData), 147)
require.GreaterOrEqual(t, len(okResponses), 140)
require.GreaterOrEqual(t, len(okData), 140)
require.Equal(t, okResponses[0].Data.(string), "successCallData")
require.Equal(t, okResponses[147].Data.(string), "successCallData")
require.Equal(t, okResponses[140].Data.(string), "successCallData")
require.Empty(t, failResponses)
require.Empty(t, gen.Errors())
}
Expand Down
Loading