Skip to content

Commit

Permalink
fix: log possible error in the typed handler wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
eigenein committed May 14, 2024
1 parent 1f73928 commit a6fd162
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions kstreams/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,17 @@ async def func_wrapper(self, func: typing.Awaitable) -> None:
logger.exception(f"CRASHED Stream!!! Task {self._consumer_task} \n\n {e}")

async def func_wrapper_with_typing(self) -> None:
while self.running:
cr = await self.getone()
async with self.is_processing:
await self.func(cr)
try:
while self.running:
cr = await self.getone()
async with self.is_processing:
await self.func(cr)
except Exception as e:
logger.exception("An unhandled exception occurred while listening to the stream")
if sys.version_info >= (3, 11):
e.add_note(f"Task: {self._consumer_task}")
e.add_note(f"Handler: {self.func}")
e.add_note(f"Topics: {self.topics}")

def seek_to_initial_offsets(self) -> None:
if not self.seeked_initial_offsets and self.consumer is not None:
Expand Down

0 comments on commit a6fd162

Please sign in to comment.