Skip to content

Commit

Permalink
Memray integration (cubed-dev#558)
Browse files Browse the repository at this point in the history
* Use Memray to examine tasks running on `lithops` or `processes` executors

* Add Memray documentation

* Fix mypy

* Update docs/user-guide/diagnostics.md

Co-authored-by: Tom Nicholas <[email protected]>

* Make memray check more defensive

---------

Co-authored-by: Tom Nicholas <[email protected]>
  • Loading branch information
2 people authored and thodson-usgs committed Oct 24, 2024
1 parent 8ca25f1 commit 544a060
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 2 deletions.
12 changes: 10 additions & 2 deletions cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@
)
from cubed.runtime.pipeline import visit_node_generations, visit_nodes
from cubed.runtime.types import Callback, DagExecutor
from cubed.runtime.utils import handle_callbacks, handle_operation_start_callbacks
from cubed.runtime.utils import (
handle_callbacks,
handle_operation_start_callbacks,
profile_memray,
)
from cubed.spec import Spec

logger = logging.getLogger(__name__)


@profile_memray
def run_func(input, func=None, config=None, name=None, compute_id=None):
result = func(input, config=config)
return result
Expand Down Expand Up @@ -171,6 +176,7 @@ def execute_dag(
) -> None:
use_backups = kwargs.pop("use_backups", True)
wait_dur_sec = kwargs.pop("wait_dur_sec", None)
compute_id = kwargs.pop("compute_id")
allowed_mem = spec.allowed_mem if spec is not None else None
function_executor = FunctionExecutor(**kwargs)
runtime_memory_mb = function_executor.config[function_executor.backend].get(
Expand Down Expand Up @@ -199,6 +205,7 @@ def execute_dag(
func=pipeline.function,
config=pipeline.config,
name=name,
compute_id=compute_id,
):
handle_callbacks(callbacks, stats)
else:
Expand All @@ -224,7 +231,8 @@ def execute_dag(
use_backups=use_backups,
return_stats=True,
wait_dur_sec=wait_dur_sec,
# TODO: kwargs
# TODO: other kwargs (func, config, name)
compute_id=compute_id,
):
handle_callbacks(callbacks, stats)

Expand Down
2 changes: 2 additions & 0 deletions cubed/runtime/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
execution_stats,
handle_callbacks,
handle_operation_start_callbacks,
profile_memray,
)
from cubed.spec import Spec

Expand Down Expand Up @@ -59,6 +60,7 @@ def execute_dag(
[callback.on_task_end(event) for callback in callbacks]


@profile_memray
@execution_stats
def run_func(input, func=None, config=None, name=None, compute_id=None):
return func(input, config=config)
Expand Down
33 changes: 33 additions & 0 deletions cubed/runtime/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import time
from contextlib import nullcontext
from functools import partial
from itertools import islice
from pathlib import Path

from cubed.runtime.types import OperationStartEvent, TaskEndEvent
from cubed.utils import peak_measured_mem

try:
import memray
except ImportError:
memray = None

sym_counter = 0


Expand Down Expand Up @@ -39,6 +46,32 @@ def execution_stats(func):
return partial(execute_with_stats, func)


def execute_with_memray(function, input, **kwargs):
# only run memray if installed, and only for first input (for operations that run on block locations)
if (
memray is not None
and "compute_id" in kwargs
and isinstance(input, list)
and all(isinstance(i, int) for i in input)
and sum(input) == 0
):
compute_id = kwargs["compute_id"]
name = kwargs["name"]
memray_dir = Path(f"history/{compute_id}/memray")
memray_dir.mkdir(parents=True, exist_ok=True)
cm = memray.Tracker(memray_dir / f"{name}.bin")
else:
cm = nullcontext()
with cm:
result = result = function(input, **kwargs)
return result


def profile_memray(func):
"""Decorator to profile a function call with memray."""
return partial(execute_with_memray, func)


def handle_operation_start_callbacks(callbacks, name):
if callbacks is not None:
event = OperationStartEvent(name)
Expand Down
Binary file added docs/images/memray-add.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
38 changes: 38 additions & 0 deletions docs/user-guide/diagnostics.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,41 @@ The timeline callback will write a graphic `timeline.svg` to a directory with th

### Examples in use
See the [examples](https://github.com/cubed-dev/cubed/blob/main/examples/README.md) for more information about how to use them.

## Memray

[Memray](https://github.com/bloomberg/memray), a memory profiler for Python, can be used to track and view memory allocations when running a single task in a Cubed computation.

This is not usually needed when using Cubed, but for developers writing new operations, improving projected memory sizes, or for debugging a memory issue, it can be very useful to understand how memory is actually allocated in Cubed.

To enable Memray memory profiling in Cubed, simply install memray (`pip install memray`). Then use a local executor that runs tasks in separate processes, such as `processes` (Python 3.11 or later) or `lithops`. When you run a computation, Cubed will enable Memray for the first task in each operation (so if an array has 100 chunks it will only produce one Memray trace).

Here is an example of a simple addition operation, with 200MB chunks. (It is adapted from [test_mem_utilization.py](https://github.com/cubed-dev/cubed/blob/main/cubed/tests/test_mem_utilization.py) in Cubed's test suite.)

```python
import cubed.array_api as xp
import cubed.random

a = cubed.random.random(
(10000, 10000), chunks=(5000, 5000), spec=spec
) # 200MB chunks
b = cubed.random.random(
(10000, 10000), chunks=(5000, 5000), spec=spec
) # 200MB chunks
c = xp.add(a, b)
c.compute(optimize_graph=False)
```

The optimizer is turned off so that generation of the random arrays is not fused with the add operation. This way we can see the memory allocations for that operation alone.

After the computation is complete there will be a collection of `.bin` files in the `history/compute-{id}/memray` directory - with one for each operation. To view them we convert them to HTML flame graphs as follows:

```shell
(cd $(ls -d history/compute-* | tail -1)/memray; for f in $(ls *.bin); do echo $f; python -m memray flamegraph --temporal -f -o $f.html $f; done)
```
Here is the flame graph for the add operation:
![Memray temporal view of an 'add' operation](../images/memray-add.png)
Annotations have been added to explain what is going on in this example. Note that reading a chunk from Zarr requires twice the chunk memory (400MB) since there is a buffer for the compressed Zarr block (200MB), as well as the resulting array (200MB). After the first chunk has been loaded the memory dips back to 200MB since the compressed buffer is no longer retained.
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-IPython.*]
ignore_missing_imports = True
[mypy-memray.*]
ignore_missing_imports = True
[mypy-modal.*]
ignore_missing_imports = True
[mypy-matplotlib.*]
Expand Down

0 comments on commit 544a060

Please sign in to comment.