Skip to content

Commit

Permalink
Adds flag to Node and Endpoint run() methods to wait for connection
Browse files Browse the repository at this point in the history
  • Loading branch information
klpanagi committed Dec 21, 2024
1 parent 1b2ffe6 commit 7b4409c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 3 deletions.
6 changes: 5 additions & 1 deletion commlib/endpoints.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from enum import Enum
import time

from commlib.compression import CompressionType
from commlib.connection import BaseConnectionParameters
Expand Down Expand Up @@ -74,7 +75,7 @@ def log(self):
def debug(self):
return self._debug

def run(self):
def run(self, wait: bool = False) -> None:
"""
Starts the subscriber and connects to the transport if it is not already connected.
Expand All @@ -92,6 +93,9 @@ def run(self):
self._state not in (EndpointState.CONNECTED,
EndpointState.CONNECTING):
self._transport.start()
if wait:
while not self._transport.is_connected:
time.sleep(0.001)
self._state = EndpointState.CONNECTED
else:
self.log.warning("Transport already connected - Skipping")
Expand Down
7 changes: 5 additions & 2 deletions commlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def endpoints(self):

@property
def health(self):
return set([e._transport.is_connected for e in self.endpoints]) == {True}
return set([e.connected for e in self.endpoints]) == {True}

@property
def log(self) -> logging.Logger:
Expand Down Expand Up @@ -304,7 +304,7 @@ def create_start_service(self, uri: str = "") -> None:
on_request=self._start_rpc_callback,
)

def run(self) -> None:
def run(self, wait: bool = False) -> None:
"""run
Starts the node by running all its subscribers, publishers, RPC services,
RPC clients, action services, and action clients. If the node has control services,
Expand All @@ -320,6 +320,9 @@ def run(self) -> None:
e.run()
if self._heartbeats:
self._init_heartbeat_thread()
if wait:
while not self.health:
time.sleep(0.001)
self.state = NodeState.RUNNING

def run_forever(self, sleep_rate: float = 0.01) -> None:
Expand Down

0 comments on commit 7b4409c

Please sign in to comment.