Skip to content

Commit

Permalink
feat: getmany added to Stream. Closes #128
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh committed Dec 13, 2023
1 parent 0b2567d commit 30caae5
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 1 deletion.
12 changes: 12 additions & 0 deletions docs/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Consuming can be done using `kstreams.Stream`. You only need to decorate a `coro
show_root_heading: true
docstring_section_style: table
show_signature_annotations: false
members:
-

## Dependency Injection and typing

Expand Down Expand Up @@ -256,6 +258,16 @@ async with stream as stream_flow: # Use the context manager
If for some reason you interrupt the "async for in" in the async generator, the Stream will stopped consuming events
meaning that the lag will increase.

## Get many

::: kstreams.streams.Stream.getmany
options:
docstring_section_style: table
show_signature_annotations: false

!!! warning
This approach does not works with `Dependency Injection`.

## Rebalance Listener

For some cases you will need a `RebalanceListener` so when partitions are `assigned` or `revoked` to the stream different accions can be performed.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kstreams import stream, ConsumerRecord
from kstreams import ConsumerRecord, stream


@stream("local--hello-world", group_id="example-group")
Expand Down
43 changes: 43 additions & 0 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,49 @@ async def getone(self) -> ConsumerRecord:

return consumer_record

async def getmany(
self,
partitions: Optional[List[TopicPartition]] = None,
timeout_ms: int = 0,
max_records: Optional[int] = None,
) -> Dict[TopicPartition, List[ConsumerRecord]]:
"""
Get a batch of events from the assigned TopicPartition.
Prefetched events are returned in batches by topic-partition.
If messages is not available in the prefetched buffer this method waits
`timeout_ms` milliseconds.
Attributes:
partitions List[TopicPartition] | None: The partitions that need
fetching message. If no one partition specified then all
subscribed partitions will be used
timeout_ms int | None: milliseconds spent waiting if
data is not available in the buffer. If 0, returns immediately
with any records that are available currently in the buffer,
else returns empty. Must not be negative.
max_records int | None: The amount of records to fetch.
if `timeout_ms` was defined and reached and the fetched records
has not reach `max_records` then returns immediately
with any records that are available currently in the buffer
Returns:
Topic to list of records
!!! Example
```python
@stream_engine.stream(topic, ...)
async def stream(stream: Stream):
while True:
data = await stream.getmany(max_records=5)
print(data)
```
"""
partitions = partitions or []
return await self.consumer.getmany( # type: ignore
*partitions, timeout_ms=timeout_ms, max_records=max_records
)

async def start(self) -> Optional[AsyncGenerator]:
if self.running:
return None
Expand Down
16 changes: 16 additions & 0 deletions kstreams/test_utils/test_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,22 @@ async def getone(

return None

async def getmany(
self,
*partitions: List[TopicPartition],
timeout_ms: int = 0,
max_records: int = 1,
) -> Dict[TopicPartition, List[ConsumerRecord]]:
"""
Basic getmany implementation.
`partitions` and `timeout_ms` could be added to the logic
but it seems unnecessary for now; if end users request them we
can add it
"""
return {
self._assignment[0]: [await self.getone() for _ in range(0, max_records)]
}

def seek(self, *, partition: TopicPartition, offset: int) -> None:
# This method intends to have the same signature as aiokafka but with kwargs
# rather than positional arguments
Expand Down
26 changes: 26 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,32 @@ async def consume(stream):
save_to_db.assert_called_once_with(event)


@pytest.mark.asyncio
async def test_stream_consume_many(stream_engine: StreamEngine):
event = b'{"message": "Hello world!"}'
max_records = 2
save_to_db = Mock()

@stream_engine.stream(topic)
async def stream(stream: Stream):
while True:
data = await stream.getmany(max_records=max_records)
save_to_db(
[
cr.value
for consumer_records_list in data.values()
for cr in consumer_records_list
]
)

client = TestStreamClient(stream_engine)
async with client:
await client.send(topic, value=event, key="1")
await client.send(topic, value=event, key="1")

save_to_db.assert_called_once_with([event for _ in range(0, max_records)])


@pytest.mark.asyncio
async def test_stream_consume_events_as_generator(stream_engine: StreamEngine):
topic = "local--hello-kpn"
Expand Down
28 changes: 28 additions & 0 deletions tests/test_stream_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,34 @@ async def getone(_):
assert not stream.running


@pytest.mark.asyncio
async def test_stream_getmany(
stream_engine: StreamEngine, consumer_record_factory: Callable[..., ConsumerRecord]
):
topic_partition_crs = {
TopicPartition(topic="local--hello-kpn", partition=0): [
consumer_record_factory(offset=1),
consumer_record_factory(offset=2),
consumer_record_factory(offset=3),
]
}

save_to_db = mock.Mock()

@stream_engine.stream("local--hello-kpn")
async def stream(stream: Stream):
data = await stream.getmany(max_records=3)
save_to_db(data)

async def getmany(*args, **kwargs):
return topic_partition_crs

with mock.patch.multiple(Consumer, start=mock.DEFAULT, getmany=getmany):
await stream_engine.start_streams()
await asyncio.sleep(0.1)
save_to_db.assert_called_once_with(topic_partition_crs)


@pytest.mark.asyncio
async def test_stream_decorator(stream_engine: StreamEngine):
topic = "local--hello-kpn"
Expand Down

0 comments on commit 30caae5

Please sign in to comment.