Skip to content

Commit

Permalink
feat: add firing time calc on digestion (#1624)
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl authored Aug 15, 2024
1 parent e7af854 commit b31ebf2
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 145 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-pr-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ jobs:
verbose: true

- name: Dump backend logs
if: failure()
if: always()
run: |
docker compose --project-directory . -f tests/e2e_tests/docker-compose-e2e-${{ matrix.db_type }}.yml logs keep-backend > backend_logs-${{ matrix.db_type }}.txt
docker compose --project-directory . -f tests/e2e_tests/docker-compose-e2e-${{ matrix.db_type }}.yml logs keep-frontend > frontend_logs-${{ matrix.db_type }}.txt
continue-on-error: true

- name: Upload test artifacts on failure
if: failure()
if: always()
uses: actions/upload-artifact@v3
with:
name: test-artifacts
Expand Down
6 changes: 4 additions & 2 deletions keep/api/bl/enrichments.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sqlmodel import Session

from keep.api.core.db import enrich_alert as enrich_alert_db
from keep.api.core.db import get_enrichment, get_mapping_rule_by_id
from keep.api.core.db import get_enrichment_with_session, get_mapping_rule_by_id
from keep.api.core.elastic import ElasticClient
from keep.api.models.alert import AlertDto
from keep.api.models.db.alert import AlertActionType
Expand Down Expand Up @@ -415,7 +415,9 @@ def dispose_enrichments(self, fingerprint: str):
Dispose of enrichments from the alert
"""
self.logger.debug("disposing enrichments", extra={"fingerprint": fingerprint})
enrichments = get_enrichment(self.tenant_id, fingerprint)
enrichments = get_enrichment_with_session(
self.db_session, self.tenant_id, fingerprint
)
if not enrichments or not enrichments.enrichments:
self.logger.debug(
"no enrichments to dispose", extra={"fingerprint": fingerprint}
Expand Down
103 changes: 20 additions & 83 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,20 +815,7 @@ def count_alerts(

def get_enrichment(tenant_id, fingerprint, refresh=False):
    """Fetch the alert-enrichment row for a tenant/fingerprint pair.

    Opens a short-lived session and delegates to
    ``get_enrichment_with_session`` so both entry points share a single
    query implementation.

    Args:
        tenant_id: Tenant that owns the alert.
        fingerprint: Alert fingerprint to look up.
        refresh: When True, re-read the row from the database after fetching.

    Returns:
        The matching ``AlertEnrichment`` row, or ``None`` if the alert has
        no enrichments.
    """
    with Session(engine) as session:
        return get_enrichment_with_session(session, tenant_id, fingerprint, refresh)


Expand All @@ -851,12 +838,20 @@ def get_enrichments(
return result


def get_enrichment_with_session(session, tenant_id, fingerprint, refresh=False):
    """Fetch the alert-enrichment row using an existing database session.

    Args:
        session: Open SQLModel session to run the query on.
        tenant_id: Tenant that owns the alert.
        fingerprint: Alert fingerprint to look up.
        refresh: When True and a row was found, refresh it from the
            database so stale session-cached state is discarded.

    Returns:
        The matching ``AlertEnrichment`` row, or ``None`` if none exists.
    """
    alert_enrichment = session.exec(
        select(AlertEnrichment)
        .where(AlertEnrichment.tenant_id == tenant_id)
        .where(AlertEnrichment.alert_fingerprint == fingerprint)
    ).first()
    # Only refresh a row that actually exists — session.refresh(None)
    # raises and would log a spurious "Failed to refresh" exception.
    if refresh and alert_enrichment is not None:
        try:
            session.refresh(alert_enrichment)
        except Exception:
            # Best-effort: a failed refresh still returns the cached row.
            logger.exception(
                "Failed to refresh enrichment",
                extra={"tenant_id": tenant_id, "fingerprint": fingerprint},
            )
    return alert_enrichment


Expand Down Expand Up @@ -1360,7 +1355,10 @@ def assign_alert_to_group(
enrich_alert(
tenant_id,
fingerprint,
enrichments={"group_expired": True},
enrichments={
"group_expired": True,
"status": AlertStatus.RESOLVED.value, # Shahar: expired groups should be resolved also in elasticsearch
},
action_type=AlertActionType.GENERIC_ENRICH, # TODO: is this a live code?
action_callee="system",
action_description="Enriched group with group_expired flag",
Expand Down Expand Up @@ -2445,7 +2443,9 @@ def write_pmi_matrix_to_db(tenant_id: str, pmi_matrix_df: pd.DataFrame) -> bool:

# Query for existing entries to differentiate between updates and inserts
existing_entries = session.query(PMIMatrix).filter_by(tenant_id=tenant_id).all()
existing_entries_set = {(entry.fingerprint_i, entry.fingerprint_j) for entry in existing_entries}
existing_entries_set = {
(entry.fingerprint_i, entry.fingerprint_j) for entry in existing_entries
}

for fingerprint_i in pmi_matrix_df.index:
for fingerprint_j in pmi_matrix_df.columns:
Expand All @@ -2455,7 +2455,7 @@ def write_pmi_matrix_to_db(tenant_id: str, pmi_matrix_df: pd.DataFrame) -> bool:
"tenant_id": tenant_id,
"fingerprint_i": fingerprint_i,
"fingerprint_j": fingerprint_j,
"pmi": pmi
"pmi": pmi,
}

if (fingerprint_i, fingerprint_j) in existing_entries_set:
Expand All @@ -2466,11 +2466,11 @@ def write_pmi_matrix_to_db(tenant_id: str, pmi_matrix_df: pd.DataFrame) -> bool:
# Update existing records
if pmi_entries_to_update:
session.bulk_update_mappings(PMIMatrix, pmi_entries_to_update)

# Insert new records
if pmi_entries_to_insert:
session.bulk_insert_mappings(PMIMatrix, pmi_entries_to_insert)

session.commit()

return True
Expand Down Expand Up @@ -2510,69 +2510,6 @@ def get_pmi_values(
return pmi_values


def get_alert_firing_time(tenant_id: str, fingerprint: str) -> timedelta:
    """Return how long the alert identified by *fingerprint* has been firing.

    Looks at the alert's event history for the tenant and computes the
    elapsed time since the start of the current uninterrupted "firing"
    streak. Returns ``timedelta()`` (zero) when there is no alert, the
    latest event is not "firing", or no firing event follows the last
    non-firing one.

    NOTE(review): relies on ``func.json_extract`` to read ``$.status``
    from the JSON event column — assumes a backend that supports
    json_extract (e.g. SQLite/MySQL); confirm for the deployed DB.
    """
    with Session(engine) as session:
        # Get the latest alert for this fingerprint
        latest_alert = (
            session.query(Alert)
            .filter(Alert.tenant_id == tenant_id)
            .filter(Alert.fingerprint == fingerprint)
            .order_by(Alert.timestamp.desc())
            .first()
        )

        # No history at all -> not firing.
        if not latest_alert:
            return timedelta()

        # Extract status from the event column
        # (assumes Alert.event deserializes to a dict — TODO confirm)
        latest_status = latest_alert.event.get("status")

        # If the latest status is not 'firing', return 0
        if latest_status != "firing":
            return timedelta()

        # Find the last time it wasn't firing
        last_non_firing = (
            session.query(Alert)
            .filter(Alert.tenant_id == tenant_id)
            .filter(Alert.fingerprint == fingerprint)
            .filter(func.json_extract(Alert.event, "$.status") != "firing")
            .order_by(Alert.timestamp.desc())
            .first()
        )

        if last_non_firing:
            # Find the next firing alert after the last non-firing alert;
            # that event marks the start of the current firing streak.
            next_firing = (
                session.query(Alert)
                .filter(Alert.tenant_id == tenant_id)
                .filter(Alert.fingerprint == fingerprint)
                .filter(Alert.timestamp > last_non_firing.timestamp)
                .filter(func.json_extract(Alert.event, "$.status") == "firing")
                .order_by(Alert.timestamp.asc())
                .first()
            )
            if next_firing:
                # DB timestamps are naive; treat them as UTC so the
                # subtraction against the aware now() is valid.
                return datetime.now(tz=timezone.utc) - next_firing.timestamp.replace(
                    tzinfo=timezone.utc
                )
            else:
                # If no firing alert after the last non-firing, return 0
                return timedelta()
        else:
            # If all alerts are firing, use the earliest alert time
            # (latest_alert exists, so at least one row is guaranteed here).
            earliest_alert = (
                session.query(Alert)
                .filter(Alert.tenant_id == tenant_id)
                .filter(Alert.fingerprint == fingerprint)
                .order_by(Alert.timestamp.asc())
                .first()
            )
            return datetime.now(tz=timezone.utc) - earliest_alert.timestamp.replace(
                tzinfo=timezone.utc
            )


def update_incident_summary(incident_id: UUID, summary: str) -> Incident:
with Session(engine) as session:
incident = session.exec(
Expand Down
1 change: 1 addition & 0 deletions keep/api/models/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class AlertDto(BaseModel):
status: AlertStatus
severity: AlertSeverity
lastReceived: str
firingStartTime: str | None = None
environment: str = "undefined"
isDuplicate: bool | None = None
duplicateReason: str | None = None
Expand Down
22 changes: 15 additions & 7 deletions keep/api/routes/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
get_pusher_client,
)
from keep.api.core.elastic import ElasticClient
from keep.api.models.alert import AlertDto, DeleteRequestBody, EnrichAlertRequestBody, UnEnrichAlertRequestBody
from keep.api.models.alert import (
AlertDto,
DeleteRequestBody,
EnrichAlertRequestBody,
UnEnrichAlertRequestBody,
)
from keep.api.models.db.alert import AlertActionType
from keep.api.models.search_alert import SearchAlertsRequest
from keep.api.tasks.process_event_task import process_event
Expand Down Expand Up @@ -499,12 +504,11 @@ def enrich_alert(
return {"status": "failed"}



@router.post(
"/unenrich",
description="Un-Enrich an alert",
)
def enrich_alert(
def unenrich_alert(
enrich_data: UnEnrichAlertRequestBody,
pusher_client: Pusher = Depends(get_pusher_client),
authenticated_entity: AuthenticatedEntity = Depends(AuthVerifier(["write:alert"])),
Expand Down Expand Up @@ -534,7 +538,9 @@ def enrich_alert(
enrichement_bl = EnrichmentsBl(tenant_id)
if "status" in enrich_data.enrichments:
action_type = AlertActionType.STATUS_UNENRICH
action_description = f"Alert status was un-enriched by {authenticated_entity.email}"
action_description = (
f"Alert status was un-enriched by {authenticated_entity.email}"
)
elif "note" in enrich_data.enrichments:
action_type = AlertActionType.UNCOMMENT
action_description = f"Comment removed by {authenticated_entity.email}"
Expand All @@ -549,8 +555,9 @@ def enrich_alert(
enrichments = enrichments_object.enrichments

new_enrichments = {
key: value for key, value in enrichments.items()
if key not in enrich_data.enrichments
key: value
for key, value in enrichments.items()
if key not in enrich_data.enrichments
}

enrichement_bl.enrich_alert(
Expand All @@ -559,7 +566,7 @@ def enrich_alert(
action_type=action_type,
action_callee=authenticated_entity.email,
action_description=action_description,
force=True
force=True,
)

alert = get_alerts_by_fingerprint(
Expand Down Expand Up @@ -601,6 +608,7 @@ def enrich_alert(
logger.exception("Failed to un-enrich alert", extra={"error": str(e)})
return {"status": "failed"}


@router.post(
"/search",
description="Search alerts",
Expand Down
43 changes: 33 additions & 10 deletions keep/api/tasks/process_event_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,21 @@
# internals
from keep.api.alert_deduplicator.alert_deduplicator import AlertDeduplicator
from keep.api.bl.enrichments import EnrichmentsBl
from keep.api.core.db import get_all_presets, get_enrichment, get_session_sync
from keep.api.core.db import (
get_alerts_by_fingerprint,
get_all_presets,
get_enrichment_with_session,
get_session_sync,
)
from keep.api.core.dependencies import get_pusher_client
from keep.api.core.elastic import ElasticClient
from keep.api.models.alert import AlertDto, AlertStatus
from keep.api.models.db.alert import Alert, AlertActionType, AlertAudit, AlertRaw
from keep.api.models.db.preset import PresetDto
from keep.api.utils.enrichment_helpers import (
calculated_start_firing_time,
convert_db_alerts_to_dto_alerts,
)
from keep.providers.providers_factory import ProvidersFactory
from keep.rulesengine.rulesengine import RulesEngine
from keep.workflowmanager.workflowmanager import WorkflowManager
Expand Down Expand Up @@ -86,6 +95,14 @@ def __save_to_db(
enriched_formatted_events = []
for formatted_event in formatted_events:
formatted_event.pushed = True
# calculate startFiring time
previous_alert = get_alerts_by_fingerprint(
tenant_id=tenant_id, fingerprint=formatted_event.fingerprint, limit=1
)
previous_alert = convert_db_alerts_to_dto_alerts(previous_alert)
formatted_event.firingStartTime = calculated_start_firing_time(
formatted_event, previous_alert
)

enrichments_bl = EnrichmentsBl(tenant_id, session)
# Dispose enrichments that needs to be disposed
Expand Down Expand Up @@ -127,11 +144,9 @@ def __save_to_db(
"alert_hash": formatted_event.alert_hash,
}
if timestamp_forced is not None:
alert_args['timestamp'] = timestamp_forced
alert_args["timestamp"] = timestamp_forced

alert = Alert(
**alert_args
)
alert = Alert(**alert_args)
session.add(alert)
audit = AlertAudit(
tenant_id=tenant_id,
Expand All @@ -156,8 +171,10 @@ def __save_to_db(
except Exception:
logger.exception("Failed to run mapping rules")

alert_enrichment = get_enrichment(
tenant_id=tenant_id, fingerprint=formatted_event.fingerprint
alert_enrichment = get_enrichment_with_session(
session=session,
tenant_id=tenant_id,
fingerprint=formatted_event.fingerprint,
)
if alert_enrichment:
for enrichment in alert_enrichment.enrichments:
Expand Down Expand Up @@ -220,7 +237,6 @@ def __handle_formatted_events(
"tenant_id": tenant_id,
},
)
pusher_client = get_pusher_client()

# first, filter out any deduplicated events
alert_deduplicator = AlertDeduplicator(tenant_id)
Expand Down Expand Up @@ -316,7 +332,10 @@ def __handle_formatted_events(
)

# Tell the client to poll alerts
if pusher_client and notify_client:
if notify_client:
pusher_client = get_pusher_client()
if not pusher_client:
return
try:
pusher_client.trigger(
f"private-{tenant_id}",
Expand Down Expand Up @@ -362,7 +381,7 @@ def __handle_formatted_events(
logger.info("Noisy preset is noisy")
preset_dto.should_do_noise_now = True
# send with pusher
if pusher_client and notify_client:
if notify_client and pusher_client:
try:
pusher_client.trigger(
f"private-{tenant_id}",
Expand Down Expand Up @@ -395,6 +414,7 @@ def process_event(
AlertDto | list[AlertDto] | dict
), # the event to process, either plain (generic) or from a specific provider
notify_client: bool = True,
timestamp_forced: datetime.datetime | None = None,
):
extra_dict = {
"tenant_id": tenant_id,
Expand Down Expand Up @@ -432,10 +452,12 @@ def process_event(
# In case when provider_type is not set
if isinstance(event, dict):
event = [AlertDto(**event)]
raw_event = [raw_event]

# Prepare the event for the digest
if isinstance(event, AlertDto):
event = [event]
raw_event = [raw_event]

__internal_prepartion(event, fingerprint, api_key_name)
__handle_formatted_events(
Expand All @@ -446,6 +468,7 @@ def process_event(
event,
provider_id,
notify_client,
timestamp_forced,
)
except Exception:
logger.exception("Error processing event", extra=extra_dict)
Expand Down
Loading

0 comments on commit b31ebf2

Please sign in to comment.