diff --git a/.github/workflows/functionality.yml b/.github/workflows/functionality.yml
index c14d360ea..4dcd14dab 100644
--- a/.github/workflows/functionality.yml
+++ b/.github/workflows/functionality.yml
@@ -33,7 +33,7 @@ jobs:
     strategy:
       matrix:
-        storagetype: [s3, posix, s3notls]
+        storagetype: [s3, posix, s3notls, s3seekable]
         go-version: ['1.21']
       fail-fast: false
     steps:
diff --git a/sda-download/.github/integration/setup/posix/1_posix_env_setup.sh b/sda-download/.github/integration/setup/posix/1_posix_env_setup.sh
index 913f1245f..68d98991f 100644
--- a/sda-download/.github/integration/setup/posix/1_posix_env_setup.sh
+++ b/sda-download/.github/integration/setup/posix/1_posix_env_setup.sh
@@ -1,4 +1,4 @@
 #!/bin/bash
 
-sed -i 's/=s3/=posix/g' dev_utils/env.download
+sed -i 's/ARCHIVE_TYPE=.*/ARCHIVE_TYPE=posix/g' dev_utils/env.download
 
diff --git a/sda-download/.github/integration/setup/s3seekable/100_s3_storage_setup.sh b/sda-download/.github/integration/setup/s3seekable/100_s3_storage_setup.sh
new file mode 100644
index 000000000..5a58fc2f9
--- /dev/null
+++ b/sda-download/.github/integration/setup/s3seekable/100_s3_storage_setup.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+pip3 install s3cmd
+
+cd dev_utils || exit 1
+
+# Make buckets if they don't exist already
+s3cmd -c s3cmd.conf mb s3://archive || true
+
+# Upload test file
+s3cmd -c s3cmd.conf put archive_data/4293c9a7-dc50-46db-b79a-27ddc0dad1c6 s3://archive/4293c9a7-dc50-46db-b79a-27ddc0dad1c6
diff --git a/sda-download/.github/integration/setup/s3seekable/1_s3seekable_env_setup.sh b/sda-download/.github/integration/setup/s3seekable/1_s3seekable_env_setup.sh
new file mode 100644
index 000000000..57b87f448
--- /dev/null
+++ b/sda-download/.github/integration/setup/s3seekable/1_s3seekable_env_setup.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+sed -i 's/ARCHIVE_TYPE=.*/ARCHIVE_TYPE=s3seekable/g' dev_utils/env.download
+
diff --git a/sda-download/api/api.go b/sda-download/api/api.go
index 1b200ed20..d947f0c80 100644
--- a/sda-download/api/api.go
+++ b/sda-download/api/api.go
@@ -52,12 +52,7 @@ func Setup() *http.Server {
 	// Configure TLS settings
 	log.Info("(3/5) Configuring TLS")
 	cfg := &tls.Config{
-		MinVersion:               tls.VersionTLS12,
-		CurvePreferences:         []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
-		PreferServerCipherSuites: true,
-		CipherSuites: []uint16{
-			tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
-		},
+		MinVersion: tls.VersionTLS12,
 	}
 
 	// Configure web server
diff --git a/sda-download/api/sda/sda.go b/sda-download/api/sda/sda.go
index 68c447050..cb49116c6 100644
--- a/sda-download/api/sda/sda.go
+++ b/sda-download/api/sda/sda.go
@@ -272,23 +272,13 @@ func Download(c *gin.Context) {
 		return
 	}
 
-	switch c.Param("type") {
-	case "encrypted":
-		// calculate coordinates
-		start, end, err = calculateEncryptedCoords(start, end, c.GetHeader("Range"), fileDetails)
-		if err != nil {
-			log.Errorf("Byte range coordinates invalid! %v", err)
-
-			return
-		}
-		if start > 0 {
-			// reading from an offset in encrypted file is not yet supported
-			log.Errorf("Start coordinate for encrypted files not implemented! %v", start)
-			c.String(http.StatusBadRequest, "Start coordinate for encrypted files not implemented!")
+	start, end, err = calculateCoords(start, end, c.GetHeader("Range"), fileDetails, c.Param("type"))
+	if err != nil {
+		log.Errorf("Byte range coordinates invalid! %v", err)
 
-			return
-		}
-	default:
+		return
+	}
+	if c.Param("type") != "encrypted" {
 		// set the content-length for unencrypted files
 		if start == 0 && end == 0 {
 			c.Header("Content-Length", fmt.Sprint(fileDetails.DecryptedSize))
@@ -325,35 +315,39 @@ func Download(c *gin.Context) {
 		// set the user and server public keys that is send from htsget
 		log.Debugf("Got to setting the headers: %s", c.GetHeader("client-public-key"))
 		c.Header("Client-Public-Key", c.GetHeader("Client-Public-Key"))
-		c.Header("Server-Public-Key", c.GetHeader("Server-Public-Key"))
 	}
 
 	if c.Request.Method == http.MethodHead {
+		// Create headers for htsget, containing size of the crypt4gh header
+		reencKey := c.GetHeader("Client-Public-Key")
+		headerSize := bytes.NewReader(fileDetails.Header).Size()
+		// Size of the header in the archive
+		c.Header("Server-Additional-Bytes", fmt.Sprint(headerSize))
+		if reencKey != "" {
+			newHeader, _ := reencryptHeader(fileDetails.Header, reencKey)
+			headerSize = bytes.NewReader(newHeader).Size()
+			// Size of the header if the file is re-encrypted before downloading
+			c.Header("Client-Additional-Bytes", fmt.Sprint(headerSize))
+		}
 		if c.Param("type") == "encrypted" {
-			c.Header("Content-Length", fmt.Sprint(fileDetails.ArchiveSize))
-
-			// set the length of the crypt4gh header for htsget
-			c.Header("Server-Additional-Bytes", fmt.Sprint(bytes.NewReader(fileDetails.Header).Size()))
-			// TODO figure out if client crypt4gh header will have other size
-			// c.Header("Client-Additional-Bytes", ...)
+			// Update the content length to match the encrypted file size
+			c.Header("Content-Length", fmt.Sprint(int(headerSize)+fileDetails.ArchiveSize))
 		}
 
 		return
 	}
 
 	// Prepare the file for streaming, encrypted or decrypted
-	var fileStream io.Reader
+
+	var fileStream io.ReadSeeker
+	hr := bytes.NewReader(fileDetails.Header)
 
 	switch c.Param("type") {
 	case "encrypted":
-		// The key provided in the header should be base64 encoded
 		reencKey := c.GetHeader("Client-Public-Key")
-		if strings.HasPrefix(c.GetHeader("User-Agent"), "htsget") {
-			reencKey = c.GetHeader("Server-Public-Key")
-		}
 		if reencKey == "" {
-			c.String(http.StatusBadRequest, "c4gh public key is mmissing from the header")
+			c.String(http.StatusBadRequest, "c4gh public key is missing from the header")
 
 			return
 		}
@@ -369,27 +363,31 @@ func Download(c *gin.Context) {
 		}
 		log.Debugf("Reencrypted c4gh file header = %v", newHeader)
 		newHr := bytes.NewReader(newHeader)
-		fileStream = io.MultiReader(newHr, file)
+
+		fileStream, err = storage.SeekableMultiReader(newHr, file)
+		if err != nil {
+			log.Errorf("Construct SeekableMultiReader for file: %v", err)
+			c.String(http.StatusInternalServerError, "file stream error")
+
+			return
+		}
 	default:
-		encryptedFileReader := io.MultiReader(bytes.NewReader(fileDetails.Header), file)
-		c4ghfileStream, err := streaming.NewCrypt4GHReader(encryptedFileReader, *config.Config.App.Crypt4GHKey, nil)
-		defer c4ghfileStream.Close()
+
+		fileStream, err = storage.SeekableMultiReader(hr, file)
 		if err != nil {
-			log.Errorf("could not prepare file for streaming, %s", err)
+			log.Errorf("Construct SeekableMultiReader for file: %v", err)
 			c.String(http.StatusInternalServerError, "file stream error")
 
 			return
 		}
-		if start != 0 {
-			// We don't want to read from start, skip ahead to where we should be
-			_, err = c4ghfileStream.Seek(start, 0)
-			if err != nil {
-				log.Errorf("error occurred while finding sending start: %v", err)
-				c.String(http.StatusInternalServerError, "an error occurred")
+		c4ghfileStream, err := streaming.NewCrypt4GHReader(fileStream, *config.Config.App.Crypt4GHKey, nil)
+		if err != nil {
+			log.Errorf("could not prepare file for streaming, %s", err)
+			c.String(http.StatusInternalServerError, "file stream error")
 
-			return
-		}
+			return
+		}
+		defer c4ghfileStream.Close()
 		fileStream = c4ghfileStream
 	}
@@ -403,8 +401,31 @@ func Download(c *gin.Context) {
 	}
 }
 
+var seekStart = func(fileStream io.ReadSeeker, start, end int64) (int64, int64, error) {
+	if start != 0 {
+
+		// We don't want to read from start, skip ahead to where we should be
+		_, err := fileStream.Seek(start, 0)
+		if err != nil {
+
+			return 0, 0, fmt.Errorf("error occurred while seeking to start: %v", err)
+		}
+		// adjust end to reflect that the file start has been moved
+		end -= start
+		start = 0
+
+	}
+
+	return start, end, nil
+}
+
 // used from: https://github.com/neicnordic/crypt4gh/blob/master/examples/reader/main.go#L48C1-L113C1
-var sendStream = func(reader io.Reader, writer http.ResponseWriter, start, end int64) error {
+var sendStream = func(reader io.ReadSeeker, writer http.ResponseWriter, start, end int64) error {
+
+	start, end, err := seekStart(reader, start, end)
+	if err != nil {
+		return err
+	}
 
 	// Calculate how much we should read (if given)
 	togo := end - start
@@ -451,10 +472,10 @@ func Download(c *gin.Context) {
 }
 
 // Calculates the start and end coordinats to use. If a range is set in HTTP headers,
-// it will be used as is. If not, the functions parameters will be used,
-// and adjusted to match the data block boundaries of the encrypted file.
-var calculateEncryptedCoords = func(start, end int64, htsget_range string, fileDetails *database.FileDownload) (int64, int64, error) {
+// it will be used as is. If not, the function's parameters will be used.
+// If in encrypted mode, the parameters will be adjusted to match the data block boundaries.
+var calculateCoords = func(start, end int64, htsget_range string, fileDetails *database.FileDownload, encryptedType string) (int64, int64, error) {
 	if htsget_range != "" {
 		startEnd := strings.Split(strings.TrimPrefix(htsget_range, "bytes="), "-")
 		if len(startEnd) > 1 {
 			a, err := strconv.ParseInt(startEnd[0], 10, 64)
@@ -469,9 +490,17 @@ var calculateEncryptedCoords = func(start, end int64, htsget_range string, fileDetails *database.FileDownload) (int64, int64, error) {
 				return 0, 0, fmt.Errorf("endCoordinate must be greater than startCoordinate")
 			}
 
-			return a, b, nil
+			// Byte ranges are inclusive; +1 so that the last byte is included
+
+			return a, b + 1, nil
 		}
 	}
+
+	// For unencrypted files, return the coordinates as is
+	if encryptedType != "encrypted" {
+		return start, end, nil
+	}
+
 	// Adapt end coordinate to follow the crypt4gh block boundaries
 	headlength := bytes.NewReader(fileDetails.Header)
 	bodyEnd := int64(fileDetails.ArchiveSize)
diff --git a/sda-download/api/sda/sda_test.go b/sda-download/api/sda/sda_test.go
index 81b9fcb24..8ca2c9c28 100644
--- a/sda-download/api/sda/sda_test.go
+++ b/sda-download/api/sda/sda_test.go
@@ -4,7 +4,10 @@ import (
 	"bytes"
 	"errors"
 	"io"
+	"net/http"
 	"net/http/httptest"
+	"net/url"
+	"strconv"
 	"testing"
 
 	"github.com/gin-gonic/gin"
@@ -477,9 +480,14 @@ func TestDownload_Fail_OpenFile(t *testing.T) {
 		return fileDetails, nil
 	}
 
-	// Mock request and response holders
+	// Mock request and response holders and initialize headers
 	w := httptest.NewRecorder()
 	c, _ := gin.CreateTestContext(w)
+	req := &http.Request{
+		URL:    &url.URL{},
+		Header: make(http.Header),
+	}
+	c.Request = req
 
 	// Test the outcomes of the handler
 	Download(c)
@@ -506,7 +514,7 @@ func TestDownload_Fail_OpenFile(t *testing.T) {
 
 }
 
-func TestEncrypted_Coords(t *testing.T) {
+func Test_CalculateCoords(t *testing.T) {
 	var to, from int64
 	from, to = 0, 1000
 	fileDetails := &database.FileDownload{
@@ -515,40 +523,43 @@ func Test_CalculateCoords(t *testing.T) {
 		Header:      make([]byte, 124),
 	}
 
-	// htsget_range should be used first and as is
+	// htsget_range should be used first and its end position should be increased by one
 	headerSize := bytes.NewReader(fileDetails.Header).Size()
 	fullSize := headerSize + int64(fileDetails.ArchiveSize)
-	start, end, err := calculateEncryptedCoords(from, to, "bytes=10-20", fileDetails)
+	var endPos int64
+	endPos = 20
+	start, end, err := calculateCoords(from, to, "bytes=10-"+strconv.FormatInt(endPos, 10), fileDetails, "default")
 	assert.Equal(t, start, int64(10))
-	assert.Equal(t, end, int64(20))
+	assert.Equal(t, end, endPos+1)
 	assert.NoError(t, err)
 
 	// end should be greater than or equal to inputted end
-	_, end, err = calculateEncryptedCoords(from, to, "", fileDetails)
+	_, end, err = calculateCoords(from, to, "", fileDetails, "encrypted")
 	assert.GreaterOrEqual(t, end, from)
 	assert.NoError(t, err)
 
 	// end should not be smaller than a header
-	_, end, err = calculateEncryptedCoords(from, headerSize-10, "", fileDetails)
+	_, end, err = calculateCoords(from, headerSize-10, "", fileDetails, "encrypted")
 	assert.GreaterOrEqual(t, end, headerSize)
 	assert.NoError(t, err)
 
 	// end should not be larger than file length + header
-	_, end, err = calculateEncryptedCoords(from, fullSize+1900, "", fileDetails)
+	_, end, err = calculateCoords(from, fullSize+1900, "", fileDetails, "encrypted")
 	assert.Equal(t, fullSize, end)
 	assert.NoError(t, err)
 
-	// range 0-0 should give whole file
-	start, end, err = calculateEncryptedCoords(0, 0, "", fileDetails)
+	// param range 0-0 should give whole file
+	start, end, err = calculateCoords(0, 0, "", fileDetails, "encrypted")
 	assert.Equal(t, end-start, fullSize)
 	assert.NoError(t, err)
 
-	// range 0-0 with range in the header should return the range size
-	_, end, err = calculateEncryptedCoords(0, 0, "bytes=0-1000", fileDetails)
-	assert.Equal(t, end, int64(1000))
+	// byte range 0-1000 should return the range size, end coord inclusive
+	endPos = 1000
+	_, end, err = calculateCoords(0, 0, "bytes=0-"+strconv.FormatInt(endPos, 10), fileDetails, "encrypted")
+	assert.Equal(t, end, endPos+1)
 	assert.NoError(t, err)
 
 	// range in the header should return error if values are not numbers
-	_, _, err = calculateEncryptedCoords(0, 0, "bytes=start-end", fileDetails)
+	_, _, err = calculateCoords(0, 0, "bytes=start-end", fileDetails, "encrypted")
 	assert.Error(t, err)
 }
diff --git a/sda-download/internal/config/config.go b/sda-download/internal/config/config.go
index 1c4110e27..dcf531fe1 100644
--- a/sda-download/internal/config/config.go
+++ b/sda-download/internal/config/config.go
@@ -19,6 +19,7 @@ import (
 
 const POSIX = "posix"
 const S3 = "s3"
+const S3seekable = "s3seekable"
 
 // availableMiddlewares list the options for middlewares
 // empty string "" is an alias for default, for when the config key is not set, or it's empty
@@ -176,7 +177,7 @@ func NewConfig() (*Map, error) {
 		"db.host", "db.user", "db.password", "db.database",
 		"c4gh.filepath", "c4gh.passphrase", "oidc.configuration.url",
 	}
-	if viper.GetString("archive.type") == S3 {
+	if viper.GetString("archive.type") == S3 || viper.GetString("archive.type") == S3seekable {
 		requiredConfVars = append(requiredConfVars, []string{"archive.url", "archive.accesskey", "archive.secretkey", "archive.bucket"}...)
 	} else if viper.GetString("archive.type") == POSIX {
 		requiredConfVars = append(requiredConfVars, []string{"archive.location"}...)
@@ -305,10 +306,17 @@ func (c *Map) configureOIDC() error {
 // configArchive provides configuration for the archive storage
 // we default to POSIX unless S3 specified
 func (c *Map) configArchive() {
-	if viper.GetString("archive.type") == S3 {
+
+	switch viper.GetString("archive.type") {
+	case S3:
 		c.Archive.Type = S3
 		c.Archive.S3 = configS3Storage("archive")
-	} else {
+
+	case S3seekable:
+		c.Archive.Type = S3seekable
+		c.Archive.S3 = configS3Storage("archive")
+
+	default:
 		c.Archive.Type = POSIX
 		c.Archive.Posix.Location = viper.GetString("archive.location")
 	}
diff --git a/sda-download/internal/config/config_test.go b/sda-download/internal/config/config_test.go
index 2ac1cd568..c14b1e8af 100644
--- a/sda-download/internal/config/config_test.go
+++ b/sda-download/internal/config/config_test.go
@@ -99,6 +99,32 @@ func (suite *TestSuite) TestArchiveConfig() {
 
 }
 
+func (suite *TestSuite) TestArchiveS3Config() {
+	for _, archiveType := range []string{S3, S3seekable} {
+		viper.Set("archive.type", archiveType)
+		viper.Set("archive.url", "localhost")
+		viper.Set("archive.accesskey", "access")
+		viper.Set("archive.secretkey", "secret")
+		viper.Set("archive.bucket", "bucket")
+		viper.Set("archive.port", "9090")
+		viper.Set("archive.chunksize", "10")
+		viper.Set("archive.region", "us-west-1")
+		viper.Set("archive.cacert", "filename")
+
+		c := &Map{}
+		c.configArchive()
+		assert.Equal(suite.T(), "localhost", c.Archive.S3.URL)
+		assert.Equal(suite.T(), "access", c.Archive.S3.AccessKey)
+		assert.Equal(suite.T(), "secret", c.Archive.S3.SecretKey)
+		assert.Equal(suite.T(), "bucket", c.Archive.S3.Bucket)
+		assert.Equal(suite.T(), "us-west-1", c.Archive.S3.Region)
+		assert.Equal(suite.T(), "filename", c.Archive.S3.Cacert)
+		assert.Equal(suite.T(), 10*1024*1024, c.Archive.S3.Chunksize)
+		assert.Equal(suite.T(), 9090, c.Archive.S3.Port)
+
+	}
+}
+
 func (suite *TestSuite) TestSessionConfig() {
 	viper.Set("session.expiration", 3600)
diff --git a/sda-download/internal/storage/storage.go b/sda-download/internal/storage/storage.go
index 3cb68a876..41edba4fd 100644
--- a/sda-download/internal/storage/storage.go
+++ b/sda-download/internal/storage/storage.go
@@ -2,6 +2,7 @@
 package storage
 
 import (
+	"bytes"
 	"crypto/tls"
 	"crypto/x509"
 	"fmt"
@@ -11,6 +12,7 @@ import (
 	"path/filepath"
 	"reflect"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -52,6 +54,9 @@ func NewBackend(config Conf) (Backend, error) {
 	switch config.Type {
 	case "s3":
 		return newS3Backend(config.S3)
+	case "s3seekable":
+		return newS3SeekableBackend(config.S3)
+
 	default:
 		return newPosixBackend(config.Posix)
 	}
@@ -126,6 +131,28 @@ type s3Backend struct {
 	Conf   *S3Conf
 }
 
+type s3CacheBlock struct {
+	start  int64
+	length int64
+	data   []byte
+}
+
+type s3SeekableBackend struct {
+	s3Backend
+}
+
+type s3SeekableReader struct {
+	s3SeekableBackend
+	currentOffset         int64
+	local                 []s3CacheBlock
+	filePath              string
+	objectSize            int64
+	lock                  sync.Mutex
+	outstandingPrefetches []int64
+	seeked                bool
+	objectReader          io.Reader
+}
+
 // S3Conf stores information about the S3 storage backend
 type S3Conf struct {
 	URL               string
 	Port              int
 	AccessKey         string
 	SecretKey         string
 	Bucket            string
 	Region            string
 	Chunksize         int
 	Cacert            string
 	NonExistRetryTime time.Duration
 }
 
+func newS3SeekableBackend(config S3Conf) (*s3SeekableBackend, error) {
+	sb, err := newS3Backend(config)
+	if err != nil {
+		return nil, err
+	}
+
+	return &s3SeekableBackend{*sb}, nil
+}
+
 func newS3Backend(config S3Conf) (*s3Backend, error) {
 	s3Transport := transportConfigS3(config)
 	client := http.Client{Transport: s3Transport}
@@ -315,3 +351,439 @@ func transportConfigS3(config S3Conf) http.RoundTripper {
 
 	return trConfig
 }
+
+func (sb *s3SeekableBackend) NewFileReader(filePath string) (io.ReadCloser, error) {
+	s := sb.s3Backend
+	objectSize, err := s.GetFileSize(filePath)
+
+	if err != nil {
+		return nil, err
+	}
+
+	reader := &s3SeekableReader{
+		*sb,
+		0,
+		make([]s3CacheBlock, 0, 32),
+		filePath,
+		objectSize,
+		sync.Mutex{},
+		make([]int64, 0, 32),
+		false,
+		nil,
+	}
+
+	return reader, nil
+}
+
+func (sb *s3SeekableBackend) GetFileSize(filePath string) (int64, error) {
+	s := sb.s3Backend
+
+	return s.GetFileSize(filePath)
+}
+
+func (r *s3SeekableReader) Close() (err error) {
+	return nil
+}
+
+func (r *s3SeekableReader) pruneCache() {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	if len(r.local) < 16 {
+		return
+	}
+
+	// Prune the cache
+	keepfrom := len(r.local) - 8
+	r.local = r.local[keepfrom:]
+
+}
+
+func (r *s3SeekableReader) prefetchSize() int64 {
+	n := r.Conf.Chunksize
+
+	if n >= 5*1024*1024 {
+		return int64(n)
+	}
+
+	return 50 * 1024 * 1024
+}
+
+func (r *s3SeekableReader) prefetchAt(offset int64) {
+	r.pruneCache()
+
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	if r.isPrefetching(offset) {
+		// We're already fetching this
+		return
+	}
+
+	// Check if we have the data in cache
+	for _, p := range r.local {
+		if offset >= p.start && offset < p.start+p.length {
+			// At least part of the data is here
+			return
+		}
+	}
+
+	// Not found in cache, we should fetch the data
+	bucket := aws.String(r.Bucket)
+	key := aws.String(r.filePath)
+	prefetchSize := r.prefetchSize()
+
+	r.outstandingPrefetches = append(r.outstandingPrefetches, offset)
+
+	r.lock.Unlock()
+
+	wantedRange := aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+prefetchSize-1))
+
+	object, err := r.Client.GetObject(&s3.GetObjectInput{
+		Bucket: bucket,
+		Key:    key,
+		Range:  wantedRange,
+	})
+
+	r.lock.Lock()
+
+	r.removeFromOutstanding(offset)
+
+	if err != nil {
+		return
+	}
+
+	responseRange := fmt.Sprintf("bytes %d-", offset)
+
+	if object.ContentRange == nil || !strings.HasPrefix(*object.ContentRange, responseRange) {
+		// Unexpected content range - ignore
+		return
+	}
+
+	if len(r.local) > 16 {
+		// Don't cache anything more right now
+		return
+	}
+
+	// Read into Buffer
+	b := bytes.Buffer{}
+	_, err = io.Copy(&b, object.Body)
+	if err != nil {
+		return
+	}
+
+	// Store in cache
+	cacheBytes := b.Bytes()
+	r.local = append(r.local, s3CacheBlock{offset, int64(len(cacheBytes)), cacheBytes})
+}
+
+func (r *s3SeekableReader) Seek(offset int64, whence int) (int64, error) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	// Flag that we've seeked, so we don't use the mode optimised for reading from
+	// start to end
+	r.seeked = true
+
+	switch whence {
+	case io.SeekStart:
+		if offset < 0 {
+			return r.currentOffset, fmt.Errorf("Invalid offset %v - can't be negative when seeking from start", offset)
+		}
+		if offset > r.objectSize {
+			return r.currentOffset, fmt.Errorf("Invalid offset %v - beyond end of object (size %v)", offset, r.objectSize)
+		}
+
+		r.currentOffset = offset
+		go r.prefetchAt(r.currentOffset)
+
+		return offset, nil
+
+	case io.SeekCurrent:
+		if r.currentOffset+offset < 0 {
+			return r.currentOffset, fmt.Errorf("Invalid offset %v from %v would be before start", offset, r.currentOffset)
+		}
+		if r.currentOffset+offset > r.objectSize {
+			return r.currentOffset, fmt.Errorf("Invalid offset - %v from %v would end up beyond end of object %v", offset, r.currentOffset, r.objectSize)
+		}
+
+		r.currentOffset += offset
+		go r.prefetchAt(r.currentOffset)
+
+		return r.currentOffset, nil
+
+	case io.SeekEnd:
+		if r.objectSize+offset < 0 {
+			return r.currentOffset, fmt.Errorf("Invalid offset %v from end in %v bytes object, would be before file start", offset, r.objectSize)
+		}
+		if r.objectSize+offset > r.objectSize {
+			return r.currentOffset, fmt.Errorf("Invalid offset %v from end in %v bytes object", offset, r.objectSize)
+		}
+
+		r.currentOffset = r.objectSize + offset
+		go r.prefetchAt(r.currentOffset)
+
+		return r.currentOffset, nil
+	}
+
+	return r.currentOffset, fmt.Errorf("Bad whence")
+}
+
+// removeFromOutstanding removes a prefetch from the list of outstanding prefetches once it's no longer active
+func (r *s3SeekableReader) removeFromOutstanding(toRemove int64) {
+	switch len(r.outstandingPrefetches) {
+	case 0:
+		// Nothing to do
+	case 1:
+		// Check if it's the one we should remove
+		if r.outstandingPrefetches[0] == toRemove {
+			r.outstandingPrefetches = r.outstandingPrefetches[:0]
+		}
+
+	default:
+		remove := 0
+		found := false
+		for i, j := range r.outstandingPrefetches {
+			if j == toRemove {
+				remove = i
+				found = true
+			}
+		}
+		if found {
+			r.outstandingPrefetches[remove] = r.outstandingPrefetches[len(r.outstandingPrefetches)-1]
+			r.outstandingPrefetches = r.outstandingPrefetches[:len(r.outstandingPrefetches)-1]
+		}
+	}
+}
+
+// isPrefetching checks if the data is already being fetched
+func (r *s3SeekableReader) isPrefetching(offset int64) bool {
+	// Walk through the outstanding prefetches
+	for _, p := range r.outstandingPrefetches {
+		if offset >= p && offset < p+r.prefetchSize() {
+			// At least some of this read is already being fetched
+
+			return true
+		}
+	}
+
+	return false
+}
+
+// wholeReader is a helper for when we read the whole object
+func (r *s3SeekableReader) wholeReader(dst []byte) (int, error) {
+	if r.objectReader == nil {
+		// First call, setup a reader for the object
+		object, err := r.Client.GetObject(&s3.GetObjectInput{
+			Bucket: aws.String(r.Bucket),
+			Key:    aws.String(r.filePath),
+		})
+
+		if err != nil {
+			return 0, err
+		}
+
+		// Store for future use
+		r.objectReader = object.Body
+	}
+
+	// Just use the reader, offset is handled in the caller
+	return r.objectReader.Read(dst)
+}
+
+func (r *s3SeekableReader) Read(dst []byte) (n int, err error) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	if !r.seeked {
+		// If not seeked, guess that we use a whole object reader for performance
+
+		n, err = r.wholeReader(dst)
+		// We need to keep track of the position in the stream in case we seek
+		r.currentOffset += int64(n)
+
+		return n, err
+	}
+
+	if r.currentOffset >= r.objectSize {
+		// For reading when there is no more data, just return EOF
+		return 0, io.EOF
+	}
+
+	start := r.currentOffset
+
+	// Walk through the cache
+	for _, p := range r.local {
+		if start >= p.start && start < p.start+p.length {
+			// At least part of the data is here
+
+			offsetInBlock := start - p.start
+
+			// Pull out wanted data (as much as we have)
+			n = copy(dst, p.data[offsetInBlock:])
+			r.currentOffset += int64(n)
+
+			// Prefetch the next bit
+			go r.prefetchAt(r.currentOffset)
+
+			return n, nil
+		}
+	}
+
+	// Check if we're already fetching this data
+	if r.isPrefetching(start) {
+		// Return 0, nil to have the client retry
+
+		return 0, nil
+	}
+
+	// Not found in cache, need to fetch data
+
+	bucket := aws.String(r.Bucket)
+	key := aws.String(r.filePath)
+
+	wantedRange := aws.String(fmt.Sprintf("bytes=%d-%d", r.currentOffset, r.currentOffset+r.prefetchSize()-1))
+
+	r.outstandingPrefetches = append(r.outstandingPrefetches, start)
+
+	r.lock.Unlock()
+
+	object, err := r.Client.GetObject(&s3.GetObjectInput{
+		Bucket: bucket,
+		Key:    key,
+		Range:  wantedRange,
+	})
+
+	r.lock.Lock()
+
+	r.removeFromOutstanding(start)
+
+	if err != nil {
+		return 0, err
+	}
+
+	responseRange := fmt.Sprintf("bytes %d-", r.currentOffset)
+
+	if object.ContentRange == nil || !strings.HasPrefix(*object.ContentRange, responseRange) {
+		return 0, fmt.Errorf("Unexpected content range %v - expected prefix %v", object.ContentRange, responseRange)
+	}
+
+	b := bytes.Buffer{}
+	_, err = io.Copy(&b, object.Body)
+	if err != nil {
+		return 0, err
+	}
+
+	// Add to cache
+	cacheBytes := bytes.Clone(b.Bytes())
+	r.local = append(r.local, s3CacheBlock{start, int64(len(cacheBytes)), cacheBytes})
+
+	n, err = b.Read(dst)
+
+	r.currentOffset += int64(n)
+	go r.prefetchAt(r.currentOffset)
+
+	return n, err
+}
+
+// seekableMultiReader is a helper struct to allow io.MultiReader to be used with a seekable reader
+type seekableMultiReader struct {
+	readers       []io.Reader
+	sizes         []int64
+	currentOffset int64
+	totalSize     int64
+}
+
+// SeekableMultiReader constructs a multireader that supports seeking. Requires
+// all passed readers to be seekable
+func SeekableMultiReader(readers ...io.Reader) (io.ReadSeeker, error) {
+
+	r := make([]io.Reader, len(readers))
+	sizes := make([]int64, len(readers))
+
+	copy(r, readers)
+
+	var totalSize int64
+	for i, reader := range readers {
+		seeker, ok := reader.(io.ReadSeeker)
+		if !ok {
+			return nil, fmt.Errorf("Reader %d to SeekableMultiReader is not seekable", i)
+		}
+
+		size, err := seeker.Seek(0, io.SeekEnd)
+		if err != nil {
+			return nil, fmt.Errorf("Size determination failed for reader %d to SeekableMultiReader: %v", i, err)
+		}
+
+		sizes[i] = size
+		totalSize += size
+
+	}
+
+	return &seekableMultiReader{r, sizes, 0, totalSize}, nil
+}
+
+func (r *seekableMultiReader) Seek(offset int64, whence int) (int64, error) {
+
+	switch whence {
+	case io.SeekStart:
+		r.currentOffset = offset
+	case io.SeekCurrent:
+		r.currentOffset += offset
+	case io.SeekEnd:
+		r.currentOffset = r.totalSize + offset
+
+	default:
+		return 0, fmt.Errorf("Unsupported whence")
+
+	}
+
+	return r.currentOffset, nil
+}
+
+func (r *seekableMultiReader) Read(dst []byte) (int, error) {
+
+	var readerStartAt int64
+
+	for i, reader := range r.readers {
+
+		if r.currentOffset < readerStartAt {
+			// We have already passed the reader holding this offset; should not
+			// happen when scanning from the first reader, but skip defensively
+			readerStartAt += r.sizes[i]
+
+			continue
+		}
+
+		if readerStartAt+r.sizes[i] < r.currentOffset {
+			// We want data from a later reader
+			readerStartAt += r.sizes[i]
+
+			continue
+		}
+
+		// At least part of the data is in this reader
+
+		seekable, ok := reader.(io.ReadSeeker)
+		if !ok {
+			return 0, fmt.Errorf("Expected seekable reader but Seek is not supported")
+		}
+
+		_, err := seekable.Seek(r.currentOffset-int64(readerStartAt), 0)
+		if err != nil {
+			return 0, fmt.Errorf("Unexpected error while seeking: %v", err)
+		}
+
+		n, err := seekable.Read(dst)
+		r.currentOffset += int64(n)
+
+		if n > 0 || err != io.EOF {
+			if err == io.EOF && r.currentOffset < r.totalSize {
+				// More data left, hold that EOF
+				err = nil
+			}
+
+			return n, err
+		}
+
+		readerStartAt += r.sizes[i]
+	}
+
+	return 0, io.EOF
+}
diff --git a/sda-download/internal/storage/storage_test.go b/sda-download/internal/storage/storage_test.go
index 7de3720da..d99fa523b 100644
--- a/sda-download/internal/storage/storage_test.go
+++ b/sda-download/internal/storage/storage_test.go
@@ -2,9 +2,11 @@
 package storage
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 	"net/http/httptest"
 	"os"
+	"slices"
 	"strconv"
 	"strings"
 	"testing"
@@ -26,6 +28,7 @@ const posixType = "posix"
 
 // s3Type is the configuration type used for posix backends
 const s3Type = "s3"
+const s3SeekableType = "s3seekable"
 
 var testS3Conf = S3Conf{
 	"http://127.0.0.1",
@@ -241,27 +244,29 @@ func setupFakeS3() (err error) {
 }
 
 func TestS3Fail(t *testing.T) {
+	tmp := testConf.S3.URL
+	defer func() { testConf.S3.URL = tmp }()
 
-	testConf.Type = s3Type
+	for _, s3type := range []string{s3Type, s3SeekableType} {
+		testConf.Type = s3type
 
-	tmp := testConf.S3.URL
+		testConf.S3.URL = "file://tmp/"
+		_, err := NewBackend(testConf)
+		assert.NotNil(t, err, "Backend worked when it should not")
 
-	defer func() { testConf.S3.URL = tmp }()
-	testConf.S3.URL = "file://tmp/"
-	_, err := NewBackend(testConf)
-	assert.NotNil(t, err, "Backend worked when it should not")
+		var dummyBackend *s3Backend
+		reader, err := dummyBackend.NewFileReader("/")
+		assert.NotNil(t, err, "NewFileReader worked when it should not")
+		assert.Nil(t, reader, "Got a Reader when expected not to")
 
-	var dummyBackend *s3Backend
-	reader, err := dummyBackend.NewFileReader("/")
-	assert.NotNil(t, err, "NewFileReader worked when it should not")
-	assert.Nil(t, reader, "Got a Reader when expected not to")
+		writer, err := dummyBackend.NewFileWriter("/")
+		assert.NotNil(t, err, "NewFileWriter worked when it should not")
+		assert.Nil(t, writer, "Got a Writer when expected not to")
 
-	writer, err := dummyBackend.NewFileWriter("/")
-	assert.NotNil(t, err, "NewFileWriter worked when it should not")
-	assert.Nil(t, writer, "Got a Writer when expected not to")
+		_, err = dummyBackend.GetFileSize("/")
+		assert.NotNil(t, err, "GetFileSize worked when it should not")
 
-	_, err = dummyBackend.GetFileSize("/")
-	assert.NotNil(t, err, "GetFileSize worked when it should not")
+	}
 }
 
 func TestPOSIXFail(t *testing.T) {
@@ -297,34 +302,304 @@ func TestPOSIXFail(t *testing.T) {
 
 func TestS3Backend(t *testing.T) {
 
-	testConf.Type = s3Type
+	for _, s3testtype := range []string{s3Type, s3SeekableType} {
+
+		testConf.Type = s3testtype
+		backend, err := NewBackend(testConf)
+		assert.Nil(t, err, "Backend failed")
+
+		var buf bytes.Buffer
+
+		if s3testtype == s3Type {
+			s3back := backend.(*s3Backend)
+			assert.IsType(t, s3back, &s3Backend{}, "Wrong type from NewBackend with s3")
+		}
+
+		if s3testtype == s3SeekableType {
+			s3back := backend.(*s3SeekableBackend)
+			assert.IsType(t, s3back, &s3SeekableBackend{}, "Wrong type from NewBackend with seekable s3")
+		}
+
+		writer, err := backend.NewFileWriter(s3Creatable)
+
+		assert.NotNil(t, writer, "Got a nil reader for writer from s3")
+		assert.Nil(t, err, "posix NewFileWriter failed when it shouldn't")
+
+		written, err := writer.Write(writeData)
+
+		assert.Nil(t, err, "Failure when writing to s3 writer")
+		assert.Equal(t, len(writeData), written, "Did not write all writeData")
+		writer.Close()
+
+		reader, err := backend.NewFileReader(s3Creatable)
+		assert.Nil(t, err, "s3 NewFileReader failed when it should work")
+		assert.NotNil(t, reader, "Got a nil reader for s3")
+
+		size, err := backend.GetFileSize(s3Creatable)
+		assert.Nil(t, err, "s3 GetFileSize failed when it should work")
+		assert.Equal(t, int64(len(writeData)), size, "Got an incorrect file size")
+
+		if reader == nil {
+			t.Error("reader that should be usable is not, bailing out")
+
+			return
+		}
+
+		var readBackBuffer [4096]byte
+		readBack, err := reader.Read(readBackBuffer[0:4096])
+
+		assert.Equal(t, len(writeData), readBack, "did not read back data as expected")
+		assert.Equal(t, writeData, readBackBuffer[:readBack], "did not read back data as expected")
+
+		if err != nil && err != io.EOF {
+			assert.Nil(t, err, "unexpected error when reading back data")
+		}
+
+		buf.Reset()
+
+		log.SetOutput(&buf)
+
+		if !testing.Short() {
+			_, err = backend.GetFileSize(s3DoesNotExist)
+			assert.NotNil(t, err, "s3 GetFileSize worked when it should not")
+			assert.NotZero(t, buf.Len(), "Expected warning missing")
+
+			buf.Reset()
+
+			reader, err = backend.NewFileReader(s3DoesNotExist)
+			assert.NotNil(t, err, "s3 NewFileReader worked when it should not")
+			assert.Nil(t, reader, "Got a non-nil reader for s3")
+			assert.NotZero(t, buf.Len(), "Expected warning missing")
+		}
+
+		log.SetOutput(os.Stdout)
+	}
+}
+
+func TestSeekableBackend(t *testing.T) {
+
+	for _, s3testtype := range []string{posixType, s3SeekableType} {
+
+		testConf.Type = s3testtype
+
+		backend, err := NewBackend(testConf)
+		assert.Nil(t, err, "Backend failed")
+
+		var buf bytes.Buffer
+
+		path := fmt.Sprintf("%v.%v", s3Creatable, time.Now().UnixNano())
+
+		if testConf.Type == s3SeekableType {
+			s3back := backend.(*s3SeekableBackend)
+			assert.IsType(t, s3back, &s3SeekableBackend{}, "Wrong type from NewBackend with seekable s3")
+		}
+		if testConf.Type == posixType {
+			path, err = writeName()
+			posix := backend.(*posixBackend)
+			assert.Nil(t, err, "File creation for backend failed")
+			assert.IsType(t, posix, &posixBackend{}, "Wrong type from NewBackend with seekable posix")
+		}
+
+		writer, err := backend.NewFileWriter(path)
+
+		assert.NotNil(t, writer, "Got a nil reader for writer from s3")
+		assert.Nil(t, err, "posix NewFileWriter failed when it shouldn't")
+
+		for i := 0; i < 1000; i++ {
+			written, err := writer.Write(writeData)
+			assert.Nil(t, err, "Failure when writing to s3 writer")
+			assert.Equal(t, len(writeData), written, "Did not write all writeData")
+		}
+
+		writer.Close()
+
+		reader, err := backend.NewFileReader(path)
+		assert.Nil(t, err, "s3 NewFileReader failed when it should work")
+		assert.NotNil(t, reader, "Got a nil reader for s3")
+
+		size, err := backend.GetFileSize(path)
+		assert.Nil(t, err, "s3 GetFileSize failed when it should work")
+		assert.Equal(t, int64(len(writeData))*1000, size, "Got an incorrect file size")
+
+		if reader == nil {
+			t.Error("reader that should be usable is not, bailing out")
+
+			return
+		}
+
+		var readBackBuffer [4096]byte
+		seeker := reader.(io.ReadSeekCloser)
+
+		_, err = seeker.Read(readBackBuffer[0:4096])
+		assert.Equal(t, writeData, readBackBuffer[:14], "did not read back data as expected")
+		assert.Nil(t, err, "read returned unexpected error")
+
+		if testConf.Type == s3SeekableType {
+			// POSIX is more permissive
+			_, err := seeker.Seek(95000, io.SeekStart)
+			assert.NotNil(t, err, "Seek didn't fail when it should")
+
+			_, err = seeker.Seek(-95000, io.SeekStart)
+			assert.NotNil(t, err, "Seek didn't fail when it should")
+
+			_, err = seeker.Seek(-95000, io.SeekCurrent)
+			assert.NotNil(t, err, "Seek didn't fail when it should")
+
+			_, err = seeker.Seek(95000, io.SeekCurrent)
+			assert.NotNil(t, err, "Seek didn't fail when it should")
+
+			_, err = seeker.Seek(95000, io.SeekEnd)
+			assert.NotNil(t, err, "Seek didn't fail when it should")
+
+			_, err = seeker.Seek(-95000, io.SeekEnd)
+			assert.NotNil(t, err, "Seek didn't fail when it should")
+
+			_, err = seeker.Seek(0, 4)
+			assert.NotNil(t, err, "Seek didn't fail when it should")
+
+		}
+
+		offset, err := seeker.Seek(15, io.SeekStart)
+		assert.Nil(t, err, "Seek failed when it shouldn't")
+		assert.Equal(t, int64(15), offset, "Seek did not return expected offset")
+
+		offset, err = seeker.Seek(5, io.SeekCurrent)
+		assert.Nil(t, err, "Seek failed when it shouldn't")
+		assert.Equal(t, int64(20), offset, "Seek did not return expected offset")
+
+		offset, err = seeker.Seek(-5, io.SeekEnd)
+		assert.Nil(t, err, "Seek failed when it shouldn't")
+		assert.Equal(t, int64(13995), offset, "Seek did not return expected offset")
+
+		n, err := seeker.Read(readBackBuffer[0:4096])
+		assert.Equal(t, 5, n, "Unexpected amount of read bytes")
+		assert.Nil(t, err, "Read failed when it shouldn't")
+
+		n, err = seeker.Read(readBackBuffer[0:4096])
+
+		assert.Equal(t, io.EOF, err, "Expected EOF")
+		assert.Equal(t, 0, n, "Unexpected amount of read bytes")
+
+		offset, err = seeker.Seek(0, io.SeekEnd)
+		assert.Nil(t, err, "Seek failed when it shouldn't")
+		assert.Equal(t, int64(14000), offset, "Seek did not return expected offset")
+
+		n, err = seeker.Read(readBackBuffer[0:4096])
+		assert.Equal(t, 0, n, "Unexpected amount of read bytes")
+		assert.Equal(t, io.EOF, err, "Read returned unexpected error when EOF")
+
+		offset, err = seeker.Seek(6302, io.SeekStart)
+		assert.Nil(t, err, "Seek failed")
+		assert.Equal(t, int64(6302), offset, "Seek did not return expected offset")
+
+		n = 0
+		for i := 0; i < 500000 && n == 0 && err == nil; i++ {
+			// Allow 0 sizes while waiting for prefetch
+			n, err = seeker.Read(readBackBuffer[0:4096])
+		}
+
+		assert.Equal(t, 4096, n, "Read did not return expected amounts of bytes for %v", seeker)
+		assert.Equal(t, writeData[2:], readBackBuffer[:12], "did not read back data as expected")
+		assert.Nil(t, err, "unexpected error when reading back data")
+
+		offset, err = seeker.Seek(6302, io.SeekStart)
+		assert.Nil(t, err, "unexpected error when seeking to read back data")
+		assert.Equal(t, int64(6302), offset, "returned offset wasn't expected")
+
+		largeBuf := make([]byte, 65536)
+		readLen, err := seeker.Read(largeBuf)
+		assert.Equal(t, 7698, readLen, "did not read back expected amount of data")
+		assert.Nil(t, err, "unexpected error when reading back data")
+
+		buf.Reset()
+
+		log.SetOutput(&buf)
+
+		if !testing.Short() {
+			_, err = backend.GetFileSize(s3DoesNotExist)
+			assert.NotNil(t, err, "s3 GetFileSize worked when it should not")
+			assert.NotZero(t, buf.Len(), "Expected warning missing")
+
+			buf.Reset()
+
+			reader, err = backend.NewFileReader(s3DoesNotExist)
+			assert.NotNil(t, err, "s3 NewFileReader worked when it should not")
+			assert.Nil(t, reader, "Got a non-nil reader for s3")
+			assert.NotZero(t, buf.Len(), "Expected warning missing")
+		}
+
+		log.SetOutput(os.Stdout)
+	}
+}
+
+func TestS3SeekablePrefetchSize(t *testing.T) {
+
+	testConf.Type = s3SeekableType
+	chunkSize := testConf.S3.Chunksize
+	testConf.S3.Chunksize = 5 * 1024 * 1024
 	backend, err := NewBackend(testConf)
-	assert.Nil(t, err, "Backend failed")
+	s3back := backend.(*s3SeekableBackend)
+	assert.IsType(t, s3back, &s3SeekableBackend{}, "Wrong type from NewBackend with seekable s3")
+	assert.Nil(t, err, "S3 backend failed")
+	path := fmt.Sprintf("%v.%v", s3Creatable, time.Now().UnixNano())
 
-	s3back := backend.(*s3Backend)
+	writer, err := backend.NewFileWriter(path)
 
-	var buf bytes.Buffer
+	assert.NotNil(t, writer, "Got a nil reader for writer from s3")
+	assert.Nil(t, err, "posix NewFileWriter failed when it shouldn't")
 
-	assert.IsType(t, s3back, &s3Backend{}, "Wrong type from NewBackend with s3")
+	writer.Close()
+
+	reader, err := backend.NewFileReader(path)
+	assert.Nil(t, err, "s3 NewFileReader failed when it should work")
+	assert.NotNil(t, reader, "Got a nil reader for s3")
+
+	s := reader.(*s3SeekableReader)
+
+	assert.Equal(t, int64(5*1024*1024), s.prefetchSize(), "Prefetch size not as expected with chunksize 5MB")
+	s.Conf.Chunksize = 0
+	assert.Equal(t, int64(50*1024*1024), s.prefetchSize(), "Prefetch size not as expected")
+
+	s.Conf.Chunksize = 1024 * 1024
+	assert.Equal(t, int64(50*1024*1024), s.prefetchSize(), "Prefetch size not as expected")
+
+	testConf.S3.Chunksize = chunkSize
+}
 
-	writer, err := s3back.NewFileWriter(s3Creatable)
+func TestS3SeekableSpecial(t *testing.T) {
+	// Some special tests here, messing with internals to expose behaviour
+
+	testConf.Type = s3SeekableType
+
+	backend, err := NewBackend(testConf)
+	assert.Nil(t, err, "Backend failed")
+
+	path := fmt.Sprintf("%v.%v", s3Creatable, time.Now().UnixNano())
+
+	s3back := backend.(*s3SeekableBackend)
+	assert.IsType(t, s3back, &s3SeekableBackend{}, "Wrong type from NewBackend with seekable s3")
+
+	writer, err := backend.NewFileWriter(path)
 
 	assert.NotNil(t, writer, "Got a nil reader for writer from s3")
 	assert.Nil(t, err, "posix NewFileWriter failed when it shouldn't")
 
-	written, err := writer.Write(writeData)
+	for i := 0; i < 1000; i++ {
+		written, err := writer.Write(writeData)
+		assert.Nil(t, err, "Failure when writing to s3 writer")
+		assert.Equal(t, len(writeData), written, "Did not write all writeData")
+	}
 
-	assert.Nil(t, err, "Failure when writing to s3 writer")
-	assert.Equal(t, len(writeData), written, "Did not write all writeData")
 	writer.Close()
 
-	reader, err := s3back.NewFileReader(s3Creatable)
+	reader, err := backend.NewFileReader(path)
+	reader.(*s3SeekableReader).seeked = true
+
 	assert.Nil(t, err, "s3 NewFileReader failed when it should work")
 	assert.NotNil(t, reader, "Got a nil reader for s3")
-
-	size, err := s3back.GetFileSize(s3Creatable)
+	size, err := backend.GetFileSize(path)
 	assert.Nil(t, err, "s3 GetFileSize failed when it should work")
-	assert.Equal(t, int64(len(writeData)), size, "Got an incorrect file size")
+	assert.Equal(t, int64(len(writeData))*1000, size, "Got an incorrect file size")
 
 	if reader == nil {
 		t.Error("reader that should be usable is not, bailing out")
@@ -333,32 +608,99 @@ func TestS3Backend(t *testing.T) {
 	}
 
 	var readBackBuffer [4096]byte
-	readBack, err := reader.Read(readBackBuffer[0:4096])
+	seeker := reader.(io.ReadSeekCloser)
 
-	assert.Equal(t, len(writeData), readBack, "did not read back data as expected")
-	assert.Equal(t, writeData, readBackBuffer[:readBack], "did not read back data as expected")
+	_, err = seeker.Read(readBackBuffer[0:4096])
+	assert.Equal(t, writeData, readBackBuffer[:14], "did not read back data as expected")
+	assert.Nil(t, err, "read returned unexpected error")
 
-	if err != nil && err != io.EOF {
-		assert.Nil(t, err, "unexpected error when reading back data")
-	}
+	err = seeker.Close()
+	assert.Nil(t, err, "unexpected error when closing")
 
-	buf.Reset()
+	reader, err = backend.NewFileReader(path)
+	assert.Nil(t, err, "unexpected error when creating reader")
 
-	log.SetOutput(&buf)
+	s := reader.(*s3SeekableReader)
+	s.seeked = true
+	s.prefetchAt(0)
+	assert.Equal(t, 1, len(s.local), "nothing cached after prefetch")
+	// Clear cache
+	s.local = s.local[:0]
 
-	if !testing.Short() {
-		_, err = backend.GetFileSize(s3DoesNotExist)
-		assert.NotNil(t, err, "s3 GetFileSize worked when it should not")
-		assert.NotZero(t, buf.Len(), "Expected warning missing")
+	s.outstandingPrefetches = []int64{0}
+	t.Logf("Cache %v, outstanding %v", s.local, s.outstandingPrefetches)
 
-		buf.Reset()
+	n, err := s.Read(readBackBuffer[0:4096])
+	assert.Nil(t, err, "read returned unexpected error")
+	assert.Equal(t, 0, n, "got data when we should get 0 because of prefetch")
 
-		reader, err = backend.NewFileReader(s3DoesNotExist)
-		assert.NotNil(t, err, "s3 NewFileReader worked when it should not")
-		assert.Nil(t, reader, "Got a non-nil reader for s3")
-		assert.NotZero(t, buf.Len(), "Expected warning missing")
+	for i := 0; i < 30; i++ {
+		s.local = append(s.local, s3CacheBlock{90000000, int64(0), nil})
 	}
+	s.prefetchAt(0)
+	assert.Equal(t, 8, len(s.local), "unexpected length of cache after prefetch")
+
+	s.outstandingPrefetches = []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+	s.removeFromOutstanding(9)
+	assert.Equal(t, s.outstandingPrefetches, []int64{0, 1, 2, 3, 4, 5, 6, 7, 8}, "unexpected outstanding prefetches after remove")
+	s.removeFromOutstanding(19)
+	assert.Equal(t, s.outstandingPrefetches, []int64{0, 1, 2, 3, 4, 5, 6, 7, 8}, "unexpected outstanding prefetches after remove")
+	s.removeFromOutstanding(5)
+	// We don't care about the internal order, sort for simplicity
+	slices.Sort(s.outstandingPrefetches)
+	assert.Equal(t, s.outstandingPrefetches, []int64{0, 1, 2, 3, 4, 6, 7, 8}, "unexpected outstanding prefetches after remove")
+
+	s.objectReader = nil
+	s.Bucket = ""
+	s.filePath = ""
+	data := make([]byte, 100)
+	_, err = s.wholeReader(data)
+	assert.NotNil(t, err, "wholeReader object instantiation worked when it should have failed")
+}
 
-	log.SetOutput(os.Stdout)
+func TestSeekableMultiReader(t *testing.T) {
+
+	readers := make([]io.Reader, 10)
+	for i := 0; i < 10; i++ {
+		readers[i] = bytes.NewReader(writeData)
+	}
+
+	seeker, err := SeekableMultiReader(readers...)
+	assert.Nil(t, err, "unexpected error from creating SeekableMultiReader")
+
+	var readBackBuffer [4096]byte
+
+	_, err = seeker.Read(readBackBuffer[0:4096])
+	assert.Equal(t, writeData, readBackBuffer[:14], "did not read back data as expected")
+	assert.Nil(t, err, "unexpected error from read")
+
+	offset, err := seeker.Seek(60, io.SeekStart)
+	assert.Nil(t, err, "Seek failed")
+	assert.Equal(t, int64(60), offset, "Seek did not return expected offset")
+
+	// We don't know how many bytes this should return
+	_, err = seeker.Read(readBackBuffer[0:4096])
+	assert.Equal(t, writeData[4:], readBackBuffer[:10], "did not read back data as expected")
+	assert.Nil(t, err, "Read failed when it should not")
+
+	offset, err = seeker.Seek(0, io.SeekEnd)
+	assert.Equal(t, int64(140), offset, "Seek did not return expected offset")
+	assert.Nil(t, err, "Seek failed when it should not")
+
+	n, err := seeker.Read(readBackBuffer[0:4096])
+
+	assert.Equal(t, 0, n, "Read did not return expected amounts of bytes")
+	assert.Equal(t, io.EOF, err, "did not get EOF as expected")
+
+	offset, err = seeker.Seek(56, io.SeekStart)
+	assert.Equal(t, int64(56), offset, "Seek did not return expected offset")
+	assert.Nil(t, err, "Seek failed unexpectedly")
+
+	largeBuf := make([]byte, 65536)
+	readLen, err := seeker.Read(largeBuf)
+	assert.Nil(t, err, "unexpected error when reading back data")
+	assert.Equal(t, 14, readLen, "did not read back expected amount of data")
+
 	log.SetOutput(os.Stdout)
 }
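
Note on the new `SeekableMultiReader` helper (a usage sketch, not part of the patch): the snippet below illustrates how `Download` now composes the crypt4gh header and the archive file into one seekable stream, so `sendStream`/`seekStart` can jump straight to a byte offset instead of reading from the beginning. The import path is assumed from this repository layout; since the package lives under `internal/`, the example only compiles from within this module, and the reader contents are made-up stand-ins.

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	// Assumed module path; adjust to the real one for this repo.
	"github.com/neicnordic/sda-download/internal/storage"
)

func main() {
	// Stand-ins for fileDetails.Header and the archive file reader in Download.
	header := bytes.NewReader([]byte("HEADER"))   // 6 bytes, plays the role of newHr/hr
	body := bytes.NewReader([]byte("BODY-BYTES")) // 10 bytes, plays the role of the backend file

	// Both readers are seekable, so SeekableMultiReader accepts them.
	rs, err := storage.SeekableMultiReader(header, body)
	if err != nil {
		panic(err)
	}

	// Seek past the header straight into the body, as seekStart does for range requests.
	if _, err := rs.Seek(6, io.SeekStart); err != nil {
		panic(err)
	}

	buf := make([]byte, 4)
	n, err := rs.Read(buf)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", buf[:n]) // prints "BODY"
}
```

With the archive configured as ARCHIVE_TYPE=s3seekable, such seeks are served by the s3seekable backend as ranged S3 GETs with prefetching, rather than by discarding bytes from the start of the object.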