Skip to content

Commit

Permalink
chore: extract @log_broken decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Dec 19, 2024
1 parent 3e02aca commit f6bb533
Showing 1 changed file with 53 additions and 50 deletions.
103 changes: 53 additions & 50 deletions a_sync/primitives/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,17 @@ def get_multi_nowait(self, i: int, can_return_less: bool = False) -> List[T]:
return items


def log_broken(func: Callable[[Any], NoReturn]) -> Callable[[Any], NoReturn]:
@wraps(func)
async def __worker_exc_wrap(self):
try:
return func(self)
except Exception as e:
logger.error("%s for %s is broken!!!", type(self).__name__, func)
logger.exception(e)
raise
return __worker_exc_wrap

class ProcessingQueue(_Queue[Tuple[P, "asyncio.Future[V]"]], Generic[P, V]):
"""
A queue designed for processing tasks asynchronously with multiple workers.
Expand Down Expand Up @@ -480,6 +491,7 @@ def _workers(self) -> "asyncio.Task[NoReturn]":
task._workers = workers
return task

@log_broken
async def __worker_coro(self) -> NoReturn:
"""
The coroutine executed by worker tasks to process the queue.
Expand All @@ -502,37 +514,32 @@ async def __worker_coro(self) -> NoReturn:
else:
fut: asyncio.Future[V]
while True:
args, kwargs, fut = await get_next_job()
try:
args, kwargs, fut = await get_next_job()
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
result = await func(*args, **kwargs)
fut.set_result(result)
except InvalidStateError:
logger.error(
"cannot set result for %s %s: %s",
func.__name__,
fut,
result,
)
except Exception as e:
try:
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
result = await func(*args, **kwargs)
fut.set_result(result)
fut.set_exception(e)
except InvalidStateError:
logger.error(
"cannot set result for %s %s: %s",
"cannot set exception for %s %s: %s",
func.__name__,
fut,
result,
e,
)
except Exception as e:
try:
fut.set_exception(e)
except InvalidStateError:
logger.error(
"cannot set exception for %s %s: %s",
func.__name__,
fut,
e,
)
task_done()
except Exception as e:
logger.error("%s for %s is broken!!!", type(self).__name__, func)
logger.exception(e)
raise
task_done()


def _validate_args(i: int, can_return_less: bool) -> None:
Expand Down Expand Up @@ -889,6 +896,7 @@ def _get(self):
fut, args, kwargs = super()._get()
return args, kwargs, fut()

@log_broken
async def __worker_coro(self) -> NoReturn:
"""
Worker coroutine responsible for processing tasks in the queue.
Expand All @@ -911,35 +919,30 @@ async def __worker_coro(self) -> NoReturn:
fut: SmartFuture[V]
while True:
try:
args, kwargs, fut = await get_next_job()
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
log_debug("processing %s", fut)
result = await func(*args, **kwargs)
fut.set_result(result)
except InvalidStateError:
logger.error(
"cannot set result for %s %s: %s",
func.__name__,
fut,
result,
)
except Exception as e:
log_debug("%s: %s", type(e).__name__, e)
try:
args, kwargs, fut = await get_next_job()
if fut is None:
# the weakref was already cleaned up, we don't need to process this item
task_done()
continue
log_debug("processing %s", fut)
result = await func(*args, **kwargs)
fut.set_result(result)
fut.set_exception(e)
except InvalidStateError:
logger.error(
"cannot set result for %s %s: %s",
"cannot set exception for %s %s: %s",
func.__name__,
fut,
result,
e,
)
except Exception as e:
log_debug("%s: %s", type(e).__name__, e)
try:
fut.set_exception(e)
except InvalidStateError:
logger.error(
"cannot set exception for %s %s: %s",
func.__name__,
fut,
e,
)
task_done()
except Exception as e:
logger.error("%s for %s is broken!!!", type(self).__name__, func)
logger.exception(e)
raise
task_done()

0 comments on commit f6bb533

Please sign in to comment.