From 3b1aae02c0145e0f2bf6711087941082edd00da1 Mon Sep 17 00:00:00 2001
From: Ivan Giuliani
Date: Tue, 7 Feb 2023 09:45:06 +0000
Subject: [PATCH] Implement stream downloads for the S3 adapter

S3's Ruby SDK supports streaming downloads in two ways:

- by streaming directly into a file-like object (e.g. `StringIO`). However,
  due to how this is implemented, we can't access the content of the file
  in memory as it gets downloaded. This is a problem since in-memory access
  is one of our requirements: we may want to process the file as we
  download it.
- by using block-based iteration. This has the big drawback that it will
  _not_ retry failed requests after the first chunk of data has been
  yielded, which could lead to file corruption on the client end by
  starting over mid-stream.
  (See https://aws.amazon.com/blogs/developer/downloading-objects-from-amazon-s3-using-the-aws-sdk-for-ruby/
  for more details.)

Therefore streaming downloads are implemented similarly to the GCS adapter,
by leveraging partial range downloads. This adds an extra HTTP call to AWS
to obtain the overall file size, but it is a more robust solution, as it
both supports retries of individual chunks and allows us to inspect the
content of the file as we download it.
---
 lib/bucket_store/s3.rb                | 41 +++++++++++++++++++++++++++
 spec/bucket_store_integration_spec.rb |  2 +-
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/lib/bucket_store/s3.rb b/lib/bucket_store/s3.rb
index 393129a..4af96b5 100644
--- a/lib/bucket_store/s3.rb
+++ b/lib/bucket_store/s3.rb
@@ -8,6 +8,8 @@ module BucketStore
   class S3
     DEFAULT_TIMEOUT_SECONDS = 30
 
+    DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4MB
+
     def self.build(open_timeout_seconds = DEFAULT_TIMEOUT_SECONDS,
                    read_timeout_seconds = DEFAULT_TIMEOUT_SECONDS)
       S3.new(open_timeout_seconds, read_timeout_seconds)
@@ -46,6 +48,45 @@ def download(bucket:, key:)
       }
     end
 
+    def stream_download(bucket:, key:, chunk_size: nil)
+      chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
+
+      metadata = {
+        bucket: bucket,
+        key: key,
+      }.freeze
+
+      obj_size = storage.head_object(bucket: bucket, key: key)&.content_length || 0
+
+      Enumerator.new do |yielder|
+        start = 0
+        while start < obj_size
+          stop = [start + chunk_size - 1, obj_size].min
+
+          # S3 only supports streaming writes to an IO object (e.g. a file or StringIO).
+          # We simulate an enumerator-based streaming approach by using partial range
+          # downloads instead. There are no helper methods for range downloads in the
+          # Ruby SDK, so we have to build our own range query.
+          # The range is specified in the same format as the HTTP Range header (see
+          # https://www.rfc-editor.org/rfc/rfc9110.html#name-range).
+          obj = storage.get_object(
+            bucket: bucket,
+            key: key,
+            range: "bytes=#{start}-#{stop}",
+          )
+
+          # rubocop:disable Style/ZeroLengthPredicate
+          # StringIO does not define the `.empty?` method that rubocop is so keen on using
+          body = obj&.body&.read
+          break if body.nil? || body.size.zero?
+          # rubocop:enable Style/ZeroLengthPredicate
+
+          yielder.yield([metadata, body])
+          start += body.size
+        end
+      end
+    end
+
     def list(bucket:, key:, page_size:)
       Enumerator.new do |yielder|
         page = storage.list_objects_v2(bucket: bucket, prefix: key, max_keys: page_size)
diff --git a/spec/bucket_store_integration_spec.rb b/spec/bucket_store_integration_spec.rb
index c0c206c..b1d0d8f 100644
--- a/spec/bucket_store_integration_spec.rb
+++ b/spec/bucket_store_integration_spec.rb
@@ -68,7 +68,7 @@
     end
 
     # Delete all the files, the bucket should be empty afterwards
-    file_list.map { |filename| "#{base_bucket_uri}/prefix/#{filename}" }.each do |key|
+    described_class.for(base_bucket_uri.to_s).list.each do |key|
       described_class.for(key).delete!
     end
     expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0)
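
A minimal usage sketch of the new API, not part of the patch itself: it assumes
the gem is installed, an AWS client configured via the usual environment, and
placeholder bucket/key and output-path values. It consumes the enumerator
returned by `stream_download` to write an object to disk while each chunk is
still available for inspection in memory.

  # Usage sketch only: "example-bucket", "path/to/large-file" and the output
  # path are placeholders.
  require "bucket_store/s3"

  adapter = BucketStore::S3.build
  chunks = adapter.stream_download(bucket: "example-bucket", key: "path/to/large-file")

  File.open("/tmp/large-file", "wb") do |file|
    chunks.each do |_metadata, chunk|
      # Each yielded item is a [metadata, chunk] pair. Chunks are at most
      # DEFAULT_STREAM_CHUNK_SIZE_BYTES (4MB) long, so memory usage stays
      # bounded and each chunk can be inspected (e.g. hashed) before writing.
      file.write(chunk)
    end
  end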