From 85c2d8f21678e119ae84dfac558bbbda44114c08 Mon Sep 17 00:00:00 2001 From: marcosschroh Date: Wed, 22 Nov 2023 14:35:14 +0100 Subject: [PATCH] fix: increase total events also when using sync testing --- kstreams/test_utils/topics.py | 13 +++-- tests/test_client.py | 103 +++++++++++++++++----------------- 2 files changed, 58 insertions(+), 58 deletions(-) diff --git a/kstreams/test_utils/topics.py b/kstreams/test_utils/topics.py index 09c6b81c..e6fe3da8 100644 --- a/kstreams/test_utils/topics.py +++ b/kstreams/test_utils/topics.py @@ -21,10 +21,7 @@ class Topic: async def put(self, event: ConsumerRecord) -> None: await self.queue.put(event) - - # keep track of the amount of events per topic partition - self.total_partition_events[event.partition] += 1 - self.total_events += 1 + self._inc_amount(event) async def get(self) -> ConsumerRecord: return await self.queue.get() @@ -33,7 +30,13 @@ def get_nowait(self) -> ConsumerRecord: return self.queue.get_nowait() def put_nowait(self, *, event: ConsumerRecord) -> None: - return self.queue.put_nowait(event) + self.queue.put_nowait(event) + self._inc_amount(event) + + def _inc_amount(self, event: ConsumerRecord) -> None: + # keep track of the amount of events per topic partition + self.total_partition_events[event.partition] += 1 + self.total_events += 1 def task_done(self) -> None: self.queue.task_done() diff --git a/tests/test_client.py b/tests/test_client.py index 5288dc4c..3b60a9b8 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,4 +1,4 @@ -from unittest.mock import Mock, call, patch +from unittest.mock import Mock, call import pytest @@ -316,56 +316,53 @@ async def test_streams_consume_events_with_initial_offsets(stream_engine: Stream tp2, ) - with patch("kstreams.test_utils.test_clients.TestConsumer.seek") as client_seek: - async with client: - await client.send(topic, value=event1, partition=0) - await client.send(topic, value=event1, partition=0) - await client.send(topic, value=event1, partition=0) - await client.send(topic, value=event2, partition=1) - - async def func_stream(consumer: Stream): - async for cr in consumer: - process(cr.value) - - stream: Stream = Stream( - topics=topic, - consumer_class=TestConsumer, - name="my-stream", - func=func_stream, - initial_offsets=[ - # initial topic offset is -1 - TopicPartitionOffset(topic=topic, partition=0, offset=1), - TopicPartitionOffset(topic=topic, partition=1, offset=0), - TopicPartitionOffset(topic=topic, partition=2, offset=10), - ], - ) - stream_engine.add_stream(stream) - await stream.start() - - # simulate partitions assigned on rebalance - await stream.rebalance_listener.on_partitions_assigned(assigned=assignments) - - assert stream.consumer.assignment() == [tp0, tp1, tp2] - - assert stream.consumer.last_stable_offset(tp0) == 2 - assert stream.consumer.highwater(tp0) == 3 - assert await stream.consumer.position(tp0) == 3 - - assert stream.consumer.last_stable_offset(tp1) == 0 - assert stream.consumer.highwater(tp1) == 1 - assert await stream.consumer.position(tp1) == 1 - - # the position will be 0 as the offset 10 does not exist - assert stream.consumer.last_stable_offset(tp2) == -1 - assert stream.consumer.highwater(tp2) == 0 - assert await stream.consumer.position(tp2) == 0 - - client_seek.assert_has_calls( - [ - call(partition=tp0, offset=1), - call(partition=tp1, offset=0), - call(partition=tp2, offset=10), - ], - any_order=True, - ) + async with client: + await client.send(topic, value=event1, partition=0) + await client.send(topic, value=event1, partition=0) + await client.send(topic, value=event1, partition=0) + await client.send(topic, value=event2, partition=1) + + assert TopicManager.get(name=topic).size() == 4 + + async def func_stream(consumer: Stream): + async for cr in consumer: + process(cr.value) + + stream: Stream = Stream( + topics=topic, + consumer_class=TestConsumer, + name="my-stream", + func=func_stream, + initial_offsets=[ + # initial topic offset is -1 + TopicPartitionOffset(topic=topic, partition=0, offset=1), + TopicPartitionOffset(topic=topic, partition=1, offset=0), + TopicPartitionOffset(topic=topic, partition=2, offset=10), + ], + ) + stream_engine.add_stream(stream) + await stream.start() + + # simulate partitions assigned on rebalance + await stream.rebalance_listener.on_partitions_assigned(assigned=assignments) + + assert stream.consumer.assignment() == [tp0, tp1, tp2] + + assert stream.consumer.last_stable_offset(tp0) == 2 + assert stream.consumer.highwater(tp0) == 3 + assert await stream.consumer.position(tp0) == 3 + + assert stream.consumer.last_stable_offset(tp1) == 0 + assert stream.consumer.highwater(tp1) == 1 + assert await stream.consumer.position(tp1) == 1 + + # the position will be 0 as the offset 10 does not exist + assert stream.consumer.last_stable_offset(tp2) == -1 + assert stream.consumer.highwater(tp2) == 0 + assert await stream.consumer.position(tp2) == 0 + + # We moved to offset 1 on partition 0, then there are + # 3 events in the Queue rather than 4 + assert TopicManager.get(name=topic).size() == 3 + process.assert_has_calls([call(event1), call(event1), call(event2)], any_order=True)