Enable backup tasks by default for all executors that support them
tomwhite committed Feb 12, 2024
1 parent 9ad8fbe commit f24ae64
Showing 11 changed files with 38 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cubed/runtime/executors/asyncio.py
@@ -11,7 +11,7 @@
async def async_map_unordered(
create_futures_func: Callable[..., List[Tuple[Any, Future]]],
input: Iterable[Any],
-use_backups: bool = False,
+use_backups: bool = True,
create_backup_futures_func: Optional[
Callable[..., List[Tuple[Any, Future]]]
] = None,
2 changes: 1 addition & 1 deletion cubed/runtime/executors/dask_distributed_async.py
@@ -36,7 +36,7 @@ async def map_unordered(
map_function: Callable[..., Any],
map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
retries: int = 2,
-use_backups: bool = False,
+use_backups: bool = True,
batch_size: Optional[int] = None,
return_stats: bool = False,
name: Optional[str] = None,
6 changes: 3 additions & 3 deletions cubed/runtime/executors/lithops.py
@@ -48,7 +48,7 @@ def map_unordered(
include_modules: List[str] = [],
timeout: Optional[int] = None,
retries: int = 2,
-use_backups: bool = False,
+use_backups: bool = True,
return_stats: bool = False,
**kwargs,
) -> Iterator[Any]:
@@ -138,7 +138,7 @@ def map_unordered(
future, now, start_times[group_name], end_times[group_name]
):
input = future.input
-logger.info("Running backup task for %s", input)
+logger.warn("Running backup task for %s", input)
futures = lithops_function_executor.map(
group_name_to_function[group_name],
[input],
@@ -166,7 +166,7 @@ def execute_dag(
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
-use_backups = kwargs.pop("use_backups", False)
+use_backups = kwargs.pop("use_backups", True)
allowed_mem = spec.allowed_mem if spec is not None else None
function_executor = FunctionExecutor(**kwargs)
runtime_memory_mb = function_executor.config[function_executor.backend].get(
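The lithops hunk above defers the straggler decision to `should_launch_backup`. Below is a minimal sketch of that kind of heuristic: the function name and the first four parameters mirror the call site in the diff, but the minimum-completed-tasks guard, the 3x-slower threshold, and the default values are illustrative assumptions rather than Cubed's actual implementation.

```python
# Sketch only: a straggler heuristic of the kind invoked above.
# Thresholds and defaults are assumed for illustration, not taken from Cubed.
from typing import Any, Dict


def should_launch_backup(
    future: Any,
    now: float,
    start_times: Dict[Any, float],
    end_times: Dict[Any, float],
    min_completed: int = 10,   # assumed guard
    slow_factor: float = 3.0,  # assumed threshold
) -> bool:
    """Return True if `future` has been running much longer than typical completed tasks."""
    if len(end_times) < min_completed:
        # Too few finished tasks to estimate a typical duration reliably.
        return False
    durations = [end_times[f] - start_times[f] for f in end_times]
    mean_duration = sum(durations) / len(durations)
    elapsed = now - start_times[future]
    return elapsed > slow_factor * mean_duration
```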
2 changes: 1 addition & 1 deletion cubed/runtime/executors/modal_async.py
@@ -26,7 +26,7 @@
async def map_unordered(
app_function: Function,
input: Iterable[Any],
-use_backups: bool = False,
+use_backups: bool = True,
backup_function: Optional[Function] = None,
batch_size: Optional[int] = None,
return_stats: bool = False,
2 changes: 1 addition & 1 deletion cubed/runtime/executors/python_async.py
@@ -27,7 +27,7 @@ async def map_unordered(
function: Callable[..., Any],
input: Iterable[Any],
retries: int = 2,
-use_backups: bool = False,
+use_backups: bool = True,
batch_size: Optional[int] = None,
return_stats: bool = False,
name: Optional[str] = None,
8 changes: 6 additions & 2 deletions cubed/tests/runtime/test_dask_distributed_async.py
@@ -43,12 +43,14 @@ async def run_test(function, input, retries, use_backups=False, batch_size=None)
],
)
# fmt: on
-def test_success(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
outputs = asyncio.run(
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
+use_backups=use_backups,
)
)

@@ -66,13 +68,15 @@ def test_success(tmp_path, timing_map, n_tasks, retries):
],
)
# fmt: on
-def test_failure(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
with pytest.raises(RuntimeError):
asyncio.run(
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)
)

8 changes: 6 additions & 2 deletions cubed/tests/runtime/test_lithops.py
@@ -44,11 +44,13 @@ def run_test(function, input, retries, timeout=10, use_backups=False):
],
)
# fmt: on
-def test_success(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
outputs = run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)

assert outputs == set(range(n_tasks))
@@ -65,12 +67,14 @@ def test_success(tmp_path, timing_map, n_tasks, retries):
],
)
# fmt: on
-def test_failure(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
with pytest.raises(RuntimeError):
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)

check_invocation_counts(tmp_path, timing_map, n_tasks, retries)
14 changes: 10 additions & 4 deletions cubed/tests/runtime/test_modal_async.py
@@ -80,15 +80,17 @@ async def run_test(app_function, input, use_backups=False, batch_size=None, **kw
],
)
# fmt: on
+@pytest.mark.parametrize("use_backups", [False, True])
@pytest.mark.cloud
-def test_success(timing_map, n_tasks, retries):
+def test_success(timing_map, n_tasks, retries, use_backups):
try:
outputs = asyncio.run(
run_test(
app_function=deterministic_failure_modal,
input=range(n_tasks),
+use_backups=use_backups,
path=tmp_path,
-timing_map=timing_map
+timing_map=timing_map,
)
)

@@ -109,14 +111,16 @@ def test_success(timing_map, n_tasks, retries):
],
)
# fmt: on
+@pytest.mark.parametrize("use_backups", [False, True])
@pytest.mark.cloud
-def test_failure(timing_map, n_tasks, retries):
+def test_failure(timing_map, n_tasks, retries, use_backups):
try:
with pytest.raises(RuntimeError):
asyncio.run(
run_test(
app_function=deterministic_failure_modal,
input=range(n_tasks),
+use_backups=use_backups,
path=tmp_path,
timing_map=timing_map,
)
@@ -137,13 +141,15 @@ def test_failure(timing_map, n_tasks, retries):
],
)
# fmt: on
+@pytest.mark.parametrize("use_backups", [False, True])
@pytest.mark.cloud
-def test_large_number_of_tasks(timing_map, n_tasks, retries):
+def test_large_number_of_tasks(timing_map, n_tasks, retries, use_backups):
try:
outputs = asyncio.run(
run_test(
app_function=deterministic_failure_modal,
input=range(n_tasks),
+use_backups=use_backups,
path=tmp_path,
timing_map=timing_map
)
8 changes: 6 additions & 2 deletions cubed/tests/runtime/test_python_async.py
@@ -40,12 +40,14 @@ async def run_test(function, input, retries=2, use_backups=False, batch_size=Non
],
)
# fmt: on
-def test_success(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
outputs = asyncio.run(
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
+use_backups=use_backups,
)
)

@@ -62,13 +64,15 @@ def test_success(tmp_path, timing_map, n_tasks, retries):
],
)
# fmt: on
-def test_failure(tmp_path, timing_map, n_tasks, retries):
+@pytest.mark.parametrize("use_backups", [False, True])
+def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
with pytest.raises(RuntimeError):
asyncio.run(
run_test(
function=partial(deterministic_failure, tmp_path, timing_map),
input=range(n_tasks),
retries=retries,
use_backups=use_backups,
)
)

2 changes: 1 addition & 1 deletion docs/user-guide/reliability.md
@@ -23,4 +23,4 @@ A few slow running tasks (called stragglers) can disproportionately slow down th

When a backup task is launched the original task is not cancelled, so it is to be expected that both tasks will complete and write their (identical) output. This only works since tasks are idempotent and each write a single, whole Zarr chunk in an atomic operation. (Updates to a single key are atomic in both [Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html#ConsistencyModel) and Google Cloud Storage.)

-Note that this feature is experimental and disabled by default since it has not been tested at scale yet.
+Backup tasks are enabled by default, but if you need to turn them off you can do so with ``use_backups=False``.
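A minimal usage sketch of the new default and the opt-out follows. It assumes the ``use_backups`` keyword is forwarded from the compute call through to the runtime executor; the spec values and array shape are illustrative, not recommendations.

```python
# Illustrative only: backup tasks are now on by default; pass use_backups=False to opt out.
# Assumes keyword arguments given to compute() are forwarded to the runtime executor.
import cubed
import cubed.array_api as xp

spec = cubed.Spec(work_dir="tmp", allowed_mem="200MB")  # example values
a = xp.ones((5000, 5000), chunks=(500, 500), spec=spec)
b = a + 1

result = b.compute(use_backups=False)  # disable backup tasks for this computation
```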
3 changes: 2 additions & 1 deletion docs/user-guide/scaling.md
@@ -35,7 +35,8 @@ Weak scaling requires more workers than output chunks, so for large problems it
With fewer workers than chunks we would expect linear strong scaling, as every new worker added has nothing to wait for.

Stragglers are tasks that take much longer than average, who disproportionately hold up the next step of the computation.
-To handle stragglers, you should consider turning on backups (with ``use_backups=True``), as any failures that are restarted essentially become stragglers.
+Stragglers are handled by running backup tasks for any tasks that are running very slowly. This feature is enabled by default, but
+if you need to turn it off you can do so with ``use_backups=False``.
Worker start-up time is another practical speed consideration, though it would delay computations of all scales equally.

### Multi-step Calculation
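To see why the straggler handling described in scaling.md above matters, here is a toy back-of-the-envelope calculation. All numbers are assumed for illustration, including the 3x-slower detection rule; it is not a measurement of Cubed.

```python
# Toy numbers (assumed): 100 tasks that normally take ~1 s each, one straggler takes 30 s.
# With enough workers, the step finishes only when its slowest task does.
normal = [1.0] * 99
straggler = 30.0

step_time_without_backups = max(normal + [straggler])  # 30.0 s

# Assume a backup fires once a task runs ~3x longer than the mean of completed tasks;
# the straggler's effective duration is then roughly detection time + one normal task.
mean_duration = sum(normal) / len(normal)               # 1.0 s
detection_time = 3 * mean_duration                      # 3.0 s
step_time_with_backups = max(max(normal), detection_time + mean_duration)  # 4.0 s

print(step_time_without_backups, step_time_with_backups)  # 30.0 4.0
```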
