Skip to content

Commit

Permalink
Debug error handling (#577)
Browse files Browse the repository at this point in the history
* Debug error handling

* skip tests with lower python versions

* Update shared.py

* Update shared.py

* fix tests

* Update shared.py

* Update shared.py

* Update test_dependencies_executor.py
  • Loading branch information
jan-janssen authored Feb 13, 2025
1 parent b1145f3 commit 61e1d7c
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 7 deletions.
25 changes: 23 additions & 2 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import queue
import sys
import time
from asyncio.exceptions import CancelledError
from concurrent.futures import Future
from time import sleep
from typing import Any, Callable, Optional, Union
Expand Down Expand Up @@ -361,7 +362,10 @@ def execute_tasks_with_dependencies(
task_dict is not None and "fn" in task_dict and "future" in task_dict
):
future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
if len(future_lst) == 0 or ready_flag:
exception_lst = _get_exception_lst(future_lst=future_lst)
if len(exception_lst) > 0:
task_dict["future"].set_exception(exception_lst[0])
elif len(future_lst) == 0 or ready_flag:
# No future objects are used in the input or all future objects are already done
task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
args=task_dict["args"], kwargs=task_dict["kwargs"]
Expand Down Expand Up @@ -455,7 +459,10 @@ def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l
"""
wait_tmp_lst = []
for task_wait_dict in wait_lst:
if all(future.done() for future in task_wait_dict["future_lst"]):
exception_lst = _get_exception_lst(future_lst=task_wait_dict["future_lst"])
if len(exception_lst) > 0:
task_wait_dict["future"].set_exception(exception_lst[0])
elif all(future.done() for future in task_wait_dict["future_lst"]):
del task_wait_dict["future_lst"]
task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input(
args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"]
Expand Down Expand Up @@ -663,3 +670,17 @@ def _execute_task_with_cache(
future = task_dict["future"]
future.set_result(result)
future_queue.task_done()


def _get_exception_lst(future_lst: list) -> list:
def get_exception(future_obj: Future) -> bool:
try:
excp = future_obj.exception(timeout=10**-10)
return excp is not None and not isinstance(excp, CancelledError)
except TimeoutError:
return False

if sys.version_info[0] >= 3 and sys.version_info[1] >= 11:
return [f.exception() for f in future_lst if get_exception(future_obj=f)]
else:
return []
188 changes: 183 additions & 5 deletions tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from concurrent.futures import Future
import unittest
import sys
from time import sleep
from queue import Queue

Expand Down Expand Up @@ -42,7 +43,7 @@ def return_input_dict(input_dict):
return input_dict


def raise_error():
def raise_error(parameter):
raise RuntimeError


Expand Down Expand Up @@ -106,6 +107,119 @@ def test_dependency_steps(self):
self.assertTrue(fs2.done())
q.put({"shutdown": True, "wait": True})

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_dependency_steps_error(self):
cloudpickle_register(ind=1)
fs1 = Future()
fs2 = Future()
q = Queue()
q.put(
{
"fn": raise_error,
"args": (),
"kwargs": {"parameter": 0},
"future": fs1,
"resource_dict": {"cores": 1},
}
)
q.put(
{
"fn": add_function,
"args": (),
"kwargs": {"parameter_1": 1, "parameter_2": fs1},
"future": fs2,
"resource_dict": {"cores": 1},
}
)
executor = create_single_node_executor(
max_workers=1,
max_cores=2,
resource_dict={
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
},
)
process = RaisingThread(
target=execute_tasks_with_dependencies,
kwargs={
"future_queue": q,
"executor_queue": executor._future_queue,
"executor": executor,
"refresh_rate": 0.01,
},
)
process.start()
self.assertFalse(fs1.done())
self.assertFalse(fs2.done())
self.assertTrue(fs1.exception() is not None)
self.assertTrue(fs2.exception() is not None)
with self.assertRaises(RuntimeError):
fs2.result()
q.put({"shutdown": True, "wait": True})

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_dependency_steps_error_before(self):
cloudpickle_register(ind=1)
fs1 = Future()
fs1.set_exception(RuntimeError())
fs2 = Future()
q = Queue()
q.put(
{
"fn": add_function,
"args": (),
"kwargs": {"parameter_1": 1, "parameter_2": 2},
"future": fs1,
"resource_dict": {"cores": 1},
}
)
q.put(
{
"fn": add_function,
"args": (),
"kwargs": {"parameter_1": 1, "parameter_2": fs1},
"future": fs2,
"resource_dict": {"cores": 1},
}
)
executor = create_single_node_executor(
max_workers=1,
max_cores=2,
resource_dict={
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
"slurm_cmd_args": [],
},
)
process = RaisingThread(
target=execute_tasks_with_dependencies,
kwargs={
"future_queue": q,
"executor_queue": executor._future_queue,
"executor": executor,
"refresh_rate": 0.01,
},
)
process.start()
self.assertTrue(fs1.exception() is not None)
self.assertTrue(fs2.exception() is not None)
with self.assertRaises(RuntimeError):
fs2.result()
q.put({"shutdown": True, "wait": True})

def test_many_to_one(self):
length = 5
parameter = 1
Expand Down Expand Up @@ -148,22 +262,86 @@ def test_block_allocation_false_one_worker(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)
_ = exe.submit(raise_error, parameter=0)

def test_block_allocation_true_one_worker(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)
_ = exe.submit(raise_error, parameter=0)

def test_block_allocation_false_two_workers(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)
_ = exe.submit(raise_error, parameter=0)

def test_block_allocation_true_two_workers(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe:
cloudpickle_register(ind=1)
_ = exe.submit(raise_error)
_ = exe.submit(raise_error, parameter=0)

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_false_one_worker_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe:
cloudpickle_register(ind=1)
lst = []
for i in range(1, 4):
lst = exe.submit(
raise_error,
parameter=lst,
)
lst.result()

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_true_one_worker_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe:
cloudpickle_register(ind=1)
lst = []
for i in range(1, 4):
lst = exe.submit(
raise_error,
parameter=lst,
)
lst.result()

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_false_two_workers_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe:
cloudpickle_register(ind=1)
lst = []
for i in range(1, 4):
lst = exe.submit(
raise_error,
parameter=lst,
)
lst.result()

@unittest.skipIf(
condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11),
reason="requires Python 3.11 or higher",
)
def test_block_allocation_true_two_workers_loop(self):
with self.assertRaises(RuntimeError):
with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe:
cloudpickle_register(ind=1)
lst = []
for i in range(1, 4):
lst = exe.submit(
raise_error,
parameter=lst,
)
lst.result()

0 comments on commit 61e1d7c

Please sign in to comment.