diff --git a/docs/stream.md b/docs/stream.md
index a627e984..1e1e1e78 100644
--- a/docs/stream.md
+++ b/docs/stream.md
@@ -237,6 +237,21 @@ For some cases you will need a `RebalanceListener` so when partitions are `assig
 - Saving offsets in a custom store when a partition is `revoked`
 - Load a state or cache warmup on completion of a successful partition re-assignment.
 
+If you do not provide a `RebalanceListener`, kstreams will set the default `KstreamsRebalanceListener`. If `manual` commit is enabled, `KstreamsRebalanceListener` calls `commit`
+before the `stream` partitions are revoked, to avoid the error `CommitFailedError` and *duplicate* message delivery after a rebalance. See the code [example](https://github.com/kpn/kstreams/tree/master/examples/stream-with-manual-commit) with
+manual `commit`.
+
+::: kstreams.KstreamsRebalanceListener
+    options:
+        show_root_heading: true
+        docstring_section_style: table
+        show_signature_annotations: false
+        show_bases: false
+
+### Base Class
+
+If you want to define a custom `RebalanceListener`, it has to inherit from `kstreams.RebalanceListener`.
+
 ::: kstreams.RebalanceListener
     options:
         show_root_heading: true
diff --git a/examples/json_serialization.py b/examples/json_serialization.py
index 3130ab7b..a9ec7d41 100644
--- a/examples/json_serialization.py
+++ b/examples/json_serialization.py
@@ -47,7 +47,7 @@ async def consume(stream: Stream):
             print(f"Event consumed: headers: {cr.headers}, value: {cr.value}")
             assert cr.value == data
     finally:
-        await stream.consumer.stop()
+        await stream.stop()
 
 
 async def produce():
diff --git a/examples/stream-with-manual-commit/README.md b/examples/stream-with-manual-commit/README.md
new file mode 100644
index 00000000..adf31ace
--- /dev/null
+++ b/examples/stream-with-manual-commit/README.md
@@ -0,0 +1,61 @@
+# Stream with Manual Commit
+
+`Stream` example with `manual commit` (`enable_auto_commit=False`)
+
+## Description
+
+This example demonstrates that using the `KstreamsRebalanceListener` rebalance listener with `manual commit` triggers a `commit` before the `Stream` partitions
+are `revoked`. This behaviour avoids the error `CommitFailedError` and *duplicate* message delivery after a rebalance.
+
+The `Stream` processes events in batches and commits every 10 events. If a rebalance is triggered before the commit is called and the partition to `commit` was revoked,
+then the error `CommitFailedError` would be raised if `KstreamsRebalanceListener` were not used.
+
+## Requirements
+
+python 3.8+, poetry, docker-compose
+
+## Installation
+
+```bash
+poetry install
+```
+
+## Usage
+
+1. Start the kafka cluster: from the `kstreams` project root execute `./scripts/cluster/start`
+2. Inside this folder execute `poetry run app`
+3. From the `kstreams` project root, use `./scripts/cluster/events/send` to send events to the kafka cluster. A prompt will open; enter messages to send. The command is:
+
+```bash
+./scripts/cluster/events/send "local--hello-world"
+> kstreams # Type this!!!
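+> another-event # illustrative input, any text works
+> one-more      # the example stream only commits after every 10 events, so send
+>               # at least 10 messages (or start a second consumer to trigger a rebalance)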
+``` + +Then, on the consume side, you should see something similar to the following logs: + +```bash +❯ me@me-pc stream-with-manual-commit % poetry run app + +INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--hello-world'}) +INFO:aiokafka.consumer.consumer:Subscribed to topic(s): {'local--hello-world'} +INFO:aiokafka.consumer.group_coordinator:Discovered coordinator 1 for group example-group +INFO:aiokafka.consumer.group_coordinator:Revoking previously assigned partitions set() for group example-group +INFO:aiokafka.consumer.group_coordinator:(Re-)joining group example-group +INFO:aiokafka.consumer.group_coordinator:Joined group 'example-group' (generation 34) with member_id aiokafka-0.8.0-f5fac56e-71b7-41cf-9308-3363c8f82fd2 +INFO:aiokafka.consumer.group_coordinator:Elected group leader -- performing partition assignments using roundrobin +INFO:aiokafka.consumer.group_coordinator:Successfully synced group example-group with generation 34 +INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--hello-world', partition=0)} for group example-group + +Event consumed: headers: (), payload: ConsumerRecord(topic='local--hello-world', partition=0, offset=21, timestamp=1677506271687, timestamp_type=0, key=None, value=b'kstream', checksum=None, serialized_key_size=-1, serialized_value_size=7, headers=()) +``` + +Then if you run the same program in a different terminal you should see that the `commit` is called for you before the partitions are revoked. + +```bash +INFO:kstreams.rebalance_listener: Manual commit enabled for stream . Performing `commit` before revoking partitions +``` + +## Note + +If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where +`kstreams` is pointing to the parent folder. You will have to set the latest version. diff --git a/examples/stream-with-manual-commit/poetry.lock b/examples/stream-with-manual-commit/poetry.lock new file mode 100644 index 00000000..367e2845 --- /dev/null +++ b/examples/stream-with-manual-commit/poetry.lock @@ -0,0 +1,264 @@ +[[package]] +name = "aiokafka" +version = "0.8.0" +description = "Kafka integration with asyncio." 
+category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +async-timeout = "*" +kafka-python = ">=2.0.2" +packaging = "*" + +[package.extras] +all = ["gssapi", "lz4", "python-snappy (>=0.5)", "zstandard"] +gssapi = ["gssapi"] +lz4 = ["lz4"] +snappy = ["python-snappy (>=0.5)"] +zstd = ["zstandard"] + +[[package]] +name = "aiorun" +version = "2022.11.1" +description = "Boilerplate for asyncio applications" +category = "main" +optional = false +python-versions = ">=3.5" + +[package.extras] +dev = ["pytest", "pytest-cov"] + +[[package]] +name = "async-timeout" +version = "4.0.2" +description = "Timeout context manager for asyncio programs" +category = "main" +optional = false +python-versions = ">=3.6" + +[[package]] +name = "future" +version = "0.18.3" +description = "Clean single-source support for Python 3 and 2" +category = "main" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" + +[[package]] +name = "kafka-python" +version = "2.0.2" +description = "Pure Python client for Apache Kafka" +category = "main" +optional = false +python-versions = "*" + +[package.extras] +crc32c = ["crc32c"] + +[[package]] +name = "kstreams" +version = "0.8.0" +description = "Build simple kafka streams applications" +category = "main" +optional = false +python-versions = "^3.8" +develop = true + +[package.dependencies] +aiokafka = "<1.0" +future = "^0.18.2" +prometheus-client = "<1.0" +pydantic = "^1.9.0" +PyYAML = "^5.4.1" + +[package.source] +type = "directory" +url = "../.." + +[[package]] +name = "packaging" +version = "23.0" +description = "Core utilities for Python packages" +category = "main" +optional = false +python-versions = ">=3.7" + +[[package]] +name = "prometheus-client" +version = "0.16.0" +description = "Python client for the Prometheus monitoring system." 
+category = "main" +optional = false +python-versions = ">=3.6" + +[package.extras] +twisted = ["twisted"] + +[[package]] +name = "pydantic" +version = "1.10.5" +description = "Data validation and settings management using python type hints" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +typing-extensions = ">=4.2.0" + +[package.extras] +dotenv = ["python-dotenv (>=0.10.4)"] +email = ["email-validator (>=1.0.3)"] + +[[package]] +name = "pyyaml" +version = "5.4.1" +description = "YAML parser and emitter for Python" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" + +[[package]] +name = "typing-extensions" +version = "4.5.0" +description = "Backported and Experimental Type Hints for Python 3.7+" +category = "main" +optional = false +python-versions = ">=3.7" + +[metadata] +lock-version = "1.1" +python-versions = "^3.8" +content-hash = "cf82ee33b18e25727b6743a3db6b44da1f1389b6dc79ef2eea1158aa6a532503" + +[metadata.files] +aiokafka = [ + {file = "aiokafka-0.8.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f3f96301337fa7f7242f46651619b8e9e8fa8f23902dc11416fe764436d662d3"}, + {file = "aiokafka-0.8.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:12457b59a06cff7369cace8d4460049b2ef2ab7f7cdc9c4924d577b9d4b48bf9"}, + {file = "aiokafka-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ec896d114be157a886e3227bbe3f00658dc4d6f17b203bc44075650817703f0b"}, + {file = "aiokafka-0.8.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8857cbd76e97186e54b98ebb3ea7778fb3618826bb9e4d01375bfab0a1d93d69"}, + {file = "aiokafka-0.8.0-cp310-cp310-win32.whl", hash = "sha256:539d8584652e354e7f7bbaf8843e936d91bfc28e224a53a82e1bcb64ac7f6dda"}, + {file = "aiokafka-0.8.0-cp310-cp310-win_amd64.whl", hash = "sha256:0e9d61912678ecae6b3d407107c1a935f21d55af4585b70d8f5dcc39ecb949ce"}, + {file = "aiokafka-0.8.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:021e9f0027ca63c6c04daccfdd0e985f7a56d51bd0d43f482f674a58fada52f5"}, + {file = "aiokafka-0.8.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5a8038698a47333cdb0cd198cb4b3ccd2fbbc86ba9a4b9afc3eebe6544db0c2f"}, + {file = "aiokafka-0.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9bf6d0da5804ae8888c357034d1a6750baa610999181e930678da0e87cec610d"}, + {file = "aiokafka-0.8.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:95682f661f295fac2f5f3f0132aea7c44a1b6c92726161daa509af67ac506885"}, + {file = "aiokafka-0.8.0-cp311-cp311-win32.whl", hash = "sha256:65e1d27a1c1cd38c66e0b22928af74b192f7598da9acd5bb939c6acea5bb5036"}, + {file = "aiokafka-0.8.0-cp311-cp311-win_amd64.whl", hash = "sha256:f0a216a27f05b050d5a5308fb3444014fa6bca5f0cd63468eaa169c5f19ea1dd"}, + {file = "aiokafka-0.8.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2567465ee6de4d248fc416f2eef7d33bbe246a79073410ae2368b5bdaeb758c1"}, + {file = "aiokafka-0.8.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7e292841beda7cfdcd6939aab6cc2a623acd3d655b166f7ff97c658f14ced8c5"}, + {file = "aiokafka-0.8.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d459ab92a360cac240cf10b9ce88e64c1e41d942c7f63b1df6c2aafe27f946d0"}, + {file = 
"aiokafka-0.8.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9364eb81340b3a70a1222a4701c73a49ea0026a79bf138b4aec342f012d6f039"}, + {file = "aiokafka-0.8.0-cp37-cp37m-win32.whl", hash = "sha256:4439a03820dc64a8c3aa5fe17809541e6a001f6f6196aad6b6b88e7ded2b5396"}, + {file = "aiokafka-0.8.0-cp37-cp37m-win_amd64.whl", hash = "sha256:e07f07290a150552273c02bc5109d0a40bc0f32abc0ae5aeaa1e54fb86369251"}, + {file = "aiokafka-0.8.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e005b53597fe9bc6681a2a3b50728d235cf2fb8801e52790678c691c85383565"}, + {file = "aiokafka-0.8.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dfb6dfef6c18726a783d102a6c1b0dfb6d43785a46ff34e967ddfa8f774532dd"}, + {file = "aiokafka-0.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5202073bb8d2350b72805d45ff0125c800ed101506a4ba7be2f03ad1ba8ad1e6"}, + {file = "aiokafka-0.8.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:881209100355a92696c6501ba1c2b32127bb1f7f2f318b400b3973ab0b52efed"}, + {file = "aiokafka-0.8.0-cp38-cp38-win32.whl", hash = "sha256:6f50a940411ae6cd0d7bcaf2d821539e3a59b6de012f77de18a573565c9f638f"}, + {file = "aiokafka-0.8.0-cp38-cp38-win_amd64.whl", hash = "sha256:1c3fd832a06fdd68e82f100fe678a800dd6dbf5e8db6af9521be333f965c325b"}, + {file = "aiokafka-0.8.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2760f095a8ffb5b9b97ad28a43a6a93f38d67cf3bc95b42e6b27462b614c8561"}, + {file = "aiokafka-0.8.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dea214c2588237cf0d404624ccd99f466a2e853ca22d7153bb680b2d3f25cdde"}, + {file = "aiokafka-0.8.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:57aa55b48004da9bf5a5d37d3412c2d373b0bf32118bdc5c78cc5635998674cc"}, + {file = "aiokafka-0.8.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b86e3c1d649824103427c021593d75f44e01db1ffbc880b154e04098b534355f"}, + {file = "aiokafka-0.8.0-cp39-cp39-win32.whl", hash = "sha256:23f1fbdf54790a3751216e33e62228c8e1eb7feebcb19ef532cd3e4f13ae51ce"}, + {file = "aiokafka-0.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:b36066df820e30f56386deb56d72efba287ba65419848888ea4b42f9e2741cff"}, + {file = "aiokafka-0.8.0.tar.gz", hash = "sha256:49b30479f68ba9a484a0e3362fb9c48797d7320066db9fcd53e755451f389acb"}, +] +aiorun = [ + {file = "aiorun-2022.11.1-py3-none-any.whl", hash = "sha256:8fbfc2aab258021deef2b1f38284c652af9fd3710e94c7b0e736a55d161fa0cb"}, + {file = "aiorun-2022.11.1.tar.gz", hash = "sha256:d820cebffdea82f9c1750cc396f3a58e4c0d277a2c51f11e86ed6ab7736dce59"}, +] +async-timeout = [ + {file = "async-timeout-4.0.2.tar.gz", hash = "sha256:2163e1640ddb52b7a8c80d0a67a08587e5d245cc9c553a74a847056bc2976b15"}, + {file = "async_timeout-4.0.2-py3-none-any.whl", hash = "sha256:8ca1e4fcf50d07413d66d1a5e416e42cfdf5851c981d679a09851a6853383b3c"}, +] +future = [ + {file = "future-0.18.3.tar.gz", hash = "sha256:34a17436ed1e96697a86f9de3d15a3b0be01d8bc8de9c1dffd59fb8234ed5307"}, +] +kafka-python = [ + {file = "kafka-python-2.0.2.tar.gz", hash = "sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3"}, + {file = "kafka_python-2.0.2-py2.py3-none-any.whl", hash = "sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e"}, +] +kstreams = [] +packaging = [ + {file = "packaging-23.0-py3-none-any.whl", hash = 
"sha256:714ac14496c3e68c99c29b00845f7a2b85f3bb6f1078fd9f72fd20f0570002b2"}, + {file = "packaging-23.0.tar.gz", hash = "sha256:b6ad297f8907de0fa2fe1ccbd26fdaf387f5f47c7275fedf8cce89f99446cf97"}, +] +prometheus-client = [ + {file = "prometheus_client-0.16.0-py3-none-any.whl", hash = "sha256:0836af6eb2c8f4fed712b2f279f6c0a8bbab29f9f4aa15276b91c7cb0d1616ab"}, + {file = "prometheus_client-0.16.0.tar.gz", hash = "sha256:a03e35b359f14dd1630898543e2120addfdeacd1a6069c1367ae90fd93ad3f48"}, +] +pydantic = [ + {file = "pydantic-1.10.5-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:5920824fe1e21cbb3e38cf0f3dd24857c8959801d1031ce1fac1d50857a03bfb"}, + {file = "pydantic-1.10.5-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:3bb99cf9655b377db1a9e47fa4479e3330ea96f4123c6c8200e482704bf1eda2"}, + {file = "pydantic-1.10.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2185a3b3d98ab4506a3f6707569802d2d92c3a7ba3a9a35683a7709ea6c2aaa2"}, + {file = "pydantic-1.10.5-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f582cac9d11c227c652d3ce8ee223d94eb06f4228b52a8adaafa9fa62e73d5c9"}, + {file = "pydantic-1.10.5-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:c9e5b778b6842f135902e2d82624008c6a79710207e28e86966cd136c621bfee"}, + {file = "pydantic-1.10.5-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:72ef3783be8cbdef6bca034606a5de3862be6b72415dc5cb1fb8ddbac110049a"}, + {file = "pydantic-1.10.5-cp310-cp310-win_amd64.whl", hash = "sha256:45edea10b75d3da43cfda12f3792833a3fa70b6eee4db1ed6aed528cef17c74e"}, + {file = "pydantic-1.10.5-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:63200cd8af1af2c07964546b7bc8f217e8bda9d0a2ef0ee0c797b36353914984"}, + {file = "pydantic-1.10.5-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:305d0376c516b0dfa1dbefeae8c21042b57b496892d721905a6ec6b79494a66d"}, + {file = "pydantic-1.10.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1fd326aff5d6c36f05735c7c9b3d5b0e933b4ca52ad0b6e4b38038d82703d35b"}, + {file = "pydantic-1.10.5-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6bb0452d7b8516178c969d305d9630a3c9b8cf16fcf4713261c9ebd465af0d73"}, + {file = "pydantic-1.10.5-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:9a9d9155e2a9f38b2eb9374c88f02fd4d6851ae17b65ee786a87d032f87008f8"}, + {file = "pydantic-1.10.5-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:f836444b4c5ece128b23ec36a446c9ab7f9b0f7981d0d27e13a7c366ee163f8a"}, + {file = "pydantic-1.10.5-cp311-cp311-win_amd64.whl", hash = "sha256:8481dca324e1c7b715ce091a698b181054d22072e848b6fc7895cd86f79b4449"}, + {file = "pydantic-1.10.5-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:87f831e81ea0589cd18257f84386bf30154c5f4bed373b7b75e5cb0b5d53ea87"}, + {file = "pydantic-1.10.5-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ce1612e98c6326f10888df951a26ec1a577d8df49ddcaea87773bfbe23ba5cc"}, + {file = "pydantic-1.10.5-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:58e41dd1e977531ac6073b11baac8c013f3cd8706a01d3dc74e86955be8b2c0c"}, + {file = "pydantic-1.10.5-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:6a4b0aab29061262065bbdede617ef99cc5914d1bf0ddc8bcd8e3d7928d85bd6"}, + {file = "pydantic-1.10.5-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:36e44a4de37b8aecffa81c081dbfe42c4d2bf9f6dff34d03dce157ec65eb0f15"}, + {file = 
"pydantic-1.10.5-cp37-cp37m-win_amd64.whl", hash = "sha256:261f357f0aecda005934e413dfd7aa4077004a174dafe414a8325e6098a8e419"}, + {file = "pydantic-1.10.5-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:b429f7c457aebb7fbe7cd69c418d1cd7c6fdc4d3c8697f45af78b8d5a7955760"}, + {file = "pydantic-1.10.5-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:663d2dd78596c5fa3eb996bc3f34b8c2a592648ad10008f98d1348be7ae212fb"}, + {file = "pydantic-1.10.5-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:51782fd81f09edcf265823c3bf43ff36d00db246eca39ee765ef58dc8421a642"}, + {file = "pydantic-1.10.5-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c428c0f64a86661fb4873495c4fac430ec7a7cef2b8c1c28f3d1a7277f9ea5ab"}, + {file = "pydantic-1.10.5-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:76c930ad0746c70f0368c4596020b736ab65b473c1f9b3872310a835d852eb19"}, + {file = "pydantic-1.10.5-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:3257bd714de9db2102b742570a56bf7978e90441193acac109b1f500290f5718"}, + {file = "pydantic-1.10.5-cp38-cp38-win_amd64.whl", hash = "sha256:f5bee6c523d13944a1fdc6f0525bc86dbbd94372f17b83fa6331aabacc8fd08e"}, + {file = "pydantic-1.10.5-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:532e97c35719f137ee5405bd3eeddc5c06eb91a032bc755a44e34a712420daf3"}, + {file = "pydantic-1.10.5-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ca9075ab3de9e48b75fa8ccb897c34ccc1519177ad8841d99f7fd74cf43be5bf"}, + {file = "pydantic-1.10.5-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bd46a0e6296346c477e59a954da57beaf9c538da37b9df482e50f836e4a7d4bb"}, + {file = "pydantic-1.10.5-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3353072625ea2a9a6c81ad01b91e5c07fa70deb06368c71307529abf70d23325"}, + {file = "pydantic-1.10.5-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:3f9d9b2be177c3cb6027cd67fbf323586417868c06c3c85d0d101703136e6b31"}, + {file = "pydantic-1.10.5-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:b473d00ccd5c2061fd896ac127b7755baad233f8d996ea288af14ae09f8e0d1e"}, + {file = "pydantic-1.10.5-cp39-cp39-win_amd64.whl", hash = "sha256:5f3bc8f103b56a8c88021d481410874b1f13edf6e838da607dcb57ecff9b4594"}, + {file = "pydantic-1.10.5-py3-none-any.whl", hash = "sha256:7c5b94d598c90f2f46b3a983ffb46ab806a67099d118ae0da7ef21a2a4033b28"}, + {file = "pydantic-1.10.5.tar.gz", hash = "sha256:9e337ac83686645a46db0e825acceea8e02fca4062483f40e9ae178e8bd1103a"}, +] +pyyaml = [ + {file = "PyYAML-5.4.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:3b2b1824fe7112845700f815ff6a489360226a5609b96ec2190a45e62a9fc922"}, + {file = "PyYAML-5.4.1-cp27-cp27m-win32.whl", hash = "sha256:129def1b7c1bf22faffd67b8f3724645203b79d8f4cc81f674654d9902cb4393"}, + {file = "PyYAML-5.4.1-cp27-cp27m-win_amd64.whl", hash = "sha256:4465124ef1b18d9ace298060f4eccc64b0850899ac4ac53294547536533800c8"}, + {file = "PyYAML-5.4.1-cp27-cp27mu-manylinux1_x86_64.whl", hash = "sha256:bb4191dfc9306777bc594117aee052446b3fa88737cd13b7188d0e7aa8162185"}, + {file = "PyYAML-5.4.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:6c78645d400265a062508ae399b60b8c167bf003db364ecb26dcab2bda048253"}, + {file = "PyYAML-5.4.1-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:4e0583d24c881e14342eaf4ec5fbc97f934b999a6828693a99157fde912540cc"}, + {file = "PyYAML-5.4.1-cp36-cp36m-manylinux2014_aarch64.whl", hash = "sha256:72a01f726a9c7851ca9bfad6fd09ca4e090a023c00945ea05ba1638c09dc3347"}, + {file = 
"PyYAML-5.4.1-cp36-cp36m-manylinux2014_s390x.whl", hash = "sha256:895f61ef02e8fed38159bb70f7e100e00f471eae2bc838cd0f4ebb21e28f8541"}, + {file = "PyYAML-5.4.1-cp36-cp36m-win32.whl", hash = "sha256:3bd0e463264cf257d1ffd2e40223b197271046d09dadf73a0fe82b9c1fc385a5"}, + {file = "PyYAML-5.4.1-cp36-cp36m-win_amd64.whl", hash = "sha256:e4fac90784481d221a8e4b1162afa7c47ed953be40d31ab4629ae917510051df"}, + {file = "PyYAML-5.4.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:5accb17103e43963b80e6f837831f38d314a0495500067cb25afab2e8d7a4018"}, + {file = "PyYAML-5.4.1-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:e1d4970ea66be07ae37a3c2e48b5ec63f7ba6804bdddfdbd3cfd954d25a82e63"}, + {file = "PyYAML-5.4.1-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:cb333c16912324fd5f769fff6bc5de372e9e7a202247b48870bc251ed40239aa"}, + {file = "PyYAML-5.4.1-cp37-cp37m-manylinux2014_s390x.whl", hash = "sha256:fe69978f3f768926cfa37b867e3843918e012cf83f680806599ddce33c2c68b0"}, + {file = "PyYAML-5.4.1-cp37-cp37m-win32.whl", hash = "sha256:dd5de0646207f053eb0d6c74ae45ba98c3395a571a2891858e87df7c9b9bd51b"}, + {file = "PyYAML-5.4.1-cp37-cp37m-win_amd64.whl", hash = "sha256:08682f6b72c722394747bddaf0aa62277e02557c0fd1c42cb853016a38f8dedf"}, + {file = "PyYAML-5.4.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:d2d9808ea7b4af864f35ea216be506ecec180628aced0704e34aca0b040ffe46"}, + {file = "PyYAML-5.4.1-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:8c1be557ee92a20f184922c7b6424e8ab6691788e6d86137c5d93c1a6ec1b8fb"}, + {file = "PyYAML-5.4.1-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:fd7f6999a8070df521b6384004ef42833b9bd62cfee11a09bda1079b4b704247"}, + {file = "PyYAML-5.4.1-cp38-cp38-manylinux2014_s390x.whl", hash = "sha256:bfb51918d4ff3d77c1c856a9699f8492c612cde32fd3bcd344af9be34999bfdc"}, + {file = "PyYAML-5.4.1-cp38-cp38-win32.whl", hash = "sha256:fa5ae20527d8e831e8230cbffd9f8fe952815b2b7dae6ffec25318803a7528fc"}, + {file = "PyYAML-5.4.1-cp38-cp38-win_amd64.whl", hash = "sha256:0f5f5786c0e09baddcd8b4b45f20a7b5d61a7e7e99846e3c799b05c7c53fa696"}, + {file = "PyYAML-5.4.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:294db365efa064d00b8d1ef65d8ea2c3426ac366c0c4368d930bf1c5fb497f77"}, + {file = "PyYAML-5.4.1-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:74c1485f7707cf707a7aef42ef6322b8f97921bd89be2ab6317fd782c2d53183"}, + {file = "PyYAML-5.4.1-cp39-cp39-manylinux2014_aarch64.whl", hash = "sha256:d483ad4e639292c90170eb6f7783ad19490e7a8defb3e46f97dfe4bacae89122"}, + {file = "PyYAML-5.4.1-cp39-cp39-manylinux2014_s390x.whl", hash = "sha256:fdc842473cd33f45ff6bce46aea678a54e3d21f1b61a7750ce3c498eedfe25d6"}, + {file = "PyYAML-5.4.1-cp39-cp39-win32.whl", hash = "sha256:49d4cdd9065b9b6e206d0595fee27a96b5dd22618e7520c33204a4a3239d5b10"}, + {file = "PyYAML-5.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:c20cfa2d49991c8b4147af39859b167664f2ad4561704ee74c1de03318e898db"}, + {file = "PyYAML-5.4.1.tar.gz", hash = "sha256:607774cbba28732bfa802b54baa7484215f530991055bb562efbed5b2f20a45e"}, +] +typing-extensions = [ + {file = "typing_extensions-4.5.0-py3-none-any.whl", hash = "sha256:fb33085c39dd998ac16d1431ebc293a8b3eedd00fd4a32de0ff79002c19511b4"}, + {file = "typing_extensions-4.5.0.tar.gz", hash = "sha256:5cb5f4a79139d699607b3ef622a1dedafa84e115ab0024e0d9c044a9479ca7cb"}, +] diff --git a/examples/stream-with-manual-commit/pyproject.toml b/examples/stream-with-manual-commit/pyproject.toml new file mode 100644 index 00000000..21089327 --- /dev/null +++ b/examples/stream-with-manual-commit/pyproject.toml @@ -0,0 
+1,19 @@
+[tool.poetry]
+name = "stream-with-manual-commit"
+version = "0.1.0"
+description = ""
+authors = ["Marcos Schroh "]
+readme = "README.md"
+packages = [{include = "stream_with_manual_commit"}]
+
+[tool.poetry.dependencies]
+python = "^3.8"
+aiorun = "^2022.4.1"
+kstreams = { path = "../../.", develop = true }
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
+
+[tool.poetry.scripts]
+app = "stream_with_manual_commit.app:main"
diff --git a/examples/stream-with-manual-commit/stream_with_manual_commit/__init__.py b/examples/stream-with-manual-commit/stream_with_manual_commit/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/examples/stream-with-manual-commit/stream_with_manual_commit/app.py b/examples/stream-with-manual-commit/stream_with_manual_commit/app.py
new file mode 100644
index 00000000..0288afb4
--- /dev/null
+++ b/examples/stream-with-manual-commit/stream_with_manual_commit/app.py
@@ -0,0 +1,50 @@
+import logging
+from typing import List
+
+import aiorun
+
+import kstreams
+
+logging.basicConfig(level=logging.INFO)
+
+
+stream_engine = kstreams.create_engine(title="my-stream-engine")
+
+
+batch: List[kstreams.ConsumerRecord] = []
+
+
+async def process_msg_batch(events_batch: List[kstreams.ConsumerRecord]):
+    for event in events_batch:
+        print(f"Event consumed: headers: {event.headers}, payload: {event.value}")
+
+
+@stream_engine.stream(
+    topics=["local--hello-world"],
+    group_id="example-group",
+    enable_auto_commit=False,  # we have to call commit ourselves
+)
+async def consume(stream):
+    global batch
+
+    async for event in stream:
+        # No need to catch the error `CommitFailedError`
+        # here because the stream rebalance listener
+        # defaults to KstreamsRebalanceListener
+        batch.append(event)
+        if len(batch) == 10:
+            await process_msg_batch(batch)
+            await stream.commit()
+            batch = []
+
+
+async def start():
+    await stream_engine.start()
+
+
+async def shutdown(loop):
+    await stream_engine.stop()
+
+
+def main():
+    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
diff --git a/kstreams/__init__.py b/kstreams/__init__.py
index 0596982b..9d5caaf2 100644
--- a/kstreams/__init__.py
+++ b/kstreams/__init__.py
@@ -3,7 +3,7 @@ from .clients import Consumer, ConsumerType, Producer, ProducerType
 from .create import StreamEngine, create_engine
 from .prometheus.monitor import PrometheusMonitor, PrometheusMonitorType
-from .rebalance_listener import RebalanceListener
+from .rebalance_listener import KstreamsRebalanceListener, RebalanceListener
 from .streams import Stream, stream
 from .structs import TopicPartitionOffset
 
@@ -16,6 +16,7 @@
     "create_engine",
     "PrometheusMonitor",
     "PrometheusMonitorType",
+    "KstreamsRebalanceListener",
     "RebalanceListener",
     "Stream",
     "stream",
diff --git a/kstreams/rebalance_listener.py b/kstreams/rebalance_listener.py
index b7267aa0..868c0338 100644
--- a/kstreams/rebalance_listener.py
+++ b/kstreams/rebalance_listener.py
@@ -1,9 +1,13 @@
-from typing import List
+import asyncio
+import logging
+from typing import Set
 
 from aiokafka.abc import ConsumerRebalanceListener
 
 from kstreams import TopicPartition
 
+logger = logging.getLogger(__name__)
+
 
 # Can not use a Protocol here because aiokafka forces to have a concrete instance
 # that inherits from ConsumerRebalanceListener, if we use a protocol we will
@@ -23,14 +27,14 @@ class RebalanceListener(ConsumerRebalanceListener):
     class MyRebalanceListener(RebalanceListener):
 
         async def on_partitions_revoked(
-            self, revoked:
List[TopicPartition] + self, revoked: Set[TopicPartition] ) -> None: # Do something with the revoked partitions # or with the Stream print(self.stream) async def on_partitions_assigned( - self, assigned: List[TopicPartition] + self, assigned: Set[TopicPartition] ) -> None: # Do something with the assigned partitions # or with the Stream @@ -47,7 +51,7 @@ async def my_stream(stream: Stream): def __init__(self) -> None: self.stream = None - async def on_partitions_revoked(self, revoked: List[TopicPartition]) -> None: + async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None: """ Coroutine to be called *before* a rebalance operation starts and *after* the consumer stops fetching data. @@ -60,7 +64,7 @@ async def on_partitions_revoked(self, revoked: List[TopicPartition]) -> None: - saving offsets in a custom store Attributes: - revoked List[TopicPartitions]: Partitions that were assigned + revoked Set[TopicPartitions]: Partitions that were assigned to the consumer on the last rebalance !!! note @@ -68,7 +72,7 @@ async def on_partitions_revoked(self, revoked: List[TopicPartition]) -> None: """ ... # pragma: no cover - async def on_partitions_assigned(self, assigned: List[TopicPartition]) -> None: + async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None: """ Coroutine to be called *after* partition re-assignment completes and *before* the consumer starts fetching data again. @@ -82,10 +86,37 @@ async def on_partitions_assigned(self, assigned: List[TopicPartition]) -> None: partition re-assignment. Attributes: - assigned List[TopicPartition]: Partitions assigned to the + assigned Set[TopicPartition]: Partitions assigned to the consumer (may include partitions that were previously assigned) !!! note The `Stream` is available using `self.stream` """ ... # pragma: no cover + + +class KstreamsRebalanceListener(RebalanceListener): + async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None: + """ + Coroutine to be called *before* a rebalance operation starts and + *after* the consumer stops fetching data. + + If manual commit is enabled, `commit` is called before the consumers + partitions are revoked to prevent the error `CommitFailedError` + and duplicate message delivery after a rebalance. + + Attributes: + revoked Set[TopicPartitions]: Partitions that were assigned + to the consumer on the last rebalance + """ + if ( + revoked + and self.stream is not None + and not self.stream.consumer._enable_auto_commit + ): + logger.info( + f"Manual commit enabled for stream {self.stream}. 
" + "Performing `commit` before revoking partitions" + ) + async with asyncio.Lock(): + await self.stream.commit() diff --git a/kstreams/streams.py b/kstreams/streams.py index f8264a96..47589da9 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -24,7 +24,7 @@ from .backends.kafka import Kafka from .clients import Consumer, ConsumerType -from .rebalance_listener import RebalanceListener +from .rebalance_listener import KstreamsRebalanceListener, RebalanceListener from .serializers import Deserializer logger = logging.getLogger(__name__) @@ -131,19 +131,26 @@ async def stop(self) -> None: if self._consumer_task is not None: self._consumer_task.cancel() - async def subscribe(self) -> None: + async def _subscribe(self) -> None: # Always create a consumer on stream.start self.consumer = self._create_consumer() await self.consumer.start() self.running = True + if self.rebalance_listener is None: + # creates the default listener to manage the commit and + # clean the metrics + self.rebalance_listener = KstreamsRebalanceListener() + # set the stream to the listener to it will be available # when the callbacks are called - if self.rebalance_listener is not None: - self.rebalance_listener.stream = self # type: ignore + self.rebalance_listener.stream = self # type: ignore self.consumer.subscribe(topics=self.topics, listener=self.rebalance_listener) + async def commit(self, offsets: Optional[Dict[TopicPartition, int]] = None): + await self.consumer.commit(offsets=offsets) # type: ignore + async def start(self) -> Optional[AsyncGenerator]: if self.running: return None @@ -159,7 +166,7 @@ async def func_wrapper(func): f"CRASHED Stream!!! Task {self._consumer_task} \n\n {e}" ) - await self.subscribe() + await self._subscribe() self._seek_to_initial_offsets() func = self.func(self) diff --git a/tests/test_client.py b/tests/test_client.py index faaa1949..4827429a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -243,7 +243,7 @@ async def test_consumer_commit(stream_engine: StreamEngine): @stream_engine.stream(topic_name, name=name) async def my_stream(stream: Stream): async for cr in stream: - await stream.consumer.commit({tp: cr.offset}) + await stream.commit({tp: cr.offset}) client = TestStreamClient(stream_engine) async with client: diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 9512c4cc..57cfd265 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1,9 +1,9 @@ -from typing import List +from typing import Set from unittest import mock import pytest -from kstreams import RebalanceListener, TopicPartition +from kstreams import KstreamsRebalanceListener, RebalanceListener, TopicPartition from kstreams.backends.kafka import Kafka from kstreams.clients import Consumer from kstreams.engine import Stream, StreamEngine @@ -54,10 +54,10 @@ async def test_add_stream_with_rebalance_listener(stream_engine: StreamEngine): topic = "local--hello-kpn" class MyRebalanceListener(RebalanceListener): - async def on_partitions_revoked(self, revoked: List[TopicPartition]) -> None: + async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None: ... - async def on_partitions_assigned(self, assigned: List[TopicPartition]) -> None: + async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None: ... rebalance_listener = MyRebalanceListener() @@ -72,10 +72,34 @@ async def my_stream(stream: Stream): ... 
await stream_engine.start() + await stream_engine.stop() assert my_stream.rebalance_listener == rebalance_listener assert rebalance_listener.stream == my_stream # checking that the subscription has also the rebalance_listener assert my_stream.consumer._subscription._listener == rebalance_listener - await stream_engine.stop() + + +@pytest.mark.asyncio +async def test_stream_default_rebalance_listener(stream_engine: StreamEngine): + topic = "local--hello-kpn" + + with mock.patch("kstreams.clients.aiokafka.AIOKafkaConsumer.start"), mock.patch( + "kstreams.clients.aiokafka.AIOKafkaProducer.start" + ): + + @stream_engine.stream(topic) + async def hello_stream(stream: Stream): + async for _ in stream: + ... + + await stream_engine.start() + await stream_engine.stop() + + assert isinstance(hello_stream.rebalance_listener, KstreamsRebalanceListener) + + # checking that the subscription has also the rebalance_listener + assert isinstance( + hello_stream.consumer._subscription._listener, KstreamsRebalanceListener + )
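For quick reference, below is a minimal usage sketch of the behaviour introduced above: relying on the default `KstreamsRebalanceListener` with manual commit, versus passing a custom listener. It is not part of the changeset; the topic and group names are invented, and only the API visible in this diff (`create_engine`, `@stream_engine.stream`, `stream.commit`, `RebalanceListener`) is assumed.

```python
# Illustrative sketch only (not part of this diff); topic and group names are invented.
from typing import Set

import kstreams
from kstreams import RebalanceListener, TopicPartition

stream_engine = kstreams.create_engine(title="example-engine")


class MyRebalanceListener(RebalanceListener):
    # A custom listener has to inherit from kstreams.RebalanceListener
    async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
        # self.stream is set by kstreams before the callbacks are called
        print(f"Revoked {revoked} from {self.stream}")

    async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
        print(f"Assigned {assigned} to {self.stream}")


# No rebalance_listener given and manual commit enabled: kstreams falls back to
# KstreamsRebalanceListener, which commits before the partitions are revoked.
@stream_engine.stream(
    "local--orders", group_id="orders-group", enable_auto_commit=False
)
async def with_default_listener(stream: kstreams.Stream):
    async for event in stream:
        print(event.value)
        await stream.commit()  # Stream.commit helper added in this diff


# A custom listener is attached through the rebalance_listener argument.
@stream_engine.stream("local--payments", rebalance_listener=MyRebalanceListener())
async def with_custom_listener(stream: kstreams.Stream):
    async for event in stream:
        print(event.value)
```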