From 1bafde9172812d446d1d3ca2e601f79e03708e8f Mon Sep 17 00:00:00 2001 From: marcosschroh Date: Fri, 15 Nov 2024 14:50:39 +0100 Subject: [PATCH] fix: prevent ConsumerStoppedError exception when engine or a stream is stopped. Related to #234 --- examples/fastapi-webapp/fastapi_webapp/app.py | 3 +- .../fastapi-webapp/fastapi_webapp/streams.py | 8 +++ examples/fastapi-webapp/poetry.lock | 9 ++-- .../graceful_shutdown_example/app.py | 1 + .../graceful-shutdown-example/poetry.lock | 11 ++-- examples/simple-example/poetry.lock | 9 ++-- kstreams/middleware/middleware.py | 50 +++++++++++++++---- kstreams/streams.py | 16 ++++-- 8 files changed, 79 insertions(+), 28 deletions(-) diff --git a/examples/fastapi-webapp/fastapi_webapp/app.py b/examples/fastapi-webapp/fastapi_webapp/app.py index 80476864..8d95d74e 100644 --- a/examples/fastapi-webapp/fastapi_webapp/app.py +++ b/examples/fastapi-webapp/fastapi_webapp/app.py @@ -4,7 +4,7 @@ from kstreams.streams_utils import StreamErrorPolicy from .resources import stream_engine -from .streams import consume +from .streams import consume, consume_2 from .views import router app = FastAPI() @@ -21,6 +21,7 @@ async def shutdown_event(): stream_engine.add_stream(consume, error_policy=StreamErrorPolicy.STOP_APPLICATION) +stream_engine.add_stream(consume_2, error_policy=StreamErrorPolicy.RESTART) app.include_router(router) app.add_middleware(PrometheusMiddleware, filter_unhandled_paths=True) app.add_api_route("/metrics", metrics) # type: ignore diff --git a/examples/fastapi-webapp/fastapi_webapp/streams.py b/examples/fastapi-webapp/fastapi_webapp/streams.py index 256bf271..da5b9c20 100644 --- a/examples/fastapi-webapp/fastapi_webapp/streams.py +++ b/examples/fastapi-webapp/fastapi_webapp/streams.py @@ -1,5 +1,6 @@ import logging +import asyncio from kstreams import ConsumerRecord, stream logger = logging.getLogger(__name__) @@ -11,3 +12,10 @@ async def consume(cr: ConsumerRecord): if cr.value == b"error": raise ValueError("error....") + + +@stream("local--hello-world", group_id="example-group-2") +async def consume_2(cr: ConsumerRecord): + print(f"Event consumed: headers: {cr.headers}, payload: {cr}") + await asyncio.sleep(10) + raise ValueError diff --git a/examples/fastapi-webapp/poetry.lock b/examples/fastapi-webapp/poetry.lock index c2cf5d71..185e628d 100644 --- a/examples/fastapi-webapp/poetry.lock +++ b/examples/fastapi-webapp/poetry.lock @@ -160,12 +160,13 @@ standard = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "htt [[package]] name = "future" -version = "0.18.3" +version = "1.0.0" description = "Clean single-source support for Python 3 and 2" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" files = [ - {file = "future-0.18.3.tar.gz", hash = "sha256:34a17436ed1e96697a86f9de3d15a3b0be01d8bc8de9c1dffd59fb8234ed5307"}, + {file = "future-1.0.0-py3-none-any.whl", hash = "sha256:929292d34f5872e70396626ef385ec22355a1fae8ad29e1a734c3e43f9fbc216"}, + {file = "future-1.0.0.tar.gz", hash = "sha256:bd2968309307861edae1458a4f8a4f3598c03be43b97521076aebf5d94c07b05"}, ] [[package]] @@ -192,7 +193,7 @@ files = [ [[package]] name = "kstreams" -version = "0.23.0" +version = "0.24.8" description = "Build simple kafka streams applications" optional = false python-versions = "^3.8" @@ -201,7 +202,7 @@ develop = true [package.dependencies] aiokafka = "<1.0" -future = "^0.18.2" +future = "^1.0.0" prometheus-client = "<1.0" pydantic = ">=2.0.0,<3.0.0" PyYAML = ">=5.4,<7.0.0" diff --git 
a/examples/graceful-shutdown-example/graceful_shutdown_example/app.py b/examples/graceful-shutdown-example/graceful_shutdown_example/app.py index 57b4fa17..56190238 100644 --- a/examples/graceful-shutdown-example/graceful_shutdown_example/app.py +++ b/examples/graceful-shutdown-example/graceful_shutdown_example/app.py @@ -4,6 +4,7 @@ import aiorun import kstreams +import kstreams.streams_utils logger = logging.getLogger(__name__) diff --git a/examples/graceful-shutdown-example/poetry.lock b/examples/graceful-shutdown-example/poetry.lock index 15d3c280..95a2ae3f 100644 --- a/examples/graceful-shutdown-example/poetry.lock +++ b/examples/graceful-shutdown-example/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "aiokafka" @@ -93,17 +93,18 @@ files = [ [[package]] name = "future" -version = "0.18.3" +version = "1.0.0" description = "Clean single-source support for Python 3 and 2" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" files = [ - {file = "future-0.18.3.tar.gz", hash = "sha256:34a17436ed1e96697a86f9de3d15a3b0be01d8bc8de9c1dffd59fb8234ed5307"}, + {file = "future-1.0.0-py3-none-any.whl", hash = "sha256:929292d34f5872e70396626ef385ec22355a1fae8ad29e1a734c3e43f9fbc216"}, + {file = "future-1.0.0.tar.gz", hash = "sha256:bd2968309307861edae1458a4f8a4f3598c03be43b97521076aebf5d94c07b05"}, ] [[package]] name = "kstreams" -version = "0.18.5" +version = "0.24.8" description = "Build simple kafka streams applications" optional = false python-versions = "^3.8" @@ -112,7 +113,7 @@ develop = true [package.dependencies] aiokafka = "<1.0" -future = "^0.18.2" +future = "^1.0.0" prometheus-client = "<1.0" pydantic = ">=2.0.0,<3.0.0" PyYAML = ">=5.4,<7.0.0" diff --git a/examples/simple-example/poetry.lock b/examples/simple-example/poetry.lock index a93228eb..95a2ae3f 100644 --- a/examples/simple-example/poetry.lock +++ b/examples/simple-example/poetry.lock @@ -93,17 +93,18 @@ files = [ [[package]] name = "future" -version = "0.18.3" +version = "1.0.0" description = "Clean single-source support for Python 3 and 2" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" files = [ - {file = "future-0.18.3.tar.gz", hash = "sha256:34a17436ed1e96697a86f9de3d15a3b0be01d8bc8de9c1dffd59fb8234ed5307"}, + {file = "future-1.0.0-py3-none-any.whl", hash = "sha256:929292d34f5872e70396626ef385ec22355a1fae8ad29e1a734c3e43f9fbc216"}, + {file = "future-1.0.0.tar.gz", hash = "sha256:bd2968309307861edae1458a4f8a4f3598c03be43b97521076aebf5d94c07b05"}, ] [[package]] name = "kstreams" -version = "0.21.0" +version = "0.24.8" description = "Build simple kafka streams applications" optional = false python-versions = "^3.8" @@ -112,7 +113,7 @@ develop = true [package.dependencies] aiokafka = "<1.0" -future = "^0.18.2" +future = "^1.0.0" prometheus-client = "<1.0" pydantic = ">=2.0.0,<3.0.0" PyYAML = ">=5.4,<7.0.0" diff --git a/kstreams/middleware/middleware.py b/kstreams/middleware/middleware.py index 118f7cd3..f5b164bd 100644 --- a/kstreams/middleware/middleware.py +++ b/kstreams/middleware/middleware.py @@ -3,8 +3,6 @@ import sys import typing -from aiokafka import errors - from kstreams import types from kstreams.streams_utils import StreamErrorPolicy @@ -81,8 +79,6 @@ def __init__( async def __call__(self, cr: types.ConsumerRecord) -> typing.Any: try: return await self.next_call(cr) - except 
errors.ConsumerStoppedError as exc: - await self.cleanup_policy(exc) except Exception as exc: logger.exception( "Unhandled error occurred while listening to the stream. " @@ -95,25 +91,57 @@ async def __call__(self, cr: types.ConsumerRecord) -> typing.Any: await self.cleanup_policy(exc) async def cleanup_policy(self, exc: Exception) -> None: - # always release the asyncio.Lock `is_processing` to - # stop or restart properly the `stream` + """ + Execute the cleanup policy according to the Stream configuration. + + At this point we are inside the asyncio.Lock `is_processing` + as an event is being processed and an exception has occurred. + The Lock must be released to stop the Stream + (which must happen for any policy), then before re-raising + the exception the Lock must be acquired again to continue processing. + + Exceptions and policies: + + - STOP: The exception is re-raised as the Stream will be stopped + and the end user will deal with it + + - STOP_ENGINE: The exception is re-raised as the Engine will be stopped + (all Streams and Producer) and the end user will deal with it + + - RESTART: The exception is not re-raised as the Stream + will recover and continue processing. The logger.exception + from __call__ will record that something went wrong + + - STOP_APPLICATION: The exception is not re-raised as the entire + application will be stopped. This is only useful when using kstreams + with another library like FastAPI. The logger.exception + from __call__ will record that something went wrong + + Args: + exc (Exception): Any Exception that causes the Stream to crash + + Raises: + exc: Exception if the policy is `STOP` or `STOP_ENGINE` + """ self.stream.is_processing.release() if self.error_policy == StreamErrorPolicy.RESTART: await self.stream.stop() - logger.info(f"Restarting stream {self.stream}") await self.stream.start() elif self.error_policy == StreamErrorPolicy.STOP: await self.stream.stop() + # acquire `is_processing` Lock again to resume processing + # and avoid `RuntimeError: Lock is not acquired.` + await self.stream.is_processing.acquire() raise exc elif self.error_policy == StreamErrorPolicy.STOP_ENGINE: await self.engine.stop() + # acquire `is_processing` Lock again to resume processing + # and avoid `RuntimeError: Lock is not acquired.` + await self.stream.is_processing.acquire() raise exc else: # STOP_APPLICATION await self.engine.stop() + await self.stream.is_processing.acquire() signal.raise_signal(signal.SIGTERM) - - # acquire the asyncio.Lock `is_processing` again to resume the processing - # and avoid `RuntimeError: Lock is not acquired.` - await self.stream.is_processing.acquire() diff --git a/kstreams/streams.py b/kstreams/streams.py index b247c6d7..bf17682e 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -364,9 +364,19 @@ async def start(self) -> None: async def func_wrapper_with_typing(self) -> None: while self.running: - cr = await self.getone() - async with self.is_processing: - await self.func(cr) + try: + cr = await self.getone() + async with self.is_processing: + await self.func(cr) + except errors.ConsumerStoppedError: + # This exception is only raised when we are inside the `getone` + # coroutine waiting for an event and `await consumer.stop()` + # is called. If this happens it means that the end user has called + # `engine.stop()` or another exception has caused all the + # streams to stop. In either case the exception should not be re-raised. 
+ logger.info( + f"Stream {self} stopped after the Coordinator was closed. Topics: {self.topics}" + ) def seek_to_initial_offsets(self) -> None: if not self.seeked_initial_offsets and self.consumer is not None:
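
For reviewers who want to try the change locally, below is a minimal, self-contained sketch of the behaviour this patch targets. It mirrors the fastapi-webapp example above but runs the engine standalone; the engine title, topic name and group id are illustrative and not part of this patch:

```python
import asyncio

from kstreams import ConsumerRecord, create_engine, stream
from kstreams.streams_utils import StreamErrorPolicy

# Illustrative engine; in the fastapi-webapp example this lives in resources.py
stream_engine = create_engine(title="example-engine")


@stream("local--hello-world", group_id="example-group")
async def consume(cr: ConsumerRecord):
    print(f"Event consumed: headers: {cr.headers}, payload: {cr}")


# RESTART: an unhandled error inside the stream stops and restarts only this
# stream (see ExceptionMiddleware.cleanup_policy in this patch).
stream_engine.add_stream(consume, error_policy=StreamErrorPolicy.RESTART)


async def main():
    await stream_engine.start()
    await asyncio.sleep(30)
    # With this patch, stopping the engine while the stream is idle inside
    # `getone()` logs "Stream ... stopped ..." instead of surfacing a
    # ConsumerStoppedError from the stream task.
    await stream_engine.stop()


asyncio.run(main())
```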