diff --git a/src/plumpy/exceptions.py b/src/plumpy/exceptions.py index 70b5aa2d..299bab29 100644 --- a/src/plumpy/exceptions.py +++ b/src/plumpy/exceptions.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- from typing import Optional +from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed + __all__ = ['ClosedError', 'InvalidStateError', 'KilledError', 'PersistenceError', 'UnsuccessfulResult'] @@ -9,8 +11,7 @@ class KilledError(Exception): class InvalidStateError(Exception): - """ - Raised when an operation is attempted that requires the process to be in a state + """Raised when an operation is attempted that requires the process to be in a state that is different from the current state """ @@ -33,3 +34,7 @@ class PersistenceError(Exception): class ClosedError(Exception): """Raised when an mutable operation is attempted on a closed process""" + +# Alias aio_pika +CommunicatorConnectionClosed = ConnectionClosed +CommunicatorChannelInvalidStateError = ChannelInvalidStateError diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index ffddf7b5..fe715857 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -37,7 +37,6 @@ import kiwipy import yaml -from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed from . import events, exceptions, futures, persistence, ports, process_comms, process_states, utils from .base import state_machine @@ -697,6 +696,8 @@ def on_entering(self, state: process_states.State) -> None: call_with_super_check(self.on_except, state.get_exc_info()) # type: ignore def on_entered(self, from_state: Optional[process_states.State]) -> None: + from plumpy.exceptions import CommunicatorChannelInvalidStateError, CommunicatorConnectionClosed + # Map these onto direct functions that the subclass can implement state_label = self._state.LABEL if state_label == process_states.ProcessState.RUNNING: @@ -716,7 +717,7 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None: self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject) try: self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject) - except (ConnectionClosed, ChannelInvalidStateError): + except (CommunicatorConnectionClosed, CommunicatorChannelInvalidStateError): message = 'Process<%s>: no connection available to broadcast state change from %s to %s' self.logger.warning(message, self.pid, from_label, self.state.value) except kiwipy.TimeoutError: