Skip to content

Commit

Permalink
Add tests for publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
christophfroehlich committed Nov 18, 2024
1 parent 3e62d34 commit ad3be0e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
from launch_testing.actions import ReadyToTest
from launch_testing_ros import WaitForTopics

import launch_testing.markers
import rclpy
import launch_ros.actions
from rclpy.node import Node

from std_msgs.msg import Float64MultiArray


# Executes the given launch file and checks if all nodes can be started
@pytest.mark.launch_test
Expand Down Expand Up @@ -85,10 +88,19 @@ def test_node_start(self):
time.sleep(0.1)
assert found, "publisher_forward_position_controller not found!"

def test_check_if_topic_published(self):
topic = "/forward_position_controller/commands"
wait_for_topics = WaitForTopics([(topic, Float64MultiArray)], timeout=20.0)
assert wait_for_topics.wait(), f"Topic '{topic}' not found!"
msgs = wait_for_topics.received_messages(topic)
msg = msgs[0]
assert len(msg.data) == 2, "Wrong number of joints in message"
wait_for_topics.shutdown()


@launch_testing.post_shutdown_test()
# These tests are run after the processes in generate_test_description() have shutdown.
class TestDescriptionCraneShutdown(unittest.TestCase):
# These tests are run after the processes in generate_test_description() have shut down.
class TestPublisherShutdown(unittest.TestCase):

def test_exit_codes(self, proc_info):
"""Check if the processes exited normally."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
from launch_testing.actions import ReadyToTest
from launch_testing_ros import WaitForTopics

import launch_testing.markers
import rclpy
import launch_ros.actions
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory


# Executes the given launch file and checks if all nodes can be started
Expand Down Expand Up @@ -85,10 +87,19 @@ def test_node_start(self):
time.sleep(0.1)
assert found, "publisher_position_trajectory_controller not found!"

def test_check_if_topic_published(self):
topic = "/position_trajectory_controller/joint_trajectory"
wait_for_topics = WaitForTopics([(topic, JointTrajectory)], timeout=20.0)
assert wait_for_topics.wait(), f"Topic '{topic}' not found!"
msgs = wait_for_topics.received_messages(topic)
msg = msgs[0]
assert len(msg.joint_names) == 2, "Wrong number of joints in message"
wait_for_topics.shutdown()


@launch_testing.post_shutdown_test()
# These tests are run after the processes in generate_test_description() have shutdown.
class TestDescriptionCraneShutdown(unittest.TestCase):
# These tests are run after the processes in generate_test_description() have shut down.
class TestPublisherShutdown(unittest.TestCase):

def test_exit_codes(self, proc_info):
"""Check if the processes exited normally."""
Expand Down

0 comments on commit ad3be0e

Please sign in to comment.