diff --git a/algo/curve_anomaly/curve_explorer.py b/algo/curve_anomaly/curve_explorer.py index 5fd0757..83fd868 100644 --- a/algo/curve_anomaly/curve_explorer.py +++ b/algo/curve_anomaly/curve_explorer.py @@ -70,26 +70,25 @@ def check(self, value, timestamp): self.timestamp_last_anomaly, self.timestamp_last_notification, notification_now = cont_device.notification_decision( self.timestamp_last_anomaly, self.timestamp_last_notification, timestamp) if notification_now: - return True, self.create_result(f'In der Zeit seit {str(time_window_start)} wurde eine Anomalie im Lastprofil festgestellt.', time_window_start, "TODO", "continous_device") + return True, self.create_result(f'In der Zeit seit {str(time_window_start)} wurde eine Anomalie im Lastprofil festgestellt.', time_window_start, "continous_device") else: return False, '' elif test_result=='load_device_anomaly_power_curve': - return True, self.create_result(f'Bei der letzten Benutzung wurde eine Anomalie im Lastprofil festgestellt.', "", "", "uncontinious_device_curve") + return True, self.create_result(f'Bei der letzten Benutzung wurde eine Anomalie im Lastprofil festgestellt.', "", "uncontinious_device_curve") elif test_result=='load_device_anomaly_length': - return True, self.create_result(f'Bei der letzten Benutzung wurde eine ungewöhnliche Laufdauer festgestellt.', "", "", "uncontinious_device_length") + return True, self.create_result(f'Bei der letzten Benutzung wurde eine ungewöhnliche Laufdauer festgestellt.', "", "uncontinious_device_length") else: return False, '' def update_with_new_value(self, data): pass - def create_result(self, message, value, unit, sub_type): + def create_result(self, message, value, sub_type): return { "type": "curve_anomaly", "sub_type": sub_type, "message": message, - "value": value, - "unit": unit + "value": value } def save(self): diff --git a/algo/frequency_point_outlier/outlier_detector.py b/algo/frequency_point_outlier/outlier_detector.py index fd0aeba..23d05d3 100644 --- a/algo/frequency_point_outlier/outlier_detector.py +++ b/algo/frequency_point_outlier/outlier_detector.py @@ -47,12 +47,15 @@ def run(self): waiting_time = self.calculate_time_diff(now, self.last_received_ts) util.logger.debug(f"{LOG_PREFIX}: Time since last input {waiting_time}") anomaly_occured = False + threshold = None if self.point_is_anomalous_high(waiting_time): sub_type = "high" anomaly_occured = True + threshold = self.get_upper_threshold() elif self.point_is_anomalous_low(waiting_time): sub_type = "low" anomaly_occured = True + threshold = self.get_lower_threshold() if anomaly_occured: util.logger.info(f"{LOG_PREFIX}: Anomaly occured: Type=time Sub-Type={sub_type} Value={waiting_time} Mean={self.current_mean} Std={self.current_stddev}") @@ -60,7 +63,8 @@ def run(self): "type": "time", "sub_type": sub_type, "value": waiting_time, - "unit": "min", + "threshold": threshold, + "mean": self.current_mean }) time.sleep(5) diff --git a/algo/point_outlier/point_explorer.py b/algo/point_outlier/point_explorer.py index 887d14b..bf8eed9 100644 --- a/algo/point_outlier/point_explorer.py +++ b/algo/point_outlier/point_explorer.py @@ -14,13 +14,16 @@ def check(self, value, timestamp): new_value = float(value) anomaly_occured = False + threshold = None if self.point_is_anomalous_high(new_value): sub_type = "high" anomaly_occured = True + threshold = self.get_upper_threshold() if self.point_is_anomalous_low(new_value): sub_type = "low" anomaly_occured = True + threshold = self.get_lower_threshold() if anomaly_occured: util.logger.info(f'{LOG_PREFIX}: An extreme point outlier just occured!') @@ -28,7 +31,8 @@ def check(self, value, timestamp): "type": "extreme_value", "sub_type": sub_type, "value": new_value, - "unit": "TODO" + "threshold": threshold, + "mean": self.current_mean } return False, {} diff --git a/algo/utils.py b/algo/utils.py index 62c37f5..3dcc4f7 100644 --- a/algo/utils.py +++ b/algo/utils.py @@ -91,6 +91,12 @@ def point_is_anomalous_low(self, point): return False return point < self.current_mean - 3*self.current_stddev + def get_upper_threshold(self): + return self.current_mean + 3*self.current_stddev + + def get_lower_threshold(self): + return self.current_mean + 3*self.current_stddev + def update(self, point): self.current_stddev = self.calculate_std(point, self.current_stddev, self.current_mean, self.num_datepoints) self.current_mean = self.calculate_mean(point, self.current_mean, self.num_datepoints) diff --git a/main.py b/main.py index f628108..9e7243a 100644 --- a/main.py +++ b/main.py @@ -22,13 +22,16 @@ from operator_lib.operator_lib import OperatorLib -def handle_schema_error(error, message, produce_func): +def handle_schema_error(error, message, produce_func, device_id): # catches cases when middle keys are missing like ENERGY, but not when last key like power is missing msg_str = json.dumps(message) produce_func({ "type": "schema", "sub_type": "", "value": msg_str, + "threshold": "", + "mean": 0, + "device_id": device_id }) diff --git a/requirements.txt b/requirements.txt index 8513390..f2e60d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -git+https://github.com/SENERGY-Platform/analytics-operator-lib-python@v1.0.1 +git+https://github.com/SENERGY-Platform/analytics-operator-lib-python@v1.0.2 confluent_kafka<2 pandas<2