From 2320f97907f97fe59ab39656676ce317261852be Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Tue, 7 Feb 2023 09:45:06 +0000 Subject: [PATCH] Add support for S3 streaming downloads --- lib/bucket_store/s3.rb | 42 +++++++++++++++++++++++++++ spec/bucket_store_integration_spec.rb | 2 +- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/lib/bucket_store/s3.rb b/lib/bucket_store/s3.rb index 393129a..59ec91d 100644 --- a/lib/bucket_store/s3.rb +++ b/lib/bucket_store/s3.rb @@ -8,6 +8,8 @@ module BucketStore class S3 DEFAULT_TIMEOUT_SECONDS = 30 + DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb + def self.build(open_timeout_seconds = DEFAULT_TIMEOUT_SECONDS, read_timeout_seconds = DEFAULT_TIMEOUT_SECONDS) S3.new(open_timeout_seconds, read_timeout_seconds) @@ -46,6 +48,46 @@ def download(bucket:, key:) } end + def stream_download(bucket:, key:, chunk_size: nil) + chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES + + metadata = { + bucket: bucket, + key: key, + }.freeze + + obj_size = storage.head_object(bucket: bucket, key: key)&.content_length || 0 + + Enumerator.new do |yielder| + start = 0 + loop do + stop = [start + chunk_size, obj_size].min + break if stop.zero? || start >= stop + + # S3 only supports streaming writes to an IO object (e.g. a file or StringIO) + # We simulate an enumerator-based streaming approach by using partial range + # downloads instead. There's no helper methods for range downloads in the Ruby + # SDK, so we have to build our own range query. + # Range is specified in the same format as the HTTP range header (see + # https://www.rfc-editor.org/rfc/rfc9110.html#name-range) + obj = storage.get_object( + bucket: bucket, + key: key, + range: "bytes=#{start}-#{stop}", + ) + + # rubocop:disable Style/ZeroLengthPredicate + # StringIO does not define the `.empty?` method that rubocop is so keen on using + body = obj&.body&.read + break if body.nil? || body.size.zero? + # rubocop:enable Style/ZeroLengthPredicate + + yielder.yield([metadata, body]) + start += body.size + end + end + end + def list(bucket:, key:, page_size:) Enumerator.new do |yielder| page = storage.list_objects_v2(bucket: bucket, prefix: key, max_keys: page_size) diff --git a/spec/bucket_store_integration_spec.rb b/spec/bucket_store_integration_spec.rb index 83d5d76..78d5193 100644 --- a/spec/bucket_store_integration_spec.rb +++ b/spec/bucket_store_integration_spec.rb @@ -74,7 +74,7 @@ end # Delete all the files, the bucket should be empty afterwards - file_list.map { |filename| "#{base_bucket_uri}/prefix/#{filename}" }.each do |key| + described_class.for(base_bucket_uri.to_s).list.each do |key| described_class.for(key).delete! end expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0)