Parameterize test_mem_utilization to run on lithops and processes executors
tomwhite committed Jul 23, 2024
1 parent 633cfba commit 49068fd
Showing 1 changed file with 90 additions and 48 deletions.
cubed/tests/test_mem_utilization.py (138 changes: 90 additions & 48 deletions)
@@ -1,32 +1,60 @@
 import math
+import platform
 import shutil
+import sys
 from functools import partial, reduce
 
 import pytest
 
-pytest.importorskip("lithops")
+from cubed.core.ops import partial_reduce
+from cubed.core.optimization import multiple_inputs_optimize_dag
 
 import cubed
 import cubed.array_api as xp
 import cubed.random
 from cubed.backend_array_api import namespace as nxp
-from cubed.core.ops import partial_reduce
-from cubed.core.optimization import multiple_inputs_optimize_dag
 from cubed.extensions.history import HistoryCallback
 from cubed.extensions.mem_warn import MemoryWarningCallback
-from cubed.runtime.executors.lithops import LithopsExecutor
+from cubed.runtime.create import create_executor
 from cubed.tests.utils import LITHOPS_LOCAL_CONFIG
 
+ALLOWED_MEM = 2_000_000_000
+
+EXECUTORS = {}
+
+if platform.system() != "Windows":
+    EXECUTORS["processes"] = create_executor("processes")
+
+    # Run with max_tasks_per_child=1 so that each task is run in a new process,
+    # allowing us to perform a stronger check on peak memory
+    if sys.version_info >= (3, 11):
+        executor_options = dict(max_tasks_per_child=1)
+        EXECUTORS["processes-single-task"] = create_executor(
+            "processes", executor_options
+        )
+
+try:
+    executor_options = dict(config=LITHOPS_LOCAL_CONFIG, wait_dur_sec=0.1)
+    EXECUTORS["lithops"] = create_executor("lithops", executor_options)
+except ImportError:
+    pass
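Note: the sys.version_info guard above exists because max_tasks_per_child is a Python 3.11+ feature of the standard library's process pools, which is presumably what the "processes" executor forwards this option to. A minimal standalone sketch of that stdlib behaviour (illustration only, not code from this commit):

from concurrent.futures import ProcessPoolExecutor

def task(n: int) -> int:
    return sum(range(n))

if __name__ == "__main__":
    # max_tasks_per_child=1 (Python 3.11+) gives every task a fresh worker
    # process, so a per-process peak-memory reading reflects exactly one task.
    with ProcessPoolExecutor(max_workers=2, max_tasks_per_child=1) as pool:
        print(list(pool.map(task, [10, 100, 1000])))  # [45, 4950, 499500]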


 @pytest.fixture()
 def spec(tmp_path, reserved_mem):
-    return cubed.Spec(tmp_path, allowed_mem=2_000_000_000, reserved_mem=reserved_mem)
+    return cubed.Spec(tmp_path, allowed_mem=ALLOWED_MEM, reserved_mem=reserved_mem)
 
 
+@pytest.fixture(
+    scope="module",
+    params=EXECUTORS.values(),
+    ids=EXECUTORS.keys(),
+)
+def executor(request):
+    return request.param
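The executor fixture uses pytest fixture parameterization: every test that requests it is collected once per entry in EXECUTORS, with the dict keys becoming the test IDs (so a run shows items like test_add[lithops] and test_add[processes-single-task]). A self-contained sketch of the same pattern, with hypothetical names:

import pytest

BACKENDS = {"fast": "backend-a", "careful": "backend-b"}  # hypothetical stand-ins

@pytest.fixture(params=BACKENDS.values(), ids=BACKENDS.keys())
def backend(request):
    return request.param

def test_uses_backend(backend):
    # collected twice: test_uses_backend[fast], test_uses_backend[careful]
    assert backend.startswith("backend-")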


 @pytest.fixture(scope="module")
-def reserved_mem():
-    executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG)
+def reserved_mem(executor):
     res = cubed.measure_reserved_mem(executor) * 1.1  # add some wiggle room
     return round_up_to_multiple(res, 10_000_000)  # round up to nearest multiple of 10MB
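round_up_to_multiple itself is unchanged by this commit and is collapsed out of the diff (only its def line is visible in the next hunk header). A plausible implementation, offered purely as a hypothetical sketch consistent with the call above and with the file's import math:

import math

def round_up_to_multiple(x, multiple=10):
    # e.g. round_up_to_multiple(123_456_789, 10_000_000) -> 130_000_000
    return math.ceil(x / multiple) * multiple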

@@ -40,58 +68,58 @@ def round_up_to_multiple(x, multiple=10):


 @pytest.mark.slow
-def test_index(tmp_path, spec):
+def test_index(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = a[1:, :]
-    run_operation(tmp_path, "index", b)
+    run_operation(tmp_path, executor, "index", b)
 
 
 @pytest.mark.slow
-def test_index_step(tmp_path, spec):
+def test_index_step(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = a[::2, :]
-    run_operation(tmp_path, "index_step", b)
+    run_operation(tmp_path, executor, "index_step", b)
 
 
 # Creation Functions
 
 
 @pytest.mark.slow
-def test_eye(tmp_path, spec):
+def test_eye(tmp_path, spec, executor):
     a = xp.eye(10000, 10000, chunks=(5000, 5000), spec=spec)
-    run_operation(tmp_path, "eye", a)
+    run_operation(tmp_path, executor, "eye", a)
 
 
 @pytest.mark.slow
-def test_tril(tmp_path, spec):
+def test_tril(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.tril(a)
-    run_operation(tmp_path, "tril", b)
+    run_operation(tmp_path, executor, "tril", b)
 
 
 # Elementwise Functions
 
 
 @pytest.mark.slow
-def test_add(tmp_path, spec):
+def test_add(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.add(a, b)
-    run_operation(tmp_path, "add", c)
+    run_operation(tmp_path, executor, "add", c)
 
 
 @pytest.mark.slow
-def test_add_reduce_left(tmp_path, spec):
+def test_add_reduce_left(tmp_path, spec, executor):
     # Perform the `add` operation repeatedly on pairs of arrays, also known as fold left.
     # See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
     #
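This test and test_add_reduce_right below differ only in association order. A tiny standalone sketch of fold left versus fold right, mirroring the reduce calls used in both tests (plain integers instead of arrays):

from functools import reduce

xs = [1, 2, 3, 4]
fold_left = reduce(lambda x, y: x + y, xs)             # ((1 + 2) + 3) + 4
fold_right = reduce(lambda x, y: y + x, reversed(xs))  # 1 + (2 + (3 + 4))
assert fold_left == fold_right == 10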
@@ -111,11 +139,13 @@ def test_add_reduce_left(tmp_path, spec):
     ]
     result = reduce(lambda x, y: xp.add(x, y), arrs)
     opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
-    run_operation(tmp_path, "add_reduce_left", result, optimize_function=opt_fn)
+    run_operation(
+        tmp_path, executor, "add_reduce_left", result, optimize_function=opt_fn
+    )
 
 
 @pytest.mark.slow
-def test_add_reduce_right(tmp_path, spec):
+def test_add_reduce_right(tmp_path, spec, executor):
     # Perform the `add` operation repeatedly on pairs of arrays, also known as fold right.
     # See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
     #
@@ -137,23 +167,25 @@ def test_add_reduce_right(tmp_path, spec):
     ]
     result = reduce(lambda x, y: xp.add(y, x), reversed(arrs))
     opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
-    run_operation(tmp_path, "add_reduce_right", result, optimize_function=opt_fn)
+    run_operation(
+        tmp_path, executor, "add_reduce_right", result, optimize_function=opt_fn
+    )
 
 
 @pytest.mark.slow
-def test_negative(tmp_path, spec):
+def test_negative(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.negative(a)
-    run_operation(tmp_path, "negative", b)
+    run_operation(tmp_path, executor, "negative", b)
 
 
 # Linear Algebra Functions
 
 
 @pytest.mark.slow
-def test_matmul(tmp_path, spec):
+def test_matmul(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -163,20 +195,20 @@ def test_matmul(tmp_path, spec):
     c = xp.astype(a, xp.float32)
     d = xp.astype(b, xp.float32)
     e = xp.matmul(c, d)
-    run_operation(tmp_path, "matmul", e)
+    run_operation(tmp_path, executor, "matmul", e)
 
 
 @pytest.mark.slow
-def test_matrix_transpose(tmp_path, spec):
+def test_matrix_transpose(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.matrix_transpose(a)
-    run_operation(tmp_path, "matrix_transpose", b)
+    run_operation(tmp_path, executor, "matrix_transpose", b)
 
 
 @pytest.mark.slow
-def test_tensordot(tmp_path, spec):
+def test_tensordot(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -186,14 +218,14 @@ def test_tensordot(tmp_path, spec):
     c = xp.astype(a, xp.float32)
     d = xp.astype(b, xp.float32)
     e = xp.tensordot(c, d, axes=1)
-    run_operation(tmp_path, "tensordot", e)
+    run_operation(tmp_path, executor, "tensordot", e)
 
 
 # Manipulation Functions
 
 
 @pytest.mark.slow
-def test_concat(tmp_path, spec):
+def test_concat(tmp_path, spec, executor):
     # Note 'a' has one fewer element in axis=0 to force chunking to cross array boundaries
     a = cubed.random.random(
         (9999, 10000), chunks=(5000, 5000), spec=spec
@@ -202,81 +234,80 @@
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.concat((a, b), axis=0)
-    run_operation(tmp_path, "concat", c)
+    run_operation(tmp_path, executor, "concat", c)
 
 
 @pytest.mark.slow
-def test_reshape(tmp_path, spec):
+def test_reshape(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     # need intermediate reshape due to limitations in Dask's reshape_rechunk
     b = xp.reshape(a, (5000, 2, 10000))
     c = xp.reshape(b, (5000, 20000))
-    run_operation(tmp_path, "reshape", c)
+    run_operation(tmp_path, executor, "reshape", c)
 
 
 @pytest.mark.slow
-def test_stack(tmp_path, spec):
+def test_stack(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.stack((a, b), axis=0)
-    run_operation(tmp_path, "stack", c)
+    run_operation(tmp_path, executor, "stack", c)
 
 
 # Searching Functions
 
 
 @pytest.mark.slow
-def test_argmax(tmp_path, spec):
+def test_argmax(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.argmax(a, axis=0)
-    run_operation(tmp_path, "argmax", b)
+    run_operation(tmp_path, executor, "argmax", b)
 
 
 # Statistical Functions
 
 
 @pytest.mark.slow
-def test_max(tmp_path, spec):
+def test_max(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.max(a, axis=0)
-    run_operation(tmp_path, "max", b)
+    run_operation(tmp_path, executor, "max", b)
 
 
 @pytest.mark.slow
-def test_mean(tmp_path, spec):
+def test_mean(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.mean(a, axis=0)
-    run_operation(tmp_path, "mean", b)
+    run_operation(tmp_path, executor, "mean", b)
 
 
 @pytest.mark.slow
-def test_sum_partial_reduce(tmp_path, spec):
+def test_sum_partial_reduce(tmp_path, spec, executor):
     a = cubed.random.random(
         (40000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = partial_reduce(a, nxp.sum, split_every={0: 8})
-    run_operation(tmp_path, "sum_partial_reduce", b)
+    run_operation(tmp_path, executor, "sum_partial_reduce", b)
 
 
 # Internal functions
 
 
-def run_operation(tmp_path, name, result_array, *, optimize_function=None):
+def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None):
     # result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False)
     # result_array.visualize(f"cubed-{name}", optimize_function=optimize_function)
-    executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG)
     hist = HistoryCallback()
     mem_warn = MemoryWarningCallback()
     # use store=None to write to temporary zarr
@@ -291,8 +322,19 @@ def run_operation(tmp_path, name, result_array, *, optimize_function=None):
     df = hist.stats_df
     print(df)
 
+    # check peak memory does not exceed allowed mem
+    assert (df["peak_measured_mem_end_mb_max"] <= ALLOWED_MEM // 1_000_000).all()
+
     # check change in peak memory is no more than projected mem
     assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all()
 
     # check projected_mem_utilization does not exceed 1
-    assert (df["projected_mem_utilization"] <= 1.0).all()
+    # except on processes executor that runs multiple tasks in a process
+    if (
+        executor.name != "processes"
+        or executor.kwargs.get("max_tasks_per_child", None) == 1
+    ):
+        assert (df["projected_mem_utilization"] <= 1.0).all()
 
     # delete temp files for this test immediately since they are so large
     shutil.rmtree(tmp_path)
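For reference, the arithmetic behind the constants above, assuming float64 arrays (8 bytes per element), which is consistent with the "200MB chunks" comments on the (5000, 5000)-chunked arrays:

chunk_bytes = 5000 * 5000 * 8            # one (5000, 5000) float64 chunk
assert chunk_bytes == 200_000_000        # the "200MB chunks" in the comments
allowed_mb = 2_000_000_000 // 1_000_000  # ALLOWED_MEM converted to MB
assert allowed_mb == 2000                # the cap in the peak-memory assert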
