From 887650009261c09aca2161de55694acfa81934fd Mon Sep 17 00:00:00 2001 From: Marcos Schroh <2828842+marcosschroh@users.noreply.github.com> Date: Mon, 18 Dec 2023 16:03:50 +0100 Subject: [PATCH] feat(TestStreamClient): add ability to create extra topics before the test cycle starts (#149) --- docs/test_client.md | 97 +++++++++++++++++++++++++++++++ kstreams/test_utils/test_utils.py | 15 ++++- tests/test_client.py | 29 +++++++++ 3 files changed, 140 insertions(+), 1 deletion(-) diff --git a/docs/test_client.md b/docs/test_client.md index a175a163..16901c9e 100644 --- a/docs/test_client.md +++ b/docs/test_client.md @@ -256,6 +256,103 @@ async def test_event_produced(): for example a `FastAPI` view. Then you don't want to use `client.send` directly, just called the function that contains `stream_engine.send(...)` +## Defining extra topics + +For some uses cases is required to produce an event to a topic (`target topic`) after it was consumed (`source topic`). We are in control of the `source topic` +because it has a `stream` associated with it and we want to consume events from it, however we might not be in control of the `target topic`. + +How can we consume an event from the `target topic` which has not a `stream` associated and the topic will be created only when a `send` is reached? +The answer is to pre define the extra topics before the test cycle has started. Let's take a look an example: + +Let's imagine that we have the following code: + +```python +from kstreams import ConsumerRecord + +from .engine import stream_engine + + +@stream_engine.stream("source-topic", name=name) +async def consume(cr: ConsumerRecord) -> None: + # do something, for example save to db + await save_to_db(cr) + + # then produce the event to the `target topic` + await stream_engine.send("target-topic", value=cr.value, key=cr.key, headers=cr.headers) +``` + +Here we can test two things: + +1. Sending an event to the `source-topic` and check that the event has been consumed and saved to the DB +2. Check that the event was send to the `target-topic` + +Testing point `1` is straightforward: + +```python +import pytest +from kstreams.test_utils import TestStreamClient + +from .engine import stream_engine + + +client = TestStreamClient(stream_engine) +value = b'{"message": "Hello world!"}' +key = "my-key" + +async with client: + # produce to the topic that has a stream + await client.send("source-topic", value=value, key=key) + + # check that the event was saved to the DB + assert await db.get(...) +``` + +However to test the point `2` we need more effort as the `TestStreamClient` is not aware of the `target topic` until it reaches the `send` inside the `consume` coroutine. +If we try to get the `target topic` event inside the `async with` context we will have an error: + +```python +async with client: + # produce to the topic that has a stream + await client.send("source-topic", value=value, key=key) + + ... + # Let's check if it was received by the target topic + event = await client.get_event(topic_name="target-topic") + + +ValueError: You might be trying to get the topic target-topic outside the `client async context` or trying to get an event from an empty topic target-topic. Make sure that the code is inside the async contextand the topic has events. +``` + +We can solve this with a `delay` (`await asyncio.sleep(...)`) inside the `async with` context to give time to the `TestStreamClient` to create the topic, however if the buisness logic +inside the `consume` is slow we need to add more delay, then it will become a `race condition`. + +To proper solve it, we can specify to the `TestStreamClient` the extra topics that we need during the test cycle. + +```python +import pytest +from kstreams.test_utils import TestStreamClient + +from .engine import stream_engine + + +# tell the client to create the extra topics +client = TestStreamClient(stream_engine, topics=["target-topic"]) +value = b'{"message": "Hello world!"}' +key = "my-key" + +async with client: + # produce to the topic that has a stream + await client.send("source-topic", value=value, key=key) + + # check that the event was saved to the DB + assert await db.get(...) + + # Let's check if it was received by the target topic + event = await client.get_event(topic_name="target-topic") + assert event.value == value + assert event.key == key +``` + ## Disabling monitoring during testing Monitoring streams and producers is vital for streaming application but it requires extra effort. Sometimes during testing, diff --git a/kstreams/test_utils/test_utils.py b/kstreams/test_utils/test_utils.py index 248a6de3..f5fde440 100644 --- a/kstreams/test_utils/test_utils.py +++ b/kstreams/test_utils/test_utils.py @@ -40,10 +40,17 @@ class TestStreamClient: __test__ = False def __init__( - self, stream_engine: StreamEngine, monitoring_enabled: bool = True + self, + stream_engine: StreamEngine, + monitoring_enabled: bool = True, + topics: Optional[List[str]] = None, ) -> None: self.stream_engine = stream_engine + # Extra topics' names defined by the end user which must be created + # before the cycle test starts + self.extra_user_topics = topics + # store the user clients to restore them later self.monitor = stream_engine.monitor self.producer_class = self.stream_engine.producer_class @@ -63,8 +70,14 @@ def mock_streams(self) -> None: def setup_mocks(self) -> None: self.mock_streams() + def create_extra_topics(self) -> None: + if self.extra_user_topics is not None: + for topic_name in self.extra_user_topics: + TopicManager.create(topic_name) + async def start(self) -> None: self.setup_mocks() + self.create_extra_topics() await self.stream_engine.start() async def stop(self) -> None: diff --git a/tests/test_client.py b/tests/test_client.py index 49f2dbb8..4f15ff14 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -266,6 +266,35 @@ async def test_get_event_outside_context(stream_engine: StreamEngine): ) == str(exc.value) +@pytest.mark.asyncio +async def test_client_create_extra_user_topics(stream_engine: StreamEngine): + """ + Test that the topics defined by the end user are created before + the test cycle has started. + + Note: the topics to be created should not have a `stream` associated, + otherwise it would not make any sense. + """ + value = b'{"message": "Hello world!"}' + key = "my-key" + extra_topic = "local--kstreams-statistics-consumer" + + @stream_engine.stream(topic, name="my-stream") + async def consume(cr: ConsumerRecord): + # produce event to a topic that has not a stream associated + await client.send(extra_topic, value=cr.value, key=cr.key) + + client = TestStreamClient(stream_engine, topics=[extra_topic]) + async with client: + # produce to the topic that has a stream. Then, inside the stream + await client.send(topic, value=value, key=key) + + # get the event from a topic that has not a `stream` + event = await client.get_event(topic_name=extra_topic) + assert event.value == value + assert event.key == key + + @pytest.mark.asyncio async def test_clean_up_events(stream_engine: StreamEngine): topic_name = "local--kstreams-clean-up"