diff --git a/CHANGELOG.md b/CHANGELOG.md
index 40515e8a..c2d5be93 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+### Fixed
+- Fixed #145 - Removed use of a local temp file when reading/seeking from S3 files. This should improve performance by allowing streaming reads from S3 files.
 
 ## [6.9.1] - 2023-11-21
 ### Fixed
diff --git a/backend/helpers.go b/backend/helpers.go
index 6d3bb4fa..4229405e 100644
--- a/backend/helpers.go
+++ b/backend/helpers.go
@@ -7,7 +7,7 @@ import (
 	"github.com/c2fo/vfs/v6"
 )
 
-// ValidateCopySeekPosition return ensures curren seek cursor is 0,0. This is useful to ensure it's safe to copy. A seek position
+// ValidateCopySeekPosition ensures the current seek cursor is 0,0. This is useful to ensure it's safe to copy. A seek position
 // elsewhere will mean a partial copy.
 func ValidateCopySeekPosition(f vfs.File) error {
 	// validate seek is at 0,0 before doing copy
diff --git a/backend/s3/file.go b/backend/s3/file.go
index 0e4ed01f..26308706 100644
--- a/backend/s3/file.go
+++ b/backend/s3/file.go
@@ -2,21 +2,19 @@ package s3
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"io"
 	"net/url"
-	"os"
 	"path"
 	"time"
 
-	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
 
 	"github.com/c2fo/vfs/v6"
-	"github.com/c2fo/vfs/v6/backend"
 	"github.com/c2fo/vfs/v6/mocks"
 	"github.com/c2fo/vfs/v6/options"
 	"github.com/c2fo/vfs/v6/options/delete"
@@ -28,19 +26,11 @@ type File struct {
 	fileSystem  *FileSystem
 	bucket      string
 	key         string
-	tempFile    *os.File
+	cursorPos   int64
+	reader      io.ReadCloser
 	writeBuffer *bytes.Buffer
 }
 
-// Downloader interface needed to mock S3 Downloader data access object in tests
-type Downloader interface {
-	Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(downloader *s3manager.Downloader)) (n int64, err error)
-
-	DownloadWithContext(ctx aws.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*s3manager.Downloader)) (n int64, err error)
-
-	DownloadWithIterator(ctx aws.Context, iter s3manager.BatchDownloadIterator, opts ...func(*s3manager.Downloader)) error
-}
-
 // Info Functions
 
 // LastModified returns the LastModified property of a HEAD request to the s3 object.
@@ -66,13 +56,10 @@ func (f *File) Path() string {
 // the object's HEAD through the s3 API.
 func (f *File) Exists() (bool, error) {
 	_, err := f.getHeadObject()
-	code := ""
 	if err != nil {
-		code = err.(awserr.Error).Code()
-	}
-	if err != nil && (code == s3.ErrCodeNoSuchKey || code == "NotFound") {
-		return false, nil
-	} else if err != nil {
+		if errors.Is(err, vfs.ErrNotExist) {
+			return false, nil
+		}
 		return false, err
 	}
 
@@ -104,8 +91,8 @@ func (f *File) Location() vfs.Location {
 // method if the target file is also on S3, otherwise uses io.CopyBuffer.
 func (f *File) CopyToFile(file vfs.File) error {
 	// validate seek is at 0,0 before doing copy
-	if err := backend.ValidateCopySeekPosition(f); err != nil {
-		return err
+	if f.cursorPos != 0 {
+		return vfs.CopyToNotPossible
 	}
 
 	// if target is S3
@@ -235,18 +222,16 @@ func (f *File) Delete(opts ...options.DeleteOption) error {
-// Close cleans up underlying mechanisms for reading from and writing to the file. Closes and removes the
-// local temp file, and triggers a write to s3 of anything in the f.writeBuffer if it has been created.
+// Close cleans up underlying mechanisms for reading from and writing to the file. Closes the underlying
+// reader, if any, and triggers a write to s3 of anything in the f.writeBuffer if it has been created.
 func (f *File) Close() error {
-	if f.tempFile != nil {
-		err := f.tempFile.Close()
-		if err != nil {
-			return err
-		}
+	f.cursorPos = 0
 
-		err = os.Remove(f.tempFile.Name())
-		if err != nil && !os.IsNotExist(err) {
+	// invalidate reader
+	if f.reader != nil {
+		err := f.reader.Close()
+		if err != nil {
 			return err
 		}
-		f.tempFile = nil
+		f.reader = nil
 	}
 
 	if f.writeBuffer != nil {
@@ -273,19 +258,69 @@
-// Read implements the standard for io.Reader. For this to work with an s3 file, a temporary local copy of
-// the file is created, and reads work on that. This file is closed and removed upon calling f.Close()
+// Read implements the standard for io.Reader. Reads are streamed directly from s3 starting at the current
+// cursor position; no temporary local copy is created. Any open stream is closed by f.Close().
 func (f *File) Read(p []byte) (n int, err error) {
-	if err := f.checkTempFile(); err != nil {
+	// check/initialize for reader
+	r, err := f.getReader()
+	if err != nil {
 		return 0, err
 	}
-	return f.tempFile.Read(p)
+
+	read, err := r.Read(p)
+	if err != nil {
+		return read, err
+	}
+
+	f.cursorPos += int64(read)
+
+	return read, nil
 }
 
-// Seek implements the standard for io.Seeker. A temporary local copy of the s3 file is created (the same
-// one used for Reads) which Seek() acts on. This file is closed and removed upon calling f.Close()
+// Seek implements the standard for io.Seeker. Seek updates the cursor position and invalidates any open
+// reader so that the next Read streams from the new position.
 func (f *File) Seek(offset int64, whence int) (int64, error) {
-	if err := f.checkTempFile(); err != nil {
+	length, err := f.Size()
+	if err != nil {
 		return 0, err
 	}
-	return f.tempFile.Seek(offset, whence)
+
+	// update cursorPos
+	pos, err := seekTo(int64(length), f.cursorPos, offset, whence)
+	if err != nil {
+		return 0, err
+	}
+	f.cursorPos = pos
+
+	// invalidate reader
+	if f.reader != nil {
+		err := f.reader.Close()
+		if err != nil {
+			return 0, err
+		}
+
+		f.reader = nil
+	}
+
+	return f.cursorPos, nil
+}
+
+// seekTo is a helper function for Seek. It takes the current position, offset, whence, and length of the file
+// and returns the new position. It also checks for invalid offsets and returns an error if one is found.
+func seekTo(length, position, offset int64, whence int) (int64, error) {
+
+	switch whence {
+	default:
+		return 0, vfs.ErrSeekInvalidWhence
+	case io.SeekStart:
+		// this actually does nothing since the new position just becomes the offset but is here for completeness
+	case io.SeekCurrent:
+		offset += position
+	case io.SeekEnd:
+		offset += length
+	}
+	if offset < 0 {
+		return 0, vfs.ErrSeekInvalidOffset
+	}
+
+	return offset, nil
 }
 
 // Write implements the standard for io.Writer. A buffer is added to with each subsequent
@@ -305,7 +340,14 @@ func (f *File) Write(data []byte) (res int, err error) {
 		f.writeBuffer = bytes.NewBuffer([]byte{})
 	}
 
-	return f.writeBuffer.Write(data)
+
+	written, err := f.writeBuffer.Write(data)
+	if err != nil {
+		return 0, err
+	}
+	f.cursorPos += int64(written)
+
+	return written, err
 }
 
 // Touch creates a zero-length file on the vfs.File if no File exists. Update File's last modified timestamp.
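
Note on the seekTo helper added above: it is pure arithmetic over (length, position, offset, whence). io.SeekStart uses the offset as-is, io.SeekCurrent adds it to the current position, io.SeekEnd adds it to the file length, and any negative result is rejected. A minimal sketch of that contract (illustrative only, not part of this change; it assumes the snippet lives in package s3 next to file.go, and the function name is made up):

    package s3

    import (
        "errors"
        "fmt"
        "io"

        "github.com/c2fo/vfs/v6"
    )

    // exampleSeekToContract walks the three whence modes and the two error cases.
    func exampleSeekToContract() {
        pos, _ := seekTo(100, 0, 10, io.SeekStart) // absolute: the new position is the offset itself
        fmt.Println(pos)                           // 10

        pos, _ = seekTo(100, 50, 10, io.SeekCurrent) // relative to the current position
        fmt.Println(pos)                             // 60

        pos, _ = seekTo(100, 0, -10, io.SeekEnd) // relative to the end of the file
        fmt.Println(pos)                         // 90

        _, err := seekTo(100, 50, -60, io.SeekCurrent) // would land before byte 0
        fmt.Println(errors.Is(err, vfs.ErrSeekInvalidOffset)) // true

        _, err = seekTo(100, 0, 0, 3) // 3 is not a valid whence value
        fmt.Println(errors.Is(err, vfs.ErrSeekInvalidWhence)) // true
    }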
@@ -363,7 +405,10 @@ func (f *File) getHeadObject() (*s3.HeadObjectOutput, error) {
 	if err != nil {
 		return nil, err
 	}
-	return client.HeadObject(headObjectInput)
+
+	head, err := client.HeadObject(headObjectInput)
+
+	return head, handleExistsError(err)
 }
 
 // For copy from S3-to-S3 when credentials are the same between source and target, return *s3.CopyObjectInput or error
@@ -424,52 +469,6 @@ func (f *File) getCopyObjectInput(targetFile *File) (*s3.CopyObjectInput, error)
 	return nil, nil
 }
 
-func (f *File) checkTempFile() error {
-	if f.tempFile == nil {
-		localTempFile, err := f.copyToLocalTempReader()
-		if err != nil {
-			return err
-		}
-		f.tempFile = localTempFile
-	}
-
-	return nil
-}
-
-func (f *File) copyToLocalTempReader() (*os.File, error) {
-	// Create temp file
-	tmpFile, err := os.CreateTemp("", fmt.Sprintf("%s.%d", f.Name(), time.Now().UnixNano()))
-	if err != nil {
-		return nil, err
-	}
-
-	// Create S3 Downloader, get client, and set partition size for multipart download
-	var partSize int64
-	if opts, ok := f.Location().FileSystem().(*FileSystem).options.(Options); ok {
-		if partSize = opts.DownloadPartitionSize; partSize == 0 {
-			partSize = 32 * 1024 * 1024 // 32 MB per partition default if opts.DownloadPartitionSize is 0
-		}
-	}
-
-	client, err := f.fileSystem.Client()
-	if err != nil {
-		return nil, err
-	}
-
-	// Download file
-	_, err = getDownloader(client, partSize).Download(tmpFile, f.getObjectInput())
-	if err != nil {
-		return nil, err
-	}
-
-	// Return temp file
-	return tmpFile, nil
-}
-
-func (f *File) getObjectInput() *s3.GetObjectInput {
-	return new(s3.GetObjectInput).SetBucket(f.bucket).SetKey(f.key)
-}
-
 // TODO: need to provide an implementation-agnostic container for providing config options such as SSE
 func uploadInput(f *File) *s3manager.UploadInput {
 	sseType := "AES256"
@@ -534,8 +533,42 @@ func waitUntilFileExists(file vfs.File, retries int) error {
 	return nil
 }
 
-var getDownloader = func(client s3iface.S3API, partSize int64) Downloader {
-	return s3manager.NewDownloaderWithClient(client, func(d *s3manager.Downloader) {
-		d.PartSize = partSize
-	})
+func (f *File) getReader() (io.ReadCloser, error) {
+	if f.reader == nil {
+		// Create the request to get the object
+		input := new(s3.GetObjectInput).
+			SetBucket(f.bucket).
+			SetKey(f.key).
+			SetRange(fmt.Sprintf("bytes=%d-", f.cursorPos))
+
+		// Get the client
+		client, err := f.fileSystem.Client()
+		if err != nil {
+			return nil, err
+		}
+
+		// Request the object
+		result, err := client.GetObject(input)
+		if err != nil {
+			return nil, err
+		}
+
+		// Set the reader to the body of the object
+		f.reader = result.Body
+	}
+	return f.reader, nil
+}
+
+func handleExistsError(err error) error {
+	if err != nil {
+		var awsErr awserr.Error
+		if errors.As(err, &awsErr) {
+			switch awsErr.Code() {
+			case s3.ErrCodeNoSuchKey, s3.ErrCodeNoSuchBucket, "NotFound":
+				return vfs.ErrNotExist
+			}
+		}
+		return err
+	}
+	return nil
 }
diff --git a/backend/s3/file_test.go b/backend/s3/file_test.go
index af9af096..914c7ddd 100644
--- a/backend/s3/file_test.go
+++ b/backend/s3/file_test.go
@@ -10,10 +10,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/aws/request"
 	"github.com/aws/aws-sdk-go/service/s3"
-	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
@@ -26,7 +26,6 @@ import (
 
 type fileTestSuite struct {
 	suite.Suite
-	getDownloader func(client s3iface.S3API, partSize int64) Downloader
 }
 
 var (
@@ -46,9 +45,6 @@ func (ts *fileTestSuite) SetupTest() {
 	testFileName = "/some/path/to/file.txt"
 	bucket = "bucket"
 	testFile, err = fs.NewFile(bucket, testFileName)
-	getDownloader = func(client s3iface.S3API, partSize int64) Downloader {
-		return mocks.S3MockDownloader{}
-	}
 
 	if err != nil {
 		ts.Fail("Shouldn't return error creating test s3.File instance.")
@@ -56,17 +52,10 @@
 }
 
 func (ts *fileTestSuite) TearDownTest() {
-	getDownloader = ts.getDownloader
 }
 
 func (ts *fileTestSuite) TestRead() {
 	contents := "hello world!"
-	downloader := mocks.S3MockDownloader{
-		ExpectedContents: contents,
-	}
-	getDownloader = func(client s3iface.S3API, partSize int64) Downloader {
-		return downloader
-	}
 
 	file, err := fs.NewFile("bucket", "/some/path/file.txt")
 	if err != nil {
@@ -74,13 +63,27 @@
 	}
 
 	var localFile = bytes.NewBuffer([]byte{})
-
-	buffer := make([]byte, utils.TouchCopyMinBufferSize)
-	_, copyErr := io.CopyBuffer(localFile, file, buffer)
+	s3apiMock.
+		On("GetObject", mock.AnythingOfType("*s3.GetObjectInput")).
+		Return(&s3.GetObjectOutput{Body: io.NopCloser(strings.NewReader(contents))}, nil).
+		Once()
+	_, copyErr := io.Copy(localFile, file)
 	ts.NoError(copyErr, "no error expected")
 
 	closeErr := file.Close()
 	ts.NoError(closeErr, "no error expected")
 	ts.Equal(contents, localFile.String(), "Copying an s3 file to a buffer should fill buffer with file's contents")
+
+	// test read with error
+	s3apiMock.
+		On("GetObject", mock.AnythingOfType("*s3.GetObjectInput")).
+		Return(nil, errors.New("some error")).
+		Once()
+	_, copyErr = io.Copy(localFile, file)
+	ts.Error(copyErr, "error expected")
+	ts.EqualError(copyErr, "some error", "error expected")
+	closeErr = file.Close()
+	ts.NoError(closeErr, "no error expected")
+
 }
 
 // TODO: Write on Close() (actual s3 calls wait until file is closed to be made.)
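
The TestRead changes above exercise the new streaming path end-to-end: instead of a mocked Downloader writing to a temp file, GetObject itself is mocked and its Body is read directly. From a caller's point of view, the file.go changes amount to lazy, ranged streaming. A rough usage sketch of that behavior (illustrative only; the bucket and key are made up, default AWS credentials are assumed, and error handling is elided):

    package main

    import (
        "fmt"
        "io"

        "github.com/c2fo/vfs/v6/backend/s3"
    )

    func main() {
        fs := s3.NewFileSystem()
        f, _ := fs.NewFile("some-bucket", "/path/to/file.txt") // hypothetical object

        // The first Read lazily issues GetObject with "Range: bytes=0-" and streams the body.
        head := make([]byte, 16)
        n, _ := f.Read(head)
        fmt.Printf("first %d bytes: %q\n", n, head[:n])

        // Seek only moves cursorPos and invalidates the open reader; the next Read issues a
        // new GetObject with "Range: bytes=5-" instead of re-downloading the whole object.
        _, _ = f.Seek(5, io.SeekStart)
        rest, _ := io.ReadAll(f)
        fmt.Printf("read %d bytes from offset 5\n", len(rest))

        _ = f.Close() // closes the streaming body; there is no local temp file to remove
    }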
@@ -100,35 +103,59 @@ func (ts *fileTestSuite) TestSeek() {
 	file, err := fs.NewFile("bucket", "/tmp/hello.txt")
 	ts.NoError(err, "Shouldn't fail creating new file")
 
-	downloader := mocks.S3MockDownloader{
-		ExpectedContents: contents,
-	}
-	getDownloader = func(client s3iface.S3API, partSize int64) Downloader {
-		return downloader
+	// setup mock for Size(getHeadObject)
+	headOutput := &s3.HeadObjectOutput{ContentLength: aws.Int64(12)}
+
+	testCases := []struct {
+		seekOffset  int64
+		seekWhence  int
+		expectedPos int64
+		expectedErr bool
+		readContent string
+	}{
+		{6, 0, 6, false, "world!"},
+		{0, 0, 0, false, contents},
+		{0, 2, 12, false, ""},
+		{-1, 0, 0, true, ""}, // Seek before start
+		{0, 3, 0, true, ""},  // bad whence
+	}
+
+	for _, tc := range testCases {
+		s3apiMock.
+			On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).
+			Return(headOutput, nil).
+			Once()
+		localFile := bytes.NewBuffer([]byte{})
+		pos, err := file.Seek(tc.seekOffset, tc.seekWhence)
+
+		if tc.expectedErr {
+			ts.Error(err, "Expected error for seek offset %d and whence %d", tc.seekOffset, tc.seekWhence)
+		} else {
+			ts.NoError(err, "No error expected for seek offset %d and whence %d", tc.seekOffset, tc.seekWhence)
+			ts.Equal(tc.expectedPos, pos, "Expected position does not match for seek offset %d and whence %d", tc.seekOffset, tc.seekWhence)
+
+			// Mock the GetObject call
+			s3apiMock.On("GetObject", mock.AnythingOfType("*s3.GetObjectInput")).
+				Return(&s3.GetObjectOutput{Body: io.NopCloser(strings.NewReader(tc.readContent))}, nil).
+				Once()
+
+			_, err = io.Copy(localFile, file)
+			ts.NoError(err, "No error expected during io.Copy")
+			ts.Equal(tc.readContent, localFile.String(), "Content does not match after seek and read")
+		}
 	}
 
-	_, seekErr := file.Seek(6, 0)
-	assert.NoError(ts.T(), seekErr, "no error expected")
-
-	var localFile = bytes.NewBuffer([]byte{})
-
-	buffer := make([]byte, utils.TouchCopyMinBufferSize)
-	_, copyErr := io.CopyBuffer(localFile, file, buffer)
-	assert.NoError(ts.T(), copyErr, "no error expected")
-
-	ts.Equal("world!", localFile.String(), "Seeking should download the file and move the cursor as expected")
-
-	localFile = bytes.NewBuffer([]byte{})
-	_, seekErr2 := file.Seek(0, 0)
-	assert.NoError(ts.T(), seekErr2, "no error expected")
+	// test fails with Size error
+	s3apiMock.
+		On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).
+		Return(nil, awserr.New("NotFound", "file does not exist", errors.New("file not found"))).
+		Once()
+	_, err = file.Seek(0, 0)
+	ts.Require().Error(err, "error expected")
+	ts.Require().ErrorIs(err, vfs.ErrNotExist, "error expected")
 
-	buffer = make([]byte, utils.TouchCopyMinBufferSize)
-	_, copyErr2 := io.CopyBuffer(localFile, file, buffer)
-	assert.NoError(ts.T(), copyErr2, "no error expected")
-	ts.Equal(contents, localFile.String(), "Subsequent calls to seek work on temp file as expected")
-
-	closeErr := file.Close()
-	assert.NoError(ts.T(), closeErr, "no error expected")
+	err = file.Close()
+	ts.NoError(err, "Closing file should not produce an error")
 }
 
 func (ts *fileTestSuite) TestGetLocation() {
@@ -205,18 +232,13 @@ func (ts *fileTestSuite) TestCopyToFile() {
 }
 
 func (ts *fileTestSuite) TestEmptyCopyToFile() {
-	contents := ""
-	downloader := mocks.S3MockDownloader{
-		ExpectedContents: contents,
-	}
-	getDownloader = func(client s3iface.S3API, partSize int64) Downloader {
-		return downloader
-	}
-
 	targetFile := &mocks.File{}
 	targetFile.On("Write", mock.Anything).Return(0, nil)
 	targetFile.On("Close").Return(nil)
-
+	s3apiMock.
+ On("GetObject", mock.AnythingOfType("*s3.GetObjectInput")). + Return(&s3.GetObjectOutput{Body: io.NopCloser(strings.NewReader(""))}, nil). + Once() err := testFile.CopyToFile(targetFile) ts.Nil(err, "Error shouldn't be returned from successful call to CopyToFile") @@ -225,14 +247,6 @@ func (ts *fileTestSuite) TestEmptyCopyToFile() { } func (ts *fileTestSuite) TestMoveToFile() { - contents := "hello world!" - downloader := mocks.S3MockDownloader{ - ExpectedContents: contents, - } - getDownloader = func(client s3iface.S3API, partSize int64) Downloader { - return downloader - } - targetFile := &File{ fileSystem: &FileSystem{ client: s3apiMock, @@ -338,14 +352,6 @@ func (ts *fileTestSuite) TestGetCopyObject() { } func (ts *fileTestSuite) TestMoveToFile_CopyError() { - contents := "hello world!" - downloader := mocks.S3MockDownloader{ - ExpectedContents: contents, - } - getDownloader = func(client s3iface.S3API, partSize int64) Downloader { - return downloader - } - targetFile := &File{ fileSystem: &FileSystem{ client: s3apiMock, @@ -401,13 +407,6 @@ func (ts *fileTestSuite) TestCopyToLocation() { func (ts *fileTestSuite) TestTouch() { // Copy portion tested through CopyToLocation, just need to test whether or not Delete happens // in addition to CopyToLocation - contents := "hello world!" - downloader := mocks.S3MockDownloader{ - ExpectedContents: contents, - } - getDownloader = func(client s3iface.S3API, partSize int64) Downloader { - return downloader - } s3Mock1 := &mocks.S3API{} s3Mock1.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil) @@ -454,14 +453,6 @@ func (ts *fileTestSuite) TestTouch() { func (ts *fileTestSuite) TestMoveToLocation() { // Copy portion tested through CopyToLocation, just need to test whether or not Delete happens // in addition to CopyToLocation - contents := "hello world!" - downloader := mocks.S3MockDownloader{ - ExpectedContents: contents, - } - getDownloader = func(client s3iface.S3API, partSize int64) Downloader { - return downloader - } - s3Mock1 := &mocks.S3API{} s3Mock1.On("CopyObject", mock.AnythingOfType("*s3.CopyObjectInput")).Return(nil, nil) s3Mock1.On("HeadObject", mock.AnythingOfType("*s3.HeadObjectInput")).Return(&s3.HeadObjectOutput{}, nil) @@ -515,13 +506,6 @@ func (ts *fileTestSuite) TestMoveToLocation() { } func (ts *fileTestSuite) TestMoveToLocationFail() { - contents := "hello world!" - downloader := mocks.S3MockDownloader{ - ExpectedContents: contents, - } - getDownloader = func(client s3iface.S3API, partSize int64) Downloader { - return downloader - } // If CopyToLocation fails we need to ensure DeleteObject isn't called. 
 	otherFs := new(mocks.FileSystem)
@@ -726,6 +710,53 @@ func (ts *fileTestSuite) TestCloseWithWrite() {
 }
 
+// TestSeekTo tests the seekTo function with various cases
+func (ts *fileTestSuite) TestSeekTo() {
+	testCases := []struct {
+		position         int64
+		offset           int64
+		whence           int
+		length           int64
+		expectedPosition int64
+		expectError      error
+	}{
+		// Test seeking from start
+		{0, 10, io.SeekStart, 100, 10, nil},
+		{0, -10, io.SeekStart, 100, 0, vfs.ErrSeekInvalidOffset}, // Negative offset from start
+		{0, 110, io.SeekStart, 100, 110, nil},                    // Offset beyond length
+
+		// Test seeking from current position
+		{50, 10, io.SeekCurrent, 100, 60, nil},
+		{50, -60, io.SeekCurrent, 100, 0, vfs.ErrSeekInvalidOffset}, // Moving before start
+		{50, 60, io.SeekCurrent, 100, 110, nil},                     // Moving beyond length
+
+		// Test seeking from end
+		{0, -10, io.SeekEnd, 100, 90, nil},
+		{0, -110, io.SeekEnd, 100, 0, vfs.ErrSeekInvalidOffset}, // Moving before start
+		{0, 10, io.SeekEnd, 100, 110, nil},                      // Moving beyond length
+
+		// Additional edge cases
+		{0, 0, io.SeekStart, 100, 0, nil},       // No movement from start
+		{100, 0, io.SeekCurrent, 100, 100, nil}, // No movement from current
+		{0, 0, io.SeekEnd, 100, 100, nil},       // No movement from end
+
+		// invalid whence case
+		{0, 0, 3, 100, 0, vfs.ErrSeekInvalidWhence},
+	}
+
+	for _, tc := range testCases {
+		result, err := seekTo(tc.length, tc.position, tc.offset, tc.whence)
+
+		if tc.expectError != nil {
+			ts.Error(err, "error expected")
+			ts.ErrorIs(err, tc.expectError)
+		} else {
+			ts.NoError(err, "no error expected")
+			ts.Equal(tc.expectedPosition, result)
+		}
+	}
+}
+
 func TestFile(t *testing.T) {
 	suite.Run(t, new(fileTestSuite))
 }
diff --git a/errors.go b/errors.go
index ee8aed6e..b2a1f5a3 100644
--- a/errors.go
+++ b/errors.go
@@ -6,5 +6,16 @@ type Error string
 // Error returns a string representation of the error
 func (e Error) Error() string { return string(e) }
 
-// CopyToNotPossible - CopyTo/MoveTo operations are only possible when seek position is 0,0
-const CopyToNotPossible = Error("current cursor offset is not 0 as required for this operation")
+const (
+	// CopyToNotPossible - CopyTo/MoveTo operations are only possible when seek position is 0,0
+	CopyToNotPossible = Error("current cursor offset is not 0 as required for this operation")
+
+	// ErrNotExist - File does not exist
+	ErrNotExist = Error("file does not exist")
+
+	// ErrSeekInvalidOffset - Offset is invalid. Must be greater than or equal to 0
+	ErrSeekInvalidOffset = Error("seek: invalid offset")
+
+	// ErrSeekInvalidWhence - Whence is invalid. Must be one of the following: 0 (io.SeekStart), 1 (io.SeekCurrent), or 2 (io.SeekEnd)
+	ErrSeekInvalidWhence = Error("seek: invalid whence")
+)
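
The sentinel values added to errors.go are plain vfs.Error constants, and handleExistsError in file.go maps the s3 NoSuchKey/NoSuchBucket/NotFound codes onto vfs.ErrNotExist, so callers can branch on all of them with errors.Is. A minimal sketch of that pattern (illustrative only; the bucket and keys are made up):

    package main

    import (
        "errors"
        "fmt"
        "io"

        "github.com/c2fo/vfs/v6"
        "github.com/c2fo/vfs/v6/backend/s3"
    )

    func main() {
        fs := s3.NewFileSystem()

        missing, _ := fs.NewFile("some-bucket", "/missing.txt") // hypothetical key that does not exist
        if _, err := missing.Size(); errors.Is(err, vfs.ErrNotExist) {
            fmt.Println("object does not exist")
        }

        present, _ := fs.NewFile("some-bucket", "/present.txt") // hypothetical key that does exist
        if _, err := present.Seek(-1, io.SeekStart); errors.Is(err, vfs.ErrSeekInvalidOffset) {
            fmt.Println("cannot seek before the start of the file")
        }
    }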