diff --git a/keep/api/api.py b/keep/api/api.py index 5bcc0ddcc..1ca20f744 100644 --- a/keep/api/api.py +++ b/keep/api/api.py @@ -1,6 +1,7 @@ import asyncio import logging import os +import time from importlib import metadata import jwt @@ -341,10 +342,13 @@ async def log_middleware(request: Request, call_next): f"Request started: {request.method} {request.url.path}", extra={"tenant_id": identity}, ) + start_time = time.time() request.state.tenant_id = identity response = await call_next(request) + + end_time = time.time() logger.info( - f"Request finished: {request.method} {request.url.path} {response.status_code}" + f"Request finished: {request.method} {request.url.path} {response.status_code} in {end_time - start_time:.2f}s", ) return response diff --git a/keep/api/routes/preset.py b/keep/api/routes/preset.py index 45f46c983..ec84d25f8 100644 --- a/keep/api/routes/preset.py +++ b/keep/api/routes/preset.py @@ -395,7 +395,7 @@ def get_preset_alerts( authenticated_entity: AuthenticatedEntity = Depends( IdentityManagerFactory.get_auth_verifier(["read:presets"]) ), -) -> list[AlertDto]: +) -> list: # Gathering alerts may take a while and we don't care if it will finish before we return the response. # In the worst case, gathered alerts will be pulled in the next request. 
diff --git a/keep/api/tasks/process_event_task.py b/keep/api/tasks/process_event_task.py index c9cbcf383..6bf1c65ce 100644 --- a/keep/api/tasks/process_event_task.py +++ b/keep/api/tasks/process_event_task.py @@ -418,13 +418,19 @@ def __handle_formatted_events( logger.exception("Failed to push alert to the client") # Now we need to update the presets + # send with pusher + if notify_client: + pusher_client = get_pusher_client() + if not pusher_client: + return try: presets = get_all_presets(tenant_id) + rules_engine = RulesEngine(tenant_id=tenant_id) presets_do_update = [] for preset in presets: # filter the alerts based on the search query preset_dto = PresetDto(**preset.to_dict()) - filtered_alerts = RulesEngine.filter_alerts( + filtered_alerts = rules_engine.filter_alerts( enriched_formatted_events, preset_dto.cel_query ) # if not related alerts, no need to update @@ -452,11 +458,6 @@ def __handle_formatted_events( ): logger.info("Noisy preset is noisy") preset_dto.should_do_noise_now = True - # send with pusher - if notify_client: - pusher_client = get_pusher_client() - if not pusher_client: - return try: pusher_client.trigger( f"private-{tenant_id}", diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py index 095077645..bfc07aeaa 100644 --- a/keep/rulesengine/rulesengine.py +++ b/keep/rulesengine/rulesengine.py @@ -2,6 +2,10 @@ import logging import celpy +import celpy.c7nlib +import celpy.celparser +import celpy.celtypes +import celpy.evaluation from keep.api.consts import STATIC_PRESETS from keep.api.core.db import assign_alert_to_incident, get_incident_for_grouping_rule @@ -9,11 +13,30 @@ from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto from keep.api.utils.cel_utils import preprocess_cel_expression +# Shahar: this is performance enhancement https://github.com/cloud-custodian/cel-python/issues/68 + + +celpy.evaluation.Referent.__repr__ = lambda self: "" +celpy.evaluation.NameContainer.__repr__ = lambda self: "" 
+celpy.Activation.__repr__ = lambda self: "" +celpy.Activation.__str__ = lambda self: "" +celpy.celtypes.MapType.__repr__ = lambda self: "" +celpy.celtypes.DoubleType.__repr__ = lambda self: "" +celpy.celtypes.BytesType.__repr__ = lambda self: "" +celpy.celtypes.IntType.__repr__ = lambda self: "" +celpy.celtypes.UintType.__repr__ = lambda self: "" +celpy.celtypes.ListType.__repr__ = lambda self: "" +celpy.celtypes.StringType.__repr__ = lambda self: "" +celpy.celtypes.TimestampType.__repr__ = lambda self: "" +celpy.c7nlib.C7NContext.__repr__ = lambda self: "" +celpy.celparser.Tree.__repr__ = lambda self: "" + class RulesEngine: def __init__(self, tenant_id=None): self.tenant_id = tenant_id self.logger = logging.getLogger(__name__) + self.env = celpy.Environment() def run_rules(self, events: list[AlertDto]) -> list[IncidentDto]: self.logger.info("Running rules") @@ -90,10 +113,9 @@ def _check_if_rule_apply(self, rule, event: AlertDto): # what we do here is to compile the CEL rule and evaluate it # https://github.com/cloud-custodian/cel-python # https://github.com/google/cel-spec - env = celpy.Environment() for sub_rule in sub_rules: - ast = env.compile(sub_rule) - prgm = env.program(ast) + ast = self.env.compile(sub_rule) + prgm = self.env.program(ast) activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str))) try: r = prgm.evaluate(activation) @@ -152,8 +174,23 @@ def _calc_rule_fingerprint(self, event: AlertDto, rule): return "none" return ",".join(rule_fingerprint) - @staticmethod - def filter_alerts(alerts: list[AlertDto], cel: str): + def get_alerts_activation(self, alerts: list[AlertDto]): + activations = [] + for alert in alerts: + payload = alert.dict() + # TODO: workaround since source is a list + # should be fixed in the future + payload["source"] = ",".join(payload["source"]) + # payload severity could be the severity itself or the order of the severity, cast it to the order + if isinstance(payload["severity"], str): + payload["severity"] 
= AlertSeverity(payload["severity"].lower()).order + activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str))) + activations.append(activation) + return activations + + def filter_alerts( + self, alerts: list[AlertDto], cel: str, alerts_activation: list = None + ): """This function filters alerts according to a CEL Args: @@ -164,7 +201,7 @@ def filter_alerts(alerts: list[AlertDto], cel: str): list[AlertDto]: list of alerts that are related to the cel """ logger = logging.getLogger(__name__) - env = celpy.Environment() + # tb: temp hack because this function is super slow if cel == STATIC_PRESETS.get("feed", {}).options[0].get("value"): return [ @@ -178,19 +215,15 @@ def filter_alerts(alerts: list[AlertDto], cel: str): return alerts # preprocess the cel expression cel = preprocess_cel_expression(cel) - ast = env.compile(cel) - prgm = env.program(ast) + ast = self.env.compile(cel) + prgm = self.env.program(ast) filtered_alerts = [] - for alert in alerts: - payload = alert.dict() - # TODO: workaround since source is a list - # should be fixed in the future - payload["source"] = ",".join(payload["source"]) - # payload severity could be the severity itself or the order of the severity, cast it to the order - if isinstance(payload["severity"], str): - payload["severity"] = AlertSeverity(payload["severity"].lower()).order - activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str))) + for i, alert in enumerate(alerts): + if alerts_activation: + activation = alerts_activation[i] + else: + activation = self.get_alerts_activation([alert])[0] try: r = prgm.evaluate(activation) except celpy.evaluation.CELEvalError as e: @@ -219,4 +252,5 @@ def filter_alerts(alerts: list[AlertDto], cel: str): continue if r: filtered_alerts.append(alert) + return filtered_alerts diff --git a/keep/searchengine/searchengine.py b/keep/searchengine/searchengine.py index 0fcf7fb03..749b48a02 100644 --- a/keep/searchengine/searchengine.py +++ 
b/keep/searchengine/searchengine.py @@ -169,9 +169,11 @@ def search_preset_alerts( if self.search_mode == SearchMode.INTERNAL: # get the alerts alerts_dto = self._get_last_alerts() + # performance optimization: get the alerts activation once + alerts_activation = self.rule_engine.get_alerts_activation(alerts_dto) for preset in presets: filtered_alerts = self.rule_engine.filter_alerts( - alerts_dto, preset.cel_query + alerts_dto, preset.cel_query, alerts_activation ) preset.alerts_count = len(filtered_alerts) # update noisy