Change use_backups default to True only on cloud stores (#619)
tomwhite authored Nov 20, 2024
1 parent 5879d09 commit eb01803
Showing 10 changed files with 56 additions and 15 deletions.
17 changes: 16 additions & 1 deletion cubed/runtime/backup.py
@@ -1,9 +1,24 @@
 import math
-from typing import Dict, TypeVar
+from typing import Dict, Optional, TypeVar
 
+from cubed.spec import Spec
+from cubed.utils import is_cloud_storage_path
+
 T = TypeVar("T")
 
 
+def use_backups_default(spec: Optional[Spec]) -> bool:
+    """The default setting for ``use_backups``.
+
+    Backup tasks are only enabled on cloud object stores since they provide atomic writes.
+    """
+    return (
+        spec is not None
+        and spec.work_dir is not None
+        and is_cloud_storage_path(spec.work_dir)
+    )
+
+
 def should_launch_backup(
     task: T,
     now: float,
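
To see what the new helper does in practice, here is a minimal usage sketch (assuming `Spec` can be constructed with just a `work_dir`; the bucket names are hypothetical):

```python
from cubed import Spec
from cubed.runtime.backup import use_backups_default

# Cloud object stores provide atomic writes, so backups default to on there.
assert use_backups_default(Spec(work_dir="s3://my-bucket/tmp"))
assert use_backups_default(Spec(work_dir="gs://my-bucket/tmp"))

# With no spec, or a local work_dir, backups default to off.
assert not use_backups_default(None)
assert not use_backups_default(Spec(work_dir="/tmp/cubed"))
```
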
2 changes: 1 addition & 1 deletion cubed/runtime/executors/asyncio.py
@@ -11,7 +11,7 @@
 async def async_map_unordered(
     create_futures_func: Callable[..., List[Tuple[Any, Future]]],
     input: Iterable[Any],
-    use_backups: bool = True,
+    use_backups: bool = False,
     create_backup_futures_func: Optional[
         Callable[..., List[Tuple[Any, Future]]]
     ] = None,
5 changes: 4 additions & 1 deletion cubed/runtime/executors/dask.py
@@ -17,6 +17,7 @@
 from dask.distributed import Client
 from networkx import MultiDiGraph
 
+from cubed.runtime.backup import use_backups_default
 from cubed.runtime.executors.asyncio import async_map_unordered
 from cubed.runtime.pipeline import visit_node_generations, visit_nodes
 from cubed.runtime.types import Callback, CubedPipeline, DagExecutor
@@ -41,7 +42,7 @@ async def map_unordered(
     map_function: Callable[..., Any],
     map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
     retries: int = 2,
-    use_backups: bool = True,
+    use_backups: bool = False,
     batch_size: Optional[int] = None,
     return_stats: bool = False,
     name: Optional[str] = None,
@@ -125,6 +126,8 @@ async def async_execute_dag(
     async with Client(asynchronous=True, **compute_kwargs) as client:
         if spec is not None:
             check_runtime_memory(spec, client)
+        if "use_backups" not in kwargs and use_backups_default(spec):
+            kwargs["use_backups"] = True
         if not compute_arrays_in_parallel:
             # run one pipeline at a time
             for name, node in visit_nodes(dag, resume=resume):
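
The dask executor above, and the local and modal executors below, all apply the new default the same way: the spec-derived value is filled in only when the caller has not passed `use_backups` explicitly. A minimal sketch of the pattern, with a hypothetical `resolve_use_backups` helper standing in for the executors' inline code:

```python
from typing import Any, Dict, Optional

from cubed import Spec
from cubed.runtime.backup import use_backups_default


def resolve_use_backups(kwargs: Dict[str, Any], spec: Optional[Spec]) -> Dict[str, Any]:
    # Fill in the default only when the caller made no explicit choice,
    # so an explicit use_backups=False still wins on a cloud store.
    if "use_backups" not in kwargs and use_backups_default(spec):
        kwargs["use_backups"] = True
    return kwargs


spec = Spec(work_dir="s3://my-bucket/tmp")  # hypothetical bucket
assert resolve_use_backups({}, spec) == {"use_backups": True}
assert resolve_use_backups({"use_backups": False}, spec) == {"use_backups": False}
```
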
6 changes: 3 additions & 3 deletions cubed/runtime/executors/lithops.py
@@ -20,7 +20,7 @@
 from lithops.wait import ALWAYS, ANY_COMPLETED
 from networkx import MultiDiGraph
 
-from cubed.runtime.backup import should_launch_backup
+from cubed.runtime.backup import should_launch_backup, use_backups_default
 from cubed.runtime.executors.lithops_retries import (
     RetryingFunctionExecutor,
     RetryingFuture,
@@ -53,7 +53,7 @@ def map_unordered(
     include_modules: List[str] = [],
     timeout: Optional[int] = None,
     retries: int = 2,
-    use_backups: bool = True,
+    use_backups: bool = False,
     return_stats: bool = False,
     wait_dur_sec: Optional[int] = 1,
     **kwargs,
@@ -174,7 +174,7 @@ def execute_dag(
     compute_arrays_in_parallel: Optional[bool] = None,
     **kwargs,
 ) -> None:
-    use_backups = kwargs.pop("use_backups", True)
+    use_backups = kwargs.pop("use_backups", use_backups_default(spec))
     wait_dur_sec = kwargs.pop("wait_dur_sec", None)
     compute_id = kwargs.pop("compute_id")
     allowed_mem = spec.allowed_mem if spec is not None else None
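
Note that the synchronous lithops executor takes a slightly different route from the executors that patch `kwargs`: it resolves the default eagerly with `kwargs.pop("use_backups", use_backups_default(spec))`. The effect is the same: an explicit `use_backups` argument always takes precedence over the spec-derived default.
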
5 changes: 4 additions & 1 deletion cubed/runtime/executors/local.py
@@ -12,6 +12,7 @@
 from networkx import MultiDiGraph
 from tenacity import Retrying, stop_after_attempt
 
+from cubed.runtime.backup import use_backups_default
 from cubed.runtime.executors.asyncio import async_map_unordered
 from cubed.runtime.pipeline import visit_node_generations, visit_nodes
 from cubed.runtime.types import Callback, CubedPipeline, DagExecutor, TaskEndEvent
@@ -80,7 +81,7 @@ async def map_unordered(
     function: Callable[..., Any],
     input: Iterable[Any],
     retries: int = 2,
-    use_backups: bool = True,
+    use_backups: bool = False,
     batch_size: Optional[int] = None,
     return_stats: bool = False,
     name: Optional[str] = None,
@@ -180,6 +181,8 @@ async def async_execute_dag(
     use_processes = kwargs.pop("use_processes", False)
     if spec is not None:
         check_runtime_memory(spec, max_workers)
+    if "use_backups" not in kwargs and use_backups_default(spec):
+        kwargs["use_backups"] = True
     if use_processes:
         max_tasks_per_child = kwargs.pop("max_tasks_per_child", None)
         if isinstance(use_processes, str):
5 changes: 4 additions & 1 deletion cubed/runtime/executors/modal.py
@@ -11,6 +11,7 @@
 from networkx import MultiDiGraph
 from tenacity import retry, retry_if_exception_type, stop_after_attempt
 
+from cubed.runtime.backup import use_backups_default
 from cubed.runtime.executors.asyncio import async_map_unordered
 from cubed.runtime.pipeline import visit_node_generations, visit_nodes
 from cubed.runtime.types import Callback, DagExecutor
@@ -119,7 +120,7 @@ def run_remotely(self, input, func=None, config=None, name=None, compute_id=None
 async def map_unordered(
     app_function: Function,
     input: Iterable[Any],
-    use_backups: bool = True,
+    use_backups: bool = False,
     backup_function: Optional[Function] = None,
     batch_size: Optional[int] = None,
     return_stats: bool = False,
@@ -209,6 +210,8 @@ async def async_execute_dag(
 ) -> None:
     if spec is not None:
         check_runtime_memory(spec)
+    if "use_backups" not in kwargs and use_backups_default(spec):
+        kwargs["use_backups"] = True
     async with app.run(show_progress=False):
         cloud = cloud or "aws"
         if cloud == "aws":
12 changes: 12 additions & 0 deletions cubed/tests/test_utils.py
@@ -12,6 +12,7 @@
     block_id_to_offset,
     broadcast_trick,
     extract_stack_summaries,
+    is_cloud_storage_path,
     is_local_path,
     join_path,
     map_nested,
@@ -77,6 +78,17 @@ def test_is_local_path():
     assert is_local_path("file://absolute_path/path")
     assert is_local_path("file:///absolute_path/path")
     assert not is_local_path("s3://host/path")
+    assert not is_local_path("gs://host/path")
+
+
+def test_is_cloud_storage_path():
+    assert not is_cloud_storage_path("relative_path/path")
+    assert not is_cloud_storage_path("/absolute_path/path")
+    assert not is_cloud_storage_path("file:relative_path/path")
+    assert not is_cloud_storage_path("file://absolute_path/path")
+    assert not is_cloud_storage_path("file:///absolute_path/path")
+    assert is_cloud_storage_path("s3://host/path")
+    assert is_cloud_storage_path("gs://host/path")
 
 
 def test_memory_repr():
9 changes: 7 additions & 2 deletions cubed/utils.py
@@ -80,9 +80,14 @@ def join_path(dir_url: PathType, child_path: str) -> str:
     return urlunsplit(split_parts)
 
 
-def is_local_path(path: str):
+def is_local_path(path: PathType):
     """Determine if a path string is for the local filesystem."""
-    return urlsplit(path).scheme in ("", "file")
+    return urlsplit(str(path)).scheme in ("", "file")
+
+
+def is_cloud_storage_path(path: PathType):
+    """Determine if a path string is for cloud storage."""
+    return urlsplit(str(path)).scheme in ("gs", "s3")
 
 
 def memory_repr(num: int) -> str:
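
Both predicates hinge on the URL scheme as parsed by `urllib.parse.urlsplit`; a quick sketch of what it reports for typical paths:

```python
from urllib.parse import urlsplit

assert urlsplit("s3://bucket/path").scheme == "s3"         # cloud storage
assert urlsplit("gs://bucket/path").scheme == "gs"         # cloud storage
assert urlsplit("/absolute/path").scheme == ""             # local, no scheme
assert urlsplit("file:///absolute/path").scheme == "file"  # local
```
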
8 changes: 4 additions & 4 deletions docs/configuration.md
@@ -118,7 +118,7 @@ since it is deliberately designed not to have anything except the most basic fea
 | Property | Default | Description |
 |------------------------------|---------|----------------------------------------------------------------------------------------------------|
 | `retries` | 2 | The number of times to retry a task if it fails. |
-| `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. |
+| `use_backups` | | Whether to use backup tasks for mitigating stragglers. Defaults to `True` only if `work_dir` is a filesystem supporting atomic writes (currently a cloud store like S3 or GCS). |
 | `batch_size` | `None` | Number of input tasks to submit to be run in parallel. The default is not to batch. |
 | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. |
 | `max_workers` | `None` | The maximum number of workers to use in the `ThreadPoolExecutor`. Defaults to number of CPU cores. |
@@ -128,7 +128,7 @@ since it is deliberately designed not to have anything except the most basic fea
 
 | Property | Default | Description |
 |------------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------|
-| `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. |
+| `use_backups` | | Whether to use backup tasks for mitigating stragglers. Defaults to `True` only if `work_dir` is a filesystem supporting atomic writes (currently a cloud store like S3 or GCS). |
 | `batch_size` | `None` | Number of input tasks to submit to be run in parallel. `None` means don't batch. |
 | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. |
 | `max_workers` | `None` | The maximum number of workers to use in the `ProcessPoolExecutor`. Defaults to number of CPU cores. |
@@ -154,7 +154,7 @@ Note that there is currently no way to set retries or a timeout for the Coiled e
 | Property | Default | Description |
 |------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------|
 | `retries` | 2 | The number of times to retry a task if it fails. |
-| `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. |
+| `use_backups` | | Whether to use backup tasks for mitigating stragglers. Defaults to `True` only if `work_dir` is a filesystem supporting atomic writes (currently a cloud store like S3 or GCS). |
 | `batch_size` | `None` | Number of input tasks to submit to be run in parallel. The default is not to batch. |
 | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. |
 | `compute_kwargs` | `None` | Keyword arguments to pass to Dask's [`distributed.Client`](https://distributed.dask.org/en/latest/api.html#client) constructor. |
@@ -167,7 +167,7 @@ Note that there is currently no way to set a timeout for the Dask executor.
 |------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | `retries` | 2 | The number of times to retry a task if it fails. |
 | `timeout` | `None` | Tasks that take longer than the timeout will be automatically killed and retried. Defaults to the timeout specified when [deploying the lithops runtime image](https://lithops-cloud.github.io/docs/source/cli.html#lithops-runtime-deploy-runtime-name). This is 180 seconds in the [examples](https://github.com/cubed-dev/cubed/blob/main/examples/README.md). |
-| `use_backups` | `True` | Whether to use backup tasks for mitigating stragglers. |
+| `use_backups` | | Whether to use backup tasks for mitigating stragglers. Defaults to `True` only if `work_dir` is a filesystem supporting atomic writes (currently a cloud store like S3 or GCS). |
 | `compute_arrays_in_parallel` | `False` | Whether arrays are computed one at a time or in parallel. |
 | Other properties | N/A | Other properties will be passed as keyword arguments to the [`lithops.executors.FunctionExecutor`](https://lithops-cloud.github.io/docs/source/api_futures.html#lithops.executors.FunctionExecutor) constructor. |
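
With this change, `use_backups` only needs to be set in configuration when overriding the spec-derived default. As a sketch, assuming the YAML layout used elsewhere in this page (the bucket name is hypothetical), disabling backups on a cloud store might look like:

```yaml
spec:
  work_dir: "s3://my-bucket"
  allowed_mem: "2GB"
  executor_name: "lithops"
  executor_options:
    use_backups: false
```
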
2 changes: 1 addition & 1 deletion docs/user-guide/reliability.md
@@ -23,4 +23,4 @@ A few slow running tasks (called stragglers) can disproportionately slow down th
 
 When a backup task is launched the original task is not cancelled, so it is to be expected that both tasks will complete and write their (identical) output. This only works since tasks are idempotent and each write a single, whole Zarr chunk in an atomic operation. (Updates to a single key are atomic in both [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html#ConsistencyModel) and Google Cloud Storage.)
 
-Backup tasks are enabled by default, but if you need to turn them off you can do so with ``use_backups=False``.
+Backup tasks are only enabled by default on filesystems supporting atomic writes, which currently includes cloud stores like S3 or GCS. You can turn backup tasks off completely on cloud stores by setting ``use_backups=False``.
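
For completeness, a sketch of opting out at compute time, assuming extra keyword arguments to `compute` are forwarded to the executor (the bucket name is hypothetical):

```python
import cubed
import cubed.array_api as xp

spec = cubed.Spec(work_dir="s3://my-bucket/tmp", allowed_mem="2GB")
a = xp.ones((1000, 1000), chunks=(100, 100), spec=spec)

# On a cloud store backups now default to on; opt out explicitly if needed.
a.compute(use_backups=False)
```
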
