diff --git a/example_1/test/test_rrbot_launch.py b/example_1/test/test_rrbot_launch.py
index 7bc625146..496f44fc4 100644
--- a/example_1/test/test_rrbot_launch.py
+++ b/example_1/test/test_rrbot_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,34 +76,19 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "forward_position_controller"
+ cnames = ["forward_position_controller", "joint_state_broadcaster"]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert set(msg.name) == {"joint1", "joint2"}, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_10/package.xml b/example_10/package.xml
index e94c0a1e2..faeb4b47a 100644
--- a/example_10/package.xml
+++ b/example_10/package.xml
@@ -32,11 +32,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_10/test/test_rrbot_launch.py b/example_10/test/test_rrbot_launch.py
index f8ab7d5c0..e5b9e1e5e 100644
--- a/example_10/test/test_rrbot_launch.py
+++ b/example_10/test/test_rrbot_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# This function specifies the processes to be run for our test
@@ -78,7 +76,7 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
@@ -86,30 +84,11 @@ def test_node_start(self, proc_output):
def test_controller_running(self, proc_output):
cnames = ["forward_position_controller", "gpio_controller", "joint_state_broadcaster"]
- found = {cname: False for cname in cnames} # Define 'found' as a dictionary
- start = time.time()
- while time.time() - start < 10.0 and not all(found.values()):
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- for cname in cnames:
- if c.name == cname and c.state == "active":
- found[cname] = True
- break
-
- assert all(
- found.values()
- ), f"Controller(s) not found: {', '.join([cname for cname, is_found in found.items() if not is_found])}"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert msg.name == ["joint1", "joint2"], "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_11/package.xml b/example_11/package.xml
index 1514e9ef2..ad8f02520 100644
--- a/example_11/package.xml
+++ b/example_11/package.xml
@@ -31,7 +31,12 @@
rviz2
xacro
- ament_cmake_gtest
+ ament_cmake_pytest
+ launch_testing_ament_cmake
+ launch_testing_ros
+ liburdfdom-tools
+ ros2_control_demo_testing
+ xacro
ament_cmake
diff --git a/example_11/test/test_carlikebot_launch.py b/example_11/test/test_carlikebot_launch.py
index 8582936c9..e0e8827a3 100644
--- a/example_11/test/test_carlikebot_launch.py
+++ b/example_11/test/test_carlikebot_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,37 +76,25 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "bicycle_steering_controller"
+ cnames = ["bicycle_steering_controller", "joint_state_broadcaster"]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert msg.name == [
- "virtual_front_wheel_joint",
- "virtual_rear_wheel_joint",
- ], "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published(
+ "/joint_states",
+ [
+ "virtual_front_wheel_joint",
+ "virtual_rear_wheel_joint",
+ ],
+ )
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_12/package.xml b/example_12/package.xml
index 040f29593..2c2c5c45c 100644
--- a/example_12/package.xml
+++ b/example_12/package.xml
@@ -33,11 +33,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_12/test/test_rrbot_launch.py b/example_12/test/test_rrbot_launch.py
index 4bb644c28..f88930e16 100644
--- a/example_12/test/test_rrbot_launch.py
+++ b/example_12/test/test_rrbot_launch.py
@@ -41,11 +41,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -79,7 +77,7 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
@@ -91,30 +89,11 @@ def test_controller_running(self, proc_output):
"joint2_position_controller",
"joint_state_broadcaster",
]
- found = {cname: False for cname in cnames} # Define 'found' as a dictionary
- start = time.time()
- while time.time() - start < 10.0 and not all(found.values()):
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- for cname in cnames:
- if c.name == cname and c.state == "active":
- found[cname] = True
- break
-
- assert all(
- found.values()
- ), f"Controller(s) not found: {', '.join([cname for cname, is_found in found.items() if not is_found])}"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert msg.name == ["joint1", "joint2"], "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# This is our second test fixture. Each method is a test case.
@@ -141,7 +120,7 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
@@ -155,30 +134,11 @@ def test_controller_running(self, proc_output):
"position_controller",
"forward_position_controller",
]
- found = {cname: False for cname in cnames} # Define 'found' as a dictionary
- start = time.time()
- while time.time() - start < 10.0 and not all(found.values()):
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- for cname in cnames:
- if c.name == cname and c.state == "active":
- found[cname] = True
- break
-
- assert all(
- found.values()
- ), f"Controller(s) not found: {', '.join([cname for cname, is_found in found.items() if not is_found])}"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert msg.name == ["joint1", "joint2"], "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_14/package.xml b/example_14/package.xml
index a04d763fb..fe4c74df5 100644
--- a/example_14/package.xml
+++ b/example_14/package.xml
@@ -31,11 +31,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_14/test/test_rrbot_modular_actuators_without_feedback_sensors_for_position_feedback_launch.py b/example_14/test/test_rrbot_modular_actuators_without_feedback_sensors_for_position_feedback_launch.py
index 695538fd6..fab0cf4be 100644
--- a/example_14/test/test_rrbot_modular_actuators_without_feedback_sensors_for_position_feedback_launch.py
+++ b/example_14/test/test_rrbot_modular_actuators_without_feedback_sensors_for_position_feedback_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,34 +76,22 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "forward_velocity_controller"
+ cnames = [
+ "forward_velocity_controller",
+ "joint_state_broadcaster",
+ ]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert set(msg.name) == {"joint1", "joint2"}, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_15/CMakeLists.txt b/example_15/CMakeLists.txt
index 84e220a57..53ab216dc 100644
--- a/example_15/CMakeLists.txt
+++ b/example_15/CMakeLists.txt
@@ -26,7 +26,6 @@ install(
)
if(BUILD_TESTING)
- find_package(ament_cmake_gtest REQUIRED)
find_package(ament_cmake_pytest REQUIRED)
ament_add_pytest_test(test_rrbot_namespace_launch test/test_rrbot_namespace_launch.py)
diff --git a/example_15/bringup/launch/multi_controller_manager_example_two_rrbots.launch.py b/example_15/bringup/launch/multi_controller_manager_example_two_rrbots.launch.py
index 2eafcd2b4..1af2ee8b4 100644
--- a/example_15/bringup/launch/multi_controller_manager_example_two_rrbots.launch.py
+++ b/example_15/bringup/launch/multi_controller_manager_example_two_rrbots.launch.py
@@ -54,14 +54,14 @@ def generate_launch_description():
)
declared_arguments.append(
DeclareLaunchArgument(
- "start_rviz",
+ "start_rviz_multi",
default_value="true",
description="Start RViz2 automatically with this launch file.",
)
)
# Initialize Arguments
- start_rviz = LaunchConfiguration("start_rviz")
+ start_rviz_lc = LaunchConfiguration("start_rviz_multi")
# Initialize Arguments
use_mock_hardware = LaunchConfiguration("use_mock_hardware")
@@ -131,13 +131,13 @@ def generate_launch_description():
[FindPackageShare("ros2_control_demo_example_15"), "rviz", "multi_controller_manager.rviz"]
)
- rviz_node = Node(
+ rviz_node_multi = Node(
package="rviz2",
executable="rviz2",
- name="rviz2",
+ name="rviz2_multi",
output="log",
arguments=["-d", rviz_config_file],
- condition=IfCondition(start_rviz),
+ condition=IfCondition(start_rviz_lc),
)
included_launch_files = [
@@ -148,7 +148,7 @@ def generate_launch_description():
nodes_to_start = [
rrbot_1_position_trajectory_controller_spawner,
rrbot_2_position_trajectory_controller_spawner,
- rviz_node,
+ rviz_node_multi,
]
return LaunchDescription(declared_arguments + included_launch_files + nodes_to_start)
diff --git a/example_15/package.xml b/example_15/package.xml
index 8aea9905c..8fc182919 100644
--- a/example_15/package.xml
+++ b/example_15/package.xml
@@ -29,11 +29,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_15/test/test_multi_controller_manager_launch.py b/example_15/test/test_multi_controller_manager_launch.py
index 73b560542..dac496013 100644
--- a/example_15/test/test_multi_controller_manager_launch.py
+++ b/example_15/test/test_multi_controller_manager_launch.py
@@ -40,11 +40,10 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -57,7 +56,7 @@ def generate_test_description():
"launch/multi_controller_manager_example_two_rrbots.launch.py",
)
),
- launch_arguments={"start_rviz": "false"}.items(),
+ launch_arguments={"start_rviz_multi": "false"}.items(),
)
return LaunchDescription([launch_include, ReadyToTest()])
@@ -78,61 +77,23 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running_cm1(self, proc_output):
- cname = "forward_position_controller"
+ cnames = [
+ "forward_position_controller",
+ "joint_state_broadcaster",
+ ]
+ check_controllers_running(self.node, cnames, "/rrbot_1")
+ check_controllers_running(self.node, cnames, "/rrbot_2")
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(
- self.node, "/rrbot_1/controller_manager", 5.0
- ).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
-
- def test_controller_running_cm2(self, proc_output):
-
- cname = "forward_position_controller"
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(
- self.node, "/rrbot_2/controller_manager", 5.0
- ).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
-
- def test_check_if_msgs_published_cm1(self):
- wait_for_topics = WaitForTopics([("/rrbot_1/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/rrbot_1/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/rrbot_1/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert msg.name == ["rrbot_1_joint1", "rrbot_1_joint2"], "Wrong joint names"
- wait_for_topics.shutdown()
-
- def test_check_if_msgs_published_cm2(self):
- wait_for_topics = WaitForTopics([("/rrbot_2/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/rrbot_2/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/rrbot_2/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert msg.name == ["rrbot_2_joint1", "rrbot_2_joint2"], "Wrong joint names"
- wait_for_topics.shutdown()
+ def test_check_if_msgs_published(self):
+ check_if_js_published("/rrbot_1/joint_states", ["rrbot_1_joint1", "rrbot_1_joint2"])
+ check_if_js_published("/rrbot_2/joint_states", ["rrbot_2_joint1", "rrbot_2_joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_15/test/test_rrbot_namespace_launch.py b/example_15/test/test_rrbot_namespace_launch.py
index 4a15d1cf8..cfb0f8de6 100644
--- a/example_15/test/test_rrbot_namespace_launch.py
+++ b/example_15/test/test_rrbot_namespace_launch.py
@@ -40,11 +40,10 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,34 +77,22 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "forward_position_controller"
+ cnames = [
+ "forward_position_controller",
+ "joint_state_broadcaster",
+ ]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "/rrbot/controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames, "/rrbot")
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/rrbot/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/rrbot/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/rrbot/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert msg.name == ["joint1", "joint2"], "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/rrbot/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_2/package.xml b/example_2/package.xml
index ef767ca3f..15977eb36 100644
--- a/example_2/package.xml
+++ b/example_2/package.xml
@@ -32,11 +32,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_2/test/test_diffbot_launch.py b/example_2/test/test_diffbot_launch.py
index b0586cb94..94477b462 100644
--- a/example_2/test/test_diffbot_launch.py
+++ b/example_2/test/test_diffbot_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,34 +76,19 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "diffbot_base_controller"
+ cnames = ["diffbot_base_controller", "joint_state_broadcaster"]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert set(msg.name) == {"left_wheel_joint", "right_wheel_joint"}, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["left_wheel_joint", "right_wheel_joint"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_3/package.xml b/example_3/package.xml
index d816da90c..b7cd1251c 100644
--- a/example_3/package.xml
+++ b/example_3/package.xml
@@ -32,11 +32,11 @@
velocity_controllers
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_3/test/test_rrbot_system_multi_interface_launch.py b/example_3/test/test_rrbot_system_multi_interface_launch.py
index 9ef70d3a5..a388e5e26 100644
--- a/example_3/test/test_rrbot_system_multi_interface_launch.py
+++ b/example_3/test/test_rrbot_system_multi_interface_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,34 +76,19 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "forward_velocity_controller"
+ cnames = ["forward_velocity_controller", "joint_state_broadcaster"]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert set(msg.name) == {"joint1", "joint2"}, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_4/package.xml b/example_4/package.xml
index 6b0baa48d..de578666f 100644
--- a/example_4/package.xml
+++ b/example_4/package.xml
@@ -31,11 +31,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_4/test/test_rrbot_system_with_sensor_launch.py b/example_4/test/test_rrbot_system_with_sensor_launch.py
index fc4a8b1fc..e74009300 100644
--- a/example_4/test/test_rrbot_system_with_sensor_launch.py
+++ b/example_4/test/test_rrbot_system_with_sensor_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,34 +76,19 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "forward_position_controller"
+ cnames = ["forward_position_controller", "fts_broadcaster", "joint_state_broadcaster"]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert set(msg.name) == {"joint1", "joint2"}, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_5/package.xml b/example_5/package.xml
index a651ae1e2..0495f8439 100644
--- a/example_5/package.xml
+++ b/example_5/package.xml
@@ -31,11 +31,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_5/test/test_rrbot_system_with_external_sensor_launch.py b/example_5/test/test_rrbot_system_with_external_sensor_launch.py
index 55d510017..14b38614f 100644
--- a/example_5/test/test_rrbot_system_with_external_sensor_launch.py
+++ b/example_5/test/test_rrbot_system_with_external_sensor_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,7 +76,7 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
@@ -86,30 +84,11 @@ def test_node_start(self, proc_output):
def test_controller_running(self, proc_output):
cnames = ["forward_position_controller", "fts_broadcaster", "joint_state_broadcaster"]
- found = {cname: False for cname in cnames} # Define 'found' as a dictionary
- start = time.time()
- while time.time() - start < 10.0 and not all(found.values()):
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- for cname in cnames:
- if c.name == cname and c.state == "active":
- found[cname] = True
- break
-
- assert all(
- found.values()
- ), f"Controller(s) not found: {', '.join([cname for cname, is_found in found.items() if not is_found])}"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert set(msg.name) == {"joint1", "joint2"}, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_6/package.xml b/example_6/package.xml
index 856970fef..63daf437b 100644
--- a/example_6/package.xml
+++ b/example_6/package.xml
@@ -30,11 +30,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_6/test/test_rrbot_modular_actuators_launch.py b/example_6/test/test_rrbot_modular_actuators_launch.py
index 58d39d8e5..36394774e 100644
--- a/example_6/test/test_rrbot_modular_actuators_launch.py
+++ b/example_6/test/test_rrbot_modular_actuators_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,34 +76,19 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "forward_position_controller"
+ cnames = ["forward_position_controller", "joint_state_broadcaster"]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert set(msg.name) == {"joint1", "joint2"}, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_7/package.xml b/example_7/package.xml
index 9bd69814b..c0a578130 100644
--- a/example_7/package.xml
+++ b/example_7/package.xml
@@ -39,11 +39,11 @@
urdf
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_7/test/test_r6bot_controller_launch.py b/example_7/test/test_r6bot_controller_launch.py
index 433396cc0..7d2c546b7 100644
--- a/example_7/test/test_r6bot_controller_launch.py
+++ b/example_7/test/test_r6bot_controller_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,41 +76,21 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "r6bot_controller"
+ cnames = ["r6bot_controller", "joint_state_broadcaster"]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 6, "Wrong number of joints in message"
- assert set(msg.name) == {
- "joint_1",
- "joint_2",
- "joint_3",
- "joint_4",
- "joint_5",
- "joint_6",
- }, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published(
+ "/joint_states", ["joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_6"]
+ )
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_8/package.xml b/example_8/package.xml
index 275fc4aaa..c7783a448 100644
--- a/example_8/package.xml
+++ b/example_8/package.xml
@@ -33,11 +33,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_8/test/test_rrbot_transmissions_system_position_only_launch.py b/example_8/test/test_rrbot_transmissions_system_position_only_launch.py
index eb0d01a68..26ee20f8c 100644
--- a/example_8/test/test_rrbot_transmissions_system_position_only_launch.py
+++ b/example_8/test/test_rrbot_transmissions_system_position_only_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,34 +76,19 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "forward_position_controller"
+ cnames = ["forward_position_controller", "joint_state_broadcaster"]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert set(msg.name) == {"joint1", "joint2"}, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/example_9/package.xml b/example_9/package.xml
index 5d289b25e..13e887ffb 100644
--- a/example_9/package.xml
+++ b/example_9/package.xml
@@ -35,11 +35,11 @@
rviz2
xacro
- ament_cmake_gtest
ament_cmake_pytest
launch_testing_ament_cmake
launch_testing_ros
liburdfdom-tools
+ ros2_control_demo_testing
xacro
diff --git a/example_9/test/test_rrbot_launch.py b/example_9/test/test_rrbot_launch.py
index de229e637..a287535d9 100644
--- a/example_9/test/test_rrbot_launch.py
+++ b/example_9/test/test_rrbot_launch.py
@@ -40,11 +40,9 @@
from launch_testing.actions import ReadyToTest
# import launch_testing.markers
-from launch_testing_ros import WaitForTopics
-from controller_manager.controller_manager_services import list_controllers
import rclpy
from rclpy.node import Node
-from sensor_msgs.msg import JointState
+from ros2_control_demo_testing.test_utils import check_controllers_running, check_if_js_published
# Executes the given launch file and checks if all nodes can be started
@@ -78,34 +76,19 @@ def tearDown(self):
def test_node_start(self, proc_output):
start = time.time()
found = False
- while time.time() - start < 2.0 and not found:
+ while time.time() - start < 5.0 and not found:
found = "robot_state_publisher" in self.node.get_node_names()
time.sleep(0.1)
assert found, "robot_state_publisher not found!"
def test_controller_running(self, proc_output):
- cname = "forward_position_controller"
+ cnames = ["forward_position_controller", "joint_state_broadcaster"]
- start = time.time()
- found = False
- while time.time() - start < 10.0 and not found:
- controllers = list_controllers(self.node, "controller_manager", 5.0).controller
- assert controllers, "No controllers found!"
- for c in controllers:
- if c.name == cname and c.state == "active":
- found = True
- break
- assert found, f"{cname} not found!"
+ check_controllers_running(self.node, cnames)
def test_check_if_msgs_published(self):
- wait_for_topics = WaitForTopics([("/joint_states", JointState)], timeout=15.0)
- assert wait_for_topics.wait(), "Topic '/joint_states' not found!"
- msgs = wait_for_topics.received_messages("/joint_states")
- msg = msgs[0]
- assert len(msg.name) == 2, "Wrong number of joints in message"
- assert set(msg.name) == {"joint1", "joint2"}, "Wrong joint names"
- wait_for_topics.shutdown()
+ check_if_js_published("/joint_states", ["joint1", "joint2"])
# TODO(anyone): enable this if shutdown of ros2_control_node does not fail anymore
diff --git a/ros2_control_demo_testing/package.xml b/ros2_control_demo_testing/package.xml
new file mode 100644
index 000000000..663daed04
--- /dev/null
+++ b/ros2_control_demo_testing/package.xml
@@ -0,0 +1,22 @@
+
+
+
+ ros2_control_demo_testing
+ 0.0.0
+ Utilities for launch testing
+ Christoph Froehlich
+ Apache-2.0
+
+ sensor_msgs
+ launch_testing
+ controller_manger
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ python3-pytest
+
+
+ ament_python
+
+
diff --git a/ros2_control_demo_testing/resource/ros2_control_demo_testing b/ros2_control_demo_testing/resource/ros2_control_demo_testing
new file mode 100644
index 000000000..e69de29bb
diff --git a/ros2_control_demo_testing/ros2_control_demo_testing/__init__.py b/ros2_control_demo_testing/ros2_control_demo_testing/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/ros2_control_demo_testing/ros2_control_demo_testing/test_utils.py b/ros2_control_demo_testing/ros2_control_demo_testing/test_utils.py
new file mode 100644
index 000000000..7617e8570
--- /dev/null
+++ b/ros2_control_demo_testing/ros2_control_demo_testing/test_utils.py
@@ -0,0 +1,95 @@
+# Copyright (c) 2024 AIT - Austrian Institute of Technology GmbH
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of the {copyright_holder} nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# Author: Christoph Froehlich
+
+import time
+
+from launch_testing_ros import WaitForTopics
+from sensor_msgs.msg import JointState
+from controller_manager.controller_manager_services import list_controllers
+
+
+def check_controllers_running(node, cnames, namespace=""):
+
+ # wait for controller to be loaded before we call the CM services
+ found = {cname: False for cname in cnames} # Define 'found' as a dictionary
+ start = time.time()
+ # namespace is either "/" (empty) or "/ns" if set
+ if namespace:
+ namespace_api = namespace
+ if not namespace_api.startswith("/"):
+ namespace_api = "/" + namespace_api
+ if namespace.endswith("/"):
+ namespace_api = namespace_api[:-1]
+ else:
+ namespace_api = "/"
+
+ while time.time() - start < 10.0 and not all(found.values()):
+ node_names_namespaces = node.get_node_names_and_namespaces()
+ for cname in cnames:
+ if any(name == cname and ns == namespace_api for name, ns in node_names_namespaces):
+ found[cname] = True
+ time.sleep(0.1)
+ assert all(
+ found.values()
+ ), f"Controller node(s) not found: {', '.join(["ns: " + namespace_api + ", ctrl:" + cname for cname, is_found in found.items() if not is_found])}, but seeing {node.get_node_names_and_namespaces()}"
+
+ found = {cname: False for cname in cnames} # Define 'found' as a dictionary
+ start = time.time()
+ # namespace is either "/" (empty) or "/ns" if set
+ if not namespace:
+ cm = "controller_manager"
+ else:
+ if namespace.endswith("/"):
+ cm = namespace + "controller_manager"
+ else:
+ cm = namespace + "/controller_manager"
+ while time.time() - start < 10.0 and not all(found.values()):
+ controllers = list_controllers(node, cm, 5.0).controller
+ assert controllers, "No controllers found!"
+ for c in controllers:
+ for cname in cnames:
+ if c.name == cname and c.state == "active":
+ found[cname] = True
+ break
+ time.sleep(0.1)
+
+ assert all(
+ found.values()
+ ), f"Controller(s) not found or not active: {', '.join([cname for cname, is_found in found.items() if not is_found])}"
+
+
+def check_if_js_published(topic, joint_names):
+ wait_for_topics = WaitForTopics([(topic, JointState)], timeout=15.0)
+ assert wait_for_topics.wait(), f"Topic '{topic}' not found!"
+ msgs = wait_for_topics.received_messages(topic)
+ msg = msgs[0]
+ assert len(msg.name) == len(joint_names), "Wrong number of joints in message"
+ assert msg.name == joint_names, "Wrong joint names"
+ wait_for_topics.shutdown()
diff --git a/ros2_control_demo_testing/setup.cfg b/ros2_control_demo_testing/setup.cfg
new file mode 100644
index 000000000..5d0f85421
--- /dev/null
+++ b/ros2_control_demo_testing/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/ros2_control_demo_testing
+[install]
+install_scripts=$base/lib/ros2_control_demo_testing
diff --git a/ros2_control_demo_testing/setup.py b/ros2_control_demo_testing/setup.py
new file mode 100644
index 000000000..62f25ff4e
--- /dev/null
+++ b/ros2_control_demo_testing/setup.py
@@ -0,0 +1,53 @@
+# Copyright (c) 2024 AIT - Austrian Institute of Technology GmbH
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of the {copyright_holder} nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+#
+# Author: Christoph Froehlich
+
+from setuptools import find_packages, setup
+
+package_name = "ros2_control_demo_testing"
+
+setup(
+ name=package_name,
+ version="0.0.0",
+ packages=find_packages(exclude=["test"]),
+ data_files=[
+ ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
+ ("share/" + package_name, ["package.xml"]),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ maintainer="Christoph Froehlich",
+ maintainer_email="christoph.froehlich@ait.ac.at",
+ description="Utilities for launch testing",
+ license="Apache-2.0",
+ tests_require=["pytest"],
+ entry_points={
+ "console_scripts": [],
+ },
+)