diff --git a/docs/stream.md b/docs/stream.md index 79ff0e42..c4325a91 100644 --- a/docs/stream.md +++ b/docs/stream.md @@ -161,6 +161,7 @@ If your stream `crashes` for any reason the event consumption is stopped, meanin - `StreamErrorPolicy.STOP` (**default**): Stop the `Stream` when an exception occurs. The exception is raised after the stream is properly stopped. - `StreamErrorPolicy.RESTART`: Stop and restart the `Stream` when an exception occurs. The event that caused the exception is skipped. The exception is *NOT raised* because the application should contine working, however `logger.exception()` is used to alert the user. - `StreamErrorPolicy.STOP_ENGINE`: Stop the `StreamEngine` when an exception occurs. The exception is raised after *ALL* the Streams were properly stopped. +- `StreamErrorPolicy.STOP_APPLICATION`: Stop the `StreamEngine` when an exception occurs and raises `signal.SIGTERM`. Useful when using `kstreams` with other libraries such us `FastAPI`. In the following example, the `StreamErrorPolicy.RESTART` error policy is specifed. If the `Stream` crashed with the `ValueError` exception it is restarted: diff --git a/examples/fastapi-webapp/README.md b/examples/fastapi-webapp/README.md index c406f284..704b1ed2 100644 --- a/examples/fastapi-webapp/README.md +++ b/examples/fastapi-webapp/README.md @@ -37,6 +37,47 @@ After doing a `GET` to `http://localhost:8000/events` you should see the followi Event consumed: headers: (), payload: b'{"message": "hello world!"}' ``` +## Stop FastAPI application + +It is possible to stop the `FastAPI` application when a `Stream` crashes using [StreamErrorPolicy.STOP_APPLICATION](https://github.com/kpn/kstreams/blob/master/examples/fastapi-webapp/fastapi_webapp/app.py#L23) +For this use case, if the event `error` is sent then the `stream` will crash with [ValueError("error....")](https://github.com/kpn/kstreams/blob/master/examples/fastapi-webapp/fastapi_webapp/streams.py#L13) + +To this this behaviour execute: + +```bash +./scripts/cluster/events/send "local--kstream" +``` + +and type `error` + +Then you should see something similar to the following logs: + +```bash +INFO: Will watch for changes in these directories: ['kstreams/examples/fastapi-webapp'] +INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit) +INFO: Started reloader process [41292] using StatReload +INFO: Started server process [41297] +INFO: Waiting for application startup. +INFO: Application startup complete. +Event consumed: headers: (), payload: b'error' +Unhandled error occurred while listening to the stream. Stream consuming from topics ['local--kstream'] CRASHED!!! + +Traceback (most recent call last): + File "kstreams/kstreams/middleware/middleware.py", line 83, in __call__ + return await self.next_call(cr) + ^^^^^^^^^^^^^^^^^^^^^^^^ + File "kstreams/kstreams/middleware/udf_middleware.py", line 59, in __call__ + return await self.next_call(*params) + ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + File "kstreams/examples/fastapi-webapp/fastapi_webapp/streams.py", line 13, in consume + raise ValueError("error....") +ValueError: error.... +INFO: Shutting down +INFO: Waiting for application shutdown. +INFO: Application shutdown complete. +INFO: Finished server process [41297] +``` + ## Note If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where diff --git a/examples/fastapi-webapp/fastapi_webapp/app.py b/examples/fastapi-webapp/fastapi_webapp/app.py index 3147ee09..80476864 100644 --- a/examples/fastapi-webapp/fastapi_webapp/app.py +++ b/examples/fastapi-webapp/fastapi_webapp/app.py @@ -1,6 +1,8 @@ from fastapi import FastAPI from starlette_prometheus import PrometheusMiddleware, metrics +from kstreams.streams_utils import StreamErrorPolicy + from .resources import stream_engine from .streams import consume from .views import router @@ -18,7 +20,7 @@ async def shutdown_event(): await stream_engine.stop() -stream_engine.add_stream(consume) +stream_engine.add_stream(consume, error_policy=StreamErrorPolicy.STOP_APPLICATION) app.include_router(router) app.add_middleware(PrometheusMiddleware, filter_unhandled_paths=True) app.add_api_route("/metrics", metrics) # type: ignore diff --git a/examples/fastapi-webapp/fastapi_webapp/streams.py b/examples/fastapi-webapp/fastapi_webapp/streams.py index ff1fd368..256bf271 100644 --- a/examples/fastapi-webapp/fastapi_webapp/streams.py +++ b/examples/fastapi-webapp/fastapi_webapp/streams.py @@ -8,3 +8,6 @@ @stream("local--kstream", group_id="kstreams--group-id") async def consume(cr: ConsumerRecord): print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}") + + if cr.value == b"error": + raise ValueError("error....") diff --git a/examples/fastapi-webapp/poetry.lock b/examples/fastapi-webapp/poetry.lock index 0b65d757..c2cf5d71 100644 --- a/examples/fastapi-webapp/poetry.lock +++ b/examples/fastapi-webapp/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "aiokafka" @@ -140,22 +140,23 @@ test = ["pytest (>=6)"] [[package]] name = "fastapi" -version = "0.109.2" +version = "0.112.2" description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" optional = false python-versions = ">=3.8" files = [ - {file = "fastapi-0.109.2-py3-none-any.whl", hash = "sha256:2c9bab24667293b501cad8dd388c05240c850b58ec5876ee3283c47d6e1e3a4d"}, - {file = "fastapi-0.109.2.tar.gz", hash = "sha256:f3817eac96fe4f65a2ebb4baa000f394e55f5fccdaf7f75250804bc58f354f73"}, + {file = "fastapi-0.112.2-py3-none-any.whl", hash = "sha256:db84b470bd0e2b1075942231e90e3577e12a903c4dc8696f0d206a7904a7af1c"}, + {file = "fastapi-0.112.2.tar.gz", hash = "sha256:3d4729c038414d5193840706907a41839d839523da6ed0c2811f1168cac1798c"}, ] [package.dependencies] pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.1.0 || >2.1.0,<3.0.0" -starlette = ">=0.36.3,<0.37.0" +starlette = ">=0.37.2,<0.39.0" typing-extensions = ">=4.8.0" [package.extras] -all = ["email-validator (>=2.0.0)", "httpx (>=0.23.0)", "itsdangerous (>=1.1.0)", "jinja2 (>=2.11.2)", "orjson (>=3.2.1)", "pydantic-extra-types (>=2.0.0)", "pydantic-settings (>=2.0.0)", "python-multipart (>=0.0.7)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "uvicorn[standard] (>=0.12.0)"] +all = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "itsdangerous (>=1.1.0)", "jinja2 (>=2.11.2)", "orjson (>=3.2.1)", "pydantic-extra-types (>=2.0.0)", "pydantic-settings (>=2.0.0)", "python-multipart (>=0.0.7)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "uvicorn[standard] (>=0.12.0)"] +standard = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "jinja2 (>=2.11.2)", "python-multipart (>=0.0.7)", "uvicorn[standard] (>=0.12.0)"] [[package]] name = "future" @@ -191,7 +192,7 @@ files = [ [[package]] name = "kstreams" -version = "0.18.5" +version = "0.23.0" description = "Build simple kafka streams applications" optional = false python-versions = "^3.8" @@ -417,13 +418,13 @@ files = [ [[package]] name = "starlette" -version = "0.36.3" +version = "0.38.2" description = "The little ASGI library that shines." optional = false python-versions = ">=3.8" files = [ - {file = "starlette-0.36.3-py3-none-any.whl", hash = "sha256:13d429aa93a61dc40bf503e8c801db1f1bca3dc706b10ef2434a36123568f044"}, - {file = "starlette-0.36.3.tar.gz", hash = "sha256:90a671733cfb35771d8cc605e0b679d23b992f8dcfad48cc60b38cb29aeb7080"}, + {file = "starlette-0.38.2-py3-none-any.whl", hash = "sha256:4ec6a59df6bbafdab5f567754481657f7ed90dc9d69b0c9ff017907dd54faeff"}, + {file = "starlette-0.38.2.tar.gz", hash = "sha256:c7c0441065252160993a1a37cf2a73bb64d271b17303e0b0c1eb7191cfb12d75"}, ] [package.dependencies] @@ -480,4 +481,4 @@ standard = ["colorama (>=0.4)", "httptools (>=0.4.0)", "python-dotenv (>=0.13)", [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "bd4adcaed4970b23ae5a549e8d2223f07b6632554350eac7b77a2540501e9ba4" +content-hash = "082fe4939c89768e63d2538ac6fc0a5f704c09a3cd4f88c0a6fa2579a8095681" diff --git a/examples/fastapi-webapp/pyproject.toml b/examples/fastapi-webapp/pyproject.toml index b475ee96..fb0eb4f4 100644 --- a/examples/fastapi-webapp/pyproject.toml +++ b/examples/fastapi-webapp/pyproject.toml @@ -6,7 +6,7 @@ authors = ["Marcos Schroh "] [tool.poetry.dependencies] python = "^3.8" -fastapi = "^0.109.1" +fastapi = "^0.112" kstreams = { path = "../../.", develop = true } starlette-prometheus = "^0.9.0" uvicorn = "^0.18.2" diff --git a/kstreams/middleware/middleware.py b/kstreams/middleware/middleware.py index c77bc2c5..6032f443 100644 --- a/kstreams/middleware/middleware.py +++ b/kstreams/middleware/middleware.py @@ -1,4 +1,5 @@ import logging +import signal import sys import typing @@ -62,6 +63,14 @@ async def __call__(self, cr: ConsumerRecord) -> typing.Any: class ExceptionMiddleware(BaseMiddleware): + """ + This is always the first Middleware in the middleware stack + to catch any exception that might occur. Any exception raised + when consuming events that is not handled by the end user + will be handled by this ExceptionMiddleware executing the + policy_error that was stablished. + """ + def __init__( self, *, engine: "StreamEngine", error_policy: StreamErrorPolicy, **kwargs ) -> None: @@ -97,9 +106,13 @@ async def cleanup_policy(self, exc: Exception) -> None: elif self.error_policy == StreamErrorPolicy.STOP: await self.stream.stop() raise exc - else: + elif self.error_policy == StreamErrorPolicy.STOP_ENGINE: await self.engine.stop() raise exc + else: + # STOP_APPLICATION + await self.engine.stop() + signal.raise_signal(signal.SIGTERM) # acquire the asyncio.Lock `is_processing` again to resume the processing # and avoid `RuntimeError: Lock is not acquired.` diff --git a/kstreams/streams_utils.py b/kstreams/streams_utils.py index ae38e733..d1c55133 100644 --- a/kstreams/streams_utils.py +++ b/kstreams/streams_utils.py @@ -14,6 +14,7 @@ class StreamErrorPolicy(str, enum.Enum): RESTART = "RESTART" STOP = "STOP" STOP_ENGINE = "STOP_ENGINE" + STOP_APPLICATION = "STOP_APPLICATION" def setup_type(params: List[inspect.Parameter]) -> UDFType: diff --git a/tests/test_streams_error_policy.py b/tests/test_streams_error_policy.py index eb9cc421..83632d71 100644 --- a/tests/test_streams_error_policy.py +++ b/tests/test_streams_error_policy.py @@ -84,6 +84,41 @@ async def my_stream_two(cr: ConsumerRecord): save_to_db.assert_called_once_with(b'{"message": "Hello world!"}') +@pytest.mark.asyncio +async def test_stop_application_error_policy(stream_engine: StreamEngine): + event = b'{"message": "Hello world!"}' + topic = "kstrems--local" + topic_two = "kstrems--local-two" + save_to_db = mock.Mock() + client = TestStreamClient(stream_engine) + + with mock.patch("signal.raise_signal"): + @stream_engine.stream(topic, error_policy=StreamErrorPolicy.STOP_APPLICATION) + async def my_stream(cr: ConsumerRecord): + raise ValueError("Crashing Stream...") + + @stream_engine.stream(topic_two) + async def my_stream_two(cr: ConsumerRecord): + save_to_db(cr.value) + + async with client: + # send event and crash the first Stream, then the second one + # should be stopped because of StreamErrorPolicy.STOP_ENGINE + await client.send(topic, value=event, key="1") + + # Send an event to the second Stream, it should be consumed + # as the Stream has been stopped + await client.send(topic_two, value=event, key="1") + + # Both streams are stopped before leaving the context + assert not my_stream.running + assert not my_stream_two.running + + # check that the event was consumed only once. + # The StreamEngine must wait for graceful shutdown + save_to_db.assert_called_once_with(b'{"message": "Hello world!"}') + + @pytest.mark.asyncio async def test_restart_stream_error_policy(stream_engine: StreamEngine): event = b'{"message": "Hello world!"}'