-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
eca3638
commit 8c709e8
Showing
4 changed files
with
147 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
republishers: | ||
- topic: "/input_topic" | ||
msg_type: "std_msgs/String" | ||
mappings: | ||
- field: "data" | ||
mapping_type: "single_field" | ||
out: | ||
topic: "/output_topic" | ||
key: "input_to_output" | ||
static_publishers: | ||
- value_from: | ||
package_version: "rospy" | ||
out: | ||
topic: "/inorbit/custom_data" | ||
key: "rospy_version" | ||
- value_from: | ||
package_version: "inorbit_republisher" | ||
out: | ||
topic: "/inorbit/custom_data" | ||
key: "inorbit_republisher_version" | ||
- value_from: | ||
environment_variable: "PYTHONPATH" | ||
out: | ||
topic: "/inorbit/custom_data" | ||
key: "python_path" | ||
- value: "let's republish in orbit" | ||
out: | ||
topic: "/inorbit/custom_data" | ||
key: "greeting" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
<launch> | ||
<node name="inorbit_republisher" pkg="inorbit_republisher" exec="republisher"> | ||
<param name="config" value="$(dirname)/config/example_unit_test.yaml" /> | ||
</node> | ||
</launch> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
# Unit Test for ROS Republisher Node | ||
|
||
This repository contains a unit test for a ROS node that republishes messages from `/input_topic` to `/output_topic`. The test ensures the republisher node processes and republishes messages correctly. | ||
|
||
## Test Description | ||
|
||
The unit test is implemented in `test_republisher.py` and uses the `unittest` framework along with `rostest`. It includes the following test cases: | ||
|
||
- **`test_message_republishing`**: | ||
- Publishes a test message (`key=value`) to `/input_topic`. | ||
- Verifies that the republisher node republishes the message with the correct prefix (`input_to_output=`) to `/output_topic`. | ||
|
||
- **`test_no_message`**: | ||
- Ensures that no messages are received on `/output_topic` at the start of the test. | ||
|
||
The test subscribes to `/output_topic` and collects received messages for validation. | ||
|
||
## Running the Unit Test | ||
1. **Start ROS 2 docker container (optional)**: | ||
You can run the commands below for building and running the republisher inside a docker container. | ||
```bash | ||
docker run -ti --rm \ | ||
--workdir /root/ros2_ws/ \ | ||
-v .:/root/ros2_ws/src/inorbit_republisher \ | ||
osrf/ros:jazzy-desktop | ||
``` | ||
2. **Build the Workspace**: | ||
Ensure the workspace is built and the environment is sourced: | ||
```bash | ||
cd ~/ros2_ws | ||
rosdep install --from-paths src -y --ignore-src | ||
colcon build --packages-select inorbit_republisher --symlink-install | ||
``` | ||
3. **Source and Run the Workspace**: | ||
```bash | ||
source install/local_setup.bash | ||
colcon test --packages-select inorbit_republisher --event-handlers console_cohesion+ console_direct+ | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
#!/usr/bin/env python3 | ||
import os | ||
import time | ||
import unittest | ||
import pytest | ||
import rclpy | ||
from rclpy.node import Node | ||
from std_msgs.msg import String | ||
from launch import LaunchDescription | ||
from launch.actions import IncludeLaunchDescription | ||
from launch.launch_description_sources import AnyLaunchDescriptionSource | ||
import launch_testing | ||
from ament_index_python.packages import get_package_share_directory | ||
|
||
|
||
@pytest.mark.launch_test | ||
def generate_test_description(): | ||
# Use dynamic path instead of hardcoded absolute path | ||
launch_file_path = os.path.join( | ||
get_package_share_directory("inorbit_republisher"), | ||
"test_republisher.launch.xml" | ||
) | ||
return LaunchDescription([ | ||
IncludeLaunchDescription( | ||
AnyLaunchDescriptionSource(launch_file_path) | ||
), | ||
launch_testing.actions.ReadyToTest(), | ||
]) | ||
|
||
|
||
class TestRepublisher(unittest.TestCase): | ||
|
||
def setUp(self): | ||
rclpy.init() | ||
self.node = rclpy.create_node('test_republisher') | ||
self.test_pub = self.node.create_publisher(String, '/input_topic', 10) | ||
self.received_messages = [] | ||
|
||
self.node.create_subscription(String, '/output_topic', self.callback, 10) | ||
# Wait up to 5 seconds for at least one subscriber on /input_topic | ||
end_time = time.time() + 5 | ||
while time.time() < end_time: | ||
if self.test_pub.get_subscription_count() > 0: | ||
break | ||
rclpy.spin_once(self.node, timeout_sec=0.1) | ||
else: | ||
self.fail("Test setup failed: Publisher connection timeout.") | ||
|
||
def callback(self, msg): | ||
self.received_messages.append(msg.data) | ||
|
||
def test_message_republishing(self): | ||
test_message = "key=value" | ||
expected_message = f"input_to_output={test_message}" | ||
self.node.get_logger().info(f"Publishing: {test_message} to /input_topic") | ||
|
||
self.test_pub.publish(String(data=test_message)) | ||
|
||
# Spin for up to 5 seconds waiting for the republished message | ||
end_time = time.time() + 5 | ||
while time.time() < end_time: | ||
rclpy.spin_once(self.node, timeout_sec=0.1) | ||
if expected_message in self.received_messages: | ||
break | ||
|
||
self.node.get_logger().info(f"Messages received: {self.received_messages}") | ||
self.assertIn(expected_message, self.received_messages) | ||
|
||
def tearDown(self): | ||
self.node.destroy_node() | ||
rclpy.shutdown() | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |