From 31b274297dea615276e9d684dcecccde147085d2 Mon Sep 17 00:00:00 2001 From: marcosschroh Date: Tue, 19 Dec 2023 16:11:20 +0100 Subject: [PATCH] fix(Stream): handle errors.ConsumerStoppedError exception for the new consumtion way --- kstreams/streams.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kstreams/streams.py b/kstreams/streams.py index fe765b45..b75c95a7 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -250,6 +250,8 @@ async def func_wrapper_with_typing(self, calling_type: UDFType) -> None: else: # typing with cr and stream await self.func(cr, self) + except errors.ConsumerStoppedError: + return except Exception as e: logger.exception(f"CRASHED Stream!!! Task {self._consumer_task} \n\n {e}")