diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 4d149764..1e07c0ad 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -33,10 +33,6 @@ jobs:
container: ubuntu:${{ matrix.os }}
steps:
- uses: ros-tooling/setup-ros@master
- # - run: |
- # sudo apt install -y python3-pip
- # pip3 install --break-system-packages 'flake8<5' # fix flake8.exceptions.FailedToLoadPlugin: Flake8 failed to load plugin "pycodestyle" due to cannot import name 'missing_whitespace_around_operator' from 'pycodestyle' (/usr/lib/python3/dist-packages/pycodestyle.py).
- # if: ${{ matrix.os == '24.04' }}
- uses: ros-tooling/action-ros-ci@master
with:
target-ros2-distro: ${{ matrix.distro }}
diff --git a/diagnostic_common_diagnostics/CMakeLists.txt b/diagnostic_common_diagnostics/CMakeLists.txt
index 5c0f11f6..e62c86ec 100644
--- a/diagnostic_common_diagnostics/CMakeLists.txt
+++ b/diagnostic_common_diagnostics/CMakeLists.txt
@@ -15,6 +15,7 @@ install(PROGRAMS
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
+ find_package(launch_testing_ament_cmake REQUIRED)
ament_lint_auto_find_test_dependencies()
find_package(ament_cmake_pytest REQUIRED)
@@ -22,10 +23,10 @@ if(BUILD_TESTING)
test_cpu_monitor
test/systemtest/test_cpu_monitor.py
TIMEOUT 10)
- ament_add_pytest_test(
- test_ntp_monitor
- test/systemtest/test_ntp_monitor.py
- TIMEOUT 10)
+ add_launch_test(
+ test/systemtest/test_ntp_monitor_launchtest.py
+ TARGET ntp_monitor_launchtest
+ TIMEOUT 20)
endif()
ament_package()
diff --git a/diagnostic_common_diagnostics/diagnostic_common_diagnostics/ntp_monitor.py b/diagnostic_common_diagnostics/diagnostic_common_diagnostics/ntp_monitor.py
index 4d52e9e8..9462cebb 100755
--- a/diagnostic_common_diagnostics/diagnostic_common_diagnostics/ntp_monitor.py
+++ b/diagnostic_common_diagnostics/diagnostic_common_diagnostics/ntp_monitor.py
@@ -47,13 +47,14 @@
class NTPMonitor(Node):
"""A diagnostic task that monitors the NTP offset of the system clock."""
- def __init__(self, ntp_hostname, offset=500, self_offset=500,
+ def __init__(self, ntp_hostname, ntp_port, offset=500, self_offset=500,
diag_hostname=None, error_offset=5000000,
do_self_test=True):
"""Initialize the NTPMonitor."""
super().__init__(__class__.__name__)
self.ntp_hostname = ntp_hostname
+ self.ntp_port = ntp_port
self.offset = offset
self.self_offset = self_offset
self.diag_hostname = diag_hostname
@@ -67,7 +68,8 @@ def __init__(self, ntp_hostname, offset=500, self_offset=500,
self.stat = DIAG.DiagnosticStatus()
self.stat.level = DIAG.DiagnosticStatus.OK
self.stat.name = 'NTP offset from ' + \
- self.diag_hostname + ' to ' + self.ntp_hostname
+ self.diag_hostname + ' to ' + self.ntp_hostname + \
+ ':' + str(self.ntp_port)
self.stat.message = 'OK'
self.stat.hardware_id = self.hostname
self.stat.values = []
@@ -127,7 +129,10 @@ def add_kv(stat_values, key, value):
ntp_client = ntplib.NTPClient()
response = None
try:
- response = ntp_client.request(self.ntp_hostname, version=3)
+ response = ntp_client.request(
+ self.ntp_hostname,
+ port=self.ntp_port,
+ version=3)
except ntplib.NTPException as e:
self.get_logger().error(f'NTP Error: {e}')
st.level = DIAG.DiagnosticStatus.ERROR
@@ -155,11 +160,17 @@ def add_kv(stat_values, key, value):
def ntp_monitor_main(argv=sys.argv[1:]):
+ # filter out ROS args
+ argv = [a for a in argv if not a.startswith('__') and not a == '--ros-args' and not a == '-r']
+
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--ntp_hostname',
action='store', default='0.pool.ntp.org',
type=str)
+ parser.add_argument('--ntp_port',
+ action='store', default=123,
+ type=int)
parser.add_argument('--offset-tolerance', dest='offset_tol',
action='store', default=500,
help='Offset from NTP host [us]', metavar='OFFSET-TOL',
@@ -188,7 +199,8 @@ def ntp_monitor_main(argv=sys.argv[1:]):
assert offset < error_offset, \
'Offset tolerance must be less than error offset tolerance'
- ntp_monitor = NTPMonitor(args.ntp_hostname, offset, self_offset,
+ ntp_monitor = NTPMonitor(args.ntp_hostname, args.ntp_port,
+ offset, self_offset,
args.diag_hostname, error_offset,
args.do_self_test)
diff --git a/diagnostic_common_diagnostics/package.xml b/diagnostic_common_diagnostics/package.xml
index de8808d7..8496593a 100644
--- a/diagnostic_common_diagnostics/package.xml
+++ b/diagnostic_common_diagnostics/package.xml
@@ -33,6 +33,7 @@
ament_cmake_lint_cmake
ament_cmake_pytest
+ launch_testing_ament_cmake
ament_cmake
diff --git a/diagnostic_common_diagnostics/test/systemtest/test_ntp_monitor.py b/diagnostic_common_diagnostics/test/systemtest/test_ntp_monitor.py
deleted file mode 100644
index 767ed2ac..00000000
--- a/diagnostic_common_diagnostics/test/systemtest/test_ntp_monitor.py
+++ /dev/null
@@ -1,130 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-# Software License Agreement (BSD License)
-#
-# Copyright (c) 2023, Robert Bosch GmbH
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following
-# disclaimer in the documentation and/or other materials provided
-# with the distribution.
-# * Neither the name of the Willow Garage nor the names of its
-# contributors may be used to endorse or promote products derived
-# from this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
-# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
-# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
-# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-
-import os
-import subprocess
-import unittest
-
-import ament_index_python
-
-from diagnostic_msgs.msg import DiagnosticArray
-
-import rclpy
-
-TIMEOUT_MAX_S = 5.
-
-
-class TestNTPMonitor(unittest.TestCase):
-
- def __init__(self, methodName: str = 'runTest') -> None:
- super().__init__(methodName)
- rclpy.init()
- self.n_msgs_received = 0
-
- def setUp(self):
- self.n_msgs_received = 0
- n = self._count_msgs(1.)
- self.assertEqual(n, 0)
- self.subprocess = subprocess.Popen(
- [
- os.path.join(
- ament_index_python.get_package_prefix(
- 'diagnostic_common_diagnostics'
- ),
- 'lib',
- 'diagnostic_common_diagnostics',
- 'ntp_monitor.py'
- )
- ]
- )
-
- def tearDown(self):
- self.subprocess.kill()
-
- def _diagnostics_callback(self, msg):
- rclpy.logging.get_logger('test_ntp_monitor').info(
- f'Received diagnostics message: {msg}'
- )
- search_strings = [
- 'NTP offset from',
- 'NTP self-offset for'
- ]
- for search_string in search_strings:
- if search_string in ''.join([
- s.name for s in msg.status
- ]):
- self.n_msgs_received += 1
-
- def _count_msgs(self, timeout_s):
- self.n_msgs_received = 0
- node = rclpy.create_node('test_ntp_monitor')
- rclpy.logging.get_logger('test_ntp_monitor').info(
- '_count_msgs'
- )
- node.create_subscription(
- DiagnosticArray,
- 'diagnostics',
- self._diagnostics_callback,
- 1
- )
- TIME_D_S = .05
- waited_s = 0.
- start = node.get_clock().now()
- while waited_s < timeout_s and self.n_msgs_received == 0:
- rclpy.spin_once(node, timeout_sec=TIME_D_S)
- waited_s = (node.get_clock().now() - start).nanoseconds / 1e9
- rclpy.logging.get_logger('test_ntp_monitor').info(
- f'received {self.n_msgs_received} messages after {waited_s}s'
- )
- node.destroy_node()
- return self.n_msgs_received
-
- def test_publishing(self):
- self.assertEqual(
- self.subprocess.poll(),
- None,
- 'NTP monitor subprocess died'
- )
-
- n = self._count_msgs(TIMEOUT_MAX_S)
-
- self.assertGreater(
- n,
- 0,
- f'No messages received within {TIMEOUT_MAX_S}s'
- )
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/diagnostic_common_diagnostics/test/systemtest/test_ntp_monitor_launchtest.py b/diagnostic_common_diagnostics/test/systemtest/test_ntp_monitor_launchtest.py
index d70470ed..495c9b68 100644
--- a/diagnostic_common_diagnostics/test/systemtest/test_ntp_monitor_launchtest.py
+++ b/diagnostic_common_diagnostics/test/systemtest/test_ntp_monitor_launchtest.py
@@ -32,60 +32,84 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
-import os
+import unittest
-import ament_index_python
+from diagnostic_msgs.msg import DiagnosticArray
import launch
-import launch_pytest
-from launch_pytest.tools import process as process_tools
+import launch_ros
import launch_testing
+from launch_testing_ros import WaitForTopics
+
import pytest
+import rclpy
+
-@pytest.fixture
-def ntp_monitor_proc():
- # Launch a process to test
- return launch.actions.ExecuteProcess(
- cmd=[
- os.path.join(
- ament_index_python.get_package_prefix(
- 'diagnostic_common_diagnostics'),
- 'lib',
- 'diagnostic_common_diagnostics',
- 'ntp_monitor.py'
- ),
- ],
- ),
-
-
-@launch_pytest.fixture
-def launch_description(ntp_monitor_proc):
+@pytest.mark.launch_test
+def generate_test_description():
+ """Launch the ntp_monitor node and return a launch description."""
return launch.LaunchDescription([
- ntp_monitor_proc,
+ launch_ros.actions.Node(
+ package='diagnostic_common_diagnostics',
+ executable='ntp_monitor.py',
+ name='ntp_monitor',
+ output='screen',
+ arguments=['--offset-tolerance', '10000',
+ '--error-offset-tolerance', '20000']
+ # 10s, 20s, we are not testing if your clock is correct
+ ),
launch_testing.actions.ReadyToTest()
])
-@pytest.mark.skip(reason='This test is not working yet')
-@pytest.mark.launch(fixture=launch_description)
-def test_read_stdout(ntp_monitor_proc, launch_context):
- """Check if 'ntp_monitor' was found in the stdout."""
- def validate_output(output):
- # this function can use assertions to validate the output or return a boolean.
- # pytest generates easier to understand failures when assertions are used.
- assert output.splitlines() == [
- 'ntp_monitor'], 'process never printed ntp_monitor'
- process_tools.assert_output_sync(
- launch_context, ntp_monitor_proc, validate_output, timeout=5)
-
- def validate_output(output):
- return output == 'this will never happen'
- assert not process_tools.wait_for_output_sync(
- launch_context, ntp_monitor_proc, validate_output, timeout=0.1)
- yield
- # this is executed after launch service shutdown
- assert ntp_monitor_proc.return_code == 0
+class TestNtpMonitor(unittest.TestCase):
+ """Test if the ntp_monitor node is publishing diagnostics."""
+
+ def __init__(self, methodName: str = 'runTest') -> None:
+ super().__init__(methodName)
+ self.received_messages = []
+
+ def _received_message(self, msg):
+ self.received_messages.append(msg)
+
+ def _get_min_level(self):
+ levels = [
+ int.from_bytes(status.level, 'little')
+ for diag in self.received_messages
+ for status in diag.status]
+ if len(levels) == 0:
+ return -1
+ return min(levels)
+
+ def test_topic_published(self):
+ """Test if the ntp_monitor node is publishing diagnostics."""
+ with WaitForTopics(
+ [('/diagnostics', DiagnosticArray)],
+ timeout=5
+ ):
+ print('Topic found')
+
+ rclpy.init()
+ test_node = rclpy.create_node('test_node')
+ test_node.create_subscription(
+ DiagnosticArray,
+ '/diagnostics',
+ self._received_message,
+ 1
+ )
+
+ while len(self.received_messages) < 10:
+ rclpy.spin_once(test_node, timeout_sec=1)
+ if (min_level := self._get_min_level()) == 0:
+ break
+
+ test_node.destroy_node()
+ rclpy.shutdown()
+ print(f'Got {len(self.received_messages)} messages:')
+ for msg in self.received_messages:
+ print(msg)
+ self.assertEqual(min_level, 0)