Skip to content

Commit

Permalink
feat: Stream error policy added
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh committed Sep 2, 2024
1 parent 057aaa4 commit f7abb93
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 107 deletions.
66 changes: 34 additions & 32 deletions docs/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,52 +156,54 @@ stream = Stream(

## Stream crashing

If your stream `crashes` for any reason, the event consumption will stop meaning that non event will be consumed from the `topic`.
As an end user you are responsable of deciding what to do. In future version approaches like `re-try`, `stream engine stops on stream crash` might be introduced.
If your stream `crashes` for any reason the event consumption is stopped, meaning that non event will be consumed from the `topic`. However, it is possible to set three different `error policies` per stream:

```python title="Crashing example"
import aiorun
- `StreamErrorPolicy.STOP`: Stop the `Stream` when an exception occurs. The exception is raised after the stream is properly stopped.
- `StreamErrorPolicy.RESTART`: Stop and restart the `Stream` when an exception occurs. The event that caused the exception is skipped. The exception is *NOT raised* because the application should contine working, however `logger.exception()` is used to alert the user.
- `StreamErrorPolicy.STOP_ENGINE`: Stop the `StreamEngine` when an exception occurs. The exception is raised after *ALL* the Streams were properly stopped.

In the following example, the `StreamErrorPolicy.RESTART` error policy is specifed. If the `Stream` crashed with the `ValueError` exception it is restarted:

```python
from kstreams import create_engine, ConsumerRecord
from kstreams.stream_utils import StreamErrorPolicy

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream("local--kstreams", group_id="de-my-partition")
@stream_engine.stream(
"local--hello-world",
group_id="example-group",
error_policy=StreamErrorPolicy.RESTART
)
async def stream(cr: ConsumerRecord) -> None:
print(f"Event consumed. Payload {cr.payload}")

if cr.key == b"error":
# Stream will be restarted after the ValueError is raised
raise ValueError("error....")

async def produce():
await stream_engine.send(
"local--kstreams",
value=b"Hi"
)


async def start():
await stream_engine.start()
await produce()
print(f"Event consumed. Payload {cr.value}")
```

We can see the logs:

async def shutdown(loop):
await stream_engine.stop()
```bash
ValueError: error....
INFO:aiokafka.consumer.group_coordinator:LeaveGroup request succeeded
INFO:aiokafka.consumer.consumer:Unsubscribed all topics or patterns and assigned partitions
INFO:kstreams.streams:Stream consuming from topics ['local--hello-world'] has stopped!!!


if __name__ == "__main__":
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
INFO:kstreams.middleware.middleware:Restarting stream <kstreams.streams.Stream object at 0x102d44050>
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--hello-world'})
...
INFO:aiokafka.consumer.group_coordinator:Setting newly assigned partitions {TopicPartition(topic='local--hello-world', partition=0)} for group example-group
```

```bash
CRASHED Stream!!! Task <Task pending name='Task-23' coro=<BaseStream.start.<locals>.func_wrapper() running at /Users/Projects/kstreams/kstreams/streams.py:55>>

'ConsumerRecord' object has no attribute 'payload'
Traceback (most recent call last):
File "/Users/Projects/kstreams/kstreams/streams.py", line 52, in func_wrapper
await self.func(self)
File "/Users/Projects/kstreams/examples/fastapi_example/streaming/streams.py", line 9, in stream
print(f"Event consumed: headers: {cr.headers}, payload: {cr.payload}")
AttributeError: 'ConsumerRecord' object has no attribute 'payload'
```
!!! note
The default error policy is `StreamErrorPolicy.STOP` if not specified, which is compliant with previous `kstreams` versions

!!! note
If you are using `aiorun` with `stop_on_unhandled_errors=True` and the `error_policy` is `StreamErrorPolicy.RESTART` then the `application` will NOT stop as the exception that caused the `Stream` to `crash` is not `raised`

## Changing consumer behavior

Expand Down
26 changes: 19 additions & 7 deletions kstreams/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .serializers import Deserializer, Serializer
from .streams import Stream, StreamFunc
from .streams import stream as stream_func
from .streams_utils import UDFType
from .streams_utils import StreamErrorPolicy, UDFType
from .types import EngineHooks, Headers, NextMiddlewareCall
from .utils import encode_headers, execute_hooks

Expand Down Expand Up @@ -342,7 +342,9 @@ def get_stream(self, name: str) -> typing.Optional[Stream]:

return stream

def add_stream(self, stream: Stream) -> None:
def add_stream(
self, stream: Stream, error_policy: StreamErrorPolicy = StreamErrorPolicy.STOP
) -> None:
if self.exist_stream(stream.name):
raise DuplicateStreamException(name=stream.name)
stream.backend = self.backend
Expand All @@ -367,12 +369,18 @@ def add_stream(self, stream: Stream) -> None:
# NOTE: When `no typing` support is deprecated this check can
# be removed
if stream.udf_handler.type != UDFType.NO_TYPING:
stream.func = self.build_stream_middleware_stack(stream=stream)
stream.func = self.build_stream_middleware_stack(
stream=stream, error_policy=error_policy
)

def build_stream_middleware_stack(self, *, stream: Stream) -> NextMiddlewareCall:
def build_stream_middleware_stack(
self, *, stream: Stream, error_policy: StreamErrorPolicy
) -> NextMiddlewareCall:
assert stream.udf_handler, "UdfHandler can not be None"

stream.middlewares = [Middleware(ExceptionMiddleware)] + stream.middlewares
stream.middlewares = [
Middleware(ExceptionMiddleware, engine=self, error_policy=error_policy),
] + stream.middlewares

next_call = stream.udf_handler
for middleware, options in reversed(stream.middlewares):
Expand All @@ -382,9 +390,12 @@ def build_stream_middleware_stack(self, *, stream: Stream) -> NextMiddlewareCall
return next_call

async def remove_stream(self, stream: Stream) -> None:
consumer = stream.consumer
self._streams.remove(stream)
await stream.stop()
self.monitor.clean_stream_consumer_metrics(stream)

if consumer is not None:
self.monitor.clean_stream_consumer_metrics(consumer=consumer)

def stream(
self,
Expand All @@ -396,6 +407,7 @@ def stream(
rebalance_listener: typing.Optional[RebalanceListener] = None,
middlewares: typing.Optional[typing.List[Middleware]] = None,
subscribe_by_pattern: bool = False,
error_policy: StreamErrorPolicy = StreamErrorPolicy.STOP,
**kwargs,
) -> typing.Callable[[StreamFunc], Stream]:
def decorator(func: StreamFunc) -> Stream:
Expand All @@ -409,7 +421,7 @@ def decorator(func: StreamFunc) -> Stream:
subscribe_by_pattern=subscribe_by_pattern,
**kwargs,
)(func)
self.add_stream(stream_from_func)
self.add_stream(stream_from_func, error_policy=error_policy)

return stream_from_func

Expand Down
32 changes: 24 additions & 8 deletions kstreams/middleware/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from aiokafka import errors

from kstreams import ConsumerRecord, types
from kstreams.streams_utils import StreamErrorPolicy

if typing.TYPE_CHECKING:
from kstreams import Stream # pragma: no cover
from kstreams import Stream, StreamEngine # pragma: no cover


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -61,12 +62,18 @@ async def __call__(self, cr: ConsumerRecord) -> typing.Any:


class ExceptionMiddleware(BaseMiddleware):
def __init__(
self, *, engine: "StreamEngine", error_policy: StreamErrorPolicy, **kwargs
) -> None:
super().__init__(**kwargs)
self.engine = engine
self.error_policy = error_policy

async def __call__(self, cr: ConsumerRecord) -> typing.Any:
try:
return await self.next_call(cr)
except errors.ConsumerStoppedError as exc:
await self.cleanup_policy()
raise exc
await self.cleanup_policy(exc)
except Exception as exc:
logger.exception(
"Unhandled error occurred while listening to the stream. "
Expand All @@ -76,14 +83,23 @@ async def __call__(self, cr: ConsumerRecord) -> typing.Any:
exc.add_note(f"Handler: {self.stream.func}")
exc.add_note(f"Topics: {self.stream.topics}")

await self.cleanup_policy()
raise exc
await self.cleanup_policy(exc)

async def cleanup_policy(self) -> None:
async def cleanup_policy(self, exc: Exception) -> None:
# always release the asyncio.Lock `is_processing` to
# stop properly the `stream`
# stop or restart properly the `stream`
self.stream.is_processing.release()
await self.stream.stop()

if self.error_policy == StreamErrorPolicy.RESTART:
await self.stream.stop()
logger.info(f"Restarting stream {self.stream}")
await self.stream.start()
elif self.error_policy == StreamErrorPolicy.STOP:
await self.stream.stop()
raise exc
else:
await self.engine.stop()
raise exc

# acquire the asyncio.Lock `is_processing` again to resume the processing
# and avoid `RuntimeError: Lock is not acquired.`
Expand Down
52 changes: 27 additions & 25 deletions kstreams/prometheus/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,31 +124,33 @@ def _clean_consumer_metrics(self) -> None:
self.MET_POSITION.clear()
self.MET_HIGHWATER.clear()

def clean_stream_consumer_metrics(self, stream: Stream) -> None:
if stream.consumer is not None:
topic_partitions = stream.consumer.assignment()
group_id = stream.consumer._group_id
for topic_partition in topic_partitions:
topic = topic_partition.topic
partition = topic_partition.partition

metrics_found = False
for sample in self.MET_LAG.collect()[0].samples:
if {
"topic": topic,
"partition": str(partition),
"consumer_group": group_id,
} == sample.labels:
metrics_found = True

if metrics_found:
self.MET_LAG.remove(topic, partition, group_id)
self.MET_POSITION_LAG.remove(topic, partition, group_id)
self.MET_COMMITTED.remove(topic, partition, group_id)
self.MET_POSITION.remove(topic, partition, group_id)
self.MET_HIGHWATER.remove(topic, partition, group_id)
else:
logger.debug(f"Metrics for stream: {stream.name} not found")
def clean_stream_consumer_metrics(self, consumer: Consumer) -> None:
topic_partitions = consumer.assignment()
group_id = consumer._group_id
for topic_partition in topic_partitions:
topic = topic_partition.topic
partition = topic_partition.partition

metrics_found = False
for sample in self.MET_LAG.collect()[0].samples:
if {
"topic": topic,
"partition": str(partition),
"consumer_group": group_id,
} == sample.labels:
metrics_found = True

if metrics_found:
self.MET_LAG.remove(topic, partition, group_id)
self.MET_POSITION_LAG.remove(topic, partition, group_id)
self.MET_COMMITTED.remove(topic, partition, group_id)
self.MET_POSITION.remove(topic, partition, group_id)
self.MET_HIGHWATER.remove(topic, partition, group_id)
else:
logger.debug(
"Metrics for consumer with group-id: "
f"{consumer._group_id} not found"
)

def add_producer(self, producer):
self._producer = producer
Expand Down
32 changes: 21 additions & 11 deletions kstreams/rebalance_listener.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import asyncio
import logging
from typing import Set
import typing

from aiokafka.abc import ConsumerRebalanceListener

from kstreams import TopicPartition

logger = logging.getLogger(__name__)

if typing.TYPE_CHECKING:
from kstreams import Stream, StreamEngine


# Can not use a Protocol here because aiokafka forces to have a concrete instance
# that inherits from ConsumerRebalanceListener, if we use a protocol we will
Expand Down Expand Up @@ -49,11 +52,11 @@ async def my_stream(stream: Stream):
"""

def __init__(self) -> None:
self.stream = None
self.stream: typing.Optional["Stream"] = None
# engine added so it can react on rebalance events
self.engine = None
self.engine: typing.Optional["StreamEngine"] = None

async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
async def on_partitions_revoked(self, revoked: typing.Set[TopicPartition]) -> None:
"""
Coroutine to be called *before* a rebalance operation starts and
*after* the consumer stops fetching data.
Expand All @@ -74,7 +77,9 @@ async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
"""
... # pragma: no cover

async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
async def on_partitions_assigned(
self, assigned: typing.Set[TopicPartition]
) -> None:
"""
Coroutine to be called *after* partition re-assignment completes
and *before* the consumer starts fetching data again.
Expand All @@ -98,7 +103,7 @@ async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:


class MetricsRebalanceListener(RebalanceListener):
async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
async def on_partitions_revoked(self, revoked: typing.Set[TopicPartition]) -> None:
"""
Coroutine to be called *before* a rebalance operation starts and
*after* the consumer stops fetching data.
Expand All @@ -112,10 +117,14 @@ async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
# lock all asyncio Tasks so no new metrics will be added to the Monitor
if revoked and self.engine is not None:
async with asyncio.Lock():
if self.stream is not None:
self.engine.monitor.clean_stream_consumer_metrics(self.stream)

async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:
if self.stream is not None and self.stream.consumer is not None:
self.engine.monitor.clean_stream_consumer_metrics(
self.stream.consumer
)

async def on_partitions_assigned(
self, assigned: typing.Set[TopicPartition]
) -> None:
"""
Coroutine to be called *after* partition re-assignment completes
and *before* the consumer starts fetching data again.
Expand All @@ -134,7 +143,7 @@ async def on_partitions_assigned(self, assigned: Set[TopicPartition]) -> None:


class ManualCommitRebalanceListener(MetricsRebalanceListener):
async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
async def on_partitions_revoked(self, revoked: typing.Set[TopicPartition]) -> None:
"""
Coroutine to be called *before* a rebalance operation starts and
*after* the consumer stops fetching data.
Expand All @@ -150,6 +159,7 @@ async def on_partitions_revoked(self, revoked: Set[TopicPartition]) -> None:
if (
revoked
and self.stream is not None
and self.stream.consumer is not None
and not self.stream.consumer._enable_auto_commit
):
logger.info(
Expand Down
5 changes: 5 additions & 0 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ async def stop(self) -> None:
if self.consumer is not None:
await self.consumer.stop()

# we have to do this operations because aiokafka bug
# https://github.com/aio-libs/aiokafka/issues/1010
self.consumer.unsubscribe()
self.consumer = None

logger.info(
f"Stream consuming from topics {self.topics} has stopped!!! \n\n"
)
Expand Down
6 changes: 6 additions & 0 deletions kstreams/streams_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ class UDFType(str, enum.Enum):
WITH_TYPING = "WITH_TYPING"


class StreamErrorPolicy(str, enum.Enum):
RESTART = "RESTART"
STOP = "STOP"
STOP_ENGINE = "STOP_ENGINE"


def setup_type(params: List[inspect.Parameter]) -> UDFType:
"""
Inspect the user defined function (coroutine) to get the proper way to call it
Expand Down
Loading

0 comments on commit f7abb93

Please sign in to comment.