Skip to content

Commit

Permalink
Serialize/deserialize to/from file-like objects (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
faph authored Nov 28, 2023
2 parents faece61 + b6d2656 commit 97e7941
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 J.P. Morgan Chase & Co.
# Copyright 2023 J.P. Morgan Chase & Co.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ dependencies = [
"fastavro~=1.8", # TODO: consider moving Avro-related dependencies to optional dependencies
"memoization~=0.4",
"orjson~=3.0",
"pluggy~=1.2",
"pluggy~=1.3",
"py-avro-schema~=3.0",
"python-dateutil~=2.8",
]
Expand Down
121 changes: 111 additions & 10 deletions src/py_adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,23 @@
import importlib
import importlib.metadata
import inspect
import io
import itertools
import logging
import uuid
from collections.abc import Iterable, Iterator
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union, cast
from typing import (
Any,
BinaryIO,
Callable,
Dict,
List,
Optional,
Type,
TypeVar,
Union,
cast,
)

import avro.schema
import dateutil.parser
Expand Down Expand Up @@ -91,10 +104,26 @@ def serialize(obj: Any, *, format: str, writer_schema: bytes = b"") -> bytes:
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema to serialize the data with, as JSON bytes.
"""
data_stream = io.BytesIO()
serialize_to_stream(obj, data_stream, format=format, writer_schema=writer_schema)
data_stream.seek(0)
data = data_stream.read()
return data


def serialize_to_stream(obj: Any, stream: BinaryIO, *, format: str, writer_schema: bytes = b"") -> None:
"""
Serialize an object to a file-like object using a serialization format supported by **py-adapter**
:param obj: Python object to serialize
:param stream: File-like object to write the serialized data into
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema to serialize the data with, as JSON bytes.
"""
serialize_fn = py_adapter.plugin.plugin_hook(format, "serialize")
basic_obj = to_basic_type(obj)
data = serialize_fn(obj=basic_obj, writer_schema=writer_schema)
return data
py_type = type(obj)
serialize_fn(obj=basic_obj, stream=stream, py_type=py_type, writer_schema=writer_schema)


def serialize_many(objs: Iterable[Any], *, format: str, writer_schema: bytes = b"") -> bytes:
Expand All @@ -105,28 +134,75 @@ def serialize_many(objs: Iterable[Any], *, format: str, writer_schema: bytes = b
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema to serialize the data with, as JSON bytes.
"""
serialize_fn = py_adapter.plugin.plugin_hook(format, "serialize_many")
basic_objs = (to_basic_type(obj) for obj in objs)
data = serialize_fn(objs=basic_objs, writer_schema=writer_schema)
data_stream = io.BytesIO()
serialize_many_to_stream(objs, data_stream, format=format, writer_schema=writer_schema)
data_stream.seek(0)
data = data_stream.read()
return data


def deserialize(data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"") -> Obj:
def serialize_many_to_stream(objs: Iterable[Any], stream: BinaryIO, *, format: str, writer_schema: bytes = b"") -> None:
"""
Serialize multiple objects to a file-like object using a serialization format supported by **py-adapter**
:param objs: Python objects to serialize
:param stream: File-like object to write the serialized data into
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema to serialize the data with, as JSON bytes.
"""
serialize_fn = py_adapter.plugin.plugin_hook(format, "serialize_many")
objs_iter = iter(objs)
# Use the first object to find the class, assuming all objects share the same type.
# Note: if ``objs`` is empty, ``next()`` raises ``StopIteration`` here.
first_obj = next(objs_iter)
py_type = type(first_obj)
# Then chain the first object back in front of the remaining ones and lazily convert each object to basic types
basic_objs = (to_basic_type(obj) for obj in itertools.chain([first_obj], objs_iter))
serialize_fn(objs=basic_objs, stream=stream, py_type=py_type, writer_schema=writer_schema)


def deserialize(
data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b""
) -> Obj:
"""
Deserialize bytes as a Python object of a given type from a serialization format supported by **py-adapter**
:param data: Serialized data
:param py_type: The Python class to create an instance from
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema used to serialize the data with, as JSON bytes.
:param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
compatible with the writer schema.
"""
data_stream = io.BytesIO(data)
obj = deserialize_from_stream(
data_stream, py_type, format=format, writer_schema=writer_schema, reader_schema=reader_schema
)
return obj


def deserialize_from_stream(
stream: BinaryIO, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b""
) -> Obj:
"""
Deserialize a file-like object as a Python object of a given type from a serialization format supported by
**py-adapter**
:param stream: File-like object to deserialize
:param py_type: The Python class to create an instance from
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema used to serialize the data with, as JSON bytes.
:param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
compatible with the writer schema.
"""
deserialize_fn = py_adapter.plugin.plugin_hook(format, "deserialize")
basic_obj = deserialize_fn(data=data, writer_schema=writer_schema)
basic_obj = deserialize_fn(stream=stream, py_type=py_type, writer_schema=writer_schema, reader_schema=reader_schema)
obj = from_basic_type(basic_obj, py_type)
return obj


def deserialize_many(data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"") -> Iterator[Obj]:
def deserialize_many(
data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b""
) -> Iterator[Obj]:
"""
Deserialize bytes as an iterator over Python objects of a given type from a serialization format supported by
**py-adapter**
Expand All @@ -135,9 +211,34 @@ def deserialize_many(data: bytes, py_type: Type[Obj], *, format: str, writer_sch
:param py_type: The Python class to create an instance from
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema used to serialize the data with, as JSON bytes.
:param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
compatible with the writer schema.
"""
data_stream = io.BytesIO(data)
objs = deserialize_many_from_stream(
data_stream, py_type, format=format, writer_schema=writer_schema, reader_schema=reader_schema
)
return objs


def deserialize_many_from_stream(
stream: BinaryIO, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b""
) -> Iterator[Obj]:
"""
Deserialize a file-like object as an iterator over Python objects of a given type from a serialization format
supported by **py-adapter**
:param stream: File-like object to deserialize
:param py_type: The Python class to create an instance from
:param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
:param writer_schema: Data schema used to serialize the data with, as JSON bytes.
:param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
compatible with the writer schema.
"""
deserialize_fn = py_adapter.plugin.plugin_hook(format, "deserialize_many")
basic_objs = deserialize_fn(data=data, writer_schema=writer_schema)
basic_objs = deserialize_fn(
stream=stream, py_type=py_type, writer_schema=writer_schema, reader_schema=reader_schema
)
objs = (from_basic_type(basic_obj, py_type) for basic_obj in basic_objs)
return objs

Expand Down
34 changes: 24 additions & 10 deletions src/py_adapter/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
import logging
import sys
from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, BinaryIO, Type

import pluggy

if TYPE_CHECKING:
from pluggy._hooks import _HookCaller

import py_adapter

logger = logging.getLogger(__package__)
Expand Down Expand Up @@ -66,7 +64,7 @@ def _load_default_plugins(manager_: pluggy.PluginManager) -> None:
manager_.register(plugin, name=name)


def plugin_hook(plugin_name: str, hook_name: str) -> "_HookCaller":
def plugin_hook(plugin_name: str, hook_name: str) -> pluggy.HookCaller:
"""
Return a hook (caller) for a single named plugin and hook name
Expand Down Expand Up @@ -102,45 +100,61 @@ def __init__(self, plugin_name: str, hook_name: str):


@_hookspec(firstresult=True)
def serialize(obj: "py_adapter.Basic", writer_schema: bytes) -> bytes:
def serialize(obj: "py_adapter.Basic", stream: BinaryIO, py_type: Type, writer_schema: bytes) -> BinaryIO:
"""
Hook specification. Serialize a Python object of basic types to the format supported by the implementing plugin.
Although we write to the stream, we also return the stream from this function. We need to return something to avoid
pluggy thinking the hook is not implemented.
:param obj: Python object to serialize
:param stream: File-like object to serialize data to
:param py_type: Original Python class associated with the basic object
:param writer_schema: Data schema to serialize the data with, as JSON bytes.
"""
raise NotImplementedError()


@_hookspec(firstresult=True)
def serialize_many(objs: Iterable["py_adapter.Basic"], writer_schema: bytes) -> bytes:
def serialize_many(
objs: Iterable["py_adapter.Basic"], stream: BinaryIO, py_type: Type, writer_schema: bytes
) -> BinaryIO:
"""
Hook specification. Serialize multiple Python objects of basic types to the format supported by the implementing
plugin.
Although we write to the stream, we also return the stream from this function. We need to return something to avoid
pluggy thinking the hook is not implemented.
:param objs: Python objects to serialize
:param stream: File-like object to serialize data to
:param py_type: Original Python class associated with the basic object
:param writer_schema: Data schema to serialize the data with, as JSON bytes.
"""
raise NotImplementedError()


@_hookspec(firstresult=True)
def deserialize(data: bytes, writer_schema: bytes) -> "py_adapter.Basic":
def deserialize(stream: BinaryIO, py_type: Type, writer_schema: bytes, reader_schema: bytes) -> "py_adapter.Basic":
"""
Hook specification. Deserialize data as an object of basic Python types
:param data: Bytes to deserialize
:param stream: File-like object to deserialize
:param py_type: Python class the basic object will ultimately be deserialized into
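:param writer_schema: Data schema used to serialize the data with, as JSON bytes.
:param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
compatible with the writer schema.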
"""
raise NotImplementedError()


@_hookspec(firstresult=True)
def deserialize_many(data: bytes, writer_schema: bytes) -> Iterator["py_adapter.Basic"]:
def deserialize_many(
stream: BinaryIO, py_type: Type, writer_schema: bytes, reader_schema: bytes
) -> Iterator["py_adapter.Basic"]:
"""
Hook specification. Deserialize data as an iterator over objects of basic Python types
:param data: Bytes to deserialize
:param stream: File-like object to deserialize
:param py_type: Python class the basic object will ultimately be deserialized into
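:param writer_schema: Data schema used to serialize the data with, as JSON bytes.
:param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
compatible with the writer schema.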
"""
raise NotImplementedError()
Loading

0 comments on commit 97e7941

Please sign in to comment.