Remove max_tasks_per_child=1 limitation from processes executor (#515)
* Allow max_tasks_per_child to be set for the `processes` executor, defaulting to not set.

* Parameterize test_mem_utilization to run on lithops and processes executors

* Run slow tests on PRs
tomwhite authored Jul 23, 2024
1 parent 59c593d commit 69e9f94
Showing 4 changed files with 104 additions and 56 deletions.
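
In practice, the new option is passed when the executor is created. A minimal usage sketch, based on the `create_executor` calls visible in the diffs below; the `Spec` arguments mirror the test file, while `xp.ones` and `compute(executor=...)` are assumed from cubed's public API rather than shown in this commit:

```python
import cubed
import cubed.array_api as xp
from cubed.runtime.create import create_executor

# New default: worker processes are reused across tasks.
executor = create_executor("processes")

# Opt back in to one-task-per-process behaviour (Python 3.11+ only); slower
# due to per-task process startup, but gives cleaner per-task peak-memory
# measurements.
single_task_executor = create_executor("processes", dict(max_tasks_per_child=1))

spec = cubed.Spec("/tmp/cubed", allowed_mem=2_000_000_000)
a = xp.ones((10000, 10000), chunks=(5000, 5000), spec=spec)
b = xp.negative(a)
b.compute(executor=single_task_executor)
```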
1 change: 1 addition & 0 deletions .github/workflows/slow-tests.yml

@@ -1,6 +1,7 @@
 name: Slow tests
 
 on:
+  pull_request:
   schedule:
     # Every weekday at 03:49 UTC, see https://crontab.guru/
     - cron: "49 3 * * 1-5"
14 changes: 11 additions & 3 deletions cubed/runtime/executors/local.py

@@ -179,11 +179,19 @@ async def async_execute_dag(
     if spec is not None:
         check_runtime_memory(spec, max_workers)
     if use_processes:
+        max_tasks_per_child = kwargs.pop("max_tasks_per_child", None)
         context = multiprocessing.get_context("spawn")
         # max_tasks_per_child is only supported from Python 3.11
-        concurrent_executor = ProcessPoolExecutor(
-            max_workers=max_workers, mp_context=context, max_tasks_per_child=1
-        )
+        if max_tasks_per_child is None:
+            concurrent_executor = ProcessPoolExecutor(
+                max_workers=max_workers, mp_context=context
+            )
+        else:
+            concurrent_executor = ProcessPoolExecutor(
+                max_workers=max_workers,
+                mp_context=context,
+                max_tasks_per_child=max_tasks_per_child,
+            )
     else:
         concurrent_executor = ThreadPoolExecutor(max_workers=max_workers)
     try:
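
For background on the branch above: `max_tasks_per_child` only exists on `concurrent.futures.ProcessPoolExecutor` from Python 3.11, so passing it unconditionally (even as `None`) would raise a `TypeError` on older interpreters; hence the two-way construction. It also requires a non-`fork` start method, which the `spawn` context used here already satisfies. A self-contained stdlib-only sketch of what the setting changes:

```python
import multiprocessing
import os
from concurrent.futures import ProcessPoolExecutor


def task(_):
    return os.getpid()


if __name__ == "__main__":
    # "spawn" (or "forkserver") is required when using max_tasks_per_child
    ctx = multiprocessing.get_context("spawn")

    # Default: worker processes are reused, so a handful of PIDs serve all tasks
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
        print(len(set(pool.map(task, range(8)))))  # typically 2

    # max_tasks_per_child=1 (Python 3.11+): every task runs in a fresh process,
    # so one task's memory footprint cannot leak into the next measurement
    with ProcessPoolExecutor(
        max_workers=2, mp_context=ctx, max_tasks_per_child=1
    ) as pool:
        print(len(set(pool.map(task, range(8)))))  # 8 distinct PIDs
```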
138 changes: 90 additions & 48 deletions cubed/tests/test_mem_utilization.py

@@ -1,32 +1,60 @@
 import math
+import platform
 import shutil
+import sys
 from functools import partial, reduce
 
 import pytest
 
-from cubed.core.ops import partial_reduce
-from cubed.core.optimization import multiple_inputs_optimize_dag
-
-pytest.importorskip("lithops")
-
 import cubed
 import cubed.array_api as xp
 import cubed.random
 from cubed.backend_array_api import namespace as nxp
+from cubed.core.ops import partial_reduce
+from cubed.core.optimization import multiple_inputs_optimize_dag
 from cubed.extensions.history import HistoryCallback
 from cubed.extensions.mem_warn import MemoryWarningCallback
-from cubed.runtime.executors.lithops import LithopsExecutor
+from cubed.runtime.create import create_executor
 from cubed.tests.utils import LITHOPS_LOCAL_CONFIG
 
+ALLOWED_MEM = 2_000_000_000
+
+EXECUTORS = {}
+
+if platform.system() != "Windows":
+    EXECUTORS["processes"] = create_executor("processes")
+
+    # Run with max_tasks_per_child=1 so that each task is run in a new process,
+    # allowing us to perform a stronger check on peak memory
+    if sys.version_info >= (3, 11):
+        executor_options = dict(max_tasks_per_child=1)
+        EXECUTORS["processes-single-task"] = create_executor(
+            "processes", executor_options
+        )
+
+try:
+    executor_options = dict(config=LITHOPS_LOCAL_CONFIG, wait_dur_sec=0.1)
+    EXECUTORS["lithops"] = create_executor("lithops", executor_options)
+except ImportError:
+    pass
+
 
 @pytest.fixture()
 def spec(tmp_path, reserved_mem):
-    return cubed.Spec(tmp_path, allowed_mem=2_000_000_000, reserved_mem=reserved_mem)
+    return cubed.Spec(tmp_path, allowed_mem=ALLOWED_MEM, reserved_mem=reserved_mem)
+
+
+@pytest.fixture(
+    scope="module",
+    params=EXECUTORS.values(),
+    ids=EXECUTORS.keys(),
+)
+def executor(request):
+    return request.param
 
 
 @pytest.fixture(scope="module")
-def reserved_mem():
-    executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG)
+def reserved_mem(executor):
     res = cubed.measure_reserved_mem(executor) * 1.1  # add some wiggle room
     return round_up_to_multiple(res, 10_000_000)  # round up to nearest multiple of 10MB

@@ -40,58 +68,58 @@ def round_up_to_multiple(x, multiple=10):
 
 
 @pytest.mark.slow
-def test_index(tmp_path, spec):
+def test_index(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = a[1:, :]
-    run_operation(tmp_path, "index", b)
+    run_operation(tmp_path, executor, "index", b)
 
 
 @pytest.mark.slow
-def test_index_step(tmp_path, spec):
+def test_index_step(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = a[::2, :]
-    run_operation(tmp_path, "index_step", b)
+    run_operation(tmp_path, executor, "index_step", b)
 
 
 # Creation Functions
 
 
 @pytest.mark.slow
-def test_eye(tmp_path, spec):
+def test_eye(tmp_path, spec, executor):
     a = xp.eye(10000, 10000, chunks=(5000, 5000), spec=spec)
-    run_operation(tmp_path, "eye", a)
+    run_operation(tmp_path, executor, "eye", a)
 
 
 @pytest.mark.slow
-def test_tril(tmp_path, spec):
+def test_tril(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.tril(a)
-    run_operation(tmp_path, "tril", b)
+    run_operation(tmp_path, executor, "tril", b)
 
 
 # Elementwise Functions
 
 
 @pytest.mark.slow
-def test_add(tmp_path, spec):
+def test_add(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.add(a, b)
-    run_operation(tmp_path, "add", c)
+    run_operation(tmp_path, executor, "add", c)
 
 
 @pytest.mark.slow
-def test_add_reduce_left(tmp_path, spec):
+def test_add_reduce_left(tmp_path, spec, executor):
     # Perform the `add` operation repeatedly on pairs of arrays, also known as fold left.
     # See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
     #
@@ -111,11 +139,13 @@ def test_add_reduce_left(tmp_path, spec):
     ]
     result = reduce(lambda x, y: xp.add(x, y), arrs)
     opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
-    run_operation(tmp_path, "add_reduce_left", result, optimize_function=opt_fn)
+    run_operation(
+        tmp_path, executor, "add_reduce_left", result, optimize_function=opt_fn
+    )
 
 
 @pytest.mark.slow
-def test_add_reduce_right(tmp_path, spec):
+def test_add_reduce_right(tmp_path, spec, executor):
     # Perform the `add` operation repeatedly on pairs of arrays, also known as fold right.
     # See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
     #
@@ -137,23 +167,25 @@ def test_add_reduce_right(tmp_path, spec):
     ]
     result = reduce(lambda x, y: xp.add(y, x), reversed(arrs))
     opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
-    run_operation(tmp_path, "add_reduce_right", result, optimize_function=opt_fn)
+    run_operation(
+        tmp_path, executor, "add_reduce_right", result, optimize_function=opt_fn
+    )
 
 
 @pytest.mark.slow
-def test_negative(tmp_path, spec):
+def test_negative(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.negative(a)
-    run_operation(tmp_path, "negative", b)
+    run_operation(tmp_path, executor, "negative", b)
 
 
 # Linear Algebra Functions
 
 
 @pytest.mark.slow
-def test_matmul(tmp_path, spec):
+def test_matmul(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -163,20 +195,20 @@ def test_matmul(tmp_path, spec):
     c = xp.astype(a, xp.float32)
     d = xp.astype(b, xp.float32)
     e = xp.matmul(c, d)
-    run_operation(tmp_path, "matmul", e)
+    run_operation(tmp_path, executor, "matmul", e)
 
 
 @pytest.mark.slow
-def test_matrix_transpose(tmp_path, spec):
+def test_matrix_transpose(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.matrix_transpose(a)
-    run_operation(tmp_path, "matrix_transpose", b)
+    run_operation(tmp_path, executor, "matrix_transpose", b)
 
 
 @pytest.mark.slow
-def test_tensordot(tmp_path, spec):
+def test_tensordot(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -186,14 +218,14 @@ def test_tensordot(tmp_path, spec):
     c = xp.astype(a, xp.float32)
     d = xp.astype(b, xp.float32)
     e = xp.tensordot(c, d, axes=1)
-    run_operation(tmp_path, "tensordot", e)
+    run_operation(tmp_path, executor, "tensordot", e)
 
 
 # Manipulation Functions
 
 
 @pytest.mark.slow
-def test_concat(tmp_path, spec):
+def test_concat(tmp_path, spec, executor):
     # Note 'a' has one fewer element in axis=0 to force chunking to cross array boundaries
     a = cubed.random.random(
         (9999, 10000), chunks=(5000, 5000), spec=spec
@@ -202,81 +234,80 @@ def test_concat(tmp_path, spec):
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.concat((a, b), axis=0)
-    run_operation(tmp_path, "concat", c)
+    run_operation(tmp_path, executor, "concat", c)
 
 
 @pytest.mark.slow
-def test_reshape(tmp_path, spec):
+def test_reshape(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     # need intermediate reshape due to limitations in Dask's reshape_rechunk
     b = xp.reshape(a, (5000, 2, 10000))
     c = xp.reshape(b, (5000, 20000))
-    run_operation(tmp_path, "reshape", c)
+    run_operation(tmp_path, executor, "reshape", c)
 
 
 @pytest.mark.slow
-def test_stack(tmp_path, spec):
+def test_stack(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.stack((a, b), axis=0)
-    run_operation(tmp_path, "stack", c)
+    run_operation(tmp_path, executor, "stack", c)
 
 
 # Searching Functions
 
 
 @pytest.mark.slow
-def test_argmax(tmp_path, spec):
+def test_argmax(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.argmax(a, axis=0)
-    run_operation(tmp_path, "argmax", b)
+    run_operation(tmp_path, executor, "argmax", b)
 
 
 # Statistical Functions
 
 
 @pytest.mark.slow
-def test_max(tmp_path, spec):
+def test_max(tmp_path, spec, executor):
    a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.max(a, axis=0)
-    run_operation(tmp_path, "max", b)
+    run_operation(tmp_path, executor, "max", b)
 
 
 @pytest.mark.slow
-def test_mean(tmp_path, spec):
+def test_mean(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.mean(a, axis=0)
-    run_operation(tmp_path, "mean", b)
+    run_operation(tmp_path, executor, "mean", b)
 
 
 @pytest.mark.slow
-def test_sum_partial_reduce(tmp_path, spec):
+def test_sum_partial_reduce(tmp_path, spec, executor):
     a = cubed.random.random(
         (40000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = partial_reduce(a, nxp.sum, split_every={0: 8})
-    run_operation(tmp_path, "sum_partial_reduce", b)
+    run_operation(tmp_path, executor, "sum_partial_reduce", b)
 
 
 # Internal functions
 
 
-def run_operation(tmp_path, name, result_array, *, optimize_function=None):
+def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None):
     # result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False)
     # result_array.visualize(f"cubed-{name}", optimize_function=optimize_function)
-    executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG)
     hist = HistoryCallback()
     mem_warn = MemoryWarningCallback()
     # use store=None to write to temporary zarr
@@ -291,8 +322,19 @@ def run_operation(tmp_path, name, result_array, *, optimize_function=None):
     df = hist.stats_df
     print(df)
 
+    # check peak memory does not exceed allowed mem
+    assert (df["peak_measured_mem_end_mb_max"] <= ALLOWED_MEM // 1_000_000).all()
+
     # check change in peak memory is no more than projected mem
     assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all()
 
     # check projected_mem_utilization does not exceed 1
-    assert (df["projected_mem_utilization"] <= 1.0).all()
+    # except on processes executor that runs multiple tasks in a process
+    if (
+        executor.name != "processes"
+        or executor.kwargs.get("max_tasks_per_child", None) == 1
+    ):
+        assert (df["projected_mem_utilization"] <= 1.0).all()
 
     # delete temp files for this test immediately since they are so large
     shutil.rmtree(tmp_path)
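
The `EXECUTORS` dict and `executor` fixture above use the standard pytest pattern for running every test against several backends: a module-scoped parameterized fixture, with `ids` controlling the test-name suffixes. Note too that `run_operation` now skips the strict `projected_mem_utilization` assertion when worker processes are reused, since a reused process's peak memory reflects earlier tasks. A stripped-down sketch of the fixture pattern (the backend names and functions here are illustrative, not from cubed):

```python
import pytest

# Candidate backends, keyed by the id that will appear in the test name
BACKENDS = {
    "eager": lambda xs: [x * 2 for x in xs],
    "lazy": lambda xs: list(map(lambda x: x * 2, xs)),
}


# params supplies the values and ids the readable suffixes; module scope
# means each backend is constructed once per module rather than once per test.
@pytest.fixture(scope="module", params=BACKENDS.values(), ids=BACKENDS.keys())
def backend(request):
    return request.param


def test_doubles(backend):
    assert backend([1, 2, 3]) == [2, 4, 6]
```

Each test requesting `backend` then runs once per entry, e.g. `test_doubles[eager]` and `test_doubles[lazy]`, which is how the memory tests above gain `[processes]`, `[processes-single-task]`, and `[lithops]` variants.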
7 changes: 2 additions & 5 deletions cubed/tests/utils.py

@@ -1,5 +1,4 @@
 import platform
-import sys
 from typing import Iterable
 
 import networkx as nx
@@ -29,10 +28,8 @@
     # ThreadsExecutor calls `peak_measured_mem` which is not supported on Windows
     ALL_EXECUTORS.append(create_executor("threads"))
 
-    # ProcessesExecutor uses an API available from 3.11 onwards (max_tasks_per_child)
-    if sys.version_info >= (3, 11):
-        ALL_EXECUTORS.append(create_executor("processes"))
-        MAIN_EXECUTORS.append(create_executor("processes"))
+    ALL_EXECUTORS.append(create_executor("processes"))
+    MAIN_EXECUTORS.append(create_executor("processes"))
 
 try:
     ALL_EXECUTORS.append(create_executor("beam"))
