diff --git a/.flake8 b/.flake8
index 70da7ac..eb212f1 100644
--- a/.flake8
+++ b/.flake8
@@ -1,4 +1,4 @@
-# Copyright 2022 J.P. Morgan Chase & Co.
+# Copyright 2023 J.P. Morgan Chase & Co.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
diff --git a/pyproject.toml b/pyproject.toml
index 4b8d41c..a75d291 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,7 +48,7 @@ dependencies = [
     "fastavro~=1.8",  # TODO: consider moving Avro-related dependencies to optional dependencies
     "memoization~=0.4",
     "orjson~=3.0",
-    "pluggy~=1.2",
+    "pluggy~=1.3",
     "py-avro-schema~=3.0",
     "python-dateutil~=2.8",
 ]
diff --git a/src/py_adapter/__init__.py b/src/py_adapter/__init__.py
index 75a82e9..de60225 100644
--- a/src/py_adapter/__init__.py
+++ b/src/py_adapter/__init__.py
@@ -21,10 +21,23 @@
 import importlib
 import importlib.metadata
 import inspect
+import io
+import itertools
 import logging
 import uuid
 from collections.abc import Iterable, Iterator
-from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union, cast
+from typing import (
+    Any,
+    BinaryIO,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Type,
+    TypeVar,
+    Union,
+    cast,
+)
 
 import avro.schema
 import dateutil.parser
@@ -91,10 +104,26 @@ def serialize(obj: Any, *, format: str, writer_schema: bytes = b"") -> bytes:
     :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
     :param writer_schema: Data schema to serialize the data with, as JSON bytes.
     """
+    data_stream = io.BytesIO()
+    serialize_to_stream(obj, data_stream, format=format, writer_schema=writer_schema)
+    data_stream.seek(0)
+    data = data_stream.read()
+    return data
+
+
+def serialize_to_stream(obj: Any, stream: BinaryIO, *, format: str, writer_schema: bytes = b"") -> None:
+    """
+    Serialize an object to a file-like object using a serialization format supported by **py-adapter**
+
+    :param obj: Python object to serialize
+    :param stream: File-like object to write the serialized data into
+    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
+    :param writer_schema: Data schema to serialize the data with, as JSON bytes.
+    """
     serialize_fn = py_adapter.plugin.plugin_hook(format, "serialize")
     basic_obj = to_basic_type(obj)
-    data = serialize_fn(obj=basic_obj, writer_schema=writer_schema)
-    return data
+    py_type = type(obj)
+    serialize_fn(obj=basic_obj, stream=stream, py_type=py_type, writer_schema=writer_schema)
 
 
 def serialize_many(objs: Iterable[Any], *, format: str, writer_schema: bytes = b"") -> bytes:
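With this change `serialize` becomes a thin wrapper that drains an in-memory `io.BytesIO` filled by `serialize_to_stream`, so callers can also target any binary file-like object directly. A minimal sketch of the new entry point (the `Ship` class and file name are illustrative, not part of this change):

```python
import dataclasses

import py_adapter


@dataclasses.dataclass
class Ship:
    name: str


# Write straight to a file instead of materializing bytes first
with open("ship.json", "wb") as f:
    py_adapter.serialize_to_stream(Ship(name="Elvira"), f, format="JSON")
# The deserialize_from_stream counterpart added further down reads it back.
```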
""" - serialize_fn = py_adapter.plugin.plugin_hook(format, "serialize_many") - basic_objs = (to_basic_type(obj) for obj in objs) - data = serialize_fn(objs=basic_objs, writer_schema=writer_schema) + data_stream = io.BytesIO() + serialize_many_to_stream(objs, data_stream, format=format, writer_schema=writer_schema) + data_stream.seek(0) + data = data_stream.read() return data -def deserialize(data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"") -> Obj: +def serialize_many_to_stream(objs: Iterable[Any], stream: BinaryIO, *, format: str, writer_schema: bytes = b"") -> None: + """ + Serialize multiple objects to a file-like object using a serialization format supported by **py-adapter** + + :param objs: Python objects to serialize + :param stream: File like object to write the serialized data into + :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``. + :param writer_schema: Data schema to serialize the data with, as JSON bytes. + """ + serialize_fn = py_adapter.plugin.plugin_hook(format, "serialize_many") + objs_iter = iter(objs) + # Use the first object to find the class, assuming all objects share the same type + first_obj = next(objs_iter) + py_type = type(first_obj) + # Then iterate over all objects again to convert to basic types + basic_objs = (to_basic_type(obj) for obj in itertools.chain([first_obj], objs_iter)) + serialize_fn(objs=basic_objs, stream=stream, py_type=py_type, writer_schema=writer_schema) + + +def deserialize( + data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b"" +) -> Obj: """ Deserialize bytes as a Python object of a given type from a serialization format supported by **py-adapter** @@ -119,14 +170,39 @@ def deserialize(data: bytes, py_type: Type[Obj], *, format: str, writer_schema: :param py_type: The Python class to create an instance from :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``. :param writer_schema: Data schema used to serialize the data with, as JSON bytes. + :param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be + compatible with the writer schema. + """ + data_stream = io.BytesIO(data) + obj = deserialize_from_stream( + data_stream, py_type, format=format, writer_schema=writer_schema, reader_schema=reader_schema + ) + return obj + + +def deserialize_from_stream( + stream: BinaryIO, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b"" +) -> Obj: + """ + Deserialize a file-like object as a Python object of a given type from a serialization format supported by + **py-adapter** + + :param stream: File-like object to deserialize + :param py_type: The Python class to create an instance from + :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``. + :param writer_schema: Data schema used to serialize the data with, as JSON bytes. + :param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be + compatible with the writer schema. 
""" deserialize_fn = py_adapter.plugin.plugin_hook(format, "deserialize") - basic_obj = deserialize_fn(data=data, writer_schema=writer_schema) + basic_obj = deserialize_fn(stream=stream, py_type=py_type, writer_schema=writer_schema, reader_schema=reader_schema) obj = from_basic_type(basic_obj, py_type) return obj -def deserialize_many(data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"") -> Iterator[Obj]: +def deserialize_many( + data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b"" +) -> Iterator[Obj]: """ Deserialize bytes as an iterator over Python objects of a given type from a serialization format supported by **py-adapter** @@ -135,9 +211,34 @@ def deserialize_many(data: bytes, py_type: Type[Obj], *, format: str, writer_sch :param py_type: The Python class to create an instance from :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``. :param writer_schema: Data schema used to serialize the data with, as JSON bytes. + :param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be + compatible with the writer schema. + """ + data_stream = io.BytesIO(data) + objs = deserialize_many_from_stream( + data_stream, py_type, format=format, writer_schema=writer_schema, reader_schema=reader_schema + ) + return objs + + +def deserialize_many_from_stream( + stream: BinaryIO, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b"" +) -> Iterator[Obj]: + """ + Deserialize a file-like object as an iterator over Python objects of a given type from a serialization format + supported by **py-adapter** + + :param stream: File-like object to deserialize + :param py_type: The Python class to create an instance from + :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``. + :param writer_schema: Data schema used to serialize the data with, as JSON bytes. + :param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be + compatible with the writer schema. 
""" deserialize_fn = py_adapter.plugin.plugin_hook(format, "deserialize_many") - basic_objs = deserialize_fn(data=data, writer_schema=writer_schema) + basic_objs = deserialize_fn( + stream=stream, py_type=py_type, writer_schema=writer_schema, reader_schema=reader_schema + ) objs = (from_basic_type(basic_obj, py_type) for basic_obj in basic_objs) return objs diff --git a/src/py_adapter/plugin/__init__.py b/src/py_adapter/plugin/__init__.py index b29d448..0070163 100644 --- a/src/py_adapter/plugin/__init__.py +++ b/src/py_adapter/plugin/__init__.py @@ -17,13 +17,11 @@ import logging import sys from collections.abc import Iterable, Iterator -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, BinaryIO, Type import pluggy if TYPE_CHECKING: - from pluggy._hooks import _HookCaller - import py_adapter logger = logging.getLogger(__package__) @@ -66,7 +64,7 @@ def _load_default_plugins(manager_: pluggy.PluginManager) -> None: manager_.register(plugin, name=name) -def plugin_hook(plugin_name: str, hook_name: str) -> "_HookCaller": +def plugin_hook(plugin_name: str, hook_name: str) -> pluggy.HookCaller: """ Return a hook (caller) for a single named plugin and hook name @@ -102,45 +100,61 @@ def __init__(self, plugin_name: str, hook_name: str): @_hookspec(firstresult=True) -def serialize(obj: "py_adapter.Basic", writer_schema: bytes) -> bytes: +def serialize(obj: "py_adapter.Basic", stream: BinaryIO, py_type: Type, writer_schema: bytes) -> BinaryIO: """ Hook specification. Serialize a Python object of basic types to the format supported by the implementing plugin. + Although we write to the stream, we also return the stream from this function. We need to return something to avoid + pluggy thinking the hook is not implemented. + :param obj: Python object to serialize + :param stream: File-like object to serialize data to + :param py_type: Original Python class associated with the basic object :param writer_schema: Data schema to serialize the data with, as JSON bytes. """ raise NotImplementedError() @_hookspec(firstresult=True) -def serialize_many(objs: Iterable["py_adapter.Basic"], writer_schema: bytes) -> bytes: +def serialize_many( + objs: Iterable["py_adapter.Basic"], stream: BinaryIO, py_type: Type, writer_schema: bytes +) -> BinaryIO: """ Hook specification. Serialize multiple Python objects of basic types to the format supported by the implementing plugin. + Although we write to the stream, we also return the stream from this function. We need to return something to avoid + pluggy thinking the hook is not implemented. + :param objs: Python objects to serialize + :param stream: File-like object to serialize data to + :param py_type: Original Python class associated with the basic object :param writer_schema: Data schema to serialize the data with, as JSON bytes. """ raise NotImplementedError() @_hookspec(firstresult=True) -def deserialize(data: bytes, writer_schema: bytes) -> "py_adapter.Basic": +def deserialize(stream: BinaryIO, py_type: Type, writer_schema: bytes, reader_schema: bytes) -> "py_adapter.Basic": """ Hook specification. Deserialize data as an object of basic Python types - :param data: Bytes to deserialize + :param stream: File-like object to deserialize + :param py_type: Python class the basic object will ultimately be deserialized into :param writer_schema: Data schema used to serialize the data with, as JSON bytes. 
""" raise NotImplementedError() @_hookspec(firstresult=True) -def deserialize_many(data: bytes, writer_schema: bytes) -> Iterator["py_adapter.Basic"]: +def deserialize_many( + stream: BinaryIO, py_type: Type, writer_schema: bytes, reader_schema: bytes +) -> Iterator["py_adapter.Basic"]: """ Hook specification. Deserialize data as an iterator over objects of basic Python types - :param data: Bytes to deserialize + :param stream: File-like object to deserialize + :param py_type: Python class the basic object will ultimately be deserialized into :param writer_schema: Data schema used to serialize the data with, as JSON bytes. """ raise NotImplementedError() diff --git a/src/py_adapter/plugin/_avro.py b/src/py_adapter/plugin/_avro.py index f994ef1..c1de3b4 100644 --- a/src/py_adapter/plugin/_avro.py +++ b/src/py_adapter/plugin/_avro.py @@ -13,9 +13,11 @@ Avro serializer/deserializer **py-adapter** plugin """ -import io +import functools from collections.abc import Iterable, Iterator +from typing import BinaryIO, Type +import fastavro.types import orjson import py_adapter @@ -23,77 +25,100 @@ @py_adapter.plugin.hook -def serialize(obj: py_adapter.Basic, writer_schema: bytes) -> bytes: +def serialize(obj: py_adapter.Basic, stream: BinaryIO, py_type: Type, writer_schema: bytes) -> BinaryIO: """ Serialize an object of basic Python types as Avro bytes + This uses a single-record Avro file format which does **not** embed the schema. + :param obj: Python object to serialize + :param stream: File-like object to serialize data to + :param py_type: Original Python class associated with the basic object :param writer_schema: Avro schema to serialize the data with, as JSON bytes. """ import fastavro.write - data_stream = io.BytesIO() - # TODO: generate schema if not provided - schema_obj = fastavro.parse_schema(orjson.loads(writer_schema)) - # TODO: add support for writer which embeds the schema - fastavro.write.schemaless_writer(data_stream, schema=schema_obj, record=obj) - data_stream.flush() - data_stream.seek(0) - data = data_stream.read() - return data + writer_schema = writer_schema or _default_schema(py_type) + schema_obj = _parse_fastavro_schema(writer_schema) + fastavro.write.schemaless_writer(stream, schema=schema_obj, record=obj) + stream.flush() + return stream @py_adapter.plugin.hook -def serialize_many(objs: Iterable[py_adapter.Basic], writer_schema: bytes) -> bytes: +def serialize_many(objs: Iterable[py_adapter.Basic], stream: BinaryIO, py_type: Type, writer_schema: bytes) -> BinaryIO: """ - Serialize multiple Python objects of basic types as Avro container file format. + Serialize multiple Python objects of basic types as Avro container file format + + The Avro schema will be included in the header of the file. :param objs: Python objects to serialize + :param stream: File-like object to serialize data to + :param py_type: Original Python class associated with the basic object :param writer_schema: Avro schema to serialize the data with, as JSON bytes. 
""" import fastavro.write - data_stream = io.BytesIO() - # TODO: generate schema if not provided - schema_obj = fastavro.parse_schema(orjson.loads(writer_schema)) - fastavro.write.writer(data_stream, schema=schema_obj, records=objs) - data_stream.flush() - data_stream.seek(0) - data = data_stream.read() - return data + writer_schema = writer_schema or _default_schema(py_type) + schema_obj = _parse_fastavro_schema(writer_schema) + fastavro.write.writer(stream, schema=schema_obj, records=objs) + stream.flush() + return stream @py_adapter.plugin.hook -def deserialize(data: bytes, writer_schema: bytes) -> py_adapter.Basic: +def deserialize(stream: BinaryIO, py_type: Type, writer_schema: bytes, reader_schema: bytes) -> py_adapter.Basic: """ Deserialize Avro bytes as an object of basic Python types - :param data: Avro bytes to deserialize + :param stream: File-like object to deserialize + :param py_type: Python class the basic object will ultimately be deserialized into :param writer_schema: Avro schema used to serialize the data with, as JSON bytes. + :param reader_schema: Avro schema to deserialize the data with, as JSON bytes. The reader schema should be + compatible with the writer schema. """ import fastavro.read - # TODO: generate writer schema if not provided - writer_schema_obj = fastavro.parse_schema(orjson.loads(writer_schema)) - data_stream = io.BytesIO(data) - # TODO: add support for reader schema, if provided - # TODO: add support for reader of data with embedded (writer) schema - basic_obj = fastavro.read.schemaless_reader(data_stream, writer_schema=writer_schema_obj, reader_schema=None) + writer_schema = writer_schema or _default_schema(py_type) + writer_schema_obj = _parse_fastavro_schema(writer_schema) + reader_schema_obj = _parse_fastavro_schema(reader_schema) if reader_schema else None + basic_obj = fastavro.read.schemaless_reader( + stream, writer_schema=writer_schema_obj, reader_schema=reader_schema_obj + ) return basic_obj @py_adapter.plugin.hook -def deserialize_many(data: bytes, writer_schema: bytes) -> Iterator[py_adapter.Basic]: +def deserialize_many( + stream: BinaryIO, py_type: Type, writer_schema: bytes, reader_schema: bytes +) -> Iterator[py_adapter.Basic]: """ Deserialize Avro container file format data as an iterator over objects of basic Python types - :param data: Bytes to deserialize - :param writer_schema: Data schema used to serialize the data with, as JSON bytes. + :param stream: File-like object to deserialize + :param py_type: Python class the basic object will ultimately be deserialized into + :param writer_schema: Avro schema used to serialize the data with, as JSON bytes. + :param reader_schema: Avro schema to deserialize the data with, as JSON bytes. The reader schema should be + compatible with the writer schema. """ import fastavro.read # TODO: make it fail if writer_schema is provided? 
 
 
 @py_adapter.plugin.hook
-def deserialize(data: bytes, writer_schema: bytes) -> py_adapter.Basic:
+def deserialize(stream: BinaryIO, py_type: Type, writer_schema: bytes, reader_schema: bytes) -> py_adapter.Basic:
     """
     Deserialize Avro bytes as an object of basic Python types
 
-    :param data: Avro bytes to deserialize
+    :param stream: File-like object to deserialize
+    :param py_type: Python class the basic object will ultimately be deserialized into
     :param writer_schema: Avro schema used to serialize the data with, as JSON bytes.
+    :param reader_schema: Avro schema to deserialize the data with, as JSON bytes. The reader schema should be
+        compatible with the writer schema.
     """
     import fastavro.read
 
-    # TODO: generate writer schema if not provided
-    writer_schema_obj = fastavro.parse_schema(orjson.loads(writer_schema))
-    data_stream = io.BytesIO(data)
-    # TODO: add support for reader schema, if provided
-    # TODO: add support for reader of data with embedded (writer) schema
-    basic_obj = fastavro.read.schemaless_reader(data_stream, writer_schema=writer_schema_obj, reader_schema=None)
+    writer_schema = writer_schema or _default_schema(py_type)
+    writer_schema_obj = _parse_fastavro_schema(writer_schema)
+    reader_schema_obj = _parse_fastavro_schema(reader_schema) if reader_schema else None
+    basic_obj = fastavro.read.schemaless_reader(
+        stream, writer_schema=writer_schema_obj, reader_schema=reader_schema_obj
+    )
     return basic_obj
 
 
 @py_adapter.plugin.hook
-def deserialize_many(data: bytes, writer_schema: bytes) -> Iterator[py_adapter.Basic]:
+def deserialize_many(
+    stream: BinaryIO, py_type: Type, writer_schema: bytes, reader_schema: bytes
+) -> Iterator[py_adapter.Basic]:
     """
     Deserialize Avro container file format data as an iterator over objects of basic Python types
 
-    :param data: Bytes to deserialize
-    :param writer_schema: Data schema used to serialize the data with, as JSON bytes.
+    :param stream: File-like object to deserialize
+    :param py_type: Python class the basic object will ultimately be deserialized into
+    :param writer_schema: Avro schema used to serialize the data with, as JSON bytes.
+    :param reader_schema: Avro schema to deserialize the data with, as JSON bytes. The reader schema should be
+        compatible with the writer schema.
     """
     import fastavro.read
 
     # TODO: make it fail if writer_schema is provided?
-    data_stream = io.BytesIO(data)
-    # TODO: add support for reader schema, if provided
-    basic_objs = fastavro.read.reader(data_stream, reader_schema=None)
+    reader_schema_obj = _parse_fastavro_schema(reader_schema) if reader_schema else None
+    basic_objs = fastavro.read.reader(stream, reader_schema=reader_schema_obj)
     return basic_objs
+
+
+def _default_schema(py_type: Type) -> bytes:
+    """Generate an Avro schema for a given Python type"""
+    import py_avro_schema as pas
+
+    # JSON as string matches the default argument in the to_basic_type function
+    schema = pas.generate(py_type, options=pas.Option.LOGICAL_JSON_STRING)
+    return schema
+
+
+@functools.lru_cache(maxsize=100)
+def _parse_fastavro_schema(json_data: bytes) -> fastavro.types.Schema:
+    """Parse an Avro schema (JSON bytes) into a fastavro-internal representation"""
+    return fastavro.parse_schema(orjson.loads(json_data))
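When no schema is supplied, `_default_schema` derives one from the Python class via py-avro-schema. Roughly what that yields for a dataclass (the exact JSON layout is py-avro-schema's concern and not verified here); note also that `_parse_fastavro_schema` can memoize with `functools.lru_cache` only because its `bytes` argument is hashable:

```python
import dataclasses

import py_avro_schema as pas


@dataclasses.dataclass
class Ship:
    name: str


schema_json = pas.generate(Ship, options=pas.Option.LOGICAL_JSON_STRING)
# JSON bytes along the lines of:
# {"type": "record", "name": "Ship", "fields": [{"name": "name", "type": "string"}], ...}
```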
diff --git a/src/py_adapter/plugin/_json.py b/src/py_adapter/plugin/_json.py
index 50161bc..4d4ab97 100644
--- a/src/py_adapter/plugin/_json.py
+++ b/src/py_adapter/plugin/_json.py
@@ -12,59 +12,66 @@
 """
 JSON serializer/deserializer **py-adapter** plugin
 """
+
 from collections.abc import Iterable, Iterator
+from typing import BinaryIO
 
 import py_adapter
 import py_adapter.plugin
 
 
 @py_adapter.plugin.hook
-def serialize(obj: py_adapter.Basic, writer_schema: bytes) -> bytes:
+def serialize(obj: py_adapter.Basic, stream: BinaryIO) -> BinaryIO:
     """
     Serialize an object of basic Python types as JSON bytes
 
-    :param obj:           Python object to serialize
-    :param writer_schema: Schema to serialize the data with. Not used with JSON serialization.
+    :param obj: Python object to serialize
+    :param stream: File-like object to serialize data to
     """
     import orjson
 
-    return orjson.dumps(obj)
+    data = orjson.dumps(obj)
+    stream.write(data)
+    stream.flush()
+    return stream
 
 
 @py_adapter.plugin.hook
-def serialize_many(objs: Iterable[py_adapter.Basic], writer_schema: bytes) -> bytes:
+def serialize_many(objs: Iterable[py_adapter.Basic], stream: BinaryIO) -> BinaryIO:
     """
     Serialize multiple Python objects of basic types as Newline Delimited JSON (NDJSON).
 
-    :param objs:          Python objects to serialize
-    :param writer_schema: Schema to serialize the data with. Not used with JSON serialization.
+    :param objs: Python objects to serialize
+    :param stream: File-like object to serialize data to
     """
     import orjson
 
-    return b"\n".join(orjson.dumps(obj) for obj in objs)
+    data = b"\n".join(orjson.dumps(obj) for obj in objs)
+    stream.write(data)
+    stream.flush()
+    return stream
 
 
 @py_adapter.plugin.hook
-def deserialize(data: bytes, writer_schema: bytes) -> py_adapter.Basic:
+def deserialize(stream: BinaryIO) -> py_adapter.Basic:
     """
     Deserialize JSON bytes as an object of basic Python types
 
-    :param data:          JSON bytes to deserialize
-    :param writer_schema: Schema used to serialize the data with. Not used with JSON serialization.
+    :param stream: File-like object to deserialize
     """
     import orjson
 
-    return orjson.loads(data)
+    return orjson.loads(stream.read())
 
 
 @py_adapter.plugin.hook
-def deserialize_many(data: bytes, writer_schema: bytes) -> Iterator[py_adapter.Basic]:
+def deserialize_many(stream: BinaryIO) -> Iterator[py_adapter.Basic]:
     """
     Deserialize Newline Delimited JSON (NDJSON) data as an iterator over objects of basic Python types
 
-    :param data:          Bytes to deserialize
-    :param writer_schema: Schema used to serialize the data with. Not used with JSON serialization.
+    :param stream: File-like object to deserialize
     """
     import orjson
 
-    return (orjson.loads(line) for line in data.splitlines())
+    lines = stream.read().splitlines()
+    return (orjson.loads(line) for line in lines)
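For the JSON plugin, `serialize_many` still concatenates records with newline separators (NDJSON); only the destination changed. A quick sketch of the resulting bytes, assuming orjson's compact output:

```python
import io

import py_adapter

buf = io.BytesIO()
py_adapter.serialize_many_to_stream([{"name": "Elvira"}, {"name": "Grace"}], buf, format="JSON")
print(buf.getvalue())  # b'{"name":"Elvira"}\n{"name":"Grace"}' -- one JSON document per line
```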
diff --git a/tests/conftest.py b/tests/conftest.py
index 264d34b..4a5f15a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,3 +1,14 @@
+# Copyright 2023 J.P. Morgan Chase & Co.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
 """
 Test data models
 """
diff --git a/tests/test_serialize.py b/tests/test_serialize.py
index 316dbe0..916e40f 100644
--- a/tests/test_serialize.py
+++ b/tests/test_serialize.py
@@ -8,10 +8,11 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
-
+import dataclasses
 import io
 import re
 
+import fastavro.read
 import py_avro_schema as pas
 import pytest
 
@@ -79,7 +80,43 @@ def test_serialize_avro(ship_obj, ship_class):
     assert obj_out == ship_obj
 
 
-@pytest.mark.skip("TODO")
+def test_serialize_avro_automatic_writer_schema(ship_obj, ship_class):
+    data = py_adapter.serialize(ship_obj, format="Avro")
+    obj_out = py_adapter.deserialize(data, ship_class, format="Avro")
+    assert obj_out == ship_obj
+
+
+@dataclasses.dataclass
+class Ship:
+    name: str
+
+
+def test_serialize_avro_reader_schema(ship_obj, ship_class):
+    writer_schema = pas.generate(ship_class, options=pas.Option.LOGICAL_JSON_STRING | pas.Option.MILLISECONDS)
+    reader_schema = pas.generate(Ship, options=pas.Option.LOGICAL_JSON_STRING | pas.Option.MILLISECONDS)
+    data = py_adapter.serialize(ship_obj, format="Avro", writer_schema=writer_schema)
+    obj_out = py_adapter.deserialize(
+        data, Ship, format="Avro", writer_schema=writer_schema, reader_schema=reader_schema
+    )
+    assert isinstance(obj_out, Ship)
+    assert obj_out.name == ship_obj.name
+
+
+@dataclasses.dataclass
+class Vessel:
+    """If the record name does not match, the schemas are incompatible"""
+
+    name: str
+
+
+def test_serialize_avro_reader_schema_incompatible(ship_obj, ship_class):
+    writer_schema = pas.generate(ship_class, options=pas.Option.LOGICAL_JSON_STRING | pas.Option.MILLISECONDS)
+    reader_schema = pas.generate(Vessel, options=pas.Option.LOGICAL_JSON_STRING | pas.Option.MILLISECONDS)
+    data = py_adapter.serialize(ship_obj, format="Avro", writer_schema=writer_schema)
+    with pytest.raises(fastavro.read.SchemaResolutionError):
+        py_adapter.deserialize(data, Vessel, format="Avro", writer_schema=writer_schema, reader_schema=reader_schema)
+
+
 def test_serialize_stream_json(ship_obj, ship_class):
     data = io.BytesIO()
     py_adapter.serialize_to_stream(ship_obj, data, format="JSON")
@@ -88,12 +125,12 @@ def test_serialize_stream_json(ship_obj, ship_class):
     assert obj_out == ship_obj
 
 
-@pytest.mark.skip("TODO")
 def test_serialize_stream_avro(ship_obj, ship_class):
+    writer_schema = pas.generate(ship_class, options=pas.Option.LOGICAL_JSON_STRING | pas.Option.MILLISECONDS)
     data = io.BytesIO()
-    py_adapter.serialize_to_stream(ship_obj, data, format="Avro")
+    py_adapter.serialize_to_stream(ship_obj, data, format="Avro", writer_schema=writer_schema)
     data.seek(0)
-    obj_out = py_adapter.deserialize_from_stream(data, ship_class, format="Avro")
+    obj_out = py_adapter.deserialize_from_stream(data, ship_class, format="Avro", writer_schema=writer_schema)
     assert obj_out == ship_obj
 
 
@@ -112,19 +149,41 @@ def test_serialize_many_avro(ship_obj, ship_class):
     assert objs_out == ship_objs
 
 
-@pytest.mark.skip("TODO")
+def test_serialize_many_avro_automatic_writer_schema(ship_obj, ship_class):
+    ship_objs = [ship_obj, ship_obj]
+    data = py_adapter.serialize_many(ship_objs, format="Avro")
+    objs_out = list(py_adapter.deserialize_many(data, ship_class, format="Avro"))
+    assert objs_out == ship_objs
+
+
+def test_serialize_many_avro_automatic_writer_schema_generator(ship_obj, ship_class):
+    ship_objs = [ship_obj, ship_obj]
+
+    def ship_objs_generator():
+        for ship in ship_objs:
+            yield ship
+
+    data = py_adapter.serialize_many(ship_objs_generator(), format="Avro")
+    objs_out = list(py_adapter.deserialize_many(data, ship_class, format="Avro"))
+    assert objs_out == ship_objs
+
+
 def test_serialize_many_stream_json(ship_obj, ship_class):
     ship_objs = [ship_obj, ship_obj]
     data = io.BytesIO()
     py_adapter.serialize_many_to_stream(ship_objs, data, format="JSON")
+    data.seek(0)
     objs_out = list(py_adapter.deserialize_many_from_stream(data, ship_class, format="JSON"))
     assert objs_out == ship_objs
 
 
-@pytest.mark.skip("TODO")
 def test_serialize_many_stream_avro(ship_obj, ship_class):
+    writer_schema = pas.generate(ship_class, options=pas.Option.LOGICAL_JSON_STRING | pas.Option.MILLISECONDS)
     ship_objs = [ship_obj, ship_obj]
     data = io.BytesIO()
-    py_adapter.serialize_many_to_stream(ship_objs, data, format="Avro")
-    objs_out = list(py_adapter.deserialize_many_from_stream(data, ship_class, format="Avro"))
+    py_adapter.serialize_many_to_stream(ship_objs, data, format="Avro", writer_schema=writer_schema)
+    data.seek(0)
+    objs_out = list(
+        py_adapter.deserialize_many_from_stream(data, ship_class, format="Avro", writer_schema=writer_schema)
+    )
    assert objs_out == ship_objs
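The re-enabled stream tests round-trip through `io.BytesIO`; the same calls work against a real file, which together with the automatic writer schema gives a compact end-to-end flow (the file name and `Ship` model are illustrative stand-ins for the test fixtures):

```python
import dataclasses

import py_adapter


@dataclasses.dataclass
class Ship:
    name: str


ships = [Ship("Elvira"), Ship("Grace")]

with open("ships.avro", "wb") as f:
    py_adapter.serialize_many_to_stream(ships, f, format="Avro")  # schema derived from Ship

with open("ships.avro", "rb") as f:
    ships_roundtrip = list(py_adapter.deserialize_many_from_stream(f, Ship, format="Avro"))

assert ships_roundtrip == ships
```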