Skip to content

Commit

Permalink
Improvements for redis multi-topic subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
klpanagi committed Dec 19, 2024
1 parent 12457f4 commit 00e25e3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
29 changes: 15 additions & 14 deletions commlib/transports/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ def start(self) -> None:
def stop(self) -> None:
if self.is_connected:
self._redis.connection_pool.disconnect()
self._rsub.close()
self._redis.close()
self._connected = False

Expand Down Expand Up @@ -160,9 +161,11 @@ def subscribe(self, topic: str, callback: Callable):
return t

def msubscribe(self, topics: Dict[str, callable]):
_topics = {}
for topic, callback in topics.items():
_clb = functools.partial(self._on_msg_internal, callback)
self._sub = self._rsub.psubscribe(**{topic: _clb})
_topics[topic] = _clb
self._sub = self._rsub.psubscribe(**_topics)
t = self._rsub.run_in_thread(0.001, daemon=True)
return t

Expand Down Expand Up @@ -206,7 +209,7 @@ def __init__(self, *args, **kwargs):
def _send_response(self, data: Dict[str, Any], reply_to: str):
self._comm_obj.header.timestamp = gen_timestamp() # pylint: disable=E0237
self._comm_obj.data = data
_resp = self._comm_obj.dict()
_resp = self._comm_obj.model_dump()
self._transport.push_msg_to_queue(reply_to, _resp)

def _on_request_handle(self, data: Dict[str, Any], header: Dict[str, Any]):
Expand All @@ -226,7 +229,7 @@ def _on_request_internal(self, data: Dict[str, Any], header: Dict[str, Any]):
else:
resp = self.on_request(self._msg_type.Request(**data))
# RPCMessage.Response object here
resp = resp.dict()
resp = resp.model_dump()
except RPCRequestError:
self.log.error(str(exc), exc_info=False)
return
Expand Down Expand Up @@ -285,14 +288,14 @@ def _prepare_request(self, data: Dict[str, Any]):
self._comm_obj.header.timestamp = gen_timestamp() # pylint: disable=E0237
self._comm_obj.header.reply_to = self._gen_queue_name()
self._comm_obj.data = data
return self._comm_obj.dict()
return self._comm_obj.model_dump()

def call(self, msg: RPCMessage.Request, timeout: float = 30) -> RPCMessage.Response:
# TODO: Evaluate msg type passed here.
if self._msg_type is None:
data = msg
else:
data = msg.dict()
data = msg.model_dump()

_msg = self._prepare_request(data)
_reply_to = _msg["header"]["reply_to"]
Expand Down Expand Up @@ -495,6 +498,7 @@ def __init__(self, *args, **kwargs):
)
self._subs: Dict[str, callable] = {}
self._subscriber_threads = []
self._subscriber_thread = None

def subscribe(self, topic, callback: callable):
"""subscribe.
Expand All @@ -508,25 +512,22 @@ def subscribe(self, topic, callback: callable):
def stop(self):
"""Stop background thread that handle subscribed topic messages"""
for sub_thread in self._subscriber_threads:
if self._subscriber_thread is not None:
sub_thread.stop()
sub_thread.stop()
if self._subscriber_thread is not None:
self._subscriber_thread.stop()
super().stop()

def run_forever(self, interval: float = 0.001):
self._transport.start()
_topics = {}
for topic, callback in self._subs.items():
st = self._transport.subscribe(topic, functools.partial(self._on_message, callback))
self._subscriber_threads.append(st)
_topics[topic] = functools.partial(self._on_message, callback)
self._subscriber_thread = self._transport.msubscribe(_topics)
while True:
if self._t_stop_event is not None:
if self._t_stop_event.is_set():
self.log.debug("Stop event caught in thread")
break
# if self._transport._rsub is not None:
# try:
# self._transport._rsub.ping()
# except:
# pass
time.sleep(interval)

def _on_message(self, callback: callable, payload: Dict[str, Any]):
Expand Down
5 changes: 4 additions & 1 deletion examples/minimize_conns/wpublisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ class SonarMessage(PubSubMessage):
conn_params = ConnectionParameters()

node = Node(
node_name="example5_publisher", connection_params=conn_params, debug=True
node_name="example5_publisher",
connection_params=conn_params,
debug=True,
heartbeats=False,
)

mpub = node.create_mpublisher()
Expand Down

0 comments on commit 00e25e3

Please sign in to comment.