fix: prevent ConsumerStoppedError exception when engine or a stream is stopped. Related to #234
marcosschroh committed Nov 15, 2024
1 parent 08691d8 commit 1bafde9
Showing 8 changed files with 79 additions and 28 deletions.
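Context: before this change, stopping the engine while a stream was blocked in `getone()` waiting for a record could let aiokafka's `ConsumerStoppedError` escape into the stream's middleware. A minimal sketch of the scenario this commit addresses, assuming a reachable local broker; the engine title, topic, and group id are illustrative:

import asyncio

import kstreams

stream_engine = kstreams.create_engine(title="demo-engine")


@stream_engine.stream("local--demo", group_id="demo-group")
async def consume(cr: kstreams.ConsumerRecord):
    print(f"Event consumed: {cr.value}")


async def main():
    await stream_engine.start()
    await asyncio.sleep(5)
    # With this fix, stopping the engine while `consume` is parked inside
    # `getone()` no longer surfaces aiokafka's ConsumerStoppedError.
    await stream_engine.stop()


asyncio.run(main())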
3 changes: 2 additions & 1 deletion examples/fastapi-webapp/fastapi_webapp/app.py
@@ -4,7 +4,7 @@
from kstreams.streams_utils import StreamErrorPolicy

from .resources import stream_engine
from .streams import consume
from .streams import consume, consume_2
from .views import router

app = FastAPI()
@@ -21,6 +21,7 @@ async def shutdown_event():


stream_engine.add_stream(consume, error_policy=StreamErrorPolicy.STOP_APPLICATION)
stream_engine.add_stream(consume_2, error_policy=StreamErrorPolicy.RESTART)
app.include_router(router)
app.add_middleware(PrometheusMiddleware, filter_unhandled_paths=True)
app.add_api_route("/metrics", metrics) # type: ignore
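For reference, a sketch of how the `StreamErrorPolicy` values could be wired onto an engine, following the same decorator-plus-`add_stream` pattern as this example app; the topics, group ids, and engine title are illustrative, and the policy behaviors are the ones described in the `cleanup_policy` docstring added below in middleware.py:

from kstreams import ConsumerRecord, create_engine, stream
from kstreams.streams_utils import StreamErrorPolicy

stream_engine = create_engine(title="policies-demo")


@stream("local--a", group_id="group-a")
async def handler_a(cr: ConsumerRecord):
    print(cr.value)


@stream("local--b", group_id="group-b")
async def handler_b(cr: ConsumerRecord):
    print(cr.value)


# Per the cleanup_policy docstring: STOP and STOP_ENGINE re-raise the
# exception; RESTART stops and restarts the stream; STOP_APPLICATION
# stops the engine and raises SIGTERM for the host application.
stream_engine.add_stream(handler_a, error_policy=StreamErrorPolicy.RESTART)
stream_engine.add_stream(handler_b, error_policy=StreamErrorPolicy.STOP_ENGINE)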
8 changes: 8 additions & 0 deletions examples/fastapi-webapp/fastapi_webapp/streams.py
@@ -1,5 +1,6 @@
import asyncio
import logging

from kstreams import ConsumerRecord, stream

logger = logging.getLogger(__name__)
@@ -11,3 +12,10 @@ async def consume(cr: ConsumerRecord):

if cr.value == b"error":
raise ValueError("error....")


@stream("local--hello-world", group_id="example-group-2")
async def consume_2(cr: ConsumerRecord):
print(f"Event consumed: headers: {cr.headers}, payload: {cr}")
await asyncio.sleep(10)
raise ValueError
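To drive both handlers, one could publish a couple of events with the standard `stream_engine.send` API; a sketch, assuming the engine from `resources.py` and assuming `consume` is also subscribed to `local--hello-world` (its decorator sits outside the visible hunk):

from .resources import stream_engine


async def produce() -> None:
    # consume_2 raises ~10 seconds after each event; under the RESTART
    # policy the middleware stops and restarts the stream.
    await stream_engine.send("local--hello-world", value=b"hello")
    # this payload makes `consume` raise, and with STOP_APPLICATION the
    # whole FastAPI process receives SIGTERM.
    await stream_engine.send("local--hello-world", value=b"error")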
9 changes: 5 additions & 4 deletions examples/fastapi-webapp/poetry.lock

examples/graceful-shutdown-example/graceful_shutdown_example/app.py
@@ -4,6 +4,7 @@
import aiorun

import kstreams
import kstreams.streams_utils

logger = logging.getLogger(__name__)

11 changes: 6 additions & 5 deletions examples/graceful-shutdown-example/poetry.lock


9 changes: 5 additions & 4 deletions examples/simple-example/poetry.lock


50 changes: 39 additions & 11 deletions kstreams/middleware/middleware.py
@@ -3,8 +3,6 @@
import sys
import typing

from aiokafka import errors

from kstreams import types
from kstreams.streams_utils import StreamErrorPolicy

@@ -81,8 +79,6 @@ def __init__(
async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:
try:
return await self.next_call(cr)
except errors.ConsumerStoppedError as exc:
await self.cleanup_policy(exc)
except Exception as exc:
logger.exception(
"Unhandled error occurred while listening to the stream. "
@@ -95,25 +91,57 @@ async def __call__(self, cr: types.ConsumerRecord) -> typing.Any:
await self.cleanup_policy(exc)

async def cleanup_policy(self, exc: Exception) -> None:
# always release the asyncio.Lock `is_processing` to properly
# stop or restart the `stream`
"""
Execute clenup policicy according to the Stream configuration.
At this point we are inside the asyncio.Lock `is_processing`
as an event is being processed and an exeption has occured.
The Lock must be released to stop the Stream
(which must happen for any policy), then before re-raising
the exception the Lock must be acquire again to continue the processing
Exception and policies:
- STOP: The exception is re-raised as the Stream will be stopped
and the end user will deal with it
- STOP_ENGINE: The exception is re-raised as the Engine will be stopped
(all Streams and Producer) and the end user will deal with it
- RESTART: The exception is not re-raised as the Stream
will recover and continue the processing. The logger.exception
from __call__ will record that something went wrong
- STOP_APPLICATION: The exception is not re-raised as the entire
application will be stopped. This is only useful when using kstreams
with another library like FastAPI. The logger.exception
from __call__ will record that something went wrong
Args:
exc (Exception): Any Exception that causes the Stream to crash
Raises:
exc: Exception is the policy is `STOP` or `STOP_ENGINE`
"""
self.stream.is_processing.release()

if self.error_policy == StreamErrorPolicy.RESTART:
await self.stream.stop()
logger.info(f"Restarting stream {self.stream}")
await self.stream.start()
elif self.error_policy == StreamErrorPolicy.STOP:
await self.stream.stop()
# acquire `is_processing` Lock again to resume processing
# and avoid `RuntimeError: Lock is not acquired.`
await self.stream.is_processing.acquire()
raise exc
elif self.error_policy == StreamErrorPolicy.STOP_ENGINE:
await self.engine.stop()
# acquire `is_processing` Lock again to resume processing
# and avoid `RuntimeError: Lock is not acquired.`
await self.stream.is_processing.acquire()
raise exc
else:
# STOP_APPLICATION
await self.engine.stop()
await self.stream.is_processing.acquire()
signal.raise_signal(signal.SIGTERM)

# acquire the asyncio.Lock `is_processing` again to resume the processing
# and avoid `RuntimeError: Lock is not acquired.`
await self.stream.is_processing.acquire()
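The release/re-acquire dance around `is_processing` can be reproduced in isolation. A self-contained sketch (all names illustrative) of why the Lock must be re-acquired before re-raising when the caller holds it via `async with`:

import asyncio


async def cleanup(lock: asyncio.Lock) -> None:
    # mirrors cleanup_policy: the caller entered `async with lock`
    # and will call lock.release() when the block exits
    lock.release()          # let stop()/start() run without deadlocking
    await asyncio.sleep(0)  # stand-in for stream.stop() / engine.stop()
    await lock.acquire()    # re-acquire before re-raising
    raise ValueError("re-raised, as for STOP / STOP_ENGINE")


async def main() -> None:
    lock = asyncio.Lock()
    try:
        async with lock:  # mirrors `async with self.is_processing`
            await cleanup(lock)
    except ValueError:
        # without the re-acquire above, exiting the `async with` block
        # would raise `RuntimeError: Lock is not acquired.` instead
        pass


asyncio.run(main())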
16 changes: 13 additions & 3 deletions kstreams/streams.py
@@ -364,9 +364,19 @@ async def start(self) -> None:

async def func_wrapper_with_typing(self) -> None:
while self.running:
cr = await self.getone()
async with self.is_processing:
await self.func(cr)
try:
cr = await self.getone()
async with self.is_processing:
await self.func(cr)
except errors.ConsumerStoppedError:
# This exception is only raised when we are inside the `getone`
# coroutine waiting for an event and `await consumer.stop()`
# is called. If this happens it means that the end user has called
# `engine.stop()`, or another exception has caused all the streams
# to stop. In either case the exception should not be re-raised.
logger.info(
f"Stream {self} stopped after Coordinator was closed {self.topics}"
)

def seek_to_initial_offsets(self) -> None:
if not self.seeked_initial_offsets and self.consumer is not None:
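The same pattern applies to any raw aiokafka consumer loop; a sketch (consumer construction omitted) of treating `ConsumerStoppedError` as a normal shutdown signal rather than an error:

import logging

from aiokafka import AIOKafkaConsumer, errors

logger = logging.getLogger(__name__)


async def consume_loop(consumer: AIOKafkaConsumer) -> None:
    while True:
        try:
            cr = await consumer.getone()
            print(cr.value)
        except errors.ConsumerStoppedError:
            # raised when consumer.stop() runs while getone() is awaiting;
            # treat it as a clean shutdown and exit the loop
            logger.info("consumer stopped, exiting loop")
            return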
