diff --git a/docs/workflows/overview.mdx b/docs/workflows/overview.mdx index fcf66c5c3..f728d3e6e 100644 --- a/docs/workflows/overview.mdx +++ b/docs/workflows/overview.mdx @@ -10,7 +10,7 @@ In this section we will review the Workflow components. ## Triggers When you run alert with the CLI using `keep run`, the CLI run the alert regardless of the triggers. A trigger is an event that starts the workflow. It could be a manual trigger, an alert, or an interval depending on your use case. -Keep support three types of triggers: +Keep supports four types of triggers: ### Manual trigger ``` # run manually @@ -28,6 +28,17 @@ triggers: value: cloudwatch ``` +### Incident trigger +``` +# run when an incident gets created, updated or deleted +# You can use multiple events, but at least one is required +triggers: + - type: incident + events: + - created + - deleted +``` + ### Interval trigger ``` # run every 10 seconds diff --git a/keep-ui/app/workflows/[workflow_id]/workflow-execution-table.tsx b/keep-ui/app/workflows/[workflow_id]/workflow-execution-table.tsx index 6ca7c1748..1a66001ee 100644 --- a/keep-ui/app/workflows/[workflow_id]/workflow-execution-table.tsx +++ b/keep-ui/app/workflows/[workflow_id]/workflow-execution-table.tsx @@ -113,6 +113,7 @@ function getTriggerIcon(triggered_by: string) { case "Manual": return FaHandPointer; case "Scheduler": return PiDiamondsFourFill; case "Alert": return HiBellAlert; + case "Incident": return HiBellAlert; default: return PiDiamondsFourFill; } } @@ -159,6 +160,9 @@ export function ExecutionTable({ case triggered_by.substring(0, 6) === "manual": valueToShow = "Manual"; break; + case triggered_by.substring(0, 8) === "incident": + valueToShow = "Incident"; + break; } } diff --git a/keep-ui/app/workflows/builder/CustomNode.tsx b/keep-ui/app/workflows/builder/CustomNode.tsx index 8d7e43ea9..c98d868fa 100644 --- a/keep-ui/app/workflows/builder/CustomNode.tsx +++ b/keep-ui/app/workflows/builder/CustomNode.tsx @@ -16,6 +16,7 @@ import { toast }
from "react-toastify"; function IconUrlProvider(data: FlowNode["data"]) { const { componentType, type } = data || {}; if (type === "alert" || type === "workflow" || type === "trigger" || !type) return "/keep.png"; + if (type === "incident" || type === "workflow" || type === "trigger" || !type) return "/keep.png"; return `/icons/${type ?.replace("step-", "") ?.replace("action-", "") diff --git a/keep-ui/app/workflows/builder/ReactFlowEditor.tsx b/keep-ui/app/workflows/builder/ReactFlowEditor.tsx index 19e192798..3cbea77ea 100644 --- a/keep-ui/app/workflows/builder/ReactFlowEditor.tsx +++ b/keep-ui/app/workflows/builder/ReactFlowEditor.tsx @@ -25,7 +25,7 @@ const ReactFlowEditor = ({ const [isOpen, setIsOpen] = useState(false); const stepEditorRef = useRef(null); const containerRef = useRef(null); - const isTrigger = ['interval', 'manual', 'alert'].includes(selectedNode || '') + const isTrigger = ['interval', 'manual', 'alert', 'incident'].includes(selectedNode || '') const saveRef = useRef(false); useEffect(()=>{ if(saveRef.current && synced){ @@ -34,7 +34,6 @@ const ReactFlowEditor = ({ } }, [saveRef?.current, synced]) - useEffect(() => { setIsOpen(true); if (selectedNode) { @@ -123,11 +122,11 @@ const ReactFlowEditor = ({
- {!selectedNode?.includes('empty') && !isTrigger && } - {!selectedNode?.includes('empty') && !isTrigger && ['interval', 'manual', 'alert'].includes(node?.id)).reduce((obj: any, node: any) => { + const triggerNodeMap = nodes.filter((node: any) => ['interval', 'manual', 'alert', 'incident'].includes(node?.id)).reduce((obj: any, node: any) => { obj[node.id] = true; return obj; }, {} as Record); diff --git a/keep-ui/app/workflows/builder/builder-store.tsx b/keep-ui/app/workflows/builder/builder-store.tsx index 7adae1287..3bc6fa0f3 100644 --- a/keep-ui/app/workflows/builder/builder-store.tsx +++ b/keep-ui/app/workflows/builder/builder-store.tsx @@ -244,6 +244,10 @@ function addNodeBetween(nodeOrEdge: string | null, step: V2Step, type: string, s set({v2Properties: {...get().v2Properties, [newNodeId]: {}}}); break; } + case "incident": { + set({v2Properties: {...get().v2Properties, [newNodeId]: {}}}); + break; + } } } @@ -437,7 +441,7 @@ const useStore = create((set, get) => ({ finalEdges = edges.filter((edge) => !(idArray.includes(edge.source) || idArray.includes(edge.target))); - if (['interval', 'alert', 'manual'].includes(ids) && edges.some((edge) => edge.source === 'trigger_start' && edge.target !== ids)) { + if (['interval', 'alert', 'manual', 'incident'].includes(ids) && edges.some((edge) => edge.source === 'trigger_start' && edge.target !== ids)) { edges = edges.filter((edge) => !(idArray.includes(edge.source))); } const sources = [...new Set(edges.filter((edge) => startNode.id === edge.target))]; @@ -457,7 +461,7 @@ const useStore = create((set, get) => ({ const newNode = createDefaultNodeV2({ ...nodes[endIndex + 1].data, islayouted: false }, nodes[endIndex + 1].id); const newNodes = [...nodes.slice(0, nodeStartIndex), newNode, ...nodes.slice(endIndex + 2)]; - if(['manual', 'alert', 'interval'].includes(ids)) { + if(['manual', 'alert', 'interval', 'incident'].includes(ids)) { const v2Properties = get().v2Properties; delete v2Properties[ids]; set({ v2Properties 
}); diff --git a/keep-ui/app/workflows/builder/builder-validators.tsx b/keep-ui/app/workflows/builder/builder-validators.tsx index b9742a2ed..fdbcebab9 100644 --- a/keep-ui/app/workflows/builder/builder-validators.tsx +++ b/keep-ui/app/workflows/builder/builder-validators.tsx @@ -20,9 +20,10 @@ export function globalValidatorV2( !!definition?.properties && !definition.properties['manual'] && !definition.properties['interval'] && - !definition.properties['alert'] + !definition.properties['alert'] && + !definition.properties['incident'] ) { - setGlobalValidationError('trigger_start', "Workflow Should alteast have one trigger."); + setGlobalValidationError('trigger_start', "Workflow Should at least have one trigger."); return false; } @@ -38,6 +39,12 @@ export function globalValidatorV2( return false; } + const incidentActions = Object.values(definition.properties.incident||{}).filter(Boolean) + if(definition?.properties && definition.properties['incident'] && incidentActions.length==0){ + setGlobalValidationError('incident', "Workflow incident trigger cannot be empty."); + return false; + } + const anyStepOrAction = definition?.sequence?.length > 0; if (!anyStepOrAction) { setGlobalValidationError(null, diff --git a/keep-ui/app/workflows/builder/editors.tsx b/keep-ui/app/workflows/builder/editors.tsx index 2670b6904..1df59f1f4 100644 --- a/keep-ui/app/workflows/builder/editors.tsx +++ b/keep-ui/app/workflows/builder/editors.tsx @@ -7,6 +7,7 @@ import { Subtitle, Icon, Button, + Switch, Divider, } from "@tremor/react"; import { KeyIcon } from "@heroicons/react/20/solid"; @@ -15,6 +16,7 @@ import { BackspaceIcon, FunnelIcon, } from "@heroicons/react/24/outline"; +import React from "react"; import useStore, { V2Properties } from "./builder-store"; import { useEffect, useRef, useState } from "react"; @@ -263,8 +265,6 @@ function WorkflowEditorV2({ selectedNode: string | null; saveRef: React.MutableRefObject; }) { - const isTrigger = ['interval', 'manual', 
'alert'].includes(selectedNode || '') - const updateAlertFilter = (filter: string, value: string) => { const currentFilters = properties.alert || {}; @@ -282,7 +282,6 @@ function WorkflowEditorV2({ } }; - const deleteFilter = (filter: string) => { const currentFilters = { ...properties.alert }; delete currentFilters[filter]; @@ -310,7 +309,7 @@ function WorkflowEditorV2({ <> Workflow Settings {propertyKeys.map((key, index) => { - const isTrigger = ["manual", "alert", 'interval'].includes(key) ; + const isTrigger = ["manual", "alert", 'interval', 'incident'].includes(key); renderDivider = isTrigger && key === selectedNode ? !renderDivider : false; return (
@@ -380,20 +379,41 @@ function WorkflowEditorV2({ ) ); + case "incident": + return selectedNode === 'incident' && <> + Incident events + {Array("created", "updated", "deleted").map((event) => +
+ -1} + onChange={() => { + let events = properties.incident.events || []; + if (events.indexOf(event) > -1) { + events = (events as string[]).filter(e => e !== event) + setProperties({ ...properties, [key]: {events: events } }) + } else { + events.push(event); + setProperties({ ...properties, [key]: {events: events} }) + } + }} + color={"orange"} + /> + +
+ )} + ; case "interval": - return ( - selectedNode === "interval" && ( - - handleChange(key, e.target.value) - } - value={properties[key] || "" as string} - /> - ) - ); - case "disabled": + return selectedNode === "interval" && ( + handleChange(key, e.target.value) + } + value={properties[key] || ""as string} + />); + case "disabled": return (
handleChange(key, e.target.value) } - value={properties[key] || "" as string} - /> - ); - } - })()} -
- ); + value={properties[key] || ""as string} + /> + ); + } + })()} +
+ ); })} ); @@ -489,7 +509,7 @@ export function StepEditorV2({ return ( - {providerType} Editor + {providerType} Editor Unique Identifier { - if (['interval', 'manual', 'alert'].includes(key) && properties[key]) { + if (['interval', 'manual', 'alert', 'incident'].includes(key) && properties[key]) { _steps.push({ id: key, type: key, diff --git a/keep/api/core/db.py b/keep/api/core/db.py index fbc21d200..89c550c6d 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -108,6 +108,7 @@ def create_workflow_execution( event_id: str = None, fingerprint: str = None, execution_id: str = None, + event_type: str = "alert", ) -> str: with Session(engine) as session: try: @@ -126,13 +127,21 @@ def create_workflow_execution( # Ensure the object has an id session.flush() execution_id = workflow_execution.id - if fingerprint: + if fingerprint and event_type == "alert": workflow_to_alert_execution = WorkflowToAlertExecution( workflow_execution_id=execution_id, alert_fingerprint=fingerprint, event_id=event_id, ) session.add(workflow_to_alert_execution) + elif event_type == "incident": + workflow_to_incident_execution = WorkflowToIncidentExecution( + workflow_execution_id=execution_id, + alert_fingerprint=fingerprint, + incident_id=event_id, + ) + session.add(workflow_to_incident_execution) + session.commit() return execution_id except IntegrityError: @@ -687,9 +696,8 @@ def get_workflow_executions( ).scalar() avgDuration = avgDuration if avgDuration else 0.0 - query = ( - query.order_by(desc(WorkflowExecution.started)).limit(limit).offset(offset) - ) + query = (query.order_by(desc(WorkflowExecution.started)).limit(limit).offset(offset) +) # Execute the query workflow_executions = query.all() @@ -2366,7 +2374,7 @@ def get_incidents_count( def get_incident_alerts_by_incident_id( - tenant_id: str, incident_id: str, limit: int, offset: int + tenant_id: str, incident_id: str, limit: Optional[int] = None, offset: Optional[int] = None ) -> (List[Alert], int): with Session(engine)
as session: query = ( @@ -2384,7 +2392,10 @@ def get_incident_alerts_by_incident_id( total_count = query.count() - return query.limit(limit).offset(offset).all(), total_count + if limit and offset: + query = query.limit(limit).offset(offset) + + return query.all(), total_count def get_alerts_data_for_incident( diff --git a/keep/api/models/alert.py b/keep/api/models/alert.py index 58747fd1c..bb4c97eb8 100644 --- a/keep/api/models/alert.py +++ b/keep/api/models/alert.py @@ -4,11 +4,11 @@ import logging import uuid from enum import Enum -from typing import Any, Dict +from typing import Any, Dict, List from uuid import UUID import pytz -from pydantic import AnyHttpUrl, BaseModel, Extra, root_validator, validator +from pydantic import AnyHttpUrl, BaseModel, Extra, root_validator, validator, PrivateAttr logger = logging.getLogger(__name__) @@ -385,6 +385,8 @@ class IncidentDto(IncidentDtoIn): rule_fingerprint: str | None + _tenant_id: str = PrivateAttr() + def __str__(self) -> str: # Convert the model instance to a dictionary model_dict = self.dict() @@ -393,12 +395,26 @@ def __str__(self) -> str: class Config: extra = Extra.allow schema_extra = IncidentDtoIn.Config.schema_extra + underscore_attrs_are_private = True json_encoders = { # Converts UUID to their values for JSON serialization UUID: lambda v: str(v), } + @property + def name(self): + return self.user_generated_name or self.ai_generated_name + + @property + def alerts(self) -> List["AlertDto"]: + from keep.api.core.db import get_incident_alerts_by_incident_id + from keep.api.utils.enrichment_helpers import convert_db_alerts_to_dto_alerts + if not self._tenant_id: + return [] + alerts, _ = get_incident_alerts_by_incident_id(self._tenant_id, str(self.id)) + return convert_db_alerts_to_dto_alerts(alerts) + @root_validator(pre=True) def set_default_values(cls, values: Dict[str, Any]) -> Dict[str, Any]: # Check and set default status @@ -420,7 +436,7 @@ def from_db_incident(cls, db_incident): if 
isinstance(db_incident.severity, int) \ else db_incident.severity - return cls( + dto = cls( id=db_incident.id, user_generated_name=db_incident.user_generated_name, ai_generated_name = db_incident.ai_generated_name, @@ -441,6 +457,10 @@ def from_db_incident(cls, db_incident): rule_fingerprint=db_incident.rule_fingerprint, ) + # This field is required for getting alerts when required + dto._tenant_id = db_incident.tenant_id + return dto + class IncidentStatusChangeDto(BaseModel): status: IncidentStatus diff --git a/keep/api/models/db/migrations/versions/2024-09-13-10-48_938b1aa62d5c.py b/keep/api/models/db/migrations/versions/2024-09-13-10-48_938b1aa62d5c.py index 0f4dd1963..72a8082fc 100644 --- a/keep/api/models/db/migrations/versions/2024-09-13-10-48_938b1aa62d5c.py +++ b/keep/api/models/db/migrations/versions/2024-09-13-10-48_938b1aa62d5c.py @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. revision = "938b1aa62d5c" -down_revision = "c5443d9deb0f" +down_revision = "1aacee84447e" branch_labels = None depends_on = None diff --git a/keep/api/models/db/migrations/versions/2024-09-11-23-30_c5443d9deb0f.py b/keep/api/models/db/migrations/versions/2024-09-17-23-30_c5443d9deb0f.py similarity index 96% rename from keep/api/models/db/migrations/versions/2024-09-11-23-30_c5443d9deb0f.py rename to keep/api/models/db/migrations/versions/2024-09-17-23-30_c5443d9deb0f.py index bc35f2fe1..9ecb5c1cc 100644 --- a/keep/api/models/db/migrations/versions/2024-09-11-23-30_c5443d9deb0f.py +++ b/keep/api/models/db/migrations/versions/2024-09-17-23-30_c5443d9deb0f.py @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. 
revision = "c5443d9deb0f" -down_revision = "1aacee84447e" +down_revision = "938b1aa62d5c" branch_labels = None depends_on = None diff --git a/keep/api/models/db/migrations/versions/2024-09-18-02-05_772790c2e50a.py b/keep/api/models/db/migrations/versions/2024-09-18-02-05_772790c2e50a.py new file mode 100644 index 000000000..e04890ab3 --- /dev/null +++ b/keep/api/models/db/migrations/versions/2024-09-18-02-05_772790c2e50a.py @@ -0,0 +1,38 @@ +"""add WorkflowToIncidentExecution + +Revision ID: 772790c2e50a +Revises: 49e7c02579db +Create Date: 2024-09-08 02:05:42.739163 + +""" + +import sqlalchemy as sa +import sqlmodel +from alembic import op + +# revision identifiers, used by Alembic. +revision = "772790c2e50a" +down_revision = "c5443d9deb0f" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "workflowtoincidentexecution", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column( + "workflow_execution_id", sqlmodel.sql.sqltypes.AutoString(), nullable=False + ), + sa.Column("incident_id", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + sa.ForeignKeyConstraint( + ["workflow_execution_id"], + ["workflowexecution.id"], + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("workflow_execution_id", "incident_id"), + ) + + +def downgrade() -> None: + op.drop_table("workflowtoincidentexecution") diff --git a/keep/api/models/db/migrations/versions/2024-09-18-14-08_5d7ae55efc6a.py b/keep/api/models/db/migrations/versions/2024-09-18-14-08_5d7ae55efc6a.py index c0c825b1b..8f0ce6c47 100644 --- a/keep/api/models/db/migrations/versions/2024-09-18-14-08_5d7ae55efc6a.py +++ b/keep/api/models/db/migrations/versions/2024-09-18-14-08_5d7ae55efc6a.py @@ -11,7 +11,7 @@ # revision identifiers, used by Alembic. 
revision = "5d7ae55efc6a" -down_revision = "938b1aa62d5c" +down_revision = "772790c2e50a" branch_labels = None depends_on = None diff --git a/keep/api/models/db/workflow.py b/keep/api/models/db/workflow.py index 3426a9560..f243b51f0 100644 --- a/keep/api/models/db/workflow.py +++ b/keep/api/models/db/workflow.py @@ -53,6 +53,9 @@ class WorkflowExecution(SQLModel, table=True): workflow_to_alert_execution: "WorkflowToAlertExecution" = Relationship( back_populates="workflow_execution" ) + workflow_to_incident_execution: "WorkflowToIncidentExecution" = Relationship( + back_populates="workflow_execution" + ) class Config: orm_mode = True @@ -71,6 +74,18 @@ class WorkflowToAlertExecution(SQLModel, table=True): ) +class WorkflowToIncidentExecution(SQLModel, table=True): + __table_args__ = (UniqueConstraint("workflow_execution_id", "incident_id"),) + + # https://sqlmodel.tiangolo.com/tutorial/automatic-id-none-refresh/ + id: Optional[int] = Field(primary_key=True, default=None) + workflow_execution_id: str = Field(foreign_key="workflowexecution.id") + incident_id: str | None + workflow_execution: WorkflowExecution = Relationship( + back_populates="workflow_to_incident_execution" + ) + + class WorkflowExecutionLog(SQLModel, table=True): id: int = Field(default=None, primary_key=True) workflow_execution_id: str = Field(foreign_key="workflowexecution.id") diff --git a/keep/api/routes/incidents.py b/keep/api/routes/incidents.py index 6819f4322..ca2bf87aa 100644 --- a/keep/api/routes/incidents.py +++ b/keep/api/routes/incidents.py @@ -38,6 +38,7 @@ ) from keep.identitymanager.authenticatedentity import AuthenticatedEntity from keep.identitymanager.identitymanagerfactory import IdentityManagerFactory +from keep.workflowmanager.workflowmanager import WorkflowManager router = APIRouter() logger = logging.getLogger(__name__) @@ -118,6 +119,21 @@ def create_incident_endpoint( }, ) __update_client_on_incident_change(pusher_client, tenant_id) + + try: + workflow_manager = 
WorkflowManager.get_instance() + logger.info("Adding incident to the workflow manager queue") + workflow_manager.insert_incident(tenant_id, new_incident_dto, "created") + logger.info("Added incident to the workflow manager queue") + except Exception: + logger.exception( + "Failed to run workflows based on incident", + extra={ + "incident_id": new_incident_dto.id, + "tenant_id": tenant_id + }, + ) + return new_incident_dto @@ -219,7 +235,19 @@ def update_incident( raise HTTPException(status_code=404, detail="Incident not found") new_incident_dto = IncidentDto.from_db_incident(incident) - + try: + workflow_manager = WorkflowManager.get_instance() + logger.info("Adding incident to the workflow manager queue") + workflow_manager.insert_incident(tenant_id, new_incident_dto, "updated") + logger.info("Added incident to the workflow manager queue") + except Exception: + logger.exception( + "Failed to run workflows based on incident", + extra={ + "incident_id": new_incident_dto.id, + "tenant_id": tenant_id + }, + ) return new_incident_dto @@ -242,10 +270,30 @@ def delete_incident( "tenant_id": tenant_id, }, ) + + incident = get_incident_by_id(tenant_id=tenant_id, incident_id=incident_id) + if not incident: + raise HTTPException(status_code=404, detail="Incident not found") + + incident_dto = IncidentDto.from_db_incident(incident) + deleted = delete_incident_by_id(tenant_id=tenant_id, incident_id=incident_id) if not deleted: raise HTTPException(status_code=404, detail="Incident not found") __update_client_on_incident_change(pusher_client, tenant_id) + try: + workflow_manager = WorkflowManager.get_instance() + logger.info("Adding incident to the workflow manager queue") + workflow_manager.insert_incident(tenant_id, incident_dto, "deleted") + logger.info("Added incident to the workflow manager queue") + except Exception: + logger.exception( + "Failed to run workflows based on incident", + extra={ + "incident_id": incident_dto.id, + "tenant_id": tenant_id + }, + ) return 
Response(status_code=202) @@ -329,6 +377,22 @@ async def add_alerts_to_incident( add_alerts_to_incident_by_incident_id(tenant_id, incident_id, alert_ids) __update_client_on_incident_change(pusher_client, tenant_id, incident_id) + incident_dto = IncidentDto.from_db_incident(incident) + + try: + workflow_manager = WorkflowManager.get_instance() + logger.info("Adding incident to the workflow manager queue") + workflow_manager.insert_incident(tenant_id, incident_dto, "updated") + logger.info("Added incident to the workflow manager queue") + except Exception: + logger.exception( + "Failed to run workflows based on incident", + extra={ + "incident_id": incident_dto.id, + "tenant_id": tenant_id + }, + ) + fingerprints_count = get_incident_unique_fingerprint_count(tenant_id, incident_id) if ( diff --git a/keep/contextmanager/contextmanager.py b/keep/contextmanager/contextmanager.py index 953a97757..2f5611fa7 100644 --- a/keep/contextmanager/contextmanager.py +++ b/keep/contextmanager/contextmanager.py @@ -22,6 +22,7 @@ def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None): self.providers_context = {} self.actions_context = {} self.event_context = {} + self.incident_context = {} self.foreach_context = { "value": None, } @@ -78,6 +79,9 @@ def get_logger(self): def set_event_context(self, event): self.event_context = event + def set_incident_context(self, incident): + self.incident_context = incident + def get_workflow_id(self): return self.workflow_id @@ -104,6 +108,7 @@ def get_full_context(self, exclude_providers=False, exclude_env=False): "event": self.event_context, "last_workflow_results": self.last_workflow_execution_results, "alert": self.event_context, # this is an alias so workflows will be able to use alert.source + "incident": self.incident_context, # the incident that triggered the workflow, so workflows will be able to use e.g. incident.name } if not exclude_providers: diff --git a/keep/step/step.py b/keep/step/step.py index be7fae881..4b0caede4 100644 ---
a/keep/step/step.py +++ b/keep/step/step.py @@ -91,8 +91,10 @@ def _get_foreach_items(self) -> list | list[list]: index = [i.strip() for i in index] items = self.context_manager.get_full_context() for i in index: - # try to get it as a dict - items = items.get(i, {}) + if isinstance(items, dict): + items = items.get(i, {}) + else: + items = getattr(items, i, {}) foreach_items.append(items) if not foreach_items: return [] diff --git a/keep/workflowmanager/workflowmanager.py b/keep/workflowmanager/workflowmanager.py index d50ab8939..acbdb286b 100644 --- a/keep/workflowmanager/workflowmanager.py +++ b/keep/workflowmanager/workflowmanager.py @@ -4,13 +4,15 @@ import typing import uuid +from pandas.core.common import flatten + from keep.api.core.config import AuthenticationType, config from keep.api.core.db import ( get_enrichment, get_previous_alert_by_fingerprint, save_workflow_results, ) -from keep.api.models.alert import AlertDto, AlertSeverity +from keep.api.models.alert import AlertDto, AlertSeverity, IncidentDto from keep.providers.providers_factory import ProviderConfigurationException from keep.workflowmanager.workflow import Workflow from keep.workflowmanager.workflowscheduler import WorkflowScheduler @@ -68,7 +70,74 @@ def _apply_filter(self, filter_val, value): return value == str(filter_val) return value == filter_val - def insert_events(self, tenant_id, events: typing.List[AlertDto]): + def _get_workflow_from_store(self, tenant_id, workflow_model): + try: + # get the actual workflow that can be triggered + self.logger.info("Getting workflow from store") + workflow = self.workflow_store.get_workflow( + tenant_id, workflow_model.id + ) + self.logger.info("Got workflow from store") + return workflow + except ProviderConfigurationException: + self.logger.exception( + "Workflow have a provider that is not configured", + extra={ + "workflow_id": workflow_model.id, + "tenant_id": tenant_id, + }, + ) + except Exception: + self.logger.exception( + "Error getting 
workflow", + extra={ + "workflow_id": workflow_model.id, + "tenant_id": tenant_id, + }, + ) + + def insert_incident(self, tenant_id: str, incident: IncidentDto, trigger: str): + all_workflow_models = self.workflow_store.get_all_workflows(tenant_id) + self.logger.info( + "Got all workflows", + extra={ + "num_of_workflows": len(all_workflow_models), + }, + ) + for workflow_model in all_workflow_models: + + if workflow_model.is_disabled: + self.logger.debug( + f"Skipping the workflow: id={workflow_model.id}, name={workflow_model.name}, " + f"tenant_id={workflow_model.tenant_id} - Workflow is disabled." + ) + continue + workflow = self._get_workflow_from_store(tenant_id, workflow_model) + if workflow is None: + continue + + incident_triggers = flatten( + [t.get("events", []) for t in workflow.workflow_triggers if t["type"] == "incident"] + ) + + if trigger not in incident_triggers: + self.logger.debug("workflow does not contain trigger %s, skipping", trigger) + continue + + self.logger.info("Adding workflow to run") + with self.scheduler.lock: + self.scheduler.workflows_to_run.append( + { + "workflow": workflow, + "workflow_id": workflow_model.id, + "tenant_id": tenant_id, + "triggered_by": "incident:{}".format(trigger), + "event": incident, + } + ) + self.logger.info("Workflow added to run") + + def insert_events(self, tenant_id, events: typing.List[AlertDto | IncidentDto]): for event in events: self.logger.info("Getting all workflows") all_workflow_models = self.workflow_store.get_all_workflows(tenant_id) @@ -79,37 +148,17 @@ def insert_events(self, tenant_id, events: typing.List[AlertDto]): }, ) for workflow_model in all_workflow_models: + if workflow_model.is_disabled: self.logger.debug( f"Skipping the workflow: id={workflow_model.id}, name={workflow_model.name}, " f"tenant_id={workflow_model.tenant_id} - Workflow is disabled." 
) continue - try: - # get the actual workflow that can be triggered - self.logger.info("Getting workflow from store") - workflow = self.workflow_store.get_workflow( - tenant_id, workflow_model.id - ) - self.logger.info("Got workflow from store") - except ProviderConfigurationException: - self.logger.exception( - "Workflow have a provider that is not configured", - extra={ - "workflow_id": workflow_model.id, - "tenant_id": tenant_id, - }, - ) - continue - except Exception: - self.logger.exception( - "Error getting workflow", - extra={ - "workflow_id": workflow_model.id, - "tenant_id": tenant_id, - }, - ) + workflow = self._get_workflow_from_store(tenant_id, workflow_model) + if workflow is None: continue + for trigger in workflow.workflow_triggers: # TODO: handle it better if not trigger.get("type") == "alert": @@ -371,7 +420,8 @@ def _run_workflow( return [errors, results] - def _get_workflow_results(self, workflow: Workflow): + @staticmethod + def _get_workflow_results(workflow: Workflow): """ Get the results of the workflow from the DB. @@ -381,8 +431,7 @@ def _get_workflow_results(self, workflow: Workflow): Returns: dict: The results of the workflow. 
""" - print("workflowssssss", workflow.workflow_actions) - print(workflow.workflow_steps) + workflow_results = { action.name: action.provider.results for action in workflow.workflow_actions } diff --git a/keep/workflowmanager/workflowscheduler.py b/keep/workflowmanager/workflowscheduler.py index 693b5000b..c7df0b534 100644 --- a/keep/workflowmanager/workflowscheduler.py +++ b/keep/workflowmanager/workflowscheduler.py @@ -15,7 +15,7 @@ from keep.api.core.db import get_enrichment, get_previous_execution_id from keep.api.core.db import get_workflow as get_workflow_db from keep.api.core.db import get_workflows_that_should_run -from keep.api.models.alert import AlertDto +from keep.api.models.alert import AlertDto, IncidentDto from keep.providers.providers_factory import ProviderConfigurationException from keep.workflowmanager.workflow import Workflow, WorkflowStrategy from keep.workflowmanager.workflowstore import WorkflowStore @@ -57,10 +57,11 @@ def _handle_interval_workflows(self): pass for workflow in workflows: self.logger.debug("Running workflow on background") + + workflow_execution_id = workflow.get("workflow_execution_id") + tenant_id = workflow.get("tenant_id") + workflow_id = workflow.get("workflow_id") try: - workflow_execution_id = workflow.get("workflow_execution_id") - tenant_id = workflow.get("tenant_id") - workflow_id = workflow.get("workflow_id") workflow = self.workflow_store.get_workflow(tenant_id, workflow_id) except ProviderConfigurationException: self.logger.exception( @@ -106,8 +107,13 @@ def _run_workflow( ): self.logger.info(f"Running workflow {workflow.workflow_id}...") try: - # set the event context, e.g. the event that triggered the workflow - workflow.context_manager.set_event_context(event_context) + if isinstance(event_context, AlertDto): + # set the event context, e.g. the event that triggered the workflow + workflow.context_manager.set_event_context(event_context) + else: + # set the incident context, e.g. 
the incident that triggered the workflow + workflow.context_manager.set_incident_context(event_context) + errors, _ = self.workflow_manager._run_workflow( workflow, workflow_execution_id ) @@ -216,6 +222,7 @@ def handle_manual_event_workflow( execution_number=unique_execution_number, fingerprint=alert.fingerprint, event_id=alert.event_id, + event_type="alert" ) self.logger.info(f"Workflow execution id: {workflow_execution_id}") # This is kinda WTF exception since create_workflow_execution shouldn't fail for manual @@ -313,13 +320,26 @@ def _handle_event_workflows(self): continue event = workflow_to_run.get("event") + triggered_by = workflow_to_run.get("triggered_by") if triggered_by == "manual": triggered_by_user = workflow_to_run.get("triggered_by_user") triggered_by = f"manually by {triggered_by_user}" + elif triggered_by.startswith("incident:"): + triggered_by = f"type:{triggered_by} name:{event.name} id:{event.id}" else: triggered_by = f"type:alert name:{event.name} id:{event.id}" + if isinstance(event, IncidentDto): + event_id = str(event.id) + event_type = "incident" + fingerprint = "incident:{}".format(event_id) + else: + event_id = event.event_id + event_type = "alert" + fingerprint = event.fingerprint + + # In manual, we create the workflow execution id sync so it could be tracked by the caller (UI) # In event (e.g. 
alarm), we will create it here if not workflow_execution_id: @@ -333,16 +353,17 @@ def _handle_event_workflows(self): # else, we want to enforce that no workflow already run with the same fingerprint else: workflow_execution_number = self._get_unique_execution_number( - event.fingerprint + fingerprint ) workflow_execution_id = create_workflow_execution( workflow_id=workflow_id, tenant_id=tenant_id, triggered_by=triggered_by, execution_number=workflow_execution_number, - fingerprint=event.fingerprint, - event_id=event.event_id, + fingerprint=fingerprint, + event_id=event_id, execution_id=execution_id, + event_type=event_type, ) # If there is already running workflow from the same event except IntegrityError: @@ -404,7 +425,7 @@ def _handle_event_workflows(self): # - the second one will wait for the next iteration # - on the next iteratino, the second alert enriched with the ticket_url # and will trigger a workflow that will update the ticket with "resolved" - if workflow_to_run.get("retry", False): + if workflow_to_run.get("retry", False) and isinstance(event, AlertDto): try: self.logger.info( "Updating enrichments for workflow after retry", diff --git a/tests/test_workflow_execution.py b/tests/test_workflow_execution.py index b772391fd..af04791d4 100644 --- a/tests/test_workflow_execution.py +++ b/tests/test_workflow_execution.py @@ -4,10 +4,11 @@ import pytest import pytz +from asyncio import sleep from keep.api.core.db import get_last_workflow_execution_by_workflow_id from keep.api.core.dependencies import SINGLE_TENANT_UUID -from keep.api.models.alert import AlertDto, AlertStatus +from keep.api.models.alert import AlertDto, AlertStatus, IncidentDtoIn, IncidentDto from keep.api.models.db.workflow import Workflow from keep.workflowmanager.workflowmanager import WorkflowManager @@ -575,3 +576,135 @@ def test_workflow_execution_with_disabled_workflow( assert enabled_workflow_execution.status == "success" assert disabled_workflow_execution is None + + + 
+# Workflow that subscribes to the "updated" and "created" incident events and
+# echoes the incident name through the console provider.
+workflow_definition_4 = """workflow:
+id: incident-triggers-test-created-updated
+description: test incident triggers
+triggers:
+- type: incident
+  events:
+  - updated
+  - created
+name: created-updated
+owners: []
+services: []
+steps: []
+actions:
+- name: mock-action
+  provider:
+    type: console
+    with:
+      message: |
+        "incident: {{ incident.name }}"
+"""
+
+# Workflow that subscribes only to the "deleted" incident event.
+# The embedded id matches the id of the Workflow row created in the test below.
+workflow_definition_5 = """workflow:
+id: incident-triggers-test-deleted
+description: test incident triggers
+triggers:
+- type: incident
+  events:
+  - deleted
+name: deleted
+owners: []
+services: []
+steps: []
+actions:
+- name: mock-action
+  provider:
+    type: console
+    with:
+      message: |
+        "deleted incident: {{ incident.name }}"
+"""
+
+
+def test_workflow_incident_triggers(
+    db_session,
+    workflow_manager,
+):
+    """Check that incident events only schedule workflows subscribed to them.
+
+    A workflow listening to "created"/"updated" must be queued and run for
+    those two events and must not be queued for "deleted"; a second workflow
+    listening to "deleted" must be queued and run once it exists.
+    """
+    created_updated_id = "incident-triggers-test-created-updated"
+    deleted_id = "incident-triggers-test-deleted"
+
+    workflow_created = Workflow(
+        id=created_updated_id,
+        name=created_updated_id,
+        tenant_id=SINGLE_TENANT_UUID,
+        description="Check that incident triggers works",
+        created_by="test@keephq.dev",
+        interval=0,
+        workflow_raw=workflow_definition_4,
+    )
+    db_session.add(workflow_created)
+    db_session.commit()
+
+    # Build the incident that is fed to the workflow manager for every event
+    incident = IncidentDto(
+        id="ba9ddbb9-3a83-40fc-9ace-1e026e08ca2b",
+        user_generated_name="incident",
+        alerts_count=0,
+        alert_sources=[],
+        services=[],
+        severity="critical",
+        is_predicted=False,
+        is_confirmed=True,
+    )
+
+    def wait_workflow_execution(workflow_id):
+        """Poll once per second, at most 30 times, for a successful execution.
+
+        Returns the last execution that was fetched (None if there was none),
+        so the caller's assertions can report a missing or failed run.
+        """
+        workflow_execution = None
+        for _ in range(30):
+            workflow_execution = get_last_workflow_execution_by_workflow_id(
+                SINGLE_TENANT_UUID, workflow_id
+            )
+            # Sleep after every poll, the successful one included, to keep the
+            # pacing the assertions on workflows_to_run were written against.
+            time.sleep(1)
+            # Keep polling until the execution has actually succeeded; stopping
+            # on the first row found could return a run that is still going.
+            if (
+                workflow_execution is not None
+                and workflow_execution.status == "success"
+            ):
+                break
+        # NOTE(review): this fetches the *last* execution of the workflow, so a
+        # second wait on the same workflow may see the previous run - confirm.
+        return workflow_execution
+
+    # "created" is subscribed by the first workflow
+    workflow_manager.insert_incident(SINGLE_TENANT_UUID, incident, "created")
+    assert len(workflow_manager.scheduler.workflows_to_run) == 1
+
+    workflow_execution_created = wait_workflow_execution(created_updated_id)
+    assert workflow_execution_created is not None
+    assert workflow_execution_created.status == "success"
+    assert workflow_execution_created.results["mock-action"] == [
+        '"incident: incident"\n'
+    ]
+    assert len(workflow_manager.scheduler.workflows_to_run) == 0
+
+    # "updated" is subscribed by the first workflow as well
+    workflow_manager.insert_incident(SINGLE_TENANT_UUID, incident, "updated")
+    assert len(workflow_manager.scheduler.workflows_to_run) == 1
+
+    workflow_execution_updated = wait_workflow_execution(created_updated_id)
+    assert workflow_execution_updated is not None
+    assert workflow_execution_updated.status == "success"
+    assert workflow_execution_updated.results["mock-action"] == [
+        '"incident: incident"\n'
+    ]
+
+    # incident-triggers-test-created-updated should not be triggered
+    workflow_manager.insert_incident(SINGLE_TENANT_UUID, incident, "deleted")
+    assert len(workflow_manager.scheduler.workflows_to_run) == 0
+
+    workflow_deleted = Workflow(
+        id=deleted_id,
+        name=deleted_id,
+        tenant_id=SINGLE_TENANT_UUID,
+        description="Check that incident triggers works",
+        created_by="test@keephq.dev",
+        interval=0,
+        workflow_raw=workflow_definition_5,
+    )
+    db_session.add(workflow_deleted)
+    db_session.commit()
+
+    workflow_manager.insert_incident(SINGLE_TENANT_UUID, incident, "deleted")
+    assert len(workflow_manager.scheduler.workflows_to_run) == 1
+
+    # incident-triggers-test-deleted should be triggered now
+    workflow_execution_deleted = wait_workflow_execution(deleted_id)
+    assert len(workflow_manager.scheduler.workflows_to_run) == 0
+
+    assert workflow_execution_deleted is not None
+    assert workflow_execution_deleted.status == "success"
+    assert workflow_execution_deleted.results["mock-action"] == [
+        '"deleted incident: incident"\n'
+    ]