Skip to content

Commit

Permalink
fix: background execution (#1433)
Browse files Browse the repository at this point in the history
Signed-off-by: Tal <[email protected]>
Signed-off-by: Matvey Kukuy <[email protected]>
Signed-off-by: Vladimir Filonov <[email protected]>
Co-authored-by: GlebBerjoskin <[email protected]>
Co-authored-by: Shahar Glazner <[email protected]>
Co-authored-by: Tal <[email protected]>
Co-authored-by: Vladimir Filonov <[email protected]>
  • Loading branch information
5 people authored Aug 13, 2024
1 parent de82b32 commit e0e15cc
Show file tree
Hide file tree
Showing 37 changed files with 1,262 additions and 152 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test-pr-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ env:
POSTGRES_USER: keepuser
POSTGRES_PASSWORD: keeppassword
POSTGRES_DB: keepdb
# To test if imports are working properly
EE_ENABLED: true

jobs:
tests:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ __pycache__/
# C extensions
*.so

# .csv files
*.csv

# Distribution / packaging
.Python
build/
Expand Down
2 changes: 2 additions & 0 deletions docker/Dockerfile.api
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ RUN python -m venv /venv
COPY pyproject.toml poetry.lock ./
RUN poetry export -f requirements.txt --output requirements.txt --without-hashes && /venv/bin/python -m pip install --upgrade -r requirements.txt
COPY keep keep
COPY ee keep/ee
COPY examples examples
COPY README.md README.md
RUN poetry build && /venv/bin/pip install --use-deprecated=legacy-resolver dist/*.whl

FROM base as final
ENV PATH="/venv/bin:${PATH}"
ENV VIRTUAL_ENV="/venv"
ENV EE_PATH="ee"
COPY --from=builder /venv /venv
COPY --from=builder /app/examples /examples
# as per Openshift guidelines, https://docs.openshift.com/container-platform/4.11/openshift_images/create-images.html#use-uid_create-images
Expand Down
99 changes: 99 additions & 0 deletions ee/experimental/graph_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import numpy as np
import networkx as nx

from typing import List, Tuple

from keep.api.core.db import get_pmi_values


def detect_knee_1d_auto_increasing(y: List[float]) -> Tuple[int, float]:
"""
This function detects the knee point in an increasing 1D curve. Knee point is the point where a curve
starts to flatten out (https://en.wikipedia.org/wiki/Knee_of_a_curve).
Parameters:
y (List[float]): a list of float values
Returns:
tuple: knee_index, knee_y
"""

def detect_knee_1d(y: List[float], curve: str, direction: str = 'increasing') -> Tuple[int, float, List[float]]:
x = np.arange(len(y))

x_norm = (x - np.min(x)) / (np.max(x) - np.min(x))
y_norm = (y - np.min(y)) / (np.max(y) - np.min(y))

diff_curve = y_norm - x_norm

if curve == 'concave':
knee_index = np.argmax(diff_curve)
else:
knee_index = np.argmin(diff_curve)

knee_y = y[knee_index]

return knee_index, knee_y, diff_curve

knee_index_concave, knee_y_concave, diff_curve_concave = detect_knee_1d(y, 'concave')
knee_index_convex, knee_y_convex, diff_curve_convex = detect_knee_1d(y, 'convex')
max_diff_concave = np.max(np.abs(diff_curve_concave))
max_diff_convex = np.max(np.abs(diff_curve_convex))

if max_diff_concave > max_diff_convex:
return knee_index_concave, knee_y_concave
else:
return knee_index_convex, knee_y_convex


def create_graph(tenant_id: str, fingerprints: List[str], pmi_threshold: float = 0., knee_threshold: float = 0.8) -> nx.Graph:
"""
This function creates a graph from a list of fingerprints. The graph is created based on the PMI values between
the fingerprints. The edges are created between the fingerprints that have a PMI value greater than the threshold.
The nodes are removed if the knee point of the PMI values of the edges connected to the node is less than the threshold.
Parameters:
tenant_id (str): tenant id
fingerprints (List[str]): a list of fingerprints
pmi_threshold (float): PMI threshold
knee_threshold (float): knee threshold
Returns:
nx.Graph: a graph
"""

graph = nx.Graph()

if len(fingerprints) == 1:
graph.add_node(fingerprints[0])
return graph

# Load all PMI values at once
pmi_values = get_pmi_values(tenant_id, fingerprints)

for idx_i, fingerprint_i in enumerate(fingerprints):
if not isinstance(pmi_values[(fingerprint_i, fingerprint_i)], float):
continue

for idx_j in range(idx_i + 1, len(fingerprints)):
fingerprint_j = fingerprints[idx_j]
weight = pmi_values[(fingerprint_i, fingerprint_j)]
if not isinstance(weight, float):
continue

if weight > pmi_threshold:
graph.add_edge(fingerprint_i, fingerprint_j, weight=weight)

nodes_to_delete = []

for node in graph.nodes:
weights = sorted([edge['weight'] for edge in graph[node].values()])

knee_index, knee_statistic = detect_knee_1d_auto_increasing(weights)

if knee_statistic < knee_threshold:
nodes_to_delete.append(node)

graph.remove_nodes_from(nodes_to_delete)

return graph
247 changes: 243 additions & 4 deletions ee/experimental/incident_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,197 @@
import os
import logging

import numpy as np
import pandas as pd
import networkx as nx

from typing import List

from keep.api.models.db.alert import Alert
from typing import List, Dict
from openai import OpenAI

from datetime import datetime, timedelta

from fastapi import Depends

from ee.experimental.node_utils import NodeCandidateQueue, NodeCandidate
from ee.experimental.graph_utils import create_graph
from ee.experimental.statistical_utils import get_alert_pmi_matrix

from pusher import Pusher

from keep.api.models.db.alert import Alert, Incident
from keep.api.core.db import (
assign_alert_to_incident,
is_alert_assigned_to_incident,
add_alerts_to_incident_by_incident_id,
get_last_alerts,
get_last_incidents,
get_incident_by_id,
write_pmi_matrix_to_db,
create_incident_from_dict,
update_incident_summary,
)

from keep.api.core.dependencies import (
AuthenticatedEntity,
AuthVerifier,
get_pusher_client,
)

logger = logging.getLogger(__name__)

ALGORITHM_VERBOSE_NAME = "Basic correlation algorithm v0.2"
USE_N_HISTORICAL_ALERTS = 10e10
USE_N_HISTORICAL_INCIDENTS = 10e10


def calculate_pmi_matrix(
ctx: dict | None, # arq context
tenant_id: str,
upper_timestamp: datetime = None,
use_n_historical_alerts: int = USE_N_HISTORICAL_ALERTS,
sliding_window: int = None,
stride: int = None,
) -> dict:
logger.info(
"Calculating PMI coefficients for alerts",
extra={
"tenant_id": tenant_id,
},
)

if not upper_timestamp:
upper_timestamp = os.environ.get('PMI_ALERT_UPPER_TIMESTAMP', datetime.now())

if not sliding_window:
sliding_window = os.environ.get('PMI_SLIDING_WINDOW', 4 * 60 * 60)

if not stride:
stride = os.environ.get('PMI_STRIDE', 60 * 60)

alerts=get_last_alerts(tenant_id, limit=use_n_historical_alerts, upper_timestamp=upper_timestamp)
pmi_matrix = get_alert_pmi_matrix(alerts, 'fingerprint', sliding_window, stride)
write_pmi_matrix_to_db(tenant_id, pmi_matrix)

return {"status": "success"}


async def mine_incidents_and_create_objects(
ctx: dict | None, # arq context
tenant_id: str,
alert_lower_timestamp: datetime = None,
alert_upper_timestamp: datetime = None,
use_n_historical_alerts: int = USE_N_HISTORICAL_ALERTS,
incident_lower_timestamp: datetime = None,
incident_upper_timestamp: datetime = None,
use_n_hist_incidents: int = USE_N_HISTORICAL_INCIDENTS,
pmi_threshold: float = None,
knee_threshold: float = None,
min_incident_size: int = None,
incident_similarity_threshold: float = None,
) -> Dict[str, List[Incident]]:

"""
This function mines incidents from alerts and creates incidents in the database.
Parameters:
tenant_id (str): tenant id
alert_lower_timestamp (datetime): lower timestamp for alerts
alert_upper_timestamp (datetime): upper timestamp for alerts
use_n_historical_alerts (int): number of historical alerts to use
incident_lower_timestamp (datetime): lower timestamp for incidents
incident_upper_timestamp (datetime): upper timestamp for incidents
use_n_hist_incidents (int): number of historical incidents to use
pmi_threshold (float): PMI threshold used for incident graph edges creation
knee_threshold (float): knee threshold used for incident graph nodes creation
min_incident_size (int): minimum incident size
incident_similarity_threshold (float): incident similarity threshold
Returns:
Dict[str, List[Incident]]: a dictionary containing the created incidents
"""

if not incident_upper_timestamp:
incident_upper_timestamp = os.environ.get('MINE_INCIDENT_UPPER_TIMESTAMP', datetime.now())

if not incident_lower_timestamp:
incident_validity = os.environ.get('MINE_INCIDENT_VALIDITY', timedelta(days=1))
incident_lower_timestamp = incident_upper_timestamp - incident_validity

if not alert_upper_timestamp:
alert_upper_timestamp = os.environ.get('MINE_ALERT_UPPER_TIMESTAMP', datetime.now())

if not alert_lower_timestamp:
alert_window = os.environ.get('MINE_ALERT_WINDOW', timedelta(hours=12))
alert_lower_timestamp = alert_upper_timestamp - alert_window

if not pmi_threshold:
pmi_threshold = os.environ.get('PMI_THRESHOLD', 0.0)

if not knee_threshold:
knee_threshold = os.environ.get('KNEE_THRESHOLD', 0.8)

if not min_incident_size:
min_incident_size = os.environ.get('MIN_INCIDENT_SIZE', 5)

if not incident_similarity_threshold:
incident_similarity_threshold = os.environ.get('INCIDENT_SIMILARITY_THRESHOLD', 0.8)

calculate_pmi_matrix(ctx, tenant_id)

alerts = get_last_alerts(tenant_id, limit=use_n_historical_alerts, upper_timestamp=alert_upper_timestamp, lower_timestamp=alert_lower_timestamp)
incidents, _ = get_last_incidents(tenant_id, limit=use_n_hist_incidents, upper_timestamp=incident_upper_timestamp, lower_timestamp=incident_lower_timestamp)
nc_queue = NodeCandidateQueue()

for candidate in [NodeCandidate(alert.fingerprint, alert.timestamp) for alert in alerts]:
nc_queue.push_candidate(candidate)
candidates = nc_queue.get_candidates()

graph = create_graph(tenant_id, [candidate.fingerprint for candidate in candidates], pmi_threshold, knee_threshold)
ids = []

for component in nx.connected_components(graph):
if len(component) > min_incident_size:
alerts_appended = False
for incident in incidents:
incident_fingerprints = set([alert.fingerprint for alert in incident.Incident.alerts])
intersection = incident_fingerprints.intersection(component)

if len(intersection) / len(component) >= incident_similarity_threshold:
alerts_appended = True

add_alerts_to_incident_by_incident_id(tenant_id, incident.Incident.id, [alert.id for alert in alerts if alert.fingerprint in component])

summary = generate_incident_summary(incident.Incident)
update_incident_summary(incident.Incident.id, summary)

if not alerts_appended:
incident_start_time = min([alert.timestamp for alert in alerts if alert.fingerprint in component])
incident_start_time = incident_start_time.replace(microsecond=0)

incident = create_incident_from_dict(tenant_id,
{"name": f"Incident started at {incident_start_time}",
"description": "Summarization is Disabled", "is_predicted": True})
ids.append(incident.id)

add_alerts_to_incident_by_incident_id(tenant_id, incident.id, [alert.id for alert in alerts if alert.fingerprint in component])

summary = generate_incident_summary(incident)
update_incident_summary(incident.id, summary)

pusher_client = get_pusher_client()
if pusher_client:
pusher_client.trigger(
f"private-{tenant_id}",
"ai-logs-change",
{"log": ALGORITHM_VERBOSE_NAME + " successfully executed."},
)
logger.info(
"Client notified on new AI log",
extra={"tenant_id": tenant_id},
)


return {"incidents": [get_incident_by_id(tenant_id, incident_id) for incident_id in ids]}


def mine_incidents(alerts: List[Alert], incident_sliding_window_size: int=6*24*60*60, statistic_sliding_window_size: int=60*60,
Expand Down Expand Up @@ -145,4 +332,56 @@ def shape_incidents(alerts: pd.DataFrame, unique_alert_identifier: str, incident
'alert_fingerprints': local_alerts[unique_alert_identifier].unique().tolist(),
})

return incidents
return incidents


def generate_incident_summary(incident: Incident, use_n_alerts_for_summary: int = -1) -> str:
if "OPENAI_API_KEY" not in os.environ:
logger.error("OpenAI API key is not set. Incident summary generation is not available.")
return "Summarization is Disabled"

try:
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

prompt_addition = ''
if incident.user_summary:
prompt_addition = f'When generating, you must rely on the summary provided by human: {incident.user_summary}'

description_strings = np.unique([f'{alert.event["name"]}' for alert in incident.alerts]).tolist()

if use_n_alerts_for_summary > 0:
incident_description = "\n".join(description_strings[:use_n_alerts_for_summary])
else:
incident_description = "\n".join(description_strings)

timestamps = [alert.timestamp for alert in incident.alerts]
incident_start = min(timestamps).replace(microsecond=0)
incident_end = max(timestamps).replace(microsecond=0)

model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")

summary = client.chat.completions.create(model=model, messages=[
{
"role": "system",
"content": """You are a very skilled DevOps specialist who can summarize any incident based on alert descriptions.
When provided with information, summarize it in a 2-3 sentences explaining what happened and when.
ONLY SUMMARIZE WHAT YOU SEE. In the end add information about potential scenario of the incident.
EXAMPLE:
An incident occurred between 2022-11-17 14:11:04.955070 and 2022-11-22 22:19:04.837526, involving a
total of 200 alerts. The alerts indicated critical and warning issues such as high CPU and memory
usage in pods and nodes, as well as stuck Kubernetes Daemonset rollout. Potential incident scenario:
Kubernetes Daemonset rollout stuck due to high CPU and memory usage in pods and nodes. This caused a
long tail of alerts on various topics."""
},
{
"role": "user",
"content": f"""Here are alerts of an incident for summarization:\n{incident_description}\n This incident started on
{incident_start}, ended on {incident_end}, included {len(description_strings)} alerts. {prompt_addition}"""
}
]).choices[0].message.content

return summary
except Exception as e:
logger.error(f"Error in generating incident summary: {e}")
return "Summarization is Disabled"
Loading

0 comments on commit e0e15cc

Please sign in to comment.