Skip to content

Commit

Permalink
Merge pull request #53 from kpn/52-add-iter_stream
Browse files Browse the repository at this point in the history
fix: decorator wrapper and tests
  • Loading branch information
woile authored Aug 25, 2022
2 parents 23ad41f + 14a77f4 commit 0c09079
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
5 changes: 4 additions & 1 deletion kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import inspect
import logging
import uuid
from functools import update_wrapper
from typing import (
Any,
AsyncGenerator,
Expand Down Expand Up @@ -156,12 +157,14 @@ def stream(
**kwargs,
) -> Callable[[StreamFunc], Stream]:
def decorator(func: StreamFunc) -> Stream:
return Stream(
s = Stream(
topics=topics,
func=func,
name=name,
deserializer=deserializer,
config=kwargs,
)
update_wrapper(s, func)
return s

return decorator
6 changes: 0 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from collections import namedtuple
from dataclasses import field
from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple
Expand Down Expand Up @@ -149,11 +148,6 @@ def avro_schema_v1():
return AVRO_SCHEMA_V1


@pytest.fixture()
def event_loop():
return asyncio.get_event_loop()


@pytest.fixture()
def consumer_record_factory():
"""
Expand Down
41 changes: 35 additions & 6 deletions tests/test_stream_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import pytest

from kstreams.backends.kafka import Kafka
from kstreams.clients import Consumer, Producer
from kstreams.engine import Stream, StreamEngine
from kstreams.exceptions import DuplicateStreamException, EngineNotStartedException
from kstreams.streams import stream


@pytest.mark.asyncio
Expand Down Expand Up @@ -46,13 +46,11 @@ class MyDeserializer:
async def processor(stream: Stream):
pass

backend = Kafka()
my_stream = Stream(
topics,
name="my-stream",
func=processor,
deserializer=deserializer,
backend=backend,
)

assert not stream_engine.get_stream("my-stream")
Expand All @@ -65,9 +63,7 @@ async def processor(stream: Stream):

# can not add a stream with the same name
with pytest.raises(DuplicateStreamException):
stream_engine.add_stream(
Stream("a-topic", name="my-stream", func=processor, backend=backend)
)
stream_engine.add_stream(Stream("a-topic", name="my-stream", func=processor))


@pytest.mark.asyncio
Expand Down Expand Up @@ -164,3 +160,36 @@ async def getone(_):

# Now the stream is stopped because we left the context
assert not stream.running


@pytest.mark.asyncio
async def test_sream_decorator(stream_engine: StreamEngine):
topic = "local--hello-kpn"

@stream(topic)
async def streaming_fn(_):
pass

stream_engine.add_stream(streaming_fn)

with mock.patch.multiple(Consumer, start=mock.DEFAULT, stop=mock.DEFAULT):
with mock.patch.multiple(Producer, start=mock.DEFAULT, stop=mock.DEFAULT):
await stream_engine.start()
Consumer.start.assert_awaited()
stream_engine._producer.start.assert_awaited()

await stream_engine.stop()
stream_engine._producer.stop.assert_awaited()
Consumer.stop.assert_awaited()


@pytest.mark.asyncio
async def test_sream_decorates_properly(stream_engine: StreamEngine):
topic = "local--hello-kpn"

@stream(topic)
async def streaming_fn(_):
"""text from func"""

assert streaming_fn.__name__ == "streaming_fn"
assert streaming_fn.__doc__ == "text from func"

0 comments on commit 0c09079

Please sign in to comment.