diff --git a/kstreams/streams.py b/kstreams/streams.py index fe765b45..b75c95a7 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -250,6 +250,8 @@ async def func_wrapper_with_typing(self, calling_type: UDFType) -> None: else: # typing with cr and stream await self.func(cr, self) + except errors.ConsumerStoppedError: + return except Exception as e: logger.exception(f"CRASHED Stream!!! Task {self._consumer_task} \n\n {e}")