diff --git a/alarm_node.py b/alarm_node.py new file mode 100644 index 000000000..65cbe4f77 --- /dev/null +++ b/alarm_node.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 + +import rospy + +from std_msgs.msg import Float32 + +global alarm_pub + +from typing import Optional, Union +from ros_alarms_msgs.msg import Alarm as AlarmMsg +from .alarms import Alarm, HandlerBase + + +class HighTempAlarmHandler(HandlerBase): + alarm_name = "high_cpu_temp" + severity_required = (0, 5) + + def raised(self, alarm: AlarmMsg): + """ + Called whenever the high CPU temperature alarm is raised. + """ + print(f"ALARM! CPU temperature too high: {alarm.parameters['temperature']}°C") + return True + + def cleared(self, alarm: AlarmMsg): + """ + Called whenever the high CPU temperature alarm is cleared. + """ + print("CPU temperature back to normal.") + return True + + def meta_predicate(self, meta_alarm: Alarm, alarms) -> Union[bool, Alarm]: + """ + Called on an update to one of this alarm's meta alarms, if there are any. + By default, returns True if any meta alarms are raised. + """ + return any(alarm.raised for name, alarm in alarms.items()) + + + +def cpu_temp_callback(temp): + + # Sets of cpu temp alarm + if float(temp.data) > 75: + rospy.logwarn(f"CPU TEMPERATURE EXCEEDS: {temp.data}") + alarm_pub.publish(True) + + else: + alarm_pub.publish(False) + + + +def cpu_temp_alarm(): + global alarm_pub + + # Initializing node + rospy.init_node('cpu_temp_alarm', anonymous=True) + + rospy.Subscriber('cpu_temperature', Float32, cpu_temp_callback) + alarm_pub = rospy.Publisher('cpu_alarm', bool, queue_size=10) + + rospy.spin() + + +# Example usage +# if __name__ == '__main__': + +# +# cpu_temp_alarm() +# diff --git a/cpu_temp_alarm.py b/cpu_temp_alarm.py new file mode 100644 index 000000000..e7d3d7946 --- /dev/null +++ b/cpu_temp_alarm.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python + +import rospy +import psutil +from ros_alarms_msgs.msg import Alarm as AlarmMsg +from ros_alarms_msgs.srv import AlarmSet, AlarmSetRequest + +class CPUMonitor: + def __init__(self, threshold=75.0): + self.threshold = threshold + self.alarm_name = "high_cpu_temp" + self.node_name = rospy.get_name() + self.alarm_set = rospy.ServiceProxy("/alarm/set", AlarmSet) + + def get_cpu_temp(self): + temps = psutil.sensors_temperatures() + if 'coretemp' in temps: + return temps['coretemp'][0].current + return None + + def raise_alarm(self): + try: + req = AlarmSetRequest() + req.alarm.alarm_name = self.alarm_name + req.alarm.node_name = self.node_name + req.alarm.raised = True + req.alarm.problem_description = "CPU temperature is too high!" + req.alarm.parameters = json.dumps({"temperature": self.get_cpu_temp()}) + req.alarm.severity = 5 + self.alarm_set(req) + except rospy.ServiceException as e: + rospy.logerr("Service call failed: %s" % e) + + def clear_alarm(self): + try: + req = AlarmSetRequest() + req.alarm.alarm_name = self.alarm_name + req.alarm.node_name = self.node_name + req.alarm.raised = False + req.alarm.problem_description = "CPU temperature back to normal" + req.alarm.parameters = json.dumps({"temperature": self.get_cpu_temp()}) + req.alarm.severity = 0 + self.alarm_set(req) + except: + rospy.logerr("Service call failed: %s" % e) + + def monitor(self): + rate = rospy.Rate(1) + while not rospy.is_shutdown(): + temp = self.get_cpu_temp() + if temp: + if temp >= self.threshold: + self.raise_alarm() + else: + self.clear_alarm() + rate.sleep() +# Examples usage +# if __name__ == '__main__': +# rospy.init_node('cpu_monitor_node') +# monitor = CPUMonitor(threshold=75.0) +# monitor.monitor()