Skip to content

Commit

Permalink
Make object count a multiple of concurrency
Browse files Browse the repository at this point in the history
Also make sure that the same number of objects are uploaded to each prefix, by using a slice instead of a channel.

Also fixes minio#322
  • Loading branch information
klauspost committed Jun 21, 2024
1 parent ff64916 commit c1779cb
Show file tree
Hide file tree
Showing 17 changed files with 70 additions and 68 deletions.
3 changes: 3 additions & 0 deletions cli/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func checkGetSyntax(ctx *cli.Context) {
if ctx.Int("versions") < 1 {
console.Fatal("At least one version must be tested")
}
if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
5 changes: 4 additions & 1 deletion cli/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var listFlags = []cli.Flag{
cli.IntFlag{
Name: "objects",
Value: 10000,
Usage: "Number of objects to upload. Rounded to have equal concurrent objects.",
Usage: "Number of objects to upload. Rounded up to have equal concurrent objects.",
},
cli.IntFlag{
Name: "versions",
Expand Down Expand Up @@ -84,6 +84,9 @@ func checkListSyntax(ctx *cli.Context) {
if ctx.Int("versions") < 1 {
console.Fatal("At least one version must be tested")
}
if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}

checkAnalyze(ctx)
checkBenchmark(ctx)
Expand Down
4 changes: 3 additions & 1 deletion cli/mixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func checkMixedSyntax(ctx *cli.Context) {
if ctx.NArg() > 0 {
console.Fatal("Command takes no arguments")
}

if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
1 change: 1 addition & 0 deletions cli/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func checkMultipartSyntax(ctx *cli.Context) {
console.Fatal("part.size must be >= 5MiB")
}
}

checkAnalyze(ctx)
checkBenchmark(ctx)
}
4 changes: 4 additions & 0 deletions cli/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package cli
import (
"github.com/minio/cli"
"github.com/minio/minio-go/v7"
"github.com/minio/pkg/v2/console"
"github.com/minio/warp/pkg/bench"
)

Expand Down Expand Up @@ -91,6 +92,9 @@ func mainSelect(ctx *cli.Context) error {
}

func checkSelectSyntax(ctx *cli.Context) {
if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
3 changes: 3 additions & 0 deletions cli/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func checkStatSyntax(ctx *cli.Context) {
if ctx.Int("versions") < 1 {
console.Fatal("At least one version must be tested")
}
if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
4 changes: 3 additions & 1 deletion cli/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func checkVersionedSyntax(ctx *cli.Context) {
if ctx.NArg() > 0 {
console.Fatal("Command takes no arguments")
}

if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
10 changes: 10 additions & 0 deletions pkg/bench/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,13 @@ func (c *Common) rpsLimit(ctx context.Context) error {

return c.RpsLimiter.Wait(ctx)
}

func splitObjs(objects, concurrency int) [][]struct{} {
res := make([][]struct{}, concurrency)
// Round up if not cleanly divisible
inEach := (objects + concurrency - 1) / concurrency
for i := range res {
res[i] = make([]struct{}, inEach)
}
return res
}
13 changes: 5 additions & 8 deletions pkg/bench/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,11 @@ func (d *Delete) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(d.Concurrency)
d.addCollector()
obj := make(chan struct{}, d.CreateObjects)
for i := 0; i < d.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
objs := splitObjs(d.CreateObjects, d.Concurrency)

var mu sync.Mutex
for i := 0; i < d.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := d.Source()

Expand Down Expand Up @@ -179,7 +176,7 @@ func (d *Delete) Prepare(ctx context.Context) error {
mu.Unlock()
rcv <- op
}
}(i)
}(i, obj)
}
wg.Wait()

Expand Down
12 changes: 4 additions & 8 deletions pkg/bench/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,13 @@ func (g *Get) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)

obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
objs := splitObjs(g.CreateObjects, g.Concurrency)
rcv := g.Collector.rcv
close(obj)
var groupErr error
var mu sync.Mutex

for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
Expand Down Expand Up @@ -219,7 +215,7 @@ func (g *Get) Prepare(ctx context.Context) error {
rcv <- op
}
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
3 changes: 2 additions & 1 deletion pkg/bench/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (d *List) Prepare(ctx context.Context) error {
done()
}

objPerPrefix := d.CreateObjects / d.Concurrency
objPerPrefix := (d.CreateObjects + d.Concurrency - 1) / d.Concurrency
console.Eraseline()
x := ""
if d.Versions > 1 {
Expand Down Expand Up @@ -224,6 +224,7 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error
Prefix: objs[0].Prefix,
Recursive: true,
WithVersions: d.Versions > 1,
MaxKeys: 100,
})

// Wait for errCh to close.
Expand Down
13 changes: 5 additions & 8 deletions pkg/bench/mixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,12 @@ func (g *Mixed) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
var groupErr error

objs := splitObjs(g.CreateObjects, g.Concurrency)
var mu sync.Mutex
for i := 0; i < g.Concurrency; i++ {
go func() {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()

Expand Down Expand Up @@ -218,7 +215,7 @@ func (g *Mixed) Prepare(ctx context.Context) error {
g.Dist.addObj(*obj)
g.prepareProgress(float64(len(g.Dist.objects)) / float64(g.CreateObjects))
}
}()
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
13 changes: 5 additions & 8 deletions pkg/bench/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,17 @@ func (g *Multipart) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan int, g.CreateParts)
for i := 0; i < g.CreateParts; i++ {
obj <- i + g.PartStart
}
objs := splitObjs(g.CreateParts, g.Concurrency)

rcv := g.Collector.rcv
close(obj)
var groupErr error
var mu sync.Mutex

if g.Custom == nil {
g.Custom = make(map[string]string, g.CreateParts)
}
for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
Expand Down Expand Up @@ -164,7 +161,7 @@ func (g *Multipart) Prepare(ctx context.Context) error {
mu.Unlock()
rcv <- op
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
12 changes: 4 additions & 8 deletions pkg/bench/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,12 @@ func (g *Retention) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
objs := splitObjs(g.CreateObjects, g.Concurrency)
var groupErr error
var mu sync.Mutex

for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()

Expand Down Expand Up @@ -139,7 +135,7 @@ func (g *Retention) Prepare(ctx context.Context) error {
rcv <- op
}
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
13 changes: 5 additions & 8 deletions pkg/bench/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,12 @@ func (g *Select) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
objs := splitObjs(g.CreateObjects, g.Concurrency)

var groupErr error
var mu sync.Mutex
for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()
for range obj {
Expand Down Expand Up @@ -124,7 +121,7 @@ func (g *Select) Prepare(ctx context.Context) error {
mu.Unlock()
rcv <- op
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
12 changes: 4 additions & 8 deletions pkg/bench/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,13 @@ func (g *Stat) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
objs := splitObjs(g.CreateObjects, g.Concurrency)
rcv := g.Collector.rcv
close(obj)
var groupErr error
var mu sync.Mutex

for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
Expand Down Expand Up @@ -145,7 +141,7 @@ func (g *Stat) Prepare(ctx context.Context) error {
rcv <- op
}
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
13 changes: 5 additions & 8 deletions pkg/bench/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,12 @@ func (g *Versioned) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
objs := splitObjs(g.CreateObjects, g.Concurrency)

var groupErr error
var mu sync.Mutex
for i := 0; i < g.Concurrency; i++ {
go func() {
for _, obj := range objs {
go func(obj []struct{}) {
defer wg.Done()
src := g.Source()

Expand Down Expand Up @@ -122,7 +119,7 @@ func (g *Versioned) Prepare(ctx context.Context) error {
g.Dist.addObj(*obj)
g.prepareProgress(float64(len(g.Dist.objects)) / float64(g.CreateObjects))
}
}()
}(obj)
}
wg.Wait()
return groupErr
Expand Down

0 comments on commit c1779cb

Please sign in to comment.