Skip to content

Commit

Permalink
refine watchdog thread
Browse files Browse the repository at this point in the history
  • Loading branch information
Bodong-Yang committed Oct 2, 2024
1 parent d0dba8c commit c072752
Showing 1 changed file with 28 additions and 24 deletions.
52 changes: 28 additions & 24 deletions src/otaclient_common/retry_task_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,31 +99,35 @@ def _watchdog(
interval: int,
) -> None:
"""Watchdog will shutdown the threadpool on certain conditions being met."""
while not self._shutdown and not concurrent_fut_thread._shutdown:
if (
isinstance(max_retry, int)
and max_retry > 0
and self._retry_count > max_retry
):
logger.warning(f"exceed {max_retry=}, abort")
return self.shutdown(wait=True)

if callable(watchdog_func):
try:
watchdog_func()
except Exception as e:
logger.warning(
f"custom watchdog func failed: {e!r}, shutdown the pool"
)
logger.warning("draining the workitem queue ...")
self.shutdown(wait=False)
# drain the worker queues to cancel all the futs
with contextlib.suppress(Empty):
while True:
self._work_queue.get_nowait()
self.shutdown(wait=True)
return
_checker_funcs = []
if isinstance(max_retry, int) and max_retry > 0:

def _max_retry_check():
if self._retry_count > max_retry:
raise ValueError("exceed max retry count")

_checker_funcs.append(_max_retry_check)

if callable(watchdog_func):
_checker_funcs.append(watchdog_func)

if not _checker_funcs:
return

while not self._shutdown:
time.sleep(interval)
try:
for _func in _checker_funcs:
_func()
except Exception as e:
logger.warning(f"watchdog failed: {e!r}, shutdown the pool")
logger.warning("draining the workitem queue ...")
self.shutdown(wait=False)
# drain the worker queues to cancel all the futs
with contextlib.suppress(Empty):
while True:
self._work_queue.get_nowait()
self.shutdown(wait=True)

def _task_done_cb(
self, fut: Future[Any], /, *, item: T, func: Callable[[T], Any]
Expand Down

0 comments on commit c072752

Please sign in to comment.