Skip to content

Commit

Permalink
feat(TestStreamClient): add ability to create extra topics before the…
Browse files Browse the repository at this point in the history
… test cycle starts
  • Loading branch information
marcosschroh committed Dec 18, 2023
1 parent 3128ffd commit 389c896
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 1 deletion.
97 changes: 97 additions & 0 deletions docs/test_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,103 @@ async def test_event_produced():
for example a `FastAPI` view.
Then you don't want to use `client.send` directly, just called the function that contains `stream_engine.send(...)`

## Defining extra topics

For some uses cases is required to produce an event to a topic (`target topic`) after it was consumed (`source topic`). We are in control of the `source topic`
because it has a `stream` associated with it and we want to consume events from it, however we might not be in control of the `target topic`.

How can we consume an event from the `target topic` which has not a `stream` associated and the topic will be created only when a `send` is reached?
The answer is to pre define the extra topics before the test cycle has started. Let's take a look an example:

Let's imagine that we have the following code:

```python
from kstreams import ConsumerRecord

from .engine import stream_engine


@stream_engine.stream("source-topic", name=name)
async def consume(cr: ConsumerRecord) -> None:
# do something, for example save to db
await save_to_db(cr)

# then produce the event to the `target topic`
await stream_engine.send("target-topic", value=cr.value, key=cr.key, headers=cr.headers)
```

Here we can test two things:

1. Sending an event to the `source-topic` and check that the event has been consumed and saved to the DB
2. Check that the event was send to the `target-topic`

Testing point `1` is straightforward:

```python
import pytest
from kstreams.test_utils import TestStreamClient

from .engine import stream_engine


client = TestStreamClient(stream_engine)
value = b'{"message": "Hello world!"}'
key = "my-key"

async with client:
# produce to the topic that has a stream
await client.send("source-topic", value=value, key=key)

# check that the event was saved to the DB
assert await db.get(...)
```

However to test the point `2` we need more effort as the `TestStreamClient` is not aware of the `target topic` until it reaches the `send` inside the `consume` coroutine.
If we try to get the `target topic` event inside the `async with` context we will have an error:

```python
async with client:
# produce to the topic that has a stream
await client.send("source-topic", value=value, key=key)

...
# Let's check if it was received by the target topic
event = await client.get_event(topic_name="target-topic")


ValueError: You might be trying to get the topic target-topic outside the `client async context` or trying to get an event from an empty topic target-topic. Make sure that the code is inside the async contextand the topic has events.
```

We can solve this with a `delay` (`await asyncio.slpeep(...)`) inside the `async with` context to give time to the `TestStreamClient` to create the topic, however if the buisness logic
inside the `consume` is slow we need to add more delay, then it will become a `race condition`.

To proper solve it, we can specify to the `TestStreamClient` the extra topics that we need during the test cycle.

```python
import pytest
from kstreams.test_utils import TestStreamClient

from .engine import stream_engine


# tell the client to create the extra topics
client = TestStreamClient(stream_engine, topics=["target-topic"])
value = b'{"message": "Hello world!"}'
key = "my-key"

async with client:
# produce to the topic that has a stream
await client.send("source-topic", value=value, key=key)

# check that the event was saved to the DB
assert await db.get(...)

# Let's check if it was received by the target topic
event = await client.get_event(topic_name="target-topic")
assert event.value == value
assert event.key == key
```

## Disabling monitoring during testing

Monitoring streams and producers is vital for streaming application but it requires extra effort. Sometimes during testing,
Expand Down
15 changes: 14 additions & 1 deletion kstreams/test_utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,17 @@ class TestStreamClient:
__test__ = False

def __init__(
self, stream_engine: StreamEngine, monitoring_enabled: bool = True
self,
stream_engine: StreamEngine,
monitoring_enabled: bool = True,
topics: Optional[List[str]] = None,
) -> None:
self.stream_engine = stream_engine

# Extra topics' names defined by the end user which must be created
# before the cycle test starts
self.extra_user_topics = topics

# store the user clients to restore them later
self.monitor = stream_engine.monitor
self.producer_class = self.stream_engine.producer_class
Expand All @@ -63,8 +70,14 @@ def mock_streams(self) -> None:
def setup_mocks(self) -> None:
self.mock_streams()

def create_extra_topics(self) -> None:
if self.extra_user_topics is not None:
for topic_name in self.extra_user_topics:
TopicManager.create(topic_name)

async def start(self) -> None:
self.setup_mocks()
self.create_extra_topics()
await self.stream_engine.start()

async def stop(self) -> None:
Expand Down
29 changes: 29 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,35 @@ async def test_get_event_outside_context(stream_engine: StreamEngine):
) == str(exc.value)


@pytest.mark.asyncio
async def test_client_create_extra_user_topics(stream_engine: StreamEngine):
"""
Test that the topics defined by the end user are created before
the test cycle has started.
Note: the topics to be created should not have a `stream` associated,
otherwise it would not make any sense.
"""
value = b'{"message": "Hello world!"}'
key = "my-key"
extra_topic = "local--kstreams-statistics-consumer"

@stream_engine.stream(topic, name="my-stream")
async def consume(cr: ConsumerRecord):
# produce event to a topic that has not a stream associated
await client.send(extra_topic, value=cr.value, key=cr.key)

client = TestStreamClient(stream_engine, topics=[extra_topic])
async with client:
# produce to the topic that has a stream. Then, inside the stream
await client.send(topic, value=value, key=key)

# get the event from a topic that has not a `stream`
event = await client.get_event(topic_name=extra_topic)
assert event.value == value
assert event.key == key


@pytest.mark.asyncio
async def test_clean_up_events(stream_engine: StreamEngine):
topic_name = "local--kstreams-clean-up"
Expand Down

0 comments on commit 389c896

Please sign in to comment.