diff --git a/cli/flags.go b/cli/flags.go index bf024a50..cb97da7d 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "sync" + "time" "github.com/minio/cli" "github.com/minio/mc/pkg/probe" @@ -250,6 +251,17 @@ var ioFlags = []cli.Flag{ EnvVar: appNameUC + "_INFLUXDB_CONNECT", Usage: "Send operations to InfluxDB. Specify as 'http://@://'", }, + cli.Float64Flag{ + Name: "rps-per-worker-limit", + Value: 0, + Usage: "Ratelimit each concurrent worker to this number of actions per --ratelimit-window (set a positive value to activate, 0 by default)", + }, + cli.DurationFlag{ + Name: "ratelimit-window", + Value: 1 * time.Second, + Usage: "The time window to which the --ratelimit applies (1s by default)", + Hidden: true, + }, } func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common { @@ -264,13 +276,15 @@ func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common { } } return bench.Common{ - Client: newClient(ctx), - Concurrency: ctx.Int("concurrent"), - Source: src, - Bucket: ctx.String("bucket"), - Location: ctx.String("region"), - PutOpts: putOpts(ctx), - DiscardOutput: ctx.Bool("stress"), - ExtraOut: extra, + Client: newClient(ctx), + Concurrency: ctx.Int("concurrent"), + Source: src, + Bucket: ctx.String("bucket"), + Location: ctx.String("region"), + PutOpts: putOpts(ctx), + DiscardOutput: ctx.Bool("stress"), + ExtraOut: extra, + RpsPerWorkerLimit: ctx.Float64("rps-per-worker-limit"), + RatelimitWindow: ctx.Duration("ratelimit-window"), } } diff --git a/pkg/bench/benchmark.go b/pkg/bench/benchmark.go index 3c8e9ceb..4d9256fe 100644 --- a/pkg/bench/benchmark.go +++ b/pkg/bench/benchmark.go @@ -93,6 +93,10 @@ type Common struct { // Does destination support versioning? 
Versioned bool + + // ratelimiting + RpsPerWorkerLimit float64 + RatelimitWindow time.Duration } const ( diff --git a/pkg/bench/delete.go b/pkg/bench/delete.go index 529658fd..03fdea47 100644 --- a/pkg/bench/delete.go +++ b/pkg/bench/delete.go @@ -62,6 +62,8 @@ func (d *Delete) Prepare(ctx context.Context) error { go func(i int) { defer wg.Done() src := d.Source() + ratelimit := InitRpsPerWorkerLimit(d.RpsPerWorkerLimit, d.RatelimitWindow) + for range obj { opts := d.PutOpts rcv := d.Collector.Receiver() @@ -72,6 +74,11 @@ func (d *Delete) Prepare(ctx context.Context) error { return default: } + + if ratelimit.Limit(done) { + return + } + obj := src.Object() client, cldone := d.Client() op := Operation{ @@ -148,6 +155,7 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err rcv := c.Receiver() defer wg.Done() done := ctx.Done() + ratelimit := InitRpsPerWorkerLimit(d.RpsPerWorkerLimit, d.RatelimitWindow) <-wait for { @@ -157,6 +165,10 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err default: } + if ratelimit.Limit(done) { + return + } + // Fetch d.BatchSize objects mu.Lock() if len(d.objects) == 0 { diff --git a/pkg/bench/get.go b/pkg/bench/get.go index 55f35d63..96aa372e 100644 --- a/pkg/bench/get.go +++ b/pkg/bench/get.go @@ -156,6 +156,7 @@ func (g *Get) Prepare(ctx context.Context) error { defer wg.Done() src := g.Source() opts := g.PutOpts + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) for range obj { select { @@ -163,6 +164,11 @@ func (g *Get) Prepare(ctx context.Context) error { return default: } + + if ratelimit.Limit(ctx.Done()) { + return + } + obj := src.Object() name := obj.Name @@ -258,6 +264,7 @@ func (g *Get) Start(ctx context.Context, wait chan struct{}) (Operations, error) defer wg.Done() opts := g.GetOpts done := ctx.Done() + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) <-wait for { @@ -266,6 +273,11 @@ func (g *Get) Start(ctx 
context.Context, wait chan struct{}) (Operations, error) return default: } + + if ratelimit.Limit(done) { + return + } + fbr := firstByteRecorder{} obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() diff --git a/pkg/bench/list.go b/pkg/bench/list.go index 18ea0739..2c4149f7 100644 --- a/pkg/bench/list.go +++ b/pkg/bench/list.go @@ -86,6 +86,7 @@ func (d *List) Prepare(ctx context.Context) error { rcv := d.Collector.Receiver() done := ctx.Done() exists := make(map[string]struct{}, objPerPrefix) + ratelimit := InitRpsPerWorkerLimit(d.RpsPerWorkerLimit, d.RatelimitWindow) for j := 0; j < objPerPrefix; j++ { select { @@ -93,6 +94,11 @@ func (d *List) Prepare(ctx context.Context) error { return default: } + + if ratelimit.Limit(done) { + return + } + obj := src.Object() // Assure we don't have duplicates for { @@ -188,6 +194,7 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error if d.NoPrefix { wantN *= d.Concurrency } + ratelimit := InitRpsPerWorkerLimit(d.RpsPerWorkerLimit, d.RatelimitWindow) <-wait for { @@ -197,6 +204,10 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error default: } + if ratelimit.Limit(done) { + return + } + prefix := objs[0].Prefix client, cldone := d.Client() op := Operation{ diff --git a/pkg/bench/mixed.go b/pkg/bench/mixed.go index 0627364c..cbf7bad8 100644 --- a/pkg/bench/mixed.go +++ b/pkg/bench/mixed.go @@ -173,6 +173,8 @@ func (g *Mixed) Prepare(ctx context.Context) error { go func(i int) { defer wg.Done() src := g.Source() + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) + for range obj { opts := g.PutOpts done := ctx.Done() @@ -182,6 +184,11 @@ func (g *Mixed) Prepare(ctx context.Context) error { return default: } + + if ratelimit.Limit(done) { + return + } + obj := src.Object() client, clDone := g.Client() opts.ContentType = obj.ContentType @@ -239,6 +246,7 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) 
(Operations, erro putOpts := g.PutOpts statOpts := g.StatOpts getOpts := g.GetOpts + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) <-wait for { @@ -247,6 +255,11 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) (Operations, erro return default: } + + if ratelimit.Limit(done) { + return + } + operation := g.Dist.getOp() switch operation { case http.MethodGet: diff --git a/pkg/bench/multipart.go b/pkg/bench/multipart.go index d8dc90e0..5209d3a2 100644 --- a/pkg/bench/multipart.go +++ b/pkg/bench/multipart.go @@ -97,6 +97,7 @@ func (g *Multipart) Prepare(ctx context.Context) error { defer wg.Done() src := g.Source() opts := g.PutOpts + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) for partN := range obj { select { @@ -104,6 +105,11 @@ func (g *Multipart) Prepare(ctx context.Context) error { return default: } + + if ratelimit.Limit(ctx.Done()) { + return + } + name := g.ObjName // New input for each version obj := src.Object() @@ -205,6 +211,7 @@ func (g *Multipart) Start(ctx context.Context, wait chan struct{}) (Operations, defer wg.Done() opts := g.GetOpts done := ctx.Done() + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) <-wait for { @@ -213,6 +220,11 @@ func (g *Multipart) Start(ctx context.Context, wait chan struct{}) (Operations, return default: } + + if ratelimit.Limit(done) { + return + } + fbr := firstByteRecorder{} part := rng.Intn(len(g.objects)) obj := g.objects[part] diff --git a/pkg/bench/put.go b/pkg/bench/put.go index 7f9cee01..5ddf29c0 100644 --- a/pkg/bench/put.go +++ b/pkg/bench/put.go @@ -59,6 +59,7 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error) defer wg.Done() opts := u.PutOpts done := ctx.Done() + ratelimit := InitRpsPerWorkerLimit(u.RpsPerWorkerLimit, u.RatelimitWindow) <-wait for { @@ -67,6 +68,11 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error) return default: } + + if 
ratelimit.Limit(done) {
+				return
+			}
+
 			obj := src.Object()
 			opts.ContentType = obj.ContentType
 			client, cldone := u.Client()
diff --git a/pkg/bench/ratelimit.go b/pkg/bench/ratelimit.go
new file mode 100644
index 00000000..63307fb1
--- /dev/null
+++ b/pkg/bench/ratelimit.go
@@ -0,0 +1,60 @@
+package bench
+
+import (
+	"time"
+)
+
+//
+// RpsPerWorkerLimit throttles a single worker to 'limit' operations per 'window'.
+// It is not safe for concurrent use: each worker goroutine must own its own instance.
+//
+
+type RpsPerWorkerLimit struct {
+	limit   float64       // max operations per window; <= 0 disables limiting
+	window  time.Duration // time window the limit applies to
+	counter int64         // operations requested so far
+	start   time.Time     // relative clock, initialized lazily on first Limit call
+}
+
+// InitRpsPerWorkerLimit returns a limiter allowing 'limit' operations per 'window'.
+func InitRpsPerWorkerLimit(limit float64, window time.Duration) RpsPerWorkerLimit {
+	return RpsPerWorkerLimit{
+		limit:   limit,
+		window:  window,
+		counter: 0,
+	}
+}
+
+// Limit returns true if the function has been interrupted by 'done', false otherwise.
+// Check if there is room for a new request, if not, sleep for the duration
+// that will keep the request rate within the ratelimit.
+func (r *RpsPerWorkerLimit) Limit(done <-chan struct{}) bool {
+	if r.limit <= 0 {
+		return false
+	}
+
+	// init the relative clock when first used
+	if r.start.IsZero() {
+		r.start = time.Now()
+	}
+
+	// calculate the time to sleep before next request to stay under the limit within the window
+	// sleepDuration <= 0: we have room for new requests, no need to ratelimit
+	// sleepDuration > 0: no more room for new requests, we have to wait before allowing new requests
+	elapsed := time.Since(r.start)
+	sleepDuration := time.Duration(float64(r.counter)/(r.limit/float64(r.window))) - elapsed
+
+	// increment the number of requests
+	r.counter++
+
+	// sleep for sleepDuration and allow to be interrupted by done
+	// if sleepDuration is <= 0, it will immediately expire
+	timer := time.NewTimer(sleepDuration)
+	select {
+	case <-done:
+		timer.Stop() // release the still-active timer when interrupted
+		return true
+	case <-timer.C:
+		return false
+	}
+}
diff --git a/pkg/bench/retention.go b/pkg/bench/retention.go
index 80245056..21b17bd3 100644
--- a/pkg/bench/retention.go
+++ b/pkg/bench/retention.go
@@ -73,6
+73,8 @@ func (g *Retention) Prepare(ctx context.Context) error { go func(i int) { defer wg.Done() src := g.Source() + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) + for range obj { opts := g.PutOpts rcv := g.Collector.Receiver() @@ -83,6 +85,11 @@ func (g *Retention) Prepare(ctx context.Context) error { return default: } + + if ratelimit.Limit(done) { + return + } + obj := src.Object() name := obj.Name for ver := 0; ver < g.Versions; ver++ { @@ -159,6 +166,7 @@ func (g *Retention) Start(ctx context.Context, wait chan struct{}) (Operations, defer wg.Done() done := ctx.Done() var opts minio.PutObjectRetentionOptions + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) <-wait mode := minio.Governance @@ -168,6 +176,11 @@ func (g *Retention) Start(ctx context.Context, wait chan struct{}) (Operations, return default: } + + if ratelimit.Limit(done) { + return + } + obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() op := Operation{ diff --git a/pkg/bench/s3zip.go b/pkg/bench/s3zip.go index bfc728d3..4678231f 100644 --- a/pkg/bench/s3zip.go +++ b/pkg/bench/s3zip.go @@ -69,6 +69,7 @@ func (g *S3Zip) Prepare(ctx context.Context) error { return default: } + obj := src.Object() opts.ContentType = obj.ContentType @@ -142,6 +143,7 @@ func (g *S3Zip) Start(ctx context.Context, wait chan struct{}) (Operations, erro defer wg.Done() done := ctx.Done() var opts minio.GetObjectOptions + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) <-wait for { @@ -150,6 +152,11 @@ func (g *S3Zip) Start(ctx context.Context, wait chan struct{}) (Operations, erro return default: } + + if ratelimit.Limit(done) { + return + } + fbr := firstByteRecorder{} obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() diff --git a/pkg/bench/select.go b/pkg/bench/select.go index 461f3979..1f425a6a 100644 --- a/pkg/bench/select.go +++ b/pkg/bench/select.go @@ -69,12 +69,18 @@ func (g *Select) 
Prepare(ctx context.Context) error { opts := g.PutOpts rcv := g.Collector.Receiver() done := ctx.Done() + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) select { case <-done: return default: } + + if ratelimit.Limit(done) { + return + } + obj := src.Object() client, cldone := g.Client() op := Operation{ @@ -145,6 +151,7 @@ func (g *Select) Start(ctx context.Context, wait chan struct{}) (Operations, err defer wg.Done() opts := g.SelectOpts done := ctx.Done() + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) <-wait for { @@ -153,6 +160,11 @@ func (g *Select) Start(ctx context.Context, wait chan struct{}) (Operations, err return default: } + + if ratelimit.Limit(done) { + return + } + fbr := firstByteRecorder{} obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() diff --git a/pkg/bench/snowball.go b/pkg/bench/snowball.go index 7c826b2a..f96e41a9 100644 --- a/pkg/bench/snowball.go +++ b/pkg/bench/snowball.go @@ -86,6 +86,7 @@ func (s *Snowball) Start(ctx context.Context, wait chan struct{}) (Operations, e opts := s.PutOpts opts.UserMetadata = map[string]string{"X-Amz-Meta-Snowball-Auto-Extract": "true"} done := ctx.Done() + ratelimit := InitRpsPerWorkerLimit(s.RpsPerWorkerLimit, s.RatelimitWindow) <-wait for { @@ -94,6 +95,11 @@ func (s *Snowball) Start(ctx context.Context, wait chan struct{}) (Operations, e return default: } + + if ratelimit.Limit(done) { + return + } + buf.Reset() w := io.Writer(&buf) if s.Compress { diff --git a/pkg/bench/stat.go b/pkg/bench/stat.go index dad32bdd..25cfe984 100644 --- a/pkg/bench/stat.go +++ b/pkg/bench/stat.go @@ -82,6 +82,7 @@ func (g *Stat) Prepare(ctx context.Context) error { defer wg.Done() src := g.Source() opts := g.PutOpts + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) for range obj { select { @@ -89,6 +90,11 @@ func (g *Stat) Prepare(ctx context.Context) error { return default: } + + if ratelimit.Limit(ctx.Done()) { + 
return + } + obj := src.Object() name := obj.Name @@ -165,6 +171,7 @@ func (g *Stat) Start(ctx context.Context, wait chan struct{}) (Operations, error defer wg.Done() opts := g.StatOpts done := ctx.Done() + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) <-wait for { @@ -173,6 +180,11 @@ func (g *Stat) Start(ctx context.Context, wait chan struct{}) (Operations, error return default: } + + if ratelimit.Limit(done) { + return + } + obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() op := Operation{ diff --git a/pkg/bench/versioned.go b/pkg/bench/versioned.go index 66707a49..0ec26bba 100644 --- a/pkg/bench/versioned.go +++ b/pkg/bench/versioned.go @@ -78,6 +78,8 @@ func (g *Versioned) Prepare(ctx context.Context) error { go func(i int) { defer wg.Done() src := g.Source() + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) + for range obj { opts := g.PutOpts done := ctx.Done() @@ -87,6 +89,11 @@ func (g *Versioned) Prepare(ctx context.Context) error { return default: } + + if ratelimit.Limit(done) { + return + } + obj := src.Object() client, clDone := g.Client() opts.ContentType = obj.ContentType @@ -143,6 +150,7 @@ func (g *Versioned) Start(ctx context.Context, wait chan struct{}) (Operations, putOpts := g.PutOpts statOpts := g.StatOpts getOpts := g.GetOpts + ratelimit := InitRpsPerWorkerLimit(g.RpsPerWorkerLimit, g.RatelimitWindow) <-wait for { @@ -151,6 +159,11 @@ func (g *Versioned) Start(ctx context.Context, wait chan struct{}) (Operations, return default: } + + if ratelimit.Limit(done) { + return + } + operation := g.Dist.getOp() switch operation { case http.MethodGet: