From f7427e0eaa295accae92a51b639ee6ffef09ae9b Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Wed, 11 Dec 2024 12:11:33 +0100 Subject: [PATCH] Defer import of aio_pika --- src/plumpy/exceptions.py | 20 +++++++++++++++++--- src/plumpy/processes.py | 5 +++-- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/plumpy/exceptions.py b/src/plumpy/exceptions.py index 70b5aa2d..68113cc5 100644 --- a/src/plumpy/exceptions.py +++ b/src/plumpy/exceptions.py @@ -1,7 +1,17 @@ # -*- coding: utf-8 -*- from typing import Optional -__all__ = ['ClosedError', 'InvalidStateError', 'KilledError', 'PersistenceError', 'UnsuccessfulResult'] +from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed + +__all__ = [ + 'ClosedError', + 'CommunicatorChannelInvalidStateError', + 'CommunicatorConnectionClosed', + 'InvalidStateError', + 'KilledError', + 'PersistenceError', + 'UnsuccessfulResult', +] class KilledError(Exception): @@ -9,8 +19,7 @@ class KilledError(Exception): class InvalidStateError(Exception): - """ - Raised when an operation is attempted that requires the process to be in a state + """Raised when an operation is attempted that requires the process to be in a state that is different from the current state """ @@ -33,3 +42,8 @@ class PersistenceError(Exception): class ClosedError(Exception): """Raised when an mutable operation is attempted on a closed process""" + + +# Alias aio_pika +CommunicatorConnectionClosed = ConnectionClosed +CommunicatorChannelInvalidStateError = ChannelInvalidStateError diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index ffddf7b5..fe715857 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -37,7 +37,6 @@ import kiwipy import yaml -from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed from . import events, exceptions, futures, persistence, ports, process_comms, process_states, utils from .base import state_machine @@ -697,6 +696,8 @@ def on_entering(self, state: process_states.State) -> None: call_with_super_check(self.on_except, state.get_exc_info()) # type: ignore def on_entered(self, from_state: Optional[process_states.State]) -> None: + from plumpy.exceptions import CommunicatorChannelInvalidStateError, CommunicatorConnectionClosed + # Map these onto direct functions that the subclass can implement state_label = self._state.LABEL if state_label == process_states.ProcessState.RUNNING: @@ -716,7 +717,7 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None: self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject) try: self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject) - except (ConnectionClosed, ChannelInvalidStateError): + except (CommunicatorConnectionClosed, CommunicatorChannelInvalidStateError): message = 'Process<%s>: no connection available to broadcast state change from %s to %s' self.logger.warning(message, self.pid, from_label, self.state.value) except kiwipy.TimeoutError: