Skip to content

Commit

Permalink
Fixing ntp launchtest (ros#330)
Browse files Browse the repository at this point in the history
- using a launchtest

---------

Signed-off-by: Christian Henkel <[email protected]>
  • Loading branch information
ct2034 authored Mar 28, 2024
1 parent 8ee5640 commit 431926f
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 184 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ jobs:
container: ubuntu:${{ matrix.os }}
steps:
- uses: ros-tooling/setup-ros@master
# - run: |
# sudo apt install -y python3-pip
# pip3 install --break-system-packages 'flake8<5' # fix flake8.exceptions.FailedToLoadPlugin: Flake8 failed to load plugin "pycodestyle" due to cannot import name 'missing_whitespace_around_operator' from 'pycodestyle' (/usr/lib/python3/dist-packages/pycodestyle.py).
# if: ${{ matrix.os == '24.04' }}
- uses: ros-tooling/action-ros-ci@master
with:
target-ros2-distro: ${{ matrix.distro }}
Expand Down
9 changes: 5 additions & 4 deletions diagnostic_common_diagnostics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ install(PROGRAMS

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
find_package(launch_testing_ament_cmake REQUIRED)
ament_lint_auto_find_test_dependencies()

find_package(ament_cmake_pytest REQUIRED)
ament_add_pytest_test(
test_cpu_monitor
test/systemtest/test_cpu_monitor.py
TIMEOUT 10)
ament_add_pytest_test(
test_ntp_monitor
test/systemtest/test_ntp_monitor.py
TIMEOUT 10)
add_launch_test(
test/systemtest/test_ntp_monitor_launchtest.py
TARGET ntp_monitor_launchtest
TIMEOUT 20)
endif()

ament_package()
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@
class NTPMonitor(Node):
"""A diagnostic task that monitors the NTP offset of the system clock."""

def __init__(self, ntp_hostname, offset=500, self_offset=500,
def __init__(self, ntp_hostname, ntp_port, offset=500, self_offset=500,
diag_hostname=None, error_offset=5000000,
do_self_test=True):
"""Initialize the NTPMonitor."""
super().__init__(__class__.__name__)

self.ntp_hostname = ntp_hostname
self.ntp_port = ntp_port
self.offset = offset
self.self_offset = self_offset
self.diag_hostname = diag_hostname
Expand All @@ -67,7 +68,8 @@ def __init__(self, ntp_hostname, offset=500, self_offset=500,
self.stat = DIAG.DiagnosticStatus()
self.stat.level = DIAG.DiagnosticStatus.OK
self.stat.name = 'NTP offset from ' + \
self.diag_hostname + ' to ' + self.ntp_hostname
self.diag_hostname + ' to ' + self.ntp_hostname + \
':' + str(self.ntp_port)
self.stat.message = 'OK'
self.stat.hardware_id = self.hostname
self.stat.values = []
Expand Down Expand Up @@ -127,7 +129,10 @@ def add_kv(stat_values, key, value):
ntp_client = ntplib.NTPClient()
response = None
try:
response = ntp_client.request(self.ntp_hostname, version=3)
response = ntp_client.request(
self.ntp_hostname,
port=self.ntp_port,
version=3)
except ntplib.NTPException as e:
self.get_logger().error(f'NTP Error: {e}')
st.level = DIAG.DiagnosticStatus.ERROR
Expand Down Expand Up @@ -155,11 +160,17 @@ def add_kv(stat_values, key, value):


def ntp_monitor_main(argv=sys.argv[1:]):
# filter out ROS args
argv = [a for a in argv if not a.startswith('__') and not a == '--ros-args' and not a == '-r']

import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--ntp_hostname',
action='store', default='0.pool.ntp.org',
type=str)
parser.add_argument('--ntp_port',
action='store', default=123,
type=int)
parser.add_argument('--offset-tolerance', dest='offset_tol',
action='store', default=500,
help='Offset from NTP host [us]', metavar='OFFSET-TOL',
Expand Down Expand Up @@ -188,7 +199,8 @@ def ntp_monitor_main(argv=sys.argv[1:]):
assert offset < error_offset, \
'Offset tolerance must be less than error offset tolerance'

ntp_monitor = NTPMonitor(args.ntp_hostname, offset, self_offset,
ntp_monitor = NTPMonitor(args.ntp_hostname, args.ntp_port,
offset, self_offset,
args.diag_hostname, error_offset,
args.do_self_test)

Expand Down
1 change: 1 addition & 0 deletions diagnostic_common_diagnostics/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<test_depend>ament_cmake_lint_cmake</test_depend>

<test_depend>ament_cmake_pytest</test_depend>
<test_depend>launch_testing_ament_cmake</test_depend>

<export>
<build_type>ament_cmake</build_type>
Expand Down
130 changes: 0 additions & 130 deletions diagnostic_common_diagnostics/test/systemtest/test_ntp_monitor.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,60 +32,84 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import os
import unittest

import ament_index_python
from diagnostic_msgs.msg import DiagnosticArray

import launch

import launch_pytest
from launch_pytest.tools import process as process_tools
import launch_ros

import launch_testing

from launch_testing_ros import WaitForTopics

import pytest

import rclpy


@pytest.fixture
def ntp_monitor_proc():
# Launch a process to test
return launch.actions.ExecuteProcess(
cmd=[
os.path.join(
ament_index_python.get_package_prefix(
'diagnostic_common_diagnostics'),
'lib',
'diagnostic_common_diagnostics',
'ntp_monitor.py'
),
],
),


@launch_pytest.fixture
def launch_description(ntp_monitor_proc):
@pytest.mark.launch_test
def generate_test_description():
"""Launch the ntp_monitor node and return a launch description."""
return launch.LaunchDescription([
ntp_monitor_proc,
launch_ros.actions.Node(
package='diagnostic_common_diagnostics',
executable='ntp_monitor.py',
name='ntp_monitor',
output='screen',
arguments=['--offset-tolerance', '10000',
'--error-offset-tolerance', '20000']
# 10s, 20s, we are not testing if your clock is correct
),
launch_testing.actions.ReadyToTest()
])


@pytest.mark.skip(reason='This test is not working yet')
@pytest.mark.launch(fixture=launch_description)
def test_read_stdout(ntp_monitor_proc, launch_context):
"""Check if 'ntp_monitor' was found in the stdout."""
def validate_output(output):
# this function can use assertions to validate the output or return a boolean.
# pytest generates easier to understand failures when assertions are used.
assert output.splitlines() == [
'ntp_monitor'], 'process never printed ntp_monitor'
process_tools.assert_output_sync(
launch_context, ntp_monitor_proc, validate_output, timeout=5)

def validate_output(output):
return output == 'this will never happen'
assert not process_tools.wait_for_output_sync(
launch_context, ntp_monitor_proc, validate_output, timeout=0.1)
yield
# this is executed after launch service shutdown
assert ntp_monitor_proc.return_code == 0
class TestNtpMonitor(unittest.TestCase):
"""Test if the ntp_monitor node is publishing diagnostics."""

def __init__(self, methodName: str = 'runTest') -> None:
super().__init__(methodName)
self.received_messages = []

def _received_message(self, msg):
self.received_messages.append(msg)

def _get_min_level(self):
levels = [
int.from_bytes(status.level, 'little')
for diag in self.received_messages
for status in diag.status]
if len(levels) == 0:
return -1
return min(levels)

def test_topic_published(self):
"""Test if the ntp_monitor node is publishing diagnostics."""
with WaitForTopics(
[('/diagnostics', DiagnosticArray)],
timeout=5
):
print('Topic found')

rclpy.init()
test_node = rclpy.create_node('test_node')
test_node.create_subscription(
DiagnosticArray,
'/diagnostics',
self._received_message,
1
)

while len(self.received_messages) < 10:
rclpy.spin_once(test_node, timeout_sec=1)
if (min_level := self._get_min_level()) == 0:
break

test_node.destroy_node()
rclpy.shutdown()
print(f'Got {len(self.received_messages)} messages:')
for msg in self.received_messages:
print(msg)
self.assertEqual(min_level, 0)

0 comments on commit 431926f

Please sign in to comment.