Skip to content

Commit

Permalink
Move controller manager interfacing to custom class
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertWilbrandt committed Jan 4, 2025
1 parent 788e979 commit b2a6a3c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 27 deletions.
31 changes: 9 additions & 22 deletions kuka_rsi_driver/test/move_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@

from launch_testing.actions import ReadyToTest

from controller_manager_msgs.srv import ListControllers

from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import PathJoinSubstitution

sys.path.append(os.path.dirname(__file__))
from test_interface import wait_for_service, call_service
from test_interface import ControllerManagerInterface


@pytest.mark.launch_test
Expand Down Expand Up @@ -113,6 +111,9 @@ def tearDownClass(cls):
cls.node.destroy_node()
rclpy.shutdown()

def setUp(self):
self.controller_manager_interface = ControllerManagerInterface(self.node)

def test_controllers_loaded(self):
expected_controllers_active = [
"joint_state_broadcaster",
Expand All @@ -122,22 +123,11 @@ def test_controllers_loaded(self):
]
expected_controllers_inactive = ["forward_position_controller"]

list_controllers_client = wait_for_service(
"controller_manager/list_controllers",
ListControllers,
self.node,
10,
)
self.assertIsNotNone(list_controllers_client)

# Wait until all controllers are loaded
for _ in range(5):
res = call_service(
list_controllers_client, ListControllers.Request(), self.node
)
self.assertIsNotNone(res)
controllers = self.controller_manager_interface.list_controllers()

if len(res.controller) == (
if len(controllers) == (
len(expected_controllers_active) + len(expected_controllers_inactive)
):
break
Expand All @@ -147,9 +137,9 @@ def test_controllers_loaded(self):
# Verify controller activations
for _ in range(5):
# Check controllers
controllers_active = [c.name for c in res.controller if c.state == "active"]
controllers_active = [c.name for c in controllers if c.state == "active"]
controllers_inactive = [
c.name for c in res.controller if c.state == "inactive"
c.name for c in controllers if c.state == "inactive"
]

if (set(controllers_active) == set(expected_controllers_active)) and (
Expand All @@ -159,10 +149,7 @@ def test_controllers_loaded(self):

# Spawners are not yet done, so wait and retry
time.sleep(1)

res = _call_service(
self.node, list_controllers_client, ListControllers.Request()
)
controllers = self.controller_manager_interface.list_controllers()

# Previous calls did not show controller activation success => Fail
self.fail()
43 changes: 38 additions & 5 deletions kuka_rsi_driver/test/test_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,37 @@
# POSSIBILITY OF SUCH DAMAGE.
import rclpy

from controller_manager_msgs.srv import ListControllers


class ControllerManagerInterface:
def __init__(self, node):
self._node = node
self._log = node.get_logger().get_child("controller_manager_interface")

self._list_controllers_client = wait_for_service(
"controller_manager/list_controllers", ListControllers, node, 10, self._log
)

def list_controllers(self):
res = call_service(
self._list_controllers_client,
ListControllers.Request(),
self._node,
self._log,
)

if len(res.controller) > 0:
self._log.info("Controllers:")
for controller in res.controller:
self._log.info(
f" - {controller.name} ({controller.type}) - {controller.state}"
)
else:
self._log.info("Controllers: []")

return res.controller


def wait_for_service(srv_name, srv_type, node, timeout=10, log=None):
if log is None:
Expand All @@ -36,10 +67,11 @@ def wait_for_service(srv_name, srv_type, node, timeout=10, log=None):

log.info(f"Waiting for service '{srv_name}' (timeout: {timeout:.2f}s)...")
if not client.wait_for_service(timeout):
log.error(
f" Could not reach service '{srv_name}' within timeout {timeout:.2f}s"
error_msg = (
f"Could not reach service '{srv_name}' within timeout {timeout:.2f}s"
)
return None
log.error(f" {error_msg}")
raise RuntimeError(error_msg)

log.info(f" Successfully waited for service '{srv_name}'")
return client
Expand All @@ -54,8 +86,9 @@ def call_service(client, request, node, log=None):

rclpy.spin_until_future_complete(node, future)
if future.result() is None:
log.error(f" Could not call service '{client.srv_name}': {future.exception()}")
return None
error_msg = f"Could not call service '{client.srv_name}': {future.exception()}"
log.error(f" {error_msg}")
raise RuntimeError(error_msg)

log.info(f" Received result: {future.result()}")
return future.result()

0 comments on commit b2a6a3c

Please sign in to comment.