Skip to content

Commit

Permalink
Add versioned listing (#202)
Browse files Browse the repository at this point in the history
If versioned listing should be tested, it is possible by setting `--versions=n` (default 1),
which will add multiple versions of each object and use `ListObjectVersions` for listing.

Minor tweaks to analysis output.
  • Loading branch information
klauspost authored Dec 7, 2021
1 parent 721870a commit 65db1d4
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 52 deletions.
13 changes: 7 additions & 6 deletions cli/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func printMixedOpAnalysis(ctx *cli.Context, aggr aggregate.Aggregated, details b
}
duration := ops.EndTime.Sub(ops.StartTime).Truncate(time.Second)
if !details {
console.Printf("Operation: %v, %d%%, Concurrency: %d, Duration: %v.\n", ops.Type, int(pct+0.5), ops.Concurrency, duration)
console.Printf("Operation: %v, %d%%, Concurrency: %d, Ran %v.\n", ops.Type, int(pct+0.5), ops.Concurrency, duration)
} else {
console.Printf("Operation: %v - total: %v, %.01f%%, Concurrency: %d, Duration: %v, starting %v\n", ops.Type, ops.Throughput.Operations, pct, ops.Concurrency, duration, ops.StartTime.Truncate(time.Millisecond))
console.Printf("Operation: %v - total: %v, %.01f%%, Concurrency: %d, Ran %v, starting %v\n", ops.Type, ops.Throughput.Operations, pct, ops.Concurrency, duration, ops.StartTime.Truncate(time.Millisecond))
}
console.SetColor("Print", color.New(color.FgWhite))

Expand Down Expand Up @@ -301,15 +301,16 @@ func printAnalysis(ctx *cli.Context, o bench.Operations) {
if ops.Clients > 1 {
hostsString = fmt.Sprintf("%s Warp Instances: %d.", hostsString, ops.Clients)
}
ran := ops.EndTime.Sub(ops.StartTime).Truncate(time.Second)
if opo > 1 {
if details {
console.Printf("Operation: %v (%d). Objects per operation: %d. Concurrency: %d.%s\n", typ, ops.N, opo, ops.Concurrency, hostsString)
console.Printf("Operation: %v (%d). Ran %v. Objects per operation: %d. Concurrency: %d.%s\n", typ, ops.N, ran, opo, ops.Concurrency, hostsString)
} else {
console.Printf("Operation: %v\n", typ)
}
} else {
if details {
console.Printf("Operation: %v (%d). Concurrency: %d.%s\n", typ, ops.N, ops.Concurrency, hostsString)
console.Printf("Operation: %v (%d). Ran %v. Concurrency: %d.%s\n", typ, ops.N, ran, ops.Concurrency, hostsString)
} else {
console.Printf("Operation: %v\n", typ)
}
Expand Down Expand Up @@ -474,7 +475,7 @@ func printRequestAnalysis(ctx *cli.Context, ops aggregate.Operation, details boo
", Slowest: ", time.Duration(reqs.SlowestMillis)*time.Millisecond,
"\n")
if reqs.FirstByte != nil {
console.Print(" * First Access TTFB: ", reqs.FirstByte)
console.Print(" * First Access TTFB: ", reqs.FirstByte, "\n")
}
}
if details && reqs.LastAccess != nil {
Expand All @@ -488,7 +489,7 @@ func printRequestAnalysis(ctx *cli.Context, ops aggregate.Operation, details boo
", Slowest: ", time.Duration(reqs.SlowestMillis)*time.Millisecond,
"\n")
if reqs.FirstByte != nil {
console.Print(" * Last Access TTFB: ", reqs.FirstByte)
console.Print(" * Last Access TTFB: ", reqs.FirstByte, "\n")
}
}

Expand Down
9 changes: 9 additions & 0 deletions cli/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ var (
Value: 10000,
Usage: "Number of objects to upload. Rounded to have equal concurrent objects.",
},
cli.IntFlag{
Name: "versions",
Value: 1,
Usage: "Number of versions to upload. If more than 1, versioned listing will be benchmarked",
},
cli.StringFlag{
Name: "obj.size",
Value: "1KB",
Expand Down Expand Up @@ -74,6 +79,7 @@ func mainList(ctx *cli.Context) error {
Location: "",
PutOpts: putOpts(ctx),
},
Versions: ctx.Int("versions"),
Metadata: ctx.Bool("metadata"),
CreateObjects: ctx.Int("objects"),
NoPrefix: ctx.Bool("noprefix"),
Expand All @@ -85,6 +91,9 @@ func checkListSyntax(ctx *cli.Context) {
if ctx.NArg() > 0 {
console.Fatal("Command takes no arguments")
}
if ctx.Int("versions") < 1 {
console.Fatal("At least one version must be tested")
}

checkAnalyze(ctx)
checkBenchmark(ctx)
Expand Down
107 changes: 65 additions & 42 deletions pkg/bench/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type List struct {
NoPrefix bool
Collector *Collector
Metadata bool
Versions int
objects []generator.Objects

Common
Expand All @@ -48,13 +49,28 @@ func (d *List) Prepare(ctx context.Context) error {
if err := d.createEmptyBucket(ctx); err != nil {
return err
}
src := d.Source()
if d.Versions > 1 {
cl, done := d.Client()
if !d.Versioned {
err := cl.EnableVersioning(ctx, d.Bucket)
if err != nil {
return err
}
d.Versioned = true
}
done()
}

objPerPrefix := d.CreateObjects / d.Concurrency
console.Eraseline()
x := ""
if d.Versions > 1 {
x = fmt.Sprintf(" with %d versions each", d.Versions)
}
if d.NoPrefix {
console.Info("\rUploading ", objPerPrefix*d.Concurrency, " objects of ", src.String(), " with no prefixes")
console.Info("\rUploading ", objPerPrefix*d.Concurrency, " objects", x)
} else {
console.Info("\rUploading ", objPerPrefix*d.Concurrency, " objects of ", src.String(), " with ", d.Concurrency, " prefixes")
console.Info("\rUploading ", objPerPrefix*d.Concurrency, " objects", x, " in ", d.Concurrency, " prefixes")
}
var wg sync.WaitGroup
wg.Add(d.Concurrency)
Expand Down Expand Up @@ -87,49 +103,55 @@ func (d *List) Prepare(ctx context.Context) error {
}
break
}
exists[obj.Name] = struct{}{}
client, cldone := d.Client()
op := Operation{
OpType: http.MethodPut,
Thread: uint16(i),
Size: obj.Size,
File: obj.Name,
ObjPerOp: 1,
Endpoint: client.EndpointURL().String(),
}
opts.ContentType = obj.ContentType
op.Start = time.Now()
res, err := client.PutObject(ctx, d.Bucket, obj.Name, obj.Reader, obj.Size, opts)
op.End = time.Now()
if err != nil {
err := fmt.Errorf("upload error: %w", err)
d.Error(err)
mu.Lock()
if groupErr == nil {
groupErr = err
name := obj.Name
exists[name] = struct{}{}
for ver := 0; ver < d.Versions; ver++ {
// New input for each version
obj := src.Object()
obj.Name = name
client, cldone := d.Client()
op := Operation{
OpType: http.MethodPut,
Thread: uint16(i),
Size: obj.Size,
File: obj.Name,
ObjPerOp: 1,
Endpoint: client.EndpointURL().String(),
}
mu.Unlock()
return
}
obj.VersionID = res.VersionID
if res.Size != obj.Size {
err := fmt.Errorf("short upload. want: %d, got %d", obj.Size, res.Size)
d.Error(err)
mu.Lock()
if groupErr == nil {
groupErr = err
opts.ContentType = obj.ContentType
op.Start = time.Now()
res, err := client.PutObject(ctx, d.Bucket, obj.Name, obj.Reader, obj.Size, opts)
op.End = time.Now()
if err != nil {
err := fmt.Errorf("upload error: %w", err)
d.Error(err)
mu.Lock()
if groupErr == nil {
groupErr = err
}
mu.Unlock()
return
}
obj.VersionID = res.VersionID
if res.Size != obj.Size {
err := fmt.Errorf("short upload. want: %d, got %d", obj.Size, res.Size)
d.Error(err)
mu.Lock()
if groupErr == nil {
groupErr = err
}
mu.Unlock()
return
}
cldone()
mu.Lock()
obj.Reader = nil
d.objects[i] = append(d.objects[i], *obj)
objsCreated++
d.prepareProgress(float64(objsCreated) / float64(objPerPrefix*d.Concurrency*d.Versions))
mu.Unlock()
return
rcv <- op
}
cldone()
mu.Lock()
obj.Reader = nil
d.objects[i] = append(d.objects[i], *obj)
objsCreated++
d.prepareProgress(float64(objsCreated) / float64(objPerPrefix*d.Concurrency))
mu.Unlock()
rcv <- op
}
}(i)
}
Expand Down Expand Up @@ -191,6 +213,7 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error
WithMetadata: d.Metadata,
Prefix: objs[0].Prefix,
Recursive: true,
WithVersions: d.Versions > 1,
})

// Wait for errCh to close.
Expand Down
5 changes: 2 additions & 3 deletions pkg/bench/mixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"io/ioutil"
"math/rand"
"net/http"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -78,8 +77,8 @@ func (m *MixedDistribution) Generate(allocObjs int) error {
}
}
m.rng = rand.New(rand.NewSource(0xabad1dea))
sort.Slice(m.ops, func(i, j int) bool {
return m.rng.Int63()&1 == 0
m.rng.Shuffle(len(m.ops), func(i, j int) {
m.ops[i], m.ops[j] = m.ops[j], m.ops[i]
})
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bench/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ func (g *Retention) Prepare(ctx context.Context) error {
return err
}
cl, done := g.Client()
defer done()
if !g.Versioned {
err := cl.EnableVersioning(ctx, g.Bucket)
if err != nil {
return err
}
g.Versioned = true
done()
}

src := g.Source()
Expand Down

0 comments on commit 65db1d4

Please sign in to comment.