diff --git a/examples/workflows/change.yml b/examples/workflows/change.yml
new file mode 100644
index 000000000..605b010be
--- /dev/null
+++ b/examples/workflows/change.yml
@@ -0,0 +1,13 @@
+workflow:
+  id: on-field-change
+  description: demonstrates how to trigger a workflow when a field changes
+  triggers:
+    - type: alert
+      only_on_change:
+        - status
+  actions:
+    - name: echo-test
+      provider:
+        type: console
+        with:
+          alert_message: "Hello world"
diff --git a/keep/api/core/db.py b/keep/api/core/db.py
index 517a56455..c1e689302 100644
--- a/keep/api/core/db.py
+++ b/keep/api/core/db.py
@@ -813,6 +813,24 @@ def get_alerts_by_fingerprint(tenant_id: str, fingerprint: str, limit=1) -> List
     return alerts


+def get_previous_alert_by_fingerprint(tenant_id: str, fingerprint: str) -> Alert:
+    # get the second-newest alert for a given fingerprint (i.e. the previous one)
+    with Session(engine) as session:
+        alert = (
+            session.query(Alert)
+            .filter(Alert.tenant_id == tenant_id)
+            .filter(Alert.fingerprint == fingerprint)
+            .order_by(Alert.timestamp.desc())
+            .limit(2)
+            .all()
+        )
+        if len(alert) > 1:
+            return alert[1]
+        else:
+            # no previous alert
+            return None
+
+
 def get_api_key(api_key: str) -> TenantApiKey:
     with Session(engine) as session:
         api_key_hashed = hashlib.sha256(api_key.encode()).hexdigest()
@@ -1052,24 +1070,33 @@ def delete_rule(tenant_id, rule_id):
         return False


 async def assign_alert_to_group(
-    tenant_id, alert_id, rule_id, group_fingerprint
+    tenant_id, alert_id, rule_id, timeframe, group_fingerprint
 ) -> Group:
-    tracer = trace.get_tracer(__name__)
-
-    # Ensure that `async_engine` is an instance of `create_async_engine`
+    # check if a group with the given group criteria exists; if not, create it
+    # and then assign the alert to the group
     async with AsyncSession(async_engine, expire_on_commit=False) as session:
         result = await session.execute(
             select(Group)
-            .options(selectinload(Group.alerts))
+            .options(joinedload(Group.alerts))
             .where(Group.tenant_id == tenant_id)
             .where(Group.rule_id == rule_id)
             .where(Group.group_fingerprint == group_fingerprint)
         )
-        group = result.scalars().first()
+        # joinedload against a collection requires unique() before fetching rows
+        group = result.unique().scalars().first()

-        if not group:
-            # Create a new group if it doesn't exist
+        # if the last alert in the group is older than the timeframe, start a new group
+        if group:
+            # the group has at least one alert (otherwise it wouldn't have been created in the first place)
+            is_group_expired = max(
+                alert.timestamp for alert in group.alerts
+            ) < datetime.utcnow() - timedelta(seconds=timeframe)
+        else:
+            is_group_expired = True
+
+        # if there is no group with this group_fingerprint, or the existing group has expired, create a new one
+        if not group or is_group_expired:
+            # Create and add a new group
             group = Group(
                 tenant_id=tenant_id,
                 rule_id=rule_id,
@@ -1081,28 +1108,24 @@
         # Re-query the group with selectinload to set up future automatic loading of alerts
         result = await session.execute(
             select(Group)
-            .options(selectinload(Group.alerts))
+            .options(joinedload(Group.alerts))
             .where(Group.id == group.id)
         )
-        group = result.scalars().first()
+        group = result.unique().scalars().first()

         # Create a new AlertToGroup instance and add it
-        with tracer.start_as_current_span("alert_to_group"):
-            alert_group = AlertToGroup(
-                tenant_id=tenant_id,
-                alert_id=str(alert_id),
-                group_id=str(group.id),
-            )
-        with tracer.start_as_current_span("session_add"):
-            session.add(alert_group)
-        with tracer.start_as_current_span("session_commit"):
-            # Commit inside the session's context manager will automatically commit on exit
-            await session.commit()
+        alert_group = AlertToGroup(
+            tenant_id=tenant_id,
+            alert_id=str(alert_id),
+            group_id=str(group.id),
+        )
+        session.add(alert_group)
+        # committing here; the session's context manager would also commit automatically on exit
+        await session.commit()

         # Refresh and expire need to be awaited as well
-        with tracer.start_as_current_span("session_expire_and_refresh"):
-            session.expire(group, ["alerts"])
-            await session.refresh(group)
+        session.expire(group, ["alerts"])
+        await session.refresh(group)

     return group
diff --git a/keep/providers/prometheus_provider/prometheus_provider.py b/keep/providers/prometheus_provider/prometheus_provider.py
index cf78c9d56..2a45f8ede 100644
--- a/keep/providers/prometheus_provider/prometheus_provider.py
+++ b/keep/providers/prometheus_provider/prometheus_provider.py
@@ -229,7 +229,9 @@ def simulate_alert(**kwargs) -> dict:
             alert_payload[parameter] = random.choice(parameter_options)
         annotations = {"summary": alert_payload["summary"]}
         alert_payload["labels"]["alertname"] = alert_type
-        alert_payload["status"] = AlertStatus.FIRING.value
+        alert_payload["status"] = random.choice(
+            [AlertStatus.FIRING.value, AlertStatus.RESOLVED.value]
+        )
         alert_payload["annotations"] = annotations
         alert_payload["startsAt"] = datetime.datetime.now(
             tz=datetime.timezone.utc
diff --git a/keep/rulesengine/rulesengine.py b/keep/rulesengine/rulesengine.py
index af9f9f682..ef5ac5c69 100644
--- a/keep/rulesengine/rulesengine.py
+++ b/keep/rulesengine/rulesengine.py
@@ -184,7 +184,14 @@ def _check_if_rule_apply(self, rule, event: AlertDto):
             ast = env.compile(sub_rule)
             prgm = env.program(ast)
             activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
-            r = prgm.evaluate(activation)
+            try:
+                r = prgm.evaluate(activation)
+            except celpy.evaluation.CELEvalError as e:
+                # this is fine, it means the sub-rule is not relevant for this event
+                if "no such member" in str(e):
+                    return False
+                # unknown evaluation error, re-raise
+                raise
             if r:
                 return True
         # no subrules matched
diff --git a/keep/workflowmanager/workflowmanager.py b/keep/workflowmanager/workflowmanager.py
index c237c5d74..35dea6414 100644
--- a/keep/workflowmanager/workflowmanager.py
+++ b/keep/workflowmanager/workflowmanager.py
@@ -5,7 +5,11 @@
 import uuid

 from keep.api.core.config import AuthenticationType
-from keep.api.core.db import get_enrichment, save_workflow_results
+from keep.api.core.db import (
+    get_enrichment,
+    get_previous_alert_by_fingerprint,
+    save_workflow_results,
+)
 from keep.api.models.alert import AlertDto
 from keep.providers.providers_factory import ProviderConfigurationException
 from keep.workflowmanager.workflow import Workflow
@@ -84,6 +88,7 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto]):
                    if not trigger.get("type") == "alert":
                        continue
                    should_run = True
+                    # apply filters
                    for filter in trigger.get("filters", []):
                        # TODO: more sophisticated filtering/attributes/nested, etc
                        filter_key = filter.get("key")
@@ -128,26 +133,54 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto]):
                            should_run = False
                            break

-                    # if we got here, it means the event should trigger the workflow
-                    if should_run:
-                        self.logger.info("Found a workflow to run")
-                        event.trigger = "alert"
-                        # prepare the alert with the enrichment
-                        self.logger.info("Enriching alert")
-                        alert_enrichment = get_enrichment(tenant_id, event.fingerprint)
-                        if alert_enrichment:
-                            for k, v in alert_enrichment.enrichments.items():
-                                setattr(event, k, v)
-                        self.logger.info("Alert enriched")
-                        self.scheduler.workflows_to_run.append(
-                            {
-                                "workflow": workflow,
-                                "workflow_id": workflow_model.id,
-                                "tenant_id": tenant_id,
-                                "triggered_by": "alert",
-                                "event": event,
-                            }
-                        )
+                    if not should_run:
+                        continue
+                    # enrich the alert with more data
+                    self.logger.info("Found a workflow to run")
+                    event.trigger = "alert"
+                    # prepare the alert with the enrichment
+                    self.logger.info("Enriching alert")
+                    alert_enrichment = get_enrichment(tenant_id, event.fingerprint)
+                    if alert_enrichment:
+                        for k, v in alert_enrichment.enrichments.items():
+                            setattr(event, k, v)
+                    self.logger.info("Alert enriched")
+                    # apply only_on_change (https://github.com/keephq/keep/issues/801)
+                    fields_that_needs_to_be_change = trigger.get("only_on_change", [])
+                    # if some fields are required to have changed, get the previous alert
+                    if fields_that_needs_to_be_change:
+                        previous_alert = get_previous_alert_by_fingerprint(
+                            tenant_id, event.fingerprint
+                        )
+                        # now compare:
+                        # (no previous alert means that the workflow should run)
+                        if previous_alert:
+                            for field in fields_that_needs_to_be_change:
+                                # the field hasn't changed, so skip this workflow
+                                if getattr(event, field) == previous_alert.event.get(field):
+                                    self.logger.info(
+                                        "Skipping the workflow because the field hasn't changed",
+                                        extra={
+                                            "field": field,
+                                            "event": event,
+                                            "previous_alert": previous_alert,
+                                        },
+                                    )
+                                    should_run = False
+                                    break
+
+                    if not should_run:
+                        continue
+                    # Lastly, if the workflow should run, add it to the scheduler
+                    self.scheduler.workflows_to_run.append(
+                        {
+                            "workflow": workflow,
+                            "workflow_id": workflow_model.id,
+                            "tenant_id": tenant_id,
+                            "triggered_by": "alert",
+                            "event": event,
+                        }
+                    )

     def _get_event_value(self, event, filter_key):
         # if the filter key is a nested key, get the value