Skip to content

Commit

Permalink
Expose Lithops wait_dur_sec for time to wait between checking for c…
Browse files Browse the repository at this point in the history
…ompleted tasks

Set to a smaller value for tests so they complete faster.
  • Loading branch information
tomwhite committed Apr 30, 2024
1 parent 1b9cc54 commit 94ea354
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
9 changes: 8 additions & 1 deletion cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def map_unordered(
retries: int = 2,
use_backups: bool = True,
return_stats: bool = False,
wait_dur_sec: Optional[int] = 1,
**kwargs,
) -> Iterator[Any]:
"""
Expand All @@ -66,10 +67,12 @@ def map_unordered(
:param retries: The number of times to retry a failed task before raising an exception.
:param use_backups: Whether to launch backup tasks to mitigate against slow-running tasks.
:param return_stats: Whether to return lithops stats.
:param wait_dur_sec: Time interval to wait between each check for completed tasks.
:return: Function values (and optionally stats) as they are completed, not necessarily in the input order.
"""
return_when = ALWAYS if use_backups else ANY_COMPLETED
wait_dur_sec = wait_dur_sec or 1

group_name_to_function: Dict[str, Callable[..., Any]] = {}
# backups are launched based on task start and end times for the group
Expand Down Expand Up @@ -104,6 +107,7 @@ def map_unordered(
throw_except=False,
return_when=return_when,
show_progressbar=False,
wait_dur_sec=wait_dur_sec,
)
for future in finished:
if future.error:
Expand Down Expand Up @@ -154,7 +158,7 @@ def map_unordered(
backup = futures[0]
backups[future] = backup
backups[backup] = future
time.sleep(1)
time.sleep(wait_dur_sec)


def execute_dag(
Expand All @@ -166,6 +170,7 @@ def execute_dag(
**kwargs,
) -> None:
use_backups = kwargs.pop("use_backups", True)
wait_dur_sec = kwargs.pop("wait_dur_sec", None)
allowed_mem = spec.allowed_mem if spec is not None else None
function_executor = FunctionExecutor(**kwargs)
runtime_memory_mb = function_executor.config[function_executor.backend].get(
Expand All @@ -189,6 +194,7 @@ def execute_dag(
[name],
use_backups=use_backups,
return_stats=True,
wait_dur_sec=wait_dur_sec,
# kwargs below
func=pipeline.function,
config=pipeline.config,
Expand Down Expand Up @@ -217,6 +223,7 @@ def execute_dag(
group_names,
use_backups=use_backups,
return_stats=True,
wait_dur_sec=wait_dur_sec,
# TODO: kwargs
):
handle_callbacks(callbacks, stats)
Expand Down
10 changes: 8 additions & 2 deletions cubed/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
from cubed.runtime.create import create_executor
from cubed.runtime.types import Callback

LITHOPS_LOCAL_CONFIG = {"lithops": {"backend": "localhost", "storage": "localhost"}}
LITHOPS_LOCAL_CONFIG = {
"lithops": {
"backend": "localhost",
"storage": "localhost",
"monitoring_interval": 0.1,
}
}

ALL_EXECUTORS = [create_executor("single-threaded")]

Expand Down Expand Up @@ -39,7 +45,7 @@
pass

try:
executor_options = dict(config=LITHOPS_LOCAL_CONFIG)
executor_options = dict(config=LITHOPS_LOCAL_CONFIG, wait_dur_sec=0.1)
ALL_EXECUTORS.append(create_executor("lithops", executor_options))
MAIN_EXECUTORS.append(create_executor("lithops", executor_options))
except ImportError:
Expand Down

0 comments on commit 94ea354

Please sign in to comment.