Skip to content

Commit

Permalink
Merge pull request #195 from dasch/dasch-refactor-multi-entry-file-ha…
Browse files Browse the repository at this point in the history
…ndling

Avoid bringing entries into memory
  • Loading branch information
dasch authored Aug 29, 2023
2 parents 07caff2 + d59de92 commit 3b4b4e5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
19 changes: 10 additions & 9 deletions lib/avro_turf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,38 +93,39 @@ def encode_to_stream(data, schema_name: nil, stream: nil, namespace: @namespace,
# namespace - The namespace of the Avro schema used to decode the data.
#
# Returns whatever is encoded in the data.
def decode(encoded_data, schema_name: nil, namespace: @namespace)
def decode_first(encoded_data, schema_name: nil, namespace: @namespace)
stream = StringIO.new(encoded_data)
decode_stream(stream, schema_name: schema_name, namespace: namespace)
end

alias decode decode_first

# Returns all entries encoded in the data.
def decode_all(encoded_data, schema_name: nil, namespace: @namespace)
stream = StringIO.new(encoded_data)
decode_all_from_stream(stream, schema_name: schema_name, namespace: namespace)
end

# Decodes Avro data from an IO stream.
# Decodes the first entry from an IO stream containing Avro data.
#
# stream - An IO object containing Avro data.
# schema_name - The String name of the schema that should be used to read
# the data. If nil, the writer schema will be used.
# namespace - The namespace of the Avro schema used to decode the data.
#
# Returns first entry encoded in the stream.
def decode_stream(stream, schema_name: nil, namespace: @namespace)
schema = schema_name && @schema_store.find(schema_name, namespace)
reader = Avro::IO::DatumReader.new(nil, schema)
dr = Avro::DataFile::Reader.new(stream, reader)
dr.first
def decode_first_from_stream(stream, schema_name: nil, namespace: @namespace)
data = decode_all_from_stream(stream, schema_name: schema_name, namespace: namespace)
data.first
end

alias decode_stream decode_first_from_stream

# Returns all entries encoded in the stream.
def decode_all_from_stream(stream, schema_name: nil, namespace: @namespace)
schema = schema_name && @schema_store.find(schema_name, namespace)
reader = Avro::IO::DatumReader.new(nil, schema)
dr = Avro::DataFile::Reader.new(stream, reader)
dr.entries
Avro::DataFile::Reader.new(stream, reader)
end

# Validates data against an Avro schema.
Expand Down
6 changes: 3 additions & 3 deletions spec/avro_turf_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@
let(:encoded_data) { "Obj\u0001\u0004\u0014avro.codec\bnull\u0016avro.schema\xB6\u0004[{\"type\": \"record\", \"name\": \"address\", \"fields\": [{\"type\": \"string\", \"name\": \"street\"}, {\"type\": \"string\", \"name\": \"city\"}]}, {\"type\": \"record\", \"name\": \"person\", \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": \"int\", \"name\": \"age\"}, {\"type\": \"address\", \"name\": \"address\"}]}]\u0000\xF9u\x84\xA1c\u0010\x82B\xE2\xCF\xF1\x98\xF7\xF1JH\u0004\x96\u0001\u0002\u0014Python🐍\x80\u0004\u0018Green Street\u001ASan Francisco\u0002\u0010Mojo🐍\u0002\u0016Blue Street\u0014Saturn🪐\xF9u\x84\xA1c\u0010\x82B\xE2\xCF\xF1\x98\xF7\xF1JH" }

it "returns array of entries decoded using the inlined writer's schema " do
expect(avro.decode_all(encoded_data)).to eq(
expect(avro.decode_all(encoded_data).entries).to eq(
[
{"name"=>"Python🐍", "age"=>256, "address"=>{"street"=>"Green Street", "city"=>"San Francisco"}},
{"name"=>"Mojo🐍", "age"=>1, "address"=>{"street"=>"Blue Street", "city"=>"Saturn🪐"}}
Expand All @@ -216,7 +216,7 @@

expect(
AvroTurf.new(schemas_path: "spec/schemas/reader")
.decode_all(encoded_data, schema_name: "person")
.decode_all(encoded_data, schema_name: "person").entries
).to eq(
[
{"name"=>"Python🐍", "age"=>256, "fav_color"=>"red🟥"},
Expand Down Expand Up @@ -350,7 +350,7 @@
encoded_data = "Obj\u0001\u0004\u0014avro.codec\bnull\u0016avro.schema\xB6\u0004[{\"type\": \"record\", \"name\": \"address\", \"fields\": [{\"type\": \"string\", \"name\": \"street\"}, {\"type\": \"string\", \"name\": \"city\"}]}, {\"type\": \"record\", \"name\": \"person\", \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": \"int\", \"name\": \"age\"}, {\"type\": \"address\", \"name\": \"address\"}]}]\u0000\xF9u\x84\xA1c\u0010\x82B\xE2\xCF\xF1\x98\xF7\xF1JH\u0004\x96\u0001\u0002\u0014Python🐍\x80\u0004\u0018Green Street\u001ASan Francisco\u0002\u0010Mojo🐍\u0002\u0016Blue Street\u0014Saturn🪐\xF9u\x84\xA1c\u0010\x82B\xE2\xCF\xF1\x98\xF7\xF1JH"
stream = StringIO.new(encoded_data)

expect(avro.decode_all_from_stream(stream)).to eq(
expect(avro.decode_all_from_stream(stream).entries).to eq(
[
{"name"=>"Python🐍", "age"=>256, "address"=>{"street"=>"Green Street", "city"=>"San Francisco"}},
{"name"=>"Mojo🐍", "age"=>1, "address"=>{"street"=>"Blue Street", "city"=>"Saturn🪐"}}
Expand Down

0 comments on commit 3b4b4e5

Please sign in to comment.