Commit

Add basic streaming support for downloads

ivgiuliani committed Apr 12, 2023
1 parent 13e0c83 commit 055d6b0
Showing 3 changed files with 99 additions and 0 deletions.
lib/bucket_store/in_memory.rb (21 additions, 0 deletions)
@@ -2,6 +2,8 @@

module BucketStore
class InMemory
DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4 MiB

def self.build
InMemory.instance
end
@@ -41,6 +43,25 @@ def download(bucket:, key:)
}
end

def stream_download(bucket:, key:, chunk_size: nil)
chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES

content_stream = StringIO.new(@buckets[bucket].fetch(key))
metadata = {
bucket: bucket,
key: key,
}.freeze

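# Read the object lazily, yielding one [metadata, chunk] pair per iteration until EOF.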
Enumerator.new do |yielder|
loop do
chunk = content_stream.read(chunk_size)
break if chunk.nil?

yielder.yield([metadata, chunk])
end
end
end

def list(bucket:, key:, page_size:)
@buckets[bucket].keys.
select { |k| k.start_with?(key) }.
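
As a sketch of the adapter-level behavior, the new enumerator can be driven directly (in practice it is reached through the KeyStorage#stream interface below; this assumes "bucket" already holds content under "file.xml", since the upload path is not shown in this excerpt):

adapter = BucketStore::InMemory.build

adapter.stream_download(bucket: "bucket", key: "file.xml", chunk_size: 1024).each do |metadata, chunk|
  # Each item pairs the frozen metadata hash with up to 1024 bytes of content.
  puts "#{metadata[:bucket]}/#{metadata[:key]}: #{chunk.bytesize} bytes"
end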
lib/bucket_store/key_storage.rb (51 additions, 0 deletions)
@@ -15,6 +15,48 @@ class KeyStorage
disk: Disk,
}.freeze

# Defines a streaming interface for download and upload operations.
#
# Note that individual adapters may require additional configuration for the correct
# behavior of the streaming interface.
class KeyStreamer
attr_reader :bucket, :key, :adapter, :chunk_size

def initialize(adapter:, bucket:, key:, chunk_size: nil)
@adapter = adapter
@bucket = bucket
@key = key
@chunk_size = chunk_size
end

# Streams the content of the reference key
#
# @return [Enumerator]
#   An enumerator where each item is a `[metadata, chunk]` pair. `metadata` is a frozen
#   hash with the `bucket` and `key` of the object, matching the fields returned by the
#   non-streaming `download` method, and `chunk` is a `String` holding at most
#   `chunk_size` bytes of the object's content.
#
# @see KeyStorage#download
# @example Download a key
# BucketStore.for("inmemory://bucket/file.xml").stream.download
def download
BucketStore.logger.info(event: "key_storage.stream.download_started")

start = BucketStore::Timing.monotonic_now
result = adapter.stream_download(bucket: bucket, key: key, chunk_size: chunk_size)

BucketStore.logger.info(event: "key_storage.stream.download_prepared",
duration: BucketStore::Timing.monotonic_now - start)

result
end

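# Streaming uploads are not yet implemented.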
def upload
raise NotImplementedError
end
end

attr_reader :bucket, :key, :adapter_type

def initialize(adapter:, bucket:, key:)
@@ -53,6 +95,15 @@ def download
result
end

# Returns an interface for streaming operations
#
# @param chunk_size [Integer, nil] the maximum size in bytes of each downloaded chunk;
#   when nil, the adapter's default chunk size is used
# @return [KeyStreamer] An interface for streaming operations
def stream(chunk_size: nil)
raise ArgumentError, "Key cannot be empty" if key.empty?

KeyStreamer.new(adapter: adapter, bucket: bucket, key: key, chunk_size: chunk_size)
end

# Uploads the given content to the reference key location.
#
# If the `key` already exists, its content will be replaced by the one in input.
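
Putting the two changes together, the public entry point looks roughly like this (a sketch based on the @example in the docs above; the key and content are illustrative):

store = BucketStore.for("inmemory://bucket/file.xml")
store.upload!("hello world")

# Rebuild the original content from 4-byte chunks.
store.stream(chunk_size: 4).download.map { |_metadata, chunk| chunk }.join
# => "hello world"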
spec/bucket_store/key_storage_spec.rb (27 additions, 0 deletions)
@@ -154,4 +154,31 @@ def build_for(key)
expect(build_for("inmemory://bucket/prefix/a").exists?).to be false
end
end

describe "#stream" do
let!(:large_file_content) { "Z" * 1024 * 1024 * 10 } # 10 MiB

before do
build_for("inmemory://bucket/small").upload!("hello world")
build_for("inmemory://bucket/large").upload!(large_file_content)
end

describe "#download" do
it "returns a single chunk for small files" do
expect(build_for("inmemory://bucket/small").stream.download).to contain_exactly([
{ bucket: "bucket", key: "small" }, an_instance_of(String)
])
end

it "returns the file content in chunks for larger files" do
rebuilt =
build_for("inmemory://bucket/large").stream.download.map do |metadata, chunk|
expect(metadata).to eq({ bucket: "bucket", key: "large" })
chunk
end.join

expect(rebuilt).to eq(large_file_content)
end
end
end
end
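
One practical consequence of the chunked interface, sketched here against the same spec fixtures: a large object can be copied to disk without materialising it in memory all at once (the target path is illustrative).

File.open("/tmp/large-copy.bin", "wb") do |file|
  BucketStore.for("inmemory://bucket/large").stream.download.each do |_metadata, chunk|
    file.write(chunk)
  end
end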
