Skip to content

Commit

Permalink
Update ros2_control_node tests to use test_utils
Browse files Browse the repository at this point in the history
  • Loading branch information
christophfroehlich committed Dec 17, 2024
1 parent c2db8f6 commit c5c21ff
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 29 deletions.
9 changes: 7 additions & 2 deletions controller_manager/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,15 @@ if(BUILD_TESTING)
controller_manager_msgs
)

find_package(ament_cmake_pytest REQUIRED)
find_package(ament_cmake_ros REQUIRED)
find_package(launch_testing_ament_cmake REQUIRED)
install(FILES test/test_ros2_control_node.yaml
DESTINATION test)
ament_add_pytest_test(test_ros2_control_node test/test_ros2_control_node_launch.py)
function(add_ros_isolated_launch_test path)
set(RUNNER "${ament_cmake_ros_DIR}/run_test_isolated.py")
add_launch_test("${path}" RUNNER "${RUNNER}" ${ARGN})
endfunction()
add_ros_isolated_launch_test(test/test_ros2_control_node_launch.py)
endif()

install(
Expand Down
14 changes: 11 additions & 3 deletions controller_manager/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,19 @@
<depend>libstatistics_collector</depend>
<depend>generate_parameter_library</depend>

<test_depend>ament_cmake_gmock</test_depend>
<test_depend>ament_cmake_pytest</test_depend>
<test_depend>python3-coverage</test_depend>
<test_depend>ament_cmake_ros</test_depend>
<test_depend>hardware_interface_testing</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_ros</test_depend>
<test_depend>launch_testing</test_depend>
<test_depend>launch_testing_ros</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>
<test_depend>python3-coverage</test_depend>
<test_depend>rclpy</test_depend>
<test_depend>robot_state_publisher</test_depend>
<test_depend>ros2_control_testing</test_depend>
<test_depend>ros2_control_test_assets</test_depend>
<test_depend>sensor_msgs</test_depend>

<export>
<build_type>ament_cmake</build_type>
Expand Down
5 changes: 1 addition & 4 deletions controller_manager/test/test_ros2_control_node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,4 @@ controller_manager:
ctrl_with_parameters_and_type:
ros__parameters:
type: "controller_manager/test_controller"
joint_names: ["joint0"]

joint_state_broadcaster:
type: joint_state_broadcaster/JointStateBroadcaster
joint_names: ["joint1"]
101 changes: 81 additions & 20 deletions controller_manager/test/test_ros2_control_node_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,32 @@
#
# Author: Christoph Froehlich

import pytest
import unittest
import time

from launch import LaunchDescription
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
from launch.substitutions import PathJoinSubstitution, Command, FindExecutable
from launch_ros.substitutions import FindPackageShare, FindPackagePrefix
from launch_testing.actions import ReadyToTest

import launch_testing.markers
import rclpy
import launch_ros.actions
from rclpy.node import Node
from sensor_msgs.msg import JointState

import rclpy
from controller_manager.test_utils import (
check_controllers_running,
check_if_js_published,
check_node_running,
)
from controller_manager.launch_utils import generate_controllers_spawner_launch_description
import threading


# Executes the given launch file and checks if all nodes can be started
@pytest.mark.launch_test
def generate_test_description():

robot_controllers = PathJoinSubstitution(
[
FindPackageShare("controller_manager"),
FindPackagePrefix("controller_manager"),
"test",
"test_ros2_control_node.yaml",
]
Expand All @@ -61,29 +65,86 @@ def generate_test_description():
parameters=[robot_controllers],
output="both",
)

return LaunchDescription([control_node, ReadyToTest()])
# Get URDF
robot_description_content = Command(
[
PathJoinSubstitution([FindExecutable(name="xacro")]),
" ",
PathJoinSubstitution(
[
FindPackageShare("ros2_control_test_assets"),
"urdf",
"test_hardware_components.urdf",
]
),
]
)
# why not this?
# robot_description_content = PathJoinSubstitution(
# [
# FindPackageShare("ros2_control_test_assets"),
# "urdf",
# "test_hardware_components.urdf",
# ]
# )
robot_description = {"robot_description": robot_description_content}

robot_state_pub_node = launch_ros.actions.Node(
package="robot_state_publisher",
executable="robot_state_publisher",
output="both",
parameters=[robot_description],
)
ctrl_spawner = generate_controllers_spawner_launch_description(
[
"ctrl_with_parameters_and_type",
],
controller_params_files=[robot_controllers],
)
return LaunchDescription([robot_state_pub_node, control_node, ctrl_spawner, ReadyToTest()])


# This is our test fixture. Each method is a test case.
# These run alongside the processes specified in generate_test_description()
class TestFixture(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init()

@classmethod
def tearDownClass(cls):
rclpy.shutdown()

def setUp(self):
rclpy.init()
self.node = Node("test_node")
self.node = rclpy.create_node("test_node")

def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()

def timer_callback(self):
js_msg = JointState()
js_msg.name = ["joint0"]
js_msg.position = [0.0]
self.pub.publish(js_msg)

def publish_joint_states(self):
self.pub = self.node.create_publisher(JointState, "/joint_states", 10)
self.timer = self.node.create_timer(1.0, self.timer_callback)
rclpy.spin(self.node)

def test_node_start(self):
start = time.time()
found = False
while time.time() - start < 2.0 and not found:
found = "controller_manager" in self.node.get_node_names()
time.sleep(0.1)
assert found, "controller_manager not found!"
check_node_running(self.node, "controller_manager")

def test_controllers_start(self):
cnames = ["ctrl_with_parameters_and_type"]
check_controllers_running(self.node, cnames, state="active")

def test_check_if_msgs_published(self):
# we don't have a joint_state_broadcaster in this repo,
# publish joint states manually to test check_if_js_published
thread = threading.Thread(target=self.publish_joint_states)
thread.start()
check_if_js_published("/joint_states", ["joint0"])


@launch_testing.post_shutdown_test()
Expand Down

0 comments on commit c5c21ff

Please sign in to comment.