Skip to content

Commit

Permalink
Added launch testing examples : ROS focused (#263)
Browse files Browse the repository at this point in the history
* added helloworld examples, README

Signed-off-by: Aditya Pande <[email protected]>

* README fix

Signed-off-by: Aditya Pande <[email protected]>

* Copyright header added

Signed-off-by: Aditya Pande <[email protected]>

* Added section : How to run the tests

Signed-off-by: Aditya Pande <[email protected]>

* Changed Parameter style, README link to examples, import order

Signed-off-by: Aditya Pande <[email protected]>

* Moved hello world example to ros2/launch

Signed-off-by: Aditya Pande <[email protected]>

* Updated README

Signed-off-by: Aditya Pande <[email protected]>

* Moved files for CI

Signed-off-by: Aditya Pande <[email protected]>

* CI fix

Signed-off-by: Aditya Pande <[email protected]>

* Added test dependency to package.xml

Signed-off-by: Aditya Pande <[email protected]>

* Removed dependency on demo_nodes_cpp

Signed-off-by: Aditya Pande <[email protected]>

* Merged README

Signed-off-by: Aditya Pande <[email protected]>

* README edit

Signed-off-by: Aditya Pande <[email protected]>
  • Loading branch information
adityapande-1995 authored Sep 16, 2021
1 parent b348a81 commit 798995e
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 0 deletions.
34 changes: 34 additions & 0 deletions launch_testing_ros/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,40 @@ This test launches the talker and listener example nodes from demo\_nodes\_py an
with them via their ROS interfaces. Remapping rules are used so that one of the tests can sit in
between the talker and the listener and change the data on the fly.

### `check_node_launch_test.py`

Usage:

```sh
launch_test test/examples/check_node_launch_test.py
```

There might be situations where nodes, once launched, take some time to actually start and we need to wait for the node to start to perform some action.
We can simulate this using ``launch.actions.TimerAction``. This example shows one way to detect when a node has been launched.
We delay the launch by 5 seconds, and wait for the node to start with a timeout of 8 seconds.

### `check_msgs_launch_test.py`

Usage:

```sh
launch_test test/examples/check_msgs_launch_test.py
```

Consider a problem statement where you need to launch a node and check if messages are published on a particular topic.
This example demonstrates how to do that, using a talker node.
It uses the ``Event`` object to end the test as soon as the first message is received on the chatter topic, with a timeout of 5 seconds.

### `set_param_launch_test.py`

Usage:

```sh
launch_test test/examples/set_param_launch_test.py
```

This example demonstrates how to launch a node, set a parameter in it and check if that was successful.

#### test\_fuzzy\_data
This test gives an example of what a test that fuzzes data might look like. A ROS subscriber
and publisher pair encapsulated in a `DataRepublisher` object changes the string "Hello World" to
Expand Down
81 changes: 81 additions & 0 deletions launch_testing_ros/test/examples/check_msgs_launch_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys

from threading import Event
from threading import Thread
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
import rclpy
from rclpy.node import Node
from std_msgs.msg import String


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
path_to_test = os.path.dirname(__file__)

return launch.LaunchDescription([
launch_ros.actions.Node(
executable=sys.executable,
arguments=[os.path.join(path_to_test, 'talker.py')],
additional_env={'PYTHONUNBUFFERED': '1'},
name='demo_node_1',
),

launch_testing.actions.ReadyToTest()
])


class TestFixture(unittest.TestCase):

def test_check_if_msgs_published(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
assert msgs_received_flag, 'Did not receive msgs !'
rclpy.shutdown()


class MakeTestNode(Node):

def __init__(self, name='test_node'):
super().__init__(name)
self.msg_event_object = Event()

def start_subscriber(self):
# Create a subscriber
self.subscription = self.create_subscription(
String,
'chatter',
self.subscriber_callback,
10
)

# Add a spin thread
self.ros_spin_thread = Thread(target=lambda node: rclpy.spin(node), args=(self,))
self.ros_spin_thread.start()

def subscriber_callback(self, data):
self.msg_event_object.set()
72 changes: 72 additions & 0 deletions launch_testing_ros/test/examples/check_node_launch_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import time
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
import rclpy
from rclpy.node import Node


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
path_to_test = os.path.dirname(__file__)

return launch.LaunchDescription([
launch.actions.TimerAction(
period=5.0,
actions=[
launch_ros.actions.Node(
executable=sys.executable,
arguments=[os.path.join(path_to_test, 'talker.py')],
additional_env={'PYTHONUNBUFFERED': '1'},
name='demo_node_1',
)
]),
launch_testing.actions.ReadyToTest()
])


class TestFixture(unittest.TestCase):

def test_node_start(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
assert node.wait_for_node('demo_node_1', 8.0), 'Node not found !'
rclpy.shutdown()


class MakeTestNode(Node):

def __init__(self, name='test_node'):
super().__init__(name)

def wait_for_node(self, node_name, timeout=8.0):
start = time.time()
flag = False
print('Waiting for node...')
while time.time() - start < timeout and not flag:
flag = node_name in self.get_node_names()
time.sleep(0.1)

return flag
41 changes: 41 additions & 0 deletions launch_testing_ros/test/examples/parameter_blackboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import rclpy
from rclpy.node import Node


class TestNode(Node):

def __init__(self):
super().__init__('parameter_blackboard')
self.declare_parameter('demo_parameter_1', False)


def main(args=None):
rclpy.init(args=args)

node = TestNode()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == '__main__':
main()
79 changes: 79 additions & 0 deletions launch_testing_ros/test/examples/set_param_launch_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
import pytest
from rcl_interfaces.srv import SetParameters
import rclpy
from rclpy.node import Node


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
path_to_test = os.path.dirname(__file__)

return launch.LaunchDescription([
launch_ros.actions.Node(
executable=sys.executable,
arguments=[os.path.join(path_to_test, 'parameter_blackboard.py')],
additional_env={'PYTHONUNBUFFERED': '1'},
name='demo_node_1',
),
launch_testing.actions.ReadyToTest()
])


class TestFixture(unittest.TestCase):

def test_set_parameter(self, proc_output):
rclpy.init()
node = MakeTestNode('test_node')
response = node.set_parameter(state=True)
assert response.successful, 'Could not set parameter!'
rclpy.shutdown()


class MakeTestNode(Node):

def __init__(self, name='test_node'):
super().__init__(name)

def set_parameter(self, state=True, timeout=5.0):
parameters = [rclpy.Parameter('demo_parameter_1', value=state).to_parameter_msg()]

client = self.create_client(SetParameters, 'demo_node_1/set_parameters')
ready = client.wait_for_service(timeout_sec=5.0)
if not ready:
raise RuntimeError('Wait for service timed out')

request = SetParameters.Request()
request.parameters = parameters
future = client.call_async(request)
rclpy.spin_until_future_complete(self, future)

response = future.result()
if response is None:
e = future.exception()
raise RuntimeError(
f"Exception while calling service of node 'demo_node_1': {e}")
return response.results[0]

0 comments on commit 798995e

Please sign in to comment.