Skip to content

Commit

Permalink
feat: add the ability to run workflow on alert state change (#803)
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl authored Feb 7, 2024
1 parent 1f3b6c6 commit 8215620
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 31 deletions.
13 changes: 13 additions & 0 deletions examples/workflows/change.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Example workflow demonstrating the `only_on_change` trigger option:
# the workflow fires on incoming alerts, but only when one of the listed
# fields differs from the previous alert with the same fingerprint.
workflow:
  id: on-field-change
  description: demonstrates how to trigger a workflow when a field changes
  triggers:
    - type: alert
      # Run only when the alert's `status` changed since the previous
      # alert with the same fingerprint (first occurrence always runs).
      only_on_change:
        - status
  actions:
    - name: echo-test
      provider:
        type: console
        with:
          alert_message: "Hello world"
50 changes: 41 additions & 9 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def get_all_workflows(tenant_id: str) -> List[Workflow]:
).all()
return workflows


def get_all_workflows_yamls(tenant_id: str) -> List[str]:
with Session(engine) as session:
workflows = session.exec(
Expand Down Expand Up @@ -782,6 +783,24 @@ def get_alerts_by_fingerprint(tenant_id: str, fingerprint: str, limit=1) -> List
return alerts


def get_previous_alert_by_fingerprint(tenant_id: str, fingerprint: str) -> Alert:
    """Return the second-most-recent alert for a fingerprint, if any.

    Used to compare the incoming (latest) alert against its predecessor,
    e.g. for `only_on_change` workflow triggers.

    Args:
        tenant_id: tenant owning the alert stream.
        fingerprint: the alert fingerprint identifying the stream.

    Returns:
        The previous ``Alert`` (one before the newest by timestamp), or
        ``None`` when fewer than two alerts exist for this fingerprint.
    """
    with Session(engine) as session:
        # Skip the newest row and fetch only the one before it; first()
        # yields None when no such row exists, so we avoid pulling two
        # full rows just to discard the first.
        return (
            session.query(Alert)
            .filter(Alert.tenant_id == tenant_id)
            .filter(Alert.fingerprint == fingerprint)
            .order_by(Alert.timestamp.desc())
            .offset(1)
            .limit(1)
            .first()
        )


def get_api_key(api_key: str) -> TenantApiKey:
with Session(engine) as session:
api_key_hashed = hashlib.sha256(api_key.encode()).hexdigest()
Expand Down Expand Up @@ -991,19 +1010,31 @@ def delete_rule(tenant_id, rule_id):
return False


def assign_alert_to_group(tenant_id, alert_id, rule_id, group_fingerprint) -> Group:
def assign_alert_to_group(
tenant_id, alert_id, rule_id, timeframe, group_fingerprint
) -> Group:
# checks if a group with the given group criteria exists; if not, it creates one
# and then assigns the alert to the group
with Session(engine, expire_on_commit=False) as session:
with Session(engine) as session:
group = session.exec(
select(Group)
.options(selectinload(Group.alerts))
.options(joinedload(Group.alerts))
.where(Group.tenant_id == tenant_id)
.where(Group.rule_id == rule_id)
.where(Group.group_fingerprint == group_fingerprint)
).first()

if not group:
# if the last alert in the group is older than the timeframe, create a new group
if group:
# the group has at least one alert (otherwise it wouldn't have been created in the first place)
is_group_expired = max(
alert.timestamp for alert in group.alerts
) < datetime.utcnow() - timedelta(seconds=timeframe)
else:
is_group_expired = True

# if there is no group with the group_fingerprint, create it
if not group or is_group_expired:
# Create and add a new group if it doesn't exist
group = Group(
tenant_id=tenant_id,
Expand All @@ -1015,7 +1046,7 @@ def assign_alert_to_group(tenant_id, alert_id, rule_id, group_fingerprint) -> Gr
# Re-query the group with selectinload to set up future automatic loading of alerts
group = session.exec(
select(Group)
.options(selectinload(Group.alerts))
.options(joinedload(Group.alerts))
.where(Group.id == group.id)
).first()

Expand All @@ -1027,10 +1058,11 @@ def assign_alert_to_group(tenant_id, alert_id, rule_id, group_fingerprint) -> Gr
)
session.add(alert_group)
session.commit()
# To reflect the newly added alert we expire its state to force a refresh on access
session.expire(group, ["alerts"])
session.refresh(group)
return group
# Requery the group to get the updated alerts
group = session.exec(
select(Group).options(joinedload(Group.alerts)).where(Group.id == group.id)
).first()
return group


def get_groups(tenant_id):
Expand Down
4 changes: 3 additions & 1 deletion keep/providers/prometheus_provider/prometheus_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ def simulate_alert(**kwargs) -> dict:
alert_payload[parameter] = random.choice(parameter_options)
annotations = {"summary": alert_payload["summary"]}
alert_payload["labels"]["alertname"] = alert_type
alert_payload["status"] = AlertStatus.FIRING.value
alert_payload["status"] = random.choice(
[AlertStatus.FIRING.value, AlertStatus.RESOLVED.value]
)
alert_payload["annotations"] = annotations
alert_payload["startsAt"] = datetime.datetime.now(
tz=datetime.timezone.utc
Expand Down
10 changes: 9 additions & 1 deletion keep/rulesengine/rulesengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def run_rules(self, events: list[AlertDto]):
tenant_id=self.tenant_id,
alert_id=event.event_id,
rule_id=str(rule.id),
timeframe=rule.timeframe,
group_fingerprint=group_fingerprint,
)
groups.append(updated_group)
Expand Down Expand Up @@ -177,7 +178,14 @@ def _check_if_rule_apply(self, rule, event: AlertDto):
ast = env.compile(sub_rule)
prgm = env.program(ast)
activation = celpy.json_to_cel(json.loads(json.dumps(payload, default=str)))
r = prgm.evaluate(activation)
try:
r = prgm.evaluate(activation)
except celpy.evaluation.CELEvalError as e:
# this is ok, it means that the subrule is not relevant for this event
if "no such member" in str(e):
return False
# unknown
raise
if r:
return True
# no subrules matched
Expand Down
73 changes: 53 additions & 20 deletions keep/workflowmanager/workflowmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
import uuid

from keep.api.core.config import AuthenticationType
from keep.api.core.db import get_enrichment, save_workflow_results
from keep.api.core.db import (
get_enrichment,
get_previous_alert_by_fingerprint,
save_workflow_results,
)
from keep.api.models.alert import AlertDto
from keep.providers.providers_factory import ProviderConfigurationException
from keep.workflowmanager.workflow import Workflow
Expand Down Expand Up @@ -84,6 +88,7 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto]):
if not trigger.get("type") == "alert":
continue
should_run = True
# apply filters
for filter in trigger.get("filters", []):
# TODO: more sophisticated filtering/attributes/nested, etc
filter_key = filter.get("key")
Expand Down Expand Up @@ -128,26 +133,54 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto]):
should_run = False
break

# if we got here, it means the event should trigger the workflow
if should_run:
self.logger.info("Found a workflow to run")
event.trigger = "alert"
# prepare the alert with the enrichment
self.logger.info("Enriching alert")
alert_enrichment = get_enrichment(tenant_id, event.fingerprint)
if alert_enrichment:
for k, v in alert_enrichment.enrichments.items():
setattr(event, k, v)
self.logger.info("Alert enriched")
self.scheduler.workflows_to_run.append(
{
"workflow": workflow,
"workflow_id": workflow_model.id,
"tenant_id": tenant_id,
"triggered_by": "alert",
"event": event,
}
if not should_run:
continue
# enrich the alert with more data
self.logger.info("Found a workflow to run")
event.trigger = "alert"
# prepare the alert with the enrichment
self.logger.info("Enriching alert")
alert_enrichment = get_enrichment(tenant_id, event.fingerprint)
if alert_enrichment:
for k, v in alert_enrichment.enrichments.items():
setattr(event, k, v)
self.logger.info("Alert enriched")
# apply only_on_change (https://github.com/keephq/keep/issues/801)
fields_that_needs_to_be_change = trigger.get("only_on_change", [])
# if there are fields that need to have changed, get the previous alert
if fields_that_needs_to_be_change:
previous_alert = get_previous_alert_by_fingerprint(
tenant_id, event.fingerprint
)
# now compare:
# (no previous alert means that the workflow should run)
if previous_alert:
for field in fields_that_needs_to_be_change:
# the field hasn't changed
if getattr(event, field) == previous_alert.event.get(field):
self.logger.info(
"Skipping the workflow because the field hasn't change",
extra={
"field": field,
"event": event,
"previous_alert": previous_alert,
},
)
should_run = False
break

if not should_run:
continue
# Lastly, if the workflow should run, add it to the scheduler
self.scheduler.workflows_to_run.append(
{
"workflow": workflow,
"workflow_id": workflow_model.id,
"tenant_id": tenant_id,
"triggered_by": "alert",
"event": event,
}
)

def _get_event_value(self, event, filter_key):
# if the filter key is a nested key, get the value
Expand Down

0 comments on commit 8215620

Please sign in to comment.