From 01024806d3368cb556d8fd6f074b1ba9bfd70c53 Mon Sep 17 00:00:00 2001 From: Pavel Perestoronin Date: Tue, 14 May 2024 17:13:23 +0200 Subject: [PATCH] fix: log possible error in the typed handler wrapper --- kstreams/streams.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/kstreams/streams.py b/kstreams/streams.py index 532ae25b..629d295c 100644 --- a/kstreams/streams.py +++ b/kstreams/streams.py @@ -244,10 +244,17 @@ async def func_wrapper(self, func: typing.Awaitable) -> None: logger.exception(f"CRASHED Stream!!! Task {self._consumer_task} \n\n {e}") async def func_wrapper_with_typing(self) -> None: - while self.running: - cr = await self.getone() - async with self.is_processing: - await self.func(cr) + try: + while self.running: + cr = await self.getone() + async with self.is_processing: + await self.func(cr) + except Exception as e: + logger.exception("Unhandled error occurred while listening to the stream") + if sys.version_info >= (3, 11): + e.add_note(f"Task: {self._consumer_task}") + e.add_note(f"Handler: {self.func}") + e.add_note(f"Topics: {self.topics}") def seek_to_initial_offsets(self) -> None: if not self.seeked_initial_offsets and self.consumer is not None: