Skip to content

Commit

Permalink
Add POST object put benchmark
Browse files Browse the repository at this point in the history
Add `--post` parameter to `warp put`, which will use Form POST Object API.

Example
```
λ warp put -tls -post -duration=15s -obj.size=1KB
warp: Benchmark data written to "warp-put-2024-09-11[115431]-4IRH.csv.zst"

----------------------------------------
Operation: POST. Concurrency: 20
* Average: 0.12 MiB/s, 122.12 obj/s

Throughput, split into 13 x 1s:
 * Fastest: 127.5KiB/s, 130.61 obj/s
 * 50% Median: 120.9KiB/s, 123.78 obj/s
 * Slowest: 102.0KiB/s, 104.48 obj/s
warp: Cleanup Done.
```
  • Loading branch information
klauspost committed Sep 11, 2024
1 parent b3452c3 commit d9a9e48
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ Throughput, split into 59 x 1s:

It is possible by forcing md5 checksums on data by using the `--md5` option.

To test [POST Object](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html) operations use `-post` parameter.

## DELETE

Benchmarking delete operations will attempt to delete as many objects it can within `--duration`.
Expand Down
1 change: 1 addition & 0 deletions cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,5 +300,6 @@ func getCommon(ctx *cli.Context, src func() generator.Source) bench.Common {
DiscardOutput: ctx.Bool("stress"),
ExtraOut: extra,
RpsLimiter: rpsLimiter,
Transport: clientTransport(ctx),
}
}
7 changes: 6 additions & 1 deletion cli/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ var putFlags = []cli.Flag{
Usage: "Multipart part size. Can be a number or 10KiB/MiB/GiB. All sizes are base 2 binary.",
Hidden: true,
},
cli.BoolFlag{
Name: "post",
Usage: "Use PostObject for upload. Will force single part upload",
},
}

// Put command.
Expand All @@ -61,7 +65,8 @@ FLAGS:
func mainPut(ctx *cli.Context) error {
checkPutSyntax(ctx)
b := bench.Put{
Common: getCommon(ctx, newGenSource(ctx, "obj.size")),
Common: getCommon(ctx, newGenSource(ctx, "obj.size")),
PostObject: ctx.Bool("post"),
}
return runBench(ctx, &b)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/bench/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"math"
"net/http"
"strings"
"time"

Expand Down Expand Up @@ -98,6 +99,9 @@ type Common struct {

// ratelimiting
RpsLimiter *rate.Limiter

// Transport used.
Transport http.RoundTripper
}

const (
Expand Down
91 changes: 89 additions & 2 deletions pkg/bench/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,33 @@ package bench

import (
"context"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"sync"
"time"

"github.com/minio/minio-go/v7"
"github.com/minio/warp/pkg/generator"
)

// Put benchmarks upload speed.
type Put struct {
Common
prefixes map[string]struct{}
PostObject bool
prefixes map[string]struct{}
cl *http.Client
}

// Prepare will create an empty bucket ot delete any content already there.
func (u *Put) Prepare(ctx context.Context) error {
if u.PostObject {
u.cl = &http.Client{
Transport: u.Transport,
}
}
return u.createEmptyBucket(ctx)
}

Expand Down Expand Up @@ -85,7 +98,19 @@ func (u *Put) Start(ctx context.Context, wait chan struct{}) (Operations, error)
}

op.Start = time.Now()
res, err := client.PutObject(nonTerm, u.Bucket, obj.Name, obj.Reader, obj.Size, opts)
var err error
var res minio.UploadInfo
if !u.PostObject {
res, err = client.PutObject(nonTerm, u.Bucket, obj.Name, obj.Reader, obj.Size, opts)
} else {
op.OpType = http.MethodPost
var verID string
verID, err = u.postPolicy(ctx, client, u.Bucket, obj)
if err == nil {
res.Size = obj.Size
res.VersionID = verID
}
}
op.End = time.Now()
if err != nil {
u.Error("upload error: ", err)
Expand Down Expand Up @@ -118,3 +143,65 @@ func (u *Put) Cleanup(ctx context.Context) {
}
u.deleteAllInBucket(ctx, pf...)
}

// postPolicy will upload using https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html API.
func (u *Put) postPolicy(ctx context.Context, c *minio.Client, bucket string, obj *generator.Object) (versionID string, err error) {
pp := minio.NewPostPolicy()
pp.SetEncryption(u.PutOpts.ServerSideEncryption)
err = errors.Join(
pp.SetContentType(obj.ContentType),
pp.SetBucket(bucket),
pp.SetKey(obj.Name),
pp.SetContentLengthRange(obj.Size, obj.Size),
pp.SetExpires(time.Now().Add(24*time.Hour)),
)
if err != nil {
return "", err
}
url, form, err := c.PresignedPostPolicy(ctx, pp)
if err != nil {
return "", err
}
pr, pw := io.Pipe()
defer pr.Close()
writer := multipart.NewWriter(pw)
go func() {
for k, v := range form {
if err := writer.WriteField(k, v); err != nil {
pw.CloseWithError(err)
return
}
}
ff, err := writer.CreateFormFile("file", obj.Name)
if err != nil {
pw.CloseWithError(err)
return
}
_, err = io.Copy(ff, obj.Reader)
if err != nil {
pw.CloseWithError(err)
return
}
pw.CloseWithError(writer.Close())
}()

req, err := http.NewRequest(http.MethodPost, url.String(), pr)
if err != nil {
return "", err
}
req.Header.Set("Content-Type", writer.FormDataContentType())

// make POST request with form data
resp, err := u.cl.Do(req)
if err != nil {
return "", err
}
if resp.Body != nil {
defer resp.Body.Close()
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("unexpected status code: (%d) %s", resp.StatusCode, resp.Status)
}

return resp.Header.Get("x-amz-version-id"), nil
}
3 changes: 3 additions & 0 deletions yml-samples/put.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ warp:
# Concurrent operations to run per warp instance.
concurrent: 8

# Use POST Object operations for upload.
post: false

# Properties of uploaded objects.
obj:
# Size of each uploaded object
Expand Down

0 comments on commit d9a9e48

Please sign in to comment.