diff --git a/src/plumpy/exceptions.py b/src/plumpy/exceptions.py index 6f0c75a4..1e6f3b26 100644 --- a/src/plumpy/exceptions.py +++ b/src/plumpy/exceptions.py @@ -38,3 +38,6 @@ class PersistenceError(Exception): class ClosedError(Exception): """Raised when an mutable operation is attempted on a closed process""" + +class TaskRejectedError(Exception): + """ A task was rejected by the coordinacor""" diff --git a/src/plumpy/message.py b/src/plumpy/message.py index b18d4123..024d008f 100644 --- a/src/plumpy/message.py +++ b/src/plumpy/message.py @@ -7,6 +7,8 @@ import kiwipy +from plumpy.exceptions import PersistenceError, TaskRejectedError + from . import loaders, persistence from .utils import PID_TYPE @@ -188,8 +190,6 @@ async def __call__(self, communicator: kiwipy.Communicator, task: Dict[str, Any] Receive a task. :param task: The task message """ - from plumpy.rmq import communications - task_type = task[TASK_KEY] if task_type == LAUNCH_TASK: return await self._launch(communicator, **task.get(TASK_ARGS, {})) @@ -198,7 +198,7 @@ async def __call__(self, communicator: kiwipy.Communicator, task: Dict[str, Any] if task_type == CREATE_TASK: return await self._create(communicator, **task.get(TASK_ARGS, {})) - raise communications.TaskRejected + raise TaskRejectedError async def _launch( self, @@ -220,10 +220,8 @@ async def _launch( :param init_kwargs: keyword arguments to the process constructor :return: the pid of the created process or the outputs (if nowait=False) """ - from plumpy.rmq import communications - if persist and not self._persister: - raise communications.TaskRejected('Cannot persist process, no persister') + raise PersistenceError('Cannot persist process, no persister') if init_args is None: init_args = () @@ -255,11 +253,9 @@ async def _continue( :param nowait: if True don't wait for the process to complete :param tag: the checkpoint tag to continue from """ - from plumpy.rmq import communications - if not self._persister: LOGGER.warning('rejecting task: cannot continue process<%d> because no persister is available', pid) - raise communications.TaskRejected('Cannot continue process, no persister') + raise PersistenceError('Cannot continue process, no persister') # Do not catch exceptions here, because if these operations fail, the continue task should except and bubble up saved_state = self._persister.load_checkpoint(pid, tag) @@ -292,10 +288,8 @@ async def _create( :param init_kwargs: keyword arguments to the process constructor :return: the pid of the created process """ - from plumpy.rmq import communications - if persist and not self._persister: - raise communications.TaskRejected('Cannot persist process, no persister') + raise PersistenceError('Cannot persist process, no persister') if init_args is None: init_args = ()