From d2b365a68aee93e77f2a1713693e3e65dac2c4cf Mon Sep 17 00:00:00 2001 From: Konstantinos Panayiotou Date: Thu, 19 Dec 2024 18:04:40 +0200 Subject: [PATCH] fixes poutanaola --- commlib/endpoints.py | 8 ++++++-- commlib/pubsub.py | 1 - commlib/rpc.py | 3 +-- commlib/transports/amqp.py | 6 ++++-- commlib/transports/mqtt.py | 13 +++++++++++-- commlib/transports/redis.py | 21 +++++++++++---------- examples/simple_pubsub/subscriber.py | 2 +- 7 files changed, 34 insertions(+), 20 deletions(-) diff --git a/commlib/endpoints.py b/commlib/endpoints.py index 49285ab..03e7724 100644 --- a/commlib/endpoints.py +++ b/commlib/endpoints.py @@ -61,6 +61,11 @@ def __init__( self._conn_params = conn_params self._state = EndpointState.DISCONNECTED + @property + def connected(self): + if self._transport.is_connected is None: return False + return self._transport.is_connected + @property def log(self): return self.logger() @@ -89,8 +94,7 @@ def run(self): self._transport.start() self._state = EndpointState.CONNECTED else: - self.logger().error( - f"Transport already connected - cannot run {self.__class__.__name__}") + self.log.error("Transport already connected - Skipping") def stop(self) -> None: """ diff --git a/commlib/pubsub.py b/commlib/pubsub.py index ce00e30..73692f3 100644 --- a/commlib/pubsub.py +++ b/commlib/pubsub.py @@ -127,7 +127,6 @@ def run(self) -> None: if not self._transport.is_connected and \ self._state not in (EndpointState.CONNECTED, EndpointState.CONNECTING): - self._transport.start() self._main_thread = threading.Thread(target=self.run_forever) self._main_thread.daemon = True self._t_stop_event = threading.Event() diff --git a/commlib/rpc.py b/commlib/rpc.py index 3298e7a..0b69a76 100644 --- a/commlib/rpc.py +++ b/commlib/rpc.py @@ -209,14 +209,13 @@ def run(self) -> None: if not self._transport.is_connected and \ self._state not in (EndpointState.CONNECTED, EndpointState.CONNECTING): - self._transport.start() self._main_thread = threading.Thread(target=self.run_forever) self._main_thread.daemon = True self._t_stop_event = threading.Event() self._main_thread.start() self._state = EndpointState.CONNECTED else: - self.logger().error( + self.log.error( f"Transport already connected - cannot run {self.__class__.__name__}") def stop(self): diff --git a/commlib/transports/amqp.py b/commlib/transports/amqp.py index e436437..4c3b37a 100644 --- a/commlib/transports/amqp.py +++ b/commlib/transports/amqp.py @@ -471,7 +471,7 @@ def __init__( def run_forever(self, raise_if_exists: bool = False): """Run RPC Service in normal mode. Blocking operation.""" - status = self._transport.connect() + status = self._transport.start() if not status: raise ConnectionError("Failed to connect to AMQP broker") @@ -670,6 +670,7 @@ def delay(self) -> float: def run(self): self._transport.detach_amqp_events_thread() + super().run() def gen_corr_id(self) -> str: """Generate correlationID.""" @@ -802,6 +803,7 @@ def __init__( def run(self) -> None: self._transport.detach_amqp_events_thread() + super().run() def publish(self, msg: PubSubMessage) -> None: """Publish message once. @@ -924,7 +926,7 @@ def hz(self) -> float: def run_forever(self) -> None: """Start Subscriber. Blocking method.""" - self._transport.connect() + self._transport.start() _exch_ex = self._transport.exchange_exists(self._topic_exchange) if _exch_ex.method.NAME != "Exchange.DeclareOk": self._transport.create_exchange(self._topic_exchange, ExchangeType.Topic) diff --git a/commlib/transports/mqtt.py b/commlib/transports/mqtt.py index afbb471..2d4167f 100644 --- a/commlib/transports/mqtt.py +++ b/commlib/transports/mqtt.py @@ -453,9 +453,16 @@ def run(self): self.log.debug(f"Started Subscriber: <{self._topic}>") def run_forever(self): + self._transport.start() self._transport.subscribe(self._topic, self._on_message) - self.log.debug(f"Started Subscriber: <{self._topic}>") - self._transport.loop_forever() + while True: + if self._t_stop_event is not None: + if self._t_stop_event.is_set(): + self.log.debug("Stop event caught in thread") + break + time.sleep(0.001) + self._transport.stop() + def _on_message(self, client: Any, userdata: Any, msg: Dict[str, Any]): """_on_message. @@ -571,6 +578,7 @@ def _unpack_comm_msg(self, msg: Any) -> Tuple[CommRPCMessage, str]: def run_forever(self): """run_forever.""" + self._transport.start() self._transport.subscribe( self._rpc_name, self._on_request_handle, qos=MQTTQoS.L1 ) @@ -690,6 +698,7 @@ def _register_endpoints(self): self._register_endpoint(uri, callback, msg_type) def run_forever(self): + self._transport.start() self._register_endpoints() while True: if self._t_stop_event is not None: diff --git a/commlib/transports/redis.py b/commlib/transports/redis.py index 20b10f7..448b236 100644 --- a/commlib/transports/redis.py +++ b/commlib/transports/redis.py @@ -222,6 +222,7 @@ def _on_request_internal(self, data: Dict[str, Any], header: Dict[str, Any]): self._send_response(resp, _req_msg.header.reply_to) def run_forever(self): + self._transport.start() if self._transport.queue_exists(self._rpc_name): self._transport.delete_queue(self._rpc_name) while True: @@ -402,23 +403,23 @@ def __init__(self, queue_size: Optional[int] = 1, *args, **kwargs): compression=self._compression, ) - def run(self): - super().run() - self._subscriber_thread = self._transport.subscribe( - self._topic, self._on_message - ) - self.log.debug(f"Started Subscriber: <{self._topic}>") - def stop(self): """Stop background thread that handle subscribed topic messages""" if self._subscriber_thread is not None: self._subscriber_thread.stop() super().stop() - def run_forever(self): - self.run() + def run_forever(self, interval: float = 0.001): + self._transport.start() + self._subscriber_thread = self._transport.subscribe( + self._topic, self._on_message + ) while True: - time.sleep(0.001) + if self._t_stop_event is not None: + if self._t_stop_event.is_set(): + self.log.debug("Stop event caught in thread") + break + time.sleep(interval) def _on_message(self, payload: Dict[str, Any]): try: diff --git a/examples/simple_pubsub/subscriber.py b/examples/simple_pubsub/subscriber.py index 0af6228..4295773 100755 --- a/examples/simple_pubsub/subscriber.py +++ b/examples/simple_pubsub/subscriber.py @@ -37,7 +37,7 @@ def on_message(msg): conn_params = ConnectionParameters() node = Node( - node_name="sensors.sonar.front", + node_name="obstacle_avoidance", connection_params=conn_params, # heartbeat_uri='nodes.add_two_ints.heartbeat', debug=True,