Skip to content

Commit

Permalink
added point outlier information to output
Browse files Browse the repository at this point in the history
  • Loading branch information
hahahannes committed Apr 15, 2024
1 parent a44a226 commit f38b3c7
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 10 deletions.
11 changes: 5 additions & 6 deletions algo/curve_anomaly/curve_explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,26 +70,25 @@ def check(self, value, timestamp):
self.timestamp_last_anomaly, self.timestamp_last_notification, notification_now = cont_device.notification_decision(
self.timestamp_last_anomaly, self.timestamp_last_notification, timestamp)
if notification_now:
return True, self.create_result(f'In der Zeit seit {str(time_window_start)} wurde eine Anomalie im Lastprofil festgestellt.', time_window_start, "TODO", "continous_device")
return True, self.create_result(f'In der Zeit seit {str(time_window_start)} wurde eine Anomalie im Lastprofil festgestellt.', time_window_start, "continous_device")
else:
return False, ''
elif test_result=='load_device_anomaly_power_curve':
return True, self.create_result(f'Bei der letzten Benutzung wurde eine Anomalie im Lastprofil festgestellt.', "", "", "uncontinious_device_curve")
return True, self.create_result(f'Bei der letzten Benutzung wurde eine Anomalie im Lastprofil festgestellt.', "", "uncontinious_device_curve")
elif test_result=='load_device_anomaly_length':
return True, self.create_result(f'Bei der letzten Benutzung wurde eine ungewöhnliche Laufdauer festgestellt.', "", "", "uncontinious_device_length")
return True, self.create_result(f'Bei der letzten Benutzung wurde eine ungewöhnliche Laufdauer festgestellt.', "", "uncontinious_device_length")
else:
return False, ''

def update_with_new_value(self, data):
pass

def create_result(self, message, value, unit, sub_type):
def create_result(self, message, value, sub_type):
return {
"type": "curve_anomaly",
"sub_type": sub_type,
"message": message,
"value": value,
"unit": unit
"value": value
}

def save(self):
Expand Down
6 changes: 5 additions & 1 deletion algo/frequency_point_outlier/outlier_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,24 @@ def run(self):
waiting_time = self.calculate_time_diff(now, self.last_received_ts)
util.logger.debug(f"{LOG_PREFIX}: Time since last input {waiting_time}")
anomaly_occured = False
threshold = None
if self.point_is_anomalous_high(waiting_time):
sub_type = "high"
anomaly_occured = True
threshold = self.get_upper_threshold()
elif self.point_is_anomalous_low(waiting_time):
sub_type = "low"
anomaly_occured = True
threshold = self.get_lower_threshold()

if anomaly_occured:
util.logger.info(f"{LOG_PREFIX}: Anomaly occured: Type=time Sub-Type={sub_type} Value={waiting_time} Mean={self.current_mean} Std={self.current_stddev}")
self.kafka_produce_func.produce({
"type": "time",
"sub_type": sub_type,
"value": waiting_time,
"unit": "min",
"threshold": threshold,
"mean": self.current_mean
})

time.sleep(5)
Expand Down
6 changes: 5 additions & 1 deletion algo/point_outlier/point_explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,25 @@ def check(self, value, timestamp):
new_value = float(value)

anomaly_occured = False
threshold = None
if self.point_is_anomalous_high(new_value):
sub_type = "high"
anomaly_occured = True
threshold = self.get_upper_threshold()

if self.point_is_anomalous_low(new_value):
sub_type = "low"
anomaly_occured = True
threshold = self.get_lower_threshold()

if anomaly_occured:
util.logger.info(f'{LOG_PREFIX}: An extreme point outlier just occured!')
return True, {
"type": "extreme_value",
"sub_type": sub_type,
"value": new_value,
"unit": "TODO"
"threshold": threshold,
"mean": self.current_mean
}

return False, {}
Expand Down
6 changes: 6 additions & 0 deletions algo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ def point_is_anomalous_low(self, point):
return False
return point < self.current_mean - 3*self.current_stddev

def get_upper_threshold(self):
return self.current_mean + 3*self.current_stddev

def get_lower_threshold(self):
return self.current_mean + 3*self.current_stddev

def update(self, point):
self.current_stddev = self.calculate_std(point, self.current_stddev, self.current_mean, self.num_datepoints)
self.current_mean = self.calculate_mean(point, self.current_mean, self.num_datepoints)
Expand Down
5 changes: 4 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@

from operator_lib.operator_lib import OperatorLib

def handle_schema_error(error, message, produce_func):
def handle_schema_error(error, message, produce_func, device_id):
# catches cases when middle keys are missing like ENERGY, but not when last key like power is missing
msg_str = json.dumps(message)
produce_func({
"type": "schema",
"sub_type": "",
"value": msg_str,
"threshold": "",
"mean": 0,
"device_id": device_id
})


Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/SENERGY-Platform/[email protected].1
git+https://github.com/SENERGY-Platform/[email protected].2

confluent_kafka<2
pandas<2
Expand Down

0 comments on commit f38b3c7

Please sign in to comment.