From 61e1d7cf7d04f01f89979f45c407e6309c1d3a48 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 13 Feb 2025 23:42:48 +0100 Subject: [PATCH] Debug error handling (#577) * Debug error handling * skip tests with lower python versions * Update shared.py * Update shared.py * fix tests * Update shared.py * Update shared.py * Update test_dependencies_executor.py --- executorlib/interactive/shared.py | 25 +++- tests/test_dependencies_executor.py | 188 +++++++++++++++++++++++++++- 2 files changed, 206 insertions(+), 7 deletions(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 4ce64cce..4439145e 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -3,6 +3,7 @@ import queue import sys import time +from asyncio.exceptions import CancelledError from concurrent.futures import Future from time import sleep from typing import Any, Callable, Optional, Union @@ -361,7 +362,10 @@ def execute_tasks_with_dependencies( task_dict is not None and "fn" in task_dict and "future" in task_dict ): future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict) - if len(future_lst) == 0 or ready_flag: + exception_lst = _get_exception_lst(future_lst=future_lst) + if len(exception_lst) > 0: + task_dict["future"].set_exception(exception_lst[0]) + elif len(future_lst) == 0 or ready_flag: # No future objects are used in the input or all future objects are already done task_dict["args"], task_dict["kwargs"] = _update_futures_in_input( args=task_dict["args"], kwargs=task_dict["kwargs"] @@ -455,7 +459,10 @@ def _submit_waiting_task(wait_lst: list[dict], executor_queue: queue.Queue) -> l """ wait_tmp_lst = [] for task_wait_dict in wait_lst: - if all(future.done() for future in task_wait_dict["future_lst"]): + exception_lst = _get_exception_lst(future_lst=task_wait_dict["future_lst"]) + if len(exception_lst) > 0: + task_wait_dict["future"].set_exception(exception_lst[0]) + elif all(future.done() for future in task_wait_dict["future_lst"]): del task_wait_dict["future_lst"] task_wait_dict["args"], task_wait_dict["kwargs"] = _update_futures_in_input( args=task_wait_dict["args"], kwargs=task_wait_dict["kwargs"] @@ -663,3 +670,17 @@ def _execute_task_with_cache( future = task_dict["future"] future.set_result(result) future_queue.task_done() + + +def _get_exception_lst(future_lst: list) -> list: + def get_exception(future_obj: Future) -> bool: + try: + excp = future_obj.exception(timeout=10**-10) + return excp is not None and not isinstance(excp, CancelledError) + except TimeoutError: + return False + + if sys.version_info[0] >= 3 and sys.version_info[1] >= 11: + return [f.exception() for f in future_lst if get_exception(future_obj=f)] + else: + return [] diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 0aa1d835..69f5ddff 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -1,5 +1,6 @@ from concurrent.futures import Future import unittest +import sys from time import sleep from queue import Queue @@ -42,7 +43,7 @@ def return_input_dict(input_dict): return input_dict -def raise_error(): +def raise_error(parameter): raise RuntimeError @@ -106,6 +107,119 @@ def test_dependency_steps(self): self.assertTrue(fs2.done()) q.put({"shutdown": True, "wait": True}) + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", + ) + def test_dependency_steps_error(self): + cloudpickle_register(ind=1) + fs1 = Future() + fs2 = Future() + q = Queue() + q.put( + { + "fn": raise_error, + "args": (), + "kwargs": {"parameter": 0}, + "future": fs1, + "resource_dict": {"cores": 1}, + } + ) + q.put( + { + "fn": add_function, + "args": (), + "kwargs": {"parameter_1": 1, "parameter_2": fs1}, + "future": fs2, + "resource_dict": {"cores": 1}, + } + ) + executor = create_single_node_executor( + max_workers=1, + max_cores=2, + resource_dict={ + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, + ) + process = RaisingThread( + target=execute_tasks_with_dependencies, + kwargs={ + "future_queue": q, + "executor_queue": executor._future_queue, + "executor": executor, + "refresh_rate": 0.01, + }, + ) + process.start() + self.assertFalse(fs1.done()) + self.assertFalse(fs2.done()) + self.assertTrue(fs1.exception() is not None) + self.assertTrue(fs2.exception() is not None) + with self.assertRaises(RuntimeError): + fs2.result() + q.put({"shutdown": True, "wait": True}) + + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", + ) + def test_dependency_steps_error_before(self): + cloudpickle_register(ind=1) + fs1 = Future() + fs1.set_exception(RuntimeError()) + fs2 = Future() + q = Queue() + q.put( + { + "fn": add_function, + "args": (), + "kwargs": {"parameter_1": 1, "parameter_2": 2}, + "future": fs1, + "resource_dict": {"cores": 1}, + } + ) + q.put( + { + "fn": add_function, + "args": (), + "kwargs": {"parameter_1": 1, "parameter_2": fs1}, + "future": fs2, + "resource_dict": {"cores": 1}, + } + ) + executor = create_single_node_executor( + max_workers=1, + max_cores=2, + resource_dict={ + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + }, + ) + process = RaisingThread( + target=execute_tasks_with_dependencies, + kwargs={ + "future_queue": q, + "executor_queue": executor._future_queue, + "executor": executor, + "refresh_rate": 0.01, + }, + ) + process.start() + self.assertTrue(fs1.exception() is not None) + self.assertTrue(fs2.exception() is not None) + with self.assertRaises(RuntimeError): + fs2.result() + q.put({"shutdown": True, "wait": True}) + def test_many_to_one(self): length = 5 parameter = 1 @@ -148,22 +262,86 @@ def test_block_allocation_false_one_worker(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe: cloudpickle_register(ind=1) - _ = exe.submit(raise_error) + _ = exe.submit(raise_error, parameter=0) def test_block_allocation_true_one_worker(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe: cloudpickle_register(ind=1) - _ = exe.submit(raise_error) + _ = exe.submit(raise_error, parameter=0) def test_block_allocation_false_two_workers(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe: cloudpickle_register(ind=1) - _ = exe.submit(raise_error) + _ = exe.submit(raise_error, parameter=0) def test_block_allocation_true_two_workers(self): with self.assertRaises(RuntimeError): with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe: cloudpickle_register(ind=1) - _ = exe.submit(raise_error) + _ = exe.submit(raise_error, parameter=0) + + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", + ) + def test_block_allocation_false_one_worker_loop(self): + with self.assertRaises(RuntimeError): + with SingleNodeExecutor(max_cores=1, block_allocation=False) as exe: + cloudpickle_register(ind=1) + lst = [] + for i in range(1, 4): + lst = exe.submit( + raise_error, + parameter=lst, + ) + lst.result() + + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", + ) + def test_block_allocation_true_one_worker_loop(self): + with self.assertRaises(RuntimeError): + with SingleNodeExecutor(max_cores=1, block_allocation=True) as exe: + cloudpickle_register(ind=1) + lst = [] + for i in range(1, 4): + lst = exe.submit( + raise_error, + parameter=lst, + ) + lst.result() + + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", + ) + def test_block_allocation_false_two_workers_loop(self): + with self.assertRaises(RuntimeError): + with SingleNodeExecutor(max_cores=2, block_allocation=False) as exe: + cloudpickle_register(ind=1) + lst = [] + for i in range(1, 4): + lst = exe.submit( + raise_error, + parameter=lst, + ) + lst.result() + + @unittest.skipIf( + condition=not (sys.version_info[0] >= 3 and sys.version_info[1] >= 11), + reason="requires Python 3.11 or higher", + ) + def test_block_allocation_true_two_workers_loop(self): + with self.assertRaises(RuntimeError): + with SingleNodeExecutor(max_cores=2, block_allocation=True) as exe: + cloudpickle_register(ind=1) + lst = [] + for i in range(1, 4): + lst = exe.submit( + raise_error, + parameter=lst, + ) + lst.result()