Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(TestStreamClient): add ability to create extra topics before the… #149

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions docs/test_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,103 @@ async def test_event_produced():
for example a `FastAPI` view.
Then you don't want to use `client.send` directly, just called the function that contains `stream_engine.send(...)`

## Defining extra topics

For some uses cases is required to produce an event to a topic (`target topic`) after it was consumed (`source topic`). We are in control of the `source topic`
because it has a `stream` associated with it and we want to consume events from it, however we might not be in control of the `target topic`.

How can we consume an event from the `target topic` which has not a `stream` associated and the topic will be created only when a `send` is reached?
The answer is to pre define the extra topics before the test cycle has started. Let's take a look an example:

Let's imagine that we have the following code:

```python
from kstreams import ConsumerRecord

from .engine import stream_engine


@stream_engine.stream("source-topic", name=name)
async def consume(cr: ConsumerRecord) -> None:
# do something, for example save to db
await save_to_db(cr)

# then produce the event to the `target topic`
await stream_engine.send("target-topic", value=cr.value, key=cr.key, headers=cr.headers)
```

Here we can test two things:

1. Sending an event to the `source-topic` and check that the event has been consumed and saved to the DB
2. Check that the event was send to the `target-topic`

Testing point `1` is straightforward:

```python
import pytest
from kstreams.test_utils import TestStreamClient

from .engine import stream_engine


client = TestStreamClient(stream_engine)
value = b'{"message": "Hello world!"}'
key = "my-key"

async with client:
# produce to the topic that has a stream
await client.send("source-topic", value=value, key=key)

# check that the event was saved to the DB
assert await db.get(...)
```

However to test the point `2` we need more effort as the `TestStreamClient` is not aware of the `target topic` until it reaches the `send` inside the `consume` coroutine.
If we try to get the `target topic` event inside the `async with` context we will have an error:

```python
async with client:
# produce to the topic that has a stream
await client.send("source-topic", value=value, key=key)

...
# Let's check if it was received by the target topic
event = await client.get_event(topic_name="target-topic")


ValueError: You might be trying to get the topic target-topic outside the `client async context` or trying to get an event from an empty topic target-topic. Make sure that the code is inside the async contextand the topic has events.
```

We can solve this with a `delay` (`await asyncio.sleep(...)`) inside the `async with` context to give time to the `TestStreamClient` to create the topic, however if the buisness logic
inside the `consume` is slow we need to add more delay, then it will become a `race condition`.

To proper solve it, we can specify to the `TestStreamClient` the extra topics that we need during the test cycle.

```python
import pytest
from kstreams.test_utils import TestStreamClient

from .engine import stream_engine


# tell the client to create the extra topics
client = TestStreamClient(stream_engine, topics=["target-topic"])
value = b'{"message": "Hello world!"}'
key = "my-key"

async with client:
# produce to the topic that has a stream
await client.send("source-topic", value=value, key=key)

# check that the event was saved to the DB
assert await db.get(...)

# Let's check if it was received by the target topic
event = await client.get_event(topic_name="target-topic")
assert event.value == value
assert event.key == key
```

## Disabling monitoring during testing

Monitoring streams and producers is vital for streaming application but it requires extra effort. Sometimes during testing,
Expand Down
15 changes: 14 additions & 1 deletion kstreams/test_utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,17 @@ class TestStreamClient:
__test__ = False

def __init__(
self, stream_engine: StreamEngine, monitoring_enabled: bool = True
self,
stream_engine: StreamEngine,
monitoring_enabled: bool = True,
topics: Optional[List[str]] = None,
) -> None:
self.stream_engine = stream_engine

# Extra topics' names defined by the end user which must be created
# before the cycle test starts
self.extra_user_topics = topics

# store the user clients to restore them later
self.monitor = stream_engine.monitor
self.producer_class = self.stream_engine.producer_class
Expand All @@ -63,8 +70,14 @@ def mock_streams(self) -> None:
def setup_mocks(self) -> None:
self.mock_streams()

def create_extra_topics(self) -> None:
if self.extra_user_topics is not None:
for topic_name in self.extra_user_topics:
TopicManager.create(topic_name)

async def start(self) -> None:
self.setup_mocks()
self.create_extra_topics()
await self.stream_engine.start()

async def stop(self) -> None:
Expand Down
29 changes: 29 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,35 @@ async def test_get_event_outside_context(stream_engine: StreamEngine):
) == str(exc.value)


@pytest.mark.asyncio
async def test_client_create_extra_user_topics(stream_engine: StreamEngine):
"""
Test that the topics defined by the end user are created before
the test cycle has started.

Note: the topics to be created should not have a `stream` associated,
otherwise it would not make any sense.
"""
value = b'{"message": "Hello world!"}'
key = "my-key"
extra_topic = "local--kstreams-statistics-consumer"

@stream_engine.stream(topic, name="my-stream")
async def consume(cr: ConsumerRecord):
# produce event to a topic that has not a stream associated
await client.send(extra_topic, value=cr.value, key=cr.key)

client = TestStreamClient(stream_engine, topics=[extra_topic])
async with client:
# produce to the topic that has a stream. Then, inside the stream
await client.send(topic, value=value, key=key)

# get the event from a topic that has not a `stream`
event = await client.get_event(topic_name=extra_topic)
assert event.value == value
assert event.key == key


@pytest.mark.asyncio
async def test_clean_up_events(stream_engine: StreamEngine):
topic_name = "local--kstreams-clean-up"
Expand Down
Loading