diff --git a/src/otaclient/app/configs.py b/src/otaclient/app/configs.py index 074ff80a8..9a76ab8b7 100644 --- a/src/otaclient/app/configs.py +++ b/src/otaclient/app/configs.py @@ -134,6 +134,7 @@ class BaseConfig(_InternalSettings): # now only REBUILD mode is available STANDBY_CREATION_MODE = CreateStandbyMechanism.REBUILD MAX_CONCURRENT_PROCESS_FILE_TASKS = 512 + MAX_PROCESS_FILE_THREAD = 6 CREATE_STANDBY_RETRY_MAX = 1024 CREATE_STANDBY_BACKOFF_FACTOR = 1 CREATE_STANDBY_BACKOFF_MAX = 6 diff --git a/src/otaclient/app/create_standby/common.py b/src/otaclient/app/create_standby/common.py index eb3df40a2..fabc6ff8e 100644 --- a/src/otaclient/app/create_standby/common.py +++ b/src/otaclient/app/create_standby/common.py @@ -22,7 +22,7 @@ import random import threading import time -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor from dataclasses import dataclass from hashlib import sha256 from pathlib import Path @@ -325,13 +325,23 @@ def _process_delta_src(self): # ------ create the threadpool executor ------ # thread_local = threading.local() + max_pending_tasks = threading.Semaphore(cfg.MAX_CONCURRENT_PROCESS_FILE_TASKS) def _initializer(): thread_local.buffer = buffer = bytearray(cfg.CHUNK_SIZE) thread_local.view = memoryview(buffer) + def _task_done_callback(fut: Future[Any]): + max_pending_tasks.release() # always release se first + if exc := fut.exception(): + logger.warning( + f"detect error during file preparing, still continue: {exc!r}" + ) + pool = ThreadPoolExecutor( - thread_name_prefix="scan_slot", initializer=_initializer + max_workers=cfg.MAX_PROCESS_FILE_THREAD, + thread_name_prefix="scan_slot", + initializer=_initializer, ) # scan old slot and generate delta based on path, @@ -417,11 +427,12 @@ def _initializer(): self._rm.append(str(canonical_fpath)) continue + max_pending_tasks.acquire() pool.submit( self._prepare_local_copy_from_active_slot, delta_src_fpath, thread_local=thread_local, - ) + ).add_done_callback(_task_done_callback) # wait for all files being processed pool.shutdown(wait=True)