Skip to content

Commit

Permalink
finished_tasks become fut_gen method local var
Browse files Browse the repository at this point in the history
  • Loading branch information
Bodong-Yang committed Jun 24, 2024
1 parent 4da2b8e commit 982324b
Showing 1 changed file with 14 additions and 23 deletions.
37 changes: 14 additions & 23 deletions src/otaclient_common/retry_task_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ def __init__(
"""
self._start_lock, self._started = threading.Lock(), False
self._total_task_num = 0
self._finished_task_counter = itertools.count(start=1)
self._finished_task = 0
self._retry_counter = itertools.count(start=1)
self._retry_count = 0
self._concurrent_semaphore = threading.Semaphore(max_concurrent)
Expand Down Expand Up @@ -112,40 +110,33 @@ def _task_done_cb(
self._concurrent_semaphore.release() # always release se first
self._fut_queue.put_nowait(fut)

# ------ on task succeeded ------ #
if not fut.exception():
self._finished_task = next(self._finished_task_counter)
return

# ------ on threadpool shutdown(by watchdog) ------ #
if self._shutdown or self._broken:
return

# ------ on task failed ------ #
self._retry_count = next(self._retry_counter)
with contextlib.suppress(Exception): # on threadpool shutdown
self.submit(func, item).add_done_callback(
partial(self._task_done_cb, item=item, func=func)
)
if fut.exception():
self._retry_count = next(self._retry_counter)
with contextlib.suppress(Exception): # on threadpool shutdown
self.submit(func, item).add_done_callback(
partial(self._task_done_cb, item=item, func=func)
)

def _fut_gen(self, interval: int) -> Generator[Future[Any], Any, None]:
finished_tasks = 0
while True:
if self._shutdown or self._broken or concurrent_fut_thread._shutdown:
logger.warning(
f"failed to ensure all tasks, {self._finished_task=}, {self._total_task_num=}"
f"failed to ensure all tasks, {finished_tasks=}, {self._total_task_num=}"
)
raise TasksEnsureFailed

try:
yield self._fut_queue.get_nowait()
done_fut = self._fut_queue.get_nowait()
if not done_fut.exception():
finished_tasks += 1
yield done_fut
except Empty:
if (
self._total_task_num == 0
or self._finished_task != self._total_task_num
):
if self._total_task_num == 0 or finished_tasks != self._total_task_num:
time.sleep(interval)
continue
return
return # all tasks finished and futs are yielded

def ensure_tasks(
self,
Expand Down

0 comments on commit 982324b

Please sign in to comment.