From 8215620d119d018c5b0f629d9c78edf86c3e3ec9 Mon Sep 17 00:00:00 2001
From: Shahar Glazner
Date: Wed, 7 Feb 2024 14:35:10 +0200
Subject: [PATCH] feat: add the ability to run workflow on alert state change
 (#803)

---
 examples/workflows/change.yml           | 13 ++++
 keep/api/core/db.py                     | 50 ++++++++++---
 .../prometheus_provider.py              |  4 +-
 keep/rulesengine/rulesengine.py         | 10 ++-
 keep/workflowmanager/workflowmanager.py | 73 ++++++++++++++----
 5 files changed, 119 insertions(+), 31 deletions(-)
 create mode 100644 examples/workflows/change.yml

diff --git a/examples/workflows/change.yml b/examples/workflows/change.yml
new file mode 100644
index 000000000..605b010be
--- /dev/null
+++ b/examples/workflows/change.yml
@@ -0,0 +1,13 @@
+workflow:
+  id: on-field-change
+  description: demonstrates how to trigger a workflow when a field changes
+  triggers:
+    - type: alert
+      only_on_change:
+        - status
+  actions:
+    - name: echo-test
+      provider:
+        type: console
+        with:
+          alert_message: "Hello world"
diff --git a/keep/api/core/db.py b/keep/api/core/db.py
index cb4170e7d..88071b61e 100644
--- a/keep/api/core/db.py
+++ b/keep/api/core/db.py
@@ -415,6 +415,7 @@ def get_all_workflows(tenant_id: str) -> List[Workflow]:
         ).all()
     return workflows
 
+
 def get_all_workflows_yamls(tenant_id: str) -> List[str]:
     with Session(engine) as session:
         workflows = session.exec(
@@ -782,6 +783,24 @@ def get_alerts_by_fingerprint(tenant_id: str, fingerprint: str, limit=1) -> List
     return alerts
 
 
+def get_previous_alert_by_fingerprint(tenant_id: str, fingerprint: str) -> Alert:
+    # get the previous alert for a given fingerprint
+    with Session(engine) as session:
+        alert = (
+            session.query(Alert)
+            .filter(Alert.tenant_id == tenant_id)
+            .filter(Alert.fingerprint == fingerprint)
+            .order_by(Alert.timestamp.desc())
+            .limit(2)
+            .all()
+        )
+        if len(alert) > 1:
+            return alert[1]
+        else:
+            # no previous alert
+            return None
+
+
 def get_api_key(api_key: str) -> TenantApiKey:
     with Session(engine) as session:
         api_key_hashed = hashlib.sha256(api_key.encode()).hexdigest()
@@ -991,19 +1010,31 @@ def delete_rule(tenant_id, rule_id):
             return False
 
 
-def assign_alert_to_group(tenant_id, alert_id, rule_id, group_fingerprint) -> Group:
+def assign_alert_to_group(
+    tenant_id, alert_id, rule_id, timeframe, group_fingerprint
+) -> Group:
     # checks if a group with the group criteria exists, if not it creates it
     # and then assigns the alert to the group
-    with Session(engine, expire_on_commit=False) as session:
+    with Session(engine) as session:
         group = session.exec(
             select(Group)
-            .options(selectinload(Group.alerts))
+            .options(joinedload(Group.alerts))
             .where(Group.tenant_id == tenant_id)
             .where(Group.rule_id == rule_id)
             .where(Group.group_fingerprint == group_fingerprint)
         ).first()
 
-        if not group:
+        # if the last alert in the group is older than the timeframe, create a new group
+        if group:
+            # group has at least one alert (otherwise it wouldn't have been created in the first place)
+            is_group_expired = max(
+                alert.timestamp for alert in group.alerts
+            ) < datetime.utcnow() - timedelta(seconds=timeframe)
+        else:
+            is_group_expired = True
+
+        # if there is no group with this group_fingerprint, or the group has expired, create a new one
+        if not group or is_group_expired:
             # Create and add a new group if it doesn't exist
             group = Group(
                 tenant_id=tenant_id,
@@ -1015,7 +1046,7 @@ def assign_alert_to_group(tenant_id, alert_id, rule_id, group_fingerprint) -> Gr
         # Re-query the group with selectinload to set up future automatic loading of alerts
         group = session.exec(
             select(Group)
-            .options(selectinload(Group.alerts))
+            .options(joinedload(Group.alerts))
             .where(Group.id == group.id)
         ).first()
 
@@ -1027,10 +1058,11 @@ def assign_alert_to_group(tenant_id, alert_id, rule_id, group_fingerprint) -> Gr
         )
         session.add(alert_group)
         session.commit()
-        # To reflect the newly added alert we expire its state to force a refresh on access
-        session.expire(group, ["alerts"])
-        session.refresh(group)
-        return group
+        # Requery the group to get the updated alerts
+        group = session.exec(
+            select(Group).options(joinedload(Group.alerts)).where(Group.id == group.id)
+        ).first()
+        return group
 
 
 def get_groups(tenant_id):
diff --git a/keep/providers/prometheus_provider/prometheus_provider.py b/keep/providers/prometheus_provider/prometheus_provider.py
index cf78c9d56..2a45f8ede 100644
--- a/keep/providers/prometheus_provider/prometheus_provider.py
+++ b/keep/providers/prometheus_provider/prometheus_provider.py
@@ -229,7 +229,9 @@ def simulate_alert(**kwargs) -> dict:
             alert_payload[parameter] = random.choice(parameter_options)
         annotations = {"summary": alert_payload["summary"]}
         alert_payload["labels"]["alertname"] = alert_type
-        alert_payload["status"] = AlertStatus.FIRING.value
+        alert_payload["status"] = random.choice(
+            [AlertStatus.FIRING.value, AlertStatus.RESOLVED.value]
+        )
         alert_payload["annotations"] = annotations
         alert_payload["startsAt"] = datetime.datetime.now(
             tz=datetime.timezone.utc
diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py
index 517c81a5e..53f9379e4 100644
--- a/keep/rulesengine/rulesengine.py
+++ b/keep/rulesengine/rulesengine.py
@@ -57,6 +57,7 @@ def run_rules(self, events: list[AlertDto]):
                     tenant_id=self.tenant_id,
                     alert_id=event.event_id,
                     rule_id=str(rule.id),
+                    timeframe=rule.timeframe,
                     group_fingerprint=group_fingerprint,
                 )
                 groups.append(updated_group)
@@ -177,7 +178,14 @@ def _check_if_rule_apply(self, rule, event: AlertDto):
             ast = env.compile(sub_rule)
             prgm = env.program(ast)
             activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
-            r = prgm.evaluate(activation)
+            try:
+                r = prgm.evaluate(activation)
+            except celpy.evaluation.CELEvalError as e:
+                # this is ok, it means that the subrule is not relevant for this event
+                if "no such member" in str(e):
+                    return False
+                # unknown evaluation error, re-raise
+                raise
             if r:
                 return True
         # no subrules matched
diff --git a/keep/workflowmanager/workflowmanager.py b/keep/workflowmanager/workflowmanager.py
index c237c5d74..35dea6414 100644
--- a/keep/workflowmanager/workflowmanager.py
+++ b/keep/workflowmanager/workflowmanager.py
@@ -5,7 +5,11 @@
 import uuid
 
 from keep.api.core.config import AuthenticationType
-from keep.api.core.db import get_enrichment, save_workflow_results
+from keep.api.core.db import (
+    get_enrichment,
+    get_previous_alert_by_fingerprint,
+    save_workflow_results,
+)
 from keep.api.models.alert import AlertDto
 from keep.providers.providers_factory import ProviderConfigurationException
 from keep.workflowmanager.workflow import Workflow
@@ -84,6 +88,7 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto]):
                 if not trigger.get("type") == "alert":
                     continue
                 should_run = True
+                # apply filters
                 for filter in trigger.get("filters", []):
                     # TODO: more sophisticated filtering/attributes/nested, etc
                     filter_key = filter.get("key")
@@ -128,26 +133,54 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto]):
                         should_run = False
                         break
 
-                # if we got here, it means the event should trigger the workflow
-                if should_run:
-                    self.logger.info("Found a workflow to run")
-                    event.trigger = "alert"
-                    # prepare the alert with the enrichment
-                    self.logger.info("Enriching alert")
-                    alert_enrichment = get_enrichment(tenant_id, event.fingerprint)
-                    if alert_enrichment:
-                        for k, v in alert_enrichment.enrichments.items():
-                            setattr(event, k, v)
-                    self.logger.info("Alert enriched")
-                    self.scheduler.workflows_to_run.append(
-                        {
-                            "workflow": workflow,
-                            "workflow_id": workflow_model.id,
-                            "tenant_id": tenant_id,
-                            "triggered_by": "alert",
-                            "event": event,
-                        }
+                if not should_run:
+                    continue
+                # enrich the alert with more data
+                self.logger.info("Found a workflow to run")
+                event.trigger = "alert"
+                # prepare the alert with the enrichment
+                self.logger.info("Enriching alert")
+                alert_enrichment = get_enrichment(tenant_id, event.fingerprint)
+                if alert_enrichment:
+                    for k, v in alert_enrichment.enrichments.items():
+                        setattr(event, k, v)
+                self.logger.info("Alert enriched")
+                # apply only_on_change (https://github.com/keephq/keep/issues/801)
+                fields_that_needs_to_be_change = trigger.get("only_on_change", [])
+                # if there are fields that must have changed, get the previous alert to compare against
+                if fields_that_needs_to_be_change:
+                    previous_alert = get_previous_alert_by_fingerprint(
+                        tenant_id, event.fingerprint
                     )
+                    # now compare:
+                    # (no previous alert means that the workflow should run)
+                    if previous_alert:
+                        for field in fields_that_needs_to_be_change:
+                            # the field hasn't changed
+                            if getattr(event, field) == previous_alert.event.get(field):
+                                self.logger.info(
+                                    "Skipping the workflow because the field hasn't changed",
+                                    extra={
+                                        "field": field,
+                                        "event": event,
+                                        "previous_alert": previous_alert,
+                                    },
+                                )
+                                should_run = False
+                                break
+
+                if not should_run:
+                    continue
+                # Lastly, if the workflow should run, add it to the scheduler
+                self.scheduler.workflows_to_run.append(
+                    {
+                        "workflow": workflow,
+                        "workflow_id": workflow_model.id,
+                        "tenant_id": tenant_id,
+                        "triggered_by": "alert",
+                        "event": event,
+                    }
+                )
 
     def _get_event_value(self, event, filter_key):
         # if the filter key is a nested key, get the value
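
The new get_previous_alert_by_fingerprint helper returns the second-newest alert for a
fingerprint, or None when no earlier alert exists. A minimal stand-alone sketch of that
selection logic, using an in-memory list of dicts in place of the Alert table (the
fingerprints, timestamps, and statuses below are made up, not Keep data):

from datetime import datetime, timedelta
from typing import Optional


def previous_alert(alerts: list[dict], fingerprint: str) -> Optional[dict]:
    # newest first, mirroring the Alert.timestamp.desc() ordering in the patch
    matching = sorted(
        (a for a in alerts if a["fingerprint"] == fingerprint),
        key=lambda a: a["timestamp"],
        reverse=True,
    )
    # index 1 is the "previous" alert; fewer than two alerts means there is none
    return matching[1] if len(matching) > 1 else None


if __name__ == "__main__":
    now = datetime.utcnow()
    alerts = [
        {"fingerprint": "fp-1", "timestamp": now, "status": "resolved"},
        {"fingerprint": "fp-1", "timestamp": now - timedelta(minutes=5), "status": "firing"},
        {"fingerprint": "fp-2", "timestamp": now, "status": "firing"},
    ]
    print(previous_alert(alerts, "fp-1")["status"])  # firing
    print(previous_alert(alerts, "fp-2"))            # None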
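
assign_alert_to_group now receives the rule's timeframe and reuses an existing group only
while the group's newest alert is still inside that window; a missing or expired group makes
the caller create a new one. A hedged sketch of just that expiry check, with made-up
timestamps and a 600-second timeframe rather than values from the patch:

from datetime import datetime, timedelta


def group_is_expired(alert_timestamps, timeframe_seconds, now=None):
    # an empty or missing group counts as expired, which forces a new group to be created
    now = now or datetime.utcnow()
    if not alert_timestamps:
        return True
    # compare the newest alert in the group against the rule's timeframe
    return max(alert_timestamps) < now - timedelta(seconds=timeframe_seconds)


if __name__ == "__main__":
    now = datetime.utcnow()
    fresh_group = [now - timedelta(seconds=30)]
    stale_group = [now - timedelta(hours=1)]
    print(group_is_expired(fresh_group, 600, now))  # False -> reuse the group
    print(group_is_expired(stale_group, 600, now))  # True  -> start a new group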
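
The only_on_change trigger option (used by examples/workflows/change.yml above) skips the
workflow when none of the listed fields differ from the previous alert with the same
fingerprint, and always runs when there is no previous alert. A minimal sketch of that
comparison under those assumptions, using hypothetical stub classes rather than Keep's
AlertDto and Alert models:

from dataclasses import dataclass, field


@dataclass
class PreviousAlertStub:
    # stands in for the stored Alert row; `event` mirrors the raw alert payload dict
    event: dict = field(default_factory=dict)


@dataclass
class CurrentAlertStub:
    status: str = "firing"
    severity: str = "critical"


def should_run(current, previous, only_on_change) -> bool:
    # no previous alert means the workflow should run
    if previous is None:
        return True
    # run only if at least one watched field actually changed
    return any(
        getattr(current, f, None) != previous.event.get(f) for f in only_on_change
    )


if __name__ == "__main__":
    prev = PreviousAlertStub(event={"status": "firing", "severity": "critical"})
    cur = CurrentAlertStub(status="resolved", severity="critical")
    print(should_run(cur, prev, ["status"]))    # True  -> status changed, run
    print(should_run(cur, prev, ["severity"]))  # False -> nothing changed, skip
    print(should_run(cur, None, ["status"]))    # True  -> no previous alert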