Skip to content

Commit

Permalink
Add --rps-per-worker-limit and --ratelimit-window flags
Browse files Browse the repository at this point in the history
When set, rate-limits operations during the prepare and main phases of
the benchmark. The rate limit applies to each concurrent worker, not globally.
  • Loading branch information
fatpat committed Jan 24, 2024
1 parent 1723cdb commit 941c7f5
Show file tree
Hide file tree
Showing 15 changed files with 213 additions and 8 deletions.
30 changes: 22 additions & 8 deletions cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"
"sync"
"time"

"github.com/minio/cli"
"github.com/minio/mc/pkg/probe"
Expand Down Expand Up @@ -250,6 +251,17 @@ var ioFlags = []cli.Flag{
EnvVar: appNameUC + "_INFLUXDB_CONNECT",
Usage: "Send operations to InfluxDB. Specify as 'http://<token>@<hostname>:<port>/<bucket>/<org>'",
},
cli.Float64Flag{
Name: "rps-per-worker-limit",
Value: 0,
Usage: "Ratelimit each concurrent worker to this number of actions per --ratelimit-window (set a positive value to activate, 0 by default)",
},
cli.DurationFlag{
Name: "ratelimit-window",
Value: 1 * time.Second,
Usage: "The time window to which the --ratelimit applies (1s by default)",
Hidden: true,
},
}

func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
Expand All @@ -264,13 +276,15 @@ func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
}
}
return bench.Common{
Client: newClient(ctx),
Concurrency: ctx.Int("concurrent"),
Source: src,
Bucket: ctx.String("bucket"),
Location: ctx.String("region"),
PutOpts: putOpts(ctx),
DiscardOutput: ctx.Bool("stress"),
ExtraOut: extra,
Client: newClient(ctx),
Concurrency: ctx.Int("concurrent"),
Source: src,
Bucket: ctx.String("bucket"),
Location: ctx.String("region"),
PutOpts: putOpts(ctx),
DiscardOutput: ctx.Bool("stress"),
ExtraOut: extra,
RpsPerWorkerLimit: ctx.Float64("rps-per-worker-limit"),
RatelimitWindow: ctx.Duration("ratelimit-window"),
}
}
4 changes: 4 additions & 0 deletions pkg/bench/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type Common struct {

// Does destination support versioning?
Versioned bool

// ratelimiting
RpsPerWorkerLimit float64
RatelimitWindow time.Duration
}

const (
Expand Down
12 changes: 12 additions & 0 deletions pkg/bench/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (d *Delete) Prepare(ctx context.Context) error {
go func(i int) {
defer wg.Done()
src := d.Source()
ratelimit := InitRpsPerWorkerLimit(d.RpsPerWorkerLimit, d.RatelimitWindow)

for range obj {
opts := d.PutOpts
rcv := d.Collector.Receiver()
Expand All @@ -72,6 +74,11 @@ func (d *Delete) Prepare(ctx context.Context) error {
return
default:
}

if ratelimit.Limit(done) {
return
}

obj := src.Object()
client, cldone := d.Client()
op := Operation{
Expand Down Expand Up @@ -148,6 +155,7 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err
rcv := c.Receiver()
defer wg.Done()
done := ctx.Done()
ratelimit := InitRpsPerWorkerLimit(d.RpsPerWorkerLimit, d.RatelimitWindow)

<-wait
for {
Expand All @@ -157,6 +165,10 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err
default:
}

if ratelimit.Limit(done) {
return
}

// Fetch d.BatchSize objects
mu.Lock()
if len(d.objects) == 0 {
Expand Down
12 changes: 12 additions & 0 deletions pkg/bench/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,19 @@ func (g *Get) Prepare(ctx context.Context) error {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow)

for range obj {
select {
case <-ctx.Done():
return
default:
}

if ratelimit.Limit(ctx.Done()) {
return
}

obj := src.Object()

name := obj.Name
Expand Down Expand Up @@ -258,6 +264,7 @@ func (g *Get) Start(ctx context.Context, wait chan struct{}) (Operations, error)
defer wg.Done()
opts := g.GetOpts
done := ctx.Done()
ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow)

<-wait
for {
Expand All @@ -266,6 +273,11 @@ func (g *Get) Start(ctx context.Context, wait chan struct{}) (Operations, error)
return
default:
}

if ratelimit.Limit(done) {
return
}

fbr := firstByteRecorder{}
obj := g.objects[rng.Intn(len(g.objects))]
client, cldone := g.Client()
Expand Down
11 changes: 11 additions & 0 deletions pkg/bench/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,19 @@ func (d *List) Prepare(ctx context.Context) error {
rcv := d.Collector.Receiver()
done := ctx.Done()
exists := make(map[string]struct{}, objPerPrefix)
ratelimit := InitRpsPerWorkerLimit(d.RpsPerWorkerLimit, d.RatelimitWindow)

for j := 0; j < objPerPrefix; j++ {
select {
case <-done:
return
default:
}

if ratelimit.Limit(done) {
return
}

obj := src.Object()
// Assure we don't have duplicates
for {
Expand Down Expand Up @@ -188,6 +194,7 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error
if d.NoPrefix {
wantN *= d.Concurrency
}
ratelimit := InitRpsPerWorkerLimit(d.RpsPerWorkerLimit, d.RatelimitWindow)

<-wait
for {
Expand All @@ -197,6 +204,10 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error
default:
}

if ratelimit.Limit(done) {
return
}

prefix := objs[0].Prefix
client, cldone := d.Client()
op := Operation{
Expand Down
13 changes: 13 additions & 0 deletions pkg/bench/mixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func (g *Mixed) Prepare(ctx context.Context) error {
go func(i int) {
defer wg.Done()
src := g.Source()
ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow)

for range obj {
opts := g.PutOpts
done := ctx.Done()
Expand All @@ -182,6 +184,11 @@ func (g *Mixed) Prepare(ctx context.Context) error {
return
default:
}

if ratelimit.Limit(done) {
return
}

obj := src.Object()
client, clDone := g.Client()
opts.ContentType = obj.ContentType
Expand Down Expand Up @@ -239,6 +246,7 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) (Operations, erro
putOpts := g.PutOpts
statOpts := g.StatOpts
getOpts := g.GetOpts
ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow)

<-wait
for {
Expand All @@ -247,6 +255,11 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) (Operations, erro
return
default:
}

if ratelimit.Limit(done) {
return
}

operation := g.Dist.getOp()
switch operation {
case http.MethodGet:
Expand Down
12 changes: 12 additions & 0 deletions pkg/bench/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,19 @@ func (g *Multipart) Prepare(ctx context.Context) error {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow)

for partN := range obj {
select {
case <-ctx.Done():
return
default:
}

if ratelimit.Limit(ctx.Done()) {
return
}

name := g.ObjName
// New input for each version
obj := src.Object()
Expand Down Expand Up @@ -205,6 +211,7 @@ func (g *Multipart) Start(ctx context.Context, wait chan struct{}) (Operations,
defer wg.Done()
opts := g.GetOpts
done := ctx.Done()
ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow)

<-wait
for {
Expand All @@ -213,6 +220,11 @@ func (g *Multipart) Start(ctx context.Context, wait chan struct{}) (Operations,
return
default:
}

if ratelimit.Limit(done) {
return
}

fbr := firstByteRecorder{}
part := rng.Intn(len(g.objects))
obj := g.objects[part]
Expand Down
6 changes: 6 additions & 0 deletions pkg/bench/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error)
defer wg.Done()
opts := u.PutOpts
done := ctx.Done()
ratelimit := InitRpsPerWorkerLimit(u.RpsPerWorkerLimit, u.RatelimitWindow)

<-wait
for {
Expand All @@ -67,6 +68,11 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error)
return
default:
}

if ratelimit.Limit(done) {
return
}

obj := src.Object()
opts.ContentType = obj.ContentType
client, cldone := u.Client()
Expand Down
58 changes: 58 additions & 0 deletions pkg/bench/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package bench

import (
"time"
)

//
// RpsPerWorkerLimit is not race condition proof:
// it must be used by a single goroutine only.
//

// RpsPerWorkerLimit paces the operations of one worker so that at most
// 'limit' operations are performed per 'window'. A limit <= 0 disables
// rate limiting entirely.
type RpsPerWorkerLimit struct {
	limit   float64       // max operations per window; <= 0 means unlimited
	window  time.Duration // time window the limit applies to
	counter int64         // number of operations issued so far
	start   time.Time     // time of first use; zero until the first Limit call
}

// InitRpsPerWorkerLimit returns a rate limiter allowing 'limit' operations
// per 'window'. A limit <= 0 yields a no-op limiter.
func InitRpsPerWorkerLimit(limit float64, window time.Duration) RpsPerWorkerLimit {
	return RpsPerWorkerLimit{
		limit:  limit,
		window: window,
	}
}

// Limit returns true if the wait has been interrupted by 'done', false when
// the caller may proceed with the next operation.
// It checks whether there is room for a new request and, if not, sleeps just
// long enough to keep the request rate within the configured limit.
func (r *RpsPerWorkerLimit) Limit(done <-chan struct{}) bool {
	if r.limit <= 0 {
		return false
	}

	// Init the relative clock when first used.
	if r.start.IsZero() {
		r.start = time.Now()
	}

	// Time (relative to start) at which request number r.counter is allowed:
	// counter / (limit/window). Subtracting the elapsed time gives:
	// sleepDuration <= 0: room for a new request, no need to ratelimit;
	// sleepDuration > 0: we must wait that long before the next request.
	elapsed := time.Since(r.start)
	sleepDuration := time.Duration(float64(r.counter)/(r.limit/float64(r.window))) - elapsed

	// Count this request.
	r.counter++

	// Fast path: no waiting required. Still honor cancellation, and do it
	// deterministically (no timer allocation, no racy select against an
	// already-expired timer).
	if sleepDuration <= 0 {
		select {
		case <-done:
			return true
		default:
			return false
		}
	}

	// Sleep for sleepDuration, allowing interruption by 'done'.
	// Stop the timer so it does not linger when 'done' wins.
	timer := time.NewTimer(sleepDuration)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}
13 changes: 13 additions & 0 deletions pkg/bench/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func (g *Retention) Prepare(ctx context.Context) error {
go func(i int) {
defer wg.Done()
src := g.Source()
ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow)

for range obj {
opts := g.PutOpts
rcv := g.Collector.Receiver()
Expand All @@ -83,6 +85,11 @@ func (g *Retention) Prepare(ctx context.Context) error {
return
default:
}

if ratelimit.Limit(done) {
return
}

obj := src.Object()
name := obj.Name
for ver := 0; ver < g.Versions; ver++ {
Expand Down Expand Up @@ -159,6 +166,7 @@ func (g *Retention) Start(ctx context.Context, wait chan struct{}) (Operations,
defer wg.Done()
done := ctx.Done()
var opts minio.PutObjectRetentionOptions
ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow)

<-wait
mode := minio.Governance
Expand All @@ -168,6 +176,11 @@ func (g *Retention) Start(ctx context.Context, wait chan struct{}) (Operations,
return
default:
}

if ratelimit.Limit(done) {
return
}

obj := g.objects[rng.Intn(len(g.objects))]
client, cldone := g.Client()
op := Operation{
Expand Down
Loading

0 comments on commit 941c7f5

Please sign in to comment.