Skip to content

Commit

Permalink
finalize, switch runner back
Browse files Browse the repository at this point in the history
  • Loading branch information
skudasov committed Dec 19, 2024
1 parent caa994f commit 5b53a56
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
12 changes: 8 additions & 4 deletions wasp/wasp.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type Generator struct {
Log zerolog.Logger
labels model.LabelSet
rl atomic.Pointer[ratelimit.Limiter]
executionLoopOnce *sync.Once
executionLoopStart chan struct{}
scheduleSegments []*Segment
currentSegmentMu *sync.Mutex
Expand Down Expand Up @@ -320,6 +321,7 @@ func NewGenerator(cfg *Config) (*Generator, error) {
dataCancel: dataCancel,
gun: cfg.Gun,
vu: cfg.VU,
executionLoopOnce: &sync.Once{},
executionLoopStart: make(chan struct{}),
Responses: NewResponses(rch),
ResponsesChan: rch,
Expand Down Expand Up @@ -359,9 +361,9 @@ func (g *Generator) runRPSLoop() {
case RPS:
g.ResponsesWaitGroup.Add(1)
// we run pacedCall controlled by stats.CurrentRPS
// start when first segment is loaded, see runScheduleLoop
<-g.executionLoopStart
go func() {
// start when first segment is loaded, see
<-g.executionLoopStart
for {
select {
case <-g.ResponsesCtx.Done():
Expand Down Expand Up @@ -481,16 +483,18 @@ func (g *Generator) processSegment() bool {
}
g.currentSegmentMu.Lock()
g.currentSegment = g.scheduleSegments[g.stats.CurrentSegment.Load()]
g.currentSegment.StartTime = time.Now()
g.currentSegmentMu.Unlock()
g.stats.CurrentSegment.Add(1)
g.currentSegment.StartTime = time.Now()
switch g.Cfg.LoadType {
case RPS:
newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration), ratelimit.WithoutSlack)
g.rl.Store(&newRateLimit)
g.stats.CurrentRPS.Store(g.currentSegment.From)
// signal RPS loop to start
g.executionLoopStart <- struct{}{}
g.executionLoopOnce.Do(func() {
g.executionLoopStart <- struct{}{}
})
case VU:
oldVUs := g.stats.CurrentVUs.Load()
newVUs := g.currentSegment.From
Expand Down
12 changes: 6 additions & 6 deletions wasp/wasp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,13 +449,13 @@ func TestSmokeStaticRPSSchedulePrecision(t *testing.T) {
require.NoError(t, err)
_, failed := gen.Run(true)
require.Equal(t, false, failed)
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980))
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(960))
require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010))
require.Equal(t, gen.Stats().Failed.Load(), int64(0))
require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0))

okData, _, failResponses := convertResponsesData(gen)
require.GreaterOrEqual(t, len(okData), 980)
require.GreaterOrEqual(t, len(okData), 960)
require.LessOrEqual(t, len(okData), 1010)
require.Empty(t, failResponses)
require.Empty(t, gen.Errors())
Expand All @@ -475,14 +475,14 @@ func TestSmokeCustomUnitPrecision(t *testing.T) {
_, failed := gen.Run(true)
require.Equal(t, false, failed)
stats := gen.Stats()
require.GreaterOrEqual(t, stats.Success.Load(), int64(4970))
require.GreaterOrEqual(t, stats.Success.Load(), int64(4960))
require.LessOrEqual(t, stats.Success.Load(), int64(5010))
require.Equal(t, stats.Failed.Load(), int64(0))
require.Equal(t, stats.CallTimeout.Load(), int64(0))
require.Equal(t, stats.CurrentTimeUnit, gen.Cfg.RateLimitUnitDuration.Nanoseconds())

okData, _, failResponses := convertResponsesData(gen)
require.GreaterOrEqual(t, len(okData), 4970)
require.GreaterOrEqual(t, len(okData), 4960)
require.LessOrEqual(t, len(okData), 5010)
require.Empty(t, failResponses)
require.Empty(t, gen.Errors())
Expand All @@ -501,13 +501,13 @@ func TestSmokeStaticRPSScheduleIsNotBlocking(t *testing.T) {
require.NoError(t, err)
_, failed := gen.Run(true)
require.Equal(t, false, failed)
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980))
require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(960))
require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010))
require.Equal(t, gen.Stats().Failed.Load(), int64(0))
require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0))

okData, _, failResponses := convertResponsesData(gen)
require.GreaterOrEqual(t, len(okData), 980)
require.GreaterOrEqual(t, len(okData), 960)
require.LessOrEqual(t, len(okData), 1010)
require.Empty(t, failResponses)
require.Empty(t, gen.Errors())
Expand Down

0 comments on commit 5b53a56

Please sign in to comment.