Skip to content

Commit

Permalink
feat: error policy STOP_APPLICATION added. Closes #162 (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh authored Sep 3, 2024
1 parent 2817c21 commit f31a3a3
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ If your stream `crashes` for any reason the event consumption is stopped, meanin
- `StreamErrorPolicy.STOP` (**default**): Stop the `Stream` when an exception occurs. The exception is raised after the stream is properly stopped.
- `StreamErrorPolicy.RESTART`: Stop and restart the `Stream` when an exception occurs. The event that caused the exception is skipped. The exception is *NOT raised* because the application should contine working, however `logger.exception()` is used to alert the user.
- `StreamErrorPolicy.STOP_ENGINE`: Stop the `StreamEngine` when an exception occurs. The exception is raised after *ALL* the Streams were properly stopped.
- `StreamErrorPolicy.STOP_APPLICATION`: Stop the `StreamEngine` when an exception occurs and raises `signal.SIGTERM`. Useful when using `kstreams` with other libraries such us `FastAPI`.

In the following example, the `StreamErrorPolicy.RESTART` error policy is specifed. If the `Stream` crashed with the `ValueError` exception it is restarted:

Expand Down
41 changes: 41 additions & 0 deletions examples/fastapi-webapp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,47 @@ After doing a `GET` to `http://localhost:8000/events` you should see the followi
Event consumed: headers: (), payload: b'{"message": "hello world!"}'
```
## Stop FastAPI application
It is possible to stop the `FastAPI` application when a `Stream` crashes using [StreamErrorPolicy.STOP_APPLICATION](https://github.com/kpn/kstreams/blob/master/examples/fastapi-webapp/fastapi_webapp/app.py#L23)
For this use case, if the event `error` is sent then the `stream` will crash with [ValueError("error....")](https://github.com/kpn/kstreams/blob/master/examples/fastapi-webapp/fastapi_webapp/streams.py#L13)
To this this behaviour execute:
```bash
./scripts/cluster/events/send "local--kstream"
```
and type `error`
Then you should see something similar to the following logs:
```bash
INFO: Will watch for changes in these directories: ['kstreams/examples/fastapi-webapp']
INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO: Started reloader process [41292] using StatReload
INFO: Started server process [41297]
INFO: Waiting for application startup.
INFO: Application startup complete.
Event consumed: headers: (), payload: b'error'
Unhandled error occurred while listening to the stream. Stream consuming from topics ['local--kstream'] CRASHED!!!
Traceback (most recent call last):
File "kstreams/kstreams/middleware/middleware.py", line 83, in __call__
return await self.next_call(cr)
^^^^^^^^^^^^^^^^^^^^^^^^
File "kstreams/kstreams/middleware/udf_middleware.py", line 59, in __call__
return await self.next_call(*params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "kstreams/examples/fastapi-webapp/fastapi_webapp/streams.py", line 13, in consume
raise ValueError("error....")
ValueError: error....
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [41297]
```
## Note
If you plan on using this example, pay attention to the `pyproject.toml` dependencies, where
Expand Down
4 changes: 3 additions & 1 deletion examples/fastapi-webapp/fastapi_webapp/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from fastapi import FastAPI
from starlette_prometheus import PrometheusMiddleware, metrics

from kstreams.streams_utils import StreamErrorPolicy

from .resources import stream_engine
from .streams import consume
from .views import router
Expand All @@ -18,7 +20,7 @@ async def shutdown_event():
await stream_engine.stop()


stream_engine.add_stream(consume)
stream_engine.add_stream(consume, error_policy=StreamErrorPolicy.STOP_APPLICATION)
app.include_router(router)
app.add_middleware(PrometheusMiddleware, filter_unhandled_paths=True)
app.add_api_route("/metrics", metrics) # type: ignore
3 changes: 3 additions & 0 deletions examples/fastapi-webapp/fastapi_webapp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@
@stream("local--kstream", group_id="kstreams--group-id")
async def consume(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")

if cr.value == b"error":
raise ValueError("error....")
23 changes: 12 additions & 11 deletions examples/fastapi-webapp/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/fastapi-webapp/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Marcos Schroh <[email protected]>"]

[tool.poetry.dependencies]
python = "^3.8"
fastapi = "^0.109.1"
fastapi = "^0.112"
kstreams = { path = "../../.", develop = true }
starlette-prometheus = "^0.9.0"
uvicorn = "^0.18.2"
Expand Down
15 changes: 14 additions & 1 deletion kstreams/middleware/middleware.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import signal
import sys
import typing

Expand Down Expand Up @@ -62,6 +63,14 @@ async def __call__(self, cr: ConsumerRecord) -> typing.Any:


class ExceptionMiddleware(BaseMiddleware):
"""
This is always the first Middleware in the middleware stack
to catch any exception that might occur. Any exception raised
when consuming events that is not handled by the end user
will be handled by this ExceptionMiddleware executing the
policy_error that was stablished.
"""

def __init__(
self, *, engine: "StreamEngine", error_policy: StreamErrorPolicy, **kwargs
) -> None:
Expand Down Expand Up @@ -97,9 +106,13 @@ async def cleanup_policy(self, exc: Exception) -> None:
elif self.error_policy == StreamErrorPolicy.STOP:
await self.stream.stop()
raise exc
else:
elif self.error_policy == StreamErrorPolicy.STOP_ENGINE:
await self.engine.stop()
raise exc
else:
# STOP_APPLICATION
await self.engine.stop()
signal.raise_signal(signal.SIGTERM)

# acquire the asyncio.Lock `is_processing` again to resume the processing
# and avoid `RuntimeError: Lock is not acquired.`
Expand Down
1 change: 1 addition & 0 deletions kstreams/streams_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class StreamErrorPolicy(str, enum.Enum):
RESTART = "RESTART"
STOP = "STOP"
STOP_ENGINE = "STOP_ENGINE"
STOP_APPLICATION = "STOP_APPLICATION"


def setup_type(params: List[inspect.Parameter]) -> UDFType:
Expand Down
35 changes: 35 additions & 0 deletions tests/test_streams_error_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,41 @@ async def my_stream_two(cr: ConsumerRecord):
save_to_db.assert_called_once_with(b'{"message": "Hello world!"}')


@pytest.mark.asyncio
async def test_stop_application_error_policy(stream_engine: StreamEngine):
event = b'{"message": "Hello world!"}'
topic = "kstrems--local"
topic_two = "kstrems--local-two"
save_to_db = mock.Mock()
client = TestStreamClient(stream_engine)

with mock.patch("signal.raise_signal"):
@stream_engine.stream(topic, error_policy=StreamErrorPolicy.STOP_APPLICATION)
async def my_stream(cr: ConsumerRecord):
raise ValueError("Crashing Stream...")

@stream_engine.stream(topic_two)
async def my_stream_two(cr: ConsumerRecord):
save_to_db(cr.value)

async with client:
# send event and crash the first Stream, then the second one
# should be stopped because of StreamErrorPolicy.STOP_ENGINE
await client.send(topic, value=event, key="1")

# Send an event to the second Stream, it should be consumed
# as the Stream has been stopped
await client.send(topic_two, value=event, key="1")

# Both streams are stopped before leaving the context
assert not my_stream.running
assert not my_stream_two.running

# check that the event was consumed only once.
# The StreamEngine must wait for graceful shutdown
save_to_db.assert_called_once_with(b'{"message": "Hello world!"}')


@pytest.mark.asyncio
async def test_restart_stream_error_policy(stream_engine: StreamEngine):
event = b'{"message": "Hello world!"}'
Expand Down

0 comments on commit f31a3a3

Please sign in to comment.