Skip to content

Commit

Permalink
fix: deserializer deprecation warning added. Examples with deserializ…
Browse files Browse the repository at this point in the history
…ation updated to middlewares
  • Loading branch information
marcosschroh committed Feb 12, 2024
1 parent 2a0732c commit a072563
Show file tree
Hide file tree
Showing 13 changed files with 338 additions and 307 deletions.
16 changes: 16 additions & 0 deletions docs/middleware.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,19 @@ async def processor(cr: ConsumerRecord):

!!! note
In the example we can see that only if there is not an `error` the event is saved to `elastic`

## Deserialization

To `deserialize` bytes into a different structure, such as a `dict`, middlewares are the preferred way to do it. Examples:

::: examples.dataclasses-avroschema-example.dataclasses_avroschema_example.middlewares.AvroDeserializerMiddleware
options:
show_bases: false
members:
-

::: examples.confluent-example.confluent_example.middlewares.ConfluentMiddlewareDeserializer
options:
show_bases: false
members:
-
33 changes: 26 additions & 7 deletions docs/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,41 @@ In order to keep your code pythonic, we provide a mechanism to serialize/deseria
these bytes, into something more useful.
This way, you can work with other data structures, like a `dict` or `dataclasses`.

Sometimes it is easier to work with a `dict` in your app, give it to `kstreams`, and let it transform it into `bytes` to be delivered to Kafka. For this situations, you need to implement `kstreams.serializers.Serializer`.

The other situation is when you consume from Kafka (or other brokers). Instead of dealing with `bytes`,
you may want to receive in your function the `dict` ready to be used. For those cases, implement `kstreams.serializers.Deserializer`
Sometimes it is easier to work with a `dict` in your app, give it to `kstreams`, and let it transform it into `bytes` to be delivered to Kafka. For this situation, you need to implement `kstreams.serializers.Serializer`.

::: kstreams.serializers.Serializer
options:
show_root_heading: true
docstring_section_style: table
show_bases: false


The other situation is when you consume from Kafka (or other brokers). Instead of dealing with `bytes`,
you may want to receive in your function the `dict` ready to be used. For those cases, we need to use [middleware](https://kpn.github.io/kstreams/middleware/). For example, we can implement a `JsonDeserializerMiddleware`:

```python
import json

from kstreams import middleware, ConsumerRecord


class JsonDeserializerMiddleware(middleware.BaseMiddleware):
    """Middleware that replaces a JSON-encoded record value with the parsed data."""

    async def __call__(self, cr: ConsumerRecord):
        # Records without a value are forwarded untouched.
        if cr.value is not None:
            # Decode the raw value to text, then parse it as JSON.
            data = json.loads(cr.value.decode())
            cr.value = data
        # Always hand the record over to the next step in the chain.
        return await self.next_call(cr)
```

It is also possible to use `kstreams.serializers.Deserializer` for deserialization, but this will be deprecated.

::: kstreams.serializers.Deserializer
options:
show_root_heading: true
docstring_section_style: table
show_bases: false

!!! warning
`kstreams.serializers.Deserializer` will be deprecated, use [middlewares](https://kpn.github.io/kstreams/middleware/) instead

## Usage

Once you have written your serializer or deserializer, there are 2 ways of using them, in a
Expand All @@ -43,14 +61,16 @@ By doing this all the streams will use these serializers by default.
stream_engine = create_engine(
title="my-stream-engine",
serializer=JsonSerializer(),
deserializer=JsonDeserializer(),
)
```

### Initialize `streams` with a `deserializer` and produce events with `serializers`

```python
@stream_engine.stream(topic, deserializer=JsonDeserializer())
from kstreams import middleware, ConsumerRecord


@stream_engine.stream(topic, middlewares=[middleware.Middleware(JsonDeserializerMiddleware)])
async def hello_stream(cr: ConsumerRecord):
# remember cr.value is now a dict
print(cr.value["message"])
Expand All @@ -63,6 +83,5 @@ await stream_engine.send(
value={"message": "test"}
headers={"content-type": consts.APPLICATION_JSON,}
key="1",
serializer=JsonSerializer(),
)
```
33 changes: 33 additions & 0 deletions examples/confluent-example/confluent_example/middlewares.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Dict, Optional

from schema_registry.client import AsyncSchemaRegistryClient, schema
from schema_registry.serializers import AsyncAvroMessageSerializer

from kstreams import ConsumerRecord, middleware


class ConfluentMiddlewareDeserializer(
    middleware.BaseMiddleware, AsyncAvroMessageSerializer
):
    """
    Middleware that runs every record value through `decode_message`
    before passing the record on to the next step in the chain.
    """

    def __init__(
        self,
        *,
        schema_registry_client: AsyncSchemaRegistryClient,
        reader_schema: Optional[schema.AvroSchema] = None,
        return_record_name: bool = False,
        **kwargs,
    ):
        # Any remaining keyword arguments are forwarded up the MRO.
        super().__init__(**kwargs)

        # Decoder state is assigned by hand on the instance.
        # NOTE(review): presumably mirrors what AsyncAvroMessageSerializer's
        # own initializer sets up -- confirm against that class.
        self.id_to_decoder_func: Dict = {}
        self.id_to_writers: Dict = {}
        self.schemaregistry_client = schema_registry_client
        self.reader_schema = reader_schema
        self.return_record_name = return_record_name

    async def __call__(self, cr: ConsumerRecord):
        """
        Replace the record value with its decoded form and continue the chain
        """
        cr.value = await self.decode_message(cr.value)
        return await self.next_call(cr)
14 changes: 0 additions & 14 deletions examples/confluent-example/confluent_example/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

from schema_registry.serializers import AsyncAvroMessageSerializer

from kstreams import ConsumerRecord


class AvroSerializer(AsyncAvroMessageSerializer):
async def serialize(
Expand All @@ -17,15 +15,3 @@ async def serialize(
event = await self.encode_record_with_schema(subject, schema, payload)

return event


class AvroDeserializer(AsyncAvroMessageSerializer):
    """Deserializer that decodes record values with `decode_message`."""

    async def deserialize(
        self, consumer_record: ConsumerRecord, **kwargs
    ) -> ConsumerRecord:
        """
        Decode the record value in place and return the same record.

        Extra keyword arguments are accepted but not used.
        """
        consumer_record.value = await self.decode_message(consumer_record.value)
        return consumer_record
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from schema_registry.client import AsyncSchemaRegistryClient

from confluent_example import serializers
from confluent_example.serializers import AvroSerializer
from kstreams import create_engine

client = AsyncSchemaRegistryClient("http://localhost:8081")
schema_registry_client = AsyncSchemaRegistryClient("http://localhost:8081")

stream_engine = create_engine(
title="my-stream-engine",
serializer=serializers.AvroSerializer(client),
deserializer=serializers.AvroDeserializer(client),
serializer=AvroSerializer(schema_registry_client),
)
15 changes: 11 additions & 4 deletions examples/confluent-example/confluent_example/streaming/streams.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from kstreams import ConsumerRecord
from confluent_example.middlewares import ConfluentMiddlewareDeserializer
from kstreams import ConsumerRecord, middleware

from .engine import stream_engine
from .engine import schema_registry_client, stream_engine

deployment_topic = "local--deployment"
country_topic = "local--country"

middlewares = [
middleware.Middleware(
ConfluentMiddlewareDeserializer, schema_registry_client=schema_registry_client
)
]

@stream_engine.stream(deployment_topic)

@stream_engine.stream(deployment_topic, middlewares=middlewares)
async def deployment_stream(cr: ConsumerRecord):
print(f"Event consumed on topic {deployment_topic}. The user is {cr.value}")


@stream_engine.stream(country_topic)
@stream_engine.stream(country_topic, middlewares=middlewares)
async def country_stream(cr: ConsumerRecord):
print(f"Event consumed on topic {country_topic}. The Address is {cr.value}")
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,32 @@

import aiorun

from kstreams import ConsumerRecord, create_engine
from kstreams import ConsumerRecord, create_engine, middleware

from . import serializers
from .middlewares import AvroDeserializerMiddleware
from .models import Address, User
from .serializers import AvroSerializer

user_topic = "local--avro-user"
address_topic = "local--avro-address"

stream_engine = create_engine(
title="my-stream-engine",
serializer=serializers.AvroSerializer(),
serializer=AvroSerializer(),
)


@stream_engine.stream(user_topic, deserializer=serializers.AvroDeserializer(model=User))
@stream_engine.stream(
user_topic,
middlewares=[middleware.Middleware(AvroDeserializerMiddleware, model=User)],
)
async def user_stream(cr: ConsumerRecord):
print(f"Event consumed on topic {user_topic}. The user is {cr.value}")


@stream_engine.stream(
address_topic, deserializer=serializers.AvroDeserializer(model=Address)
address_topic,
middlewares=[middleware.Middleware(AvroDeserializerMiddleware, model=Address)],
)
async def address_stream(cr: ConsumerRecord):
print(f"Event consumed on topic {address_topic}. The Address is {cr.value}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dataclasses_avroschema import AvroModel

from kstreams import ConsumerRecord, middleware


class AvroDeserializerMiddleware(middleware.BaseMiddleware):
    """Middleware that decodes record values with the configured model."""

    def __init__(self, *, model: AvroModel, **kwargs) -> None:
        # Remaining keyword arguments belong to the base middleware.
        super().__init__(**kwargs)
        # The model's `deserialize` is what turns a payload into an object.
        self.model = model

    async def __call__(self, cr: ConsumerRecord):
        """
        Turn the record payload into an AvroModel, then continue the chain
        """
        payload = cr.value
        if payload is None:
            # Nothing to decode: forward the record unchanged.
            return await self.next_call(cr)

        cr.value = self.model.deserialize(payload)
        return await self.next_call(cr)
Original file line number Diff line number Diff line change
@@ -1,27 +1,9 @@
from dataclasses_avroschema import AvroModel

from kstreams import ConsumerRecord


class AvroSerializer:
    """Serializer that delegates to the instance's own `serialize` method."""

    async def serialize(self, instance: AvroModel, **kwargs) -> bytes:
        """
        Produce the avro-binary form of an AvroModel.

        Extra keyword arguments are accepted but not used.
        """
        encoded = instance.serialize()
        return encoded


class AvroDeserializer:
    """Deserializer that decodes record values with the configured model."""

    def __init__(self, *, model: AvroModel) -> None:
        # The model's `deserialize` is used to decode each payload.
        self.model = model

    async def deserialize(
        self, consumer_record: ConsumerRecord, **kwargs
    ) -> ConsumerRecord:
        """
        Turn the record payload into an AvroModel and return the same record.

        Records whose value is None are returned unchanged.
        """
        raw = consumer_record.value
        if raw is None:
            return consumer_record

        consumer_record.value = self.model.deserialize(raw)
        return consumer_record
Loading

0 comments on commit a072563

Please sign in to comment.