Support streaming downloads #63

Open · wants to merge 10 commits into base: master
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
v0.6.0
------
- [BREAKING] Drop support for Ruby 2.6
- Introduce support for streaming downloads


v0.5.0
------
- Support escaping of URI keys. Fixes #46.
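For context, a minimal sketch of how the new streaming download can be consumed (the URI and local path below are illustrative):

require "bucket_store"

# Stream a large object to a local file without loading it all into memory.
# chunk_size is optional; adapters fall back to a 4 MiB default.
File.open("/tmp/large.xml", "wb") do |file|
  BucketStore.for("inmemory://bucket/large.xml").stream.download.each do |_metadata, chunk|
    file.write(chunk)
  end
end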
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -12,6 +12,7 @@ services:
MINIO_REGION_NAME: us-east-1
ports:
- "9000:9000"
- "9001:9001"
gcp-simulator:
image: fsouza/fake-gcs-server
hostname: gcp
21 changes: 21 additions & 0 deletions lib/bucket_store/disk.rb
@@ -4,6 +4,8 @@

module BucketStore
class Disk
DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4 MiB

def self.build(base_dir = ENV["DISK_ADAPTER_BASE_DIR"])
base_dir ||= Dir.tmpdir
Disk.new(base_dir)
@@ -33,6 +35,25 @@ def download(bucket:, key:)
end
end

def stream_download(bucket:, key:, chunk_size: nil)
chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES

fd = File.open(key_path(bucket, key), "r")
metadata = {
bucket: bucket,
key: key,
}.freeze

Enumerator.new do |yielder|
loop do
v = fd.gets(chunk_size)
break if v.nil?

yielder.yield([metadata, v])
end
end
end

def list(bucket:, key:, page_size:)
root = Pathname.new(bucket_root(bucket))

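The Disk adapter chunks the file with `IO#gets` and a byte limit, so each chunk stops at either `chunk_size` bytes or a newline, whichever comes first. A standalone sketch of the same pattern, assuming an illustrative local path and closing the file via the block form:

chunks = Enumerator.new do |yielder|
  File.open("/tmp/example.bin", "rb") do |fd|
    # gets with an Integer argument reads at most that many bytes and returns nil at EOF
    while (chunk = fd.gets(4 * 1024 * 1024))
      yielder << chunk
    end
  end
end

chunks.each { |chunk| puts chunk.bytesize }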
36 changes: 36 additions & 0 deletions lib/bucket_store/gcs.rb
@@ -9,6 +9,8 @@ module BucketStore
class Gcs
DEFAULT_TIMEOUT_SECONDS = 30

DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4 MiB

def self.build(timeout_seconds = DEFAULT_TIMEOUT_SECONDS)
Gcs.new(timeout_seconds)
end
@@ -56,6 +58,40 @@ def download(bucket:, key:)
}
end

def stream_download(bucket:, key:, chunk_size: nil)
chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
Comment on lines +61 to +62

Contributor:

Suggested change
- def stream_download(bucket:, key:, chunk_size: nil)
-   chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
+ def stream_download(bucket:, key:, chunk_size: DEFAULT_STREAM_CHUNK_SIZE_BYTES)

minor / nitpick, just curious why we're not using the default arg functionality

Contributor Author:

I think it's a PEBKAC...


metadata = {
bucket: bucket,
key: key,
}.freeze

file = get_bucket(bucket).file(key)
obj_size = file.size

Enumerator.new do |yielder|
start = 0
while start < obj_size
stop = [start + chunk_size, obj_size].min

# We simulate an enumerator-based streaming approach by using partial range
# downloads as there's no direct support for streaming downloads. The returned
# object is a StringIO, so we must `.rewind` before we can access it.
obj_io = file.download(range: start...stop)
obj_io&.rewind

# rubocop:disable Style/ZeroLengthPredicate
# StringIO does not define the `.empty?` method that rubocop is so keen on using
body = obj_io&.read
break if body.nil? || body.size.zero?
# rubocop:enable Style/ZeroLengthPredicate

start += body.size

yielder.yield([metadata, body])
end
end
end

def list(bucket:, key:, page_size:)
Enumerator.new do |yielder|
token = nil
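For reference, the GCS adapter walks the object in exclusive Ruby byte ranges; a standalone sketch of the boundary arithmetic for a hypothetical 10-byte object and a 4-byte chunk size:

obj_size = 10
chunk_size = 4
start = 0
while start < obj_size
  stop = [start + chunk_size, obj_size].min
  puts (start...stop).inspect # prints 0...4, 4...8, 8...10
  start = stop
end

In the adapter itself `start` advances by `body.size`, which for a successful partial download equals `stop - start`, so the boundaries match this sketch.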
21 changes: 21 additions & 0 deletions lib/bucket_store/in_memory.rb
@@ -2,6 +2,8 @@

module BucketStore
class InMemory
DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4 MiB

def self.build
InMemory.instance
end
@@ -41,6 +43,25 @@ def download(bucket:, key:)
}
end

def stream_download(bucket:, key:, chunk_size: nil)
chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES

content_stream = StringIO.new(@buckets[bucket].fetch(key))
metadata = {
bucket: bucket,
key: key,
}.freeze

Enumerator.new do |yielder|
loop do
v = content_stream.read(chunk_size)
break if v.nil?

yielder.yield([metadata, v])
end
end
end

def list(bucket:, key:, page_size:)
@buckets[bucket].keys.
select { |k| k.start_with?(key) }.
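The InMemory loop terminates because `StringIO#read` with a positive length returns `nil` once the stream is exhausted, which is what the `break if v.nil?` check relies on. A quick illustration:

require "stringio"

io = StringIO.new("hello world")
puts io.read(4).inspect # => "hell"
puts io.read(4).inspect # => "o wo"
puts io.read(4).inspect # => "rld"
puts io.read(4).inspect # => nil (end of stream)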
62 changes: 62 additions & 0 deletions lib/bucket_store/key_storage.rb
@@ -15,6 +15,59 @@ class KeyStorage
disk: Disk,
}.freeze

# Defines a streaming interface for download and upload operations.
#
# Note that individual adapters may require additional configuration for the correct
# behavior of the streaming interface.
class KeyStreamer
attr_reader :bucket, :key, :adapter

def initialize(adapter:, bucket:, key:)
@adapter = adapter
@bucket = bucket
@key = key
end

# Streams the content of the reference key
#
# @param [optional, Integer] chunk_size The maximum size of individual chunks.
# Adapters will return chunks of at most the given size, and may return
# smaller chunks when needed.

# @return [Enumerator]
# An enumerator where each item is a `[metadata, chunk]` pair: `metadata` is a hash with
# the `bucket` and `key` of the object, and `chunk` is a string of at most `chunk_size`
# bytes. Concatenating the chunks in order yields the same content returned by the
# non-streaming version of the `download` method.
#
# @see KeyStorage#download
# @example Download a key
# BucketStore.for("inmemory://bucket/file.xml").stream.download
def download(chunk_size: nil)
if !chunk_size.nil? && chunk_size <= 0
raise ArgumentError, "Chunk size must be > 0 when specified"
end

BucketStore.logger.info(event: "key_storage.stream.download_started")

start = BucketStore::Timing.monotonic_now
result = adapter.stream_download(
bucket: bucket,
key: key,
chunk_size: chunk_size,
)

BucketStore.logger.info(event: "key_storage.stream.download_prepared",
duration: BucketStore::Timing.monotonic_now - start)

result
end

def upload
raise NotImplementedError
end
end

attr_reader :bucket, :key, :adapter_type

def initialize(adapter:, bucket:, key:)
@@ -53,6 +106,15 @@ def download
result
end

# Returns an interface for streaming operations
#
# @return [KeyStreamer] An interface for streaming operations
def stream
raise ArgumentError, "Key cannot be empty" if key.empty?

KeyStreamer.new(adapter: adapter, bucket: bucket, key: key)
end

# Uploads the given content to the reference key location.
#
# If the `key` already exists, its content will be replaced by the one in input.
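A sketch of the `KeyStreamer` interface in use, assuming an illustrative bucket and key: each yielded item is a `[metadata, chunk]` pair, and a non-positive `chunk_size` raises an `ArgumentError`:

require "bucket_store"

BucketStore.for("inmemory://bucket/report.csv").upload!("a,b,c\n" * 100_000)

streamer = BucketStore.for("inmemory://bucket/report.csv").stream

# streamer.download(chunk_size: 0) # => ArgumentError, "Chunk size must be > 0 when specified"

streamer.download(chunk_size: 1024).each do |metadata, chunk|
  puts "#{metadata[:bucket]}/#{metadata[:key]}: #{chunk.bytesize} bytes"
end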
46 changes: 46 additions & 0 deletions lib/bucket_store/s3.rb
@@ -8,6 +8,8 @@ module BucketStore
class S3
DEFAULT_TIMEOUT_SECONDS = 30

DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4 MiB

def self.build(open_timeout_seconds = DEFAULT_TIMEOUT_SECONDS,
read_timeout_seconds = DEFAULT_TIMEOUT_SECONDS)
S3.new(open_timeout_seconds, read_timeout_seconds)
@@ -46,6 +48,50 @@ def download(bucket:, key:)
}
end

def stream_download(bucket:, key:, chunk_size: nil)
chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES

metadata = {
bucket: bucket,
key: key,
}.freeze

obj_size = storage.head_object(bucket: bucket, key: key)&.content_length || 0

Enumerator.new do |yielder|
start = 0
while start < obj_size
stop = [start + chunk_size - 1, obj_size].min

# S3 only supports streaming writes to an IO object (e.g. a file or StringIO),
# but that means we can't access the content of the downloaded chunk in-memory.
# Additionally, the block-based support in the sdk also doesn't support retries,
# which could lead to file corruption.
# (see https://aws.amazon.com/blogs/developer/downloading-objects-from-amazon-s3-using-the-aws-sdk-for-ruby/)
#
# We simulate an enumerator-based streaming approach by using partial range
# downloads instead. There are no helper methods for range downloads in the Ruby
# SDK, so we have to build our own range query.
# Range is specified in the same format as the HTTP range header (see
# https://www.rfc-editor.org/rfc/rfc9110.html#name-range)
obj = storage.get_object(
bucket: bucket,
key: key,
range: "bytes=#{start}-#{stop}",
)

# rubocop:disable Style/ZeroLengthPredicate
# StringIO does not define the `.empty?` method that rubocop is so keen on using
body = obj&.body&.read
break if body.nil? || body.size.zero?
# rubocop:enable Style/ZeroLengthPredicate

start += body.size

yielder.yield([metadata, body])
end
end
end

def list(bucket:, key:, page_size:)
Enumerator.new do |yielder|
page = storage.list_objects_v2(bucket: bucket, prefix: key, max_keys: page_size)
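For reference, the Range headers the S3 loop above issues for a hypothetical 10-byte object and a 4-byte chunk size (HTTP ranges are inclusive, unlike the exclusive Ruby ranges used by the GCS adapter):

obj_size = 10
chunk_size = 4
start = 0
while start < obj_size
  stop = [start + chunk_size - 1, obj_size].min
  puts "bytes=#{start}-#{stop}" # prints bytes=0-3, bytes=4-7, bytes=8-10
  start += chunk_size
end

The last range nominally ends one byte past the final offset (9); per RFC 9110 the server clamps it and returns only the bytes that exist, and the adapter advances `start` by the size of the body actually received rather than by `chunk_size`.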
27 changes: 27 additions & 0 deletions spec/bucket_store/key_storage_spec.rb
@@ -154,4 +154,31 @@ def build_for(key)
expect(build_for("inmemory://bucket/prefix/a").exists?).to be false
end
end

describe "#stream" do
let!(:large_file_content) { "Z" * 1024 * 1024 * 10 } # 10 MiB

before do
build_for("inmemory://bucket/small").upload!("hello world")
build_for("inmemory://bucket/large").upload!(large_file_content)
end

describe "#download" do
it "returns a single chunk for small files" do
expect(build_for("inmemory://bucket/small").stream.download).to contain_exactly([
{ bucket: "bucket", key: "small" }, an_instance_of(String)
])
end

it "returns the file content in chunks for larger files" do
rebuilt =
build_for("inmemory://bucket/large").stream.download.map do |metadata, chunk|
expect(metadata).to eq({ bucket: "bucket", key: "large" })
chunk
end.join

expect(rebuilt).to eq(large_file_content)
end
end
end
end