diff --git a/keep-ui/app/workflows/[workflow_id]/executions.tsx b/keep-ui/app/workflows/[workflow_id]/executions.tsx index d2b844251..cf0d5bf6a 100644 --- a/keep-ui/app/workflows/[workflow_id]/executions.tsx +++ b/keep-ui/app/workflows/[workflow_id]/executions.tsx @@ -153,7 +153,7 @@ export default function WorkflowDetailPage({ const workflow = { last_executions: data.items } as Partial return ( <> - +
diff --git a/keep-ui/app/workflows/[workflow_id]/side-nav-bar.tsx b/keep-ui/app/workflows/[workflow_id]/side-nav-bar.tsx index d119778c7..97c23c4ef 100644 --- a/keep-ui/app/workflows/[workflow_id]/side-nav-bar.tsx +++ b/keep-ui/app/workflows/[workflow_id]/side-nav-bar.tsx @@ -28,7 +28,7 @@ export default function SideNavBar({ workflow }: { workflow: Workflow }) { ]; return ( -
+

{workflow.name}

{workflow.description && ( diff --git a/keep-ui/app/workflows/builder/utils.tsx b/keep-ui/app/workflows/builder/utils.tsx index 0f6309eba..6702e6d7b 100644 --- a/keep-ui/app/workflows/builder/utils.tsx +++ b/keep-ui/app/workflows/builder/utils.tsx @@ -246,7 +246,8 @@ export function generateWorkflow( export function parseWorkflow( workflowString: string, - providers: Provider[] + providers: Provider[], + isPureRaw: boolean = false ): Definition { /** * Parse the alert file and generate the definition @@ -255,16 +256,20 @@ export function parseWorkflow( schema: JSON_SCHEMA, }) as any; // This is to support both old and new structure of workflow - const workflow = parsedWorkflowFile.alert + let workflow = parsedWorkflowFile.alert ? parsedWorkflowFile.alert : parsedWorkflowFile.workflow; + + if(isPureRaw) { + workflow = parsedWorkflowFile + } const steps = [] as V2Step[]; const workflowSteps = - workflow.steps?.map((s: V2Step) => { + workflow?.steps?.map((s: V2Step) => { s.type = "step"; return s; }) || []; - const workflowActions = workflow.actions || []; + const workflowActions = workflow?.actions || []; const conditions = [] as any; [...workflowSteps, ...workflowActions].forEach((action: any) => { const stepOrAction = action.type === "step" ? "step" : "action"; diff --git a/keep-ui/app/workflows/workflow-tile.tsx b/keep-ui/app/workflows/workflow-tile.tsx index 7e3da37ba..4f904d03c 100644 --- a/keep-ui/app/workflows/workflow-tile.tsx +++ b/keep-ui/app/workflows/workflow-tile.tsx @@ -1,14 +1,13 @@ "use client"; import { useSession } from "next-auth/react"; -import { Workflow, Filter } from "./models"; +import { Workflow, Provider, Trigger } from './models'; import { getApiURL } from "../../utils/apiUrl"; import Image from "next/image"; import React, { useState, useMemo } from "react"; import { useRouter } from "next/navigation"; import WorkflowMenu from "./workflow-menu"; import Loading from "../loading"; -import { Trigger, Provider } from "./models"; import { Button, Text, @@ -21,13 +20,13 @@ import { AccordionBody, AccordionHeader, Badge, + Callout, } from "@tremor/react"; import ProviderForm from "app/providers/provider-form"; import SlidingPanel from "react-sliding-side-panel"; -import { useFetchProviders } from "app/providers/page.client"; import { Provider as FullProvider } from "app/providers/providers"; import "./workflow-tile.css"; -import { CheckCircleIcon, XCircleIcon } from "@heroicons/react/24/outline"; +import { CheckCircleIcon, ExclamationCircleIcon, XCircleIcon } from "@heroicons/react/24/outline"; import AlertTriggerModal from "./workflow-run-with-alert-modal"; import { formatDistanceToNowStrict } from "date-fns"; import TimeAgo, { Formatter, Suffix, Unit } from "react-timeago"; @@ -40,7 +39,9 @@ import { MdOutlineKeyboardArrowLeft, } from "react-icons/md"; import { HiBellAlert } from "react-icons/hi2"; +import { BiSolidError } from "react-icons/bi"; import { useWorkflowRun } from "utils/hooks/useWorkflowRun"; +import useWorkflowValidator from "utils/hooks/useWorkflowValidator"; function WorkflowMenuSection({ onDelete, @@ -263,7 +264,7 @@ export const ProvidersCarousel = ({ ); }; -function WorkflowTile({ workflow }: { workflow: Workflow }) { +function WorkflowTile({ workflow , providers}: { workflow: Workflow , providers: FullProvider[]}) { // Create a set to keep track of unique providers const apiUrl = getApiURL(); const { data: session } = useSession(); @@ -281,7 +282,7 @@ function WorkflowTile({ workflow }: { workflow: Workflow }) { ?.filters?.find((f) => f.key === "source")?.value; const [fallBackIcon, setFallBackIcon] = useState(false); - const { providers } = useFetchProviders(); + const { isRunning, handleRunClick, @@ -290,6 +291,9 @@ function WorkflowTile({ workflow }: { workflow: Workflow }) { message, } = useWorkflowRun(workflow!); + const { loading, isValid, error } = useWorkflowValidator(workflow, providers || null); + + const handleConnectProvider = (provider: FullProvider) => { setSelectedProvider(provider); // prepopulate it with the name @@ -382,7 +386,7 @@ function WorkflowTile({ workflow }: { workflow: Workflow }) { ) .map((type) => { let fullProvider = - providers.find((fp) => fp.type === type) || ({} as FullProvider); + providers?.find((fp) => fp.type === type) || ({} as FullProvider); let workflowProvider = workflowProvidersMap.get(type) || ({} as FullProvider); @@ -544,14 +548,14 @@ function WorkflowTile({ workflow }: { workflow: Workflow }) { }; return ( -
+
{isRunning && (
)} { e.stopPropagation(); e.preventDefault(); @@ -560,23 +564,30 @@ function WorkflowTile({ workflow }: { workflow: Workflow }) { } }} > + {!loading && !isValid &&
+ {error} + +
}
{workflow.provisioned && ( Provisioned )} - {!!handleRunClick && - WorkflowMenuSection({ - onDelete: handleDeleteClick, - onRun: handleRunClick, - onDownload: handleDownloadClick, - onView: handleViewClick, - onBuilder: handleBuilderClick, - runButtonToolTip: message, - isRunButtonDisabled: !!isRunButtonDisabled, - provisioned: workflow.provisioned, - })} + {!!handleRunClick && WorkflowMenuSection({ + onDelete: handleDeleteClick, + onRun: handleRunClick, + onDownload: handleDownloadClick, + onView: handleViewClick, + onBuilder: handleBuilderClick, + runButtonToolTip: message || error || "", + isRunButtonDisabled: !!isRunButtonDisabled || !isValid, + provisioned: workflow.provisioned, + })}
@@ -718,7 +729,7 @@ function WorkflowTile({ workflow }: { workflow: Workflow }) { ); } -export function WorkflowTileOld({ workflow }: { workflow: Workflow }) { +export function WorkflowTileOld({ workflow, providers }: { workflow: Workflow, providers: FullProvider[] }) { // Create a set to keep track of unique providers const apiUrl = getApiURL(); const { data: session } = useSession(); @@ -730,7 +741,7 @@ export function WorkflowTileOld({ workflow }: { workflow: Workflow }) { const [formValues, setFormValues] = useState<{ [key: string]: string }>({}); const [formErrors, setFormErrors] = useState<{ [key: string]: string }>({}); - const { providers } = useFetchProviders(); + const { isRunning, handleRunClick, @@ -739,6 +750,9 @@ export function WorkflowTileOld({ workflow }: { workflow: Workflow }) { getTriggerModalProps, } = useWorkflowRun(workflow!); + const { loading, isValid, error } = useWorkflowValidator(workflow, providers || null); + + const handleConnectProvider = (provider: FullProvider) => { setSelectedProvider(provider); // prepopulate it with the name @@ -831,7 +845,7 @@ export function WorkflowTileOld({ workflow }: { workflow: Workflow }) { ) .map((type) => { let fullProvider = - providers.find((fp) => fp.type === type) || ({} as FullProvider); + providers?.find((fp) => fp.type === type) || ({} as FullProvider); let workflowProvider = workflowProvidersMap.get(type) || ({} as FullProvider); @@ -858,22 +872,28 @@ export function WorkflowTileOld({ workflow }: { workflow: Workflow }) {
)} - -
+ + {!loading && !isValid &&
+ {error} + +
}
{workflow.name} - {!!handleRunClick && - WorkflowMenuSection({ - onDelete: handleDeleteClick, - onRun: handleRunClick, - onDownload: handleDownloadClick, - onView: handleViewClick, - onBuilder: handleBuilderClick, - runButtonToolTip: message, - isRunButtonDisabled: !!isRunButtonDisabled, - provisioned: workflow.provisioned, - })} + {!!handleRunClick && WorkflowMenuSection({ + onDelete: handleDeleteClick, + onRun: handleRunClick, + onDownload: handleDownloadClick, + onView: handleViewClick, + onBuilder: handleBuilderClick, + runButtonToolTip: message || error || "", + isRunButtonDisabled: !!isRunButtonDisabled || !isValid, + provisioned: workflow.provisioned, + })}
diff --git a/keep-ui/app/workflows/workflows.client.tsx b/keep-ui/app/workflows/workflows.client.tsx index 2d90aa818..a7652e097 100644 --- a/keep-ui/app/workflows/workflows.client.tsx +++ b/keep-ui/app/workflows/workflows.client.tsx @@ -21,6 +21,7 @@ import { ArrowRightIcon } from "@radix-ui/react-icons"; import { useRouter } from "next/navigation"; import Modal from "@/components/ui/Modal"; import MockWorkflowCardSection from "./mockworkflows"; +import { useFetchProviders } from "app/providers/page.client"; export default function WorkflowsPage() { const apiUrl = getApiURL(); @@ -51,6 +52,9 @@ export default function WorkflowsPage() { (url: string) => fetcher(url, session?.accessToken!) ); + const { providers } = useFetchProviders(); + + /** Add Mock Workflows (6 Random Workflows on Every Request) To add mock workflows, a new backend API endpoint has been created: /workflows/random-templates. @@ -299,13 +303,13 @@ export default function WorkflowsPage() { ) : !isSwitchOn ? (
{data.map((workflow) => ( - + ))}
) : (
{data.map((workflow) => ( - + ))}
)} diff --git a/keep-ui/utils/hooks/useWorkflowInitialization.ts b/keep-ui/utils/hooks/useWorkflowInitialization.ts index 17d33cd4f..8b95ad38e 100644 --- a/keep-ui/utils/hooks/useWorkflowInitialization.ts +++ b/keep-ui/utils/hooks/useWorkflowInitialization.ts @@ -9,6 +9,7 @@ import { FlowNode } from "../../app/workflows/builder/builder-store"; import { Provider } from "app/providers/providers"; import ELK from 'elkjs/lib/elk.bundled.js'; import { processWorkflowV2, getTriggerStep } from "utils/reactFlow"; +import { validate as isUUID } from 'uuid'; const layoutOptions = { "elk.nodeLabels.placement": "INSIDE V_CENTER H_BOTTOM", @@ -175,8 +176,12 @@ const useWorkflowInitialization = ( const initializeWorkflow = async () => { setIsLoading(true); let parsedWorkflow = definition?.value; - const name = parsedWorkflow?.properties?.name || parsedWorkflow?.properties?.id; - + let name = parsedWorkflow?.properties?.name + if(!name) { + name = !isUUID(parsedWorkflow?.properties?.id) + ? parsedWorkflow?.properties?.id + : ""; + } const sequences = [ { id: "start", @@ -199,6 +204,7 @@ const useWorkflowInitialization = ( ]; const intialPositon = { x: 0, y: 50 }; let { nodes, edges } = processWorkflowV2(sequences, intialPositon, true); + //TO DO update all the flow data at one go instead of one by one setSelectedNode(null); setFirstInitilisationDone(false) setIsLayouted(false); diff --git a/keep-ui/utils/hooks/useWorkflowValidator.ts b/keep-ui/utils/hooks/useWorkflowValidator.ts new file mode 100644 index 000000000..a0f5a7759 --- /dev/null +++ b/keep-ui/utils/hooks/useWorkflowValidator.ts @@ -0,0 +1,83 @@ +import { + V2Step, + ReactFlowDefinition, +} from "app/workflows/builder/builder-store"; +import { + stepValidatorV2, + globalValidatorV2, +} from "app/workflows/builder/builder-validators"; +import { + wrapDefinitionV2, + parseWorkflow, +} from "app/workflows/builder/utils"; +import { Provider } from "app/providers/providers"; +import { Definition } from "app/workflows/builder/builder-store"; +import { Workflow } from "app/workflows/models"; + +export default function useWorkflowValidator( + workflow: Workflow, + providers: Provider[] | null +) { + + let errorMessage = null; + const setStepValidationErrorV2 = (step: V2Step, error: string | null) => { + errorMessage = error; + }; + + const setGlobalValidationErrorV2 = ( + id: string | null, + error: string | null + ) => { + errorMessage = error; + }; + + const ValidatorConfigurationV2: { + step: ( + step: V2Step, + parent?: V2Step, + definition?: ReactFlowDefinition + ) => boolean; + root: (def: Definition) => boolean; + } = { + step: (step, parent, definition) => + stepValidatorV2(step, setStepValidationErrorV2, parent, definition), + root: (def) => globalValidatorV2(def, setGlobalValidationErrorV2), + }; + + if (!workflow || !providers) { + return { + loading: true, + isValid: true, + error: errorMessage + } + } + const defintion = wrapDefinitionV2({ + ...parseWorkflow(workflow?.workflow_raw, providers, true), + isValid: true, + }); + const { sequence, properties } = defintion.value ?? { sequence: [], properties: {} }; + let isValid = true; + for (let step of sequence) { + isValid = ValidatorConfigurationV2?.step(step); + if (!isValid) { + break; + } + } + + if (!isValid) { + return { + loading: false, + isValid: false, + error: errorMessage + }; + } + + isValid = ValidatorConfigurationV2.root({ sequence, properties }); + + return { + loading: false, + isValid, + error: errorMessage + }; +} + diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 1381657a1..1cc495958 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -307,24 +307,45 @@ def add_or_update_workflow( description, created_by, interval, - workflow_raw, + update_raw, is_disabled, + workflow_id, provisioned=False, provisioned_file=None, + re_provision=False, updated_by=None, ) -> Workflow: with Session(engine, expire_on_commit=False) as session: # TODO: we need to better understanad if that's the right behavior we want - existing_workflow = ( - session.query(Workflow) - .filter_by(name=name) - .filter_by(tenant_id=tenant_id) - .first() - ) + #if re_provision = True that means it is a system or cli operation. so we need check by both name and id. if exist update or else create new + #if re_provision = False that means it is normal operation. so we need check by id always. if exist update or else create new + + existing_workflow = None + if re_provision: + existing_workflow = ( + session.query(Workflow) + .filter_by(name=name) + .filter_by(tenant_id=tenant_id) + .first() + ) + if not existing_workflow: + existing_workflow = ( + session.query(Workflow) + .filter_by(id=workflow_id) + .filter_by(tenant_id=tenant_id) + .first() + ) + if existing_workflow and existing_workflow.provisioned and not re_provision: + raise ValueError("Cannot update a provisioned workflow") + + provisioned = existing_workflow.provisioned if existing_workflow else provisioned if existing_workflow: + workflow_raw = update_raw(existing_workflow.id) + # tb: no need to override the id field here because it has foreign key constraints. existing_workflow.tenant_id = tenant_id + existing_workflow.name = name existing_workflow.description = description existing_workflow.updated_by = ( updated_by or existing_workflow.updated_by @@ -335,11 +356,13 @@ def add_or_update_workflow( existing_workflow.last_updated = datetime.now() # Update last_updated existing_workflow.is_deleted = False existing_workflow.is_disabled = is_disabled - existing_workflow.provisioned = provisioned + existing_workflow.provisioned = provisioned or False existing_workflow.provisioned_file = provisioned_file else: # Create a new workflow + # Since the workflow does not exist in the database, we'll include the original workflow_id in the raw data + workflow_raw = update_raw(id) workflow = Workflow( id=id, name=name, @@ -350,7 +373,7 @@ def add_or_update_workflow( interval=interval, is_disabled=is_disabled, workflow_raw=workflow_raw, - provisioned=provisioned, + provisioned= provisioned or False, provisioned_file=provisioned_file, ) session.add(workflow) @@ -719,6 +742,7 @@ def delete_workflow(tenant_id, workflow_id): session.commit() + def delete_workflow_by_provisioned_file(tenant_id, provisioned_file): with Session(engine) as session: workflow = session.exec( @@ -732,12 +756,12 @@ def delete_workflow_by_provisioned_file(tenant_id, provisioned_file): session.commit() -def get_workflow_id(tenant_id, workflow_name): +def get_workflow_id(tenant_id, workflow_id): with Session(engine) as session: workflow = session.exec( select(Workflow) .where(Workflow.tenant_id == tenant_id) - .where(Workflow.name == workflow_name) + .where(Workflow.id == workflow_id) .where(Workflow.is_deleted == False) ).first() diff --git a/keep/api/routes/workflows.py b/keep/api/routes/workflows.py index 7ed8dfc2b..2904ad810 100644 --- a/keep/api/routes/workflows.py +++ b/keep/api/routes/workflows.py @@ -136,6 +136,7 @@ def get_workflows( last_execution_started=last_execution_started, disabled=workflow.is_disabled, provisioned=workflow.provisioned, + provisioned_file=workflow.provisioned_file, ) except Exception as e: logger.error(f"Error creating workflow DTO: {e}") @@ -175,6 +176,7 @@ def run_workflow( created_by = authenticated_entity.email logger.info("Running workflow", extra={"workflow_id": workflow_id}) # if the workflow id is the name of the workflow (e.g. the CLI has only the name) + # ideally this not recommeded even for cli. we should always take id as input. if not validators.uuid(workflow_id): logger.info("Workflow ID is not a UUID, trying to get the ID by name") workflow_id = getattr(get_workflow_by_name(tenant_id, workflow_id), "id", None) @@ -241,6 +243,7 @@ async def run_workflow_from_definition( workflowstore = WorkflowStore() workflowmanager = WorkflowManager.get_instance() try: + #test run does not require to have workflow record in db. We just need to parse it workflow = workflowstore.get_workflow_from_dict( tenant_id=tenant_id, workflow=workflow ) @@ -589,6 +592,9 @@ def get_workflow_by_id( workflow_raw=workflow.workflow_raw, last_updated=workflow.last_updated, disabled=workflow.is_disabled, + provisioned=workflow.provisioned, + provisioned_file=workflow.provisioned_file, + ) return WorkflowExecutionsPaginatedResultsDto( limit=limit, diff --git a/keep/parser/parser.py b/keep/parser/parser.py index b274a36b2..fa6d91c11 100644 --- a/keep/parser/parser.py +++ b/keep/parser/parser.py @@ -3,11 +3,12 @@ import logging import os import typing +import validators import yaml from keep.actions.actions_factory import ActionsCRUD -from keep.api.core.db import get_workflow_id +from keep.api.core.db import get_workflow_id, get_workflow_by_name from keep.contextmanager.contextmanager import ContextManager from keep.providers.base.base_provider import BaseProvider from keep.providers.providers_factory import ProvidersFactory @@ -20,11 +21,12 @@ class Parser: def __init__(self): self.logger = logging.getLogger(__name__) - def _get_workflow_id(self, tenant_id, workflow: dict) -> str: + def _get_workflow_id(self, tenant_id, workflow: dict, workflow_not_found_throw_error=False) -> str: """Support both CLI and API workflows Args: workflow (dict): _description_ + workflow_not_found_throw_error: (bool, optional): _description_. Defaults to False. Raises: ValueError: _description_ @@ -32,20 +34,19 @@ def _get_workflow_id(self, tenant_id, workflow: dict) -> str: Returns: str: _description_ """ - # for backward compatibility reasons, the id on the YAML is actually the name - # and the id is a unique generated id stored in the db - workflow_name = workflow.get("id") - if workflow_name is None: + + if workflow_not_found_throw_error and not workflow.get("id"): raise ValueError("Workflow dict must have an id") - # get the workflow id from the database - workflow_id = get_workflow_id(tenant_id, workflow_name) - # if the workflow id is not found, it means that the workflow is not stored in the db - # for example when running from CLI - # so for backward compatibility, we will use the workflow name as the id - # todo - refactor CLI to use db also - if not workflow_id: - workflow_id = workflow_name + #This will only work if we pass the cli workflows with non uuid value as id. + if not validators.uuid(workflow.get("id")): + workflow_id = get_workflow_by_name(tenant_id, workflow.get("id")) + else: + workflow_id = get_workflow_id(tenant_id, workflow.get("id")) + + if workflow_not_found_throw_error and not workflow_id: + raise ValueError(f"Workflow with id {workflow.get('id')} does not exist in the database for tenant {tenant_id}") + return workflow_id def parse( @@ -54,12 +55,14 @@ def parse( parsed_workflow_yaml: dict, providers_file: str = None, actions_file: str = None, + workflow_not_found_throw_error: bool = False ) -> typing.List[Workflow]: """_summary_ Args: parsed_workflow_yaml (str): could be a url or a file path providers_file (str, optional): _description_. Defaults to None. + workflow_not_found_throw_error: (bool, optional): _description_. Defaults to False. Returns: typing.List[Workflow]: _description_ @@ -79,6 +82,7 @@ def parse( workflow_providers, actions_file, workflow_actions, + workflow_not_found_throw_error, ) for workflow in raw_workflows ] @@ -94,6 +98,7 @@ def parse( workflow_providers, actions_file, workflow_actions, + workflow_not_found_throw_error, ) workflows = [workflow] # else, if it stored in the db, it stored without the "workflow" key @@ -105,6 +110,7 @@ def parse( workflow_providers, actions_file, workflow_actions, + workflow_not_found_throw_error, ) workflows = [workflow] return workflows @@ -134,9 +140,10 @@ def _parse_workflow( workflow_providers: dict = None, actions_file: str = None, workflow_actions: dict = None, + workflow_not_found_throw_error: bool = False ) -> Workflow: self.logger.debug("Parsing workflow") - workflow_id = self._get_workflow_id(tenant_id, workflow) + workflow_id = self._get_workflow_id(tenant_id, workflow, workflow_not_found_throw_error) context_manager = ContextManager( tenant_id=tenant_id, workflow_id=workflow_id, @@ -361,6 +368,18 @@ def parse_disabled(workflow_dict: dict) -> bool: ) else False ) + @staticmethod + def parse_provisioned(workflow_dict:dict) -> bool: + workflow_is_provisioned = workflow_dict.get("provisioned") + return ( + True + if ( + workflow_is_provisioned == "true" + or workflow_is_provisioned is True + ) + else False + ) + @staticmethod def parse_provider_parameters(provider_parameters: dict) -> dict: diff --git a/keep/workflowmanager/workflowstore.py b/keep/workflowmanager/workflowstore.py index af1ae746b..0ed0375ec 100644 --- a/keep/workflowmanager/workflowstore.py +++ b/keep/workflowmanager/workflowstore.py @@ -38,26 +38,53 @@ def get_workflow_execution(self, tenant_id: str, workflow_execution_id: str): workflow_execution = get_workflow_execution(tenant_id, workflow_execution_id) return workflow_execution + def _is_not_uuid(self, s: str) -> bool: + try: + uuid_obj = uuid.UUID(s, version=4) + return str(uuid_obj) != s + except ValueError: + # If a ValueError is raised, the string is not a valid UUID + return True + + def create_workflow(self, tenant_id: str, created_by, workflow: dict): workflow_id = workflow.get("id") - self.logger.info(f"Creating workflow {workflow_id}") interval = self.parser.parse_interval(workflow) - if not workflow.get("name"): # workflow name is None or empty string - workflow_name = workflow_id + workflow_name = workflow.get("name") + if not workflow_name: # workflow name is None or empty string + workflow_name = workflow_id if workflow_id and self._is_not_uuid(workflow_id) else workflow_name workflow["name"] = workflow_name else: workflow_name = workflow.get("name") - workflow = add_or_update_workflow( - id=str(uuid.uuid4()), - name=workflow_name, - tenant_id=tenant_id, - description=workflow.get("description"), - created_by=created_by, - interval=interval, - is_disabled=Parser.parse_disabled(workflow), - workflow_raw=yaml.dump(workflow), - ) + if not workflow_name: + raise HTTPException( + status_code=400, detail="Workflow name is required" + ) + def _update_workflow_raw_data(workflow_id: str): + workflow["id"] = workflow_id + return yaml.dump(workflow) + + + try: + workflow = add_or_update_workflow( + id=str(uuid.uuid4()), + name=workflow_name, + tenant_id=tenant_id, + description=workflow.get("description"), + created_by=created_by, + interval=interval, + is_disabled=Parser.parse_disabled(workflow), + update_raw=_update_workflow_raw_data, # Pass update_raw function + workflow_id=workflow_id, + provisioned=Parser.parse_provisioned(workflow), + provisioned_file=workflow.get("provisioned_file") + ) + except Exception as e: + self.logger.error(f"Error creating workflow {workflow_id}: {str(e)}") + raise HTTPException( + status_code=403, detail=f"{str(e)}" + ) self.logger.info(f"Workflow {workflow_id} created successfully") return workflow @@ -101,6 +128,9 @@ def _parse_workflow_to_dict(self, workflow_path: str) -> dict: def get_raw_workflow(self, tenant_id: str, workflow_id: str) -> str: raw_workflow = get_raw_workflow(tenant_id, workflow_id) workflow_yaml = yaml.safe_load(raw_workflow) + if raw_workflow and workflow_yaml: + #Ensure that old workflows created through uploads are updated with the original UUID. + workflow_yaml["id"] = workflow_id valid_workflow_yaml = {"workflow": workflow_yaml} return yaml.dump(valid_workflow_yaml) @@ -112,6 +142,11 @@ def get_workflow(self, tenant_id: str, workflow_id: str) -> Workflow: detail=f"Workflow {workflow_id} not found", ) workflow_yaml = yaml.safe_load(workflow) + if workflow and workflow_yaml: + #Ensure that old workflows created through uploads are updated with the original UUID. + workflow_yaml["id"] = workflow_id + + # Ensure backward compatibility. Any workflow created without an ID or missing the original workflow.id in the workflow_raw will be updated with the current workflow.id. workflow = self.parser.parse(tenant_id, workflow_yaml) if len(workflow) > 1: raise HTTPException( @@ -126,9 +161,10 @@ def get_workflow(self, tenant_id: str, workflow_id: str) -> Workflow: detail=f"Workflow {workflow_id} not found", ) - def get_workflow_from_dict(self, tenant_id: str, workflow: dict) -> Workflow: + #if we want to check strictly if a workflow exists we can pass workflow_not_found_throw_error=True + def get_workflow_from_dict(self, tenant_id: str, workflow: dict, workflow_not_found_throw_error=False) -> Workflow: logging.info("Parsing workflow from dict", extra={"workflow": workflow}) - workflow = self.parser.parse(tenant_id, workflow) + workflow = self.parser.parse(tenant_id, workflow, workflow_not_found_throw_error=workflow_not_found_throw_error) if workflow: return workflow[0] else: @@ -180,6 +216,7 @@ def get_workflows_from_path( if isinstance(workflow_path, tuple): for workflow_url in workflow_path: workflow_yaml = self._parse_workflow_to_dict(workflow_url) + #In this case while parsing, it's not necessary to strictly verify whether the workflow exists. workflows.extend( self.parser.parse( tenant_id, workflow_yaml, providers_file, actions_file @@ -193,6 +230,7 @@ def get_workflows_from_path( ) else: workflow_yaml = self._parse_workflow_to_dict(workflow_path) + #In this case while parsing, it's not necessary to strictly verify whether the workflow exists. workflows = self.parser.parse( tenant_id, workflow_yaml, providers_file, actions_file ) @@ -221,6 +259,7 @@ def _get_workflows_from_directory( os.path.join(workflows_dir, file) ) try: + #In this case while parsing, it's not necessary to strictly verify whether the workflow exists. workflows.extend( self.parser.parse( tenant_id, @@ -307,7 +346,11 @@ def provision_workflows_from_directory( workflow_id = str(uuid.uuid4()) workflow_description = workflow_yaml.get("description") workflow_interval = parser.parse_interval(workflow_yaml) - workflow_disabled = parser.parse_disabled(workflow_yaml) + workflow_disabled = parser.parse_disabled(workflow_yaml) + + def _update_workflow_raw_data(workflow_id: str): + workflow_yaml["id"] = workflow_id + return yaml.dump(workflow_yaml) add_or_update_workflow( id=workflow_id, @@ -317,9 +360,11 @@ def provision_workflows_from_directory( created_by="system", interval=workflow_interval, is_disabled=workflow_disabled, - workflow_raw=yaml.dump(workflow_yaml), + update_raw=_update_workflow_raw_data, + workflow_id=workflow_id, provisioned=True, provisioned_file=workflow_path, + re_provision=True ) provisioned_workflows.append(workflow_yaml) @@ -403,7 +448,6 @@ def group_last_workflow_executions(self, workflows: list[dict]) -> list[dict]: Group last workflow executions by workflow id """ - self.logger.info(f"workflow_executions: {workflows}") workflow_dict = {} for item in workflows: workflow, started, execution_time, status = item diff --git a/tests/test_create_workflow.py b/tests/test_create_workflow.py new file mode 100644 index 000000000..7d8b58574 --- /dev/null +++ b/tests/test_create_workflow.py @@ -0,0 +1,179 @@ +import uuid +import builtins +import json +import time +from pathlib import Path +import pytest +from keep.api.core.dependencies import SINGLE_TENANT_UUID +from keep.workflowmanager.workflowstore import WorkflowStore +from unittest.mock import MagicMock, patch +from fastapi import HTTPException + + + +@pytest.fixture +def mock_uuid(): + index = 0 + + def generate_uuid(): + nonlocal index + index += 1 + return "my-fixed-uuid" + str(index) + + with patch('uuid.uuid4', generate_uuid): + yield + + + + +#Here we can add deprecated payloads also to test the workflow creation. +def get_sample_mock_workflow(name=None, id=None, provisioned=False): + sample = { + "description": "test workflow", + "interval": 0, + "triggers": [ + { + "type": "mock", + "filters": [ + { + "key": "name", + "value": "test alert" + } + ] + } + ], + "actions": [ + { + "name": "test action", + "provider": { + "type": "mock" + } + } + ] + } + + if name : + sample["name"] = name + if id : + sample["id"] = id + + if provisioned : + sample["provisioned"] = provisioned + + return sample + + +def test_create_workflow_check(db_session, mock_uuid): + workflow_store = WorkflowStore() + # create mock workflow + mock_workflow = get_sample_mock_workflow(name="test_name") + result = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow) + assert result.id == "my-fixed-uuid1" + assert result.name == "test_name" + + + +def test_create_workflow_where_id_exists_in_db(db_session, mock_uuid): + workflow_store = WorkflowStore() + # create mock workflow + mock_workflow1 = get_sample_mock_workflow(name="test_workflow") + result1 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow1) + assert result1.id == "my-fixed-uuid1" + assert result1.name == "test_workflow" + #It is exact copy of above mock workflow. this should workflow should have unique id + mock_workflow1_1 = get_sample_mock_workflow(name="test_workflow") + result_1_1 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow1_1) + assert result_1_1.id == "my-fixed-uuid2" + assert result_1_1.name == "test_workflow" + + mock_workflow2 = get_sample_mock_workflow(name="test workflow 2") + result2 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow2) + assert result2.id == "my-fixed-uuid3" + assert result2.name == "test workflow 2" + + #If a valid workflow UUID is provided and it exists in the database, the workflow data will be updated + mock_workflow3 = get_sample_mock_workflow(name="test workflow 1", id="my-fixed-uuid1") + result3 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow3) + assert result3.id == "my-fixed-uuid1" + assert result3.name == "test workflow 1" + + +def test_create_workflow_when_there_is_no_name(db_session, mock_uuid): + workflow_store = WorkflowStore() + # create mock workflow + mock_workflow1 = get_sample_mock_workflow("test_workflow") + result1 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow1) + assert result1.id == "my-fixed-uuid1" + assert result1.name == "test_workflow" + #It is exact copy of above mock workflow. this should workflow should have unique id + mock_workflow1_1 = get_sample_mock_workflow() + with pytest.raises(HTTPException) as workflow_info: + workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow1_1) + + assert workflow_info.value.status_code == 400 + assert workflow_info.value.detail == "Workflow name is required" + + #If a valid workflow UUID is provided and it exists in the database, the workflow data will be updated + mock_workflow3 = get_sample_mock_workflow(name="test workflow 1", id="my-fixed-uuid1") + result3 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow3) + assert result3.id == "my-fixed-uuid1" + assert result3.name == "test workflow 1" + + +def test_create_workflow_when_there_is_id_and_name(db_session, mock_uuid): + #In the case. If a workflow with the given ID exists, update it. Otherwise, create a new workflow. + workflow_store = WorkflowStore() + # create mock workflow + mock_workflow1 = get_sample_mock_workflow(name="test workflow", id="test-1") # this id does not exist in db. so it will create new workflow with new UUID + result1 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow1) + assert result1.id == "my-fixed-uuid1" + assert result1.name == "test workflow" + #It is exact copy of above mock workflow. this should workflow should have unique id + mock_workflow1_1 = get_sample_mock_workflow(name="test workflow", id="test-1") #this id does not exist in db. so it will create new workflow with new UUID + result_1_1 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow1_1) + assert result_1_1.id == "my-fixed-uuid2" + assert result_1_1.name == "test workflow" + + #If a valid workflow UUID is provided and it exists in the database, the workflow data will be updated + mock_workflow3 = get_sample_mock_workflow(name="test workflow 1", id="my-fixed-uuid1") + result3 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow3) + assert result3.id == "my-fixed-uuid1" + assert result3.name == "test workflow 1" + + +def test_create_workflow_when_there_is_id_and_no_name(db_session, mock_uuid): + workflow_store = WorkflowStore() + # create mock workflow + #this is only possible via cli or upload only + mock_workflow1 = get_sample_mock_workflow(id="test-1") # this id does not exist in db. so it will create new workflow with new UUID + result1 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow1) + assert result1.id == "my-fixed-uuid1" + assert result1.name == "test-1" + #It is exact copy of above mock workflow. this should workflow should have unique id + mock_workflow1_1 = get_sample_mock_workflow(id="test-1") #this id does not exist in db. so it will create new workflow with new UUID + result_1_1 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow1_1) + assert result_1_1.id == "my-fixed-uuid2" + assert result_1_1.name == "test-1" + + #If a valid workflow UUID is provided and it exists in the database, the workflow data will be updated + mock_workflow3 = get_sample_mock_workflow(id="my-fixed-uuid1", name="test workflow 1") + result3 = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow3) + assert result3.id == "my-fixed-uuid1" + assert result3.name == "test workflow 1" + + +def test_create_worfklow_already_provisioned_workflow_should_not_update(db_session, mock_uuid): + workflow_store = WorkflowStore() + # create mock workflow + mock_workflow = get_sample_mock_workflow(name="test_name", provisioned=True) + result = workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow) + assert result.id == "my-fixed-uuid1" + assert result.name == "test_name" + assert result.provisioned == True + + mock_workflow_1 = get_sample_mock_workflow(name="test_name1", id="my-fixed-uuid1") + + with pytest.raises(HTTPException) as workflow_info: + workflow_store.create_workflow(tenant_id=SINGLE_TENANT_UUID, created_by="test", workflow=mock_workflow_1) + + assert workflow_info.value.detail == "Cannot update a provisioned workflow" \ No newline at end of file diff --git a/tests/test_workflowmanager.py b/tests/test_workflowmanager.py index 2b4868146..ef8524636 100644 --- a/tests/test_workflowmanager.py +++ b/tests/test_workflowmanager.py @@ -26,9 +26,10 @@ def test_get_workflow_from_dict(): workflow_path = str(path_to_test_resources / "db_disk_space_for_testing.yml") workflow_dict = workflow_store._parse_workflow_to_dict(workflow_path=workflow_path) result = workflow_store.get_workflow_from_dict( - tenant_id=tenant_id, workflow=workflow_dict + tenant_id=tenant_id, workflow=workflow_dict, workflow_not_found_throw_error=False ) - mock_parser.parse.assert_called_once_with(tenant_id, workflow_dict) + mock_parser.parse.assert_called_once_with(tenant_id, workflow_dict, workflow_not_found_throw_error=False) + #The workflow ID is not set in the database because it's a mock file. assert result.id == "workflow1" @@ -45,12 +46,12 @@ def test_get_workflow_from_dict_raises_exception(): with pytest.raises(HTTPException) as exc_info: workflow_store.get_workflow_from_dict( - tenant_id=tenant_id, workflow=workflow_dict + tenant_id=tenant_id, workflow=workflow_dict, workflow_not_found_throw_error=True ) assert exc_info.value.status_code == 500 assert exc_info.value.detail == "Unable to parse workflow from dict" - mock_parser.parse.assert_called_once_with(tenant_id, workflow_dict) + mock_parser.parse.assert_called_once_with(tenant_id, workflow_dict, workflow_not_found_throw_error=True) def test_get_workflow_results():