Skip to content

Commit

Permalink
feat: KstreamsRebalanceListener added as default rebalance listener (#…
Browse files Browse the repository at this point in the history
…102)

Co-authored-by: Marcos Schroh <[email protected]>
  • Loading branch information
marcosschroh and marcosschroh authored Feb 28, 2023
1 parent 0497af8 commit f86612c
Show file tree
Hide file tree
Showing 12 changed files with 492 additions and 20 deletions.
15 changes: 15 additions & 0 deletions docs/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,21 @@ For some cases you will need a `RebalanceListener` so when partitions are `assig
- Saving offsets in a custom store when a partition is `revoked`
- Load a state or cache warmup on completion of a successful partition re-assignment.

If you do not provide a `RebalanceListener` kstreams will set the default `KstreamsRebalanceListener`. If `manual` commit is enabled, `KstreamsRebalanceListener` will call `commit`
before the `stream` partitions are revoked to avoid the error `CommitFailedError` and *duplicate* message delivery after a rebalance. See code [example](https://github.com/kpn/kstreams/tree/master/examples/stream-with-manual-commit) with
manual `commit`

::: kstreams.KstreamsRebalanceListener
options:
show_root_heading: true
docstring_section_style: table
show_signature_annotations: false
show_bases: false

### Base Class

If you want to define a custom `RebalanceListener`, it has to inherits from `kstreams.RebalanceListener`

::: kstreams.RebalanceListener
options:
show_root_heading: true
Expand Down
2 changes: 1 addition & 1 deletion examples/json_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def consume(stream: Stream):
print(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
assert cr.value == data
finally:
await stream.consumer.stop()
await stream.stop()


async def produce():
Expand Down
61 changes: 61 additions & 0 deletions examples/stream-with-manual-commit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Stream with Manual Commit

`Stream` example with a `manual commit` (`enable_auto_commit=False`)

## Description

This example demostrate that using the `KstreamsRebalanceListener` rebalace listener with `manual commit` will trigger a `commit` before the `Stream` partitions
are `revoked`. This behaviour will avoid the error `CommitFailedError` and *duplicate* message delivery after a rebalance.

The `Stream` process in batches and commits every 10 events. If a rebalance is triggered before the commit is called and the partition to `commit` was revoked
then the error `CommitFailedError` should be raised if `KstreamsRebalanceListener` is not used.

## Requirements

python 3.8+, poetry, docker-compose

## Installation

```bash
poetry install
```

## Usage

1. Start the kafka cluster: From `kstreams` project root execute `./scripts/cluster/start`
2. Inside this folder execute `poetry run app`
3. From `kstreams` project root, you can use the `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open. Enter messages to send. The command is:

```bash
./scripts/cluster/events/send "local--hello-world"
> kstreams # Type this!!!
```

Then, on the consume side, you should see something similar to the following logs:

```bash
❯ me@me-pc stream-with-manual-commit % poetry run app

INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--hello-world'})
INFO:aiokafka.consumer.consumer:Subscribed to topic(s): {'local--hello-world'}
INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1 for group example-group
INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group example-group
INFO:aiokafka.consumer.group_coordinator:(Re-)joining group example-group
INFO:aiokafka.consumer.group_coordinator:Joined group 'example-group' (generation 34) with member_id aiokafka-0.8.0-f5fac56e-71b7-41cf-9308-3363c8f82fd2
INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using roundrobin
INFO:aiokafka.consumer.group_coordinator:Successfully synced group example-group with generation 34
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--hello-world', partition=0)} for group example-group

Event consumed: headers: (), payload: ConsumerRecord(topic='local--hello-world', partition=0, offset=21, timestamp=1677506271687, timestamp_type=0, key=None, value=b'kstream', checksum=None, serialized_key_size=-1, serialized_value_size=7, headers=())
```

Then if you run the same program in a different terminal you should see that the `commit` is called for you before the partitions are revoked.

```bash
INFO:kstreams.rebalance_listener: Manual commit enabled for stream <kstreams.streams.Stream object at 0x1073e7a50>. Performing `commit` before revoking partitions
```

## Note

If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
`kstreams` is pointing to the parent folder. You will have to set the latest version.
Loading

0 comments on commit f86612c

Please sign in to comment.