Skip to content

Commit

Permalink
Several improvements in Redis and MQTT connection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
klpanagi committed Dec 19, 2024
1 parent 4f5fee3 commit 12457f4
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 31 deletions.
2 changes: 1 addition & 1 deletion commlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ def create_wpublisher(self, mpub, topic: str, msg_type: PubSubMessage = None):
"""

pub = self._transport_module.WPublisher(mpub, topic, msg_type)
self._publishers.append(pub)
# self._publishers.append(pub)
return pub

def create_subscriber(self, *args, **kwargs):
Expand Down
9 changes: 2 additions & 7 deletions commlib/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,8 @@ def logger(cls) -> logging.Logger:
pubsub_logger = logging.getLogger(__name__)
return pubsub_logger

def __init__(
self,
topic: str,
msg_type: Optional[PubSubMessage] = None,
on_message: Optional[Callable] = None,
*args,
**kwargs):
def __init__(self, topic: str, msg_type: Optional[PubSubMessage] = None,
on_message: Optional[Callable] = None, *args, **kwargs):
"""__init__.
Initializes a new instance of the `BaseSubscriber` class.
Expand Down
1 change: 1 addition & 0 deletions commlib/transports/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def run(self) -> None:
if not self._transport.is_connected and \
self._state not in (EndpointState.CONNECTED,
EndpointState.CONNECTING):
self._transport.start()
self._state = EndpointState.CONNECTED
else:
self.logger().debug(
Expand Down
16 changes: 6 additions & 10 deletions commlib/transports/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,9 @@ def connect(self) -> None:
# self._reconnect_attempts = 0
# raise ConnectionError()

def on_connect(
self,
client: Any,
userdata: Any,
flags: Dict[str, Any],
rc: int,
properties: Any = None,
):
def on_connect(self, client: Any, userdata: Any,
flags: Dict[str, Any], rc: int,
properties: Any = None):
"""on_connect.
Callback for on-connect event.
Expand Down Expand Up @@ -240,7 +235,8 @@ def on_disconnect(self, client: Any, userdata: Any, rc: int, unk: Any = None):
self._connected = False
self._client.loop_stop()
err_msg = ""
if rc == MQTTReturnCode.AUTHORIZATION_ERROR or rc == MQTTReturnCode.AUTHENTICATION_ERROR:
if rc == MQTTReturnCode.AUTHORIZATION_ERROR or \
rc == MQTTReturnCode.AUTHENTICATION_ERROR:
err_msg = "Authentication error with MQTT broker"
elif rc == MQTTReturnCode.CONNECTION_SUCCESS:
pass
Expand Down Expand Up @@ -821,7 +817,7 @@ def _prepare_request(self, data: Dict[str, Any]):
self._comm_obj.header.timestamp = gen_timestamp() # pylint: disable=E0237
self._comm_obj.header.reply_to = self._gen_queue_name()
self._comm_obj.data = data
return self._comm_obj.dict()
return self._comm_obj.model_dump()

def _on_response_wrapper(self, client: Any, userdata: Any, msg: Dict[str, Any]):
"""_on_response_wrapper.
Expand Down
23 changes: 22 additions & 1 deletion commlib/transports/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ConnectionParameters(BaseConnectionParameters):
db: int = 0
username: str = ""
password: str = ""
socket_timeout: float = 10


class RedisConnection(redis.Redis):
Expand Down Expand Up @@ -102,6 +103,9 @@ def connect(self) -> None:
password=self._conn_params.password,
db=self._conn_params.db,
decode_responses=True,
socket_timeout=self._conn_params.socket_timeout,
socket_keep_alive=True,
retry_on_timeout=True,
)
else:
self._redis = RedisConnection(
Expand All @@ -111,6 +115,8 @@ def connect(self) -> None:
password=self._conn_params.password,
db=self._conn_params.db,
decode_responses=False,
socket_timeout=self._conn_params.socket_timeout,
retry_on_timeout=True,
)

self._rsub = self._redis.pubsub()
Expand All @@ -119,6 +125,8 @@ def connect(self) -> None:
def start(self) -> None:
if not self.is_connected:
self.connect()
while self.is_connected is False:
time.sleep(0.01)

def stop(self) -> None:
if self.is_connected:
Expand Down Expand Up @@ -148,7 +156,13 @@ def publish(self, queue_name: str, data: Dict[str, Any]):
def subscribe(self, topic: str, callback: Callable):
_clb = functools.partial(self._on_msg_internal, callback)
self._sub = self._rsub.psubscribe(**{topic: _clb})
self._rsub.get_message()
t = self._rsub.run_in_thread(0.001, daemon=True)
return t

def msubscribe(self, topics: Dict[str, callable]):
for topic, callback in topics.items():
_clb = functools.partial(self._on_msg_internal, callback)
self._sub = self._rsub.psubscribe(**{topic: _clb})
t = self._rsub.run_in_thread(0.001, daemon=True)
return t

Expand Down Expand Up @@ -442,6 +456,8 @@ def run_forever(self, interval: float = 0.001):
if self._t_stop_event.is_set():
self.log.debug("Stop event caught in thread")
break
if self._transport._rsub is not None:
self._transport._rsub.ping()
time.sleep(interval)

def _on_message(self, payload: Dict[str, Any]):
Expand Down Expand Up @@ -506,6 +522,11 @@ def run_forever(self, interval: float = 0.001):
if self._t_stop_event.is_set():
self.log.debug("Stop event caught in thread")
break
# if self._transport._rsub is not None:
# try:
# self._transport._rsub.ping()
# except:
# pass
time.sleep(interval)

def _on_message(self, callback: callable, payload: Dict[str, Any]):
Expand Down
38 changes: 31 additions & 7 deletions examples/minimize_conns/wpublisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,19 @@
import sys
import time

from pydantic import Field

from commlib.msg import MessageHeader, PubSubMessage
from commlib.node import Node


class SonarMessage(PubSubMessage):
header: MessageHeader = Field(default_factory=lambda: MessageHeader())
range: float = -1
hfov: float = 30.6
vfov: float = 14.2


if __name__ == "__main__":
if len(sys.argv) < 2:
broker = "redis"
Expand All @@ -27,15 +38,28 @@

mpub = node.create_mpublisher()

mpub.run()

topicA = "topic.a"
topicB = "topic.b"
topicA = "sonar.left"
topicB = "sonar.right"
topicC = "sonar.front"

wpub_1 = node.create_wpublisher(mpub, topicA)
wpub_2 = node.create_wpublisher(mpub, topicB)
wpub_3 = node.create_wpublisher(mpub, topicC)

node.run()

sonar_left_range = 1
sonar_right_range = 1
sonar_front_range = 1

while True:
wpub_1.publish({"a": 1})
wpub_2.publish({"b": 1})
time.sleep(1)
sonar_left_msg = SonarMessage(range=sonar_left_range)
sonar_right_msg = SonarMessage(range=sonar_right_range)
sonar_front_mst = SonarMessage(range=sonar_front_range)
wpub_1.publish(sonar_left_msg)
wpub_2.publish(sonar_right_msg)
wpub_3.publish(sonar_front_mst)
time.sleep(0.5)
sonar_left_range += 1
sonar_right_range += 1
sonar_front_range += 1
16 changes: 11 additions & 5 deletions examples/minimize_conns/wsubscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@


def clb_1(msg):
print("Callback 1")
print(f"Sonar Left 1: {msg}")


def clb_2(msg):
print("Callback 2")
print(f"Sonar Right: {msg}")

def clb_3(msg):
print(f"Sonar Front: {msg}")


if __name__ == "__main__":
Expand All @@ -30,15 +33,18 @@ def clb_2(msg):
conn_params = ConnectionParameters()

node = Node(
node_name="example5_listener", connection_params=conn_params, debug=True
node_name="example5_listener", connection_params=conn_params,
debug=True, heartbeats=False
)

sub = node.create_wsubscriber()

topicA = "topic.a"
topicB = "topic.b"
topicA = "sonar.left"
topicB = "sonar.right"
topicC = "sonar.front"

sub.subscribe(topicA, clb_1)
sub.subscribe(topicB, clb_2)
sub.subscribe(topicC, clb_3)

node.run_forever()

0 comments on commit 12457f4

Please sign in to comment.