diff --git a/tests/test_client.py b/tests/test_client.py index 6d4dc533..7ae77c27 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -91,6 +91,31 @@ async def consume(stream): save_to_db.assert_called_once_with(event) +@pytest.mark.asyncio +async def test_stream_consume_events_as_generator(stream_engine: StreamEngine): + topic = "local--hello-kpn" + event = b'{"message": "Hello world!"}' + client = TestStreamClient(stream_engine) + save_to_db = Mock() + + @stream_engine.stream(topic) + async def my_stream(stream: Stream): + async for cr in stream: + save_to_db(cr.value) + yield cr + + async with client: + await client.send(topic, value=event, key="1") + + async with my_stream as processor: + async for cr in processor: + assert cr.value == event + break + + # check that the event was consumed + save_to_db.assert_called_once_with(event) + + @pytest.mark.asyncio async def test_only_consume_topics_with_streams(stream_engine: StreamEngine): """