Skip to content

Commit

Permalink
Defer import of aio_pika
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 11, 2024
1 parent 4611154 commit f7427e0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
20 changes: 17 additions & 3 deletions src/plumpy/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
# -*- coding: utf-8 -*-
from typing import Optional

__all__ = ['ClosedError', 'InvalidStateError', 'KilledError', 'PersistenceError', 'UnsuccessfulResult']
from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed

__all__ = [
'ClosedError',
'CommunicatorChannelInvalidStateError',
'CommunicatorConnectionClosed',
'InvalidStateError',
'KilledError',
'PersistenceError',
'UnsuccessfulResult',
]


class KilledError(Exception):
"""The process was killed."""


class InvalidStateError(Exception):
"""
Raised when an operation is attempted that requires the process to be in a state
"""Raised when an operation is attempted that requires the process to be in a state
that is different from the current state
"""

Expand All @@ -33,3 +42,8 @@ class PersistenceError(Exception):

class ClosedError(Exception):
"""Raised when an mutable operation is attempted on a closed process"""


# Alias aio_pika
CommunicatorConnectionClosed = ConnectionClosed
CommunicatorChannelInvalidStateError = ChannelInvalidStateError
5 changes: 3 additions & 2 deletions src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import kiwipy
import yaml
from aio_pika.exceptions import ChannelInvalidStateError, ConnectionClosed

from . import events, exceptions, futures, persistence, ports, process_comms, process_states, utils
from .base import state_machine
Expand Down Expand Up @@ -697,6 +696,8 @@ def on_entering(self, state: process_states.State) -> None:
call_with_super_check(self.on_except, state.get_exc_info()) # type: ignore

def on_entered(self, from_state: Optional[process_states.State]) -> None:
from plumpy.exceptions import CommunicatorChannelInvalidStateError, CommunicatorConnectionClosed

# Map these onto direct functions that the subclass can implement
state_label = self._state.LABEL
if state_label == process_states.ProcessState.RUNNING:
Expand All @@ -716,7 +717,7 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None:
self.logger.info('Process<%s>: Broadcasting state change: %s', self.pid, subject)
try:
self._communicator.broadcast_send(body=None, sender=self.pid, subject=subject)
except (ConnectionClosed, ChannelInvalidStateError):
except (CommunicatorConnectionClosed, CommunicatorChannelInvalidStateError):
message = 'Process<%s>: no connection available to broadcast state change from %s to %s'
self.logger.warning(message, self.pid, from_label, self.state.value)
except kiwipy.TimeoutError:
Expand Down

0 comments on commit f7427e0

Please sign in to comment.