Skip to content

Commit

Permalink
feat(performance): better performance for celpy (keephq#2005)
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl authored Sep 25, 2024
1 parent 4689b42 commit fb72f73
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 26 deletions.
6 changes: 5 additions & 1 deletion keep/api/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import os
import time
from importlib import metadata

import jwt
Expand Down Expand Up @@ -341,10 +342,13 @@ async def log_middleware(request: Request, call_next):
f"Request started: {request.method} {request.url.path}",
extra={"tenant_id": identity},
)
start_time = time.time()
request.state.tenant_id = identity
response = await call_next(request)

end_time = time.time()
logger.info(
f"Request finished: {request.method} {request.url.path} {response.status_code}"
f"Request finished: {request.method} {request.url.path} {response.status_code} in {end_time - start_time:.2f}s",
)
return response

Expand Down
2 changes: 1 addition & 1 deletion keep/api/routes/preset.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ def get_preset_alerts(
authenticated_entity: AuthenticatedEntity = Depends(
IdentityManagerFactory.get_auth_verifier(["read:presets"])
),
) -> list[AlertDto]:
) -> list:

# Gathering alerts may take a while and we don't care if it will finish before we return the response.
# In the worst case, gathered alerts will be pulled in the next request.
Expand Down
13 changes: 7 additions & 6 deletions keep/api/tasks/process_event_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,19 @@ def __handle_formatted_events(
logger.exception("Failed to push alert to the client")

# Now we need to update the presets
# send with pusher
if notify_client:
pusher_client = get_pusher_client()
if not pusher_client:
return
try:
presets = get_all_presets(tenant_id)
rules_engine = RulesEngine(tenant_id=tenant_id)
presets_do_update = []
for preset in presets:
# filter the alerts based on the search query
preset_dto = PresetDto(**preset.to_dict())
filtered_alerts = RulesEngine.filter_alerts(
filtered_alerts = rules_engine.filter_alerts(
enriched_formatted_events, preset_dto.cel_query
)
# if not related alerts, no need to update
Expand Down Expand Up @@ -452,11 +458,6 @@ def __handle_formatted_events(
):
logger.info("Noisy preset is noisy")
preset_dto.should_do_noise_now = True
# send with pusher
if notify_client:
pusher_client = get_pusher_client()
if not pusher_client:
return
try:
pusher_client.trigger(
f"private-{tenant_id}",
Expand Down
68 changes: 51 additions & 17 deletions keep/rulesengine/rulesengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,41 @@
import logging

import celpy
import celpy.c7nlib
import celpy.celparser
import celpy.celtypes
import celpy.evaluation

from keep.api.consts import STATIC_PRESETS
from keep.api.core.db import assign_alert_to_incident, get_incident_for_grouping_rule
from keep.api.core.db import get_rules as get_rules_db
from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto
from keep.api.utils.cel_utils import preprocess_cel_expression

# Shahar: this is performance enhancment https://github.com/cloud-custodian/cel-python/issues/68


celpy.evaluation.Referent.__repr__ = lambda self: ""
celpy.evaluation.NameContainer.__repr__ = lambda self: ""
celpy.Activation.__repr__ = lambda self: ""
celpy.Activation.__str__ = lambda self: ""
celpy.celtypes.MapType.__repr__ = lambda self: ""
celpy.celtypes.DoubleType.__repr__ = lambda self: ""
celpy.celtypes.BytesType.__repr__ = lambda self: ""
celpy.celtypes.IntType.__repr__ = lambda self: ""
celpy.celtypes.UintType.__repr__ = lambda self: ""
celpy.celtypes.ListType.__repr__ = lambda self: ""
celpy.celtypes.StringType.__repr__ = lambda self: ""
celpy.celtypes.TimestampType.__repr__ = lambda self: ""
celpy.c7nlib.C7NContext.__repr__ = lambda self: ""
celpy.celparser.Tree.__repr__ = lambda self: ""


class RulesEngine:
def __init__(self, tenant_id=None):
self.tenant_id = tenant_id
self.logger = logging.getLogger(__name__)
self.env = celpy.Environment()

def run_rules(self, events: list[AlertDto]) -> list[IncidentDto]:
self.logger.info("Running rules")
Expand Down Expand Up @@ -90,10 +113,9 @@ def _check_if_rule_apply(self, rule, event: AlertDto):
# what we do here is to compile the CEL rule and evaluate it
# https://github.com/cloud-custodian/cel-python
# https://github.com/google/cel-spec
env = celpy.Environment()
for sub_rule in sub_rules:
ast = env.compile(sub_rule)
prgm = env.program(ast)
ast = self.env.compile(sub_rule)
prgm = self.env.program(ast)
activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
try:
r = prgm.evaluate(activation)
Expand Down Expand Up @@ -152,8 +174,23 @@ def _calc_rule_fingerprint(self, event: AlertDto, rule):
return "none"
return ",".join(rule_fingerprint)

@staticmethod
def filter_alerts(alerts: list[AlertDto], cel: str):
def get_alerts_activation(self, alerts: list[AlertDto]):
activations = []
for alert in alerts:
payload = alert.dict()
# TODO: workaround since source is a list
# should be fixed in the future
payload["source"] = ",".join(payload["source"])
# payload severity could be the severity itself or the order of the severity, cast it to the order
if isinstance(payload["severity"], str):
payload["severity"] = AlertSeverity(payload["severity"].lower()).order
activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
activations.append(activation)
return activations

def filter_alerts(
self, alerts: list[AlertDto], cel: str, alerts_activation: list = None
):
"""This function filters alerts according to a CEL
Args:
Expand All @@ -164,7 +201,7 @@ def filter_alerts(alerts: list[AlertDto], cel: str):
list[AlertDto]: list of alerts that are related to the cel
"""
logger = logging.getLogger(__name__)
env = celpy.Environment()

# tb: temp hack because this function is super slow
if cel == STATIC_PRESETS.get("feed", {}).options[0].get("value"):
return [
Expand All @@ -178,19 +215,15 @@ def filter_alerts(alerts: list[AlertDto], cel: str):
return alerts
# preprocess the cel expression
cel = preprocess_cel_expression(cel)
ast = env.compile(cel)
prgm = env.program(ast)
ast = self.env.compile(cel)
prgm = self.env.program(ast)
filtered_alerts = []
for alert in alerts:
payload = alert.dict()
# TODO: workaround since source is a list
# should be fixed in the future
payload["source"] = ",".join(payload["source"])
# payload severity could be the severity itself or the order of the severity, cast it to the order
if isinstance(payload["severity"], str):
payload["severity"] = AlertSeverity(payload["severity"].lower()).order

activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
for i, alert in enumerate(alerts):
if alerts_activation:
activation = alerts_activation[i]
else:
activation = self.get_alerts_activation([alert])[0]
try:
r = prgm.evaluate(activation)
except celpy.evaluation.CELEvalError as e:
Expand Down Expand Up @@ -219,4 +252,5 @@ def filter_alerts(alerts: list[AlertDto], cel: str):
continue
if r:
filtered_alerts.append(alert)

return filtered_alerts
4 changes: 3 additions & 1 deletion keep/searchengine/searchengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ def search_preset_alerts(
if self.search_mode == SearchMode.INTERNAL:
# get the alerts
alerts_dto = self._get_last_alerts()
# performance optimization: get the alerts activation once
alerts_activation = self.rule_engine.get_alerts_activation(alerts_dto)
for preset in presets:
filtered_alerts = self.rule_engine.filter_alerts(
alerts_dto, preset.cel_query
alerts_dto, preset.cel_query, alerts_activation
)
preset.alerts_count = len(filtered_alerts)
# update noisy
Expand Down

0 comments on commit fb72f73

Please sign in to comment.