Skip to content

Commit

Permalink
simplify, one loop
Browse files Browse the repository at this point in the history
  • Loading branch information
skudasov committed Dec 19, 2024
1 parent 5b53a56 commit dc39dbe
Showing 1 changed file with 7 additions and 12 deletions.
19 changes: 7 additions & 12 deletions wasp/wasp.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,7 @@ type Generator struct {
Log zerolog.Logger
labels model.LabelSet
rl atomic.Pointer[ratelimit.Limiter]
executionLoopOnce *sync.Once
executionLoopStart chan struct{}
rpsLoopOnce *sync.Once
scheduleSegments []*Segment
currentSegmentMu *sync.Mutex
currentSegment *Segment
Expand Down Expand Up @@ -321,8 +320,7 @@ func NewGenerator(cfg *Config) (*Generator, error) {
dataCancel: dataCancel,
gun: cfg.Gun,
vu: cfg.VU,
executionLoopOnce: &sync.Once{},
executionLoopStart: make(chan struct{}),
rpsLoopOnce: &sync.Once{},
Responses: NewResponses(rch),
ResponsesChan: rch,
labels: ls,
Expand Down Expand Up @@ -355,14 +353,10 @@ func NewGenerator(cfg *Config) (*Generator, error) {
// runRPSLoop initiates the generator's RPS loop, noop if load type is VU.
// It manages request pacing for RPS after the first segment is loaded.
func (g *Generator) runRPSLoop() {
g.currentSegment = g.scheduleSegments[0]
g.stats.LastSegment.Store(int64(len(g.scheduleSegments)))
switch g.Cfg.LoadType {
case RPS:
g.ResponsesWaitGroup.Add(1)
// we run pacedCall controlled by stats.CurrentRPS
// start when first segment is loaded, see runScheduleLoop
<-g.executionLoopStart
go func() {
for {
select {
Expand Down Expand Up @@ -491,9 +485,9 @@ func (g *Generator) processSegment() bool {
newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration), ratelimit.WithoutSlack)
g.rl.Store(&newRateLimit)
g.stats.CurrentRPS.Store(g.currentSegment.From)
// signal RPS loop to start
g.executionLoopOnce.Do(func() {
g.executionLoopStart <- struct{}{}
// start RPS loop once, in next segments we control it using g.rl ratelimiter
g.rpsLoopOnce.Do(func() {
g.runRPSLoop()
})
case VU:
oldVUs := g.stats.CurrentVUs.Load()
Expand Down Expand Up @@ -524,6 +518,8 @@ func (g *Generator) processSegment() bool {
// runScheduleLoop initiates an asynchronous loop that processes scheduling segments and monitors for completion signals.
// It enables the generator to handle load distribution seamlessly in the background.
func (g *Generator) runScheduleLoop() {
g.currentSegment = g.scheduleSegments[0]
g.stats.LastSegment.Store(int64(len(g.scheduleSegments)))
go func() {
for {
select {
Expand Down Expand Up @@ -661,7 +657,6 @@ func (g *Generator) Run(wait bool) (interface{}, bool) {
g.sendStatsToLoki()
}
g.runScheduleLoop()
g.runRPSLoop()
g.collectVUResults()
if wait {
return g.Wait()
Expand Down

0 comments on commit dc39dbe

Please sign in to comment.