From aa2ea42b1f30a1a08a007b435ad05a9c1f6403b8 Mon Sep 17 00:00:00 2001 From: Marcos Schroh <2828842+marcosschroh@users.noreply.github.com> Date: Wed, 25 Jan 2023 16:31:26 +0100 Subject: [PATCH] fix: TestStreamClient should not wait for topics that are empty (#93) Co-authored-by: Marcos Schroh --- kstreams/test_utils/topics.py | 7 +++++-- tests/test_client.py | 25 +++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/kstreams/test_utils/topics.py b/kstreams/test_utils/topics.py index 58af3d5e..e21bd0d1 100644 --- a/kstreams/test_utils/topics.py +++ b/kstreams/test_utils/topics.py @@ -115,9 +115,12 @@ def all_messages_consumed(cls) -> bool: @classmethod async def join(cls) -> None: """ - Wait for all topic messages to be processed + Wait for all topic messages to be processed. + Only topics that have a consumer assigned should be awaited. """ - await asyncio.gather(*[topic.join() for topic in cls.topics.values()]) + await asyncio.gather( + *[topic.join() for topic in cls.topics.values() if not topic.consumed] + ) @classmethod def clean(cls) -> None: diff --git a/tests/test_client.py b/tests/test_client.py index 09820236..f1a30a6d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -79,6 +79,31 @@ async def consume(stream): save_to_db.assert_called_once_with(event) +@pytest.mark.asyncio +async def test_only_consume_topics_with_streams(stream_engine: StreamEngine): + """ + The test creates a stream but no events are send to it, + it means that the `TestStreamClient` should not wait for the topic to be consumed + even thought the topic is exist. + """ + client = TestStreamClient(stream_engine) + topic = "local--kstreams" + + @stream_engine.stream("a-different-topic", name="my-stream") + async def consume(stream): + async for cr in stream: + ... + + async with client: + metadata = await client.send( + topic, value=b'{"message": "Hello world!"}', key="1" + ) + + assert metadata.topic == topic + assert metadata.partition == 0 + assert metadata.offset == 1 + + @pytest.mark.asyncio async def test_topic_created(stream_engine: StreamEngine): topic_name = "local--kstreams"