Add dynamic rate limit control in live ui
vearutop committed Sep 15, 2020
1 parent 7520968 commit f89aab3
Showing 1 changed file with 51 additions and 5 deletions.
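
In essence, the change keeps the requested rate in an atomically updated int64 (r.rateLimit) and a mutex-guarded *rate.Limiter (r.rl) that is rebuilt whenever the live UI adjusts the limit; each worker re-reads the current limiter under the lock before calling Wait. Below is a minimal standalone sketch of that pattern using golang.org/x/time/rate; the names (dynamicLimiter, refresh, adjust, wait) are illustrative and not from the commit itself:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/time/rate"
)

// dynamicLimiter is an illustrative stand-in for the runner fields added by
// the commit: an atomically stored target rate plus a mutex-guarded limiter
// that is rebuilt whenever the rate changes.
type dynamicLimiter struct {
	rateLimit int64 // read/updated atomically, e.g. from UI key handlers
	burst     int

	mu sync.Mutex
	rl *rate.Limiter
}

// refresh rebuilds the limiter from the currently stored rate limit.
func (d *dynamicLimiter) refresh() {
	lim := atomic.LoadInt64(&d.rateLimit)
	if lim == 0 {
		return // no rate limit configured
	}

	rl := rate.NewLimiter(rate.Limit(lim), d.burst)
	d.mu.Lock()
	d.rl = rl
	d.mu.Unlock()
}

// adjust nudges the rate limit by roughly 5% in either direction and applies
// it, mirroring the <Up>/<Down> handlers.
func (d *dynamicLimiter) adjust(up bool) {
	lim := atomic.LoadInt64(&d.rateLimit)
	delta := int64(0.05 * float64(lim))
	if !up {
		delta = -delta
	}
	atomic.AddInt64(&d.rateLimit, delta)
	d.refresh()
}

// wait grabs the current limiter under the lock and blocks on it, the way the
// worker loop does before issuing a request.
func (d *dynamicLimiter) wait(ctx context.Context) error {
	d.mu.Lock()
	rl := d.rl
	d.mu.Unlock()

	if rl == nil {
		return nil
	}

	return rl.Wait(ctx)
}

func main() {
	d := &dynamicLimiter{rateLimit: 50, burst: 10}
	d.refresh()

	// Raise the limit mid-run, as if <Up> were pressed in the live UI.
	go func() {
		time.Sleep(time.Second)
		d.adjust(true)
		fmt.Println("rate limit now", atomic.LoadInt64(&d.rateLimit))
	}()

	start := time.Now()
	for i := 0; i < 100; i++ {
		if err := d.wait(context.Background()); err != nil {
			fmt.Println("wait failed:", err)
			return
		}
	}
	fmt.Println("100 requests admitted in", time.Since(start))
}

Rebuilding the limiter, rather than mutating it in place with Limiter.SetLimit, means a worker already blocked in Wait finishes against the old limiter while every later request picks up the new rate; it also lets the burst be derived from the current concurrency limit, as the commit does.
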
loadgen/run.go: 51 additions & 5 deletions
@@ -9,6 +9,7 @@ import (
 	"os"
 	"os/signal"
 	"sort"
+	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
@@ -34,7 +35,10 @@ type runner struct {
 
 	jobProducer JobProducer
 
+	mu               sync.Mutex
 	concurrencyLimit int64
+	rateLimit        int64
+	rl               *rate.Limiter
 }
 
 // Run runs load testing.
@@ -75,9 +79,9 @@ func Run(lf Flags, jobProducer JobProducer) {
 		dur = 1000 * time.Hour
 	}
 
-	var rl *rate.Limiter
 	if lf.RateLimit > 0 {
-		rl = rate.NewLimiter(rate.Limit(lf.RateLimit), int(r.concurrencyLimit))
+		r.rateLimit = int64(lf.RateLimit)
+		r.rl = rate.NewLimiter(rate.Limit(lf.RateLimit), int(r.concurrencyLimit))
 	}
 
 	r.exit = make(chan os.Signal, 1)
@@ -87,6 +91,18 @@ func Run(lf Flags, jobProducer JobProducer) {
 
 	if lf.LiveUI {
 		go func() {
+			refreshRateLimiter := func() {
+				lim := atomic.LoadInt64(&r.rateLimit)
+				if lim == 0 {
+					return
+				}
+
+				rl := rate.NewLimiter(rate.Limit(lim), int(atomic.LoadInt64(&r.concurrencyLimit)))
+				r.mu.Lock()
+				r.rl = rl
+				r.mu.Unlock()
+			}
+
 			uiEvents := ui.PollEvents()
 			for e := range uiEvents {
 				switch e.ID {
@@ -102,6 +118,7 @@ func Run(lf Flags, jobProducer JobProducer) {
 						for i := int64(0); i < delta; i++ {
 							<-limiter
 						}
+						refreshRateLimiter()
 					}
 				case "<Left>": // Decrease concurrency.
 					lim := atomic.LoadInt64(&r.concurrencyLimit)
@@ -114,6 +131,20 @@ func Run(lf Flags, jobProducer JobProducer) {
 							limiter <- struct{}{}
 						}
 					}
+
+					refreshRateLimiter()
+
+				case "<Up>": // Increase rate limit.
+					lim := atomic.LoadInt64(&r.rateLimit)
+					delta := int64(0.05 * float64(lim))
+					atomic.AddInt64(&r.rateLimit, delta)
+					refreshRateLimiter()
+
+				case "<Down>": // Decrease rate limit.
+					lim := atomic.LoadInt64(&r.rateLimit)
+					delta := int64(0.05 * float64(lim))
+					atomic.AddInt64(&r.rateLimit, -delta)
+					refreshRateLimiter()
 				}
 			}
 		}()
@@ -129,6 +160,10 @@ func Run(lf Flags, jobProducer JobProducer) {
 	}
 
 	for i := 0; i < n; i++ {
+		r.mu.Lock()
+		rl := r.rl
+		r.mu.Unlock()
+
 		if rl != nil {
 			err := rl.Wait(context.Background())
 			if err != nil {
@@ -206,6 +241,9 @@ func (r *runner) runLiveUI() {
 
 	ticker := time.NewTicker(500 * time.Millisecond).C
 
+	prev := time.Now()
+	reqPrev := 0
+
 	for {
 		doReturn := false
 		select {
@@ -214,10 +252,16 @@ func (r *runner) runLiveUI() {
 			doReturn = true
 		}
 
+		elaTick := time.Since(prev)
+		reqNum := r.roundTripHist.Count
+
 		drawables := make([]ui.Drawable, 0, 2)
 		elaDur := time.Since(r.start)
 		ela := elaDur.Seconds()
-		reqRate := float64(r.roundTripHist.Count) / ela
+		reqRate := float64(reqNum) / ela
+		reqRateTick := float64(reqNum-reqPrev) / elaTick.Seconds()
+		reqPrev = reqNum
+		prev = time.Now()
 
 		latencyPercentiles := widgets.NewParagraph()
 		latencyPercentiles.Title = "Round trip latency, ms"
@@ -243,7 +287,8 @@ func (r *runner) runLiveUI() {
 
 		loadLimits := widgets.NewParagraph()
 		loadLimits.Title = "Load Limits"
-		loadLimits.Text = fmt.Sprintf("Concurrency: %d, <Right>/<Left>: ±5%%", atomic.LoadInt64(&r.concurrencyLimit))
+		loadLimits.Text = fmt.Sprintf("Concurrency: %d, <Right>/<Left>: ±5%%\nRate Limit: %d, <Up>/<Down>: ±5%%",
+			atomic.LoadInt64(&r.concurrencyLimit), atomic.LoadInt64(&r.rateLimit))
 
 		loadLimits.SetRect(60, 0, 100, 7)
 
@@ -276,8 +321,9 @@ func (r *runner) runLiveUI() {
 			rpsPlot.Data = append(rpsPlot.Data, rates[name])
 		}
 
-		rpsPlot.Title = "Requests per second:" + fmt.Sprintf("%.2f (total requests: %d, time passed: %s)\n",
+		rpsPlot.Title = fmt.Sprintf("Avg rps: %.2f, current rps: %.2f, total requests: %d, time passed: %s)\n",
 			reqRate,
+			reqRateTick,
 			r.roundTripHist.Count,
 			elaDur.String(),
 		)
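
The runLiveUI changes also surface an instantaneous requests-per-second figure next to the running average by diffing the request counter between 500 ms UI ticks. A small self-contained sketch of that delta computation, with a fake atomic counter standing in for r.roundTripHist.Count and illustrative variable names:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	// Fake request counter standing in for r.roundTripHist.Count.
	var totalRequests int64

	go func() {
		for {
			atomic.AddInt64(&totalRequests, 1)
			time.Sleep(5 * time.Millisecond) // roughly 200 requests per second
		}
	}()

	start := time.Now()
	prev := time.Now()
	reqPrev := int64(0)

	for i := 0; i < 6; i++ {
		time.Sleep(500 * time.Millisecond) // UI refresh tick

		elaTick := time.Since(prev)
		reqNum := atomic.LoadInt64(&totalRequests)

		avgRate := float64(reqNum) / time.Since(start).Seconds() // average since start
		tickRate := float64(reqNum-reqPrev) / elaTick.Seconds()  // rate over the last tick

		reqPrev = reqNum
		prev = time.Now()

		fmt.Printf("avg rps: %.2f, current rps: %.2f, total requests: %d\n", avgRate, tickRate, reqNum)
	}
}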
