Skip to content

Commit

Permalink
Add S3 upload mode (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
vearutop authored May 15, 2022
1 parent 9bdbfe2 commit 1c3744b
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 10 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ Flags:
--bucket=BUCKET Bucket name.
--key=KEY Entry key.
--path-style To use path-style addressing, i.e., `http://s3.amazonaws.com/BUCKET/KEY`.
--save=SAVE Path to local file to save the entry.
--upload=UPLOAD Path to local file to upload to S3, enables upload load testing.
```

```
Expand Down
17 changes: 17 additions & 0 deletions s3/_testdata/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Start minio server with docker-compose up and run
# plt s3 --bucket=default --key=foo --number=100 --url=http://localhost:9000 --access-key=minio --secret-key=minio123 --path-style --upload=docker-compose.yml
# plt s3 --bucket=default --key=foo --number=100 --url=http://localhost:9000 --access-key=minio --secret-key=minio123 --path-style
# to test plt against local S3.

services:
s3:
image: minio/minio:latest
environment:
SERVICES: s3
MINIO_ACCESS_KEY: minio
MINIO_SECRET_KEY: minio123
ports:
- '9000:9000'
entrypoint: sh
# Creating "default" bucket and starting server.
command: -c 'mkdir -p /data/default && minio server /data'
4 changes: 3 additions & 1 deletion s3/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type Flags struct {
Key string
PathStyle bool

Save string
Save string
Upload string
}

// AddCommand registers curl command into CLI app.
Expand All @@ -40,6 +41,7 @@ func AddCommand(lf *loadgen.Flags) {
BoolVar(&f.PathStyle)

s3.Flag("save", "Path to local file to save the entry.").StringVar(&f.Save)
s3.Flag("upload", "Path to local file to upload to S3, enables upload load testing.").StringVar(&f.Upload)

s3.Action(func(kp *kingpin.ParseContext) error {
return run(*lf, f)
Expand Down
66 changes: 57 additions & 9 deletions s3/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,19 @@ func newJobProducer(f Flags) (*jobProducer, error) {
return nil, err
}

dl := s3manager.NewDownloader(sess)

return &jobProducer{
f: f,
dl: dl,
f: f,
dl: s3manager.NewDownloader(sess),
ul: s3manager.NewUploader(sess),

start: time.Now(),
}, nil
}

type jobProducer struct {
f Flags
dl *s3manager.Downloader
ul *s3manager.Uploader

totBytes int64
tot int64
Expand All @@ -73,13 +74,33 @@ func (nopWriterAt) WriteAt(p []byte, off int64) (n int, err error) {
}

func (j *jobProducer) Job(i int) (time.Duration, error) {
start := time.Now()
var (
start = time.Now()
err error
)

if j.f.Upload != "" {
err = j.upload(i)
} else {
err = j.download(i)
}

if err != nil {
return 0, err
}

atomic.AddInt64(&j.tot, 1)

return time.Since(start), nil
}

func (j *jobProducer) download(i int) error {
w := io.WriterAt(nopWriterAt{})

if i == 0 && j.f.Save != "" {
f, err := os.Create(j.f.Save)
if err != nil {
return 0, fmt.Errorf("failed to create file to save S3 result: %w", err)
return fmt.Errorf("failed to create file to save S3 result: %w", err)
}

w = f
Expand All @@ -96,13 +117,36 @@ func (j *jobProducer) Job(i int) (time.Duration, error) {
Key: aws.String(j.f.Key),
})
if err != nil {
return 0, fmt.Errorf("failed to download file: %w", err)
return fmt.Errorf("failed to download file: %w", err)
}

atomic.AddInt64(&j.totBytes, n)
atomic.AddInt64(&j.tot, 1)

return time.Since(start), nil
return nil
}

func (j *jobProducer) upload(_ int) error {
f, err := os.Open(j.f.Upload)
if err != nil {
return fmt.Errorf("failed to open file to upload: %w", err)
}

defer func() {
if err := f.Close(); err != nil {
println("failed to close file:", err)
}
}()

_, err = j.ul.Upload(&s3manager.UploadInput{
Bucket: aws.String(j.f.Bucket),
Key: aws.String(j.f.Key),
Body: f,
})
if err != nil {
return fmt.Errorf("failed to upload file to S3: %w", err)
}

return nil
}

func (j *jobProducer) RequestCounts() map[string]int {
Expand All @@ -111,6 +155,10 @@ func (j *jobProducer) RequestCounts() map[string]int {

// Print prints additional stats.
func (j *jobProducer) String() string {
if j.totBytes == 0 {
return ""
}

elapsed := time.Since(j.start).Seconds()

return fmt.Sprintf("\nRead: total %.2f MB, avg %.2f MB, %.2f MB/s\n",
Expand Down

0 comments on commit 1c3744b

Please sign in to comment.