From faecc64c8c0518c2d16981dcf9d65f57283aea5b Mon Sep 17 00:00:00 2001 From: owinter Date: Fri, 29 Nov 2024 11:54:39 +0000 Subject: [PATCH 1/3] bugfix: with some new signatures, need to remove duplicates --- ibllib/oneibl/data_handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibllib/oneibl/data_handlers.py b/ibllib/oneibl/data_handlers.py index 2ccc0960d..a0b085150 100644 --- a/ibllib/oneibl/data_handlers.py +++ b/ibllib/oneibl/data_handlers.py @@ -563,7 +563,7 @@ def getData(self, one=None): one = one or self.one session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True) dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']] - return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs) + return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates() def getOutputFiles(self): """ From 76c4de062bf0b577183b7a66e04bb0d349304844 Mon Sep 17 00:00:00 2001 From: owinter Date: Fri, 29 Nov 2024 11:55:22 +0000 Subject: [PATCH 2/3] add an option to raise within tasks --- ibllib/pipes/tasks.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index d6f7fe1cd..bbb2d0abb 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -112,9 +112,10 @@ class Task(abc.ABC): force = False # whether to re-download missing input files on local server if not present job_size = 'small' # either 'small' or 'large', defines whether task should be run as part of the large or small job services env = None # the environment name within which to run the task (NB: the env is not activated automatically!) + on_error = 'continue' # whether to raise an exception on error ('raise') or report the error and continue ('continue') def __init__(self, session_path, parents=None, taskid=None, one=None, - machine=None, clobber=True, location='server', scratch_folder=None, **kwargs): + machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs): """ Base task class :param session_path: session path @@ -129,6 +130,18 @@ def __init__(self, session_path, parents=None, taskid=None, one=None, :param scratch_folder: optional: Path where to write intermediate temporary data :param args: running arguments """ + self.on_error = on_error + self.log = '' # placeholder to keep the log of the task for registration + self.cpu = kwargs.get('cpu', 1) + self.gpu = kwargs.get('gpu', 0) + self.io_charge = kwargs.get('io_charge', 5) + self.priority = kwargs.get('priority', 30) + self.ram = kwargs.get('ram', 4) + self.level = 0 # level in the pipeline hierarchy: level 0 means there is no parent task + self.outputs = [] # placeholder for a list of Path containing output files + self.time_elapsed_secs = None + self.time_out_secs = 3600 * 2 # time-out after which a task is considered dead + self.version = ibllib.__version__ self.taskid = taskid self.one = one self.session_path = session_path @@ -263,10 +276,12 @@ def run(self, **kwargs): self.outputs = outputs if not self.outputs else self.outputs # ensure None if no inputs registered else: self.outputs.extend(ensure_list(outputs)) # Add output files to list of inputs to register - except Exception: + except Exception as e: _logger.error(traceback.format_exc()) _logger.info(f'Job {self.__class__} errored') self.status = -1 + if self.on_error == 'raise': + raise e self.time_elapsed_secs = time.time() - start_time # log the outputs From fe13ebb044c22e2aab4a6db22a916a78bddd4810 Mon Sep 17 00:00:00 2001 From: owinter Date: Fri, 29 Nov 2024 12:03:44 +0000 Subject: [PATCH 3/3] typo --- ibllib/pipes/tasks.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/ibllib/pipes/tasks.py b/ibllib/pipes/tasks.py index bbb2d0abb..660c6502d 100644 --- a/ibllib/pipes/tasks.py +++ b/ibllib/pipes/tasks.py @@ -131,17 +131,6 @@ def __init__(self, session_path, parents=None, taskid=None, one=None, :param args: running arguments """ self.on_error = on_error - self.log = '' # placeholder to keep the log of the task for registration - self.cpu = kwargs.get('cpu', 1) - self.gpu = kwargs.get('gpu', 0) - self.io_charge = kwargs.get('io_charge', 5) - self.priority = kwargs.get('priority', 30) - self.ram = kwargs.get('ram', 4) - self.level = 0 # level in the pipeline hierarchy: level 0 means there is no parent task - self.outputs = [] # placeholder for a list of Path containing output files - self.time_elapsed_secs = None - self.time_out_secs = 3600 * 2 # time-out after which a task is considered dead - self.version = ibllib.__version__ self.taskid = taskid self.one = one self.session_path = session_path