From 1c3744b3bc98011cdcc48a90a9e99fee19660979 Mon Sep 17 00:00:00 2001
From: Viacheslav Poturaev
Date: Sun, 15 May 2022 20:46:37 +0200
Subject: [PATCH] Add S3 upload mode (#25)

---
 README.md                       |  2 +
 s3/_testdata/docker-compose.yml | 17 +++++++++
 s3/cmd.go                       |  4 +-
 s3/job.go                       | 66 ++++++++++++++++++++++++++++-----
 4 files changed, 79 insertions(+), 10 deletions(-)
 create mode 100644 s3/_testdata/docker-compose.yml

diff --git a/README.md b/README.md
index 3cd6c0f..71988a4 100644
--- a/README.md
+++ b/README.md
@@ -207,6 +207,8 @@ Flags:
       --bucket=BUCKET    Bucket name.
       --key=KEY          Entry key.
       --path-style       To use path-style addressing, i.e., `http://s3.amazonaws.com/BUCKET/KEY`.
+      --save=SAVE        Path to local file to save the entry.
+      --upload=UPLOAD    Path to local file to upload to S3, enables upload load testing.
 ```
 
 ```
diff --git a/s3/_testdata/docker-compose.yml b/s3/_testdata/docker-compose.yml
new file mode 100644
index 0000000..4998da0
--- /dev/null
+++ b/s3/_testdata/docker-compose.yml
@@ -0,0 +1,17 @@
+# Start minio server with docker-compose up and run
+# plt s3 --bucket=default --key=foo --number=100 --url=http://localhost:9000 --access-key=minio --secret-key=minio123 --path-style --upload=docker-compose.yml
+# plt s3 --bucket=default --key=foo --number=100 --url=http://localhost:9000 --access-key=minio --secret-key=minio123 --path-style
+# to test plt against local S3.
+
+services:
+  s3:
+    image: minio/minio:latest
+    environment:
+      SERVICES: s3
+      MINIO_ACCESS_KEY: minio
+      MINIO_SECRET_KEY: minio123
+    ports:
+      - '9000:9000'
+    entrypoint: sh
+    # Creating "default" bucket and starting server.
+    command: -c 'mkdir -p /data/default && minio server /data'
diff --git a/s3/cmd.go b/s3/cmd.go
index 08ed691..2e0c79e 100644
--- a/s3/cmd.go
+++ b/s3/cmd.go
@@ -17,7 +17,8 @@ type Flags struct {
     Key       string
     PathStyle bool
 
-    Save string
+    Save   string
+    Upload string
 }
 
 // AddCommand registers curl command into CLI app.
@@ -40,6 +41,7 @@ func AddCommand(lf *loadgen.Flags) {
         BoolVar(&f.PathStyle)
 
     s3.Flag("save", "Path to local file to save the entry.").StringVar(&f.Save)
+    s3.Flag("upload", "Path to local file to upload to S3, enables upload load testing.").StringVar(&f.Upload)
 
     s3.Action(func(kp *kingpin.ParseContext) error {
         return run(*lf, f)
diff --git a/s3/job.go b/s3/job.go
index 0eed176..f7eb9d2 100644
--- a/s3/job.go
+++ b/s3/job.go
@@ -48,11 +48,11 @@ func newJobProducer(f Flags) (*jobProducer, error) {
         return nil, err
     }
 
-    dl := s3manager.NewDownloader(sess)
-
     return &jobProducer{
-        f:  f,
-        dl: dl,
+        f:     f,
+        dl:    s3manager.NewDownloader(sess),
+        ul:    s3manager.NewUploader(sess),
+        start: time.Now(),
     }, nil
 }
 
@@ -60,6 +60,7 @@ func newJobProducer(f Flags) (*jobProducer, error) {
 type jobProducer struct {
     f  Flags
     dl *s3manager.Downloader
+    ul *s3manager.Uploader
 
     totBytes int64
     tot      int64
@@ -73,13 +74,33 @@ func (nopWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
 }
 
 func (j *jobProducer) Job(i int) (time.Duration, error) {
-    start := time.Now()
+    var (
+        start = time.Now()
+        err   error
+    )
+
+    if j.f.Upload != "" {
+        err = j.upload(i)
+    } else {
+        err = j.download(i)
+    }
+
+    if err != nil {
+        return 0, err
+    }
+
+    atomic.AddInt64(&j.tot, 1)
+
+    return time.Since(start), nil
+}
+
+func (j *jobProducer) download(i int) error {
     w := io.WriterAt(nopWriterAt{})
 
     if i == 0 && j.f.Save != "" {
         f, err := os.Create(j.f.Save)
         if err != nil {
-            return 0, fmt.Errorf("failed to create file to save S3 result: %w", err)
+            return fmt.Errorf("failed to create file to save S3 result: %w", err)
         }
 
         w = f
@@ -96,13 +117,36 @@ func (j *jobProducer) Job(i int) (time.Duration, error) {
         Key:    aws.String(j.f.Key),
     })
     if err != nil {
-        return 0, fmt.Errorf("failed to download file: %w", err)
+        return fmt.Errorf("failed to download file: %w", err)
     }
 
     atomic.AddInt64(&j.totBytes, n)
-    atomic.AddInt64(&j.tot, 1)
 
-    return time.Since(start), nil
+    return nil
+}
+
+func (j *jobProducer) upload(_ int) error {
+    f, err := os.Open(j.f.Upload)
+    if err != nil {
+        return fmt.Errorf("failed to open file to upload: %w", err)
+    }
+
+    defer func() {
+        if err := f.Close(); err != nil {
+            println("failed to close file:", err)
+        }
+    }()
+
+    _, err = j.ul.Upload(&s3manager.UploadInput{
+        Bucket: aws.String(j.f.Bucket),
+        Key:    aws.String(j.f.Key),
+        Body:   f,
+    })
+    if err != nil {
+        return fmt.Errorf("failed to upload file to S3: %w", err)
+    }
+
+    return nil
 }
 
 func (j *jobProducer) RequestCounts() map[string]int {
@@ -111,6 +155,10 @@ func (j *jobProducer) RequestCounts() map[string]int {
 }
 
 // Print prints additional stats.
 func (j *jobProducer) String() string {
+    if j.totBytes == 0 {
+        return ""
+    }
+
     elapsed := time.Since(j.start).Seconds()
     return fmt.Sprintf("\nRead: total %.2f MB, avg %.2f MB, %.2f MB/s\n",