Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Launch testing examples : ROS focused #263

Merged
merged 13 commits into from
Sep 16, 2021
1 change: 1 addition & 0 deletions launch_testing_ros/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>std_msgs</test_depend>
<test_depend>demo_nodes_cpp</test_depend>

<export>
<build_type>ament_python</build_type>
Expand Down
49 changes: 49 additions & 0 deletions launch_testing_ros/test/examples/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
Launch testing examples
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
=======================

.. contents:: Contents
:depth: 2
:local:

Overview
--------

This directory contains examples to help the user get an idea of how to use ``launch_testing`` to write their own test cases. It contains the following examples :

* ``check_node_launch_test.py`` : There might be situations where nodes, once launched, take some time to actually start and we need to wait for the node to start to perform some action. We can simulate this using ``launch.actions.TimerAction``. This example shows one way to detect when a node has been launched. We delay the launch by 5 seconds, and wait for the node to start with a timeout of 8 seconds.
* ``check_msgs_launch_test.py`` : Consider a problem statement where you need to launch a node and check if messages are published on a particular topic. This example demonstrates how to do that, using the talker node from ``demo_nodes_cpp`` package. It uses the ``Event`` object to end the test as soon as the first message is received on the chatter topic, with a timeout of 5 seconds. It is recommended to keep the tests as short as possible.
* ``set_param_launch_test.py`` : Launch a node, set a parameter in it and check if that was successful.

Running the tests
-----------------
Make sure you have your ROS workspace sourced and then run:

``launch_test check_msgs_launch_test.py``

The output should be similar to:
::
[INFO] [launch]: All log files can be found below /home/aditya/.ros/log/2021-09-14-11-12-10-734588-aditya-desktop-2020689
[INFO] [launch]: Default logging verbosity is set to INFO
test_check_if_msgs_published (check_msgs_launch_test.TestFixture) ... 1631643130.800377 [0] launch_tes: using network interface enp0s25 (udp/172.23.2.248) selected arbitrarily from: enp0s25, virbr0
[INFO] [talker-1]: process started with pid [2020700]
[talker-1] 1631643130.834719 [0] talker: using network interface enp0s25 (udp/172.23.2.248) selected arbitrarily from: enp0s25, virbr0
[talker-1] [INFO] [1631643131.848249515] [demo_node_1]: Publishing: 'Hello World: 1'
ok

----------------------------------------------------------------------
Ran 1 test in 1.062s

OK
[INFO] [talker-1]: sending signal 'SIGINT' to process[talker-1]
[talker-1] [INFO] [1631643131.855748369] [rclcpp]: signal_handler(signal_value=2)
[INFO] [talker-1]: process has finished cleanly [pid 2020700]

----------------------------------------------------------------------
Ran 0 tests in 0.000s

OK

Generic examples
----------------

More ``launch_testing`` examples which are not ROS specific are available `here <https://github.com/ros2/launch/tree/master/launch_testing/test/launch_testing/examples>`__ with their explanation available `here <https://github.com/ros2/launch/tree/master/launch_testing#examples>`__
75 changes: 75 additions & 0 deletions launch_testing_ros/test/examples/check_msgs_launch_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from threading import Event
from threading import Thread
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
import rclpy
from rclpy.node import Node
from std_msgs.msg import String


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
return launch.LaunchDescription([
launch_ros.actions.Node(
package='demo_nodes_cpp',
executable='talker',
name='demo_node_1',
),

launch_testing.actions.ReadyToTest()
])


class TestFixture(unittest.TestCase):

def test_check_if_msgs_published(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
assert msgs_received_flag, 'Did not receive msgs !'
rclpy.shutdown()


class MakeTestNode(Node):

def __init__(self, name='test_node'):
super().__init__(name)
self.msg_event_object = Event()

def start_subscriber(self):
# Create a subscriber
self.subscription = self.create_subscription(
String,
'chatter',
self.subscriber_callback,
10
)

# Add a spin thread
self.ros_spin_thread = Thread(target=lambda node: rclpy.spin(node), args=(self,))
self.ros_spin_thread.start()

def subscriber_callback(self, data):
self.msg_event_object.set()
67 changes: 67 additions & 0 deletions launch_testing_ros/test/examples/check_node_launch_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
import rclpy
from rclpy.node import Node


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
return launch.LaunchDescription([
launch.actions.TimerAction(
period=5.0,
actions=[
launch_ros.actions.Node(
package='demo_nodes_cpp',
executable='talker',
name='demo_node_1',
)
]),
launch_testing.actions.ReadyToTest()
])


class TestFixture(unittest.TestCase):

def test_node_start(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
assert node.wait_for_node('demo_node_1', 8.0), 'Node not found !'
rclpy.shutdown()


class MakeTestNode(Node):

def __init__(self, name='test_node'):
super().__init__(name)

def wait_for_node(self, node_name, timeout=8.0):
start = time.time()
flag = False
print('Waiting for node...')
while time.time() - start < timeout and not flag:
flag = node_name in self.get_node_names()
time.sleep(0.1)

return flag
74 changes: 74 additions & 0 deletions launch_testing_ros/test/examples/set_param_launch_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
from rcl_interfaces.srv import SetParameters
import rclpy
from rclpy.node import Node


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
return launch.LaunchDescription([
launch_ros.actions.Node(
package='demo_nodes_cpp',
executable='parameter_blackboard',
name='demo_node_1',
),
launch_testing.actions.ReadyToTest()
])


class TestFixture(unittest.TestCase):

def test_set_parameter(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
response = node.set_parameter(state=True)
assert response.successful, 'Could not set parameter!'
rclpy.shutdown()


class MakeTestNode(Node):

def __init__(self, name='test_node'):
super().__init__(name)

def set_parameter(self, state=True, timeout=5.0):
parameters = [rclpy.Parameter('demo_parameter_1', value=state).to_parameter_msg()]

client = self.create_client(SetParameters, 'demo_node_1/set_parameters')
ready = client.wait_for_service(timeout_sec=5.0)
if not ready:
raise RuntimeError('Wait for service timed out')

request = SetParameters.Request()
request.parameters = parameters
future = client.call_async(request)
rclpy.spin_until_future_complete(self, future)

response = future.result()
if response is None:
e = future.exception()
raise RuntimeError(
f"Exception while calling service of node 'demo_node_1': {e}")
return response.results[0]