Commit 3e80bfd

Merge branch 'main' into feature/2351-split-incidents
Kiryous committed Dec 19, 2024
2 parents 5f69bd9 + 1a51b20 commit 3e80bfd
Showing 12 changed files with 541 additions and 73 deletions.
75 changes: 41 additions & 34 deletions keep/api/bl/incidents_bl.py
@@ -14,11 +14,12 @@
     add_alerts_to_incident_by_incident_id,
     create_incident_from_dto,
     delete_incident_by_id,
-    get_incident_alerts_by_incident_id,
     get_incident_by_id,
     get_incident_unique_fingerprint_count,
     remove_alerts_to_incident_by_incident_id,
     update_incident_from_dto_by_id,
+    enrich_alerts_with_incidents,
+    get_all_alerts_by_fingerprints,
 )
from keep.api.core.elastic import ElasticClient
from keep.api.models.alert import IncidentDto, IncidentDtoIn
@@ -108,43 +109,30 @@ async def add_alerts_to_incident(
             "Alerts added to incident",
             extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
         )
-        self.__update_elastic(incident_id, alert_fingerprints)
-        self.logger.info(
-            "Alerts pushed to elastic",
-            extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
-        )
-        self.__update_client_on_incident_change(incident_id)
-        self.logger.info(
-            "Client updated on incident change",
-            extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
-        )
-        incident_dto = IncidentDto.from_db_incident(incident)
-        self.__run_workflows(incident_dto, "updated")
-        self.logger.info(
-            "Workflows run on incident",
-            extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
-        )
+        self.__postprocess_alerts_change(incident, alert_fingerprints)
         await self.__generate_summary(incident_id, incident)
         self.logger.info(
             "Summary generated",
             extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
         )

-    def __update_elastic(self, incident_id: UUID, alert_fingerprints: List[str]):
+    def __update_elastic(self, alert_fingerprints: List[str]):
         try:
             elastic_client = ElasticClient(self.tenant_id)
             if elastic_client.enabled:
-                db_alerts, _ = get_incident_alerts_by_incident_id(
+                db_alerts = get_all_alerts_by_fingerprints(
                     tenant_id=self.tenant_id,
-                    incident_id=incident_id,
-                    limit=len(alert_fingerprints),
+                    fingerprints=alert_fingerprints,
                     session=self.session,
                 )
+                db_alerts = enrich_alerts_with_incidents(self.tenant_id, db_alerts, session=self.session)
                 enriched_alerts_dto = convert_db_alerts_to_dto_alerts(
                     db_alerts, with_incidents=True
                 )
                 elastic_client.index_alerts(alerts=enriched_alerts_dto)
         except Exception:
             self.logger.exception("Failed to push alert to elasticsearch")
             raise

     def __update_client_on_incident_change(self, incident_id: Optional[UUID] = None):
         if self.pusher_client is not None:
@@ -217,6 +205,7 @@ def delete_alerts_from_incident(
             raise HTTPException(status_code=404, detail="Incident not found")

         remove_alerts_to_incident_by_incident_id(self.tenant_id, incident_id, alert_fingerprints)
+        self.__postprocess_alerts_change(incident, alert_fingerprints)

     def delete_incident(self, incident_id: UUID) -> None:
         self.logger.info(
@@ -255,7 +244,7 @@ def update_incident(
         incident_id: UUID,
         updated_incident_dto: IncidentDtoIn,
         generated_by_ai: bool,
-    ) -> None:
+    ) -> IncidentDto:
         self.logger.info(
             "Fetching incident",
             extra={
@@ -270,16 +259,34 @@ def update_incident(
             raise HTTPException(status_code=404, detail="Incident not found")

         new_incident_dto = IncidentDto.from_db_incident(incident)
-        try:
-            workflow_manager = WorkflowManager.get_instance()
-            self.logger.info("Adding incident to the workflow manager queue")
-            workflow_manager.insert_incident(
-                self.tenant_id, new_incident_dto, "updated"
-            )
-            self.logger.info("Added incident to the workflow manager queue")
-        except Exception:
-            self.logger.exception(
-                "Failed to run workflows based on incident",
-                extra={"incident_id": new_incident_dto.id, "tenant_id": self.tenant_id},
-            )

+        self.__update_client_on_incident_change(incident.id)
+        self.logger.info(
+            "Client updated on incident change",
+            extra={"incident_id": incident.id},
+        )
+        self.__run_workflows(new_incident_dto, "updated")
+        self.logger.info(
+            "Workflows run on incident",
+            extra={"incident_id": incident.id},
+        )
+        return new_incident_dto
+
+    def __postprocess_alerts_change(self, incident, alert_fingerprints):
+        self.__update_elastic(alert_fingerprints)
+        self.logger.info(
+            "Alerts pushed to elastic",
+            extra={"incident_id": incident.id, "alert_fingerprints": alert_fingerprints},
+        )
+        self.__update_client_on_incident_change(incident.id)
+        self.logger.info(
+            "Client updated on incident change",
+            extra={"incident_id": incident.id, "alert_fingerprints": alert_fingerprints},
+        )
+        incident_dto = IncidentDto.from_db_incident(incident)
+        self.__run_workflows(incident_dto, "updated")
+        self.logger.info(
+            "Workflows run on incident",
+            extra={"incident_id": incident.id, "alert_fingerprints": alert_fingerprints},
+        )
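
The change above consolidates the elastic, pusher, and workflow side effects that previously ran inline in add_alerts_to_incident into a single __postprocess_alerts_change helper, reuses it from delete_alerts_from_incident, and makes update_incident return the updated IncidentDto instead of None. A minimal sketch of how a caller might use the new return value; the route path, the IncidentBl class name, and its constructor arguments are assumptions for illustration, not taken from this diff:

# Hedged sketch: IncidentBl (class name), its constructor, and the route
# wiring are assumptions for illustration; only update_incident's new
# return type comes from the diff above.
from uuid import UUID

from fastapi import APIRouter

from keep.api.bl.incidents_bl import IncidentBl  # assumed class name
from keep.api.models.alert import IncidentDto, IncidentDtoIn

router = APIRouter()

@router.put("/incidents/{incident_id}")
def update_incident_route(incident_id: UUID, body: IncidentDtoIn) -> IncidentDto:
    bl = IncidentBl(tenant_id="my-tenant", session=None)  # assumed signature
    # update_incident now returns the updated IncidentDto, so the route can
    # serialize it straight back to the client instead of re-fetching.
    return bl.update_incident(incident_id, body, generated_by_ai=False)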
18 changes: 15 additions & 3 deletions keep/api/core/db.py
@@ -1428,6 +1428,18 @@ def get_alerts_by_fingerprint(

     return alerts


+def get_all_alerts_by_fingerprints(
+    tenant_id: str, fingerprints: List[str], session: Optional[Session] = None
+) -> List[Alert]:
+    with existed_or_new_session(session) as session:
+        query = (
+            select(Alert)
+            .filter(Alert.tenant_id == tenant_id)
+            .filter(Alert.fingerprint.in_(fingerprints))
+            .order_by(Alert.timestamp.desc())
+        )
+        return session.exec(query).all()
+
+
 def get_alert_by_fingerprint_and_event_id(
     tenant_id: str, fingerprint: str, event_id: str
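
The new get_all_alerts_by_fingerprints helper replaces the incident-scoped lookup previously used by __update_elastic. A minimal usage sketch, with a placeholder tenant id and fingerprints:

# Hedged sketch: tenant id and fingerprints are placeholders.
from keep.api.core.db import get_all_alerts_by_fingerprints

alerts = get_all_alerts_by_fingerprints(
    tenant_id="my-tenant",
    fingerprints=["fp-1", "fp-2"],
    session=None,  # existed_or_new_session opens a fresh session when none is passed
)
# Rows come back ordered by Alert.timestamp descending, newest first.
for alert in alerts:
    print(alert.fingerprint, alert.timestamp)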
@@ -3215,11 +3227,11 @@ def enrich_alerts_with_incidents(
     ).all()

     incidents_per_alert = defaultdict(list)
-    for alert_id, incident in alert_incidents:
-        incidents_per_alert[alert_id].append(incident)
+    for fingerprint, incident in alert_incidents:
+        incidents_per_alert[fingerprint].append(incident)

     for alert in alerts:
-        alert._incidents = incidents_per_alert[incident.id]
+        alert._incidents = incidents_per_alert[alert.fingerprint]

     return alerts
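
The hunk above fixes two bugs in enrich_alerts_with_incidents: the grouping map is now keyed by fingerprint rather than alert id, and the per-alert lookup uses the alert's own fingerprint instead of the stale incident loop variable, which previously attached the same incident list to every alert. A self-contained repro of the corrected grouping, using stand-in objects in place of ORM rows:

# Hedged stand-in objects to illustrate the corrected grouping; the real
# query yields (fingerprint, Incident) pairs.
from collections import defaultdict
from types import SimpleNamespace

alert_incidents = [
    ("fp-1", SimpleNamespace(id="inc-a")),
    ("fp-1", SimpleNamespace(id="inc-b")),
    ("fp-2", SimpleNamespace(id="inc-a")),
]
alerts = [SimpleNamespace(fingerprint="fp-1"), SimpleNamespace(fingerprint="fp-2")]

incidents_per_alert = defaultdict(list)
for fingerprint, incident in alert_incidents:
    incidents_per_alert[fingerprint].append(incident)

for alert in alerts:
    # keyed by the alert's own fingerprint, not a leftover loop variable
    alert._incidents = incidents_per_alert[alert.fingerprint]

assert [i.id for i in alerts[0]._incidents] == ["inc-a", "inc-b"]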
7 changes: 5 additions & 2 deletions keep/api/core/demo_mode.py
@@ -417,8 +417,11 @@ def perform_demo_ai(keep_api_key, keep_api_url):


 def simulate_alerts(*args, **kwargs):
-    asyncio.create_task(simulate_alerts_worker(0, keep_api_key, 0))
-    asyncio.run(simulate_alerts_async(*args, **kwargs))
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    loop.create_task(simulate_alerts_worker(0, kwargs.get("keep_api_key"), 0))
+    loop.create_task(simulate_alerts_async(*args, **kwargs))
+    loop.run_forever()


 async def simulate_alerts_async(
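
The old simulate_alerts scheduled a task with asyncio.create_task outside any running event loop, which raises a RuntimeError, and referenced an undefined keep_api_key; the rewrite owns a dedicated loop so the worker and the simulator run concurrently. The same pattern in isolation, with stand-in coroutines:

# Hedged sketch of the same pattern with stand-in coroutines: create a
# dedicated loop, schedule both coroutines on it, and block forever.
import asyncio

async def worker():
    while True:
        await asyncio.sleep(1)

async def simulator():
    while True:
        await asyncio.sleep(2)

def run_both():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(worker())      # scheduled, starts once the loop runs
    loop.create_task(simulator())
    loop.run_forever()              # blocks; both tasks now run concurrently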
12 changes: 6 additions & 6 deletions keep/api/utils/enrichment_helpers.py
@@ -80,10 +80,10 @@ def calculated_start_firing_time(


 def convert_db_alerts_to_dto_alerts(
-        alerts: list[Alert | tuple[Alert, LastAlertToIncident]],
-        with_incidents: bool = False,
-        session: Optional[Session] = None,
-) -> list[AlertDto | AlertWithIncidentLinkMetadataDto]:
+    alerts: list[Alert | tuple[Alert, LastAlertToIncident]],
+    with_incidents: bool = False,
+    session: Optional[Session] = None,
+) -> list[AlertDto | AlertWithIncidentLinkMetadataDto]:
"""
Enriches the alerts with the enrichment data.
@@ -109,8 +109,8 @@
         if alert.alert_enrichment:
             alert.event.update(alert.alert_enrichment.enrichments)
         if with_incidents:
-            if alert.incidents:
-                alert.event["incident"] = ",".join(str(incident.id) for incident in alert.incidents)
+            if alert._incidents:
+                alert.event["incident"] = ",".join(str(incident.id) for incident in alert._incidents)
         try:
             if alert_to_incident is not None:
                 alert_dto = AlertWithIncidentLinkMetadataDto.from_db_instance(alert, alert_to_incident)
16 changes: 11 additions & 5 deletions keep/providers/grafana_provider/grafana_provider.py
@@ -509,6 +509,8 @@ def simulate_alert(cls, **kwargs) -> dict:
         if not alert_type:
             alert_type = random.choice(list(ALERTS.keys()))

+        to_wrap_with_provider_type = kwargs.get("to_wrap_with_provider_type")
+
         if "payload" in ALERTS[alert_type]:
             alert_payload = ALERTS[alert_type]["payload"]
         else:
@@ -552,11 +554,15 @@ def simulate_alert(cls, **kwargs) -> dict:
         fingerprint = hashlib.md5(fingerprint_src.encode()).hexdigest()
         alert_payload["fingerprint"] = fingerprint

-        return {
-            "alerts": [alert_payload],
-            "severity": alert_payload.get("labels", {}).get("severity"),
-            "title": alert_type,
-        }
+        final_payload = {
+            "alerts": [alert_payload],
+            "severity": alert_payload.get("labels", {}).get("severity"),
+            "title": alert_type,
+        }
+        if to_wrap_with_provider_type:
+            return {"keep_source_type": "grafana", "event": final_payload}
+        return final_payload


 if __name__ == "__main__":
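
With to_wrap_with_provider_type set, simulate_alert returns an envelope that names the originating provider, so a downstream consumer (the vectordev provider below uses keep_source_type for exactly this) can route the payload to the right parser. An illustration of the two shapes, with placeholder values:

# Hedged illustration of the two return shapes; field values are placeholders.
unwrapped = {
    "alerts": [{"labels": {"severity": "critical"}, "fingerprint": "abc123"}],
    "severity": "critical",
    "title": "SomeGrafanaAlert",
}
wrapped = {
    "keep_source_type": "grafana",  # tells the consumer which provider parses it
    "event": unwrapped,
}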
5 changes: 5 additions & 0 deletions keep/providers/prometheus_provider/prometheus_provider.py
@@ -233,6 +233,8 @@ def simulate_alert(cls, **kwargs) -> dict:
         if not alert_type:
             alert_type = random.choice(list(ALERTS.keys()))

+        to_wrap_with_provider_type = kwargs.get("to_wrap_with_provider_type")
+
         alert_payload = ALERTS[alert_type]["payload"]
         alert_parameters = ALERTS[alert_type].get("parameters", [])
         # now generate some random data
@@ -267,6 +269,9 @@ def simulate_alert(cls, **kwargs) -> dict:
         fingerprint_src = json.dumps(alert_payload["labels"], sort_keys=True)
         fingerprint = hashlib.md5(fingerprint_src.encode()).hexdigest()
         alert_payload["fingerprint"] = fingerprint
+        if to_wrap_with_provider_type:
+            return {"keep_source_type": "prometheus", "event": alert_payload}
+
         return alert_payload
51 changes: 36 additions & 15 deletions keep/providers/vectordev_provider/vectordev_provider.py
@@ -1,13 +1,20 @@
 import dataclasses

+import random
+import json
+
 import pydantic
+import logging

 from keep.api.models.alert import AlertDto
 from keep.contextmanager.contextmanager import ContextManager
 from keep.providers.base.base_provider import BaseProvider
 from keep.providers.models.provider_config import ProviderConfig
+from keep.providers.providers_factory import ProvidersFactory
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)

 @pydantic.dataclasses.dataclass
 class VectordevProviderAuthConfig:
@@ -21,6 +28,11 @@ class VectordevProvider(BaseProvider):
     PROVIDER_CATEGORY = ["Monitoring", "Developer Tools"]
     PROVIDER_COMING_SOON = True

+    # Mapping from vector sources to keep providers
+    SOURCE_TO_PROVIDER_MAP = {
+        "prometheus": "prometheus",
+    }
+
     def __init__(
         self, context_manager: ContextManager, provider_id: str, config: ProviderConfig
     ):
@@ -32,33 +44,42 @@ def validate_config(self):
         )

     def _format_alert(
-        event: list[dict], provider_instance: "BaseProvider" = None
+        event: dict, provider_instance: "BaseProvider" = None
     ) -> AlertDto | list[AlertDto]:
-        events = []
-        # event is a list of events
-        for e in event:
-            event_json = None
-            try:
-                event_json = json.loads(e.get("message"))
-            except json.JSONDecodeError:
-                pass
-
-            events.append(
+        if isinstance(event, list):
+            events = event
+        else:
+            events = [event]
+        alert_dtos = []
+        for e in events:
+            if "keep_source_type" in e and e["keep_source_type"] in VectordevProvider.SOURCE_TO_PROVIDER_MAP:
+                provider_class = ProvidersFactory.get_provider_class(
+                    VectordevProvider.SOURCE_TO_PROVIDER_MAP[e["keep_source_type"]]
+                )
+                alert_dtos.extend(provider_class._format_alert(e["message"], provider_instance))
+            else:
+                message_str = json.dumps(e.get("message"))
+                alert_dtos.append(
                     AlertDto(
                         name="",
                         host=e.get("host"),
-                        message=e.get("message"),
-                        description=e.get("message"),
+                        message=message_str,
+                        description=message_str,
                         lastReceived=e.get("timestamp"),
                         source_type=e.get("source_type"),
                         source=["vectordev"],
-                        original_event=event_json,
+                        original_event=e.get("message"),
                     )
                 )
-        return events
+        return alert_dtos

     def dispose(self):
         """
         No need to dispose of anything, so just do nothing.
         """
         pass

+    @classmethod
+    def simulate_alert(cls, **kwargs) -> dict:
+        provider = random.choice(list(VectordevProvider.SOURCE_TO_PROVIDER_MAP.values()))
+        provider_class = ProvidersFactory.get_provider_class(provider)
+        return provider_class.simulate_alert(to_wrap_with_provider_type=True)
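
_format_alert now dispatches wrapped events to the provider named by keep_source_type, while simulate_alert delegates simulation to a randomly chosen mapped provider. A sketch of the dispatch in isolation; the event dict is a hand-built stand-in for what a vector pipeline might deliver, not taken from this diff:

# Hedged sketch of the dispatch logic; the event dict is a stand-in.
from keep.providers.providers_factory import ProvidersFactory
from keep.providers.vectordev_provider.vectordev_provider import VectordevProvider

event = {
    "keep_source_type": "prometheus",
    "message": {"labels": {"severity": "warning"}, "fingerprint": "abc123"},
    "host": "vector-host-1",
    "timestamp": "2024-12-19T00:00:00Z",
    "source_type": "prometheus",
}

source = event.get("keep_source_type")
if source in VectordevProvider.SOURCE_TO_PROVIDER_MAP:
    # Known upstream source: let that provider's parser build the AlertDto(s).
    provider_class = ProvidersFactory.get_provider_class(
        VectordevProvider.SOURCE_TO_PROVIDER_MAP[source]
    )
    dtos = provider_class._format_alert(event["message"], None)
else:
    ...  # fall back to the generic AlertDto built from the raw message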

7 changes: 4 additions & 3 deletions keep/secretmanager/kubernetessecretmanager.py
@@ -4,7 +4,7 @@

 import kubernetes.client
 import kubernetes.config
-from kubernetes.client.rest import ApiException
+from kubernetes.client.exceptions import ApiException

 from keep.secretmanager.secretmanager import BaseSecretManager

@@ -32,7 +32,7 @@ def write_secret(self, secret_name: str, secret_value: str) -> None:
             ApiException: If an error occurs while writing the secret.
         """
         # k8s requirements: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-        secret_name = secret_name.replace("_", "-")
+        secret_name = secret_name.replace("_", "-").lower()
         self.logger.info("Writing secret", extra={"secret_name": secret_name})

         body = kubernetes.client.V1Secret(
@@ -70,7 +70,7 @@ def write_secret(self, secret_name: str, secret_value: str) -> None:

     def read_secret(self, secret_name: str, is_json: bool = False) -> str | dict:
         # k8s requirements: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
-        secret_name = secret_name.replace("_", "-")
+        secret_name = secret_name.replace("_", "-").lower()
         self.logger.info("Getting secret", extra={"secret_name": secret_name})
         try:
             response = self.api.read_namespaced_secret(
@@ -91,6 +91,7 @@ def read_secret(self, secret_name: str, is_json: bool = False) -> str | dict:
             raise

     def delete_secret(self, secret_name: str) -> None:
+        secret_name = secret_name.replace("_", "-").lower()
         self.logger.info("Deleting secret", extra={"secret_name": secret_name})
         try:
             self.api.delete_namespaced_secret(
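
Kubernetes object names must be valid RFC 1123 subdomains (lowercase alphanumerics, "-" and "."), so the manager now lowercases names in addition to replacing underscores, and delete_secret applies the same normalization so that secrets written under a normalized name can also be deleted. The normalization rule in isolation:

# Hedged illustration of the name normalization now applied consistently in
# write_secret, read_secret, and delete_secret.
def normalize_k8s_name(secret_name: str) -> str:
    # https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
    return secret_name.replace("_", "-").lower()

assert normalize_k8s_name("My_Tenant_API_Key") == "my-tenant-api-key"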