Skip to content

Commit

Permalink
Add KeyStreamer interface for uploads and downloads.
Browse files Browse the repository at this point in the history
Completely stolen from #63.

The KeyStreamer expects uploads / downloads to be expressed in terms of IO operations.

For example a download is actually `download into this IO` and upload is `upload the content from this file`.

Uploads / downloads of Strings are considered a special case of this. However, they are distinct enough that streaming gets its own API.
  • Loading branch information
Andrew Morton committed Apr 18, 2023
1 parent 9ab5e33 commit 195bd27
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 42 deletions.
133 changes: 95 additions & 38 deletions lib/bucket_store/key_storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,90 @@ class KeyStorage
disk: Disk,
}.freeze

# Defines a streaming interface for download and upload operations.
#
# Note that individual adapters may require additional configuration for the correct
# behavior of the streaming interface.
class KeyStreamer
  attr_reader :bucket, :key, :adapter, :adapter_type

  def initialize(adapter:, adapter_type:, bucket:, key:)
    @adapter = adapter
    @adapter_type = adapter_type
    @bucket = bucket
    @key = key
  end

  # Streams the content of the reference key into a File like object
  #
  # @return [Hash] hash containing the bucket, the key and the file like object
  #   passed in as input
  #
  # @see KeyStorage#download
  # @example Download a key
  #   buffer = StringIO.new
  #   BucketStore.for("inmemory://bucket/file.xml").stream.download(file: buffer)
  #   buffer.string == "Imagine I'm a 2GB file"
  def download(file:)
    # Include the same structured context as upload! so both directions of
    # the streaming API are equally traceable in the logs.
    BucketStore.logger.info(event: "key_storage.stream.download_started",
                            **log_context)

    start = BucketStore::Timing.monotonic_now
    adapter.download(
      bucket: bucket,
      key: key,
      file: file,
    )

    BucketStore.logger.info(event: "key_storage.stream.download_finished",
                            duration: BucketStore::Timing.monotonic_now - start,
                            **log_context)

    {
      bucket: bucket,
      key: key,
      file: file,
    }
  end

  # Performs a streaming upload to the backing object store
  #
  # @return [String] the generated key for the new object
  #
  # @raise [ArgumentError] if the reference key is empty
  #
  # @see KeyStorage#upload!
  # @example Upload a key
  #   buffer = StringIO.new("Imagine I'm a 2GB file")
  #   BucketStore.for("inmemory://bucket/file.xml").stream.upload!(file: buffer)
  def upload!(file:)
    raise ArgumentError, "Key cannot be empty" if key.empty?

    BucketStore.logger.info(event: "key_storage.stream.upload_started",
                            **log_context)

    start = BucketStore::Timing.monotonic_now
    # The adapter's return value is intentionally ignored: the key URI we
    # return is derived from the streamer's own state.
    adapter.upload!(
      bucket: bucket,
      key: key,
      file: file,
    )

    BucketStore.logger.info(event: "key_storage.stream.upload_finished",
                            duration: BucketStore::Timing.monotonic_now - start,
                            **log_context)

    "#{adapter_type}://#{bucket}/#{key}"
  end

  private

  # Structured logging context shared by all streaming operations.
  # `.compact` drops any nil entries so log lines stay clean.
  def log_context
    {
      bucket: bucket,
      key: key,
      adapter_type: adapter_type,
    }.compact
  end
end

attr_reader :bucket, :key, :adapter_type

def initialize(adapter:, bucket:, key:)
Expand All @@ -40,22 +124,10 @@ def filename
# @example Download a key
# BucketStore.for("inmemory://bucket/file.xml").download
def download
raise ArgumentError, "Key cannot be empty" if key.empty?

BucketStore.logger.info(event: "key_storage.download_started")

start = BucketStore::Timing.monotonic_now
buffer = StringIO.new
adapter.download(bucket: bucket, key: key, file: buffer)

BucketStore.logger.info(event: "key_storage.download_finished",
duration: BucketStore::Timing.monotonic_now - start)

{
bucket: bucket,
key: key,
content: buffer.string,
}
stream.download(file: buffer).
except(:file).
merge(content: buffer.string)
end

# Uploads the given file to the reference key location.
Expand All @@ -67,23 +139,16 @@ def download
# @example Upload a file
# BucketStore.for("inmemory://bucket/file.xml").upload!("hello world")
def upload!(content)
raise ArgumentError, "Key cannot be empty" if key.empty?

BucketStore.logger.info(event: "key_storage.upload_started",
**log_context)

start = BucketStore::Timing.monotonic_now
result = adapter.upload!(
bucket: bucket,
key: key,
file: StringIO.new(content),
)
stream.upload!(file: StringIO.new(content))
end

BucketStore.logger.info(event: "key_storage.upload_finished",
duration: BucketStore::Timing.monotonic_now - start,
**log_context)
# Returns an interface for streaming operations
#
# @return [KeyStreamer] An interface for streaming operations
def stream
raise ArgumentError, "Key cannot be empty" if key.empty?

"#{adapter_type}://#{result[:bucket]}/#{result[:key]}"
KeyStreamer.new(adapter: adapter, adapter_type: adapter_type, bucket: bucket, key: key)
end

# Lists all keys for the current adapter that have the reference key as prefix
Expand Down Expand Up @@ -164,13 +229,5 @@ def exists?
private

attr_reader :adapter

def log_context
{
bucket: bucket,
key: key,
adapter_type: adapter_type,
}.compact
end
end
end
95 changes: 91 additions & 4 deletions spec/bucket_store/key_storage_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ def build_for(key)

it "logs the operation" do
expect(BucketStore.logger).to receive(:info).with(
hash_including(event: "key_storage.download_started"),
hash_including(event: "key_storage.stream.download_started"),
)
expect(BucketStore.logger).to receive(:info).with(
hash_including(event: "key_storage.download_finished"),
hash_including(event: "key_storage.stream.download_finished"),
)

build_for("inmemory://bucket/file1").download
Expand All @@ -100,10 +100,10 @@ def build_for(key)

it "logs the operation" do
expect(BucketStore.logger).to receive(:info).with(
hash_including(event: "key_storage.upload_started"),
hash_including(event: "key_storage.stream.upload_started"),
)
expect(BucketStore.logger).to receive(:info).with(
hash_including(event: "key_storage.upload_finished"),
hash_including(event: "key_storage.stream.upload_finished"),
)

build_for("inmemory://bucket/file1").upload!("hello")
Expand All @@ -117,6 +117,93 @@ def build_for(key)
end
end

describe "#stream" do
  let(:stream) { build_for("inmemory://bucket/file1").stream }

  it "will return an object" do
    expect { stream }.to_not raise_error
    expect(stream).to_not be_nil
  end

  context "when we try to upload a bucket" do
    it "raises an error" do
      expect { build_for("inmemory://bucket").stream }.
        to raise_error(ArgumentError, /key cannot be empty/i)
    end
  end

  describe "#download" do
    let(:output_file) { StringIO.new }

    # Seed two keys so the test can prove we downloaded the right one.
    before do
      build_for("inmemory://bucket/file1").
        stream.
        upload!(file: StringIO.new("content1"))
      build_for("inmemory://bucket/file2").
        stream.
        upload!(file: StringIO.new("content2"))
    end

    it "downloads the given file" do
      expect(
        build_for("inmemory://bucket/file1").
        stream.
        download(file: output_file)).
        to match(hash_including(file: output_file))
      expect(output_file.string).to eq("content1")
    end

    it "logs the operation" do
      expect(BucketStore.logger).to receive(:info).with(
        hash_including(event: "key_storage.stream.download_started"),
      )
      expect(BucketStore.logger).to receive(:info).with(
        hash_including(event: "key_storage.stream.download_finished"),
      )

      build_for("inmemory://bucket/file1").stream.download(file: output_file)
    end

    context "when we try to download a bucket" do
      it "raises an error" do
        expect { build_for("inmemory://bucket").download }.
          to raise_error(ArgumentError, /key cannot be empty/i)
      end
    end
  end

  describe "#upload!" do
    it "will upload from a file" do
      expect(stream.upload!(file: StringIO.new("hello"))).
        to eq("inmemory://bucket/file1")
    end

    it "logs the operation" do
      expect(BucketStore.logger).to receive(:info).with(
        hash_including(event: "key_storage.stream.upload_started"),
      )
      expect(BucketStore.logger).to receive(:info).with(
        hash_including(event: "key_storage.stream.upload_finished"),
      )

      stream.upload!(file: StringIO.new("hello"))
    end

    context "when we try to upload a bucket" do
      it "raises an error" do
        expect { build_for("inmemory://bucket").upload!("content") }.
          to raise_error(ArgumentError, /key cannot be empty/i)
      end
    end
  end
end

describe "#delete!" do
before do
build_for("inmemory://bucket/file1").upload!("content1")
Expand Down

0 comments on commit 195bd27

Please sign in to comment.