From 14a77f4986a2a1e3813b4c0f92e4cf3a1a5280f5 Mon Sep 17 00:00:00 2001 From: Santiago Fraire Date: Wed, 24 Aug 2022 16:36:12 +0200 Subject: [PATCH] fix: decorator wrapper and tests --- kstreams/streams.py | 5 ++++- tests/conftest.py | 6 ------ tests/test_stream_engine.py | 41 +++++++++++++++++++++++++++++++------ 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/kstreams/streams.py b/kstreams/streams.py index c2e7356d..2ea6f780 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -2,6 +2,7 @@ import inspect import logging import uuid +from functools import update_wrapper from typing import ( Any, AsyncGenerator, @@ -156,12 +157,14 @@ def stream( **kwargs, ) -> Callable[[StreamFunc], Stream]: def decorator(func: StreamFunc) -> Stream: - return Stream( + s = Stream( topics=topics, func=func, name=name, deserializer=deserializer, config=kwargs, ) + update_wrapper(s, func) + return s return decorator diff --git a/tests/conftest.py b/tests/conftest.py index fb12a4a3..f049b8fe 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,3 @@ -import asyncio from collections import namedtuple from dataclasses import field from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple @@ -149,11 +148,6 @@ def avro_schema_v1(): return AVRO_SCHEMA_V1 -@pytest.fixture() -def event_loop(): - return asyncio.get_event_loop() - - @pytest.fixture() def consumer_record_factory(): """ diff --git a/tests/test_stream_engine.py b/tests/test_stream_engine.py index d02f612b..1b68e519 100644 --- a/tests/test_stream_engine.py +++ b/tests/test_stream_engine.py @@ -2,10 +2,10 @@ import pytest -from kstreams.backends.kafka import Kafka from kstreams.clients import Consumer, Producer from kstreams.engine import Stream, StreamEngine from kstreams.exceptions import DuplicateStreamException, EngineNotStartedException +from kstreams.streams import stream @pytest.mark.asyncio @@ -46,13 +46,11 @@ class MyDeserializer: async def processor(stream: Stream): pass - backend = Kafka() my_stream = Stream( topics, name="my-stream", func=processor, deserializer=deserializer, - backend=backend, ) assert not stream_engine.get_stream("my-stream") @@ -65,9 +63,7 @@ async def processor(stream: Stream): # can not add a stream with the same name with pytest.raises(DuplicateStreamException): - stream_engine.add_stream( - Stream("a-topic", name="my-stream", func=processor, backend=backend) - ) + stream_engine.add_stream(Stream("a-topic", name="my-stream", func=processor)) @pytest.mark.asyncio @@ -164,3 +160,36 @@ async def getone(_): # Now the stream is stopped because we left the context assert not stream.running + + +@pytest.mark.asyncio +async def test_sream_decorator(stream_engine: StreamEngine): + topic = "local--hello-kpn" + + @stream(topic) + async def streaming_fn(_): + pass + + stream_engine.add_stream(streaming_fn) + + with mock.patch.multiple(Consumer, start=mock.DEFAULT, stop=mock.DEFAULT): + with mock.patch.multiple(Producer, start=mock.DEFAULT, stop=mock.DEFAULT): + await stream_engine.start() + Consumer.start.assert_awaited() + stream_engine._producer.start.assert_awaited() + + await stream_engine.stop() + stream_engine._producer.stop.assert_awaited() + Consumer.stop.assert_awaited() + + +@pytest.mark.asyncio +async def test_sream_decorates_properly(stream_engine: StreamEngine): + topic = "local--hello-kpn" + + @stream(topic) + async def streaming_fn(_): + """text from func""" + + assert streaming_fn.__name__ == "streaming_fn" + assert streaming_fn.__doc__ == "text from func"