diff --git a/commlib/node.py b/commlib/node.py index fe55fbb..282e6c4 100644 --- a/commlib/node.py +++ b/commlib/node.py @@ -429,7 +429,7 @@ def create_wpublisher(self, mpub, topic: str, msg_type: PubSubMessage = None): """ pub = self._transport_module.WPublisher(mpub, topic, msg_type) - self._publishers.append(pub) + # self._publishers.append(pub) return pub def create_subscriber(self, *args, **kwargs): diff --git a/commlib/pubsub.py b/commlib/pubsub.py index c0d770a..8c1470e 100644 --- a/commlib/pubsub.py +++ b/commlib/pubsub.py @@ -56,13 +56,8 @@ def logger(cls) -> logging.Logger: pubsub_logger = logging.getLogger(__name__) return pubsub_logger - def __init__( - self, - topic: str, - msg_type: Optional[PubSubMessage] = None, - on_message: Optional[Callable] = None, - *args, - **kwargs): + def __init__(self, topic: str, msg_type: Optional[PubSubMessage] = None, + on_message: Optional[Callable] = None, *args, **kwargs): """__init__. Initializes a new instance of the `BaseSubscriber` class. diff --git a/commlib/transports/mock.py b/commlib/transports/mock.py index a2d60ef..ff49afe 100644 --- a/commlib/transports/mock.py +++ b/commlib/transports/mock.py @@ -70,6 +70,7 @@ def run(self) -> None: if not self._transport.is_connected and \ self._state not in (EndpointState.CONNECTED, EndpointState.CONNECTING): + self._transport.start() self._state = EndpointState.CONNECTED else: self.logger().debug( diff --git a/commlib/transports/mqtt.py b/commlib/transports/mqtt.py index 7181a8e..dd74898 100644 --- a/commlib/transports/mqtt.py +++ b/commlib/transports/mqtt.py @@ -195,14 +195,9 @@ def connect(self) -> None: # self._reconnect_attempts = 0 # raise ConnectionError() - def on_connect( - self, - client: Any, - userdata: Any, - flags: Dict[str, Any], - rc: int, - properties: Any = None, - ): + def on_connect(self, client: Any, userdata: Any, + flags: Dict[str, Any], rc: int, + properties: Any = None): """on_connect. Callback for on-connect event. @@ -240,7 +235,8 @@ def on_disconnect(self, client: Any, userdata: Any, rc: int, unk: Any = None): self._connected = False self._client.loop_stop() err_msg = "" - if rc == MQTTReturnCode.AUTHORIZATION_ERROR or rc == MQTTReturnCode.AUTHENTICATION_ERROR: + if rc == MQTTReturnCode.AUTHORIZATION_ERROR or \ + rc == MQTTReturnCode.AUTHENTICATION_ERROR: err_msg = "Authentication error with MQTT broker" elif rc == MQTTReturnCode.CONNECTION_SUCCESS: pass @@ -821,7 +817,7 @@ def _prepare_request(self, data: Dict[str, Any]): self._comm_obj.header.timestamp = gen_timestamp() # pylint: disable=E0237 self._comm_obj.header.reply_to = self._gen_queue_name() self._comm_obj.data = data - return self._comm_obj.dict() + return self._comm_obj.model_dump() def _on_response_wrapper(self, client: Any, userdata: Any, msg: Dict[str, Any]): """_on_response_wrapper. diff --git a/commlib/transports/redis.py b/commlib/transports/redis.py index 4f2a00e..08ca3ea 100644 --- a/commlib/transports/redis.py +++ b/commlib/transports/redis.py @@ -44,6 +44,7 @@ class ConnectionParameters(BaseConnectionParameters): db: int = 0 username: str = "" password: str = "" + socket_timeout: float = 10 class RedisConnection(redis.Redis): @@ -102,6 +103,9 @@ def connect(self) -> None: password=self._conn_params.password, db=self._conn_params.db, decode_responses=True, + socket_timeout=self._conn_params.socket_timeout, + socket_keep_alive=True, + retry_on_timeout=True, ) else: self._redis = RedisConnection( @@ -111,6 +115,8 @@ def connect(self) -> None: password=self._conn_params.password, db=self._conn_params.db, decode_responses=False, + socket_timeout=self._conn_params.socket_timeout, + retry_on_timeout=True, ) self._rsub = self._redis.pubsub() @@ -119,6 +125,8 @@ def connect(self) -> None: def start(self) -> None: if not self.is_connected: self.connect() + while self.is_connected is False: + time.sleep(0.01) def stop(self) -> None: if self.is_connected: @@ -148,7 +156,13 @@ def publish(self, queue_name: str, data: Dict[str, Any]): def subscribe(self, topic: str, callback: Callable): _clb = functools.partial(self._on_msg_internal, callback) self._sub = self._rsub.psubscribe(**{topic: _clb}) - self._rsub.get_message() + t = self._rsub.run_in_thread(0.001, daemon=True) + return t + + def msubscribe(self, topics: Dict[str, callable]): + for topic, callback in topics.items(): + _clb = functools.partial(self._on_msg_internal, callback) + self._sub = self._rsub.psubscribe(**{topic: _clb}) t = self._rsub.run_in_thread(0.001, daemon=True) return t @@ -442,6 +456,8 @@ def run_forever(self, interval: float = 0.001): if self._t_stop_event.is_set(): self.log.debug("Stop event caught in thread") break + if self._transport._rsub is not None: + self._transport._rsub.ping() time.sleep(interval) def _on_message(self, payload: Dict[str, Any]): @@ -506,6 +522,11 @@ def run_forever(self, interval: float = 0.001): if self._t_stop_event.is_set(): self.log.debug("Stop event caught in thread") break + # if self._transport._rsub is not None: + # try: + # self._transport._rsub.ping() + # except: + # pass time.sleep(interval) def _on_message(self, callback: callable, payload: Dict[str, Any]): diff --git a/examples/minimize_conns/wpublisher.py b/examples/minimize_conns/wpublisher.py index e24f246..14efad0 100755 --- a/examples/minimize_conns/wpublisher.py +++ b/examples/minimize_conns/wpublisher.py @@ -3,8 +3,19 @@ import sys import time +from pydantic import Field + +from commlib.msg import MessageHeader, PubSubMessage from commlib.node import Node + +class SonarMessage(PubSubMessage): + header: MessageHeader = Field(default_factory=lambda: MessageHeader()) + range: float = -1 + hfov: float = 30.6 + vfov: float = 14.2 + + if __name__ == "__main__": if len(sys.argv) < 2: broker = "redis" @@ -27,15 +38,28 @@ mpub = node.create_mpublisher() - mpub.run() - - topicA = "topic.a" - topicB = "topic.b" + topicA = "sonar.left" + topicB = "sonar.right" + topicC = "sonar.front" wpub_1 = node.create_wpublisher(mpub, topicA) wpub_2 = node.create_wpublisher(mpub, topicB) + wpub_3 = node.create_wpublisher(mpub, topicC) + + node.run() + + sonar_left_range = 1 + sonar_right_range = 1 + sonar_front_range = 1 while True: - wpub_1.publish({"a": 1}) - wpub_2.publish({"b": 1}) - time.sleep(1) + sonar_left_msg = SonarMessage(range=sonar_left_range) + sonar_right_msg = SonarMessage(range=sonar_right_range) + sonar_front_mst = SonarMessage(range=sonar_front_range) + wpub_1.publish(sonar_left_msg) + wpub_2.publish(sonar_right_msg) + wpub_3.publish(sonar_front_mst) + time.sleep(0.5) + sonar_left_range += 1 + sonar_right_range += 1 + sonar_front_range += 1 diff --git a/examples/minimize_conns/wsubscriber.py b/examples/minimize_conns/wsubscriber.py index 8237bdc..d703d2a 100755 --- a/examples/minimize_conns/wsubscriber.py +++ b/examples/minimize_conns/wsubscriber.py @@ -6,11 +6,14 @@ def clb_1(msg): - print("Callback 1") + print(f"Sonar Left 1: {msg}") def clb_2(msg): - print("Callback 2") + print(f"Sonar Right: {msg}") + +def clb_3(msg): + print(f"Sonar Front: {msg}") if __name__ == "__main__": @@ -30,15 +33,18 @@ def clb_2(msg): conn_params = ConnectionParameters() node = Node( - node_name="example5_listener", connection_params=conn_params, debug=True + node_name="example5_listener", connection_params=conn_params, + debug=True, heartbeats=False ) sub = node.create_wsubscriber() - topicA = "topic.a" - topicB = "topic.b" + topicA = "sonar.left" + topicB = "sonar.right" + topicC = "sonar.front" sub.subscribe(topicA, clb_1) sub.subscribe(topicB, clb_2) + sub.subscribe(topicC, clb_3) node.run_forever()