Add VirtualInMemoryArray that keeps small arrays in memory rather than materializing to disk.
tomwhite committed Dec 17, 2023
1 parent 3d627a5 commit 1d55905
Showing 5 changed files with 55 additions and 17 deletions.
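
For context, a minimal sketch (not part of the commit) of the behaviour this change enables, assuming the public `cubed.array_api` namespace used in the tests below: a small NumPy array passed to `asarray` is now backed by a `VirtualInMemoryArray`, so no separate array-creation task writes it to Zarr.

```python
import numpy as np

import cubed.array_api as xp

# Small input array: with this commit it stays in memory as a
# VirtualInMemoryArray instead of being materialized to a temporary
# Zarr store on disk.
a = xp.asarray(np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), chunks=(2, 2))
b = xp.negative(a)

# Only the tasks for negative() run; there is no longer a "create array"
# task for `a` (this is what the updated task counts in the tests reflect).
print(b.compute())
```
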
13 changes: 7 additions & 6 deletions cubed/array_api/creation_functions.py
@@ -7,9 +7,12 @@
 from cubed.backend_array_api import namespace as nxp
 from cubed.core import Plan, gensym
 from cubed.core.ops import map_direct
-from cubed.core.plan import new_temp_path
-from cubed.storage.virtual import virtual_empty, virtual_full, virtual_offsets
-from cubed.storage.zarr import lazy_from_array
+from cubed.storage.virtual import (
+    virtual_empty,
+    virtual_full,
+    virtual_in_memory,
+    virtual_offsets,
+)
 from cubed.utils import to_chunksize
 from cubed.vendor.dask.array.core import normalize_chunks

@@ -70,11 +73,9 @@ def asarray(
     if dtype is None:
         dtype = a.dtype

-    # write to zarr
     chunksize = to_chunksize(normalize_chunks(chunks, shape=a.shape, dtype=dtype))
     name = gensym()
-    zarr_path = new_temp_path(name=name, spec=spec)
-    target = lazy_from_array(a, dtype=dtype, chunks=chunksize, store=zarr_path)
+    target = virtual_in_memory(a, chunks=chunksize)

     plan = Plan._new(name, "asarray", target)
     return Array(name, target, spec, plan)
38 changes: 38 additions & 0 deletions cubed/storage/virtual.py
@@ -97,6 +97,37 @@ def __getitem__(self, key):
         )


+class VirtualInMemoryArray:
+    """A small array that is held in memory but never materialized on disk."""
+
+    def __init__(
+        self,
+        array: np.ndarray,  # TODO: generalise
+        chunks: T_RegularChunks,
+    ):
+        self.array = array
+        # use an in-memory Zarr array as a template since it normalizes its properties
+        # and is needed for oindex
+        template = zarr.empty(
+            array.shape,
+            dtype=array.dtype,
+            chunks=chunks,
+            store=zarr.storage.MemoryStore(),
+        )
+        self.shape = template.shape
+        self.dtype = template.dtype
+        self.chunks = template.chunks
+        self.template = template
+        template[...] = array
+
+    def __getitem__(self, key):
+        return self.array.__getitem__(key)
+
+    @property
+    def oindex(self):
+        return self.template.oindex
+
+
 def _key_to_index_tuple(selection):
     if isinstance(selection, slice):
         selection = (selection,)
@@ -131,3 +162,10 @@ def virtual_full(

 def virtual_offsets(shape: T_Shape) -> VirtualOffsetsArray:
     return VirtualOffsetsArray(shape)
+
+
+def virtual_in_memory(
+    array: np.ndarray,
+    chunks: T_RegularChunks,
+) -> VirtualInMemoryArray:
+    return VirtualInMemoryArray(array, chunks)
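
As an aside, the template trick in `VirtualInMemoryArray` above can be illustrated standalone: an in-memory Zarr array normalizes shape, dtype, and chunks, and provides orthogonal indexing (`oindex`), which plain NumPy indexing does not expose under that name. A hedged sketch using only `zarr` and `numpy` (illustrative, not from the commit):

```python
import numpy as np
import zarr

data = np.arange(12).reshape(3, 4)

# In-memory Zarr array used purely as a template: it normalizes the chunk
# specification and exposes orthogonal indexing via .oindex.
template = zarr.empty(
    data.shape, dtype=data.dtype, chunks=(2, 2), store=zarr.storage.MemoryStore()
)
template[...] = data

print(template.chunks)             # normalized chunks, e.g. (2, 2)
print(template.oindex[[0, 2], :])  # orthogonal selection of rows 0 and 2
```

Note that `VirtualInMemoryArray.__getitem__` still reads from the original NumPy array; only `oindex` is delegated to the Zarr template.
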
5 changes: 2 additions & 3 deletions cubed/tests/test_core.py
@@ -220,9 +220,8 @@ def test_rechunk_same_chunks(spec):
     b = a.rechunk((2, 1))
     task_counter = TaskCounter()
     res = b.compute(callbacks=[task_counter])
-    # no tasks except array creation task should have run since chunks are same
-    num_created_arrays = 1
-    assert task_counter.value == num_created_arrays
+    # no tasks should have run since chunks are same
+    assert task_counter.value == 0

     assert_array_equal(res, np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]))

8 changes: 4 additions & 4 deletions cubed/tests/test_executor_features.py
@@ -93,7 +93,7 @@ def test_callbacks(spec, executor):
         np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]),
     )

-    num_created_arrays = 3
+    num_created_arrays = 1
     assert task_counter.value == num_created_arrays + 4


@@ -132,12 +132,12 @@ def test_resume(spec, executor):
     c = xp.add(a, b)
     d = xp.negative(c)

-    num_created_arrays = 4  # a, b, c, d
+    num_created_arrays = 2  # c, d
     assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 8

     task_counter = TaskCounter()
     c.compute(executor=executor, callbacks=[task_counter], optimize_graph=False)
-    num_created_arrays = 3  # a, b, c
+    num_created_arrays = 1  # c
     assert task_counter.value == num_created_arrays + 4

     # since c has already been computed, when computing d only 4 tasks are run, instead of 8
@@ -146,7 +146,7 @@
         executor=executor, callbacks=[task_counter], optimize_graph=False, resume=True
     )
     # the create arrays tasks are run again, even though they exist
-    num_created_arrays = 4  # a, b, c, d
+    num_created_arrays = 2  # c, d
     assert task_counter.value == num_created_arrays + 4


8 changes: 4 additions & 4 deletions cubed/tests/test_optimization.py
@@ -18,9 +18,9 @@ def test_fusion(spec):
     c = xp.astype(b, np.float32)
     d = xp.negative(c)

-    num_created_arrays = 4  # a, b, c, d
+    num_created_arrays = 3  # b, c, d
     assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 12
-    num_created_arrays = 2  # a, d
+    num_created_arrays = 1  # d
     assert d.plan.num_tasks(optimize_graph=True) == num_created_arrays + 4

     task_counter = TaskCounter()
@@ -39,9 +39,9 @@ def test_fusion_transpose(spec):
     c = xp.astype(b, np.float32)
     d = c.T

-    num_created_arrays = 4  # a, b, c, d
+    num_created_arrays = 3  # b, c, d
     assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 12
-    num_created_arrays = 2  # a, d
+    num_created_arrays = 1  # d
     assert d.plan.num_tasks(optimize_graph=True) == num_created_arrays + 4

     task_counter = TaskCounter()
