Skip to content

Commit

Permalink
Add S3 to S3 copy functionality (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewaerose authored May 12, 2023
1 parent baf6e27 commit 46899ad
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 6 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ fixture:
aws s3 --endpoint-url http://localhost:4572 cp README.md s3://example-bucket
aws s3 --endpoint-url http://localhost:4572 cp README.md s3://example-bucket/foo/
aws s3 --endpoint-url http://localhost:4572 cp README.md s3://example-bucket/bar/baz/
aws s3 --endpoint-url http://localhost:4572 mb s3://s3-source
aws s3 --endpoint-url http://localhost:4572 cp README.md s3://s3-source
aws s3 --endpoint-url http://localhost:4572 cp README.md s3://s3-source/foo/
aws s3 --endpoint-url http://localhost:4572 cp README.md s3://s3-source/bar/baz/
aws s3 --endpoint-url http://localhost:4572 mb s3://s3-destination
aws s3 --endpoint-url http://localhost:4572 mb s3://example-bucket-upload
aws s3 --endpoint-url http://localhost:4572 cp README.md s3://example-bucket-upload/dest_only_file
aws s3 --endpoint-url http://localhost:4572 mb s3://example-bucket-upload-file
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ func main() {

// Sync from local to s3
syncManager.Sync("local/path/to/dir", "s3://yourbucket/path/to/dir")

// Sync from s3 to s3
syncManager.Sync("s3://yourbucket/path/to/dir", "s3://anotherbucket/path/to/dir")
}
```

- Note: Sync from s3 to s3 is not implemented yet.

## Sets the custom logger

You can set your custom logger.
Expand Down
49 changes: 47 additions & 2 deletions s3sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,32 @@ func isS3URL(url *url.URL) bool {
return url.Scheme == "s3"
}

func (m *Manager) syncS3ToS3(ctx context.Context, chJob chan func(), sourcePath, destPath *s3Path) error {
return errors.New("S3 to S3 sync feature is not implemented")
func (m *Manager) syncS3ToS3(ctx context.Context, chJob chan func(), sourcePath *s3Path, destPath *s3Path) error {
wg := &sync.WaitGroup{}
errs := &multiErr{}
for source := range filterFilesForSync(
m.listS3Files(ctx, sourcePath), m.listS3Files(ctx, destPath), m.del,
) {
wg.Add(1)
source := source
chJob <- func() {
defer wg.Done()
if source.err != nil {
errs.Append(source.err)
return
}
switch source.op {
case opUpdate:
if err := m.copyS3ToS3(ctx, source.fileInfo, sourcePath, destPath); err != nil {
errs.Append(err)
}
}
}
}
wg.Wait()

return errs.ErrOrNil()

}

func (m *Manager) syncLocalToS3(ctx context.Context, chJob chan func(), sourcePath string, destPath *s3Path) error {
Expand Down Expand Up @@ -222,6 +246,27 @@ func (m *Manager) syncS3ToLocal(ctx context.Context, chJob chan func(), sourcePa
return errs.ErrOrNil()
}

func (m *Manager) copyS3ToS3(ctx context.Context, file *fileInfo, sourcePath *s3Path, destPath *s3Path) error {
println("Copying", file.name, "to", destPath.String())
if m.dryrun {
return nil
}

_, err := m.s3.CopyObject(&s3.CopyObjectInput{
Bucket: aws.String(destPath.bucket),
CopySource: aws.String(sourcePath.bucket + "/" + file.name),
Key: aws.String(file.name),
ACL: m.acl,
})

if err != nil {
return err
}

m.updateFileTransferStatistics(file.size)
return nil
}

func (m *Manager) download(file *fileInfo, sourcePath *s3Path, destPath string) error {
var targetFilename string
if !strings.HasSuffix(destPath, "/") && file.singleFile {
Expand Down
31 changes: 29 additions & 2 deletions s3sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@ func TestS3syncNotImplemented(t *testing.T) {
if err := m.Sync("foo", "bar"); err == nil {
t.Fatal("local to local sync is not supported")
}
}

func TestS3ToS3(t *testing.T) {
m := New(getSession())

if err := m.Sync("s3://foo", "s3://bar"); err == nil {
t.Fatal("s3 to s3 sync is not implemented yet")
err := m.Sync("s3://s3-source", "s3://s3-destination")

if err != nil {
t.Fatal(err.Error())
}
}

Expand Down Expand Up @@ -134,6 +140,27 @@ func TestS3sync(t *testing.T) {
fileHasSize(t, filepath.Join(temp, "test.md"), dummyFileSize)
})

t.Run("S3ToS3Copy", func(t *testing.T) {
if err := New(getSession()).Sync("s3://s3-source", "s3://s3-destination"); err != nil {
t.Fatal("Sync should be successful", err)
}

objs := listObjectsSorted(t, "s3-destination")
if n := len(objs); n != 3 {
t.Fatalf("Number of the files should be 3 (result: %v)", objs)
}
for _, obj := range objs {
if obj.size != dummyFileSize {
t.Errorf("Object size should be %d, actual %d", dummyFileSize, obj.size)
}
}
if objs[0].path != "README.md" ||
objs[1].path != "bar/baz/README.md" ||
objs[2].path != "foo/README.md" {
t.Error("Unexpected keys", objs)
}
})

t.Run("Upload", func(t *testing.T) {
temp, err := ioutil.TempDir("", "s3synctest")
defer os.RemoveAll(temp)
Expand Down

0 comments on commit 46899ad

Please sign in to comment.