From 30caae5394c3811612b66cbde84a3e6013608510 Mon Sep 17 00:00:00 2001 From: marcosschroh Date: Wed, 13 Dec 2023 17:11:52 +0100 Subject: [PATCH] feat: getmany added to Stream. Closes #128 --- docs/stream.md | 12 ++++++ .../recommended_worker_app/streams.py | 2 +- kstreams/streams.py | 43 +++++++++++++++++++ kstreams/test_utils/test_clients.py | 16 +++++++ tests/test_client.py | 26 +++++++++++ tests/test_stream_engine.py | 28 ++++++++++++ 6 files changed, 126 insertions(+), 1 deletion(-) diff --git a/docs/stream.md b/docs/stream.md index b11f3eb9..6302bb0b 100644 --- a/docs/stream.md +++ b/docs/stream.md @@ -9,6 +9,8 @@ Consuming can be done using `kstreams.Stream`. You only need to decorate a `coro show_root_heading: true docstring_section_style: table show_signature_annotations: false + members: + - ## Dependency Injection and typing @@ -256,6 +258,16 @@ async with stream as stream_flow: # Use the context manager If for some reason you interrupt the "async for in" in the async generator, the Stream will stopped consuming events meaning that the lag will increase. +## Get many + +::: kstreams.streams.Stream.getmany + options: + docstring_section_style: table + show_signature_annotations: false + +!!! warning + This approach does not works with `Dependency Injection`. + ## Rebalance Listener For some cases you will need a `RebalanceListener` so when partitions are `assigned` or `revoked` to the stream different accions can be performed. diff --git a/examples/recommended-worker-app/recommended_worker_app/streams.py b/examples/recommended-worker-app/recommended_worker_app/streams.py index 164da8d7..53b7f977 100644 --- a/examples/recommended-worker-app/recommended_worker_app/streams.py +++ b/examples/recommended-worker-app/recommended_worker_app/streams.py @@ -1,4 +1,4 @@ -from kstreams import stream, ConsumerRecord +from kstreams import ConsumerRecord, stream @stream("local--hello-world", group_id="example-group") diff --git a/kstreams/streams.py b/kstreams/streams.py index 7e18513b..fe765b45 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -160,6 +160,49 @@ async def getone(self) -> ConsumerRecord: return consumer_record + async def getmany( + self, + partitions: Optional[List[TopicPartition]] = None, + timeout_ms: int = 0, + max_records: Optional[int] = None, + ) -> Dict[TopicPartition, List[ConsumerRecord]]: + """ + Get a batch of events from the assigned TopicPartition. + + Prefetched events are returned in batches by topic-partition. + If messages is not available in the prefetched buffer this method waits + `timeout_ms` milliseconds. + + Attributes: + partitions List[TopicPartition] | None: The partitions that need + fetching message. If no one partition specified then all + subscribed partitions will be used + timeout_ms int | None: milliseconds spent waiting if + data is not available in the buffer. If 0, returns immediately + with any records that are available currently in the buffer, + else returns empty. Must not be negative. + max_records int | None: The amount of records to fetch. + if `timeout_ms` was defined and reached and the fetched records + has not reach `max_records` then returns immediately + with any records that are available currently in the buffer + + Returns: + Topic to list of records + + !!! Example + ```python + @stream_engine.stream(topic, ...) + async def stream(stream: Stream): + while True: + data = await stream.getmany(max_records=5) + print(data) + ``` + """ + partitions = partitions or [] + return await self.consumer.getmany( # type: ignore + *partitions, timeout_ms=timeout_ms, max_records=max_records + ) + async def start(self) -> Optional[AsyncGenerator]: if self.running: return None diff --git a/kstreams/test_utils/test_clients.py b/kstreams/test_utils/test_clients.py index 88962efd..d989cfa7 100644 --- a/kstreams/test_utils/test_clients.py +++ b/kstreams/test_utils/test_clients.py @@ -190,6 +190,22 @@ async def getone( return None + async def getmany( + self, + *partitions: List[TopicPartition], + timeout_ms: int = 0, + max_records: int = 1, + ) -> Dict[TopicPartition, List[ConsumerRecord]]: + """ + Basic getmany implementation. + `partitions` and `timeout_ms` could be added to the logic + but it seems unnecessary for now; if end users request them we + can add it + """ + return { + self._assignment[0]: [await self.getone() for _ in range(0, max_records)] + } + def seek(self, *, partition: TopicPartition, offset: int) -> None: # This method intends to have the same signature as aiokafka but with kwargs # rather than positional arguments diff --git a/tests/test_client.py b/tests/test_client.py index 1e657f90..47f7e51b 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -93,6 +93,32 @@ async def consume(stream): save_to_db.assert_called_once_with(event) +@pytest.mark.asyncio +async def test_stream_consume_many(stream_engine: StreamEngine): + event = b'{"message": "Hello world!"}' + max_records = 2 + save_to_db = Mock() + + @stream_engine.stream(topic) + async def stream(stream: Stream): + while True: + data = await stream.getmany(max_records=max_records) + save_to_db( + [ + cr.value + for consumer_records_list in data.values() + for cr in consumer_records_list + ] + ) + + client = TestStreamClient(stream_engine) + async with client: + await client.send(topic, value=event, key="1") + await client.send(topic, value=event, key="1") + + save_to_db.assert_called_once_with([event for _ in range(0, max_records)]) + + @pytest.mark.asyncio async def test_stream_consume_events_as_generator(stream_engine: StreamEngine): topic = "local--hello-kpn" diff --git a/tests/test_stream_engine.py b/tests/test_stream_engine.py index 2a8477e8..51f9177c 100644 --- a/tests/test_stream_engine.py +++ b/tests/test_stream_engine.py @@ -345,6 +345,34 @@ async def getone(_): assert not stream.running +@pytest.mark.asyncio +async def test_stream_getmany( + stream_engine: StreamEngine, consumer_record_factory: Callable[..., ConsumerRecord] +): + topic_partition_crs = { + TopicPartition(topic="local--hello-kpn", partition=0): [ + consumer_record_factory(offset=1), + consumer_record_factory(offset=2), + consumer_record_factory(offset=3), + ] + } + + save_to_db = mock.Mock() + + @stream_engine.stream("local--hello-kpn") + async def stream(stream: Stream): + data = await stream.getmany(max_records=3) + save_to_db(data) + + async def getmany(*args, **kwargs): + return topic_partition_crs + + with mock.patch.multiple(Consumer, start=mock.DEFAULT, getmany=getmany): + await stream_engine.start_streams() + await asyncio.sleep(0.1) + save_to_db.assert_called_once_with(topic_partition_crs) + + @pytest.mark.asyncio async def test_stream_decorator(stream_engine: StreamEngine): topic = "local--hello-kpn"