Skip to content

Commit

Permalink
Move TaskRejectError as the common exception for task launch
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 14, 2024
1 parent 22c184a commit 7d41561
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/plumpy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ class PersistenceError(Exception):

class ClosedError(Exception):
"""Raised when an mutable operation is attempted on a closed process"""

class TaskRejectedError(Exception):
""" A task was rejected by the coordinacor"""
18 changes: 6 additions & 12 deletions src/plumpy/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import kiwipy

from plumpy.exceptions import PersistenceError, TaskRejectedError

from . import loaders, persistence
from .utils import PID_TYPE

Expand Down Expand Up @@ -188,8 +190,6 @@ async def __call__(self, communicator: kiwipy.Communicator, task: Dict[str, Any]
Receive a task.
:param task: The task message
"""
from plumpy.rmq import communications

task_type = task[TASK_KEY]
if task_type == LAUNCH_TASK:
return await self._launch(communicator, **task.get(TASK_ARGS, {}))
Expand All @@ -198,7 +198,7 @@ async def __call__(self, communicator: kiwipy.Communicator, task: Dict[str, Any]
if task_type == CREATE_TASK:
return await self._create(communicator, **task.get(TASK_ARGS, {}))

raise communications.TaskRejected
raise TaskRejectedError

async def _launch(
self,
Expand All @@ -220,10 +220,8 @@ async def _launch(
:param init_kwargs: keyword arguments to the process constructor
:return: the pid of the created process or the outputs (if nowait=False)
"""
from plumpy.rmq import communications

if persist and not self._persister:
raise communications.TaskRejected('Cannot persist process, no persister')
raise PersistenceError('Cannot persist process, no persister')

if init_args is None:
init_args = ()
Expand Down Expand Up @@ -255,11 +253,9 @@ async def _continue(
:param nowait: if True don't wait for the process to complete
:param tag: the checkpoint tag to continue from
"""
from plumpy.rmq import communications

if not self._persister:
LOGGER.warning('rejecting task: cannot continue process<%d> because no persister is available', pid)
raise communications.TaskRejected('Cannot continue process, no persister')
raise PersistenceError('Cannot continue process, no persister')

# Do not catch exceptions here, because if these operations fail, the continue task should except and bubble up
saved_state = self._persister.load_checkpoint(pid, tag)
Expand Down Expand Up @@ -292,10 +288,8 @@ async def _create(
:param init_kwargs: keyword arguments to the process constructor
:return: the pid of the created process
"""
from plumpy.rmq import communications

if persist and not self._persister:
raise communications.TaskRejected('Cannot persist process, no persister')
raise PersistenceError('Cannot persist process, no persister')

if init_args is None:
init_args = ()
Expand Down

0 comments on commit 7d41561

Please sign in to comment.