diff --git a/kstreams/test_utils/test_clients.py b/kstreams/test_utils/test_clients.py index ffe35ee6..d4d52cda 100644 --- a/kstreams/test_utils/test_clients.py +++ b/kstreams/test_utils/test_clients.py @@ -7,7 +7,7 @@ from kstreams.types import Headers from .structs import RecordMetadata -from .topics import Topic, TopicManager +from .topics import TopicManager class Base: @@ -69,7 +69,6 @@ def __init__(self, group_id: Optional[str] = None, **kwargs) -> None: self.topics: Optional[Tuple[str]] = None self._group_id: Optional[str] = group_id self._assignment: List[TopicPartition] = [] - self._previous_topic: Optional[Topic] = None self.partitions_committed: Dict[TopicPartition, int] = {} # Called to make sure that has all the kafka attributes like _coordinator @@ -177,11 +176,6 @@ def partitions_for_topic(self, topic: str) -> Set: async def getone( self, ) -> Optional[ConsumerRecord]: # The return type must be fixed later on - if self._previous_topic: - # Assumes previous record retrieved through getone was completed - self._previous_topic.task_done() - self._previous_topic = None - topic = None for topic_partition in self._assignment: topic = TopicManager.get(topic_partition.topic) @@ -192,7 +186,6 @@ async def getone( if topic is not None: consumer_record = await topic.get() self._check_partition_assignments(consumer_record) - self._previous_topic = topic return consumer_record return None diff --git a/kstreams/test_utils/topics.py b/kstreams/test_utils/topics.py index 09c6b81c..55b6306b 100644 --- a/kstreams/test_utils/topics.py +++ b/kstreams/test_utils/topics.py @@ -27,7 +27,9 @@ async def put(self, event: ConsumerRecord) -> None: self.total_events += 1 async def get(self) -> ConsumerRecord: - return await self.queue.get() + cr = await self.queue.get() + self.task_done() + return cr def get_nowait(self) -> ConsumerRecord: return self.queue.get_nowait()