diff --git a/.gitignore b/.gitignore index c18dd8d..372c13e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ __pycache__/ + diff --git a/inorbit_republisher/config/example_unit_test.yaml b/inorbit_republisher/config/example_unit_test.yaml new file mode 100644 index 0000000..6795e2b --- /dev/null +++ b/inorbit_republisher/config/example_unit_test.yaml @@ -0,0 +1,29 @@ +republishers: +- topic: "/input_topic" + msg_type: "std_msgs/String" + mappings: + - field: "data" + mapping_type: "single_field" + out: + topic: "/output_topic" + key: "input_to_output" +static_publishers: +- value_from: + package_version: "rospy" + out: + topic: "/inorbit/custom_data/0" + key: "rospy_version" +- value_from: + package_version: "inorbit_republisher" + out: + topic: "/inorbit/custom_data/0" + key: "inorbit_republisher_version" +- value_from: + environment_variable: "PYTHONPATH" + out: + topic: "/inorbit/custom_data/0" + key: "python_path" +- value: "let's republish in orbit" + out: + topic: "/inorbit/custom_data/0" + key: "greeting" diff --git a/inorbit_republisher/test/sample_data/README.md b/inorbit_republisher/test/sample_data/README.md index 573f1ec..4c5a717 100644 --- a/inorbit_republisher/test/sample_data/README.md +++ b/inorbit_republisher/test/sample_data/README.md @@ -19,14 +19,30 @@ topics: /my_magnetic_field 10 msgs : sensor_msgs/MagneticField ``` To validate the node works launch the sample by using the ``sample_data.launch`` launch file and look at ``out`` topic: ``rostopic echo /inorbit/custom_data/0``. - -```bash -. ~/catkin_ws/devel/setup.zsh -cd ~/catkin_ws/src/inorbit_republisher/test/sample_data -roslaunch sample_data.launch +1. **Start ROS2 docker container (optional)**: +You can run the commands below for building and running the republisher inside a docker container. + ```bash + docker run -ti --rm \ + --workdir /root/catkin_ws/ \ + -v .:/root/catkin_ws/src/inorbit_republisher \ + osrf/ros:noetic-desktop + # Install catkin + apt update && apt install python3-catkin-tools python3-osrf-pycommon -y + ``` +2. **Build the Workspace**: + Ensure the workspace is built and the environment is sourced: + ```bash + cd ~/catkin_ws + rosdep install --from-paths ~/catkin_ws/src --ignore-src --rosdistro=noetic + catkin clean + catkin build inorbit_republisher --verbose + ``` +3. **Source and Run the Workspace**: + ```bash + . ~/catkin_ws/install/setup.zsh + roslaunch sample_data.launch # On a different terminal windows -source /opt/ros/noetic/setup.bash -$ rostopic echo my_temperature +$ rostopic echo /inorbit/custom_data/0 data: "my_temperature=41.0" --- data: "my_magnetic_field_x=1.4" @@ -37,4 +53,4 @@ data: "my_magnetic_field_z=0.6" --- data: "my_temperature=41.0" ... -``` +``` \ No newline at end of file diff --git a/inorbit_republisher/test/unit_test/README.md b/inorbit_republisher/test/unit_test/README.md new file mode 100644 index 0000000..42ffd9c --- /dev/null +++ b/inorbit_republisher/test/unit_test/README.md @@ -0,0 +1,41 @@ +# Unit Test for ROS Republisher Node + +This repository contains a unit test for a ROS node that republishes messages from `/input_topic` to `/output_topic`. The test ensures the republisher node processes and republishes messages correctly. + +## Test Description + +The unit test is implemented in `test_republisher.py` and uses the `unittest` framework along with `rostest`. It includes the following test cases: + +- **`test_message_republishing`**: + - Publishes a test message (`key=value`) to `/input_topic`. + - Verifies that the republisher node republishes the message with the correct prefix (`input_to_output=`) to `/output_topic`. + +- **`test_no_message`**: + - Ensures that no messages are received on `/output_topic` at the start of the test. + +The test subscribes to `/output_topic` and collects received messages for validation. + +## Running the Unit Test +1. **Start ROS2 docker container (optional)**: +You can run the commands below for building and running the republisher inside a docker container. + ```bash + docker run -ti --rm \ + --workdir /root/catkin_ws/ \ + -v .:/root/catkin_ws/src/inorbit_republisher \ + osrf/ros:noetic-desktop + # Install catkin + apt update && apt install python3-catkin-tools python3-osrf-pycommon -y + ``` +2. **Build the Workspace**: + Ensure the workspace is built and the environment is sourced: + ```bash + cd ~/catkin_ws + rosdep install --from-paths ~/catkin_ws/src --ignore-src --rosdistro=noetic + catkin clean + catkin build inorbit_republisher --verbose + ``` +3. **Source and Run the Workspace**: + ```bash + . ~/catkin_ws/devel/setup.bash + rostest inorbit_republisher test_republisher.test + ``` \ No newline at end of file diff --git a/inorbit_republisher/test/unit_test/test_republisher.py b/inorbit_republisher/test/unit_test/test_republisher.py new file mode 100755 index 0000000..0eebe3f --- /dev/null +++ b/inorbit_republisher/test/unit_test/test_republisher.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# Unit test for a ROS node that republishes messages from /input_topic to /output_topic. +# It uses the unittest framework and rostest for testing. + +import unittest +import rospy +import rostest +from std_msgs.msg import String + +class TestRepublisher(unittest.TestCase): + def setUp(self): + # Initialize the ROS node for testing + rospy.init_node('test_republisher', anonymous=True) + self.test_pub = rospy.Publisher('/input_topic', String, queue_size=10) + self.received_messages = [] + rospy.Subscriber('/output_topic', String, self.callback) + + # Safe way to Wait for publisher connections to be established + timeout = rospy.get_time() + 5.0 # Timeout limited to 5 seconds + rate = rospy.Rate(5) # ROS Rate at 5Hz + while self.test_pub.get_num_connections() == 0: + if rospy.get_time() > timeout: + self.fail("Test setup failed: Publisher connection timeout.") + rate.sleep() + + def callback(self, msg): + # Method to save messages received on /output_topic. + self.received_messages.append(msg.data) + + def test_message_republishing(self): + # Method to send a message (key=value) to /input_topic and + # check if the republisher correctly adds the prefix (input_to_output=) and sends it to /output_topic. + test_message = "key=value" + expected_message = f"input_to_output={test_message}" + rospy.loginfo(f"Publishing: {test_message} to /input_topic") + self.test_pub.publish(String(data=test_message)) + try: + msg = rospy.wait_for_message('/output_topic', String, timeout=35) + print(f"Received message: {msg.data}") + except rospy.ROSException: + print("Timeout exceeded while waiting for a message.") + rospy.loginfo(f"Messages received: {self.received_messages}") + self.assertIn(expected_message, self.received_messages) + +if __name__ == '__main__': + rostest.rosrun('my_ros_package', 'test_republisher', TestRepublisher) diff --git a/inorbit_republisher/test/unit_test/test_republisher.test b/inorbit_republisher/test/unit_test/test_republisher.test new file mode 100644 index 0000000..00621de --- /dev/null +++ b/inorbit_republisher/test/unit_test/test_republisher.test @@ -0,0 +1,9 @@ + + + + + + + + +