From 96c4ed24c1a9c7637a0ae5cd19f720c81d4f04c8 Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Fri, 3 Feb 2023 16:24:36 +0000 Subject: [PATCH] Implement stream downloads for the GCS adapter Adds support for streaming reads/downloads in the GCS adapter. Note that GCS does _not_ natively support streaming downloads, so we emulate streaming using partial range downloads. --- lib/bucket_store/gcs.rb | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/lib/bucket_store/gcs.rb b/lib/bucket_store/gcs.rb index 802345e..7e253b9 100644 --- a/lib/bucket_store/gcs.rb +++ b/lib/bucket_store/gcs.rb @@ -9,6 +9,8 @@ module BucketStore class Gcs DEFAULT_TIMEOUT_SECONDS = 30 + DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb + def self.build(timeout_seconds = DEFAULT_TIMEOUT_SECONDS) Gcs.new(timeout_seconds) end @@ -56,6 +58,41 @@ def download(bucket:, key:) } end + def stream_download(bucket:, key:, chunk_size: nil) + chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES + + file = get_bucket(bucket).file(key) + metadata = { + bucket: bucket, + key: key, + }.freeze + + obj_size = file.size + + Enumerator.new do |yielder| + start = 0 + loop do + stop = [start + chunk_size, obj_size].min + break if stop.zero? || start >= stop + + # We simulate an enumerator-based streaming approach by using partial range + # downloads as there's no direct support for streaming downloads. The returned + # object is a StringIO, so we must `.rewind` before we can access it. + obj_io = file.download(range: start...stop) + obj_io&.rewind + + # rubocop:disable Style/ZeroLengthPredicate + # StringIO does not define the `.empty?` method that rubocop is so keen on using + body = obj_io&.read + break if body.nil? || body.size.zero? + # rubocop:enable Style/ZeroLengthPredicate + + yielder.yield([metadata, body]) + start += body.size + end + end + end + def list(bucket:, key:, page_size:) Enumerator.new do |yielder| token = nil