diff --git a/docs/serialization.md b/docs/serialization.md index 2359ddb0..a8b8951d 100644 --- a/docs/serialization.md +++ b/docs/serialization.md @@ -6,7 +6,7 @@ from typing import Any, Dict, Optional, Protocol import aiokafka -class ValueDeserializer(Protocol): +class Deserializer(Protocol): async def deserialize( self, consumer_record: aiokafka.structs.ConsumerRecord, **kwargs ) -> Any: @@ -15,7 +15,7 @@ class ValueDeserializer(Protocol): End users can provide their own class overriding this method. If the engine was created with a schema_store_client, it will be available. - class CustomValueDeserializer(ValueDeserializer): + class CustomDeserializer(Deserializer): async deserialize(self, consumer_record: aiokafka.structs.ConsumerRecord): # custom logic and return something like a ConsumerRecord @@ -24,12 +24,12 @@ class ValueDeserializer(Protocol): ... -class ValueSerializer(Protocol): +class Serializer(Protocol): async def serialize( self, payload: Any, headers: Optional[Dict[str, str]] = None, - value_serializer_kwargs: Optional[Dict] = None, + serializer_kwargs: Optional[Dict] = None, ) -> bytes: """ Serialize the payload to bytes @@ -46,14 +46,14 @@ You can write custom `Serializers` and `Deserializers`. There are 2 ways of usin ```python stream_engine = create_engine( title="my-stream-engine", - value_serializer=MyValueSerializer(), - value_deserializer=MyValueDeserializer(), + serializer=MySerializer(), + deserializer=MyDeserializer(), ) ``` ```python -@stream_engine.stream(topic, value_deserializer=MyDeserializer()) +@stream_engine.stream(topic, deserializer=MyDeserializer()) async def hello_stream(stream: Stream): async for event in stream: save_to_db(event) @@ -65,6 +65,6 @@ await stream_engine.send( value={"message": "test"} headers={"content-type": consts.APPLICATION_JSON,} key="1", - value_serializer=MySerializer(), + serializer=MySerializer(), ) ``` diff --git a/docs/stream.md b/docs/stream.md index a7cb8d6f..610df4fc 100644 --- a/docs/stream.md +++ b/docs/stream.md @@ -37,7 +37,7 @@ from kstreams import create_engine, Stream stream_engine = create_engine(title="my-stream-engine") -class MyValueDeserializer: +class MyDeserializer: async def deserialize(self, consumer_record: structs.ConsumerRecord, **kwargs): return consumer_record.value.decode() @@ -52,7 +52,7 @@ stream = Stream( "local--kstreams", name="my-stream" func=stream, # coroutine or async generator - value_deserializer=MyValueDeserializer(), + deserializer=MyDeserializer(), ) # add the stream to the engine stream_engine.add_stream(stream) diff --git a/examples/confluent-example/README.md b/examples/confluent-example/README.md index 3a3e9810..2ddfc81b 100644 --- a/examples/confluent-example/README.md +++ b/examples/confluent-example/README.md @@ -52,7 +52,7 @@ COUNTRY_AVRO_SCHEMA = { We need to define custom `serializers` so the `engine` and `streams` will handle the payload properly. In this case we use `AsyncAvroMessageSerializer` that already contains the methods to `encode/decode` events. -Because to `serialize` events we need to know the `subject` and the `schema`, we make use of `value_serializer_kwargs` +Because to `serialize` events we need to know the `subject` and the `schema`, we make use of `serializer_kwargs` to provide them. ```python @@ -62,12 +62,12 @@ from schema_registry.serializers import AsyncAvroMessageSerializer class AvroSerializer(AsyncAvroMessageSerializer): - async def serialize(self, payload: Dict, value_serializer_kwargs: Dict[str, str], **kwargs) -> bytes: + async def serialize(self, payload: Dict, serializer_kwargs: Dict[str, str], **kwargs) -> bytes: """ Serialize a payload to avro-binary using the schema and the subject """ - schema = value_serializer_kwargs["schema"] # GET THE SCHEMA - subject = value_serializer_kwargs["subject"] # GET THE SUBJECT + schema = serializer_kwargs["schema"] # GET THE SCHEMA + subject = serializer_kwargs["subject"] # GET THE SUBJECT event = await self.encode_record_with_schema(subject, schema, payload) return event @@ -99,8 +99,8 @@ client = AsyncSchemaRegistryClient("http://localhost:8081") stream_engine = create_engine( title="my-stream-engine", - value_serializer=serializers.AvroSerializer(client), - value_deserializer=serializers.AvroDeserializer(client) + serializer=serializers.AvroSerializer(client), + deserializer=serializers.AvroDeserializer(client) ) ``` @@ -120,7 +120,7 @@ await stream_engine.send( "replicas": 1, "port": 8080, }, - value_serializer_kwargs={ # Using the value_serializer_kwargs + serializer_kwargs={ # Using the serializer_kwargs "subject": "deployment", "schema": deployment_schema, }, @@ -132,7 +132,7 @@ await stream_engine.send( value={ "country": "Netherlands", }, - value_serializer_kwargs={ # Using the value_serializer_kwargs + serializer_kwargs={ # Using the serializer_kwargs "subject": "country", "schema": country_schema, }, diff --git a/examples/confluent-example/confluent_example/app.py b/examples/confluent-example/confluent_example/app.py index 10ef891d..698242e2 100644 --- a/examples/confluent-example/confluent_example/app.py +++ b/examples/confluent-example/confluent_example/app.py @@ -16,7 +16,7 @@ async def produce(): "replicas": 1, "port": 8080, }, - value_serializer_kwargs={ + serializer_kwargs={ "subject": "deployment", "schema": deployment_schema, }, @@ -28,7 +28,7 @@ async def produce(): value={ "country": "Netherlands", }, - value_serializer_kwargs={ + serializer_kwargs={ "subject": "country", "schema": country_schema, }, diff --git a/examples/confluent-example/confluent_example/serializers.py b/examples/confluent-example/confluent_example/serializers.py index 80a95211..46348919 100644 --- a/examples/confluent-example/confluent_example/serializers.py +++ b/examples/confluent-example/confluent_example/serializers.py @@ -6,13 +6,13 @@ class AvroSerializer(AsyncAvroMessageSerializer): async def serialize( - self, payload: Dict, value_serializer_kwargs: Dict[str, str], **kwargs + self, payload: Dict, serializer_kwargs: Dict[str, str], **kwargs ) -> bytes: """ Serialize a payload to avro-binary using the schema and the subject """ - schema = value_serializer_kwargs["schema"] - subject = value_serializer_kwargs["subject"] + schema = serializer_kwargs["schema"] + subject = serializer_kwargs["subject"] event = await self.encode_record_with_schema(subject, schema, payload) return event diff --git a/examples/confluent-example/confluent_example/streaming/engine.py b/examples/confluent-example/confluent_example/streaming/engine.py index a3f08134..e782a731 100644 --- a/examples/confluent-example/confluent_example/streaming/engine.py +++ b/examples/confluent-example/confluent_example/streaming/engine.py @@ -7,6 +7,6 @@ stream_engine = create_engine( title="my-stream-engine", - value_serializer=serializers.AvroSerializer(client), - value_deserializer=serializers.AvroDeserializer(client), + serializer=serializers.AvroSerializer(client), + deserializer=serializers.AvroDeserializer(client), ) diff --git a/examples/dataclasses-avroschema-example/README.md b/examples/dataclasses-avroschema-example/README.md index 3d851af8..bf19c567 100644 --- a/examples/dataclasses-avroschema-example/README.md +++ b/examples/dataclasses-avroschema-example/README.md @@ -84,12 +84,12 @@ Then, we inject the `serializers` in the `engine` and `streams` and we are ready # app.py stream_engine = create_engine( title="my-stream-engine", - value_serializer=serializers.AvroSerializer(), + serializer=serializers.AvroSerializer(), ) @stream_engine.stream( - user_topic, value_deserializer=serializers.AvroDeserializer(model=User) + user_topic, deserializer=serializers.AvroDeserializer(model=User) ) async def user_stream(stream: Stream): async for cr in stream: @@ -97,7 +97,7 @@ async def user_stream(stream: Stream): @stream_engine.stream( - address_topic, value_deserializer=serializers.AvroDeserializer(model=Address) + address_topic, deserializer=serializers.AvroDeserializer(model=Address) ) async def address_stream(stream: Stream): async for cr in stream: diff --git a/examples/dataclasses-avroschema-example/dataclasses_avroschema_example/app.py b/examples/dataclasses-avroschema-example/dataclasses_avroschema_example/app.py index f8d1877f..0f1135b3 100755 --- a/examples/dataclasses-avroschema-example/dataclasses_avroschema_example/app.py +++ b/examples/dataclasses-avroschema-example/dataclasses_avroschema_example/app.py @@ -10,20 +10,18 @@ stream_engine = create_engine( title="my-stream-engine", - value_serializer=serializers.AvroSerializer(), + serializer=serializers.AvroSerializer(), ) -@stream_engine.stream( - user_topic, value_deserializer=serializers.AvroDeserializer(model=User) -) +@stream_engine.stream(user_topic, deserializer=serializers.AvroDeserializer(model=User)) async def user_stream(stream: Stream): async for cr in stream: print(f"Event consumed on topic {user_topic}. The user is {cr.value}") @stream_engine.stream( - address_topic, value_deserializer=serializers.AvroDeserializer(model=Address) + address_topic, deserializer=serializers.AvroDeserializer(model=Address) ) async def address_stream(stream: Stream): async for cr in stream: diff --git a/examples/json_serialization.py b/examples/json_serialization.py index 1b57dedc..0a60462a 100644 --- a/examples/json_serialization.py +++ b/examples/json_serialization.py @@ -13,7 +13,7 @@ async def serialize( self, payload: Any, headers: Optional[Headers] = None, - value_serializer_kwargs: Optional[Dict] = None, + serializer_kwargs: Optional[Dict] = None, ) -> bytes: """ Serialize a payload to json @@ -33,8 +33,8 @@ async def deserialize( stream_engine = create_engine( title="my-stream-engine", - value_serializer=JsonSerializer(), - value_deserializer=JsonDeserializer(), + serializer=JsonSerializer(), + deserializer=JsonDeserializer(), ) data = {"message": "Hello world!"} diff --git a/kstreams/create.py b/kstreams/create.py index 60584515..f88eeaaf 100644 --- a/kstreams/create.py +++ b/kstreams/create.py @@ -4,7 +4,7 @@ from .clients import Consumer, ConsumerType, Producer, ProducerType from .engine import StreamEngine from .prometheus.monitor import PrometheusMonitor -from .serializers import ValueDeserializer, ValueSerializer +from .serializers import Deserializer, Serializer def create_engine( @@ -12,8 +12,8 @@ def create_engine( backend: Optional[Kafka] = None, consumer_class: Type[ConsumerType] = Consumer, producer_class: Type[ProducerType] = Producer, - value_serializer: Optional[ValueSerializer] = None, - value_deserializer: Optional[ValueDeserializer] = None, + serializer: Optional[Serializer] = None, + deserializer: Optional[Deserializer] = None, monitor: Optional[PrometheusMonitor] = None, ) -> StreamEngine: @@ -28,7 +28,7 @@ def create_engine( title=title, consumer_class=consumer_class, producer_class=producer_class, - value_serializer=value_serializer, - value_deserializer=value_deserializer, + serializer=serializer, + deserializer=deserializer, monitor=monitor, ) diff --git a/kstreams/engine.py b/kstreams/engine.py index 872b4178..0b59ee26 100644 --- a/kstreams/engine.py +++ b/kstreams/engine.py @@ -10,7 +10,7 @@ from .exceptions import DuplicateStreamException, EngineNotStartedException from .prometheus.monitor import PrometheusMonitor from .prometheus.tasks import metrics_task -from .serializers import ValueDeserializer, ValueSerializer +from .serializers import Deserializer, Serializer from .singlenton import Singleton from .streams import Stream from .types import Headers @@ -28,15 +28,15 @@ def __init__( producer_class: Type[ProducerType], monitor: PrometheusMonitor, title: Optional[str] = None, - value_deserializer: Optional[ValueDeserializer] = None, - value_serializer: Optional[ValueSerializer] = None, + deserializer: Optional[Deserializer] = None, + serializer: Optional[Serializer] = None, ) -> None: self.title = title self.backend = backend self.consumer_class = consumer_class self.producer_class = producer_class - self.value_deserializer = value_deserializer - self.value_serializer = value_serializer + self.deserializer = deserializer + self.serializer = serializer self.monitor = monitor self._producer: Optional[Type[ProducerType]] = None self._streams: List[Stream] = [] @@ -50,18 +50,18 @@ async def send( partition: Optional[str] = None, timestamp_ms: Optional[int] = None, headers: Optional[Headers] = None, - value_serializer: Optional[ValueSerializer] = None, - value_serializer_kwargs: Optional[Dict] = None, + serializer: Optional[Serializer] = None, + serializer_kwargs: Optional[Dict] = None, ): if self._producer is None: raise EngineNotStartedException() - value_serializer = value_serializer or self.value_serializer + serializer = serializer or self.serializer - # serialize only when value and value_serializer are present - if value is not None and value_serializer is not None: - value = await value_serializer.serialize( - value, headers=headers, value_serializer_kwargs=value_serializer_kwargs + # serialize only when value and serializer are present + if value is not None and serializer is not None: + value = await serializer.serialize( + value, headers=headers, serializer_kwargs=serializer_kwargs ) encoded_headers = None @@ -159,7 +159,7 @@ def _create_stream( *, func: Callable[[Stream], None], name: Optional[str] = None, - value_deserializer: Optional[ValueDeserializer] = None, + deserializer: Optional[Deserializer] = None, **kwargs, ) -> Stream: """ @@ -175,7 +175,7 @@ def _create_stream( name=name, config=kwargs, consumer_class=self.consumer_class, - value_deserializer=value_deserializer or self.value_deserializer, + deserializer=deserializer or self.deserializer, backend=self.backend, ) self._streams.append(stream) @@ -186,7 +186,7 @@ def stream( topics: Union[List[str], str], *, name: Optional[str] = None, - value_deserializer: Optional[ValueDeserializer] = None, + deserializer: Optional[Deserializer] = None, **kwargs, ) -> Callable[[Callable[[Stream], None]], Stream]: def decorator(func: Callable[[Stream], None]) -> Stream: @@ -194,7 +194,7 @@ def decorator(func: Callable[[Stream], None]) -> Stream: topics, func=func, name=name, - value_deserializer=value_deserializer, + deserializer=deserializer, **kwargs, ) return stream diff --git a/kstreams/serializers.py b/kstreams/serializers.py index d9838ac9..5e055121 100644 --- a/kstreams/serializers.py +++ b/kstreams/serializers.py @@ -5,7 +5,7 @@ from .types import Headers -class ValueDeserializer(Protocol): +class Deserializer(Protocol): async def deserialize( self, consumer_record: aiokafka.structs.ConsumerRecord, **kwargs ) -> Any: @@ -13,7 +13,7 @@ async def deserialize( This method is used to deserialize the data in a KPN way. End users can provide their own class overriding this method. - class CustomValueDeserializer(ValueDeserializer): + class CustomDeserializer(Deserializer): async deserialize(self, consumer_record: aiokafka.structs.ConsumerRecord): # custom logic and return something like a ConsumerRecord @@ -22,12 +22,12 @@ class CustomValueDeserializer(ValueDeserializer): ... -class ValueSerializer(Protocol): +class Serializer(Protocol): async def serialize( self, payload: Any, headers: Optional[Headers] = None, - value_serializer_kwargs: Optional[Dict] = None, + serializer_kwargs: Optional[Dict] = None, ) -> bytes: """ Serialize the payload to bytes diff --git a/kstreams/streams.py b/kstreams/streams.py index c1c8c6d9..171760e9 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -8,7 +8,7 @@ from .backends.kafka import Kafka from .clients import Consumer, ConsumerType -from .serializers import ValueDeserializer +from .serializers import Deserializer logger = logging.getLogger(__name__) @@ -24,7 +24,7 @@ def __init__( name: Optional[str] = None, config: Optional[Dict] = None, model: Optional[Any] = None, - value_deserializer: Optional[ValueDeserializer] = None, + deserializer: Optional[Deserializer] = None, ) -> None: self.func = func self.backend = backend @@ -34,7 +34,7 @@ def __init__( self._consumer_task: Optional[asyncio.Task] = None self.name = name or str(uuid.uuid4()) self.model = model - self.value_deserializer = value_deserializer + self.deserializer = deserializer self.running = False # aiokafka expects topic names as arguments, meaning that @@ -122,12 +122,9 @@ async def __anext__(self) -> structs.ConsumerRecord: await self.consumer.getone() # type: ignore ) - # deserialize only when value and value_deserializer are present - if ( - consumer_record.value is not None - and self.value_deserializer is not None - ): - return await self.value_deserializer.deserialize(consumer_record) + # deserialize only when value and deserializer are present + if consumer_record.value is not None and self.deserializer is not None: + return await self.deserializer.deserialize(consumer_record) return consumer_record except errors.ConsumerStoppedError: diff --git a/kstreams/test_utils/test_clients.py b/kstreams/test_utils/test_clients.py index 907bda7f..a7312f9a 100644 --- a/kstreams/test_utils/test_clients.py +++ b/kstreams/test_utils/test_clients.py @@ -4,7 +4,7 @@ from aiokafka.structs import ConsumerRecord from kstreams.clients import Consumer, Producer -from kstreams.serializers import ValueSerializer +from kstreams.serializers import Serializer from kstreams.types import Headers from .structs import RecordMetadata, TopicPartition @@ -25,8 +25,8 @@ async def send( partition: int = 1, timestamp_ms: Optional[float] = None, headers: Optional[Headers] = None, - value_serializer: Optional[ValueSerializer] = None, - value_serializer_kwargs: Optional[Dict] = None, + serializer: Optional[Serializer] = None, + serializer_kwargs: Optional[Dict] = None, ) -> Coroutine: topic = TopicManager.get_or_create(topic_name) timestamp_ms = timestamp_ms or datetime.now().timestamp() diff --git a/kstreams/test_utils/test_utils.py b/kstreams/test_utils/test_utils.py index 5b5ef883..63f1b10a 100644 --- a/kstreams/test_utils/test_utils.py +++ b/kstreams/test_utils/test_utils.py @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional, Type from kstreams.create import create_engine -from kstreams.serializers import ValueSerializer +from kstreams.serializers import Serializer from kstreams.streams import Stream from kstreams.types import Headers @@ -67,8 +67,8 @@ async def send( partition: Optional[str] = None, timestamp_ms: Optional[int] = None, headers: Optional[Headers] = None, - value_serializer: Optional[ValueSerializer] = None, - value_serializer_kwargs: Optional[Dict] = None, + serializer: Optional[Serializer] = None, + serializer_kwargs: Optional[Dict] = None, ) -> RecordMetadata: return await self.stream_engine.send( topic, @@ -77,6 +77,6 @@ async def send( partition=partition, timestamp_ms=timestamp_ms, headers=headers, - value_serializer=value_serializer, - value_serializer_kwargs=value_serializer_kwargs, + serializer=serializer, + serializer_kwargs=serializer_kwargs, ) diff --git a/tests/test_serialization.py b/tests/test_serialization.py index eb9aa05a..5320ea0c 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -17,7 +17,7 @@ async def serialize( self, payload: Any, headers: Optional[Headers] = None, - value_serializer_kwargs: Optional[Dict] = None, + serializer_kwargs: Optional[Dict] = None, ) -> bytes: """ Serialize paylod to json @@ -50,14 +50,14 @@ async def async_func(): with mock.patch.multiple(Producer, start=mock.DEFAULT, send=send): await stream_engine.start() - value_serializer = MySerializer() - serialized_data = await value_serializer.serialize(value) + serializer = MySerializer() + serialized_data = await serializer.serialize(value) metadata = await stream_engine.send( topic, value=value, headers=headers, - value_serializer=value_serializer, + serializer=serializer, ) assert metadata @@ -83,7 +83,7 @@ async def test_custom_deserialization( save_to_db = mock.Mock() - @stream_engine.stream(topic, value_deserializer=MyDeserializer()) + @stream_engine.stream(topic, deserializer=MyDeserializer()) async def hello_stream(stream: Stream): async for event in stream: save_to_db(event) @@ -95,7 +95,7 @@ async def hello_stream(stream: Stream): value=payload, headers=headers, key="1", - value_serializer=MySerializer(), + serializer=MySerializer(), ) # The payload as been encoded with json, diff --git a/tests/test_stream_engine.py b/tests/test_stream_engine.py index cce9d267..d02f612b 100644 --- a/tests/test_stream_engine.py +++ b/tests/test_stream_engine.py @@ -38,10 +38,10 @@ async def stream(_): async def test_add_stream_as_instance(stream_engine: StreamEngine): topics = ["local--hello-kpn", "local--hello-kpn-2"] - class MyValueDeserializer: + class MyDeserializer: ... - value_deserializer = MyValueDeserializer() + deserializer = MyDeserializer() async def processor(stream: Stream): pass @@ -51,7 +51,7 @@ async def processor(stream: Stream): topics, name="my-stream", func=processor, - value_deserializer=value_deserializer, + deserializer=deserializer, backend=backend, ) @@ -61,7 +61,7 @@ async def processor(stream: Stream): stream_instance = stream_engine.get_stream("my-stream") assert stream_instance == my_stream assert stream_instance.topics == topics - assert stream_instance.value_deserializer == value_deserializer + assert stream_instance.deserializer == deserializer # can not add a stream with the same name with pytest.raises(DuplicateStreamException):