diff --git a/src/otaclient/create_standby/_delta_gen.py b/src/otaclient/create_standby/_delta_gen.py index 95d37e9c9..30824abcb 100644 --- a/src/otaclient/create_standby/_delta_gen.py +++ b/src/otaclient/create_standby/_delta_gen.py @@ -19,12 +19,11 @@ import logging import os import threading -from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor from functools import partial from hashlib import sha256 from pathlib import Path from queue import Queue -from typing import Any from ota_metadata.file_table._orm import ( FTDirORM, @@ -53,7 +52,7 @@ SHA256HEXSTRINGLEN = 256 // 8 * 2 DELETE_BATCH_SIZE = 512 -DB_CONN_NUMS = 3 +DB_CONN_NUMS = 6 class DeltaGenerator: @@ -161,6 +160,7 @@ def _process_file( except Exception as e: burst_suppressed_logger.exception(f"failed to proces {fpath}: {e!r}") finally: + self._max_pending_tasks.release() # always release se first tmp_f.unlink(missing_ok=True) @staticmethod @@ -168,15 +168,6 @@ def _thread_worker_initializer(thread_local) -> None: thread_local.buffer = buffer = bytearray(cfg.CHUNK_SIZE) thread_local.view = memoryview(buffer) - def _task_done_callback(self, _: Future[Any]) -> None: - self._max_pending_tasks.release() # always release se first - - def _check_dir_should_fully_scan(self, dpath: Path) -> bool: - for parent in reversed(dpath.parents): - if str(parent) in self.FULL_SCAN_PATHS: - return True - return False - def _post_calculate_delta(self) -> None: """ After all the local resources have been collected, we check the copy_dir @@ -219,7 +210,7 @@ def _post_calculate_delta(self) -> None: # API - def calculate_delta(self) -> None: + def calculate_delta(self) -> None: # NOSONAR logger.debug("process delta src, generate delta and prepare local copy...") _canonical_root = Path(CANONICAL_ROOT) @@ -247,9 +238,12 @@ def calculate_delta(self) -> None: ) # ------ check whether we should skip this folder ------ # - dir_should_fully_scan = self._check_dir_should_fully_scan( - canonical_curdir_path - ) + dir_should_fully_scan = False + for parent in reversed(canonical_curdir_path.parents): + if str(parent) in self.FULL_SCAN_PATHS: + dir_should_fully_scan = True + break + dir_depth_exceeded = ( len(canonical_curdir_path.parents) > self.MAX_FOLDER_DEEPTH ) @@ -258,7 +252,7 @@ def calculate_delta(self) -> None: _should_skip_dir = dir_depth_exceeded or ( _str_canon_fpath != CANONICAL_ROOT and not dir_should_fully_scan - and not self._ft_dir_orm.orm_select_entry(path=_str_canon_fpath) + and not self._ft_dir_orm.check_entry(path=_str_canon_fpath) ) if _should_skip_dir: dirnames.clear() @@ -291,7 +285,7 @@ def calculate_delta(self) -> None: canonical_curdir_path / fname, fully_scan=dir_should_fully_scan, thread_local=thread_local, - ).add_done_callback(self._task_done_callback) + ) finally: pool.shutdown(wait=True) self._ft_regular_orm.orm_pool_shutdown()