Skip to content

Commit

Permalink
Use donfig to control default spec
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Feb 22, 2024
1 parent 9221117 commit 41aac1b
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 7 deletions.
9 changes: 9 additions & 0 deletions cubed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
except Exception: # pragma: no cover
__version__ = "unknown"

from donfig import Config

# Package-wide configuration object, registered with donfig under the name "cubed".
# NOTE: keep this assignment above the submodule imports that follow it;
# cubed.core.array runs `from cubed import config` at import time, so the name
# has to exist before that module is imported.
config = Config(
    "cubed",
    # default spec is local temp dir and a modest amount of memory (200MB, of which 100MB is reserved)
    defaults=[{"spec": {"allowed_mem": 200_000_000, "reserved_mem": 100_000_000}}],
)

from .array_api import Array
from .core.array import compute, measure_reserved_mem, visualize
from .core.gufunc import apply_gufunc
Expand All @@ -28,6 +36,7 @@
"TaskEndEvent",
"apply_gufunc",
"compute",
"config",
"from_array",
"from_zarr",
"map_blocks",
Expand Down
10 changes: 4 additions & 6 deletions cubed/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import numpy as np
from toolz import map, reduce

from cubed import config
from cubed.runtime.types import Callback, Executor
from cubed.spec import Spec
from cubed.spec import Spec, spec_from_config
from cubed.storage.zarr import open_if_lazy_zarr_array
from cubed.utils import chunk_memory
from cubed.vendor.dask.array.core import normalize_chunks
Expand Down Expand Up @@ -41,11 +42,8 @@ def __init__(self, name, zarray, spec, plan):
self._chunks = normalize_chunks(
zarray.chunks, shape=self.shape, dtype=self.dtype
)
# if no spec is supplied, use a default with local temp dir,
# and a modest amount of memory (200MB, of which 100MB is reserved)
self.spec = spec or Spec(
None, allowed_mem=200_000_000, reserved_mem=100_000_000
)
# get spec from config if not supplied
self.spec = spec or spec_from_config(config)
self.plan = plan

@property
Expand Down
2 changes: 2 additions & 0 deletions cubed/runtime/executors/modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
aws_image = modal.Image.debian_slim().pip_install(
[
"array-api-compat",
"donfig",
"fsspec",
"mypy_extensions", # for rechunker
"networkx",
Expand All @@ -44,6 +45,7 @@
gcp_image = modal.Image.debian_slim().pip_install(
[
"array-api-compat",
"donfig",
"fsspec",
"mypy_extensions", # for rechunker
"networkx",
Expand Down
16 changes: 16 additions & 0 deletions cubed/spec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from functools import lru_cache
from typing import Optional, Union

import donfig
from donfig.config_obj import expand_environment_variables

from cubed.runtime.create import create_executor
from cubed.runtime.types import Executor
from cubed.utils import convert_to_bytes
Expand Down Expand Up @@ -48,6 +52,7 @@ def __init__(
else:
self._allowed_mem = convert_to_bytes(allowed_mem)

self._executor: Optional[Executor]
if executor is not None:
self._executor = executor
elif executor_name is not None:
Expand Down Expand Up @@ -109,3 +114,14 @@ def __eq__(self, other):
)
else:
return False


def spec_from_config(config):
    """Return the ``Spec`` described by ``config``.

    The config is serialized first so that the lookup goes through
    ``_spec_from_serialized_config``, which is cached on that serialized
    form.
    """
    serialized = config.serialize()
    return _spec_from_serialized_config(serialized)


@lru_cache  # a given serialized config always yields the same Spec object for arrays
def _spec_from_serialized_config(ser: str):
    """Build a ``Spec`` from a serialized donfig config.

    The string is deserialized with ``donfig.deserialize``, the ``"spec"``
    entry is passed through ``expand_environment_variables``, and the
    resulting mapping is used as keyword arguments for ``Spec``.
    """
    spec_settings = donfig.deserialize(ser)["spec"]
    return Spec(**expand_environment_variables(spec_settings))
1 change: 1 addition & 0 deletions cubed/tests/runtime/test_modal_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
image = modal.Image.debian_slim().pip_install(
[
"array-api-compat",
"donfig",
"fsspec",
"mypy_extensions", # for rechunker
"networkx",
Expand Down
12 changes: 11 additions & 1 deletion cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,21 @@ def test_default_spec(executor):

def test_default_spec_allowed_mem_exceeded():
    """Check that applying ``xp.negative`` under the default spec raises ``ValueError``."""
    # default spec fails for large computations
    # NOTE(review): this first assignment is rebound by the very next line;
    # it looks like the pre-change side of the diff shown alongside its
    # replacement -- confirm which array shape is intended.
    a = xp.ones((100000, 100000), chunks=(10000, 10000))
    a = xp.ones((20000, 1000), chunks=(10000, 1000))
    # no spec is passed, so the arrays fall back to the default spec
    with pytest.raises(ValueError):
        xp.negative(a)


def test_default_spec_config_override():
    """Raising ``spec.allowed_mem`` through ``config.set`` lets the computation run.

    Uses the same array shape and chunking as the allowed-mem-exceeded test,
    but inside a config override of 500MB, and checks the computed result.
    """
    from cubed import config

    shape = (20000, 1000)
    chunks = (10000, 1000)

    # override default spec to increase allowed_mem
    with config.set({"spec.allowed_mem": "500MB"}):
        ones = xp.ones(shape, chunks=chunks)
        negated = xp.negative(ones)
        assert_array_equal(negated.compute(), -np.ones(shape))


def test_different_specs(tmp_path):
spec1 = cubed.Spec(tmp_path, allowed_mem=100000)
spec2 = cubed.Spec(tmp_path, allowed_mem=200000)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ requires-python = ">=3.9"
dependencies = [
"aiostream",
"array-api-compat",
"donfig",
"fsspec",
"mypy_extensions", # for rechunker
"networkx < 2.8.3",
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-dask.*]
ignore_missing_imports = True
[mypy-donfig.*]
ignore_missing_imports = True
[mypy-distributed.*]
ignore_missing_imports = True
[mypy-fsspec.*]
Expand Down

0 comments on commit 41aac1b

Please sign in to comment.