From a0fc7d21a048eea81c64d484f52bef15b3ea3817 Mon Sep 17 00:00:00 2001 From: Konstantinos Panayiotou Date: Thu, 19 Dec 2024 20:52:56 +0200 Subject: [PATCH] WPublisher and WSubscriber Redis implementation --- commlib/transports/mqtt.py | 2 +- commlib/transports/redis.py | 93 +++++++++++++++++++++++++++++++++++-- 2 files changed, 91 insertions(+), 4 deletions(-) diff --git a/commlib/transports/mqtt.py b/commlib/transports/mqtt.py index a5e5faa..7181a8e 100644 --- a/commlib/transports/mqtt.py +++ b/commlib/transports/mqtt.py @@ -394,7 +394,7 @@ def publish(self, msg: PubSubMessage) -> None: elif isinstance(msg, dict): data = msg elif isinstance(msg, PubSubMessage): - data = msg.dict() + data = msg.model_dump() self._transport.publish(self._topic, data, qos=MQTTQoS.L0) self._msg_seq += 1 diff --git a/commlib/transports/redis.py b/commlib/transports/redis.py index 448b236..4f2a00e 100644 --- a/commlib/transports/redis.py +++ b/commlib/transports/redis.py @@ -1,7 +1,7 @@ import functools import logging import time -from typing import Any, Callable, Dict, Optional, Tuple +from typing import Any, Callable, Dict, Optional, Tuple, Union import redis @@ -340,7 +340,7 @@ def publish(self, msg: PubSubMessage) -> None: elif isinstance(msg, dict): data = msg elif isinstance(msg, PubSubMessage): - data = msg.dict() + data = msg.model_dump() self.log.debug(f"Publishing Message to topic <{self._topic}>") self._transport.publish(self._topic, data) self._msg_seq += 1 @@ -375,12 +375,35 @@ def publish(self, msg: PubSubMessage, topic: str) -> None: elif isinstance(msg, dict): data = msg elif isinstance(msg, PubSubMessage): - data = msg.dict() + data = msg.model_dump() self.log.debug(f"Publishing Message: <{topic}>:{data}") self._transport.publish(topic, data) self._msg_seq += 1 +class WPublisher: + """WPublisher. + MQTT Wrapped-Publisher + """ + def __init__(self, mpub: MPublisher, topic: str, + msg_type: Union[PubSubMessage, None] = None,): + """__init__. + + Args: + mpub (MPublisher): Multi-Topic Publisher + topic (str): topic + msg_type (PubSubMessage, optional): Message Type + """ + self._mpub = mpub + self._topic = topic + self._msg_type = msg_type + + def publish(self, msg: Union[PubSubMessage, None]) -> None: + if self._msg_type is not None and not isinstance(msg, self._msg_type): + raise ValueError(f'Argument "msg" must be of type {self._msg_type}') + self._mpub.publish(msg, self._topic) + + class Subscriber(BaseSubscriber): """Subscriber. Redis Subscriber @@ -439,6 +462,70 @@ def _unpack_comm_msg(self, msg: Dict[str, Any]) -> Tuple: return _data, _uri +class WSubscriber(BaseSubscriber): + + def __init__(self, *args, **kwargs): + """__init__. + + Args: + args: See BaseSubscriber + kwargs: See BaseSubscriber + """ + super().__init__(topic=None, *args, **kwargs) + self._transport = RedisTransport( + conn_params=self._conn_params, + serializer=self._serializer, + compression=self._compression, + ) + self._subs: Dict[str, callable] = {} + self._subscriber_threads = [] + + def subscribe(self, topic, callback: callable): + """subscribe. + + Args: + topic (str): topic + msg_type (PubSubMessage, optional): Message Type + """ + self._subs[topic] = callback + + def stop(self): + """Stop background thread that handle subscribed topic messages""" + for sub_thread in self._subscriber_threads: + if self._subscriber_thread is not None: + sub_thread.stop() + super().stop() + + def run_forever(self, interval: float = 0.001): + self._transport.start() + for topic, callback in self._subs.items(): + st = self._transport.subscribe(topic, functools.partial(self._on_message, callback)) + self._subscriber_threads.append(st) + while True: + if self._t_stop_event is not None: + if self._t_stop_event.is_set(): + self.log.debug("Stop event caught in thread") + break + time.sleep(interval) + + def _on_message(self, callback: callable, payload: Dict[str, Any]): + try: + data, uri = self._unpack_comm_msg(payload) + if callback is not None: + if self._msg_type is None: + _clb = functools.partial(callback, data) + else: + _clb = functools.partial(callback, self._msg_type(**data)) + _clb() + except Exception: + self.log.error("Exception caught in _on_message", exc_info=True) + + def _unpack_comm_msg(self, msg: Dict[str, Any]) -> Tuple: + _uri = msg["channel"] + _data = self._serializer.deserialize(msg["data"]) + return _data, _uri + + class PSubscriber(Subscriber): """PSubscriber. Redis Pattern-based Subscriber.