Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: getmany added to Stream. Closes #128 #147

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Consuming can be done using `kstreams.Stream`. You only need to decorate a `coro
show_root_heading: true
docstring_section_style: table
show_signature_annotations: false
members:
-

## Dependency Injection and typing

Expand Down Expand Up @@ -256,6 +258,16 @@ async with stream as stream_flow: # Use the context manager
If for some reason you interrupt the "async for in" in the async generator, the Stream will stopped consuming events
meaning that the lag will increase.

## Get many

::: kstreams.streams.Stream.getmany
options:
docstring_section_style: table
show_signature_annotations: false

!!! warning
This approach does not works with `Dependency Injection`.

## Rebalance Listener

For some cases you will need a `RebalanceListener` so when partitions are `assigned` or `revoked` to the stream different accions can be performed.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kstreams import stream, ConsumerRecord
from kstreams import ConsumerRecord, stream


@stream("local--hello-world", group_id="example-group")
Expand Down
43 changes: 43 additions & 0 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,49 @@ async def getone(self) -> ConsumerRecord:

return consumer_record

async def getmany(
self,
partitions: Optional[List[TopicPartition]] = None,
timeout_ms: int = 0,
max_records: Optional[int] = None,
) -> Dict[TopicPartition, List[ConsumerRecord]]:
"""
Get a batch of events from the assigned TopicPartition.

Prefetched events are returned in batches by topic-partition.
If messages is not available in the prefetched buffer this method waits
`timeout_ms` milliseconds.

Attributes:
partitions List[TopicPartition] | None: The partitions that need
fetching message. If no one partition specified then all
subscribed partitions will be used
timeout_ms int | None: milliseconds spent waiting if
marcosschroh marked this conversation as resolved.
Show resolved Hide resolved
data is not available in the buffer. If 0, returns immediately
with any records that are available currently in the buffer,
else returns empty. Must not be negative.
max_records int | None: The amount of records to fetch.
if `timeout_ms` was defined and reached and the fetched records
has not reach `max_records` then returns immediately
with any records that are available currently in the buffer

Returns:
Topic to list of records

!!! Example
```python
@stream_engine.stream(topic, ...)
async def stream(stream: Stream):
while True:
data = await stream.getmany(max_records=5)
print(data)
```
"""
partitions = partitions or []
return await self.consumer.getmany( # type: ignore
*partitions, timeout_ms=timeout_ms, max_records=max_records
)

async def start(self) -> Optional[AsyncGenerator]:
if self.running:
return None
Expand Down
16 changes: 16 additions & 0 deletions kstreams/test_utils/test_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,22 @@ async def getone(

return None

async def getmany(
self,
*partitions: List[TopicPartition],
timeout_ms: int = 0,
max_records: int = 1,
) -> Dict[TopicPartition, List[ConsumerRecord]]:
"""
Basic getmany implementation.
`partitions` and `timeout_ms` could be added to the logic
but it seems unnecessary for now; if end users request them we
can add it
"""
return {
self._assignment[0]: [await self.getone() for _ in range(0, max_records)]
}

def seek(self, *, partition: TopicPartition, offset: int) -> None:
# This method intends to have the same signature as aiokafka but with kwargs
# rather than positional arguments
Expand Down
26 changes: 26 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,32 @@ async def consume(stream):
save_to_db.assert_called_once_with(event)


@pytest.mark.asyncio
async def test_stream_consume_many(stream_engine: StreamEngine):
event = b'{"message": "Hello world!"}'
max_records = 2
save_to_db = Mock()

@stream_engine.stream(topic)
async def stream(stream: Stream):
while True:
data = await stream.getmany(max_records=max_records)
save_to_db(
[
cr.value
for consumer_records_list in data.values()
for cr in consumer_records_list
]
)

client = TestStreamClient(stream_engine)
async with client:
await client.send(topic, value=event, key="1")
await client.send(topic, value=event, key="1")

save_to_db.assert_called_once_with([event for _ in range(0, max_records)])


@pytest.mark.asyncio
async def test_stream_consume_events_as_generator(stream_engine: StreamEngine):
topic = "local--hello-kpn"
Expand Down
28 changes: 28 additions & 0 deletions tests/test_stream_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,34 @@ async def getone(_):
assert not stream.running


@pytest.mark.asyncio
async def test_stream_getmany(
stream_engine: StreamEngine, consumer_record_factory: Callable[..., ConsumerRecord]
):
topic_partition_crs = {
TopicPartition(topic="local--hello-kpn", partition=0): [
consumer_record_factory(offset=1),
consumer_record_factory(offset=2),
consumer_record_factory(offset=3),
]
}

save_to_db = mock.Mock()

@stream_engine.stream("local--hello-kpn")
async def stream(stream: Stream):
data = await stream.getmany(max_records=3)
save_to_db(data)

async def getmany(*args, **kwargs):
return topic_partition_crs

with mock.patch.multiple(Consumer, start=mock.DEFAULT, getmany=getmany):
await stream_engine.start_streams()
await asyncio.sleep(0.1)
save_to_db.assert_called_once_with(topic_partition_crs)


@pytest.mark.asyncio
async def test_stream_decorator(stream_engine: StreamEngine):
topic = "local--hello-kpn"
Expand Down
Loading