diff --git a/cli/get.go b/cli/get.go index 841a51b..0e5f6fc 100644 --- a/cli/get.go +++ b/cli/get.go @@ -102,6 +102,9 @@ func checkGetSyntax(ctx *cli.Context) { if ctx.Int("versions") < 1 { console.Fatal("At least one version must be tested") } + if ctx.Int("objects") < 1 { + console.Fatal("At least one object must be tested") + } checkAnalyze(ctx) checkBenchmark(ctx) } diff --git a/cli/list.go b/cli/list.go index cb1f6fe..875f4eb 100644 --- a/cli/list.go +++ b/cli/list.go @@ -27,7 +27,7 @@ var listFlags = []cli.Flag{ cli.IntFlag{ Name: "objects", Value: 10000, - Usage: "Number of objects to upload. Rounded to have equal concurrent objects.", + Usage: "Number of objects to upload. Rounded up to have equal concurrent objects.", }, cli.IntFlag{ Name: "versions", @@ -84,6 +84,9 @@ func checkListSyntax(ctx *cli.Context) { if ctx.Int("versions") < 1 { console.Fatal("At least one version must be tested") } + if ctx.Int("objects") < 1 { + console.Fatal("At least one object must be tested") + } checkAnalyze(ctx) checkBenchmark(ctx) diff --git a/cli/mixed.go b/cli/mixed.go index ca9ca50..f40ad72 100644 --- a/cli/mixed.go +++ b/cli/mixed.go @@ -108,7 +108,9 @@ func checkMixedSyntax(ctx *cli.Context) { if ctx.NArg() > 0 { console.Fatal("Command takes no arguments") } - + if ctx.Int("objects") < 1 { + console.Fatal("At least one object must be tested") + } checkAnalyze(ctx) checkBenchmark(ctx) } diff --git a/cli/multipart.go b/cli/multipart.go index 87aab36..79a0d02 100644 --- a/cli/multipart.go +++ b/cli/multipart.go @@ -125,6 +125,7 @@ func checkMultipartSyntax(ctx *cli.Context) { console.Fatal("part.size must be >= 5MiB") } } + checkAnalyze(ctx) checkBenchmark(ctx) } diff --git a/cli/select.go b/cli/select.go index 50064ba..9879213 100644 --- a/cli/select.go +++ b/cli/select.go @@ -20,6 +20,7 @@ package cli import ( "github.com/minio/cli" "github.com/minio/minio-go/v7" + "github.com/minio/pkg/v2/console" "github.com/minio/warp/pkg/bench" ) @@ -91,6 +92,9 @@ func mainSelect(ctx *cli.Context) error { } func checkSelectSyntax(ctx *cli.Context) { + if ctx.Int("objects") < 1 { + console.Fatal("At least one object must be tested") + } checkAnalyze(ctx) checkBenchmark(ctx) } diff --git a/cli/stat.go b/cli/stat.go index 91aa491..79eb04a 100644 --- a/cli/stat.go +++ b/cli/stat.go @@ -83,6 +83,9 @@ func checkStatSyntax(ctx *cli.Context) { if ctx.Int("versions") < 1 { console.Fatal("At least one version must be tested") } + if ctx.Int("objects") < 1 { + console.Fatal("At least one object must be tested") + } checkAnalyze(ctx) checkBenchmark(ctx) } diff --git a/cli/versioned.go b/cli/versioned.go index fdf8a1d..4869972 100644 --- a/cli/versioned.go +++ b/cli/versioned.go @@ -108,7 +108,9 @@ func checkVersionedSyntax(ctx *cli.Context) { if ctx.NArg() > 0 { console.Fatal("Command takes no arguments") } - + if ctx.Int("objects") < 1 { + console.Fatal("At least one object must be tested") + } checkAnalyze(ctx) checkBenchmark(ctx) } diff --git a/pkg/bench/benchmark.go b/pkg/bench/benchmark.go index e71dccf..5be15f6 100644 --- a/pkg/bench/benchmark.go +++ b/pkg/bench/benchmark.go @@ -263,3 +263,13 @@ func (c *Common) rpsLimit(ctx context.Context) error { return c.RpsLimiter.Wait(ctx) } + +func splitObjs(objects, concurrency int) [][]struct{} { + res := make([][]struct{}, concurrency) + // Round up if not cleanly divisible + inEach := (objects + concurrency - 1) / concurrency + for i := range res { + res[i] = make([]struct{}, inEach) + } + return res +} diff --git a/pkg/bench/delete.go b/pkg/bench/delete.go index d702e91..7d7bee1 100644 --- a/pkg/bench/delete.go +++ b/pkg/bench/delete.go @@ -108,14 +108,11 @@ func (d *Delete) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(d.Concurrency) d.addCollector() - obj := make(chan struct{}, d.CreateObjects) - for i := 0; i < d.CreateObjects; i++ { - obj <- struct{}{} - } - close(obj) + objs := splitObjs(d.CreateObjects, d.Concurrency) + var mu sync.Mutex - for i := 0; i < d.Concurrency; i++ { - go func(i int) { + for i, obj := range objs { + go func(i int, obj []struct{}) { defer wg.Done() src := d.Source() @@ -179,7 +176,7 @@ func (d *Delete) Prepare(ctx context.Context) error { mu.Unlock() rcv <- op } - }(i) + }(i, obj) } wg.Wait() diff --git a/pkg/bench/get.go b/pkg/bench/get.go index f1a41ef..e8c6ad3 100644 --- a/pkg/bench/get.go +++ b/pkg/bench/get.go @@ -142,17 +142,13 @@ func (g *Get) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) - obj := make(chan struct{}, g.CreateObjects) - for i := 0; i < g.CreateObjects; i++ { - obj <- struct{}{} - } + objs := splitObjs(g.CreateObjects, g.Concurrency) rcv := g.Collector.rcv - close(obj) var groupErr error var mu sync.Mutex - for i := 0; i < g.Concurrency; i++ { - go func(i int) { + for i, obj := range objs { + go func(i int, obj []struct{}) { defer wg.Done() src := g.Source() opts := g.PutOpts @@ -219,7 +215,7 @@ func (g *Get) Prepare(ctx context.Context) error { rcv <- op } } - }(i) + }(i, obj) } wg.Wait() return groupErr diff --git a/pkg/bench/list.go b/pkg/bench/list.go index ab523b8..8c22944 100644 --- a/pkg/bench/list.go +++ b/pkg/bench/list.go @@ -60,7 +60,7 @@ func (d *List) Prepare(ctx context.Context) error { done() } - objPerPrefix := d.CreateObjects / d.Concurrency + objPerPrefix := (d.CreateObjects + d.Concurrency - 1) / d.Concurrency console.Eraseline() x := "" if d.Versions > 1 { @@ -224,6 +224,7 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error Prefix: objs[0].Prefix, Recursive: true, WithVersions: d.Versions > 1, + MaxKeys: 100, }) // Wait for errCh to close. diff --git a/pkg/bench/mixed.go b/pkg/bench/mixed.go index 1209adb..440347d 100644 --- a/pkg/bench/mixed.go +++ b/pkg/bench/mixed.go @@ -162,15 +162,12 @@ func (g *Mixed) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) g.addCollector() - obj := make(chan struct{}, g.CreateObjects) - for i := 0; i < g.CreateObjects; i++ { - obj <- struct{}{} - } - close(obj) var groupErr error + + objs := splitObjs(g.CreateObjects, g.Concurrency) var mu sync.Mutex - for i := 0; i < g.Concurrency; i++ { - go func() { + for _, obj := range objs { + go func(obj []struct{}) { defer wg.Done() src := g.Source() @@ -218,7 +215,7 @@ func (g *Mixed) Prepare(ctx context.Context) error { g.Dist.addObj(*obj) g.prepareProgress(float64(len(g.Dist.objects)) / float64(g.CreateObjects)) } - }() + }(obj) } wg.Wait() return groupErr diff --git a/pkg/bench/multipart.go b/pkg/bench/multipart.go index 21142e1..43ca7ac 100644 --- a/pkg/bench/multipart.go +++ b/pkg/bench/multipart.go @@ -80,20 +80,17 @@ func (g *Multipart) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) g.addCollector() - obj := make(chan int, g.CreateParts) - for i := 0; i < g.CreateParts; i++ { - obj <- i + g.PartStart - } + objs := splitObjs(g.CreateParts, g.Concurrency) + rcv := g.Collector.rcv - close(obj) var groupErr error var mu sync.Mutex if g.Custom == nil { g.Custom = make(map[string]string, g.CreateParts) } - for i := 0; i < g.Concurrency; i++ { - go func(i int) { + for i, obj := range objs { + go func(i int, obj []struct{}) { defer wg.Done() src := g.Source() opts := g.PutOpts @@ -164,7 +161,7 @@ func (g *Multipart) Prepare(ctx context.Context) error { mu.Unlock() rcv <- op } - }(i) + }(i, obj) } wg.Wait() return groupErr diff --git a/pkg/bench/retention.go b/pkg/bench/retention.go index e77e254..3786be5 100644 --- a/pkg/bench/retention.go +++ b/pkg/bench/retention.go @@ -61,16 +61,12 @@ func (g *Retention) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) g.addCollector() - obj := make(chan struct{}, g.CreateObjects) - for i := 0; i < g.CreateObjects; i++ { - obj <- struct{}{} - } - close(obj) + objs := splitObjs(g.CreateObjects, g.Concurrency) var groupErr error var mu sync.Mutex - for i := 0; i < g.Concurrency; i++ { - go func(i int) { + for i, obj := range objs { + go func(i int, obj []struct{}) { defer wg.Done() src := g.Source() @@ -139,7 +135,7 @@ func (g *Retention) Prepare(ctx context.Context) error { rcv <- op } } - }(i) + }(i, obj) } wg.Wait() return groupErr diff --git a/pkg/bench/select.go b/pkg/bench/select.go index de68d2b..155434a 100644 --- a/pkg/bench/select.go +++ b/pkg/bench/select.go @@ -54,15 +54,12 @@ func (g *Select) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) g.addCollector() - obj := make(chan struct{}, g.CreateObjects) - for i := 0; i < g.CreateObjects; i++ { - obj <- struct{}{} - } - close(obj) + objs := splitObjs(g.CreateObjects, g.Concurrency) + var groupErr error var mu sync.Mutex - for i := 0; i < g.Concurrency; i++ { - go func(i int) { + for i, obj := range objs { + go func(i int, obj []struct{}) { defer wg.Done() src := g.Source() for range obj { @@ -124,7 +121,7 @@ func (g *Select) Prepare(ctx context.Context) error { mu.Unlock() rcv <- op } - }(i) + }(i, obj) } wg.Wait() return groupErr diff --git a/pkg/bench/stat.go b/pkg/bench/stat.go index c9b7c8f..e822cdb 100644 --- a/pkg/bench/stat.go +++ b/pkg/bench/stat.go @@ -68,17 +68,13 @@ func (g *Stat) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) g.addCollector() - obj := make(chan struct{}, g.CreateObjects) - for i := 0; i < g.CreateObjects; i++ { - obj <- struct{}{} - } + objs := splitObjs(g.CreateObjects, g.Concurrency) rcv := g.Collector.rcv - close(obj) var groupErr error var mu sync.Mutex - for i := 0; i < g.Concurrency; i++ { - go func(i int) { + for i, obj := range objs { + go func(i int, obj []struct{}) { defer wg.Done() src := g.Source() opts := g.PutOpts @@ -145,7 +141,7 @@ func (g *Stat) Prepare(ctx context.Context) error { rcv <- op } } - }(i) + }(i, obj) } wg.Wait() return groupErr diff --git a/pkg/bench/versioned.go b/pkg/bench/versioned.go index 511b5b0..11d2411 100644 --- a/pkg/bench/versioned.go +++ b/pkg/bench/versioned.go @@ -66,15 +66,12 @@ func (g *Versioned) Prepare(ctx context.Context) error { var wg sync.WaitGroup wg.Add(g.Concurrency) g.addCollector() - obj := make(chan struct{}, g.CreateObjects) - for i := 0; i < g.CreateObjects; i++ { - obj <- struct{}{} - } - close(obj) + objs := splitObjs(g.CreateObjects, g.Concurrency) + var groupErr error var mu sync.Mutex - for i := 0; i < g.Concurrency; i++ { - go func() { + for _, obj := range objs { + go func(obj []struct{}) { defer wg.Done() src := g.Source() @@ -122,7 +119,7 @@ func (g *Versioned) Prepare(ctx context.Context) error { g.Dist.addObj(*obj) g.prepareProgress(float64(len(g.Dist.objects)) / float64(g.CreateObjects)) } - }() + }(obj) } wg.Wait() return groupErr