Skip to content

Commit

Permalink
Tools: ros2: Add geopose test
Browse files Browse the repository at this point in the history
* Add missing deps
* Reduce some duplication

Signed-off-by: Ryan Friedman <[email protected]>
  • Loading branch information
Ryanf55 authored and Ryan Friedman committed Sep 8, 2024
1 parent 1e8e250 commit 0a2dcc7
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 11 deletions.
5 changes: 5 additions & 0 deletions Tools/ros2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ colcon test --packages-select ardupilot_dds_tests
colcon test-result --all --verbose
```

To debug a specific test, you can do the following:
```
colcon --log-level DEBUG test --packages-select ardupilot_dds_tests --event-handlers=console_direct+ --pytest-args -k test_dds_udp_geopose_msg_recv -s
```

## Install macOS

The install procedure on macOS is similar, except that all dependencies
Expand Down
4 changes: 3 additions & 1 deletion Tools/ros2/ardupilot_dds_tests/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@
<test_depend>ament_lint_auto</test_depend>
<test_depend>ardupilot_msgs</test_depend>
<test_depend>ardupilot_sitl</test_depend>
<test_depend>builtin_interfaces</test_depend>
<exec_depend>builtin_interfaces</exec_depend>
<exec_depend>geographic_msgs</exec_depend>
<test_depend>launch</test_depend>
<test_depend>launch_pytest</test_depend>
<test_depend>launch_ros</test_depend>
<exec_depend>micro_ros_msgs</exec_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>rclpy</test_depend>
<test_depend>sensor_msgs</test_depend>
<test_depend>python-scipy</test_depend>


<export>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Copyright 2023 ArduPilot.org.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Bring up ArduPilot SITL and check the GeoPose message is being published."""

import launch_pytest
import math
import pytest
import rclpy
import rclpy.node
from scipy.spatial.transform import Rotation as R
import threading

from launch import LaunchDescription

from launch_pytest.tools import process as process_tools

from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from rclpy.qos import QoSHistoryPolicy

from geographic_msgs.msg import GeoPoseStamped

TOPIC = "ap/geopose/filtered"
# Copied from locations.txt
CMAC_LAT = -35.363261
CMAC_LON = 149.165230
CMAC_ABS_ALT = 584
CMAC_HEADING = 353

def ros_quat_to_heading(quat):
# By default, scipy follows scalar-last order – (x, y, z, w)
rot = R.from_quat([quat.x, quat.y, quat.z, quat.w])
r, p, y = rot.as_euler(seq="xyz", degrees=True)
return y


def validate_position_cmac(position):
"""Returns true if the vehicle is at CMAC"""
LAT_LON_TOL = 1E-5
# print("FOO", math.fabs(pose.position.latitude - CMAC_LAT))

return math.isclose(position.latitude, CMAC_LAT, abs_tol=LAT_LON_TOL) and \
math.isclose(position.longitude, CMAC_LON, abs_tol=LAT_LON_TOL) and \
math.isclose(position.altitude, CMAC_ABS_ALT, abs_tol = 1.0)

def validate_heading_cmac(orientation):
"""Returns true if the vehicle is facing the right way for CMAC"""
return math.isclose(ros_quat_to_heading(orientation), CMAC_HEADING)

class GeoPoseListener(rclpy.node.Node):
"""Subscribe to GeoPoseStamped messages"""

def __init__(self):
"""Initialise the node."""
super().__init__("geopose_listener")
self.msg_event_object = threading.Event()
self.position_correct_event_object = threading.Event()

# Declare and acquire `topic` parameter
self.declare_parameter("topic", TOPIC)
self.topic = self.get_parameter("topic").get_parameter_value().string_value

def start_subscriber(self):
"""Start the subscriber."""
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
)

self.subscription = self.create_subscription(GeoPoseStamped, self.topic, self.subscriber_callback, qos_profile)

# Add a spin thread.
self.ros_spin_thread = threading.Thread(target=lambda node: rclpy.spin(node), args=(self,))
self.ros_spin_thread.start()

def subscriber_callback(self, msg):
"""Process a GeoPoseStamped message."""
self.msg_event_object.set()

position = msg.pose.position

self.get_logger().info(
"From AP : Position [lat:{}, lon: {}]".format(position.latitude, position.longitude)
)

if validate_position_cmac(msg.pose.position):
self.position_correct_event_object.set()


@launch_pytest.fixture
def launch_sitl_copter_dds_serial(sitl_copter_dds_serial):
"""Fixture to create the launch description."""
sitl_ld, sitl_actions = sitl_copter_dds_serial

ld = LaunchDescription(
[
sitl_ld,
launch_pytest.actions.ReadyToTest(),
]
)
actions = sitl_actions
yield ld, actions


@launch_pytest.fixture
def launch_sitl_copter_dds_udp(sitl_copter_dds_udp):
"""Fixture to create the launch description."""
sitl_ld, sitl_actions = sitl_copter_dds_udp

ld = LaunchDescription(
[
sitl_ld,
launch_pytest.actions.ReadyToTest(),
]
)
actions = sitl_actions
yield ld, actions


@pytest.mark.launch(fixture=launch_sitl_copter_dds_serial)
def test_dds_serial_geopose_msg_recv(launch_context, launch_sitl_copter_dds_serial):
"""Test position messages are published by AP_DDS."""
_, actions = launch_sitl_copter_dds_serial
virtual_ports = actions["virtual_ports"].action
micro_ros_agent = actions["micro_ros_agent"].action
mavproxy = actions["mavproxy"].action
sitl = actions["sitl"].action

# Wait for process to start.
process_tools.wait_for_start_sync(launch_context, virtual_ports, timeout=2)
process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)

rclpy.init()
try:
node = GeoPoseListener()
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, f"Did not receive '{TOPIC}' msgs."
pose_correct_flag = node.position_correct_event_object.wait(timeout=10.0)
assert pose_correct_flag, f"Did not receive correct position."
finally:
rclpy.shutdown()
yield


@pytest.mark.launch(fixture=launch_sitl_copter_dds_udp)
def test_dds_udp_geopose_msg_recv(launch_context, launch_sitl_copter_dds_udp):
"""Test position messages are published by AP_DDS."""
_, actions = launch_sitl_copter_dds_udp
micro_ros_agent = actions["micro_ros_agent"].action
mavproxy = actions["mavproxy"].action
sitl = actions["sitl"].action

# Wait for process to start.
process_tools.wait_for_start_sync(launch_context, micro_ros_agent, timeout=2)
process_tools.wait_for_start_sync(launch_context, mavproxy, timeout=2)
process_tools.wait_for_start_sync(launch_context, sitl, timeout=2)

rclpy.init()
try:
node = GeoPoseListener()
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, f"Did not receive '{TOPIC}' msgs."
pose_correct_flag = node.position_correct_event_object.wait(timeout=10.0)
assert pose_correct_flag, f"Did not receive correct position."
finally:
rclpy.shutdown()
yield
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,19 @@

from sensor_msgs.msg import NavSatFix

TOPIC = "ap/navsat/navsat0"


class NavSatFixListener(rclpy.node.Node):
"""Subscribe to NavSatFix messages on /ap/navsat/navsat0."""
"""Subscribe to NavSatFix messages."""

def __init__(self):
"""Initialise the node."""
super().__init__("navsatfix_listener")
self.msg_event_object = threading.Event()

# Declare and acquire `topic` parameter
self.declare_parameter("topic", "ap/navsat/navsat0")
self.declare_parameter("topic", TOPIC)
self.topic = self.get_parameter("topic").get_parameter_value().string_value

def start_subscriber(self):
Expand Down Expand Up @@ -117,7 +119,7 @@ def test_dds_serial_navsat_msg_recv(launch_context, launch_sitl_copter_dds_seria
node = NavSatFixListener()
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, "Did not receive 'ap/navsat/navsat0' msgs."
assert msgs_received_flag, f"Did not receive '{TOPIC}' msgs."
finally:
rclpy.shutdown()
yield
Expand All @@ -141,7 +143,7 @@ def test_dds_udp_navsat_msg_recv(launch_context, launch_sitl_copter_dds_udp):
node = NavSatFixListener()
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, "Did not receive 'ap/navsat/navsat0' msgs."
assert msgs_received_flag, f"Did not receive '{TOPIC}' msgs."
finally:
rclpy.shutdown()
yield
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,19 @@

from builtin_interfaces.msg import Time

TOPIC = "ap/time"


class TimeListener(rclpy.node.Node):
"""Subscribe to Time messages on /ap/time."""
"""Subscribe to Time messages."""

def __init__(self):
"""Initialise the node."""
super().__init__("time_listener")
self.msg_event_object = threading.Event()

# Declare and acquire `topic` parameter.
self.declare_parameter("topic", "ap/time")
self.declare_parameter("topic", TOPIC)
self.topic = self.get_parameter("topic").get_parameter_value().string_value

def start_subscriber(self):
Expand Down Expand Up @@ -89,7 +91,7 @@ def launch_sitl_copter_dds_udp(sitl_copter_dds_udp):

@pytest.mark.launch(fixture=launch_sitl_copter_dds_serial)
def test_dds_serial_time_msg_recv(launch_context, launch_sitl_copter_dds_serial):
"""Test /ap/time is published by AP_DDS."""
"""Test Time is published by AP_DDS."""
_, actions = launch_sitl_copter_dds_serial
virtual_ports = actions["virtual_ports"].action
micro_ros_agent = actions["micro_ros_agent"].action
Expand All @@ -107,15 +109,15 @@ def test_dds_serial_time_msg_recv(launch_context, launch_sitl_copter_dds_serial)
node = TimeListener()
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, "Did not receive 'ROS_Time' msgs."
assert msgs_received_flag, f"Did not receive '{TOPIC}' msgs."
finally:
rclpy.shutdown()
yield


@pytest.mark.launch(fixture=launch_sitl_copter_dds_udp)
def test_dds_udp_time_msg_recv(launch_context, launch_sitl_copter_dds_udp):
"""Test /ap/time is published by AP_DDS."""
"""Test Time is published by AP_DDS."""
_, actions = launch_sitl_copter_dds_udp
micro_ros_agent = actions["micro_ros_agent"].action
mavproxy = actions["mavproxy"].action
Expand All @@ -131,7 +133,7 @@ def test_dds_udp_time_msg_recv(launch_context, launch_sitl_copter_dds_udp):
node = TimeListener()
node.start_subscriber()
msgs_received_flag = node.msg_event_object.wait(timeout=10.0)
assert msgs_received_flag, "Did not receive 'ROS_Time' msgs."
assert msgs_received_flag, f"Did not receive '{TOPIC}' msgs."
finally:
rclpy.shutdown()
yield

0 comments on commit 0a2dcc7

Please sign in to comment.