Implement stream downloads for the S3 adapter
S3's Ruby SDK supports streaming downloads in two ways:
- by streaming directly into a file-like object (e.g. `StringIO`).
  However, due to how this is implemented, we can't access the content
  of the file in memory as it is downloaded. This is a problem for us,
  since we may want to process the file as we download it.
- by using block-based iteration. This has the big drawback that it
  will _not_ retry failed requests after the first chunk of data has
  been yielded, since a retry would restart the download mid-stream and
  could corrupt the file on the client end.

(See https://aws.amazon.com/blogs/developer/downloading-objects-from-amazon-s3-using-the-aws-sdk-for-ruby/
for more details; a minimal sketch of both approaches follows below.)
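
For reference, here is a minimal sketch of the two SDK approaches
described above. It is not part of this change: the `Aws::S3::Client` is
assumed to be configured with valid credentials, the bucket/key names are
illustrative, and `handle_chunk` is a hypothetical callback.

    require "aws-sdk-s3"
    require "stringio"

    client = Aws::S3::Client.new

    # 1. Stream directly into an IO object: the SDK writes the body into `io`
    #    as it downloads it, so our code never sees the bytes mid-download.
    io = StringIO.new
    client.get_object(bucket: "example-bucket", key: "example-key", response_target: io)

    # 2. Block-based iteration: chunks are yielded as they arrive, but the SDK
    #    will not retry a failed request once the first chunk has been yielded.
    client.get_object(bucket: "example-bucket", key: "example-key") do |chunk|
      handle_chunk(chunk) # hypothetical per-chunk callback
    end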

Therefore streaming downloads are implemented similarly to the GCS
adapter, by leveraging partial range downloads. This adds an extra HTTP
call to AWS to obtain the overall file size, but it is a more robust
solution: it both supports retries of individual chunks and allows us
to inspect the content of the file as we download it.
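
For illustration, a hypothetical caller could consume the resulting
enumerator like this (the bucket/key values are made up and
`process_chunk` stands in for whatever per-chunk processing the caller
needs):

    adapter = BucketStore::S3.build

    adapter.stream_download(bucket: "example-bucket", key: "path/to/file").each do |metadata, chunk|
      # Each iteration yields the frozen metadata hash plus one chunk of at
      # most `chunk_size` bytes, so the file can be processed while it is
      # still being downloaded.
      process_chunk(metadata, chunk)
    end
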
ivgiuliani committed Apr 12, 2023
1 parent 96c4ed2 commit 3b1aae0
Showing 2 changed files with 42 additions and 1 deletion.
41 changes: 41 additions & 0 deletions lib/bucket_store/s3.rb
@@ -8,6 +8,8 @@ module BucketStore
  class S3
    DEFAULT_TIMEOUT_SECONDS = 30

    DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4 MiB

    def self.build(open_timeout_seconds = DEFAULT_TIMEOUT_SECONDS,
                   read_timeout_seconds = DEFAULT_TIMEOUT_SECONDS)
      S3.new(open_timeout_seconds, read_timeout_seconds)
@@ -46,6 +48,45 @@ def download(bucket:, key:)
      }
    end

    def stream_download(bucket:, key:, chunk_size: nil)
      chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES

      metadata = {
        bucket: bucket,
        key: key,
      }.freeze

      obj_size = storage.head_object(bucket: bucket, key: key)&.content_length || 0

      Enumerator.new do |yielder|
        start = 0
        while start < obj_size
          stop = [start + chunk_size - 1, obj_size].min

          # S3 only supports streaming writes to an IO object (e.g. a file or StringIO).
          # We simulate an enumerator-based streaming approach by using partial range
          # downloads instead. There are no helper methods for range downloads in the
          # Ruby SDK, so we have to build our own range query.
          # The range is specified in the same format as the HTTP Range header (see
          # https://www.rfc-editor.org/rfc/rfc9110.html#name-range).
          obj = storage.get_object(
            bucket: bucket,
            key: key,
            range: "bytes=#{start}-#{stop}",
          )

          # rubocop:disable Style/ZeroLengthPredicate
          # StringIO does not define the `.empty?` method that rubocop is so keen on using
          body = obj&.body&.read
          break if body.nil? || body.size.zero?
          # rubocop:enable Style/ZeroLengthPredicate

          yielder.yield([metadata, body])
          start += body.size
        end
      end
    end

    def list(bucket:, key:, page_size:)
      Enumerator.new do |yielder|
        page = storage.list_objects_v2(bucket: bucket, prefix: key, max_keys: page_size)
2 changes: 1 addition & 1 deletion spec/bucket_store_integration_spec.rb
@@ -68,7 +68,7 @@
    end

    # Delete all the files, the bucket should be empty afterwards
    file_list.map { |filename| "#{base_bucket_uri}/prefix/#{filename}" }.each do |key|
    described_class.for(base_bucket_uri.to_s).list.each do |key|
      described_class.for(key).delete!
    end
    expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0)