feat: RebalanceListener interface added so a rebalance listener can be set to Streams (#100)

Co-authored-by: Marcos Schroh <[email protected]>
marcosschroh and marcosschroh authored Feb 27, 2023
1 parent ddf0b26 commit d92423d
Showing 14 changed files with 586 additions and 20 deletions.
17 changes: 17 additions & 0 deletions docs/stream.md
@@ -226,3 +226,20 @@ async with stream as stream_flow: # Use the context manager
!!! note
    If for some reason you interrupt the "async for in" in the async generator, the Stream will stop consuming events,
    meaning that the lag will increase.

## Rebalance Listener

In some cases you will need a `RebalanceListener` so that different actions can be performed when partitions are `assigned` to or `revoked` from the stream.

### Use cases

- Cleanup or custom state save on the start of a rebalance operation
- Saving offsets in a custom store when a partition is `revoked`
- Load a state or cache warmup on completion of a successful partition re-assignment.

::: kstreams.RebalanceListener
    options:
        show_root_heading: true
        docstring_section_style: table
        show_signature_annotations: false
        show_bases: false
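
A minimal sketch of the use cases listed above: progress is saved when a partition is revoked and local state is warmed up when a partition is assigned. So that it runs without a Kafka cluster, `TopicPartition` and `RebalanceListener` here are local stand-ins for the `kstreams` classes, and `StatefulListener`, `store`, `positions` and `cache` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Dict, Iterable, NamedTuple, Set


class TopicPartition(NamedTuple):
    # Local stand-in for kstreams.TopicPartition (assumption for this sketch)
    topic: str
    partition: int


class RebalanceListener:
    # Local stand-in for kstreams.RebalanceListener so the sketch runs offline
    def __init__(self) -> None:
        self.stream = None


class StatefulListener(RebalanceListener):
    def __init__(self, store: Dict[TopicPartition, int]) -> None:
        super().__init__()
        self.store = store  # hypothetical external offset store
        self.positions: Dict[TopicPartition, int] = {}  # in-memory progress
        self.cache: Set[TopicPartition] = set()  # partitions with warm state

    async def on_partitions_revoked(
        self, revoked: Iterable[TopicPartition]
    ) -> None:
        # Save progress for partitions we are about to lose, then drop state
        for tp in revoked:
            if tp in self.positions:
                self.store[tp] = self.positions.pop(tp)
            self.cache.discard(tp)

    async def on_partitions_assigned(
        self, assigned: Iterable[TopicPartition]
    ) -> None:
        # Warm up local state from the store for every partition we now own
        for tp in assigned:
            self.positions[tp] = self.store.get(tp, 0)
            self.cache.add(tp)


async def demo() -> Dict[TopicPartition, int]:
    store: Dict[TopicPartition, int] = {}
    listener = StatefulListener(store)
    tp = TopicPartition("local--hello-world", 0)
    await listener.on_partitions_assigned({tp})
    listener.positions[tp] = 42  # pretend 42 events were processed
    await listener.on_partitions_revoked({tp})
    return store


saved = asyncio.run(demo())
```

In a real application the class would inherit from `kstreams.RebalanceListener` and be passed to the stream through the `rebalance_listener` argument; the dictionary would typically be replaced by whatever durable store the application already uses.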
55 changes: 55 additions & 0 deletions examples/stream-with-rebalance-listener/README.md
@@ -0,0 +1,55 @@
# Stream with a Rebalance Listener

Simple consumer example with `kstreams` that has a custom `RebalanceListener`

## Requirements

python 3.8+, poetry, docker-compose

## Installation

```bash
poetry install
```

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`
3. From `kstreams` project root, you can use the `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open. Enter messages to send. The command is:

```bash
./scripts/cluster/events/send "local--hello-world"
```

Then, on the consumer side, you should see something similar to the following logs:

```bash
❯ me@me-pc stream-with-rebalance-listener % poetry run app

Consumer started

Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError

Partition revoked set() for stream <kstreams.streams.Stream object at 0x10a060650>
Partition assigned {TopicPartition(topic='local--hello-world', partition=0)} for stream <kstreams.streams.Stream object at 0x10a060650>


Event consumed: headers: (), payload: ConsumerRecord(topic='local--hello-world', partition=0, offset=0, timestamp=1660733921761, timestamp_type=0, key=None, value=b'boo', checksum=None, serialized_key_size=-1, serialized_value_size=3, headers=())
```

Then, if you run the same program in a different terminal, you should see that the callbacks are called again in the first terminal:

```bash
Partition revoked frozenset({TopicPartition(topic='local--hello-world', partition=0)}) for stream <kstreams.streams.Stream object at 0x10a060650>
Partition assigned set() for stream <kstreams.streams.Stream object at 0x10a060650>
```
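
The order of the callbacks in the logs above can be reproduced without a broker by invoking them by hand. The snippet below is only a simulation under stated assumptions: `TopicPartition` is a local stand-in for the `kstreams` class, and `RecordingListener` and `simulate_rebalance` are hypothetical helpers that are not part of `kstreams`.

```python
import asyncio
from typing import Iterable, List, NamedTuple, Set, Tuple


class TopicPartition(NamedTuple):
    # Local stand-in for kstreams.TopicPartition (assumption for this sketch)
    topic: str
    partition: int


class RecordingListener:
    # Mirrors the two callbacks of the example listener, without Kafka
    def __init__(self) -> None:
        self.events: List[Tuple[str, Set[TopicPartition]]] = []

    async def on_partitions_revoked(
        self, revoked: Iterable[TopicPartition]
    ) -> None:
        self.events.append(("revoked", set(revoked)))

    async def on_partitions_assigned(
        self, assigned: Iterable[TopicPartition]
    ) -> None:
        self.events.append(("assigned", set(assigned)))


async def simulate_rebalance(
    listener: RecordingListener,
    before: Set[TopicPartition],
    after: Set[TopicPartition],
) -> None:
    # Hypothetical helper: the old assignment is revoked first,
    # then the new assignment is handed over
    await listener.on_partitions_revoked(before)
    await listener.on_partitions_assigned(after)


async def demo() -> List[Tuple[str, Set[TopicPartition]]]:
    tp = TopicPartition("local--hello-world", 0)
    listener = RecordingListener()
    # First consumer joins the group and receives the only partition
    await simulate_rebalance(listener, set(), {tp})
    # A second consumer joins and the partition moves away from this one
    await simulate_rebalance(listener, {tp}, set())
    return listener.events


events = asyncio.run(demo())
```

The recorded events follow the same pattern as the two log excerpts: an empty revocation followed by an assignment on first start, then a revocation of the partition followed by an empty assignment.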

## Note

If you plan on using this example outside this repository, pay attention to the `pyproject.toml` dependencies, where
`kstreams` points to the parent folder. You will have to replace that path with the latest published version.
264 changes: 264 additions & 0 deletions examples/stream-with-rebalance-listener/poetry.lock

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions examples/stream-with-rebalance-listener/pyproject.toml
@@ -0,0 +1,19 @@
[tool.poetry]
name = "stream-with-rebalance-listener"
version = "0.1.0"
description = ""
authors = ["Marcos Schroh <[email protected]>"]
readme = "README.md"
packages = [{include = "stream_with_rebalance_listener"}]

[tool.poetry.dependencies]
python = "^3.8"
aiorun = "^2022.4.1"
kstreams = { path = "../../.", develop = true }

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.scripts]
app = "stream_with_rebalance_listener.app:main"
Empty file.
@@ -0,0 +1,47 @@
import signal
from typing import List

import aiorun

import kstreams

stream_engine = kstreams.create_engine(title="my-stream-engine")


class MyListener(kstreams.RebalanceListener):
    async def on_partitions_revoked(
        self, revoked: List[kstreams.TopicPartition]
    ) -> None:
        print(f"Partition revoked {revoked} for stream {self.stream}")

    async def on_partitions_assigned(
        self, assigned: List[kstreams.TopicPartition]
    ) -> None:
        print(f"Partition assigned {assigned} for stream {self.stream}")


@stream_engine.stream(
    topics=["local--hello-world"],
    group_id="example-group",
    rebalance_listener=MyListener(),
)
async def consume(stream):
    print("Consumer started")
    try:
        async for cr in stream:
            print(f"Event consumed: headers: {cr.headers}, payload: {cr}")
    finally:
        # Terminate the program if something fails. (aiorun will catch this
        # signal and properly shut down this program.)
        # NOTE: signal.alarm() takes a delay in seconds and delivers SIGALRM,
        # so as written this schedules a SIGALRM after int(signal.SIGTERM)
        # seconds; it does not send SIGTERM itself.
        signal.alarm(signal.SIGTERM)


async def start():
    await stream_engine.start()


async def shutdown(loop):
    await stream_engine.stop()


def main():
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
2 changes: 2 additions & 0 deletions kstreams/__init__.py
@@ -3,6 +3,7 @@
from .clients import Consumer, ConsumerType, Producer, ProducerType
from .create import StreamEngine, create_engine
from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType
from .rebalance_listener import RebalanceListener
from .streams import Stream, stream
from .structs import TopicPartitionOffset

@@ -15,6 +16,7 @@
"create_engine",
"PrometheusMonitor",
"PrometheusMonitorType",
"RebalanceListener",
"Stream",
"stream",
"ConsumerRecord",
3 changes: 3 additions & 0 deletions kstreams/engine.py
@@ -12,6 +12,7 @@
from .exceptions import DuplicateStreamException, EngineNotStartedException
from .prometheus.monitor import PrometheusMonitor
from .prometheus.tasks import metrics_task
from .rebalance_listener import RebalanceListener
from .serializers import Deserializer, Serializer
from .streams import Stream, StreamFunc, stream
from .types import Headers
@@ -215,6 +216,7 @@ def stream(
name: Optional[str] = None,
deserializer: Optional[Deserializer] = None,
initial_offsets: Optional[List[TopicPartitionOffset]] = None,
rebalance_listener: Optional[RebalanceListener] = None,
**kwargs,
) -> Callable[[StreamFunc], Stream]:
def decorator(func: StreamFunc) -> Stream:
@@ -223,6 +225,7 @@ def decorator(func: StreamFunc) -> Stream:
name=name,
deserializer=deserializer,
initial_offsets=initial_offsets,
rebalance_listener=rebalance_listener,
**kwargs,
)(func)
self.add_stream(stream_from_func)
91 changes: 91 additions & 0 deletions kstreams/rebalance_listener.py
@@ -0,0 +1,91 @@
from typing import List

from aiokafka.abc import ConsumerRebalanceListener

from kstreams import TopicPartition


# We cannot use a Protocol here because aiokafka requires a concrete instance
# that inherits from ConsumerRebalanceListener. With a Protocol we would still
# have to force end users to import the class and inherit from it,
# which would mix protocols and inheritance.
class RebalanceListener(ConsumerRebalanceListener):
    """
    A callback interface that the user can implement to trigger custom actions
    when partitions are assigned to or revoked from the `Stream`.

    !!! Example
        ```python
        from typing import List

        from kstreams import RebalanceListener, Stream, TopicPartition
        from .resource import stream_engine


        class MyRebalanceListener(RebalanceListener):

            async def on_partitions_revoked(
                self, revoked: List[TopicPartition]
            ) -> None:
                # Do something with the revoked partitions
                # or with the Stream
                print(self.stream)

            async def on_partitions_assigned(
                self, assigned: List[TopicPartition]
            ) -> None:
                # Do something with the assigned partitions
                # or with the Stream
                print(self.stream)


        @stream_engine.stream(topic, rebalance_listener=MyRebalanceListener())
        async def my_stream(stream: Stream):
            async for event in stream:
                ...
        ```
    """

    def __init__(self) -> None:
        self.stream = None

    async def on_partitions_revoked(self, revoked: List[TopicPartition]) -> None:
        """
        Coroutine to be called *before* a rebalance operation starts and
        *after* the consumer stops fetching data.

        If you are using manual commit you have to commit all consumed offsets
        here, to avoid duplicate message delivery after the rebalance is finished.

        Use cases:
            - Cleanup or custom state save on the start of a rebalance operation
            - Saving offsets in a custom store

        Attributes:
            revoked List[TopicPartition]: Partitions that were assigned
                to the consumer on the last rebalance

        !!! note
            The `Stream` is available using `self.stream`
        """
        ...  # pragma: no cover

    async def on_partitions_assigned(self, assigned: List[TopicPartition]) -> None:
        """
        Coroutine to be called *after* partition re-assignment completes
        and *before* the consumer starts fetching data again.

        It is guaranteed that all the processes in a consumer group will
        execute their `on_partitions_revoked` callback before any instance
        executes its `on_partitions_assigned` callback.

        Use cases:
            - Load a state or cache warmup on completion of a successful
            partition re-assignment.

        Attributes:
            assigned List[TopicPartition]: Partitions assigned to the
                consumer (may include partitions that were previously assigned)

        !!! note
            The `Stream` is available using `self.stream`
        """
        ...  # pragma: no cover
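
The `on_partitions_revoked` docstring above says that, with manual commit, consumed offsets should be committed inside that callback. A minimal sketch of that idea follows. It assumes the listener reaches the consumer through `self.stream.consumer` and that the consumer exposes an awaitable `commit()`; `FakeConsumer`, `FakeStream` and `CommitOnRevokeListener` are hypothetical stand-ins so the snippet runs without Kafka.

```python
import asyncio
from typing import Iterable


class FakeConsumer:
    # Hypothetical stand-in for the consumer behind `stream.consumer`
    def __init__(self) -> None:
        self.commits = 0

    async def commit(self) -> None:
        self.commits += 1


class FakeStream:
    # Hypothetical stand-in for kstreams.Stream; only `consumer` is modelled
    def __init__(self) -> None:
        self.consumer = FakeConsumer()


class CommitOnRevokeListener:
    # In real code this would subclass kstreams.RebalanceListener
    def __init__(self) -> None:
        self.stream = None  # set by the Stream before it subscribes

    async def on_partitions_revoked(self, revoked: Iterable) -> None:
        # Commit before ownership moves; skip when nothing is being revoked
        if self.stream is not None and revoked:
            await self.stream.consumer.commit()

    async def on_partitions_assigned(self, assigned: Iterable) -> None:
        # Nothing to do on assignment in this sketch
        pass


async def demo() -> int:
    listener = CommitOnRevokeListener()
    listener.stream = FakeStream()
    # Initial join: the revoked set is empty, so no commit happens
    await listener.on_partitions_revoked(set())
    # Later rebalance: one partition is taken away, so offsets are committed
    await listener.on_partitions_revoked({("local--hello-world", 0)})
    return listener.stream.consumer.commits


commits = asyncio.run(demo())
```

Guarding on an empty `revoked` collection avoids a pointless commit on the first join, where the logs in the example README show `set()` being passed.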
30 changes: 23 additions & 7 deletions kstreams/streams.py
@@ -24,6 +24,7 @@

from .backends.kafka import Kafka
from .clients import Consumer, ConsumerType
from .rebalance_listener import RebalanceListener
from .serializers import Deserializer

logger = logging.getLogger(__name__)
@@ -45,6 +46,8 @@ class Stream:
when an event is consumed
initial_offsets List[kstreams.TopicPartitionOffset]: List of
TopicPartitionOffset that will `seek` the initial offsets to
rebalance_listener kstreams.rebalance_listener.RebalanceListener: Listener
whose callbacks are called when partitions are assigned or revoked
!!! Example
```python title="Usage"
@@ -91,6 +94,7 @@ def __init__(
model: Optional[Any] = None,
deserializer: Optional[Deserializer] = None,
initial_offsets: Optional[List[TopicPartitionOffset]] = None,
rebalance_listener: Optional[RebalanceListener] = None,
) -> None:
self.func = func
self.backend = backend
@@ -103,6 +107,7 @@ def __init__(
self.deserializer = deserializer
self.running = False
self.initial_offsets = initial_offsets
self.rebalance_listener = rebalance_listener

# aiokafka expects topic names as arguments, meaning that
# can receive N topics -> N arguments,
@@ -113,7 +118,7 @@ def _create_consumer(self) -> Type[ConsumerType]:
if self.backend is None:
raise BackendNotSet("A backend has not been set for this stream")
config = {**self.backend.dict(), **self.config}
return self.consumer_class(*self.topics, **config)
return self.consumer_class(**config)

async def stop(self) -> None:
if not self.running:
@@ -126,6 +131,19 @@ async def stop(self) -> None:
if self._consumer_task is not None:
self._consumer_task.cancel()

async def subscribe(self) -> None:
# Always create a consumer on stream.start
self.consumer = self._create_consumer()
await self.consumer.start()
self.running = True

# set the stream on the listener so it will be available
# when the callbacks are called
if self.rebalance_listener is not None:
self.rebalance_listener.stream = self # type: ignore

self.consumer.subscribe(topics=self.topics, listener=self.rebalance_listener)

async def start(self) -> Optional[AsyncGenerator]:
if self.running:
return None
@@ -141,14 +159,10 @@ async def func_wrapper(func):
f"CRASHED Stream!!! Task {self._consumer_task} \n\n {e}"
)

# Always create a consumer on stream.start
self.consumer = self._create_consumer()
func = self.func(self)
await self.consumer.start()
self.running = True

await self.subscribe()
self._seek_to_initial_offsets()

func = self.func(self)
if inspect.isasyncgen(func):
return func
else:
@@ -237,6 +251,7 @@ def stream(
name: Optional[str] = None,
deserializer: Optional[Deserializer] = None,
initial_offsets: Optional[List[TopicPartitionOffset]] = None,
rebalance_listener: Optional[RebalanceListener] = None,
**kwargs,
) -> Callable[[StreamFunc], Stream]:
def decorator(func: StreamFunc) -> Stream:
@@ -246,6 +261,7 @@ def decorator(func: StreamFunc) -> Stream:
name=name,
deserializer=deserializer,
initial_offsets=initial_offsets,
rebalance_listener=rebalance_listener,
config=kwargs,
)
update_wrapper(s, func)
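
The wiring introduced in `Stream.subscribe` (attach the stream to the listener, then subscribe with that listener) can be illustrated in isolation. This is a simplified sketch rather than the library code: `Listener`, `FakeConsumer` and `MiniStream` are hypothetical stand-ins that model only the attributes used in the diff above.

```python
import asyncio
from typing import List, Optional


class Listener:
    # Stand-in for kstreams.RebalanceListener
    def __init__(self) -> None:
        self.stream = None


class FakeConsumer:
    # Hypothetical consumer that records what subscribe() received
    def __init__(self) -> None:
        self.topics: List[str] = []
        self.listener: Optional[Listener] = None

    async def start(self) -> None:
        pass

    def subscribe(
        self, topics: List[str], listener: Optional[Listener] = None
    ) -> None:
        self.topics = list(topics)
        self.listener = listener


class MiniStream:
    def __init__(
        self, topics: List[str], rebalance_listener: Optional[Listener] = None
    ) -> None:
        self.topics = topics
        self.rebalance_listener = rebalance_listener
        self.consumer: Optional[FakeConsumer] = None
        self.running = False

    async def subscribe(self) -> None:
        # Create and start the consumer first, as in the diff above
        self.consumer = FakeConsumer()
        await self.consumer.start()
        self.running = True

        # Give the listener a back-reference before any callback can fire
        if self.rebalance_listener is not None:
            self.rebalance_listener.stream = self

        self.consumer.subscribe(
            topics=self.topics, listener=self.rebalance_listener
        )


async def demo():
    listener = Listener()
    stream = MiniStream(["local--hello-world"], rebalance_listener=listener)
    await stream.subscribe()
    return stream, listener


stream, listener = asyncio.run(demo())
```

Setting `listener.stream` before calling `subscribe` on the consumer means the back-reference is already in place when the first rebalance callback runs.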