Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cpu temperature alarm #1217

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions alarm_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#!/usr/bin/env python3

import rospy

from std_msgs.msg import Float32

global alarm_pub

from typing import Optional, Union
from ros_alarms_msgs.msg import Alarm as AlarmMsg
from .alarms import Alarm, HandlerBase


class HighTempAlarmHandler(HandlerBase):
alarm_name = "high_cpu_temp"
severity_required = (0, 5)

def raised(self, alarm: AlarmMsg):
"""
Called whenever the high CPU temperature alarm is raised.
"""
print(f"ALARM! CPU temperature too high: {alarm.parameters['temperature']}°C")
return True

def cleared(self, alarm: AlarmMsg):
"""
Called whenever the high CPU temperature alarm is cleared.
"""
print("CPU temperature back to normal.")
return True

def meta_predicate(self, meta_alarm: Alarm, alarms) -> Union[bool, Alarm]:
"""
Called on an update to one of this alarm's meta alarms, if there are any.
By default, returns True if any meta alarms are raised.
"""
return any(alarm.raised for name, alarm in alarms.items())



def cpu_temp_callback(temp):

# Sets of cpu temp alarm
if float(temp.data) > 75:
rospy.logwarn(f"CPU TEMPERATURE EXCEEDS: {temp.data}")
alarm_pub.publish(True)

else:
alarm_pub.publish(False)



def cpu_temp_alarm():
global alarm_pub

# Initializing node
rospy.init_node('cpu_temp_alarm', anonymous=True)

rospy.Subscriber('cpu_temperature', Float32, cpu_temp_callback)
alarm_pub = rospy.Publisher('cpu_alarm', bool, queue_size=10)

rospy.spin()


# Example usage
# if __name__ == '__main__':

#
# cpu_temp_alarm()
#
61 changes: 61 additions & 0 deletions cpu_temp_alarm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env python

import rospy
import psutil
from ros_alarms_msgs.msg import Alarm as AlarmMsg
from ros_alarms_msgs.srv import AlarmSet, AlarmSetRequest

class CPUMonitor:
def __init__(self, threshold=75.0):
self.threshold = threshold
self.alarm_name = "high_cpu_temp"
self.node_name = rospy.get_name()
self.alarm_set = rospy.ServiceProxy("/alarm/set", AlarmSet)

def get_cpu_temp(self):
temps = psutil.sensors_temperatures()
if 'coretemp' in temps:
return temps['coretemp'][0].current
return None

def raise_alarm(self):
try:
req = AlarmSetRequest()
req.alarm.alarm_name = self.alarm_name
req.alarm.node_name = self.node_name
req.alarm.raised = True
req.alarm.problem_description = "CPU temperature is too high!"
req.alarm.parameters = json.dumps({"temperature": self.get_cpu_temp()})
req.alarm.severity = 5
self.alarm_set(req)
except rospy.ServiceException as e:
rospy.logerr("Service call failed: %s" % e)

def clear_alarm(self):
try:
req = AlarmSetRequest()
req.alarm.alarm_name = self.alarm_name
req.alarm.node_name = self.node_name
req.alarm.raised = False
req.alarm.problem_description = "CPU temperature back to normal"
req.alarm.parameters = json.dumps({"temperature": self.get_cpu_temp()})
req.alarm.severity = 0
self.alarm_set(req)
except:
rospy.logerr("Service call failed: %s" % e)

def monitor(self):
rate = rospy.Rate(1)
while not rospy.is_shutdown():
temp = self.get_cpu_temp()
if temp:
if temp >= self.threshold:
self.raise_alarm()
else:
self.clear_alarm()
rate.sleep()
# Examples usage
# if __name__ == '__main__':
# rospy.init_node('cpu_monitor_node')
# monitor = CPUMonitor(threshold=75.0)
# monitor.monitor()
Loading