Skip to content

Commit

Permalink
feat: get topics using the TestStreamClient
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh committed Aug 25, 2022
1 parent bd943fa commit 79621ce
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 18 deletions.
79 changes: 77 additions & 2 deletions docs/test_client.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Testing

To test your `streams` or perform `e2e` tests you can make use of the `test_utils.TestStreamClient`. The `TestStreamClient` also can send events so you won't need to mock the `producer`.
To test `streams` and `producers` or perform `e2e` tests you can make use of the `test_utils.TestStreamClient`. The `TestStreamClient` can send events so you won't need to mock the `producer`.

## Using `TestStreamClient`

Expand Down Expand Up @@ -65,7 +65,6 @@ async def shutdown(loop):

def main():
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)

```

Then you could have a `test_stream.py` file to test the code, you need to instanciate the `TestStreamClient` with the `engine`:
Expand Down Expand Up @@ -99,8 +98,15 @@ async def test_streams_consume_events():
on_consume.assert_called()
```

!!! Note
Notice that the `produce` coroutine is not used to send events in the test case.
The `TestStreamClient.send` coroutine is used instead.
This allows to test `streams` without having producer code in your application

### E2E test

In the previous code example the application produces to and consumes from the same topic, then `TestStreamClient.send` is not needed because the `engine.send` is producing. For those situation you can just use your `producer` code and check that certain code was called.

```python
# test_example.py
import pytest
Expand All @@ -123,3 +129,72 @@ async def test_e2e_example():
on_produce.call_count == 5
on_consume.call_count == 5
```

## Producer only

In some scenarios, your application will only produce events and other application/s will consume it, but you want to make sure that
the event was procuced in a proper way and the `topic` contains that `event`.

```python
# producer_example.py
from kstreams import create_engine
import aiorun
import asyncio

stream_engine = create_engine(title="my-stream-engine")


async def produce(topic: str, value: bytes, key: str):
# This could be a complicated function or something like a FastAPI view
await stream_engine.send(topic, value=value, key=key)


async def start():
await stream_engine.start()
await produce()


async def shutdown(loop):
await stream_engine.stop()


def main():
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
```

Then you could have a `test_producer_example.py` file to test the code:

```python
# test_producer_example.py
import pytest
from kstreams.test_utils import TestStreamClient

from producer_example import stream_engine, produce

client = TestStreamClient(stream_engine)


@pytest.mark.asyncio
async def test_event_produced():
topic_name = "local--kstreams"
value = b'{"message": "Hello world!"}'
key = "1"

async with client:
await produce(topic=topic_name ,value=value, key=key) # use the produce code to send events

# check that the event was placed in a topic in a proper way
topic = client.get_topic(topic_name)

# get the event
consumer_record = await topic.get()

assert consumer_record.value == value
assert consumer_record.key == key
```

!!! Note
Even thought the previous example is using a simple `produce` function,
it shows what to do when the `procuder code` is encapsulated in other functions,
for example a `FastAPI` view.
Then you don't want to use `client.send` directly, just called the function that contains `stream_engine.send(...)`
29 changes: 19 additions & 10 deletions kstreams/test_utils/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .structs import RecordMetadata
from .test_clients import TestConsumer, TestProducer
from .topics import TopicManager
from .topics import Topic, TopicManager


class TestStreamClient:
Expand All @@ -36,18 +36,12 @@ def setup_mocks(self) -> None:
self.mock_producer()
self.mock_streams()

async def __aenter__(self) -> "TestStreamClient":
async def start(self) -> None:
self.setup_mocks()
await self.stream_engine.start()
self.stream_engine._stop_metrics_task()
return self

async def __aexit__(
self,
exc_t: Optional[Type[BaseException]],
exc_v: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
async def stop(self) -> None:
# If there are streams, we must wait until all the messages are consumed
if self.stream_engine._streams:
while not TopicManager.all_messages_consumed():
Expand All @@ -59,10 +53,22 @@ async def __aexit__(
self.stream_engine.producer_class = self.producer_class
self.stream_engine.consumer_class = self.consumer_class

async def __aenter__(self) -> "TestStreamClient":
await self.start()
return self

async def __aexit__(
self,
exc_t: Optional[Type[BaseException]],
exc_v: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
await self.stop()

async def send(
self,
topic: str,
value: Optional[Dict] = None,
value: Any = None,
key: Optional[Any] = None,
partition: Optional[str] = None,
timestamp_ms: Optional[int] = None,
Expand All @@ -80,3 +86,6 @@ async def send(
serializer=serializer,
serializer_kwargs=serializer_kwargs,
)

def get_topic(self, topic_name: str) -> Topic:
return TopicManager.get(topic_name)
8 changes: 5 additions & 3 deletions kstreams/test_utils/topics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
from dataclasses import dataclass
from typing import Any, ClassVar, Dict, Optional
from typing import ClassVar, Dict, Optional

from kstreams import ConsumerRecord

from . import test_clients

Expand All @@ -13,11 +15,11 @@ class Topic:
# for now we assumed that 1 streams is connected to 1 topic
consumer: Optional["test_clients.Consumer"] = None

async def put(self, event: Any) -> None:
async def put(self, event: ConsumerRecord) -> None:
await self.queue.put(event)
self.total_messages += 1

async def get(self) -> Any:
async def get(self) -> ConsumerRecord:
return await self.queue.get()

def is_empty(self) -> bool:
Expand Down
12 changes: 9 additions & 3 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,18 @@ async def consume(stream):

@pytest.mark.asyncio
async def test_topic_created(stream_engine: StreamEngine):
topic = "local--kstreams"
topic_name = "local--kstreams"
value = b'{"message": "Hello world!"}'
key = "1"
client = TestStreamClient(stream_engine)
async with client:
await client.send(topic, value=b'{"message": "Hello world!"}', key="1")
await client.send(topic_name, value=value, key=key)

assert TopicManager.get(topic)
# check that the event was sent to a Topic
topic = client.get_topic(topic_name)
consumer_record = await topic.get()
assert consumer_record.value == value
assert consumer_record.key == key


@pytest.mark.asyncio
Expand Down

0 comments on commit 79621ce

Please sign in to comment.