From 00e25e3bb4a897a25668d482781f7010703a2884 Mon Sep 17 00:00:00 2001 From: Konstantinos Panayiotou Date: Thu, 19 Dec 2024 23:02:53 +0200 Subject: [PATCH] Improvements for redis multi-topic subscriptions --- commlib/transports/redis.py | 29 ++++++++++++++------------- examples/minimize_conns/wpublisher.py | 5 ++++- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/commlib/transports/redis.py b/commlib/transports/redis.py index 08ca3ea..c904bb1 100644 --- a/commlib/transports/redis.py +++ b/commlib/transports/redis.py @@ -131,6 +131,7 @@ def start(self) -> None: def stop(self) -> None: if self.is_connected: self._redis.connection_pool.disconnect() + self._rsub.close() self._redis.close() self._connected = False @@ -160,9 +161,11 @@ def subscribe(self, topic: str, callback: Callable): return t def msubscribe(self, topics: Dict[str, callable]): + _topics = {} for topic, callback in topics.items(): _clb = functools.partial(self._on_msg_internal, callback) - self._sub = self._rsub.psubscribe(**{topic: _clb}) + _topics[topic] = _clb + self._sub = self._rsub.psubscribe(**_topics) t = self._rsub.run_in_thread(0.001, daemon=True) return t @@ -206,7 +209,7 @@ def __init__(self, *args, **kwargs): def _send_response(self, data: Dict[str, Any], reply_to: str): self._comm_obj.header.timestamp = gen_timestamp() # pylint: disable=E0237 self._comm_obj.data = data - _resp = self._comm_obj.dict() + _resp = self._comm_obj.model_dump() self._transport.push_msg_to_queue(reply_to, _resp) def _on_request_handle(self, data: Dict[str, Any], header: Dict[str, Any]): @@ -226,7 +229,7 @@ def _on_request_internal(self, data: Dict[str, Any], header: Dict[str, Any]): else: resp = self.on_request(self._msg_type.Request(**data)) # RPCMessage.Response object here - resp = resp.dict() + resp = resp.model_dump() except RPCRequestError: self.log.error(str(exc), exc_info=False) return @@ -285,14 +288,14 @@ def _prepare_request(self, data: Dict[str, Any]): self._comm_obj.header.timestamp = gen_timestamp() # pylint: disable=E0237 self._comm_obj.header.reply_to = self._gen_queue_name() self._comm_obj.data = data - return self._comm_obj.dict() + return self._comm_obj.model_dump() def call(self, msg: RPCMessage.Request, timeout: float = 30) -> RPCMessage.Response: # TODO: Evaluate msg type passed here. if self._msg_type is None: data = msg else: - data = msg.dict() + data = msg.model_dump() _msg = self._prepare_request(data) _reply_to = _msg["header"]["reply_to"] @@ -495,6 +498,7 @@ def __init__(self, *args, **kwargs): ) self._subs: Dict[str, callable] = {} self._subscriber_threads = [] + self._subscriber_thread = None def subscribe(self, topic, callback: callable): """subscribe. @@ -508,25 +512,22 @@ def subscribe(self, topic, callback: callable): def stop(self): """Stop background thread that handle subscribed topic messages""" for sub_thread in self._subscriber_threads: - if self._subscriber_thread is not None: - sub_thread.stop() + sub_thread.stop() + if self._subscriber_thread is not None: + self._subscriber_thread.stop() super().stop() def run_forever(self, interval: float = 0.001): self._transport.start() + _topics = {} for topic, callback in self._subs.items(): - st = self._transport.subscribe(topic, functools.partial(self._on_message, callback)) - self._subscriber_threads.append(st) + _topics[topic] = functools.partial(self._on_message, callback) + self._subscriber_thread = self._transport.msubscribe(_topics) while True: if self._t_stop_event is not None: if self._t_stop_event.is_set(): self.log.debug("Stop event caught in thread") break - # if self._transport._rsub is not None: - # try: - # self._transport._rsub.ping() - # except: - # pass time.sleep(interval) def _on_message(self, callback: callable, payload: Dict[str, Any]): diff --git a/examples/minimize_conns/wpublisher.py b/examples/minimize_conns/wpublisher.py index 14efad0..12a491c 100755 --- a/examples/minimize_conns/wpublisher.py +++ b/examples/minimize_conns/wpublisher.py @@ -33,7 +33,10 @@ class SonarMessage(PubSubMessage): conn_params = ConnectionParameters() node = Node( - node_name="example5_publisher", connection_params=conn_params, debug=True + node_name="example5_publisher", + connection_params=conn_params, + debug=True, + heartbeats=False, ) mpub = node.create_mpublisher()