diff --git a/README.md b/README.md
index ccb8ff34..38fe1497 100644
--- a/README.md
+++ b/README.md
@@ -373,6 +373,32 @@ Throughput, split into 59 x 1s:
* Slowest: 6.7MiB/s, 685.26 obj/s
```
+## RETENTION
+
+Benchmarking [PutObjectRetention](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectRetention.html) operations
+will upload `--objects` objects of size `--obj.size` with `--concurrent` prefixes and `--versions` versions on each object.
+
+Example:
+```
+λ warp retention --objects=2500 --duration=1m
+[...]
+----------------------------------------
+Operation: RETENTION
+* Average: 169.50 obj/s
+
+Throughput by host:
+ * http://192.168.1.78:9001: Avg: 85.01 obj/s
+ * http://192.168.1.78:9002: Avg: 84.56 obj/s
+
+Throughput, split into 59 x 1s:
+ * Fastest: 203.45 obj/s
+ * 50% Median: 169.45 obj/s
+ * Slowest: 161.73 obj/s
+```
+
+Note that since object locking can only be specified when creating a bucket, it may be needed to recreate the bucket.
+Warp will attempt to do that automatically.
+
# Analysis
When benchmarks have finished all request data will be saved to a file and an analysis will be shown.
diff --git a/cli/cli.go b/cli/cli.go
index 51bc5b19..20a6f225 100644
--- a/cli/cli.go
+++ b/cli/cli.go
@@ -96,6 +96,7 @@ func init() {
statCmd,
selectCmd,
versionedCmd,
+ retentionCmd,
}
b := []cli.Command{
analyzeCmd,
diff --git a/cli/retention.go b/cli/retention.go
new file mode 100644
index 00000000..5b062402
--- /dev/null
+++ b/cli/retention.go
@@ -0,0 +1,97 @@
+/*
+ * Warp (C) 2019-2020 MinIO, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package cli
+
+import (
+ "github.com/minio/cli"
+ "github.com/minio/pkg/console"
+ "github.com/minio/warp/pkg/bench"
+)
+
+var (
+ retentionFlags = []cli.Flag{
+ cli.IntFlag{
+ Name: "objects",
+ Value: 25000,
+ Usage: "Number of objects to upload.",
+ },
+ cli.IntFlag{
+ Name: "versions",
+ Value: 5,
+ Usage: "Number of versions to upload to each object",
+ },
+ cli.StringFlag{
+ Name: "obj.size",
+ Value: "1KiB",
+ Usage: "Size of each generated object. Can be a number or 10KiB/MiB/GiB. All sizes are base 2 binary.",
+ },
+ }
+)
+
+var retentionCmd = cli.Command{
+ Name: "retention",
+ Usage: "benchmark PutObjectRetention",
+ Action: mainRetention,
+ Before: setGlobalsFromContext,
+ Flags: combineFlags(globalFlags, ioFlags, retentionFlags, genFlags, benchFlags, analyzeFlags),
+ CustomHelpTemplate: `NAME:
+ {{.HelpName}} - {{.Usage}}
+
+USAGE:
+ {{.HelpName}} [FLAGS]
+ -> see https://github.com/minio/warp#retention
+
+FLAGS:
+ {{range .VisibleFlags}}{{.}}
+ {{end}}`,
+}
+
+// mainGet is the entry point for get command.
+func mainRetention(ctx *cli.Context) error {
+ checkRetentionSyntax(ctx)
+ src := newGenSource(ctx)
+ b := bench.Retention{
+ Common: bench.Common{
+ Client: newClient(ctx),
+ Concurrency: ctx.Int("concurrent"),
+ Source: src,
+ Bucket: ctx.String("bucket"),
+ Location: "",
+ PutOpts: putOpts(ctx),
+ Locking: true,
+ },
+ CreateObjects: ctx.Int("objects"),
+ Versions: ctx.Int("versions"),
+ }
+ return runBench(ctx, &b)
+}
+
+func checkRetentionSyntax(ctx *cli.Context) {
+ if ctx.NArg() > 0 {
+ console.Fatal("Command takes no arguments")
+ }
+ if ctx.Int("objects") <= 0 {
+ console.Fatal("There must be more than 0 objects.")
+ }
+ if ctx.Int("versions") <= 0 {
+ console.Fatal("There must be more than 0 versions per object.")
+ }
+
+ checkAnalyze(ctx)
+ checkBenchmark(ctx)
+}
diff --git a/pkg/bench/benchmark.go b/pkg/bench/benchmark.go
index a7df7383..9f4f0579 100644
--- a/pkg/bench/benchmark.go
+++ b/pkg/bench/benchmark.go
@@ -19,8 +19,10 @@ package bench
import (
"context"
+ "errors"
"fmt"
"math"
+ "strings"
"time"
"github.com/minio/minio-go/v7"
@@ -51,6 +53,7 @@ type Common struct {
Source func() generator.Source
Bucket string
Location string
+ Locking bool
// Running in client mode.
ClientMode bool
@@ -100,10 +103,33 @@ func (c *Common) createEmptyBucket(ctx context.Context) error {
return err
}
+ if x && c.Locking {
+ _, _, _, err := cl.GetBucketObjectLockConfig(ctx, c.Bucket)
+ if err != nil {
+ if !c.Clear {
+ return errors.New("not allowed to clear bucket to re-create bucket with locking")
+ }
+ if bvc, err := cl.GetBucketVersioning(ctx, c.Bucket); err == nil {
+ c.Versioned = bvc.Status == "Enabled"
+ }
+ console.Eraseline()
+ console.Infof("\rClearing Bucket %q to enable locking...", c.Bucket)
+ c.deleteAllInBucket(ctx)
+ err = cl.RemoveBucket(ctx, c.Bucket)
+ if err != nil {
+ return err
+ }
+ // Recreate bucket.
+ x = false
+ }
+ }
+
if !x {
+ console.Eraseline()
console.Infof("\rCreating Bucket %q...", c.Bucket)
err := cl.MakeBucket(ctx, c.Bucket, minio.MakeBucketOptions{
- Region: c.Location,
+ Region: c.Location,
+ ObjectLocking: c.Locking,
})
// In client mode someone else may have created it first.
@@ -125,6 +151,7 @@ func (c *Common) createEmptyBucket(ctx context.Context) error {
}
if c.Clear {
+ console.Eraseline()
console.Infof("\rClearing Bucket %q...", c.Bucket)
c.deleteAllInBucket(ctx)
}
@@ -152,7 +179,8 @@ func (c *Common) deleteAllInBucket(ctx context.Context, prefixes ...string) {
WithVersions: c.Versioned,
}
for _, prefix := range prefixes {
- if c.Source().Prefix() != "" {
+ opts.Prefix = prefix
+ if prefix != "" {
opts.Prefix = prefix + "/"
}
for object := range cl.ListObjects(ctx, c.Bucket, opts) {
@@ -162,11 +190,12 @@ func (c *Common) deleteAllInBucket(ctx context.Context, prefixes ...string) {
}
objectsCh <- object
}
- console.Infof("\rClearing Prefix %q/%q...", c.Bucket, prefix)
+ console.Eraseline()
+ console.Infof("\rClearing Prefix %q...", strings.Join([]string{c.Bucket, opts.Prefix}, "/"))
}
}()
- errCh := cl.RemoveObjects(ctx, c.Bucket, objectsCh, minio.RemoveObjectsOptions{})
+ errCh := cl.RemoveObjects(ctx, c.Bucket, objectsCh, minio.RemoveObjectsOptions{GovernanceBypass: true})
for err := range errCh {
if err.Err != nil {
c.Error(err.Err)
diff --git a/pkg/bench/delete.go b/pkg/bench/delete.go
index e5e7b9f9..76477691 100644
--- a/pkg/bench/delete.go
+++ b/pkg/bench/delete.go
@@ -47,6 +47,7 @@ func (d *Delete) Prepare(ctx context.Context) error {
return err
}
src := d.Source()
+ console.Eraseline()
console.Info("\rUploading ", d.CreateObjects, " objects of ", src.String())
var wg sync.WaitGroup
wg.Add(d.Concurrency)
diff --git a/pkg/bench/get.go b/pkg/bench/get.go
index fcc5e321..e1ba5391 100644
--- a/pkg/bench/get.go
+++ b/pkg/bench/get.go
@@ -51,6 +51,7 @@ func (g *Get) Prepare(ctx context.Context) error {
return err
}
src := g.Source()
+ console.Eraseline()
console.Info("\rUploading ", g.CreateObjects, " objects of ", src.String())
var wg sync.WaitGroup
wg.Add(g.Concurrency)
diff --git a/pkg/bench/list.go b/pkg/bench/list.go
index b0b2a388..f865a81d 100644
--- a/pkg/bench/list.go
+++ b/pkg/bench/list.go
@@ -50,6 +50,7 @@ func (d *List) Prepare(ctx context.Context) error {
}
src := d.Source()
objPerPrefix := d.CreateObjects / d.Concurrency
+ console.Eraseline()
if d.NoPrefix {
console.Info("\rUploading ", objPerPrefix*d.Concurrency, " objects of ", src.String(), " with no prefixes")
} else {
diff --git a/pkg/bench/mixed.go b/pkg/bench/mixed.go
index 302ddf95..152688b1 100644
--- a/pkg/bench/mixed.go
+++ b/pkg/bench/mixed.go
@@ -159,6 +159,7 @@ func (g *Mixed) Prepare(ctx context.Context) error {
return err
}
src := g.Source()
+ console.Eraseline()
console.Info("\rUploading ", g.CreateObjects, " objects of ", src.String())
var wg sync.WaitGroup
wg.Add(g.Concurrency)
diff --git a/pkg/bench/ops.go b/pkg/bench/ops.go
index cf6c7920..cc3332a5 100644
--- a/pkg/bench/ops.go
+++ b/pkg/bench/ops.go
@@ -30,7 +30,7 @@ import (
"sync"
"time"
- humanize "github.com/dustin/go-humanize"
+ "github.com/dustin/go-humanize"
"github.com/minio/pkg/console"
)
@@ -136,10 +136,12 @@ func (c *Collector) AutoTerm(ctx context.Context, op string, threshold float64,
}
// All checks passed.
if mb > 0 {
+ console.Eraseline()
console.Printf("\rThroughput %0.01fMiB/s within %f%% for %v. Assuming stability. Terminating benchmark.\n",
mb, threshold*100,
segs[0].Duration().Round(time.Millisecond)*time.Duration(len(segs)+1))
} else {
+ console.Eraseline()
console.Printf("\rThroughput %0.01f objects/s within %f%% for %v. Assuming stability. Terminating benchmark.\n",
objs, threshold*100,
segs[0].Duration().Round(time.Millisecond)*time.Duration(len(segs)+1))
@@ -1088,6 +1090,7 @@ func OperationsFromCSV(r io.Reader, analyzeOnly bool, offset, limit int, log fun
ClientID: getClient(clientID),
})
if log != nil && len(ops)%1000000 == 0 {
+ console.Eraseline()
log("\r%d operations loaded...", len(ops))
}
if limit > 0 && len(ops) >= limit {
@@ -1095,6 +1098,7 @@ func OperationsFromCSV(r io.Reader, analyzeOnly bool, offset, limit int, log fun
}
}
if log != nil {
+ console.Eraseline()
log("\r%d operations loaded... Done!\n", len(ops))
}
return ops, nil
diff --git a/pkg/bench/retention.go b/pkg/bench/retention.go
new file mode 100644
index 00000000..763a38e3
--- /dev/null
+++ b/pkg/bench/retention.go
@@ -0,0 +1,210 @@
+/*
+ * Warp (C) 2019-2020 MinIO, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package bench
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/minio/minio-go/v7"
+ "github.com/minio/pkg/console"
+ "github.com/minio/warp/pkg/generator"
+)
+
+// Retention benchmarks download speed.
+type Retention struct {
+ CreateObjects int
+ Versions int
+ Collector *Collector
+ objects generator.Objects
+
+ Common
+}
+
+// Prepare will create an empty bucket or delete any content already there
+// and upload a number of objects.
+func (g *Retention) Prepare(ctx context.Context) error {
+ if err := g.createEmptyBucket(ctx); err != nil {
+ return err
+ }
+ cl, done := g.Client()
+ defer done()
+ if !g.Versioned {
+ err := cl.EnableVersioning(ctx, g.Bucket)
+ if err != nil {
+ return err
+ }
+ g.Versioned = true
+ }
+
+ src := g.Source()
+ console.Eraseline()
+ console.Info("\rUploading ", g.CreateObjects, " objects with ", g.Versions, " versions each of ", src.String())
+ var wg sync.WaitGroup
+ wg.Add(g.Concurrency)
+ g.Collector = NewCollector()
+ obj := make(chan struct{}, g.CreateObjects)
+ for i := 0; i < g.CreateObjects; i++ {
+ obj <- struct{}{}
+ }
+ close(obj)
+ var groupErr error
+ var mu sync.Mutex
+
+ for i := 0; i < g.Concurrency; i++ {
+ go func(i int) {
+ defer wg.Done()
+ src := g.Source()
+ for range obj {
+ opts := g.PutOpts
+ rcv := g.Collector.Receiver()
+ done := ctx.Done()
+
+ select {
+ case <-done:
+ return
+ default:
+ }
+ obj := src.Object()
+ name := obj.Name
+ for ver := 0; ver < g.Versions; ver++ {
+ // New input for each version
+ obj := src.Object()
+ obj.Name = name
+ client, cldone := g.Client()
+ op := Operation{
+ OpType: http.MethodPut,
+ Thread: uint16(i),
+ Size: obj.Size,
+ File: obj.Name,
+ ObjPerOp: 1,
+ Endpoint: client.EndpointURL().String(),
+ }
+ opts.ContentType = obj.ContentType
+ op.Start = time.Now()
+ res, err := client.PutObject(ctx, g.Bucket, obj.Name, obj.Reader, obj.Size, opts)
+ op.End = time.Now()
+ if err != nil {
+ err := fmt.Errorf("upload error: %w", err)
+ g.Error(err)
+ mu.Lock()
+ if groupErr == nil {
+ groupErr = err
+ }
+ mu.Unlock()
+ return
+ }
+ obj.VersionID = res.VersionID
+ if res.Size != obj.Size {
+ err := fmt.Errorf("short upload. want: %d, got %d", obj.Size, res.Size)
+ g.Error(err)
+ mu.Lock()
+ if groupErr == nil {
+ groupErr = err
+ }
+ mu.Unlock()
+ return
+ }
+ cldone()
+ mu.Lock()
+ obj.Reader = nil
+ g.objects = append(g.objects, *obj)
+ g.prepareProgress(float64(len(g.objects)) / float64(g.CreateObjects*g.Versions))
+ mu.Unlock()
+ rcv <- op
+ }
+ }
+ }(i)
+ }
+ wg.Wait()
+ return groupErr
+}
+
+// Start will execute the main benchmark.
+// Operations should begin executing when the start channel is closed.
+func (g *Retention) Start(ctx context.Context, wait chan struct{}) (Operations, error) {
+ var wg sync.WaitGroup
+ wg.Add(g.Concurrency)
+ c := g.Collector
+ if g.AutoTermDur > 0 {
+ ctx = c.AutoTerm(ctx, http.MethodGet, g.AutoTermScale, autoTermCheck, autoTermSamples, g.AutoTermDur)
+ }
+
+ // Non-terminating context.
+ nonTerm := context.Background()
+
+ for i := 0; i < g.Concurrency; i++ {
+ go func(i int) {
+ rng := rand.New(rand.NewSource(int64(i)))
+ rcv := c.Receiver()
+ defer wg.Done()
+ done := ctx.Done()
+ var opts minio.PutObjectRetentionOptions
+
+ <-wait
+ mode := minio.Governance
+ for {
+ select {
+ case <-done:
+ return
+ default:
+ }
+ obj := g.objects[rng.Intn(len(g.objects))]
+ client, cldone := g.Client()
+ op := Operation{
+ OpType: "RETENTION",
+ Thread: uint16(i),
+ Size: 0,
+ File: obj.Name,
+ ObjPerOp: 1,
+ Endpoint: client.EndpointURL().String(),
+ }
+
+ op.Start = time.Now()
+ opts.VersionID = obj.VersionID
+ t := op.Start.Add(24 * time.Hour)
+ opts.RetainUntilDate = &t
+ opts.Mode = &mode
+ opts.GovernanceBypass = true
+ err := client.PutObjectRetention(nonTerm, g.Bucket, obj.Name, opts)
+ if err != nil {
+ g.Error("put retention error:", err)
+ op.Err = err.Error()
+ op.End = time.Now()
+ rcv <- op
+ cldone()
+ continue
+ }
+ op.End = time.Now()
+ rcv <- op
+ cldone()
+ }
+ }(i)
+ }
+ wg.Wait()
+ return c.Close(), nil
+}
+
+// Cleanup deletes everything uploaded to the bucket.
+func (g *Retention) Cleanup(ctx context.Context) {
+ g.deleteAllInBucket(ctx, g.objects.Prefixes()...)
+}
diff --git a/pkg/bench/select.go b/pkg/bench/select.go
index c1ff3c3e..bba3810e 100644
--- a/pkg/bench/select.go
+++ b/pkg/bench/select.go
@@ -50,6 +50,7 @@ func (g *Select) Prepare(ctx context.Context) error {
return err
}
src := g.Source()
+ console.Eraseline()
console.Info("\rUploading ", g.CreateObjects, " objects of ", src.String())
var wg sync.WaitGroup
wg.Add(g.Concurrency)
diff --git a/pkg/bench/stat.go b/pkg/bench/stat.go
index c786cc66..624159c8 100644
--- a/pkg/bench/stat.go
+++ b/pkg/bench/stat.go
@@ -48,6 +48,7 @@ func (g *Stat) Prepare(ctx context.Context) error {
return err
}
src := g.Source()
+ console.Eraseline()
console.Info("\rUploading ", g.CreateObjects, " objects of ", src.String())
var wg sync.WaitGroup
wg.Add(g.Concurrency)
diff --git a/pkg/bench/versioned.go b/pkg/bench/versioned.go
index 5c482ad4..a94a0739 100644
--- a/pkg/bench/versioned.go
+++ b/pkg/bench/versioned.go
@@ -64,6 +64,7 @@ func (g *Versioned) Prepare(ctx context.Context) error {
g.Versioned = true
}
src := g.Source()
+ console.Eraseline()
console.Info("\rUploading ", g.CreateObjects, " objects of ", src.String())
var wg sync.WaitGroup
wg.Add(g.Concurrency)