Add a 'processes' executor that runs using local processes. (#411)
* Add a 'processes' executor that runs using local processes.
This is implemented using a ProcessPoolExecutor in AsyncPythonDagExecutor (see the usage sketch below).

Also:
* Ensure that NumPy only uses a single thread
* Set max_tasks_per_child=1 to use a new process for each task
* Only test the processes executor on Python 3.11 and later
* Fix mypy checks
tomwhite authored Mar 6, 2024
1 parent 395ee54 commit be962a4
Showing 6 changed files with 75 additions and 6 deletions.
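
For orientation, a minimal usage sketch of the new executor. This is illustrative only and not part of the diff: it assumes cubed's array API namespace (`cubed.array_api`), `cubed.Spec`, and `Array.compute(executor=...)`; the work_dir and allowed_mem values are made up. Only the "processes" executor name itself is introduced by this commit.

import cubed
import cubed.array_api as xp
from cubed.runtime.create import create_executor

# Hypothetical spec values; "processes" maps to
# AsyncPythonDagExecutor(retries=0, use_processes=True) as added in create.py.
spec = cubed.Spec(work_dir="tmp", allowed_mem="200MB")
a = xp.ones((1000, 1000), chunks=(100, 100), spec=spec)
executor = create_executor("processes")
result = xp.sum(a).compute(executor=executor)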
.github/workflows/mypy.yml (1 addition & 1 deletion)
@@ -18,7 +18,7 @@ jobs:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
python-version: ["3.9"]
python-version: ["3.11"]

steps:
- name: Checkout source
cubed/runtime/create.py (4 additions & 0 deletions)
@@ -32,6 +32,10 @@ def create_executor(name: str, executor_options: Optional[dict] = None) -> Executor:
from cubed.runtime.executors.modal import ModalDagExecutor

return ModalDagExecutor(**executor_options)
+    elif name == "processes":
+        from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
+
+        return AsyncPythonDagExecutor(retries=0, use_processes=True, **executor_options)
elif name == "single-threaded":
from cubed.runtime.executors.python import PythonDagExecutor

cubed/runtime/executors/asyncio.py (1 addition & 1 deletion)
@@ -53,7 +53,7 @@ async def async_map_unordered(
if backup:
if not backup.done() or not backup.exception():
continue
-                raise task.exception()
+                raise task.exception()  # type: ignore
end_times[task] = time.monotonic()
if return_stats:
result, stats = task.result()
cubed/runtime/executors/python_async.py (62 additions & 4 deletions)
@@ -1,8 +1,11 @@
import asyncio
-from concurrent.futures import Executor, ThreadPoolExecutor
+import multiprocessing
+import os
+from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial
from typing import Any, AsyncIterator, Callable, Iterable, Optional, Sequence

+import cloudpickle
from aiostream import stream
from aiostream.core import Stream
from networkx import MultiDiGraph
@@ -26,6 +29,15 @@ def run_func(input, func=None, config=None, name=None, compute_id=None):
return result


+def unpickle_and_call(f, inp, **kwargs):
+    import cloudpickle
+
+    f = cloudpickle.loads(f)
+    inp = cloudpickle.loads(inp)
+    kwargs = {k: cloudpickle.loads(v) for k, v in kwargs.items()}
+    return f(inp, **kwargs)
+
+
async def map_unordered(
concurrent_executor: Executor,
function: Callable[..., Any],
@@ -40,6 +52,8 @@ async def map_unordered(
if retries == 0:
retrying_function = function
else:
+        if isinstance(concurrent_executor, ProcessPoolExecutor):
+            raise NotImplementedError("Retries not supported for ProcessPoolExecutor")
retryer = Retrying(reraise=True, stop=stop_after_attempt(retries + 1))
retrying_function = partial(retryer, function)

@@ -54,8 +68,31 @@ def create_futures_func(input, **kwargs):
for i in input
]

+    def create_futures_func_multiprocessing(input, **kwargs):
+        # Pickle the function, args, and kwargs using cloudpickle.
+        # They will be unpickled by unpickle_and_call.
+        pickled_kwargs = {k: cloudpickle.dumps(v) for k, v in kwargs.items()}
+        return [
+            (
+                i,
+                asyncio.wrap_future(
+                    concurrent_executor.submit(
+                        unpickle_and_call,
+                        cloudpickle.dumps(retrying_function),
+                        cloudpickle.dumps(i),
+                        **pickled_kwargs,
+                    )
+                ),
+            )
+            for i in input
+        ]
+
+    if isinstance(concurrent_executor, ProcessPoolExecutor):
+        create_futures = create_futures_func_multiprocessing
+    else:
+        create_futures = create_futures_func
async for result in async_map_unordered(
-        create_futures_func,
+        create_futures,
input,
use_backups=use_backups,
batch_size=batch_size,
@@ -91,7 +128,17 @@ async def async_execute_dag(
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
-    concurrent_executor = ThreadPoolExecutor()
+    concurrent_executor: Executor
+    use_processes = kwargs.pop("use_processes", False)
+    if use_processes:
+        max_workers = kwargs.pop("max_workers", None)
+        context = multiprocessing.get_context("spawn")
+        # max_tasks_per_child is only supported from Python 3.11
+        concurrent_executor = ProcessPoolExecutor(
+            max_workers=max_workers, mp_context=context, max_tasks_per_child=1
+        )
+    else:
+        concurrent_executor = ThreadPoolExecutor()
try:
if not compute_arrays_in_parallel:
# run one pipeline at a time
@@ -125,6 +172,9 @@ async def async_execute_dag(
class AsyncPythonDagExecutor(DagExecutor):
"""An execution engine that uses Python asyncio."""

+    def __init__(self, **kwargs):
+        self.kwargs = kwargs
+
def execute_dag(
self,
dag: MultiDiGraph,
@@ -134,13 +184,21 @@ def execute_dag(
compute_id: Optional[str] = None,
**kwargs,
) -> None:
+        # Tell NumPy to use a single thread
+        # from https://stackoverflow.com/questions/30791550/limit-number-of-threads-in-numpy
+        os.environ["MKL_NUM_THREADS"] = "1"
+        os.environ["NUMEXPR_NUM_THREADS"] = "1"
+        os.environ["OMP_NUM_THREADS"] = "1"
+        os.environ["VECLIB_MAXIMUM_THREADS"] = "1"
+
+        merged_kwargs = {**self.kwargs, **kwargs}
asyncio.run(
async_execute_dag(
dag,
callbacks=callbacks,
resume=resume,
spec=spec,
compute_id=compute_id,
-                **kwargs,
+                **merged_kwargs,
)
)
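
As an aside (not part of the diff): the cloudpickle round-trip above is needed because a plain ProcessPoolExecutor serializes submitted callables with the standard pickle module, which rejects lambdas and closures. A stripped-down, runnable sketch of the same pattern, with illustrative names only:

# Stripped-down sketch of the cloudpickle round-trip (illustrative only).
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

import cloudpickle


def unpickle_and_call(f, inp, **kwargs):
    # Runs in the worker process: rebuild the callable and arguments from bytes.
    f = cloudpickle.loads(f)
    inp = cloudpickle.loads(inp)
    kwargs = {k: cloudpickle.loads(v) for k, v in kwargs.items()}
    return f(inp, **kwargs)


if __name__ == "__main__":
    square_plus = lambda x, offset=0: x * x + offset  # plain pickle would reject this lambda
    ctx = multiprocessing.get_context("spawn")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
        future = pool.submit(
            unpickle_and_call,
            cloudpickle.dumps(square_plus),
            cloudpickle.dumps(3),
            offset=cloudpickle.dumps(1),
        )
        print(future.result())  # prints 10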
cubed/tests/utils.py (5 additions & 0 deletions)
@@ -1,4 +1,5 @@
import platform
+import sys
from typing import Iterable

import networkx as nx
@@ -20,6 +21,10 @@
# AsyncPythonDagExecutor calls `peak_measured_mem` which is not supported on Windows
ALL_EXECUTORS.append(create_executor("threads"))

+# AsyncPythonDagExecutor (processes) uses an API available from 3.11 onwards (max_tasks_per_child)
+if sys.version_info >= (3, 11):
+    ALL_EXECUTORS.append(create_executor("processes"))
+    MAIN_EXECUTORS.append(create_executor("processes"))

try:
ALL_EXECUTORS.append(create_executor("beam"))
setup.cfg (2 additions & 0 deletions)
@@ -24,6 +24,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-array_api_compat.*]
ignore_missing_imports = True
+[mypy-cloudpickle.*]
+ignore_missing_imports = True
[mypy-coiled.*]
ignore_missing_imports = True
[mypy-dask.*]
