From be962a43befa024d51be2c378651c55d3e330f91 Mon Sep 17 00:00:00 2001
From: Tom White
Date: Wed, 6 Mar 2024 09:58:59 +0000
Subject: [PATCH] Add a 'processes' executor that runs using local processes.
 (#411)

* Add a 'processes' executor that runs using local processes.

This is implemented by using a ProcessPoolExecutor in AsyncPythonDagExecutor.

Also:

* Ensure that NumPy only uses a single thread

* Set max_tasks_per_child=1 to use a new process for each task

* Only test processes executor on Python 3.11 and later

* Fix mypy checks
---
 .github/workflows/mypy.yml              |  2 +-
 cubed/runtime/create.py                 |  4 ++
 cubed/runtime/executors/asyncio.py      |  2 +-
 cubed/runtime/executors/python_async.py | 66 +++++++++++++++++++++++--
 cubed/tests/utils.py                    |  5 ++
 setup.cfg                               |  2 +
 6 files changed, 75 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml
index 98f91e4f..cce977e1 100644
--- a/.github/workflows/mypy.yml
+++ b/.github/workflows/mypy.yml
@@ -18,7 +18,7 @@ jobs:
       fail-fast: false
       matrix:
         os: ["ubuntu-latest"]
-        python-version: ["3.9"]
+        python-version: ["3.11"]
 
     steps:
       - name: Checkout source
diff --git a/cubed/runtime/create.py b/cubed/runtime/create.py
index 0db4b5b1..3bced979 100644
--- a/cubed/runtime/create.py
+++ b/cubed/runtime/create.py
@@ -32,6 +32,10 @@ def create_executor(name: str, executor_options: Optional[dict] = None) -> Execu
         from cubed.runtime.executors.modal import ModalDagExecutor
 
         return ModalDagExecutor(**executor_options)
+    elif name == "processes":
+        from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
+
+        return AsyncPythonDagExecutor(retries=0, use_processes=True, **executor_options)
     elif name == "single-threaded":
         from cubed.runtime.executors.python import PythonDagExecutor
 
diff --git a/cubed/runtime/executors/asyncio.py b/cubed/runtime/executors/asyncio.py
index 84c58115..7e4ab86d 100644
--- a/cubed/runtime/executors/asyncio.py
+++ b/cubed/runtime/executors/asyncio.py
@@ -53,7 +53,7 @@ async def async_map_unordered(
                 if backup:
                     if not backup.done() or not backup.exception():
                         continue
-                raise task.exception()
+                raise task.exception()  # type: ignore
             end_times[task] = time.monotonic()
             if return_stats:
                 result, stats = task.result()
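Before the python_async.py changes below, a usage sketch shows how the new "processes" branch in create_executor is reached from user code. The sketch is illustrative and not part of the patch: the Spec values, array contents, and chunk sizes are made up, and executor_options is passed explicitly on the assumption that it defaults to an empty dict. The __main__ guard matters because this executor uses the "spawn" start method.

    import cubed
    import cubed.array_api as xp
    from cubed.runtime.create import create_executor

    if __name__ == "__main__":
        # Hypothetical spec; any small workload works as a smoke test.
        spec = cubed.Spec(work_dir="tmp", allowed_mem="200MB")
        a = xp.asarray([[1, 2], [3, 4]], chunks=(2, 2), spec=spec)
        b = xp.asarray([[5, 6], [7, 8]], chunks=(2, 2), spec=spec)
        c = xp.add(a, b)

        # "processes" resolves to AsyncPythonDagExecutor(retries=0, use_processes=True)
        # and needs Python 3.11 or later because it sets max_tasks_per_child.
        executor = create_executor("processes", executor_options={})
        result = c.compute(executor=executor)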
diff --git a/cubed/runtime/executors/python_async.py b/cubed/runtime/executors/python_async.py
index 6c5400e6..a2a61f97 100644
--- a/cubed/runtime/executors/python_async.py
+++ b/cubed/runtime/executors/python_async.py
@@ -1,8 +1,11 @@
 import asyncio
-from concurrent.futures import Executor, ThreadPoolExecutor
+import multiprocessing
+import os
+from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
 from functools import partial
 from typing import Any, AsyncIterator, Callable, Iterable, Optional, Sequence
 
+import cloudpickle
 from aiostream import stream
 from aiostream.core import Stream
 from networkx import MultiDiGraph
@@ -26,6 +29,15 @@ def run_func(input, func=None, config=None, name=None, compute_id=None):
     return result
 
 
+def unpickle_and_call(f, inp, **kwargs):
+    import cloudpickle
+
+    f = cloudpickle.loads(f)
+    inp = cloudpickle.loads(inp)
+    kwargs = {k: cloudpickle.loads(v) for k, v in kwargs.items()}
+    return f(inp, **kwargs)
+
+
 async def map_unordered(
     concurrent_executor: Executor,
     function: Callable[..., Any],
@@ -40,6 +52,8 @@ async def map_unordered(
     if retries == 0:
         retrying_function = function
     else:
+        if isinstance(concurrent_executor, ProcessPoolExecutor):
+            raise NotImplementedError("Retries not supported for ProcessPoolExecutor")
         retryer = Retrying(reraise=True, stop=stop_after_attempt(retries + 1))
         retrying_function = partial(retryer, function)
 
@@ -54,8 +68,31 @@ def create_futures_func(input, **kwargs):
             for i in input
         ]
 
+    def create_futures_func_multiprocessing(input, **kwargs):
+        # Pickle the function, args, and kwargs using cloudpickle.
+        # They will be unpickled by unpickle_and_call.
+        pickled_kwargs = {k: cloudpickle.dumps(v) for k, v in kwargs.items()}
+        return [
+            (
+                i,
+                asyncio.wrap_future(
+                    concurrent_executor.submit(
+                        unpickle_and_call,
+                        cloudpickle.dumps(retrying_function),
+                        cloudpickle.dumps(i),
+                        **pickled_kwargs,
+                    )
+                ),
+            )
+            for i in input
+        ]
+
+    if isinstance(concurrent_executor, ProcessPoolExecutor):
+        create_futures = create_futures_func_multiprocessing
+    else:
+        create_futures = create_futures_func
     async for result in async_map_unordered(
-        create_futures_func,
+        create_futures,
         input,
         use_backups=use_backups,
         batch_size=batch_size,
@@ -91,7 +128,17 @@ async def async_execute_dag(
     compute_arrays_in_parallel: Optional[bool] = None,
     **kwargs,
 ) -> None:
-    concurrent_executor = ThreadPoolExecutor()
+    concurrent_executor: Executor
+    use_processes = kwargs.pop("use_processes", False)
+    if use_processes:
+        max_workers = kwargs.pop("max_workers", None)
+        context = multiprocessing.get_context("spawn")
+        # max_tasks_per_child is only supported from Python 3.11
+        concurrent_executor = ProcessPoolExecutor(
+            max_workers=max_workers, mp_context=context, max_tasks_per_child=1
+        )
+    else:
+        concurrent_executor = ThreadPoolExecutor()
     try:
         if not compute_arrays_in_parallel:
             # run one pipeline at a time
@@ -125,6 +172,9 @@
 class AsyncPythonDagExecutor(DagExecutor):
     """An execution engine that uses Python asyncio."""
 
+    def __init__(self, **kwargs):
+        self.kwargs = kwargs
+
     def execute_dag(
         self,
         dag: MultiDiGraph,
@@ -134,6 +184,14 @@ def execute_dag(
         compute_id: Optional[str] = None,
         **kwargs,
     ) -> None:
+        # Tell NumPy to use a single thread
+        # from https://stackoverflow.com/questions/30791550/limit-number-of-threads-in-numpy
+        os.environ["MKL_NUM_THREADS"] = "1"
+        os.environ["NUMEXPR_NUM_THREADS"] = "1"
+        os.environ["OMP_NUM_THREADS"] = "1"
+        os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
+
+        merged_kwargs = {**self.kwargs, **kwargs}
         asyncio.run(
             async_execute_dag(
                 dag,
@@ -141,6 +199,6 @@
                 resume=resume,
                 spec=spec,
                 compute_id=compute_id,
-                **kwargs,
+                **merged_kwargs,
             )
         )
diff --git a/cubed/tests/utils.py b/cubed/tests/utils.py
index ab3ea264..7cdcd108 100644
--- a/cubed/tests/utils.py
+++ b/cubed/tests/utils.py
@@ -1,4 +1,5 @@
 import platform
+import sys
 from typing import Iterable
 
 import networkx as nx
@@ -20,6 +21,10 @@
     # AsyncPythonDagExecutor calls `peak_measured_mem` which is not supported on Windows
     ALL_EXECUTORS.append(create_executor("threads"))
 
+    # AsyncPythonDagExecutor (processes) uses an API available from 3.11 onwards (max_tasks_per_child)
+    if sys.version_info >= (3, 11):
+        ALL_EXECUTORS.append(create_executor("processes"))
+        MAIN_EXECUTORS.append(create_executor("processes"))
 
 try:
     ALL_EXECUTORS.append(create_executor("beam"))
diff --git a/setup.cfg b/setup.cfg
index 8eba3c7a..62965f08 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -24,6 +24,8 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-array_api_compat.*]
 ignore_missing_imports = True
+[mypy-cloudpickle.*]
+ignore_missing_imports = True
 [mypy-coiled.*]
 ignore_missing_imports = True
 [mypy-dask.*]
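The pickling scheme in python_async.py is easiest to see in isolation: plain pickle cannot serialize the partial-wrapped retrying_function that map_unordered submits, so the callable and its arguments cross the process boundary as cloudpickle byte strings and are rehydrated in the worker by unpickle_and_call. The self-contained sketch below shows the same round trip; the lambda and the values in it are made up, while unpickle_and_call mirrors the function added by this patch.

    import multiprocessing
    from concurrent.futures import ProcessPoolExecutor

    import cloudpickle

    def unpickle_and_call(f, inp, **kwargs):
        # Runs in the worker process: rebuild the callable and its
        # arguments from cloudpickle payloads, then invoke it.
        f = cloudpickle.loads(f)
        inp = cloudpickle.loads(inp)
        kwargs = {k: cloudpickle.loads(v) for k, v in kwargs.items()}
        return f(inp, **kwargs)

    if __name__ == "__main__":
        scale = lambda x, factor: x * factor  # a lambda defeats plain pickle
        context = multiprocessing.get_context("spawn")
        # max_tasks_per_child=1 gives every task a fresh process (Python 3.11+).
        with ProcessPoolExecutor(
            max_workers=2, mp_context=context, max_tasks_per_child=1
        ) as pool:
            future = pool.submit(
                unpickle_and_call,
                cloudpickle.dumps(scale),
                cloudpickle.dumps(21),
                factor=cloudpickle.dumps(2),
            )
            print(future.result())  # prints 42

Keeping retries=0 for this executor, as create_executor does, matches the NotImplementedError guard added in map_unordered.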