diff --git a/controller_manager/CMakeLists.txt b/controller_manager/CMakeLists.txt
index 1b0f308613..15748c5551 100644
--- a/controller_manager/CMakeLists.txt
+++ b/controller_manager/CMakeLists.txt
@@ -237,10 +237,15 @@ if(BUILD_TESTING)
controller_manager_msgs
)
- find_package(ament_cmake_pytest REQUIRED)
+ find_package(ament_cmake_ros REQUIRED)
+ find_package(launch_testing_ament_cmake REQUIRED)
install(FILES test/test_ros2_control_node.yaml
DESTINATION test)
- ament_add_pytest_test(test_ros2_control_node test/test_ros2_control_node_launch.py)
+ function(add_ros_isolated_launch_test path)
+ set(RUNNER "${ament_cmake_ros_DIR}/run_test_isolated.py")
+ add_launch_test("${path}" RUNNER "${RUNNER}" ${ARGN})
+ endfunction()
+ add_ros_isolated_launch_test(test/test_ros2_control_node_launch.py)
endif()
install(
diff --git a/controller_manager/package.xml b/controller_manager/package.xml
index 5447040537..4f808fc026 100644
--- a/controller_manager/package.xml
+++ b/controller_manager/package.xml
@@ -31,11 +31,19 @@
libstatistics_collector
generate_parameter_library
- ament_cmake_gmock
- ament_cmake_pytest
- python3-coverage
+ ament_cmake_ros
hardware_interface_testing
+ launch
+ launch_ros
+ launch_testing
+ launch_testing_ros
+ launch_testing_ament_cmake
+ python3-coverage
+ rclpy
+ robot_state_publisher
+ ros2_control_testing
ros2_control_test_assets
+ sensor_msgs
ament_cmake
diff --git a/controller_manager/test/test_ros2_control_node.yaml b/controller_manager/test/test_ros2_control_node.yaml
index ce0602d6b3..74f6cf8504 100644
--- a/controller_manager/test/test_ros2_control_node.yaml
+++ b/controller_manager/test/test_ros2_control_node.yaml
@@ -5,7 +5,4 @@ controller_manager:
ctrl_with_parameters_and_type:
ros__parameters:
type: "controller_manager/test_controller"
- joint_names: ["joint0"]
-
-joint_state_broadcaster:
- type: joint_state_broadcaster/JointStateBroadcaster
+ joint_names: ["joint1"]
diff --git a/controller_manager/test/test_ros2_control_node_launch.py b/controller_manager/test/test_ros2_control_node_launch.py
index c8c1136849..cd3d33f395 100644
--- a/controller_manager/test/test_ros2_control_node_launch.py
+++ b/controller_manager/test/test_ros2_control_node_launch.py
@@ -28,28 +28,32 @@
#
# Author: Christoph Froehlich
-import pytest
import unittest
-import time
from launch import LaunchDescription
-from launch.substitutions import PathJoinSubstitution
-from launch_ros.substitutions import FindPackageShare
+from launch.substitutions import PathJoinSubstitution, Command, FindExecutable
+from launch_ros.substitutions import FindPackageShare, FindPackagePrefix
from launch_testing.actions import ReadyToTest
-
import launch_testing.markers
-import rclpy
import launch_ros.actions
-from rclpy.node import Node
+from sensor_msgs.msg import JointState
+
+import rclpy
+from controller_manager.test_utils import (
+ check_controllers_running,
+ check_if_js_published,
+ check_node_running,
+)
+from controller_manager.launch_utils import generate_controllers_spawner_launch_description
+import threading
# Executes the given launch file and checks if all nodes can be started
-@pytest.mark.launch_test
def generate_test_description():
robot_controllers = PathJoinSubstitution(
[
- FindPackageShare("controller_manager"),
+ FindPackagePrefix("controller_manager"),
"test",
"test_ros2_control_node.yaml",
]
@@ -61,29 +65,86 @@ def generate_test_description():
parameters=[robot_controllers],
output="both",
)
-
- return LaunchDescription([control_node, ReadyToTest()])
+ # Get URDF
+ robot_description_content = Command(
+ [
+ PathJoinSubstitution([FindExecutable(name="xacro")]),
+ " ",
+ PathJoinSubstitution(
+ [
+ FindPackageShare("ros2_control_test_assets"),
+ "urdf",
+ "test_hardware_components.urdf",
+ ]
+ ),
+ ]
+ )
+ # why not this?
+ # robot_description_content = PathJoinSubstitution(
+ # [
+ # FindPackageShare("ros2_control_test_assets"),
+ # "urdf",
+ # "test_hardware_components.urdf",
+ # ]
+ # )
+ robot_description = {"robot_description": robot_description_content}
+
+ robot_state_pub_node = launch_ros.actions.Node(
+ package="robot_state_publisher",
+ executable="robot_state_publisher",
+ output="both",
+ parameters=[robot_description],
+ )
+ ctrl_spawner = generate_controllers_spawner_launch_description(
+ [
+ "ctrl_with_parameters_and_type",
+ ],
+ controller_params_files=[robot_controllers],
+ )
+ return LaunchDescription([robot_state_pub_node, control_node, ctrl_spawner, ReadyToTest()])
# This is our test fixture. Each method is a test case.
# These run alongside the processes specified in generate_test_description()
class TestFixture(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ rclpy.init()
+
+ @classmethod
+ def tearDownClass(cls):
+ rclpy.shutdown()
def setUp(self):
- rclpy.init()
- self.node = Node("test_node")
+ self.node = rclpy.create_node("test_node")
def tearDown(self):
self.node.destroy_node()
- rclpy.shutdown()
+
+ def timer_callback(self):
+ js_msg = JointState()
+ js_msg.name = ["joint0"]
+ js_msg.position = [0.0]
+ self.pub.publish(js_msg)
+
+ def publish_joint_states(self):
+ self.pub = self.node.create_publisher(JointState, "/joint_states", 10)
+ self.timer = self.node.create_timer(1.0, self.timer_callback)
+ rclpy.spin(self.node)
def test_node_start(self):
- start = time.time()
- found = False
- while time.time() - start < 2.0 and not found:
- found = "controller_manager" in self.node.get_node_names()
- time.sleep(0.1)
- assert found, "controller_manager not found!"
+ check_node_running(self.node, "controller_manager")
+
+ def test_controllers_start(self):
+ cnames = ["ctrl_with_parameters_and_type"]
+ check_controllers_running(self.node, cnames, state="active")
+
+ def test_check_if_msgs_published(self):
+ # we don't have a joint_state_broadcaster in this repo,
+ # publish joint states manually to test check_if_js_published
+ thread = threading.Thread(target=self.publish_joint_states)
+ thread.start()
+ check_if_js_published("/joint_states", ["joint0"])
@launch_testing.post_shutdown_test()