Skip to content

Commit

Permalink
Add outliers identifier
Browse files Browse the repository at this point in the history
  • Loading branch information
malvads committed Nov 14, 2024
1 parent e49f5eb commit 4babfa7
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 2 deletions.
141 changes: 141 additions & 0 deletions resources/src/ai/outliers_identifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Copyright (C) 2024 Eneo Tecnologia S.L.
#
# Authors:
# Miguel Álvarez Adsuara <[email protected]>
#
# This program is free software: you can redistribute it and/or modify it under the terms of the
# GNU Affero General Public License as published by the Free Software Foundation, either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with this program.
# If not, see <https://www.gnu.org/licenses/>.

import json
import pandas as pd
from resources.src.logger import logger
from sklearn.ensemble import IsolationForest

class OutlierIdentifier:
    """
    Attribute previously detected outlier events to the IPs that caused them.

    Per-IP time-series traffic is flattened into a single DataFrame, an
    Isolation Forest is trained on time-of-day and rolling-traffic features,
    and the rows flagged as anomalous are matched against the timestamps of
    the outlier events.
    """

    # Features fed to the Isolation Forest. Kept in one place so that training
    # and prediction always see exactly the same columns in the same order.
    FEATURE_COLUMNS = (
        'hour', 'minute', 'day', 'dayofweek', 'dayofyear',
        'rolling_mean', 'rolling_std', 'low_traffic',
    )

    # Number of consecutive samples used for the rolling statistics.
    ROLLING_WINDOW = 5

    def __init__(self):
        self.df = None     # DataFrame built by prepare_data()
        self.model = None  # IsolationForest fitted by train_model()

    def prepare_data(self, all_ips_data):
        """
        Prepare the data by flattening the input data, extracting relevant features,
        and computing rolling statistics.
        Args:
            all_ips_data (dict): Dictionary containing time-series data for each IP.
                Each value is an iterable of dict entries; "timestamp" and
                "result" -> "bytes" are read from every entry.
        Raises:
            ValueError: If no entries are provided for any IP.
        """
        flattened_data = [
            {
                "ip": ip,
                "timestamp": entry.get("timestamp"),
                "bytes": entry.get("result", {}).get("bytes", 0),
            }
            for ip, ip_data in all_ips_data.items()
            for entry in ip_data
        ]
        if not flattened_data:
            # An empty frame has no columns at all; fail with a clear message
            # instead of an opaque KeyError on 'timestamp' further down.
            raise ValueError("No IP traffic data provided")

        self.df = pd.DataFrame(flattened_data)
        self.df['timestamp'] = pd.to_datetime(self.df['timestamp'])

        moment = self.df['timestamp'].dt
        self.df['hour'] = moment.hour
        self.df['minute'] = moment.minute
        self.df['day'] = moment.day
        self.df['dayofweek'] = moment.dayofweek
        self.df['dayofyear'] = moment.dayofyear

        # Rolling statistics are computed per IP so that a window never mixes
        # the traffic of two different IPs. Entries are used in input order.
        # NOTE(review): assumes each IP's entries arrive in chronological order.
        window = self.ROLLING_WINDOW
        bytes_by_ip = self.df.groupby('ip')['bytes']
        self.df['rolling_mean'] = bytes_by_ip.transform(
            lambda series: series.rolling(window=window, min_periods=1).mean()
        ).fillna(0)
        # The standard deviation of a single sample is NaN; treat it as 0.
        self.df['rolling_std'] = bytes_by_ip.transform(
            lambda series: series.rolling(window=window, min_periods=1).std()
        ).fillna(0)

        self.df['low_traffic'] = self.df['bytes'] == 0

    def train_model(self, X_train):
        """
        Train the Isolation Forest model on the provided training data.
        Args:
            X_train (DataFrame): The training set features.
        """
        # Fixed random_state keeps results reproducible between runs.
        self.model = IsolationForest(contamination=0.05, random_state=42)
        self.model.fit(X_train)

    def identify_implicated_ips(self, outliers):
        """
        Identify IPs that contributed to the outlier events.
        Requires prepare_data() and train_model() to have been called first.
        Args:
            outliers (list): A list of outlier events; "timestamp" is read from each.
        Returns:
            dict: {"ips": [{"caused_by": [ip, ...]}, ...]} with one entry per
                outlier event, in the same order as the input list.
        """
        predictions = self.model.predict(self.df[list(self.FEATURE_COLUMNS)])
        # IsolationForest.predict returns -1 for anomalies and 1 for inliers.
        self.df['outlier'] = ['anomaly' if label == -1 else 'normal' for label in predictions]
        anomalous_rows = self.df[self.df['outlier'] == 'anomaly']

        implicated_ips = {"ips": []}
        for outlier in outliers:
            try:
                # Convert with the same function used for the DataFrame column
                # so that both sides of the comparison share one representation.
                timestamp = pd.to_datetime(outlier["timestamp"])
            except (ValueError, TypeError):
                # A timestamp that cannot be parsed cannot match any row.
                implicated_ips["ips"].append({"caused_by": []})
                continue

            matching = anomalous_rows[anomalous_rows['timestamp'] == timestamp]
            implicated_ips["ips"].append({"caused_by": list(matching['ip'])})

        return implicated_ips

    def execute(self, outliers, all_ips_data):
        """
        Execute the full pipeline for detecting outliers and identifying implicated IPs.
        Args:
            outliers (list): A list of outlier events.
            all_ips_data (dict): Dictionary containing time-series data for each IP.
        Returns:
            str: A JSON string with the implicated IPs for each outlier event.
        Raises:
            Exception: Any error raised while preparing data, training or predicting.
        """
        self.prepare_data(all_ips_data)
        self.train_model(self.df[list(self.FEATURE_COLUMNS)])

        implicated_ips = self.identify_implicated_ips(outliers)

        # A successful result is not an error; log it at info level.
        logger.logger.info(implicated_ips)

        return json.dumps(implicated_ips)

    def train_and_execute_model(self, outliers, all_ips_data):
        """
        Wrapper function to handle errors during model training and execution.
        Args:
            outliers (list): A list of outliers to process.
            all_ips_data (dict): Dictionary of IP data.
        Returns:
            str | dict: The JSON string produced by execute() on success, or the
                error dictionary built by return_error() on failure.
        """
        try:
            return self.execute(outliers, all_ips_data)
        except Exception as e:
            # Top-level boundary: never propagate, but keep the cause in the log.
            logger.logger.error(f"Could not execute anomaly detection: {e}")
            return self.return_error(e)

    def return_error(self, error="error"):
        """
        Build a JSON-serialisable error message.
        Args:
            error (str | Exception): The error to report; converted with str().
        Returns:
            dict: A dictionary containing the error status and message.
        """
        # str() keeps the payload serialisable when an exception object is passed.
        return {"status": "error", "msg": str(error)}
31 changes: 29 additions & 2 deletions resources/src/server/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
from flask import Flask, jsonify, request

from resources.src.redborder.s3 import S3
from resources.src.ai import outliers, shallow_outliers
from resources.src.ai import outliers, shallow_outliers, outliers_identifier
from resources.src.druid import client, query_builder
from resources.src.logger import logger
from resources.src.config import configmanager


'''
Init local variables
'''
Expand Down Expand Up @@ -63,11 +62,13 @@ def __init__(self):
self.start_s3_sync_thread()
self.app = Flask(__name__)
self.app.add_url_rule('/api/v1/outliers', view_func=self.calculate, methods=['POST'])
self.app.add_url_rule('/api/v1/ip_identifier', view_func=self.identify_ip, methods=['POST'])
self.exit_code = 0
self.shallow = shallow_outliers.ShallowOutliers(
sensitivity = config.get("ShallowOutliers", "sensitivity"),
contamination = config.get("ShallowOutliers", "contamination")
)
self.identifier = outliers_identifier.OutlierIdentifier()
self.ai_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "ai")
self.deep_models={}

Expand Down Expand Up @@ -116,6 +117,32 @@ def calculate(self):
logger.logger.info("Starting outliers execution")
return self.execute_model(data, config.get("Outliers","metric"), model)

def identify_ip(self):
    """
    Process the incoming request to identify implicated IPs based on outlier data.

    Reads the 'payload' form field as JSON and expects an object with an
    'outliers' list and an 'all_ips_data' object.
    Returns:
        Response: A (JSON response, status code) pair: 200 with the identifier's
            result, 400 when the payload is malformed, or 500 with a generic
            message when identification fails.
    """
    invalid_format = {"error": "Invalid data format"}
    try:
        try:
            payload = json.loads(request.form.get('payload', '{}'))
        except (TypeError, ValueError):
            # Malformed JSON is a client error, not a server failure.
            return jsonify(invalid_format), 400

        if not isinstance(payload, dict):
            return jsonify(invalid_format), 400

        outliers = payload.get('outliers', [])
        all_ips_data = payload.get('all_ips_data', {})

        if not isinstance(outliers, list) or not isinstance(all_ips_data, dict):
            return jsonify(invalid_format), 400

        result = self.identifier.train_and_execute_model(outliers, all_ips_data)

        # The identifier reports failures as a dict with status "error" rather
        # than raising; surface that as a server error without leaking details.
        if isinstance(result, dict) and result.get("status") == "error":
            logger.logger.error(f"IP identification failed: {result.get('msg')}")
            return jsonify({"error": "Could not identify implicated IPs"}), 500

        logger.logger.info(result)

        return jsonify(result), 200

    except Exception as e:
        # Keep the details in the server log only; exception text must not be
        # exposed to the external caller.
        logger.logger.error(f"Unexpected error while identifying IPs: {e}")
        return jsonify({"error": "Internal server error"}), 500

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.


def decode_b64_json(self, b64_json):
"""
Decode a base64 json into a python dictionary.
Expand Down

0 comments on commit 4babfa7

Please sign in to comment.