diff --git a/kuka_rsi_driver/test/move_sim.py b/kuka_rsi_driver/test/move_sim.py index d567580..44c0da0 100644 --- a/kuka_rsi_driver/test/move_sim.py +++ b/kuka_rsi_driver/test/move_sim.py @@ -40,15 +40,13 @@ from launch_testing.actions import ReadyToTest -from controller_manager_msgs.srv import ListControllers - from launch import LaunchDescription from launch.actions import IncludeLaunchDescription from launch.launch_description_sources import PythonLaunchDescriptionSource from launch.substitutions import PathJoinSubstitution sys.path.append(os.path.dirname(__file__)) -from test_interface import wait_for_service, call_service +from test_interface import ControllerManagerInterface @pytest.mark.launch_test @@ -113,6 +111,9 @@ def tearDownClass(cls): cls.node.destroy_node() rclpy.shutdown() + def setUp(self): + self.controller_manager_interface = ControllerManagerInterface(self.node) + def test_controllers_loaded(self): expected_controllers_active = [ "joint_state_broadcaster", @@ -122,22 +123,11 @@ def test_controllers_loaded(self): ] expected_controllers_inactive = ["forward_position_controller"] - list_controllers_client = wait_for_service( - "controller_manager/list_controllers", - ListControllers, - self.node, - 10, - ) - self.assertIsNotNone(list_controllers_client) - # Wait until all controllers are loaded for _ in range(5): - res = call_service( - list_controllers_client, ListControllers.Request(), self.node - ) - self.assertIsNotNone(res) + controllers = self.controller_manager_interface.list_controllers() - if len(res.controller) == ( + if len(controllers) == ( len(expected_controllers_active) + len(expected_controllers_inactive) ): break @@ -147,9 +137,9 @@ def test_controllers_loaded(self): # Verify controller activations for _ in range(5): # Check controllers - controllers_active = [c.name for c in res.controller if c.state == "active"] + controllers_active = [c.name for c in controllers if c.state == "active"] controllers_inactive = [ - c.name for c in res.controller if c.state == "inactive" + c.name for c in controllers if c.state == "inactive" ] if (set(controllers_active) == set(expected_controllers_active)) and ( @@ -159,10 +149,7 @@ def test_controllers_loaded(self): # Spawners are not yet done, so wait and retry time.sleep(1) - - res = _call_service( - self.node, list_controllers_client, ListControllers.Request() - ) + controllers = self.controller_manager_interface.list_controllers() # Previous calls did not show controller activation success => Fail self.fail() diff --git a/kuka_rsi_driver/test/test_interface.py b/kuka_rsi_driver/test/test_interface.py index 19de702..fe37600 100644 --- a/kuka_rsi_driver/test/test_interface.py +++ b/kuka_rsi_driver/test/test_interface.py @@ -27,6 +27,37 @@ # POSSIBILITY OF SUCH DAMAGE. import rclpy +from controller_manager_msgs.srv import ListControllers + + +class ControllerManagerInterface: + def __init__(self, node): + self._node = node + self._log = node.get_logger().get_child("controller_manager_interface") + + self._list_controllers_client = wait_for_service( + "controller_manager/list_controllers", ListControllers, node, 10, self._log + ) + + def list_controllers(self): + res = call_service( + self._list_controllers_client, + ListControllers.Request(), + self._node, + self._log, + ) + + if len(res.controller) > 0: + self._log.info("Controllers:") + for controller in res.controller: + self._log.info( + f" - {controller.name} ({controller.type}) - {controller.state}" + ) + else: + self._log.info("Controllers: []") + + return res.controller + def wait_for_service(srv_name, srv_type, node, timeout=10, log=None): if log is None: @@ -36,10 +67,11 @@ def wait_for_service(srv_name, srv_type, node, timeout=10, log=None): log.info(f"Waiting for service '{srv_name}' (timeout: {timeout:.2f}s)...") if not client.wait_for_service(timeout): - log.error( - f" Could not reach service '{srv_name}' within timeout {timeout:.2f}s" + error_msg = ( + f"Could not reach service '{srv_name}' within timeout {timeout:.2f}s" ) - return None + log.error(f" {error_msg}") + raise RuntimeError(error_msg) log.info(f" Successfully waited for service '{srv_name}'") return client @@ -54,8 +86,9 @@ def call_service(client, request, node, log=None): rclpy.spin_until_future_complete(node, future) if future.result() is None: - log.error(f" Could not call service '{client.srv_name}': {future.exception()}") - return None + error_msg = f"Could not call service '{client.srv_name}': {future.exception()}" + log.error(f" {error_msg}") + raise RuntimeError(error_msg) log.info(f" Received result: {future.result()}") return future.result()