diff --git a/cli/flags.go b/cli/flags.go index bf024a50..9596cf0a 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -27,6 +27,8 @@ import ( "github.com/minio/pkg/v2/console" "github.com/minio/warp/pkg/bench" "github.com/minio/warp/pkg/generator" + + "golang.org/x/time/rate" ) // Collection of warp flags currently supported @@ -250,6 +252,11 @@ var ioFlags = []cli.Flag{ EnvVar: appNameUC + "_INFLUXDB_CONNECT", Usage: "Send operations to InfluxDB. Specify as 'http://@://'", }, + cli.Float64Flag{ + Name: "rps-limit", + Value: 0, + Usage: "Rate limit each instance to this number of requests per second (0 to disable)", + }, } func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common { @@ -263,6 +270,14 @@ func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common { extra = append(extra, in) } } + + rpsLimit := ctx.Float64("rps-limit") + var rpsLimiter *rate.Limiter + if rpsLimit > 0 { + // set burst to 1 as limiter will always be called to wait for 1 token + rpsLimiter = rate.NewLimiter(rate.Limit(rpsLimit), 1) + } + return bench.Common{ Client: newClient(ctx), Concurrency: ctx.Int("concurrent"), @@ -272,5 +287,6 @@ func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common { PutOpts: putOpts(ctx), DiscardOutput: ctx.Bool("stress"), ExtraOut: extra, + RpsLimiter: rpsLimiter, } } diff --git a/go.mod b/go.mod index dce47271..da1d4e7e 100644 --- a/go.mod +++ b/go.mod @@ -72,6 +72,7 @@ require ( golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) diff --git a/go.sum b/go.sum index 90764486..9dd5f5d1 100644 --- a/go.sum +++ b/go.sum @@ -228,6 +228,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/pkg/bench/benchmark.go b/pkg/bench/benchmark.go index 3c8e9ceb..e71dccf9 100644 --- a/pkg/bench/benchmark.go +++ b/pkg/bench/benchmark.go @@ -28,6 +28,8 @@ import ( "github.com/minio/minio-go/v7" "github.com/minio/pkg/v2/console" "github.com/minio/warp/pkg/generator" + + "golang.org/x/time/rate" ) type Benchmark interface { @@ -93,6 +95,9 @@ type Common struct { // Does destination support versioning? Versioned bool + + // ratelimiting + RpsLimiter *rate.Limiter } const ( @@ -250,3 +255,11 @@ func (c *Common) addCollector() { } c.Collector.extra = c.ExtraOut } + +func (c *Common) rpsLimit(ctx context.Context) error { + if c.RpsLimiter == nil { + return nil + } + + return c.RpsLimiter.Wait(ctx) +} diff --git a/pkg/bench/delete.go b/pkg/bench/delete.go index 529658fd..8d192a8c 100644 --- a/pkg/bench/delete.go +++ b/pkg/bench/delete.go @@ -62,6 +62,7 @@ func (d *Delete) Prepare(ctx context.Context) error { go func(i int) { defer wg.Done() src := d.Source() + for range obj { opts := d.PutOpts rcv := d.Collector.Receiver() @@ -72,6 +73,11 @@ func (d *Delete) Prepare(ctx context.Context) error { return default: } + + if d.rpsLimit(ctx) != nil { + return + } + obj := src.Object() client, cldone := d.Client() op := Operation{ @@ -157,6 +163,10 @@ func (d *Delete) Start(ctx context.Context, wait chan struct{}) (Operations, err default: } + if d.rpsLimit(ctx) != nil { + return + } + // Fetch d.BatchSize objects mu.Lock() if len(d.objects) == 0 { diff --git a/pkg/bench/get.go b/pkg/bench/get.go index 55f35d63..f1a41efc 100644 --- a/pkg/bench/get.go +++ b/pkg/bench/get.go @@ -163,6 +163,11 @@ func (g *Get) Prepare(ctx context.Context) error { return default: } + + if g.rpsLimit(ctx) != nil { + return + } + obj := src.Object() name := obj.Name @@ -266,6 +271,11 @@ func (g *Get) Start(ctx context.Context, wait chan struct{}) (Operations, error) return default: } + + if g.rpsLimit(ctx) != nil { + return + } + fbr := firstByteRecorder{} obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() diff --git a/pkg/bench/list.go b/pkg/bench/list.go index 18ea0739..ab523b8f 100644 --- a/pkg/bench/list.go +++ b/pkg/bench/list.go @@ -93,6 +93,11 @@ func (d *List) Prepare(ctx context.Context) error { return default: } + + if d.rpsLimit(ctx) != nil { + return + } + obj := src.Object() // Assure we don't have duplicates for { @@ -197,6 +202,10 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error default: } + if d.rpsLimit(ctx) != nil { + return + } + prefix := objs[0].Prefix client, cldone := d.Client() op := Operation{ diff --git a/pkg/bench/mixed.go b/pkg/bench/mixed.go index 0627364c..1d69ce84 100644 --- a/pkg/bench/mixed.go +++ b/pkg/bench/mixed.go @@ -173,6 +173,7 @@ func (g *Mixed) Prepare(ctx context.Context) error { go func(i int) { defer wg.Done() src := g.Source() + for range obj { opts := g.PutOpts done := ctx.Done() @@ -182,6 +183,11 @@ func (g *Mixed) Prepare(ctx context.Context) error { return default: } + + if g.rpsLimit(ctx) != nil { + return + } + obj := src.Object() client, clDone := g.Client() opts.ContentType = obj.ContentType @@ -247,6 +253,11 @@ func (g *Mixed) Start(ctx context.Context, wait chan struct{}) (Operations, erro return default: } + + if g.rpsLimit(ctx) != nil { + return + } + operation := g.Dist.getOp() switch operation { case http.MethodGet: diff --git a/pkg/bench/multipart.go b/pkg/bench/multipart.go index d8dc90e0..21142e14 100644 --- a/pkg/bench/multipart.go +++ b/pkg/bench/multipart.go @@ -104,6 +104,11 @@ func (g *Multipart) Prepare(ctx context.Context) error { return default: } + + if g.rpsLimit(ctx) != nil { + return + } + name := g.ObjName // New input for each version obj := src.Object() @@ -213,6 +218,11 @@ func (g *Multipart) Start(ctx context.Context, wait chan struct{}) (Operations, return default: } + + if g.rpsLimit(ctx) != nil { + return + } + fbr := firstByteRecorder{} part := rng.Intn(len(g.objects)) obj := g.objects[part] diff --git a/pkg/bench/put.go b/pkg/bench/put.go index 7f9cee01..e737e6a4 100644 --- a/pkg/bench/put.go +++ b/pkg/bench/put.go @@ -67,6 +67,11 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error) return default: } + + if u.rpsLimit(ctx) != nil { + return + } + obj := src.Object() opts.ContentType = obj.ContentType client, cldone := u.Client() diff --git a/pkg/bench/retention.go b/pkg/bench/retention.go index 80245056..e77e254f 100644 --- a/pkg/bench/retention.go +++ b/pkg/bench/retention.go @@ -73,6 +73,7 @@ func (g *Retention) Prepare(ctx context.Context) error { go func(i int) { defer wg.Done() src := g.Source() + for range obj { opts := g.PutOpts rcv := g.Collector.Receiver() @@ -83,6 +84,11 @@ func (g *Retention) Prepare(ctx context.Context) error { return default: } + + if g.rpsLimit(ctx) != nil { + return + } + obj := src.Object() name := obj.Name for ver := 0; ver < g.Versions; ver++ { @@ -168,6 +174,11 @@ func (g *Retention) Start(ctx context.Context, wait chan struct{}) (Operations, return default: } + + if g.rpsLimit(ctx) != nil { + return + } + obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() op := Operation{ diff --git a/pkg/bench/s3zip.go b/pkg/bench/s3zip.go index bfc728d3..72730807 100644 --- a/pkg/bench/s3zip.go +++ b/pkg/bench/s3zip.go @@ -69,6 +69,7 @@ func (g *S3Zip) Prepare(ctx context.Context) error { return default: } + obj := src.Object() opts.ContentType = obj.ContentType @@ -150,6 +151,11 @@ func (g *S3Zip) Start(ctx context.Context, wait chan struct{}) (Operations, erro return default: } + + if g.rpsLimit(ctx) != nil { + return + } + fbr := firstByteRecorder{} obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() diff --git a/pkg/bench/select.go b/pkg/bench/select.go index 461f3979..de68d2b5 100644 --- a/pkg/bench/select.go +++ b/pkg/bench/select.go @@ -75,6 +75,11 @@ func (g *Select) Prepare(ctx context.Context) error { return default: } + + if g.rpsLimit(ctx) != nil { + return + } + obj := src.Object() client, cldone := g.Client() op := Operation{ @@ -153,6 +158,11 @@ func (g *Select) Start(ctx context.Context, wait chan struct{}) (Operations, err return default: } + + if g.rpsLimit(ctx) != nil { + return + } + fbr := firstByteRecorder{} obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() diff --git a/pkg/bench/snowball.go b/pkg/bench/snowball.go index 7c826b2a..315b6df0 100644 --- a/pkg/bench/snowball.go +++ b/pkg/bench/snowball.go @@ -94,6 +94,11 @@ func (s *Snowball) Start(ctx context.Context, wait chan struct{}) (Operations, e return default: } + + if s.rpsLimit(ctx) != nil { + return + } + buf.Reset() w := io.Writer(&buf) if s.Compress { diff --git a/pkg/bench/stat.go b/pkg/bench/stat.go index dad32bdd..c9b7c8f3 100644 --- a/pkg/bench/stat.go +++ b/pkg/bench/stat.go @@ -89,6 +89,11 @@ func (g *Stat) Prepare(ctx context.Context) error { return default: } + + if g.rpsLimit(ctx) != nil { + return + } + obj := src.Object() name := obj.Name @@ -173,6 +178,11 @@ func (g *Stat) Start(ctx context.Context, wait chan struct{}) (Operations, error return default: } + + if g.rpsLimit(ctx) != nil { + return + } + obj := g.objects[rng.Intn(len(g.objects))] client, cldone := g.Client() op := Operation{ diff --git a/pkg/bench/versioned.go b/pkg/bench/versioned.go index 66707a49..10e460cc 100644 --- a/pkg/bench/versioned.go +++ b/pkg/bench/versioned.go @@ -78,6 +78,7 @@ func (g *Versioned) Prepare(ctx context.Context) error { go func(i int) { defer wg.Done() src := g.Source() + for range obj { opts := g.PutOpts done := ctx.Done() @@ -87,6 +88,11 @@ func (g *Versioned) Prepare(ctx context.Context) error { return default: } + + if g.rpsLimit(ctx) != nil { + return + } + obj := src.Object() client, clDone := g.Client() opts.ContentType = obj.ContentType @@ -151,6 +157,11 @@ func (g *Versioned) Start(ctx context.Context, wait chan struct{}) (Operations, return default: } + + if g.rpsLimit(ctx) != nil { + return + } + operation := g.Dist.getOp() switch operation { case http.MethodGet: