diff --git a/kstreams/streams.py b/kstreams/streams.py index 532ae25b..629d295c 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -244,10 +244,17 @@ async def func_wrapper(self, func: typing.Awaitable) -> None: logger.exception(f"CRASHED Stream!!! Task {self._consumer_task} \n\n {e}") async def func_wrapper_with_typing(self) -> None: - while self.running: - cr = await self.getone() - async with self.is_processing: - await self.func(cr) + try: + while self.running: + cr = await self.getone() + async with self.is_processing: + await self.func(cr) + except Exception as e: + logger.exception("Unhandled error occurred while listening to the stream") + if sys.version_info >= (3, 11): + e.add_note(f"Task: {self._consumer_task}") + e.add_note(f"Handler: {self.func}") + e.add_note(f"Topics: {self.topics}") def seek_to_initial_offsets(self) -> None: if not self.seeked_initial_offsets and self.consumer is not None: