Skip to content

Commit

Permalink
fix(streams_utils): properly identify if typed or not
Browse files Browse the repository at this point in the history
  • Loading branch information
woile committed Nov 1, 2024
1 parent a0e3ac8 commit db92d56
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
2 changes: 1 addition & 1 deletion kstreams/streams_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def consume(cr: ConsumerRecord, stream: Stream, send: Send):

first_annotation = params[0].annotation

if first_annotation in (inspect._empty, Stream):
if first_annotation in (inspect._empty, Stream) and len(params) < 2:
# use case 1 NO_TYPING
return UDFType.NO_TYPING
# typing cases
Expand Down
39 changes: 39 additions & 0 deletions tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,45 @@ async def stream(cr: ConsumerRecord, send: Send, stream: Stream):
await stream.stop()


@pytest.mark.asyncio
async def test_stream_all_typing_order_in_setup_type(
stream_engine: StreamEngine, consumer_record_factory
):
topic_name = "local--kstreams"
value = b"test"

async def getone(_):
return consumer_record_factory(value=value)

with mock.patch.multiple(
Consumer,
start=mock.DEFAULT,
subscribe=mock.DEFAULT,
getone=getone,
):

@stream_engine.stream(topic_name)
async def stream(stream: Stream, cr: ConsumerRecord, send: Send):
assert cr.value == value
assert isinstance(stream, Stream)
assert send == stream_engine.send
await asyncio.sleep(0.2)

assert stream.consumer is None
assert stream.topics == [topic_name]

with contextlib.suppress(TimeoutErrorException):
# now it is possible to run a stream directly, so we need
# to stop the `forever` consumption
await asyncio.wait_for(stream.start(), timeout=0.1)

assert stream.consumer
Consumer.subscribe.assert_called_once_with(
topics=[topic_name], listener=stream.rebalance_listener, pattern=None
)
await stream.stop()


@pytest.mark.asyncio
async def test_stream_multiple_topics(stream_engine: StreamEngine):
topics = ["local--hello-kpn", "local--hello-kpn-2"]
Expand Down

0 comments on commit db92d56

Please sign in to comment.