Skip to content

Commit

Permalink
Merge pull request #44 from kpn/43-use-serializerdeserialize
Browse files Browse the repository at this point in the history
refactor: remove the `value_` prefix from `value_serializer` and `value_deserializer`
  • Loading branch information
woile authored Aug 17, 2022
2 parents d1df9ae + 4fdc378 commit 1d30780
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 88 deletions.
16 changes: 8 additions & 8 deletions docs/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ from typing import Any, Dict, Optional, Protocol
import aiokafka


class ValueDeserializer(Protocol):
class Deserializer(Protocol):
async def deserialize(
self, consumer_record: aiokafka.structs.ConsumerRecord, **kwargs
) -> Any:
Expand All @@ -15,7 +15,7 @@ class ValueDeserializer(Protocol):
End users can provide their own class overriding this method.
If the engine was created with a schema_store_client, it will be available.
class CustomValueDeserializer(ValueDeserializer):
class CustomDeserializer(Deserializer):
async deserialize(self, consumer_record: aiokafka.structs.ConsumerRecord):
# custom logic and return something like a ConsumerRecord
Expand All @@ -24,12 +24,12 @@ class ValueDeserializer(Protocol):
...


class ValueSerializer(Protocol):
class Serializer(Protocol):
async def serialize(
self,
payload: Any,
headers: Optional[Dict[str, str]] = None,
value_serializer_kwargs: Optional[Dict] = None,
serializer_kwargs: Optional[Dict] = None,
) -> bytes:
"""
Serialize the payload to bytes
Expand All @@ -46,14 +46,14 @@ You can write custom `Serializers` and `Deserializers`. There are 2 ways of usin
```python
stream_engine = create_engine(
title="my-stream-engine",
value_serializer=MyValueSerializer(),
value_deserializer=MyValueDeserializer(),
serializer=MySerializer(),
deserializer=MyDeserializer(),
)
```


```python
@stream_engine.stream(topic, value_deserializer=MyDeserializer())
@stream_engine.stream(topic, deserializer=MyDeserializer())
async def hello_stream(stream: Stream):
async for event in stream:
save_to_db(event)
Expand All @@ -65,6 +65,6 @@ await stream_engine.send(
value={"message": "test"}
headers={"content-type": consts.APPLICATION_JSON,}
key="1",
value_serializer=MySerializer(),
serializer=MySerializer(),
)
```
4 changes: 2 additions & 2 deletions docs/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ from kstreams import create_engine, Stream
stream_engine = create_engine(title="my-stream-engine")


class MyValueDeserializer:
class MyDeserializer:

async def deserialize(self, consumer_record: structs.ConsumerRecord, **kwargs):
return consumer_record.value.decode()
Expand All @@ -52,7 +52,7 @@ stream = Stream(
"local--kstreams",
name="my-stream"
func=stream, # coroutine or async generator
value_deserializer=MyValueDeserializer(),
deserializer=MyDeserializer(),
)
# add the stream to the engine
stream_engine.add_stream(stream)
Expand Down
16 changes: 8 additions & 8 deletions examples/confluent-example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ COUNTRY_AVRO_SCHEMA = {

We need to define custom `serializers` so the `engine` and `streams` will handle the payload properly.
In this case we use `AsyncAvroMessageSerializer` that already contains the methods to `encode/decode` events.
Because to `serialize` events we need to know the `subject` and the `schema`, we make use of `value_serializer_kwargs`
Because to `serialize` events we need to know the `subject` and the `schema`, we make use of `serializer_kwargs`
to provide them.

```python
Expand All @@ -62,12 +62,12 @@ from schema_registry.serializers import AsyncAvroMessageSerializer

class AvroSerializer(AsyncAvroMessageSerializer):

async def serialize(self, payload: Dict, value_serializer_kwargs: Dict[str, str], **kwargs) -> bytes:
async def serialize(self, payload: Dict, serializer_kwargs: Dict[str, str], **kwargs) -> bytes:
"""
Serialize a payload to avro-binary using the schema and the subject
"""
schema = value_serializer_kwargs["schema"] # GET THE SCHEMA
subject = value_serializer_kwargs["subject"] # GET THE SUBJECT
schema = serializer_kwargs["schema"] # GET THE SCHEMA
subject = serializer_kwargs["subject"] # GET THE SUBJECT
event = await self.encode_record_with_schema(subject, schema, payload)

return event
Expand Down Expand Up @@ -99,8 +99,8 @@ client = AsyncSchemaRegistryClient("http://localhost:8081")

stream_engine = create_engine(
title="my-stream-engine",
value_serializer=serializers.AvroSerializer(client),
value_deserializer=serializers.AvroDeserializer(client)
serializer=serializers.AvroSerializer(client),
deserializer=serializers.AvroDeserializer(client)
)
```

Expand All @@ -120,7 +120,7 @@ await stream_engine.send(
"replicas": 1,
"port": 8080,
},
value_serializer_kwargs={ # Using the value_serializer_kwargs
serializer_kwargs={ # Using the serializer_kwargs
"subject": "deployment",
"schema": deployment_schema,
},
Expand All @@ -132,7 +132,7 @@ await stream_engine.send(
value={
"country": "Netherlands",
},
value_serializer_kwargs={ # Using the value_serializer_kwargs
serializer_kwargs={ # Using the serializer_kwargs
"subject": "country",
"schema": country_schema,
},
Expand Down
4 changes: 2 additions & 2 deletions examples/confluent-example/confluent_example/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def produce():
"replicas": 1,
"port": 8080,
},
value_serializer_kwargs={
serializer_kwargs={
"subject": "deployment",
"schema": deployment_schema,
},
Expand All @@ -28,7 +28,7 @@ async def produce():
value={
"country": "Netherlands",
},
value_serializer_kwargs={
serializer_kwargs={
"subject": "country",
"schema": country_schema,
},
Expand Down
6 changes: 3 additions & 3 deletions examples/confluent-example/confluent_example/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

class AvroSerializer(AsyncAvroMessageSerializer):
async def serialize(
self, payload: Dict, value_serializer_kwargs: Dict[str, str], **kwargs
self, payload: Dict, serializer_kwargs: Dict[str, str], **kwargs
) -> bytes:
"""
Serialize a payload to avro-binary using the schema and the subject
"""
schema = value_serializer_kwargs["schema"]
subject = value_serializer_kwargs["subject"]
schema = serializer_kwargs["schema"]
subject = serializer_kwargs["subject"]
event = await self.encode_record_with_schema(subject, schema, payload)

return event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@

stream_engine = create_engine(
title="my-stream-engine",
value_serializer=serializers.AvroSerializer(client),
value_deserializer=serializers.AvroDeserializer(client),
serializer=serializers.AvroSerializer(client),
deserializer=serializers.AvroDeserializer(client),
)
6 changes: 3 additions & 3 deletions examples/dataclasses-avroschema-example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ Then, we inject the `serializers` in the `engine` and `streams` and we are ready
# app.py
stream_engine = create_engine(
title="my-stream-engine",
value_serializer=serializers.AvroSerializer(),
serializer=serializers.AvroSerializer(),
)


@stream_engine.stream(
user_topic, value_deserializer=serializers.AvroDeserializer(model=User)
user_topic, deserializer=serializers.AvroDeserializer(model=User)
)
async def user_stream(stream: Stream):
async for cr in stream:
print(f"Event consumed on topic {user_topic}. The user is {cr.value}")


@stream_engine.stream(
address_topic, value_deserializer=serializers.AvroDeserializer(model=Address)
address_topic, deserializer=serializers.AvroDeserializer(model=Address)
)
async def address_stream(stream: Stream):
async for cr in stream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,18 @@

stream_engine = create_engine(
title="my-stream-engine",
value_serializer=serializers.AvroSerializer(),
serializer=serializers.AvroSerializer(),
)


@stream_engine.stream(
user_topic, value_deserializer=serializers.AvroDeserializer(model=User)
)
@stream_engine.stream(user_topic, deserializer=serializers.AvroDeserializer(model=User))
async def user_stream(stream: Stream):
async for cr in stream:
print(f"Event consumed on topic {user_topic}. The user is {cr.value}")


@stream_engine.stream(
address_topic, value_deserializer=serializers.AvroDeserializer(model=Address)
address_topic, deserializer=serializers.AvroDeserializer(model=Address)
)
async def address_stream(stream: Stream):
async for cr in stream:
Expand Down
6 changes: 3 additions & 3 deletions examples/json_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def serialize(
self,
payload: Any,
headers: Optional[Headers] = None,
value_serializer_kwargs: Optional[Dict] = None,
serializer_kwargs: Optional[Dict] = None,
) -> bytes:
"""
Serialize a payload to json
Expand All @@ -33,8 +33,8 @@ async def deserialize(

stream_engine = create_engine(
title="my-stream-engine",
value_serializer=JsonSerializer(),
value_deserializer=JsonDeserializer(),
serializer=JsonSerializer(),
deserializer=JsonDeserializer(),
)

data = {"message": "Hello world!"}
Expand Down
10 changes: 5 additions & 5 deletions kstreams/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
from .clients import Consumer, ConsumerType, Producer, ProducerType
from .engine import StreamEngine
from .prometheus.monitor import PrometheusMonitor
from .serializers import ValueDeserializer, ValueSerializer
from .serializers import Deserializer, Serializer


def create_engine(
title: Optional[str] = None,
backend: Optional[Kafka] = None,
consumer_class: Type[ConsumerType] = Consumer,
producer_class: Type[ProducerType] = Producer,
value_serializer: Optional[ValueSerializer] = None,
value_deserializer: Optional[ValueDeserializer] = None,
serializer: Optional[Serializer] = None,
deserializer: Optional[Deserializer] = None,
monitor: Optional[PrometheusMonitor] = None,
) -> StreamEngine:

Expand All @@ -28,7 +28,7 @@ def create_engine(
title=title,
consumer_class=consumer_class,
producer_class=producer_class,
value_serializer=value_serializer,
value_deserializer=value_deserializer,
serializer=serializer,
deserializer=deserializer,
monitor=monitor,
)
32 changes: 16 additions & 16 deletions kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from .exceptions import DuplicateStreamException, EngineNotStartedException
from .prometheus.monitor import PrometheusMonitor
from .prometheus.tasks import metrics_task
from .serializers import ValueDeserializer, ValueSerializer
from .serializers import Deserializer, Serializer
from .singlenton import Singleton
from .streams import Stream
from .types import Headers
Expand All @@ -28,15 +28,15 @@ def __init__(
producer_class: Type[ProducerType],
monitor: PrometheusMonitor,
title: Optional[str] = None,
value_deserializer: Optional[ValueDeserializer] = None,
value_serializer: Optional[ValueSerializer] = None,
deserializer: Optional[Deserializer] = None,
serializer: Optional[Serializer] = None,
) -> None:
self.title = title
self.backend = backend
self.consumer_class = consumer_class
self.producer_class = producer_class
self.value_deserializer = value_deserializer
self.value_serializer = value_serializer
self.deserializer = deserializer
self.serializer = serializer
self.monitor = monitor
self._producer: Optional[Type[ProducerType]] = None
self._streams: List[Stream] = []
Expand All @@ -50,18 +50,18 @@ async def send(
partition: Optional[str] = None,
timestamp_ms: Optional[int] = None,
headers: Optional[Headers] = None,
value_serializer: Optional[ValueSerializer] = None,
value_serializer_kwargs: Optional[Dict] = None,
serializer: Optional[Serializer] = None,
serializer_kwargs: Optional[Dict] = None,
):
if self._producer is None:
raise EngineNotStartedException()

value_serializer = value_serializer or self.value_serializer
serializer = serializer or self.serializer

# serialize only when value and value_serializer are present
if value is not None and value_serializer is not None:
value = await value_serializer.serialize(
value, headers=headers, value_serializer_kwargs=value_serializer_kwargs
# serialize only when value and serializer are present
if value is not None and serializer is not None:
value = await serializer.serialize(
value, headers=headers, serializer_kwargs=serializer_kwargs
)

encoded_headers = None
Expand Down Expand Up @@ -159,7 +159,7 @@ def _create_stream(
*,
func: Callable[[Stream], None],
name: Optional[str] = None,
value_deserializer: Optional[ValueDeserializer] = None,
deserializer: Optional[Deserializer] = None,
**kwargs,
) -> Stream:
"""
Expand All @@ -175,7 +175,7 @@ def _create_stream(
name=name,
config=kwargs,
consumer_class=self.consumer_class,
value_deserializer=value_deserializer or self.value_deserializer,
deserializer=deserializer or self.deserializer,
backend=self.backend,
)
self._streams.append(stream)
Expand All @@ -186,15 +186,15 @@ def stream(
topics: Union[List[str], str],
*,
name: Optional[str] = None,
value_deserializer: Optional[ValueDeserializer] = None,
deserializer: Optional[Deserializer] = None,
**kwargs,
) -> Callable[[Callable[[Stream], None]], Stream]:
def decorator(func: Callable[[Stream], None]) -> Stream:
stream = self._create_stream(
topics,
func=func,
name=name,
value_deserializer=value_deserializer,
deserializer=deserializer,
**kwargs,
)
return stream
Expand Down
8 changes: 4 additions & 4 deletions kstreams/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
from .types import Headers


class ValueDeserializer(Protocol):
class Deserializer(Protocol):
async def deserialize(
self, consumer_record: aiokafka.structs.ConsumerRecord, **kwargs
) -> Any:
"""
This method is used to deserialize the data in a KPN way.
End users can provide their own class overriding this method.
class CustomValueDeserializer(ValueDeserializer):
class CustomDeserializer(Deserializer):
async deserialize(self, consumer_record: aiokafka.structs.ConsumerRecord):
# custom logic and return something like a ConsumerRecord
Expand All @@ -22,12 +22,12 @@ class CustomValueDeserializer(ValueDeserializer):
...


class ValueSerializer(Protocol):
class Serializer(Protocol):
async def serialize(
self,
payload: Any,
headers: Optional[Headers] = None,
value_serializer_kwargs: Optional[Dict] = None,
serializer_kwargs: Optional[Dict] = None,
) -> bytes:
"""
Serialize the payload to bytes
Expand Down
Loading

0 comments on commit 1d30780

Please sign in to comment.