Skip to content

Commit

Permalink
change CpuTask back to DiagnosticTask
Browse files Browse the repository at this point in the history
  • Loading branch information
Richardvdketterij committed Jan 15, 2024
1 parent 70475f8 commit c13e96a
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 12 deletions.
4 changes: 4 additions & 0 deletions diagnostic_common_diagnostics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ if(BUILD_TESTING)
test_ntp_monitor
test/systemtest/test_ntp_monitor.py
TIMEOUT 10)
ament_add_pytest_test(
test_cpu_monitor
test/systemtest/test_cpu_monitor.py
TIMEOUT 10)
endif()

ament_package()
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,19 @@


class CpuTask(DiagnosticTask):
def __init__(self, node, hostname, warning_percentage=90, window=1):
def __init__(self, warning_percentage=90, window=1):
DiagnosticTask.__init__(self, "CPU Information")

self._nh: Node = node
self._warning_percentage = int(warning_percentage)
self._readings = collections.deque(maxlen=window)

# Create diagnostic updater with default updater rate of 1 hz
self.updater = Updater(self._nh, period=1.0)
self.updater.setHardwareID(hostname)

# Add task to diagnostic updater with the callback producing the diagnostic messages
self.updater.add(hostname, self.produce_diagnostics)

def _get_average_reading(self):
def avg(lst):
return float(sum(lst)) / len(lst) if lst else float('nan')

return [avg(cpu_percentages) for cpu_percentages in zip(*self._readings)]

def produce_diagnostics(self, stat):
def run(self, stat):
self._readings.append(psutil.cpu_percent(percpu=True))
cpu_percentages = self._get_average_reading()
cpu_average = sum(cpu_percentages) / len(cpu_percentages)
Expand Down Expand Up @@ -102,8 +94,10 @@ def main(args=None):
warning_percentage = node.get_parameter('warning_percentage').get_parameter_value().integer_value
window = node.get_parameter('window').get_parameter_value().integer_value

# Create the CpuTask
CpuTask(node=node, hostname=hostname, warning_percentage=warning_percentage, window=window)
# Create diagnostic updater with default updater rate of 1 hz
updater = Updater(node)
updater.setHardwareID(hostname)
updater.add(CpuTask(warning_percentage=warning_percentage, window=window))

rclpy.spin(node)

Expand Down
73 changes: 73 additions & 0 deletions diagnostic_common_diagnostics/test/systemtest/test_cpu_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Software License Agreement (BSD License)
#
# Copyright (c) 2023, Robert Bosch GmbH
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of the Willow Garage nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import unittest
import time
from diagnostic_msgs.msg import DiagnosticStatus
from diagnostic_updater import DiagnosticStatusWrapper
from diagnostic_common_diagnostics.cpu_monitor import CpuTask

class TestCPUMonitor(unittest.TestCase):
def test_ok(self):
warning_percentage = 100
task = CpuTask(warning_percentage)
stat = DiagnosticStatusWrapper()
task.run(stat)
self.assertEqual(task.name, 'CPU Information')
self.assertEqual(stat.level, DiagnosticStatus.OK)
self.assertIn(str('CPU Average'), stat.message)

# Check for at least 1 CPU Load Average and 1 CPU Load
self.assertGreaterEqual(len(stat.values), 2)

def test_warn(self):
# time.sleep(0.1)

warning_percentage2 = 0
task2 = CpuTask(warning_percentage2)
stat2 = DiagnosticStatusWrapper()
task2.run(stat2)
self.assertEqual(task2.name, 'CPU Information')
self.assertEqual(stat2.level, DiagnosticStatus.WARN)
self.assertIn(str('At least one CPU exceeds'), stat2.message)

# Check for at least 1 CPU Load Average and 1 CPU Load
self.assertGreaterEqual(len(stat2.values), 2)
# time.sleep(0.25)

# def test_updater(self):


if __name__ == "__main__":
unittest.main()

0 comments on commit c13e96a

Please sign in to comment.