Skip to content

Commit

Permalink
Merge pull request #348 from pixlise/feature/archive-optimiser-tool
Browse files Browse the repository at this point in the history
Attempt to implement streaming of large S3 files to disk for DB impor…
  • Loading branch information
pnemere authored Nov 7, 2024
2 parents 50eb013 + e2f37ea commit a77ef16
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
6 changes: 3 additions & 3 deletions api/ws/wsHelpers/sync-mongo-restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ func DownloadArchive(svcs *services.APIServices) (string, error) {
svcs.Log.Errorf(" Failed to get free disk bytes: %v", err)
}

svcs.Log.Infof(" Downloading: %v... free space: %v bytes", dbFile, freeBytes)
svcs.Log.Infof(" Downloading: %v... (%v bytes free)", dbFile, freeBytes)

dbFileBytes, err := svcs.FS.ReadObject(svcs.Config.DataBackupBucket, dbFile)
dbStream, err := svcs.FS.ReadObjectStream(svcs.Config.DataBackupBucket, dbFile)
if err != nil {
return "", fmt.Errorf("Failed to download remote DB dump file: %v. Error: %v", dbFile, err)
}

// Save locally
// Remove remote root dir
dbFilePathLocal := strings.TrimPrefix(dbFile, dataBackupS3Path+"/")
err = localFS.WriteObject(dataBackupLocalPath, dbFilePathLocal, dbFileBytes)
err = localFS.WriteObjectStream(dataBackupLocalPath, dbFilePathLocal, dbStream)

if err != nil {
return "", fmt.Errorf("Failed to write local DB dump file: %v. Error: %v", dbFilePathLocal, err)
Expand Down
10 changes: 9 additions & 1 deletion core/fileaccess/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
// available from
package fileaccess

import "strings"
import (
"io"
"strings"
)

// Generic interface for reading/writing files asynchronously
// We could have used OS level things but we want to be able to
Expand All @@ -45,6 +48,11 @@ type FileAccess interface {
// Writes a file as bytes
WriteObject(bucket string, path string, data []byte) error

// Reads a file as a stream of bytes (caller must close the returned stream)
ReadObjectStream(bucket string, path string) (io.ReadCloser, error)
// Writes a file from stream
WriteObjectStream(bucket string, path string, stream io.Reader) error

// Reads a file as JSON and decodes it into itemsPtr
ReadJSON(bucket string, s3Path string, itemsPtr interface{}, emptyIfNotFound bool) error
// Writes itemsPtr as a JSON file
Expand Down
30 changes: 30 additions & 0 deletions core/fileaccess/localFileSystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,36 @@ func (fs *FSAccess) WriteObject(rootPath string, path string, data []byte) error
return os.WriteFile(fullPath, data, 0777)
}

// ReadObjectStream opens the file at path under rootPath and hands it back as
// a stream instead of loading it into memory. The caller owns the returned
// reader and must close it when done.
func (fs *FSAccess) ReadObjectStream(rootPath string, path string) (io.ReadCloser, error) {
	file, openErr := os.Open(fs.filePath(rootPath, path))
	if openErr == nil {
		return file, nil
	}

	// Return a literal nil rather than the (nil) *os.File so the caller does
	// not receive a non-nil interface wrapping a nil pointer.
	return nil, openErr
}

// WriteObjectStream copies everything readable from stream into the file at
// path under rootPath, creating any missing parent directories first. An
// existing file at that location is truncated and overwritten.
//
// It returns the first error hit while creating the directories, creating the
// file, copying the data or closing the file. The stream itself is not closed
// here; that remains the caller's responsibility.
func (fs *FSAccess) WriteObjectStream(rootPath string, path string, stream io.Reader) error {
	fullPath := fs.filePath(rootPath, path)

	// Ensure any subdirs in between are created
	if err := os.MkdirAll(filepath.Dir(fullPath), 0777); err != nil {
		return err
	}

	outFile, err := os.Create(fullPath)
	if err != nil {
		return err
	}

	_, copyErr := io.Copy(outFile, stream)

	// Close explicitly instead of deferring: on a written file Close can be
	// the point where a failed flush is reported, and ignoring it would let a
	// truncated file be treated as a successful write.
	closeErr := outFile.Close()

	// A copy failure is the more informative error, so it takes priority.
	if copyErr != nil {
		return copyErr
	}
	return closeErr
}

func (fs *FSAccess) ReadJSON(rootPath string, s3Path string, itemsPtr interface{}, emptyIfNotFound bool) error {
fileData, err := fs.ReadObject(rootPath, s3Path)

Expand Down
26 changes: 26 additions & 0 deletions core/fileaccess/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,32 @@ func (s3Access S3Access) WriteObject(bucket string, path string, data []byte) er
return err
}

// ReadObjectStream requests the object stored under path in bucket and returns
// its body as a stream rather than reading it fully into memory. The caller
// must close the returned reader once finished with it.
func (s3Access S3Access) ReadObjectStream(bucket string, path string) (io.ReadCloser, error) {
	object, getErr := s3Access.s3Api.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(path),
	})
	if getErr != nil {
		return nil, getErr
	}

	return object.Body, nil
}

// WriteObjectStream is a placeholder that exists so S3Access provides the
// stream-writing method; it does not upload anything and always returns a
// "Not implemented" error.
//
// An earlier draft built an s3.PutObjectInput with Body set to stream and
// called s3Api.PutObject, but it was left disabled.
// NOTE(review): presumably because PutObjectInput.Body needs a seekable reader
// while stream is a plain io.Reader - confirm before reviving that approach.
func (s3Access S3Access) WriteObjectStream(bucket string, path string, stream io.Reader) error {
	notImplemented := fmt.Errorf("Not implemented")
	return notImplemented
}

func (s3Access S3Access) ReadJSON(bucket string, s3Path string, itemsPtr interface{}, emptyIfNotFound bool) error {
fileData, err := s3Access.ReadObject(bucket, s3Path)

Expand Down

0 comments on commit a77ef16

Please sign in to comment.