From 5b53a56fd00af342c1362660f05faca9f5e173c5 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 19 Dec 2024 04:56:13 +0100 Subject: [PATCH] finalize, switch runner back --- wasp/wasp.go | 12 ++++++++---- wasp/wasp_test.go | 12 ++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/wasp/wasp.go b/wasp/wasp.go index 029e4ae2e..fda1b07db 100644 --- a/wasp/wasp.go +++ b/wasp/wasp.go @@ -245,6 +245,7 @@ type Generator struct { Log zerolog.Logger labels model.LabelSet rl atomic.Pointer[ratelimit.Limiter] + executionLoopOnce *sync.Once executionLoopStart chan struct{} scheduleSegments []*Segment currentSegmentMu *sync.Mutex @@ -320,6 +321,7 @@ func NewGenerator(cfg *Config) (*Generator, error) { dataCancel: dataCancel, gun: cfg.Gun, vu: cfg.VU, + executionLoopOnce: &sync.Once{}, executionLoopStart: make(chan struct{}), Responses: NewResponses(rch), ResponsesChan: rch, @@ -359,9 +361,9 @@ func (g *Generator) runRPSLoop() { case RPS: g.ResponsesWaitGroup.Add(1) // we run pacedCall controlled by stats.CurrentRPS + // start when first segment is loaded, see runScheduleLoop + <-g.executionLoopStart go func() { - // start when first segment is loaded, see - <-g.executionLoopStart for { select { case <-g.ResponsesCtx.Done(): @@ -481,16 +483,18 @@ func (g *Generator) processSegment() bool { } g.currentSegmentMu.Lock() g.currentSegment = g.scheduleSegments[g.stats.CurrentSegment.Load()] - g.currentSegment.StartTime = time.Now() g.currentSegmentMu.Unlock() g.stats.CurrentSegment.Add(1) + g.currentSegment.StartTime = time.Now() switch g.Cfg.LoadType { case RPS: newRateLimit := ratelimit.New(int(g.currentSegment.From), ratelimit.Per(g.Cfg.RateLimitUnitDuration), ratelimit.WithoutSlack) g.rl.Store(&newRateLimit) g.stats.CurrentRPS.Store(g.currentSegment.From) // signal RPS loop to start - g.executionLoopStart <- struct{}{} + g.executionLoopOnce.Do(func() { + g.executionLoopStart <- struct{}{} + }) case VU: oldVUs := g.stats.CurrentVUs.Load() newVUs := g.currentSegment.From diff --git a/wasp/wasp_test.go b/wasp/wasp_test.go index abebb6beb..38fe0d394 100644 --- a/wasp/wasp_test.go +++ b/wasp/wasp_test.go @@ -449,13 +449,13 @@ func TestSmokeStaticRPSSchedulePrecision(t *testing.T) { require.NoError(t, err) _, failed := gen.Run(true) require.Equal(t, false, failed) - require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980)) + require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(960)) require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010)) require.Equal(t, gen.Stats().Failed.Load(), int64(0)) require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0)) okData, _, failResponses := convertResponsesData(gen) - require.GreaterOrEqual(t, len(okData), 980) + require.GreaterOrEqual(t, len(okData), 960) require.LessOrEqual(t, len(okData), 1010) require.Empty(t, failResponses) require.Empty(t, gen.Errors()) @@ -475,14 +475,14 @@ func TestSmokeCustomUnitPrecision(t *testing.T) { _, failed := gen.Run(true) require.Equal(t, false, failed) stats := gen.Stats() - require.GreaterOrEqual(t, stats.Success.Load(), int64(4970)) + require.GreaterOrEqual(t, stats.Success.Load(), int64(4960)) require.LessOrEqual(t, stats.Success.Load(), int64(5010)) require.Equal(t, stats.Failed.Load(), int64(0)) require.Equal(t, stats.CallTimeout.Load(), int64(0)) require.Equal(t, stats.CurrentTimeUnit, gen.Cfg.RateLimitUnitDuration.Nanoseconds()) okData, _, failResponses := convertResponsesData(gen) - require.GreaterOrEqual(t, len(okData), 4970) + require.GreaterOrEqual(t, len(okData), 4960) require.LessOrEqual(t, len(okData), 5010) require.Empty(t, failResponses) require.Empty(t, gen.Errors()) @@ -501,13 +501,13 @@ func TestSmokeStaticRPSScheduleIsNotBlocking(t *testing.T) { require.NoError(t, err) _, failed := gen.Run(true) require.Equal(t, false, failed) - require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(980)) + require.GreaterOrEqual(t, gen.Stats().Success.Load(), int64(960)) require.LessOrEqual(t, gen.Stats().Success.Load(), int64(1010)) require.Equal(t, gen.Stats().Failed.Load(), int64(0)) require.Equal(t, gen.Stats().CallTimeout.Load(), int64(0)) okData, _, failResponses := convertResponsesData(gen) - require.GreaterOrEqual(t, len(okData), 980) + require.GreaterOrEqual(t, len(okData), 960) require.LessOrEqual(t, len(okData), 1010) require.Empty(t, failResponses) require.Empty(t, gen.Errors())