diff --git a/examples/simple-example/simple_example/app.py b/examples/simple-example/simple_example/app.py index ef101e17..56c6458e 100644 --- a/examples/simple-example/simple_example/app.py +++ b/examples/simple-example/simple_example/app.py @@ -24,10 +24,16 @@ class EventStore: def add(self, event: ConsumerRecord) -> None: self.events.append(event) + def remove(self) -> None: + self.events.pop(0) + @property - def total(self): + def total(self) -> int: return len(self.events) + def clean(self) -> None: + self.events = [] + event_store = EventStore() @@ -35,7 +41,11 @@ def total(self): @stream_engine.stream(topic, group_id="example-group") async def consume(cr: ConsumerRecord): logger.info(f"Event consumed: {cr} \n") - event_store.add(cr) + + if cr.value == b"remove-event": + event_store.remove() + else: + event_store.add(cr) await stream_engine.send("local--hello-world", value=cr.value) diff --git a/kstreams/streams.py b/kstreams/streams.py index 1afa96da..256a613c 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -137,11 +137,10 @@ async def stop(self) -> None: def subscribe(self) -> None: if self.consumer is None: - # Always create a consumer on stream.start + # Only create a consumer if it was not previously created self.consumer = self._create_consumer() - self.consumer.subscribe( - topics=self.topics, listener=self.rebalance_listener - ) + + self.consumer.subscribe(topics=self.topics, listener=self.rebalance_listener) async def commit( self, offsets: typing.Optional[typing.Dict[TopicPartition, int]] = None diff --git a/tests/test_client.py b/tests/test_client.py index 7e8f13fd..49a81a73 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -405,6 +405,66 @@ async def test_e2e_example(): assert TopicManager.all_messages_consumed() +@pytest.mark.asyncio +async def test_repeat_e2e_example(): + """ + This test is to show that the same Stream can be tested multiple + times and that all the resources must start from scratch on every unittest: + 1. There must not be events in topics from previous tests + 2. All extra partitions should be removed from Topics + 3. Streams on new test must have only the default partitions (0, 1, 2) + 4. Total events per Topic (asyncio.Queue) must be 0 + """ + simple_example = importlib.import_module( + "examples.simple-example.simple_example.app" + ) + + topic_name = simple_example.topic + client = TestStreamClient(simple_example.stream_engine) + + # From the previous test, we have produced 5 events + # which still they are on memory. This is the application + # logic + assert simple_example.event_store.total == 5 + + # clean the application store + simple_example.event_store.clean() + assert simple_example.event_store.total == 0 + + async with client: + topic = TopicManager.get(topic_name) + assert topic.is_empty() + + # Even Though the topic is empty, the counter might be not + assert topic.total_events == 0 + + # check that all default partitions are empty + assert topic.total_partition_events[0] == -1 + assert topic.total_partition_events[1] == -1 + assert topic.total_partition_events[2] == -1 + + # add 1 event to the store + metadata = await client.send(topic=topic_name, value=b"add-event") + + # give some time to process the event + await asyncio.sleep(0.1) + assert simple_example.event_store.total == 1 + assert metadata.partition == 0 + + # remove 1 event from the store + metadata = await client.send( + topic=topic_name, value=b"remove-event", partition=1 + ) + + # give some time to process the event + await asyncio.sleep(0.1) + assert simple_example.event_store.total == 0 + assert metadata.partition == 1 + + # check that all events has been consumed + assert TopicManager.all_messages_consumed() + + @pytest.mark.asyncio @pytest.mark.parametrize( "monitoring_enabled",