Skip to content

Commit

Permalink
Add an optional timeout_sec input to Client.call() to fix issue #1181 (
Browse files Browse the repository at this point in the history
…#1188)

Signed-off-by: KKSTB <[email protected]>
  • Loading branch information
KKSTB authored Nov 17, 2023
1 parent f909278 commit e97c41b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
15 changes: 11 additions & 4 deletions rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,19 @@ def __init__(

self._lock = threading.Lock()

def call(self, request: SrvTypeRequest) -> SrvTypeResponse:
def call(
self,
request: SrvTypeRequest,
timeout_sec: Optional[float] = None
) -> Optional[SrvTypeResponse]:
"""
Make a service request and wait for the result.
.. warning:: Do not call this method in a callback or a deadlock may occur.
.. warning:: Do not call this method in a callback, or a deadlock or timeout may occur.
:param request: The service request.
:return: The service response.
:param timeout_sec: Seconds to wait. If ``None``, then wait forever.
:return: The service response, or None if timed out.
:raises: TypeError if the type of the passed request isn't an instance
of the Request type of the provided service when the client was
constructed.
Expand All @@ -97,7 +102,9 @@ def unblock(future):
# The callback might have been added after the future is completed,
# resulting in the event never being set.
if not future.done():
event.wait()
if not event.wait(timeout_sec):
# Timed out. remove_pending_request() to free resources
self.remove_pending_request(future)
if future.exception() is not None:
raise future.exception()
return future.result()
Expand Down
59 changes: 59 additions & 0 deletions rclpy/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
# limitations under the License.

import platform
import threading
import time
import traceback
import unittest

from rcl_interfaces.srv import GetParameters
import rclpy
import rclpy.executors
import rclpy.node
from rclpy.utilities import get_rmw_implementation_identifier
from test_msgs.srv import Empty

Expand Down Expand Up @@ -56,6 +59,21 @@ def do_test_service_name(cls, test_service_name_list):
client.destroy()
node.destroy_node()

@staticmethod
def _spin_rclpy_node(
rclpy_node: rclpy.node.Node,
rclpy_executor: rclpy.executors.SingleThreadedExecutor
) -> None:
try:
rclpy_executor.spin()
except rclpy.executors.ExternalShutdownException:
pass
except Exception as err:
traceback.print_exc()
print(rclpy_node.get_name() + ': ' + str(err))
print(rclpy_node.get_name() + ': rclpy_node exit')
# rclpy_node.destroy_node()

def test_wait_for_service_5sec(self):
cli = self.node.create_client(GetParameters, 'get/parameters')
try:
Expand Down Expand Up @@ -183,6 +201,47 @@ def test_get_service_name_after_remapping(self):
]
TestClient.do_test_service_name(test_service_name_list)

def test_sync_call(self):
def _service(request, response):
return response
cli = self.node.create_client(GetParameters, 'get/parameters')
srv = self.node.create_service(GetParameters, 'get/parameters', _service)
try:
self.assertTrue(cli.wait_for_service(timeout_sec=20))
executor = rclpy.executors.SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)
executor_thread = threading.Thread(
target=TestClient._spin_rclpy_node, args=(self.node, executor))
executor_thread.start()
result = cli.call(GetParameters.Request(), 0.5)
self.assertTrue(result is not None)
executor.shutdown()
executor_thread.join()
finally:
self.node.destroy_client(cli)
self.node.destroy_service(srv)

def test_sync_call_timeout(self):
def _service(request, response):
time.sleep(1)
return response
cli = self.node.create_client(GetParameters, 'get/parameters')
srv = self.node.create_service(GetParameters, 'get/parameters', _service)
try:
self.assertTrue(cli.wait_for_service(timeout_sec=20))
executor = rclpy.executors.SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)
executor_thread = threading.Thread(
target=TestClient._spin_rclpy_node, args=(self.node, executor))
executor_thread.start()
result = cli.call(GetParameters.Request(), 0.5)
self.assertTrue(result is None)
executor.shutdown()
executor_thread.join()
finally:
self.node.destroy_client(cli)
self.node.destroy_service(srv)


if __name__ == '__main__':
unittest.main()

0 comments on commit e97c41b

Please sign in to comment.