From b12bbdaa0975c9b2657c8b01e3847d63745cb75c Mon Sep 17 00:00:00 2001 From: Richardvdketterij Date: Tue, 16 Jan 2024 09:50:55 +0100 Subject: [PATCH] add 3 unittests --- .../test/systemtest/test_cpu_monitor.py | 51 +++++++++++++++++-- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py b/diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py index 0e482ff9..bc21c325 100644 --- a/diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py +++ b/diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py @@ -34,12 +34,34 @@ import unittest import time +import rclpy + +from rclpy.node import Node from diagnostic_msgs.msg import DiagnosticStatus from diagnostic_updater import DiagnosticStatusWrapper +from diagnostic_updater import DiagnosticArray, Updater from diagnostic_common_diagnostics.cpu_monitor import CpuTask +import time class TestCPUMonitor(unittest.TestCase): + @classmethod + def setUpClass(cls): + rclpy.init(args=None) + + @classmethod + def tearDownClass(cls): + if rclpy.ok(): + rclpy.shutdown() + + def diagnostics_callback(self, msg): + self.message_recieved = True + self.assertEqual(len(msg.status), 1) + def test_ok(self): + # In this case is recommended for accuracy that psutil.cpu_percent() + # function be called with at least 0.1 seconds between calls. + time.sleep(0.1) + warning_percentage = 100 task = CpuTask(warning_percentage) stat = DiagnosticStatusWrapper() @@ -52,7 +74,9 @@ def test_ok(self): self.assertGreaterEqual(len(stat.values), 2) def test_warn(self): - # time.sleep(0.1) + # In this case is recommended for accuracy that psutil.cpu_percent() + # function be called with at least 0.1 seconds between calls. + time.sleep(0.1) warning_percentage2 = 0 task2 = CpuTask(warning_percentage2) @@ -64,9 +88,30 @@ def test_warn(self): # Check for at least 1 CPU Load Average and 1 CPU Load self.assertGreaterEqual(len(stat2.values), 2) - # time.sleep(0.25) - # def test_updater(self): + def test_updater(self): + # In this case is recommended for accuracy that psutil.cpu_percent() + # function be called with at least 0.1 seconds between calls. + time.sleep(0.1) + + self.message_recieved = False + + node = Node('cpu_monitor_test') + updater = Updater(node) + updater.setHardwareID("test_id") + updater.add(CpuTask()) + + node.create_subscription(DiagnosticArray, '/diagnostics', self.diagnostics_callback, 10) + + start_time = time.time() + timeout = 5.0 # Timeout in seconds + + while not self.message_recieved: + rclpy.spin_once(node) + time.sleep(0.1) + elapsed_time = time.time() - start_time + if elapsed_time >= timeout: + self.fail("No diagnostics received") if __name__ == "__main__":