Eliminate VU races, unify execution loop, remove cpu check loop (#1505)
skudasov authored Dec 19, 2024
1 parent 70bd176 commit cb02f63
Showing 5 changed files with 39 additions and 104 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/wasp-test.yml
@@ -8,7 +8,7 @@ jobs:
     defaults:
       run:
         working-directory: wasp
-    runs-on: ubuntu-latest
+    runs-on: ubuntu22.04-16cores-64GB
     steps:
       - uses: actions/checkout@v3
       - uses: dorny/paths-filter@v3

Check failure on line 11 in .github/workflows/wasp-test.yml (GitHub Actions / actionlint):

[actionlint] .github/workflows/wasp-test.yml#L11
label "ubuntu22.04-16cores-64GB" is unknown. available labels are "windows-latest", "windows-latest-8-cores", "windows-2022", "windows-2019", "ubuntu-latest", "ubuntu-latest-4-cores", "ubuntu-latest-8-cores", "ubuntu-latest-16-cores", "ubuntu-24.04", "ubuntu-22.04", "ubuntu-20.04", "macos-latest", "macos-latest-xl", "macos-latest-xlarge", "macos-latest-large", "macos-15-xlarge", "macos-15-large", "macos-15", "macos-14-xl", "macos-14-xlarge", "macos-14-large", "macos-14", "macos-14.0", "macos-13-xl", "macos-13-xlarge", "macos-13-large", "macos-13", "macos-13.0", "macos-12-xl", "macos-12-xlarge", "macos-12-large", "macos-12", "macos-12.0", "self-hosted", "x64", "arm", "arm64", "linux", "macos", "windows". if it is a custom label for self-hosted runner, set list of labels in actionlint.yaml config file [runner-label]
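Note on the failing check: actionlint does not know custom self-hosted runner labels by default. Per the hint in the annotation above, the label can be declared in a .github/actionlint.yaml config so the check passes. A minimal sketch (hypothetical; this file is not part of the commit), assuming "ubuntu22.04-16cores-64GB" is the intended self-hosted runner label:

# .github/actionlint.yaml (hypothetical; not part of this commit)
self-hosted-runner:
  # Custom labels used by self-hosted runners in this repository.
  labels:
    - ubuntu22.04-16cores-64GB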
48 changes: 0 additions & 48 deletions wasp/stat.go

This file was deleted.

67 changes: 25 additions & 42 deletions wasp/wasp.go
@@ -217,7 +217,6 @@ type Stats struct {
 	CurrentSegment  atomic.Int64 `json:"current_schedule_segment"`
 	SamplesRecorded atomic.Int64 `json:"samples_recorded"`
 	SamplesSkipped  atomic.Int64 `json:"samples_skipped"`
-	RunStarted      atomic.Bool  `json:"runStarted"`
 	RunPaused       atomic.Bool  `json:"runPaused"`
 	RunStopped      atomic.Bool  `json:"runStopped"`
 	RunFailed       atomic.Bool  `json:"runFailed"`
@@ -246,6 +245,7 @@ type Generator struct {
 	Log              zerolog.Logger
 	labels           model.LabelSet
 	rl               atomic.Pointer[ratelimit.Limiter]
+	rpsLoopOnce      *sync.Once
 	scheduleSegments []*Segment
 	currentSegmentMu *sync.Mutex
 	currentSegment   *Segment
@@ -320,6 +320,7 @@ func NewGenerator(cfg *Config) (*Generator, error) {
 		dataCancel:    dataCancel,
 		gun:           cfg.Gun,
 		vu:            cfg.VU,
+		rpsLoopOnce:   &sync.Once{},
 		Responses:     NewResponses(rch),
 		ResponsesChan: rch,
 		labels:        ls,
@@ -345,43 +346,26 @@ func NewGenerator(cfg *Config) (*Generator, error) {
 			return nil, err
 		}
 	}
-	CPUCheckLoop()
 	return g, nil
 }
 
-// runExecuteLoop initiates the generator's execution loop based on the configured load type.
-// It manages request pacing for RPS or handles virtual users for load testing scenarios.
-func (g *Generator) runExecuteLoop() {
-	g.currentSegment = g.scheduleSegments[0]
-	g.stats.LastSegment.Store(int64(len(g.scheduleSegments)))
-	switch g.Cfg.LoadType {
-	case RPS:
-		g.ResponsesWaitGroup.Add(1)
-		// we run pacedCall controlled by stats.CurrentRPS
-		go func() {
-			for {
-				select {
-				case <-g.ResponsesCtx.Done():
-					g.ResponsesWaitGroup.Done()
-					g.Log.Info().Msg("RPS generator has stopped")
-					return
-				default:
-					g.pacedCall()
-				}
-			}
-		}()
-	case VU:
-		g.currentSegmentMu.Lock()
-		g.stats.CurrentVUs.Store(g.currentSegment.From)
-		g.currentSegmentMu.Unlock()
-		// we start all vus once
-		vus := g.stats.CurrentVUs.Load()
-		for i := 0; i < int(vus); i++ {
-			inst := g.vu.Clone(g)
-			g.runVU(inst)
-			g.vus = append(g.vus, inst)
-		}
-	}
+// runGunLoop runs the generator's Gun loop
+// It manages request pacing for RPS after the first segment is loaded.
+func (g *Generator) runGunLoop() {
+	g.ResponsesWaitGroup.Add(1)
+	// we run pacedCall controlled by stats.CurrentRPS
+	go func() {
+		for {
+			select {
+			case <-g.ResponsesCtx.Done():
+				g.ResponsesWaitGroup.Done()
+				g.Log.Info().Msg("RPS generator has stopped")
+				return
+			default:
+				g.pacedCall()
+			}
+		}
+	}()
 }
 
 // runSetupWithTimeout executes the VirtualUser's setup within the configured timeout.
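For context, the unified loop above is the classic context-cancelled worker: a single goroutine that exits when the responses context is done and otherwise issues the next paced call. A self-contained sketch of just that pattern (generic names like worker and pacedCall below are illustrative stand-ins, not wasp's actual API):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker mirrors the runGunLoop shape: registered on a WaitGroup,
// stopped only by context cancellation, pacing calls in between.
func worker(ctx context.Context, wg *sync.WaitGroup, pacedCall func()) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("generator has stopped")
				return
			default:
				pacedCall()
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	var wg sync.WaitGroup
	worker(ctx, &wg, func() { time.Sleep(10 * time.Millisecond) })
	wg.Wait() // returns once the context deadline stops the loop
}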
@@ -478,7 +462,6 @@ func (g *Generator) runVU(vu VirtualUser) {
 // It returns true when all segments have been handled, signaling the scheduler to terminate.
 func (g *Generator) processSegment() bool {
 	defer func() {
-		g.stats.RunStarted.Store(true)
 		g.Log.Info().
 			Int64("Segment", g.stats.CurrentSegment.Load()).
 			Int64("VUs", g.stats.CurrentVUs.Load()).
@@ -490,14 +473,18 @@ func (g *Generator) processSegment() bool {
 	}
 	g.currentSegmentMu.Lock()
 	g.currentSegment = g.scheduleSegments[g.stats.CurrentSegment.Load()]
+	g.currentSegment.StartTime = time.Now()
 	g.currentSegmentMu.Unlock()
 	g.stats.CurrentSegment.Add(1)
-	g.currentSegment.StartTime = time.Now()
 	switch g.Cfg.LoadType {
 	case RPS:
 		newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration), ratelimit.WithoutSlack)
 		g.rl.Store(&newRateLimit)
 		g.stats.CurrentRPS.Store(g.currentSegment.From)
+		// start Gun loop once, in next segments we control it using g.rl ratelimiter
+		g.rpsLoopOnce.Do(func() {
+			g.runGunLoop()
+		})
 	case VU:
 		oldVUs := g.stats.CurrentVUs.Load()
 		newVUs := g.currentSegment.From
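The RPS branch above combines two standard primitives: sync.Once guarantees the Gun loop is spawned exactly once no matter how many segments run, and atomic.Pointer lets each segment swap in a freshly tuned limiter that the running loop picks up lock-free. A minimal sketch of that combination, assuming the go.uber.org/ratelimit package (type and variable names are illustrative, not wasp's):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"go.uber.org/ratelimit"
)

type pacer struct {
	rl   atomic.Pointer[ratelimit.Limiter]
	once sync.Once
}

// applySegment retunes the limiter and lazily starts the loop on first use.
func (p *pacer) applySegment(rps int, loop func()) {
	l := ratelimit.New(rps, ratelimit.Per(time.Second), ratelimit.WithoutSlack)
	p.rl.Store(&l) // atomic swap; the loop sees it on its next Take
	p.once.Do(func() { go loop() })
}

func main() {
	p := &pacer{}
	done := make(chan struct{})
	p.applySegment(100, func() {
		for i := 0; i < 5; i++ {
			(*p.rl.Load()).Take() // lock-free read of the current limiter
		}
		close(done)
	})
	p.applySegment(500, func() { panic("unreachable: Once never restarts the loop") })
	<-done
	fmt.Println("paced 5 calls across two segments")
}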
@@ -527,6 +514,8 @@
 // runScheduleLoop initiates an asynchronous loop that processes scheduling segments and monitors for completion signals.
 // It enables the generator to handle load distribution seamlessly in the background.
 func (g *Generator) runScheduleLoop() {
+	g.currentSegment = g.scheduleSegments[0]
+	g.stats.LastSegment.Store(int64(len(g.scheduleSegments)))
 	go func() {
 		for {
 			select {
@@ -621,11 +610,7 @@ func (g *Generator) collectVUResults() {
 // handling timeouts and storing the response.
 // It ensures requests adhere to the generator's configuration and execution state.
 func (g *Generator) pacedCall() {
-	if !g.Stats().RunStarted.Load() {
-		return
-	}
-	l := *g.rl.Load()
-	l.Take()
+	(*g.rl.Load()).Take()
 	if g.stats.RunPaused.Load() {
 		return
 	}
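Dropping the RunStarted gate here appears safe because of ordering, not timing: the Gun loop is only spawned (via rpsLoopOnce above) after the first segment has stored a limiter, and a go statement happens-after everything that precedes it, so pacedCall can never observe a nil g.rl. A tiny sketch of that guarantee (stand-in types; illustrative only):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var rl atomic.Pointer[int] // stand-in for atomic.Pointer[ratelimit.Limiter]
	rate := 10
	rl.Store(&rate) // segment 1 configures the limiter first...

	done := make(chan struct{})
	go func() {
		// ...and the loop goroutine starts only afterwards, so this
		// Load happens-after the Store and can never return nil.
		fmt.Println("current rate:", *rl.Load())
		close(done)
	}()
	<-done
}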
@@ -668,7 +653,6 @@ func (g *Generator) Run(wait bool) (interface{}, bool) {
 		g.sendStatsToLoki()
 	}
 	g.runScheduleLoop()
-	g.runExecuteLoop()
 	g.collectVUResults()
 	if wait {
 		return g.Wait()
@@ -696,7 +680,6 @@ func (g *Generator) Stop() (interface{}, bool) {
 	if g.stats.RunStopped.Load() {
 		return nil, true
 	}
-	g.stats.RunStarted.Store(false)
 	g.stats.RunStopped.Store(true)
 	g.stats.RunFailed.Store(true)
 	g.Log.Warn().Msg("Graceful stop")
2 changes: 1 addition & 1 deletion wasp/wasp_bench_test.go
@@ -21,7 +21,7 @@ func BenchmarkPacedCall(b *testing.B) {
 		Gun: NewMockGun(&MockGunConfig{}),
 	})
 	require.NoError(b, err)
-	gen.runExecuteLoop()
+	gen.runGunLoop()
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		gen.pacedCall()
24 changes: 12 additions & 12 deletions wasp/wasp_test.go
@@ -449,13 +449,13 @@ func TestSmokeStaticRPSSchedulePrecision(t *testing.T) {
 	require.NoError(t, err)
 	_, failed := gen.Run(true)
 	require.Equal(t, false, failed)
-	require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980))
+	require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(950))
 	require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010))
 	require.Equal(t, gen.Stats().Failed.Load(), int64(0))
 	require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0))
 
 	okData, _, failResponses := convertResponsesData(gen)
-	require.GreaterOrEqual(t, len(okData), 980)
+	require.GreaterOrEqual(t, len(okData), 950)
 	require.LessOrEqual(t, len(okData), 1010)
 	require.Empty(t, failResponses)
 	require.Empty(t, gen.Errors())
@@ -475,14 +475,14 @@ func TestSmokeCustomUnitPrecision(t *testing.T) {
 	_, failed := gen.Run(true)
 	require.Equal(t, false, failed)
 	stats := gen.Stats()
-	require.GreaterOrEqual(t, stats.Success.Load(), int64(4970))
+	require.GreaterOrEqual(t, stats.Success.Load(), int64(4950))
 	require.LessOrEqual(t, stats.Success.Load(), int64(5010))
 	require.Equal(t, stats.Failed.Load(), int64(0))
 	require.Equal(t, stats.CallTimeout.Load(), int64(0))
 	require.Equal(t, stats.CurrentTimeUnit, gen.Cfg.RateLimitUnitDuration.Nanoseconds())
 
 	okData, _, failResponses := convertResponsesData(gen)
-	require.GreaterOrEqual(t, len(okData), 4970)
+	require.GreaterOrEqual(t, len(okData), 4950)
 	require.LessOrEqual(t, len(okData), 5010)
 	require.Empty(t, failResponses)
 	require.Empty(t, gen.Errors())
@@ -501,13 +501,13 @@ func TestSmokeStaticRPSScheduleIsNotBlocking(t *testing.T) {
 	require.NoError(t, err)
 	_, failed := gen.Run(true)
 	require.Equal(t, false, failed)
-	require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980))
+	require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(950))
 	require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010))
 	require.Equal(t, gen.Stats().Failed.Load(), int64(0))
 	require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0))
 
 	okData, _, failResponses := convertResponsesData(gen)
-	require.GreaterOrEqual(t, len(okData), 980)
+	require.GreaterOrEqual(t, len(okData), 950)
 	require.LessOrEqual(t, len(okData), 1010)
 	require.Empty(t, failResponses)
 	require.Empty(t, gen.Errors())
@@ -676,10 +676,10 @@ func TestSmokeVUsIncrease(t *testing.T) {

 	okData, okResponses, failResponses := convertResponsesData(gen)
 	require.GreaterOrEqual(t, okResponses[0].Duration, 50*time.Millisecond)
-	require.GreaterOrEqual(t, len(okResponses), 147)
-	require.GreaterOrEqual(t, len(okData), 147)
+	require.GreaterOrEqual(t, len(okResponses), 140)
+	require.GreaterOrEqual(t, len(okData), 140)
 	require.Equal(t, okResponses[0].Data.(string), "successCallData")
-	require.Equal(t, okResponses[147].Data.(string), "successCallData")
+	require.Equal(t, okResponses[140].Data.(string), "successCallData")
 	require.Empty(t, failResponses)
 	require.Empty(t, gen.Errors())
 }
@@ -706,10 +706,10 @@ func TestSmokeVUsDecrease(t *testing.T) {

 	okData, okResponses, failResponses := convertResponsesData(gen)
 	require.GreaterOrEqual(t, okResponses[0].Duration, 50*time.Millisecond)
-	require.GreaterOrEqual(t, len(okResponses), 147)
-	require.GreaterOrEqual(t, len(okData), 147)
+	require.GreaterOrEqual(t, len(okResponses), 140)
+	require.GreaterOrEqual(t, len(okData), 140)
 	require.Equal(t, okResponses[0].Data.(string), "successCallData")
-	require.Equal(t, okResponses[147].Data.(string), "successCallData")
+	require.Equal(t, okResponses[140].Data.(string), "successCallData")
 	require.Empty(t, failResponses)
 	require.Empty(t, gen.Errors())
 }
