From f6370053740dc2c8f6578310a9ce3f92cf44581f Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Tue, 11 Apr 2023 16:39:37 +0100 Subject: [PATCH 01/10] Expose minio admin console --- docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yml b/docker-compose.yml index eed352c..dbb3d0d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,7 @@ services: MINIO_REGION_NAME: us-east-1 ports: - "9000:9000" + - "9001:9001" gcp-simulator: image: fsouza/fake-gcs-server hostname: gcp From ef7856c66c65f46f752a1141bd0a3afb2590b0ab Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Tue, 11 Apr 2023 17:29:03 +0100 Subject: [PATCH 02/10] Delete all files in the bucket before starting Make sure integration tests always start with a clean slate, regardless of how the previous test run has ended. Note that this can also fail, but it's a good test in itself... --- spec/bucket_store_integration_spec.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spec/bucket_store_integration_spec.rb b/spec/bucket_store_integration_spec.rb index c808615..c0c206c 100644 --- a/spec/bucket_store_integration_spec.rb +++ b/spec/bucket_store_integration_spec.rb @@ -24,8 +24,11 @@ end shared_examples "adapter integration" do |base_bucket_uri| - # This is presented as a single idempotent test as otherwise resetting state between execution - # makes things very complicated with no huge benefits. + before do + described_class.for(base_bucket_uri).list.each do |path| + described_class.for(path).delete! + end + end it "has a consistent interface" do # Write 201 files From 71452dcc72b4a27ecd9f78f11f6645b366697578 Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Thu, 2 Feb 2023 15:44:31 +0000 Subject: [PATCH 03/10] Add streaming support for downloads This introduces a new `.stream` interface for upload and download operations. Currently only downloads are supported, and as part of this commit there's only a reference implementation for the in-memory adapter. Requirements considered: - being able to easily access the content of the files in memory as we download it: this is so we can do processing of the content as the file gets downloaded - keep the interface as similar as possible to the existing (not stream-based) download/upload: this is to reduce cognitive load on the library, as the main change between stream/not-stream is to add the .stream keyword and iterate on blocks. --- lib/bucket_store/in_memory.rb | 21 +++++++++ lib/bucket_store/key_storage.rb | 62 +++++++++++++++++++++++++++ spec/bucket_store/key_storage_spec.rb | 27 ++++++++++++ 3 files changed, 110 insertions(+) diff --git a/lib/bucket_store/in_memory.rb b/lib/bucket_store/in_memory.rb index 6dfccc9..0fa2581 100644 --- a/lib/bucket_store/in_memory.rb +++ b/lib/bucket_store/in_memory.rb @@ -2,6 +2,8 @@ module BucketStore class InMemory + DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb + def self.build InMemory.instance end @@ -41,6 +43,25 @@ def download(bucket:, key:) } end + def stream_download(bucket:, key:, chunk_size: nil) + chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES + + content_stream = StringIO.new(@buckets[bucket].fetch(key)) + metadata = { + bucket: bucket, + key: key, + }.freeze + + Enumerator.new do |yielder| + loop do + v = content_stream.read(chunk_size) + break if v.nil? + + yielder.yield([metadata, v]) + end + end + end + def list(bucket:, key:, page_size:) @buckets[bucket].keys. select { |k| k.start_with?(key) }. 
diff --git a/lib/bucket_store/key_storage.rb b/lib/bucket_store/key_storage.rb index 4287d95..f3e7acc 100644 --- a/lib/bucket_store/key_storage.rb +++ b/lib/bucket_store/key_storage.rb @@ -15,6 +15,59 @@ class KeyStorage disk: Disk, }.freeze + # Defines a streaming interface for download and upload operations. + # + # Note that individual adapters may require additional configuration for the correct + # behavior of the streaming interface. + class KeyStreamer + attr_reader :bucket, :key, :adapter + + def initialize(adapter:, bucket:, key:) + @adapter = adapter + @bucket = bucket + @key = key + end + + # Streams the content of the reference key + # + # @param [optional, Integer] chunk_size The maximum size of individual chunks. + # Note that adapters will only return at most the given size, but could + # return a smaller chunk when needed. + # + # @return [Enumerator] + # An enumerator where each item is a hash that includes a chunk of the downloaded result. + # The format of the hash returned on each iteration is compatible with what is returned by + # the non-streaming version of the `download` method, however the content of each item is + # limited in size. + # + # @see KeyStorage#download + # @example Download a key + # BucketStore.for("inmemory://bucket/file.xml").stream.download + def download(chunk_size: nil) + if !chunk_size.nil? && chunk_size <= 0 + raise ArgumentError, "Chunk size must be > 0 when specified" + end + + BucketStore.logger.info(event: "key_storage.stream.download_started") + + start = BucketStore::Timing.monotonic_now + result = adapter.stream_download( + bucket: bucket, + key: key, + chunk_size: chunk_size, + ) + + BucketStore.logger.info(event: "key_storage.stream.download_prepared", + duration: BucketStore::Timing.monotonic_now - start) + + result + end + + def upload + raise NotImplementedError + end + end + attr_reader :bucket, :key, :adapter_type def initialize(adapter:, bucket:, key:) @@ -53,6 +106,15 @@ def download result end + # Returns an interface for streaming operations + # + # @return [KeyStreamer] An interface for streaming operations + def stream + raise ArgumentError, "Key cannot be empty" if key.empty? + + KeyStreamer.new(adapter: adapter, bucket: bucket, key: key) + end + # Uploads the given content to the reference key location. # # If the `key` already exists, its content will be replaced by the one in input. 
diff --git a/spec/bucket_store/key_storage_spec.rb b/spec/bucket_store/key_storage_spec.rb index ef3c993..480dd1f 100644 --- a/spec/bucket_store/key_storage_spec.rb +++ b/spec/bucket_store/key_storage_spec.rb @@ -154,4 +154,31 @@ def build_for(key) expect(build_for("inmemory://bucket/prefix/a").exists?).to be false end end + + describe "#stream" do + let!(:large_file_content) { "Z" * 1024 * 1024 * 10 } # 10Mb + + before do + build_for("inmemory://bucket/small").upload!("hello world") + build_for("inmemory://bucket/large").upload!(large_file_content) + end + + describe "#download" do + it "returns a single chunk for small files" do + expect(build_for("inmemory://bucket/small").stream.download).to contain_exactly([ + { bucket: "bucket", key: "small" }, an_instance_of(String) + ]) + end + + it "returns the file content in chunks for larger files" do + rebuilt = + build_for("inmemory://bucket/large").stream.download.map do |metadata, chunk| + expect(metadata).to eq({ bucket: "bucket", key: "large" }) + chunk + end.join + + expect(rebuilt).to eq(large_file_content) + end + end + end end From 4bb03fe630c9585a6694e098116552fa7b65c959 Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Tue, 7 Feb 2023 14:37:39 +0000 Subject: [PATCH 04/10] Implement stream downloads for the disk adapter Adds support for streaming reads/downloads in the disk adapter. --- lib/bucket_store/disk.rb | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/lib/bucket_store/disk.rb b/lib/bucket_store/disk.rb index 28b183a..f13312b 100644 --- a/lib/bucket_store/disk.rb +++ b/lib/bucket_store/disk.rb @@ -4,6 +4,8 @@ module BucketStore class Disk + DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb + def self.build(base_dir = ENV["DISK_ADAPTER_BASE_DIR"]) base_dir ||= Dir.tmpdir Disk.new(base_dir) @@ -33,6 +35,25 @@ def download(bucket:, key:) end end + def stream_download(bucket:, key:, chunk_size: nil) + chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES + + fd = File.open(key_path(bucket, key), "r") + metadata = { + bucket: bucket, + key: key, + }.freeze + + Enumerator.new do |yielder| + loop do + v = fd.gets(chunk_size) + break if v.nil? + + yielder.yield([metadata, v]) + end + end + end + def list(bucket:, key:, page_size:) root = Pathname.new(bucket_root(bucket)) From f7517225108ee6fb306ebc51733c1d9e1348af89 Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Fri, 3 Feb 2023 16:24:36 +0000 Subject: [PATCH 05/10] Implement stream downloads for the GCS adapter Adds support for streaming reads/downloads in the GCS adapter. Note that GCS does _not_ natively support streaming downloads, so we emulate streaming using partial range downloads. 
---
 lib/bucket_store/gcs.rb | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/lib/bucket_store/gcs.rb b/lib/bucket_store/gcs.rb
index 802345e..ba1436c 100644
--- a/lib/bucket_store/gcs.rb
+++ b/lib/bucket_store/gcs.rb
@@ -9,6 +9,8 @@ module BucketStore
   class Gcs
     DEFAULT_TIMEOUT_SECONDS = 30
 
+    DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb
+
     def self.build(timeout_seconds = DEFAULT_TIMEOUT_SECONDS)
       Gcs.new(timeout_seconds)
     end
@@ -56,6 +58,40 @@ def download(bucket:, key:)
       }
     end
 
+    def stream_download(bucket:, key:, chunk_size: nil)
+      chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
+
+      metadata = {
+        bucket: bucket,
+        key: key,
+      }.freeze
+
+      file = get_bucket(bucket).file(key)
+      obj_size = file.size
+
+      Enumerator.new do |yielder|
+        start = 0
+        while start < obj_size
+          stop = [start + chunk_size, obj_size].min
+
+          # We simulate an enumerator-based streaming approach by using partial range
+          # downloads as there's no direct support for streaming downloads. The returned
+          # object is a StringIO, so we must `.rewind` before we can access it.
+          obj_io = file.download(range: start...stop)
+          obj_io&.rewind
+
+          # rubocop:disable Style/ZeroLengthPredicate
+          # StringIO does not define the `.empty?` method that rubocop is so keen on using
+          body = obj_io&.read
+          start += body.size
+          break if body.nil? || body.size.zero?
+          # rubocop:enable Style/ZeroLengthPredicate
+
+          yielder.yield([metadata, body])
+        end
+      end
+    end
+
     def list(bucket:, key:, page_size:)
       Enumerator.new do |yielder|
         token = nil

From 98430b7b609bd498439e1811e27fab3c7a22076d Mon Sep 17 00:00:00 2001
From: Ivan Giuliani
Date: Tue, 7 Feb 2023 09:45:06 +0000
Subject: [PATCH 06/10] Implement stream downloads for the S3 adapter

S3's Ruby SDK supports streaming downloads in two ways:

- by streaming directly into a file-like object (e.g. `StringIO`). However, due
  to how this is implemented, we can't access the content of the file in memory
  as it gets downloaded. This is problematic as it's one of our requirements,
  since we may want to do file processing as we download it.
- by using block-based iteration. This has the big drawback that it will _not_
  retry failed requests after the first chunk of data has been yielded, which
  could lead to file corruption on the client end by starting over mid-stream.
  (See https://aws.amazon.com/blogs/developer/downloading-objects-from-amazon-s3-using-the-aws-sdk-for-ruby/
  for more details.)

Therefore streaming downloads are implemented similarly to the GCS adapter, by
leveraging partial range downloads. This adds an extra HTTP call to AWS to
obtain the overall file size, but it is a more robust solution: it both supports
retries of individual chunks and allows us to inspect the content of the file as
we download it.
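
As a sketch of the kind of in-flight processing this enables (the object key is
hypothetical; the snippet relies only on the `[metadata, chunk]` pairs yielded
by `stream.download`):

    require "digest"

    # Hash a large object incrementally, without ever holding the whole file in memory.
    digest = Digest::SHA256.new
    BucketStore.for("s3://bucket/big-file.csv").stream.download.each do |_metadata, chunk|
      digest << chunk
    end
    digest.hexdigest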
--- lib/bucket_store/s3.rb | 46 +++++++++++++++++++++++++++ spec/bucket_store_integration_spec.rb | 2 +- 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/lib/bucket_store/s3.rb b/lib/bucket_store/s3.rb index 393129a..ffa24a2 100644 --- a/lib/bucket_store/s3.rb +++ b/lib/bucket_store/s3.rb @@ -8,6 +8,8 @@ module BucketStore class S3 DEFAULT_TIMEOUT_SECONDS = 30 + DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb + def self.build(open_timeout_seconds = DEFAULT_TIMEOUT_SECONDS, read_timeout_seconds = DEFAULT_TIMEOUT_SECONDS) S3.new(open_timeout_seconds, read_timeout_seconds) @@ -46,6 +48,50 @@ def download(bucket:, key:) } end + def stream_download(bucket:, key:, chunk_size: nil) + chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES + + metadata = { + bucket: bucket, + key: key, + }.freeze + + obj_size = storage.head_object(bucket: bucket, key: key)&.content_length || 0 + + Enumerator.new do |yielder| + start = 0 + while start < obj_size + stop = [start + chunk_size - 1, obj_size].min + + # S3 only supports streaming writes to an IO object (e.g. a file or StringIO), + # but that means we can't access the content of the downloaded chunk in-memory. + # Additionally, the block-based support in the sdk also doesn't support retries, + # which could lead to file corruption. + # (see https://aws.amazon.com/blogs/developer/downloading-objects-from-amazon-s3-using-the-aws-sdk-for-ruby/) + # + # We simulate an enumerator-based streaming approach by using partial range + # downloads instead. There's no helper methods for range downloads in the Ruby + # SDK, so we have to build our own range query. + # Range is specified in the same format as the HTTP range header (see + # https://www.rfc-editor.org/rfc/rfc9110.html#name-range) + obj = storage.get_object( + bucket: bucket, + key: key, + range: "bytes=#{start}-#{stop}", + ) + + # rubocop:disable Style/ZeroLengthPredicate + # StringIO does not define the `.empty?` method that rubocop is so keen on using + body = obj&.body&.read + start += body.size + break if body.nil? || body.size.zero? + # rubocop:enable Style/ZeroLengthPredicate + + yielder.yield([metadata, body]) + end + end + end + def list(bucket:, key:, page_size:) Enumerator.new do |yielder| page = storage.list_objects_v2(bucket: bucket, prefix: key, max_keys: page_size) diff --git a/spec/bucket_store_integration_spec.rb b/spec/bucket_store_integration_spec.rb index c0c206c..b1d0d8f 100644 --- a/spec/bucket_store_integration_spec.rb +++ b/spec/bucket_store_integration_spec.rb @@ -68,7 +68,7 @@ end # Delete all the files, the bucket should be empty afterwards - file_list.map { |filename| "#{base_bucket_uri}/prefix/#{filename}" }.each do |key| + described_class.for(base_bucket_uri.to_s).list.each do |key| described_class.for(key).delete! 
end expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0) From c4a701f14228a5e85945b9acb2121785737fe1d8 Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Wed, 12 Apr 2023 10:17:42 +0100 Subject: [PATCH 07/10] Include the base bucket uri in the context name --- spec/bucket_store_integration_spec.rb | 92 ++++++++++++++------------- 1 file changed, 47 insertions(+), 45 deletions(-) diff --git a/spec/bucket_store_integration_spec.rb b/spec/bucket_store_integration_spec.rb index b1d0d8f..0adfbaf 100644 --- a/spec/bucket_store_integration_spec.rb +++ b/spec/bucket_store_integration_spec.rb @@ -24,54 +24,56 @@ end shared_examples "adapter integration" do |base_bucket_uri| - before do - described_class.for(base_bucket_uri).list.each do |path| - described_class.for(path).delete! - end - end - - it "has a consistent interface" do - # Write 201 files - file_list = [] - 201.times do |i| - filename = "file#{(i + 1).to_s.rjust(3, '0')}.txt" - file_list << filename - - # the body of the file is the filename itself - described_class.for("#{base_bucket_uri}/prefix/#{filename}").upload!(filename) - end - - # Add some files with spaces - described_class.for("#{base_bucket_uri}/prefix/i have a space.txt"). - upload!("i have a space.txt") - described_class.for("#{base_bucket_uri}/prefix/another space.txt"). - upload!("another space.txt") - - file_list << "i have a space.txt" - file_list << "another space.txt" - - # List with prefix should only return the matching files - expect(described_class.for("#{base_bucket_uri}/prefix/file1").list.to_a.size).to eq(100) - expect(described_class.for("#{base_bucket_uri}/prefix/file2").list.to_a.size).to eq(2) - expect(described_class.for("#{base_bucket_uri}/prefix/").list.to_a.size).to eq(203) - - # List (without prefixes) should return everything - expect(described_class.for(base_bucket_uri.to_s).list.to_a). - to match_array(file_list.map { |filename| "#{base_bucket_uri}/prefix/#{filename}" }) - - # We know the content of the file, we can check `.download` returns it as expected - all_files = file_list.map do |filename| - [filename, "#{base_bucket_uri}/prefix/#{filename}"] - end - all_files.each do |content, key| - expect(described_class.for(key).download[:content]).to eq(content) + context "using #{base_bucket_uri}" do + before do + described_class.for(base_bucket_uri).list.each do |path| + described_class.for(path).delete! + end end - # Delete all the files, the bucket should be empty afterwards - described_class.for(base_bucket_uri.to_s).list.each do |key| - described_class.for(key).delete! + it "has a consistent interface" do + # Write 201 files + file_list = [] + 201.times do |i| + filename = "file#{(i + 1).to_s.rjust(3, '0')}.txt" + file_list << filename + + # the body of the file is the filename itself + described_class.for("#{base_bucket_uri}/prefix/#{filename}").upload!(filename) + end + + # Add some files with spaces + described_class.for("#{base_bucket_uri}/prefix/i have a space.txt"). + upload!("i have a space.txt") + described_class.for("#{base_bucket_uri}/prefix/another space.txt"). 
+ upload!("another space.txt") + + file_list << "i have a space.txt" + file_list << "another space.txt" + + # List with prefix should only return the matching files + expect(described_class.for("#{base_bucket_uri}/prefix/file1").list.to_a.size).to eq(100) + expect(described_class.for("#{base_bucket_uri}/prefix/file2").list.to_a.size).to eq(2) + expect(described_class.for("#{base_bucket_uri}/prefix/").list.to_a.size).to eq(203) + + # List (without prefixes) should return everything + expect(described_class.for(base_bucket_uri.to_s).list.to_a). + to match_array(file_list.map { |filename| "#{base_bucket_uri}/prefix/#{filename}" }) + + # We know the content of the file, we can check `.download` returns it as expected + all_files = file_list.map do |filename| + [filename, "#{base_bucket_uri}/prefix/#{filename}"] + end + all_files.each do |content, key| + expect(described_class.for(key).download[:content]).to eq(content) + end + + # Delete all the files, the bucket should be empty afterwards + described_class.for(base_bucket_uri.to_s).list.each do |key| + described_class.for(key).delete! + end + expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0) end - expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0) end end From bffaa4a5fb1900f4e5c7996ddaab0351179789ba Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Wed, 12 Apr 2023 09:13:56 +0100 Subject: [PATCH 08/10] Add tests for large file downloads via the streaming interface --- spec/bucket_store_integration_spec.rb | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/spec/bucket_store_integration_spec.rb b/spec/bucket_store_integration_spec.rb index 0adfbaf..2269a5b 100644 --- a/spec/bucket_store_integration_spec.rb +++ b/spec/bucket_store_integration_spec.rb @@ -31,6 +31,10 @@ end end + it "returns an empty bucket when no files are uploaded" do + expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0) + end + it "has a consistent interface" do # Write 201 files file_list = [] @@ -74,10 +78,30 @@ end expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0) end + + context "using the streaming interface" do + it "supports large file downloads" do + # Upload a large file + large_file_content = "Z" * 1024 * 1024 * 10 # 10Mb + described_class. + for("#{base_bucket_uri}/large.txt"). + upload!(large_file_content) + + # Streaming downloads should return a chunked response + rebuilt_large_file = + described_class.for("#{base_bucket_uri}/large.txt"). + stream. + download. + map { |_meta, chunk| chunk }. + join + + expect(rebuilt_large_file.size).to eq(large_file_content.size) + expect(rebuilt_large_file).to eq(large_file_content) + end + end end end - # We don't test GCS as there's no sensible way of running a local simulator include_examples "adapter integration", "inmemory://bucket" include_examples "adapter integration", "disk://bucket" include_examples "adapter integration", "s3://bucket" From 87e46a20581badd7563c1f7bc2e4e553fe00e7e7 Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Wed, 12 Apr 2023 10:14:35 +0100 Subject: [PATCH 09/10] Add tests for small chunk downloads via the streaming interface This also tests that it's possible to customise the chunk size on individual adapters. 
--- spec/bucket_store_integration_spec.rb | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/spec/bucket_store_integration_spec.rb b/spec/bucket_store_integration_spec.rb index 2269a5b..428ddf7 100644 --- a/spec/bucket_store_integration_spec.rb +++ b/spec/bucket_store_integration_spec.rb @@ -98,6 +98,22 @@ expect(rebuilt_large_file.size).to eq(large_file_content.size) expect(rebuilt_large_file).to eq(large_file_content) end + + it "allows downloads of individual small chunks" do + described_class. + for("#{base_bucket_uri}/large.txt"). + upload!("1234567890") + + chunks = described_class.for("#{base_bucket_uri}/large.txt"). + stream. + download(chunk_size: 1). + to_a + + expect(chunks.size).to eq(10) + expect(chunks.map { |_meta, chunk| chunk }).to match_array( + %w[1 2 3 4 5 6 7 8 9 0], + ) + end end end end From 6109311f11151ba5b7c9e955ef3f9ff0c7ed0459 Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Wed, 12 Apr 2023 10:48:25 +0100 Subject: [PATCH 10/10] Update changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 337b458..1df43cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +v0.6.0 +------ +- [BREAKING] Drop support for Ruby 2.6 +- Introduce support for streaming downloads + + v0.5.0 ------ - Support escaping of URI keys. Fixes #46.