Skip to content

Commit

Permalink
Fixes redis transport
Browse files Browse the repository at this point in the history
  • Loading branch information
klpanagi committed Dec 19, 2024
1 parent b3efaf2 commit 9939d71
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 7 deletions.
4 changes: 2 additions & 2 deletions commlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ def run_forever(self, sleep_rate: float = 0.01) -> None:
try:
while self.state != NodeState.EXITED:
time.sleep(sleep_rate)
except:
pass
except Exception as e:
self.log.error(f"Exception occurred during run_forever: {str(e)}")
self.stop()

def stop(self):
Expand Down
3 changes: 2 additions & 1 deletion commlib/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ def run(self) -> None:
if not self._transport.is_connected and \
self._state not in (EndpointState.CONNECTED,
EndpointState.CONNECTING):
self._transport.start()
self._main_thread = threading.Thread(target=self.run_forever)
self._main_thread.daemon = True
self._t_stop_event = threading.Event()
self._main_thread.start()
self._state = EndpointState.CONNECTED
else:
self.logger().debug(
self.logger().error(
f"Transport already connected - cannot run {self.__class__.__name__}")

def stop(self):
Expand Down
4 changes: 2 additions & 2 deletions commlib/transports/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(
super().__init__(*args, **kwargs)
self._serializer = serializer
self._compression = compression
self.connect()
self._connected = False

@property
def is_connected(self) -> bool:
Expand Down Expand Up @@ -236,7 +236,7 @@ def run_forever(self):

def _detach_request_handler(self, payload: str):
data, header = self._unpack_comm_msg(payload)
self.log.info(f"RPC Request <{self._rpc_name}>")
# self.log.info(f"RPC Request <{self._rpc_name}>")
self._on_request_handle(data, header)

def _unpack_comm_msg(self, payload: str) -> Tuple:
Expand Down
1 change: 1 addition & 0 deletions examples/simple_rpc/simple_rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Response(RPCMessage.Response):
node = Node(
node_name="myclient",
connection_params=conn_params,
heartbeats=False,
# heartbeat_uri='nodes.add_two_ints.heartbeat',
debug=True,
)
Expand Down
4 changes: 3 additions & 1 deletion examples/simple_rpc/simple_rpc_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python

import sys
import time

from commlib.msg import RPCMessage
from commlib.node import Node
Expand Down Expand Up @@ -40,6 +41,7 @@ def add_two_int_handler(msg):
node = Node(
node_name="add_two_ints_node",
connection_params=conn_params,
heartbeats=False,
# heartbeat_uri='nodes.add_two_ints.heartbeat',
debug=True,
)
Expand All @@ -50,4 +52,4 @@ def add_two_int_handler(msg):
on_request=add_two_int_handler,
)

node.run_forever(sleep_rate=1)
node.run_forever()
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ wheel>=0.38.0
pydantic>=2.0.0
ujson>=5.7.0
rich
redis[hiredis]
redis[hiredis]==5.0.1
paho-mqtt<2.0.0
pika>=1.3.1
requests>=2.1.0

0 comments on commit 9939d71

Please sign in to comment.