Skip to content

Commit

Permalink
log thread pool initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
mlin committed Oct 27, 2023
1 parent a780b1b commit f3ee81e
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions WDL/runtime/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,10 @@ class _ThreadPools:
_task_pool: futures.ThreadPoolExecutor
_subworkflow_pools: List[futures.ThreadPoolExecutor]
_subworkflow_concurrency: int
_logger: logging.Logger

def __init__(self, cfg: config.Loader, cleanup: ExitStack) -> None:
def __init__(self, cfg: config.Loader, cleanup: ExitStack, logger: logging.Logger) -> None:
self._logger = logger
self._lock = threading.Lock()
self._cleanup = cleanup.enter_context(ExitStack())

Expand All @@ -777,6 +779,7 @@ def __init__(self, cfg: config.Loader, cleanup: ExitStack) -> None:
or multiprocessing.cpu_count()
)

self._logger.info(_("initializing task thread pool", task_concurrency=task_concurrency))
self._task_pool = futures.ThreadPoolExecutor(max_workers=task_concurrency)
self._cleanup.callback(futures.ThreadPoolExecutor.shutdown, self._task_pool)

Expand All @@ -794,6 +797,13 @@ def submit_subworkflow(self, call_depth: int, *args, **kwargs): # pyre-ignore
if call_depth >= len(self._subworkflow_pools):
# First time at this call depth -- initialize a thread pool for it
assert call_depth == len(self._subworkflow_pools)
self._logger.info(
_(
"initializing subworkflow thread pool",
subworkflow_concurrency=self._subworkflow_concurrency,
call_depth=call_depth,
)
)
pool = futures.ThreadPoolExecutor(self._subworkflow_concurrency)
self._cleanup.callback(futures.ThreadPoolExecutor.shutdown, pool)
self._subworkflow_pools.append(pool)
Expand Down Expand Up @@ -911,7 +921,7 @@ def run_local_workflow(
version = "UNKNOWN"
logger.notice(_("miniwdl", version=version, uname=" ".join(os.uname()))) # pyre-fixme

thread_pools = _ThreadPools(cfg, cleanup)
thread_pools = _ThreadPools(cfg, cleanup, logger)
else:
assert _run_id_stack and _cache
thread_pools = _thread_pools
Expand Down

0 comments on commit f3ee81e

Please sign in to comment.