Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(performance): better performance for celpy #2005

Merged
merged 6 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion keep/api/api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import os
import time
from importlib import metadata

import jwt
Expand Down Expand Up @@ -341,10 +342,13 @@ async def log_middleware(request: Request, call_next):
f"Request started: {request.method} {request.url.path}",
extra={"tenant_id": identity},
)
start_time = time.time()
request.state.tenant_id = identity
response = await call_next(request)

end_time = time.time()
logger.info(
f"Request finished: {request.method} {request.url.path} {response.status_code}"
f"Request finished: {request.method} {request.url.path} {response.status_code} in {end_time - start_time:.2f}s",
)
return response

Expand Down
2 changes: 1 addition & 1 deletion keep/api/routes/preset.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ def get_preset_alerts(
authenticated_entity: AuthenticatedEntity = Depends(
IdentityManagerFactory.get_auth_verifier(["read:presets"])
),
) -> list[AlertDto]:
) -> list:

# Gathering alerts may take a while and we don't care if it will finish before we return the response.
# In the worst case, gathered alerts will be pulled in the next request.
Expand Down
13 changes: 7 additions & 6 deletions keep/api/tasks/process_event_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,13 +418,19 @@ def __handle_formatted_events(
logger.exception("Failed to push alert to the client")

# Now we need to update the presets
# send with pusher
if notify_client:
pusher_client = get_pusher_client()
if not pusher_client:
return
try:
presets = get_all_presets(tenant_id)
rules_engine = RulesEngine(tenant_id=tenant_id)
presets_do_update = []
for preset in presets:
# filter the alerts based on the search query
preset_dto = PresetDto(**preset.to_dict())
filtered_alerts = RulesEngine.filter_alerts(
filtered_alerts = rules_engine.filter_alerts(
enriched_formatted_events, preset_dto.cel_query
)
# if not related alerts, no need to update
Expand Down Expand Up @@ -452,11 +458,6 @@ def __handle_formatted_events(
):
logger.info("Noisy preset is noisy")
preset_dto.should_do_noise_now = True
# send with pusher
if notify_client:
pusher_client = get_pusher_client()
if not pusher_client:
return
try:
pusher_client.trigger(
f"private-{tenant_id}",
Expand Down
68 changes: 51 additions & 17 deletions keep/rulesengine/rulesengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,41 @@
import logging

import celpy
import celpy.c7nlib
import celpy.celparser
import celpy.celtypes
import celpy.evaluation

from keep.api.consts import STATIC_PRESETS
from keep.api.core.db import assign_alert_to_incident, get_incident_for_grouping_rule
from keep.api.core.db import get_rules as get_rules_db
from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto
from keep.api.utils.cel_utils import preprocess_cel_expression

# Shahar: this is performance enhancment https://github.com/cloud-custodian/cel-python/issues/68


celpy.evaluation.Referent.__repr__ = lambda self: ""
celpy.evaluation.NameContainer.__repr__ = lambda self: ""
celpy.Activation.__repr__ = lambda self: ""
celpy.Activation.__str__ = lambda self: ""
celpy.celtypes.MapType.__repr__ = lambda self: ""
celpy.celtypes.DoubleType.__repr__ = lambda self: ""
celpy.celtypes.BytesType.__repr__ = lambda self: ""
celpy.celtypes.IntType.__repr__ = lambda self: ""
celpy.celtypes.UintType.__repr__ = lambda self: ""
celpy.celtypes.ListType.__repr__ = lambda self: ""
celpy.celtypes.StringType.__repr__ = lambda self: ""
celpy.celtypes.TimestampType.__repr__ = lambda self: ""
celpy.c7nlib.C7NContext.__repr__ = lambda self: ""
celpy.celparser.Tree.__repr__ = lambda self: ""


class RulesEngine:
def __init__(self, tenant_id=None):
self.tenant_id = tenant_id
self.logger = logging.getLogger(__name__)
self.env = celpy.Environment()

def run_rules(self, events: list[AlertDto]) -> list[IncidentDto]:
self.logger.info("Running rules")
Expand Down Expand Up @@ -90,10 +113,9 @@ def _check_if_rule_apply(self, rule, event: AlertDto):
# what we do here is to compile the CEL rule and evaluate it
# https://github.com/cloud-custodian/cel-python
# https://github.com/google/cel-spec
env = celpy.Environment()
for sub_rule in sub_rules:
ast = env.compile(sub_rule)
prgm = env.program(ast)
ast = self.env.compile(sub_rule)
prgm = self.env.program(ast)
activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
try:
r = prgm.evaluate(activation)
Expand Down Expand Up @@ -152,8 +174,23 @@ def _calc_rule_fingerprint(self, event: AlertDto, rule):
return "none"
return ",".join(rule_fingerprint)

@staticmethod
def filter_alerts(alerts: list[AlertDto], cel: str):
def get_alerts_activation(self, alerts: list[AlertDto]):
activations = []
for alert in alerts:
payload = alert.dict()
# TODO: workaround since source is a list
# should be fixed in the future
payload["source"] = ",".join(payload["source"])
# payload severity could be the severity itself or the order of the severity, cast it to the order
if isinstance(payload["severity"], str):
payload["severity"] = AlertSeverity(payload["severity"].lower()).order
activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
activations.append(activation)
return activations

def filter_alerts(
self, alerts: list[AlertDto], cel: str, alerts_activation: list = None
):
"""This function filters alerts according to a CEL

Args:
Expand All @@ -164,7 +201,7 @@ def filter_alerts(alerts: list[AlertDto], cel: str):
list[AlertDto]: list of alerts that are related to the cel
"""
logger = logging.getLogger(__name__)
env = celpy.Environment()

# tb: temp hack because this function is super slow
if cel == STATIC_PRESETS.get("feed", {}).options[0].get("value"):
return [
Expand All @@ -178,19 +215,15 @@ def filter_alerts(alerts: list[AlertDto], cel: str):
return alerts
# preprocess the cel expression
cel = preprocess_cel_expression(cel)
ast = env.compile(cel)
prgm = env.program(ast)
ast = self.env.compile(cel)
prgm = self.env.program(ast)
filtered_alerts = []
for alert in alerts:
payload = alert.dict()
# TODO: workaround since source is a list
# should be fixed in the future
payload["source"] = ",".join(payload["source"])
# payload severity could be the severity itself or the order of the severity, cast it to the order
if isinstance(payload["severity"], str):
payload["severity"] = AlertSeverity(payload["severity"].lower()).order

activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
for i, alert in enumerate(alerts):
if alerts_activation:
activation = alerts_activation[i]
else:
activation = self.get_alerts_activation([alert])[0]
try:
r = prgm.evaluate(activation)
except celpy.evaluation.CELEvalError as e:
Expand Down Expand Up @@ -219,4 +252,5 @@ def filter_alerts(alerts: list[AlertDto], cel: str):
continue
if r:
filtered_alerts.append(alert)

return filtered_alerts
4 changes: 3 additions & 1 deletion keep/searchengine/searchengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ def search_preset_alerts(
if self.search_mode == SearchMode.INTERNAL:
# get the alerts
alerts_dto = self._get_last_alerts()
# performance optimization: get the alerts activation once
alerts_activation = self.rule_engine.get_alerts_activation(alerts_dto)
for preset in presets:
filtered_alerts = self.rule_engine.filter_alerts(
alerts_dto, preset.cel_query
alerts_dto, preset.cel_query, alerts_activation
)
preset.alerts_count = len(filtered_alerts)
# update noisy
Expand Down
Loading