diff --git a/.github/workflows/test-pr-e2e.yml b/.github/workflows/test-pr-e2e.yml index 95fff2ebc..2a8dfb696 100644 --- a/.github/workflows/test-pr-e2e.yml +++ b/.github/workflows/test-pr-e2e.yml @@ -60,11 +60,25 @@ jobs: - name: Install dependencies using poetry run: poetry install --no-interaction --no-root --with dev - - name: Install Playwright dependencies - run: npx playwright install --with-deps + - name: Get Playwright version from poetry.lock + id: playwright-version + run: | + PLAYWRIGHT_VERSION=$(grep "playwright" poetry.lock -A 5 | grep "version" | head -n 1 | cut -d'"' -f2) + echo "version=$PLAYWRIGHT_VERSION" >> $GITHUB_OUTPUT + - name: Cache Playwright browsers + id: playwright-cache + uses: actions/cache@v4 + with: + path: ~/.cache/ms-playwright + key: playwright-${{ steps.playwright-version.outputs.version }} - - name: Install playwright - run: poetry run playwright install + - name: Install Playwright and dependencies + run: | + if [ "${{ steps.playwright-cache.outputs.cache-hit }}" != "true" ]; then + poetry run playwright install --with-deps + else + poetry run playwright install-deps + fi - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 diff --git a/docker/Dockerfile.ui b/docker/Dockerfile.ui index 6b0d4210a..b0f3aa1a4 100644 --- a/docker/Dockerfile.ui +++ b/docker/Dockerfile.ui @@ -1,6 +1,4 @@ - - -FROM node:18-alpine AS base +FROM node:20-alpine AS base # Install dependencies only when needed FROM base AS deps diff --git a/keep-ui/.dockerignore b/keep-ui/.dockerignore new file mode 100644 index 000000000..d38b75ee9 --- /dev/null +++ b/keep-ui/.dockerignore @@ -0,0 +1,8 @@ +node_modules +.next +.vercel +.env.* +.venv/ +.vscode/ +.github/ +.pytest_cache diff --git a/keep-ui/app/(keep)/alerts/alert-push-alert-to-server-modal.tsx b/keep-ui/app/(keep)/alerts/alert-push-alert-to-server-modal.tsx index f16ff6d73..457d1a42e 100644 --- a/keep-ui/app/(keep)/alerts/alert-push-alert-to-server-modal.tsx +++ b/keep-ui/app/(keep)/alerts/alert-push-alert-to-server-modal.tsx @@ -80,6 +80,11 @@ const PushAlertToServerModal = ({ const onSubmit: SubmitHandler = async (data) => { try { + // if type is string, parse it to JSON + if (typeof data.alertJson === "string") { + data.alertJson = JSON.parse(data.alertJson); + } + const response = await api.post( `/alerts/event/${data.source.type}`, data.alertJson diff --git a/keep-ui/app/(keep)/workflows/builder/builder-workflow-testrun-modal.tsx b/keep-ui/app/(keep)/workflows/builder/builder-workflow-testrun-modal.tsx index a62b11fed..94e0ae849 100644 --- a/keep-ui/app/(keep)/workflows/builder/builder-workflow-testrun-modal.tsx +++ b/keep-ui/app/(keep)/workflows/builder/builder-workflow-testrun-modal.tsx @@ -3,15 +3,18 @@ import { Button, Card, Subtitle, Title } from "@tremor/react"; import ReactLoading from "react-loading"; import { ExecutionResults } from "./workflow-execution-results"; import { WorkflowExecution, WorkflowExecutionFailure } from "./types"; +import { ApiClient } from "@/shared/api"; interface Props { closeModal: () => void; workflowExecution: WorkflowExecution | WorkflowExecutionFailure | null; + apiClient: ApiClient; } export default function BuilderWorkflowTestRunModalContent({ closeModal, workflowExecution, + apiClient, }: Props) { return ( <> @@ -34,7 +37,10 @@ export default function BuilderWorkflowTestRunModalContent({
{workflowExecution ? ( - + ) : (
{generateModalIsOpen || testRunModalOpen ? null : ( diff --git a/keep-ui/app/(keep)/workflows/builder/types.ts b/keep-ui/app/(keep)/workflows/builder/types.ts index 9c8403285..593da16de 100644 --- a/keep-ui/app/(keep)/workflows/builder/types.ts +++ b/keep-ui/app/(keep)/workflows/builder/types.ts @@ -25,6 +25,8 @@ export interface WorkflowExecution { logs?: LogEntry[] | null; error?: string | null; execution_time?: number; + event_id?: string; + event_type?: string; } export interface PaginatedWorkflowExecutionDto { diff --git a/keep-ui/app/(keep)/workflows/builder/workflow-execution-results.tsx b/keep-ui/app/(keep)/workflows/builder/workflow-execution-results.tsx index bba07f997..eae3acb71 100644 --- a/keep-ui/app/(keep)/workflows/builder/workflow-execution-results.tsx +++ b/keep-ui/app/(keep)/workflows/builder/workflow-execution-results.tsx @@ -7,6 +7,7 @@ import { AccordionHeader, Card, Title, + Button, } from "@tremor/react"; import Loading from "@/app/(keep)/loading"; import { ExclamationCircleIcon } from "@heroicons/react/24/outline"; @@ -25,6 +26,8 @@ import { isWorkflowExecution, } from "./types"; import { useApi } from "@/shared/lib/hooks/useApi"; +import WorkflowDefinitionYAML from "../workflow-definition-yaml"; +import { ApiClient } from "@/shared/api/ApiClient"; interface WorkflowResultsProps { workflow_id: string; @@ -56,23 +59,26 @@ export default function WorkflowExecutionResults({ } ); - // disable refresh interval when execution is complete + // Get workflow definition + const { data: workflowData } = useSWR( + api.isReady() ? `/workflows/${workflow_id}` : null, + (url) => api.get(url) + ); + useEffect(() => { if (!executionData) return; - // if the status is other than in_progress, stop the refresh interval if (executionData?.status !== "in_progress") { console.log("Stopping refresh interval"); setRefreshInterval(0); } - // if there's an error - show it if (executionData.error) { setError(executionData?.error); console.log("Stopping refresh interval"); setRefreshInterval(0); } else if (executionData?.status === "success") { - setError(executionData?.error); // should be null - setRefreshInterval(0); // Disable refresh interval when execution is complete + setError(executionData?.error); + setRefreshInterval(0); } }, [executionData]); @@ -80,7 +86,7 @@ export default function WorkflowExecutionResults({ console.error("Error fetching execution status", executionError); } - if (status === "loading" || !executionData) return ; + if (!executionData || !workflowData) return ; if (executionError) { return ( @@ -98,21 +104,29 @@ export default function WorkflowExecutionResults({ return ( + api={api} + /> ); } export function ExecutionResults({ executionData, + workflowData, checks, + api, }: { executionData: WorkflowExecution | WorkflowExecutionFailure; + workflowData?: any; checks?: number; + api: ApiClient; }) { let status: WorkflowExecution["status"] | undefined; let logs: WorkflowExecution["logs"] | undefined; let results: WorkflowExecution["results"] | undefined; + let eventId: string | undefined; + let eventType: string | undefined; const error = executionData.error; @@ -120,113 +134,161 @@ export function ExecutionResults({ status = executionData.status; logs = executionData.logs; results = executionData.results; + eventId = executionData.event_id; + eventType = executionData.event_type; } + const getCurlCommand = () => { + let token = api.getToken(); + let url = api.getApiBaseUrl(); + // Only include workflow ID if workflowData is available + const workflowIdParam = workflowData ? `/${workflowData.id}` : ""; + return `curl -X POST "${url}/workflows${workflowIdParam}/run?event_type=${eventType}&event_id=${eventId}" \\ + -H "Authorization: Bearer ${token}" \\ + -H "Content-Type: application/json"`; + }; + + const copyToClipboard = () => { + navigator.clipboard.writeText(getCurlCommand()); + }; + return ( -
- {results && Object.keys(results).length > 0 && ( - - Workflow Results - - - - - Action ID - - - Results - - - - - {Object.entries(results).map(([stepId, stepResults], index) => ( - - - {stepId} - - - - Value - -
-                          {JSON.stringify(stepResults, null, 2)}
-                        
-
-
-
-
- ))} -
-
+
+ {/* Error Card */} + {error && ( + + +
+
+ {error.split("\n").map((line, index) => ( +

{line}

+ ))} +
+ {eventId && eventType && ( + + )} +
+
)} -
0 ? "mt-8" : ""}> - {status === "in_progress" ? ( -
-
-

- The workflow is in progress, will check again in one second - (times checked: {checks}) -

-
- -
- ) : ( - <> - {error && ( - - {error - ? error.split("\n").map((line, index) => ( - // Render each line as a separate paragraph or div. - // The key is index, which is sufficient for simple lists like this. -

{line}

- )) - : "An unknown error occurred during execution."} -
- )} - - Workflow Logs + + {/* Workflow Results Card */} + {/* + +
+ {results && Object.keys(results).length > 0 && ( +
+ Workflow Results - - Timestamp + + Action ID - - Message + + Results - {(logs ?? []).map((log, index) => ( - - - {log.timestamp} + {Object.entries(results).map(([stepId, stepResults], index) => ( + + + {stepId} - - {log.message} + + + Value + +
+                              {JSON.stringify(stepResults, null, 2)}
+                            
+
+
))}
- - - )} +
+ )} +
+
+ */} + + {/* Lower Section with Logs and Definition */} +
+ {/* Workflow Logs Card */} + +
+ {status === "in_progress" ? ( +
+
+

+ The workflow is in progress, will check again in one second + (times checked: {checks}) +

+
+ +
+ ) : ( +
+ Workflow Logs + + + + + Timestamp + + + Message + + + + + {(logs ?? []).map((log, index) => ( + + + {log.timestamp} + + + {log.message} + + + ))} + +
+
+ )} +
+
+ + {/* Workflow Definition Card */} + + Workflow Definition +
+ +
+
); diff --git a/keep-ui/app/(keep)/workflows/workflow-definition-yaml.tsx b/keep-ui/app/(keep)/workflows/workflow-definition-yaml.tsx new file mode 100644 index 000000000..459fa971a --- /dev/null +++ b/keep-ui/app/(keep)/workflows/workflow-definition-yaml.tsx @@ -0,0 +1,136 @@ +import React from "react"; +import { load, dump } from "js-yaml"; +import { CheckCircle, XCircle, Clock } from "lucide-react"; + +interface LogEntry { + timestamp: string; + message: string; +} + +interface Props { + workflowRaw: string; + executionLogs?: LogEntry[] | null; + executionStatus?: string; +} + +export default function WorkflowDefinitionYAML({ + workflowRaw, + executionLogs, + executionStatus, +}: Props) { + const reorderWorkflowSections = (yamlString: string) => { + const content = yamlString.startsWith('"') + ? JSON.parse(yamlString) + : yamlString; + + const workflow = load(content) as any; + const workflowData = workflow.workflow; + + const metadataFields = ["id", "name", "description", "disabled"]; + const sectionOrder = [ + "triggers", + "consts", + "owners", + "services", + "steps", + "actions", + ]; + + const orderedWorkflow: any = { + workflow: {}, + }; + + metadataFields.forEach((field) => { + if (workflowData[field] !== undefined) { + orderedWorkflow.workflow[field] = workflowData[field]; + } + }); + + sectionOrder.forEach((section) => { + if (workflowData[section] !== undefined) { + orderedWorkflow.workflow[section] = workflowData[section]; + } + }); + + return dump(orderedWorkflow, { + indent: 2, + lineWidth: -1, + noRefs: true, + sortKeys: false, + quotingType: '"', + }); + }; + + const getStatus = (name: string, isAction: boolean = false) => { + if (!executionLogs || !executionStatus) return "pending"; + if (executionStatus === "in_progress") return "in_progress"; + + const type = isAction ? "Action" : "Step"; + const successPattern = `${type} ${name} ran successfully`; + const failurePattern = `Failed to run ${type.toLowerCase()} ${name}`; + + const hasSuccessLog = executionLogs.some( + (log) => log.message?.includes(successPattern) + ); + const hasFailureLog = executionLogs.some( + (log) => log.message?.includes(failurePattern) + ); + + if (hasSuccessLog) return "success"; + if (hasFailureLog) return "failed"; + return "pending"; + }; + + const getStepIcon = (status: string) => { + switch (status) { + case "success": + return ; + case "failed": + return ; + case "in_progress": + return ; + default: + return null; + } + }; + + const renderYamlWithIcons = () => { + const orderedYaml = reorderWorkflowSections(workflowRaw); + const lines = orderedYaml.split("\n"); + let currentName: string | null = null; + let isInActions = false; + + return lines.map((line, index) => { + const trimmedLine = line.trim(); + + if (trimmedLine === "actions:") { + isInActions = true; + } else if (trimmedLine.startsWith("steps:")) { + isInActions = false; + } + + if (trimmedLine.startsWith("- name:")) { + currentName = trimmedLine.split("name:")[1].trim(); + } + + const status = currentName ? getStatus(currentName, isInActions) : null; + const icon = status ? getStepIcon(status) : null; + + return ( +
+
{icon}
+
{line || "\u00A0"}
+
+ ); + }); + }; + + return ( +
+ {renderYamlWithIcons()} +
+ ); +} diff --git a/keep-ui/shared/api/ApiClient.ts b/keep-ui/shared/api/ApiClient.ts index f1e8b80ef..5f579379e 100644 --- a/keep-ui/shared/api/ApiClient.ts +++ b/keep-ui/shared/api/ApiClient.ts @@ -28,6 +28,21 @@ export class ApiClient { }; } + getToken() { + return this.session?.accessToken; + } + + getApiBaseUrl() { + if (this.isServer) { + return getApiURL(); + } + const baseUrl = getApiUrlFromConfig(this.config); + if (baseUrl.startsWith("/")) { + return `${window.location.origin}${baseUrl}`; + } + return baseUrl; + } + async handleResponse(response: Response, url: string) { // Ensure that the fetch was successful if (!response.ok) { diff --git a/keep-ui/utils/hooks/useWorkflowExecutions.ts b/keep-ui/utils/hooks/useWorkflowExecutions.ts index 7c48966a5..3247efc99 100644 --- a/keep-ui/utils/hooks/useWorkflowExecutions.ts +++ b/keep-ui/utils/hooks/useWorkflowExecutions.ts @@ -44,11 +44,15 @@ export const useWorkflowExecutionsV2 = ( return useSWR( api.isReady() - ? `/workflows/${workflowId}/runs?v2=true&tab=${tab}&limit=${limit}&offset=${offset}${ + ? `/workflows/${workflowId}/runs?v2=true&limit=${limit}&offset=${offset}${ searchParams ? `&${searchParams.toString()}` : "" }` : null, - (url: string) => api.get(url) + (url: string) => api.get(url), + { + revalidateOnFocus: false, + revalidateIfStale: false, + } ); }; diff --git a/keep/api/core/db.py b/keep/api/core/db.py index f23211e55..a67049032 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -665,8 +665,8 @@ def finish_workflow_execution(tenant_id, workflow_id, execution_id, status, erro workflow_execution.status = status # TODO: we had a bug with the error field, it was too short so some customers may fail over it. # we need to fix it in the future, create a migration that increases the size of the error field - # and then we can remove the [:255] from here - workflow_execution.error = error[:255] if error else None + # and then we can remove the [:511] from here + workflow_execution.error = error[:511] if error else None workflow_execution.execution_time = ( datetime.utcnow() - workflow_execution.started ).total_seconds() @@ -837,7 +837,11 @@ def get_workflow_execution(tenant_id: str, workflow_execution_id: str): WorkflowExecution.id == workflow_execution_id, WorkflowExecution.tenant_id == tenant_id, ) - .options(joinedload(WorkflowExecution.logs)) + .options( + joinedload(WorkflowExecution.logs), + joinedload(WorkflowExecution.workflow_to_alert_execution), + joinedload(WorkflowExecution.workflow_to_incident_execution), + ) .one() ) return execution_with_logs @@ -1434,6 +1438,18 @@ def get_alert_by_fingerprint_and_event_id( return alert +def get_alert_by_event_id(tenant_id: str, event_id: str) -> Alert: + with Session(engine) as session: + query = ( + session.query(Alert) + .filter(Alert.tenant_id == tenant_id) + .filter(Alert.id == uuid.UUID(event_id)) + ) + query = query.options(subqueryload(Alert.alert_enrichment)) + alert = query.first() + return alert + + def get_previous_alert_by_fingerprint(tenant_id: str, fingerprint: str) -> Alert: # get the previous alert for a given fingerprint with Session(engine) as session: @@ -2185,23 +2201,30 @@ def update_key_last_used( def get_linked_providers(tenant_id: str) -> List[Tuple[str, str, datetime]]: + # Alert table may be too huge, so cutting the query without mercy + LIMIT_BY_ALERTS = 10000 + with Session(engine) as session: - providers = ( - session.query( - Alert.provider_type, - Alert.provider_id, - func.max(Alert.timestamp).label("last_alert_timestamp"), + alerts_subquery = select(Alert).filter( + Alert.tenant_id == tenant_id, + Alert.provider_type != "group" + ).limit(LIMIT_BY_ALERTS).subquery() + + providers = session.exec( + select( + alerts_subquery.c.provider_type, + alerts_subquery.c.provider_id, + func.max(alerts_subquery.c.timestamp).label("last_alert_timestamp") ) - .outerjoin(Provider, Alert.provider_id == Provider.id) + .select_from(alerts_subquery) .filter( - Alert.tenant_id == tenant_id, - Alert.provider_type != "group", - Provider.id - == None, # Filters for alerts with a provider_id not in Provider table + ~exists().where(Provider.id == alerts_subquery.c.provider_id) ) - .group_by(Alert.provider_type, Alert.provider_id) - .all() - ) + .group_by( + alerts_subquery.c.provider_type, + alerts_subquery.c.provider_id + ) + ).all() return providers @@ -2825,6 +2848,9 @@ def get_tenants_configurations(only_with_config=False) -> List[Tenant]: def update_preset_options(tenant_id: str, preset_id: str, options: dict) -> Preset: + if isinstance(preset_id, str): + preset_id = __convert_to_uuid(preset_id) + with Session(engine) as session: preset = session.exec( select(Preset) @@ -4157,6 +4183,8 @@ def create_tag(tag: Tag): def assign_tag_to_preset(tenant_id: str, tag_id: str, preset_id: str): + if isinstance(preset_id, str): + preset_id = __convert_to_uuid(preset_id) with Session(engine) as session: tag_preset = PresetTagLink( tenant_id=tenant_id, diff --git a/keep/api/models/workflow.py b/keep/api/models/workflow.py index 3d534662e..b4d5c3285 100644 --- a/keep/api/models/workflow.py +++ b/keep/api/models/workflow.py @@ -115,6 +115,8 @@ class WorkflowExecutionDTO(BaseModel): error: Optional[str] execution_time: Optional[float] results: Optional[dict] + event_id: Optional[str] + event_type: Optional[str] class WorkflowCreateOrUpdateDTO(BaseModel): diff --git a/keep/api/routes/preset.py b/keep/api/routes/preset.py index bd331806c..bbe3af59c 100644 --- a/keep/api/routes/preset.py +++ b/keep/api/routes/preset.py @@ -318,39 +318,41 @@ def create_preset( @router.delete( - "/{uuid}", + "/{preset_id}", description="Delete a preset for tenant", ) def delete_preset( - uuid: str, + preset_id: uuid.UUID, authenticated_entity: AuthenticatedEntity = Depends( IdentityManagerFactory.get_auth_verifier(["delete:presets"]) ), session: Session = Depends(get_session), ): tenant_id = authenticated_entity.tenant_id - logger.info("Deleting preset", extra={"uuid": uuid}) + logger.info("Deleting preset", extra={"uuid": preset_id}) # Delete links - session.query(PresetTagLink).filter(PresetTagLink.preset_id == uuid).delete() + session.query(PresetTagLink).filter(PresetTagLink.preset_id == preset_id).delete() statement = ( - select(Preset).where(Preset.tenant_id == tenant_id).where(Preset.id == uuid) + select(Preset) + .where(Preset.tenant_id == tenant_id) + .where(Preset.id == preset_id) ) preset = session.exec(statement).first() if not preset: raise HTTPException(404, "Preset not found") session.delete(preset) session.commit() - logger.info("Deleted preset", extra={"uuid": uuid}) + logger.info("Deleted preset", extra={"uuid": preset_id}) return {} @router.put( - "/{uuid}", + "/{preset_id}", description="Update a preset for tenant", ) def update_preset( - uuid: str, + preset_id: uuid.UUID, body: CreateOrUpdatePresetDto, authenticated_entity: AuthenticatedEntity = Depends( IdentityManagerFactory.get_auth_verifier(["write:presets"]) @@ -358,9 +360,11 @@ def update_preset( session: Session = Depends(get_session), ) -> PresetDto: tenant_id = authenticated_entity.tenant_id - logger.info("Updating preset", extra={"uuid": uuid}) + logger.info("Updating preset", extra={"uuid": preset_id}) statement = ( - select(Preset).where(Preset.tenant_id == tenant_id).where(Preset.id == uuid) + select(Preset) + .where(Preset.tenant_id == tenant_id) + .where(Preset.id == preset_id) ) preset = session.exec(statement).first() if not preset: @@ -412,7 +416,7 @@ def update_preset( session.commit() session.refresh(preset) - logger.info("Updated preset", extra={"uuid": uuid}) + logger.info("Updated preset", extra={"uuid": preset_id}) return PresetDto(**preset.to_dict()) diff --git a/keep/api/routes/workflows.py b/keep/api/routes/workflows.py index b313650fe..7a5912996 100644 --- a/keep/api/routes/workflows.py +++ b/keep/api/routes/workflows.py @@ -20,6 +20,7 @@ from sqlmodel import Session from keep.api.core.db import ( + get_alert_by_event_id, get_installed_providers, get_last_workflow_workflow_to_alert_executions, get_session, @@ -35,6 +36,7 @@ WorkflowExecutionLogsDTO, WorkflowToAlertExecutionDTO, ) +from keep.api.utils.enrichment_helpers import convert_db_alerts_to_dto_alerts from keep.api.utils.pagination import WorkflowExecutionsPaginatedResultsDto from keep.identitymanager.authenticatedentity import AuthenticatedEntity from keep.identitymanager.identitymanagerfactory import IdentityManagerFactory @@ -168,6 +170,8 @@ def export_workflows( ) def run_workflow( workflow_id: str, + event_type: Optional[str] = Query(None), + event_id: Optional[str] = Query(None), body: Optional[Dict[Any, Any]] = Body(None), authenticated_entity: AuthenticatedEntity = Depends( IdentityManagerFactory.get_auth_verifier(["write:workflows"]) @@ -176,40 +180,57 @@ def run_workflow( tenant_id = authenticated_entity.tenant_id created_by = authenticated_entity.email logger.info("Running workflow", extra={"workflow_id": workflow_id}) + # if the workflow id is the name of the workflow (e.g. the CLI has only the name) if not validators.uuid(workflow_id): logger.info("Workflow ID is not a UUID, trying to get the ID by name") workflow_id = getattr(get_workflow_by_name(tenant_id, workflow_id), "id", None) + workflowmanager = WorkflowManager.get_instance() - # Finally, run it try: - - if body.get("type", "alert") == "alert": - event_class = AlertDto + # Handle replay from query parameters + if event_type and event_id: + if event_type == "alert": + # Fetch alert from your alert store + alert_db = get_alert_by_event_id(tenant_id, event_id) + event = convert_db_alerts_to_dto_alerts([alert_db])[0] + elif event_type == "incident": + # SHAHAR: TODO + raise NotImplementedError("Incident replay is not supported yet") + else: + raise HTTPException( + status_code=400, + detail=f"Invalid event type: {event_type}", + ) else: - event_class = IncidentDto - - event_body = body.get("body", {}) or body - - # if its event that was triggered by the UI with the Modal - fingerprint = event_body.get("fingerprint", "") - if (fingerprint and "test-workflow" in fingerprint) or not body: - # some random - event_body["id"] = event_body.get("fingerprint", "manual-run") - event_body["name"] = event_body.get("fingerprint", "manual-run") - event_body["lastReceived"] = datetime.datetime.now( - tz=datetime.timezone.utc - ).isoformat() - if "source" in event_body and not isinstance(event_body["source"], list): - event_body["source"] = [event_body["source"]] - try: - event = event_class(**event_body) - except TypeError: - raise HTTPException( - status_code=400, - detail="Invalid event format", + # Handle regular run from body + event_body = body.get("body", {}) or body + event_class = ( + AlertDto if body.get("type", "alert") == "alert" else IncidentDto ) + + # Handle UI triggered events + fingerprint = event_body.get("fingerprint", "") + if (fingerprint and "test-workflow" in fingerprint) or not body: + event_body["id"] = event_body.get("fingerprint", "manual-run") + event_body["name"] = event_body.get("fingerprint", "manual-run") + event_body["lastReceived"] = datetime.datetime.now( + tz=datetime.timezone.utc + ).isoformat() + if "source" in event_body and not isinstance( + event_body["source"], list + ): + event_body["source"] = [event_body["source"]] + + try: + event = event_class(**event_body) + except TypeError: + raise HTTPException( + status_code=400, + detail="Invalid event format", + ) + workflow_execution_id = workflowmanager.scheduler.handle_manual_event_workflow( workflow_id, tenant_id, created_by, event ) @@ -222,6 +243,7 @@ def run_workflow( status_code=500, detail=f"Failed to run workflow {workflow_id}: {e}", ) + logger.info( "Workflow ran successfully", extra={ @@ -693,6 +715,17 @@ def get_workflow_execution_status( detail=f"Workflow execution {workflow_execution_id} not found", ) + event_id = None + event_type = None + + if workflow_execution.workflow_to_alert_execution: + event_id = workflow_execution.workflow_to_alert_execution.event_id + event_type = "alert" + # TODO: sub triggers? on create? on update? + elif workflow_execution.workflow_to_incident_execution: + event_id = workflow_execution.workflow_to_incident_execution.incident_id + event_type = "incident" + workflow_execution_dto = WorkflowExecutionDTO( id=workflow_execution.id, workflow_id=workflow_execution.workflow_id, @@ -711,5 +744,7 @@ def get_workflow_execution_status( for log in workflow_execution.logs ], results=workflow_execution.results, + event_id=event_id, + event_type=event_type, ) return workflow_execution_dto diff --git a/keep/providers/mysql_provider/mysql_provider.py b/keep/providers/mysql_provider/mysql_provider.py index e0d3f9d1c..10cbbbb8b 100644 --- a/keep/providers/mysql_provider/mysql_provider.py +++ b/keep/providers/mysql_provider/mysql_provider.py @@ -121,7 +121,11 @@ def _query( cursor.close() if single_row: - return results[0] + if results: + return results[0] + else: + self.logger.warning("No results found for query: %s", query) + raise ValueError(f"Query {query} returned no rows") return results diff --git a/keep/providers/openai_provider/openai_provider.py b/keep/providers/openai_provider/openai_provider.py index 261e618e5..32711ffd0 100644 --- a/keep/providers/openai_provider/openai_provider.py +++ b/keep/providers/openai_provider/openai_provider.py @@ -49,6 +49,14 @@ def validate_scopes(self) -> dict[str, bool | str]: return scopes def _query(self, prompt, model="gpt-3.5-turbo"): + # gpt3.5 turbo has a limit of 16k characters + if len(prompt) > 16000: + # let's try another model + self.logger.info( + "Prompt is too long for gpt-3.5-turbo, trying gpt-4o-2024-08-06" + ) + model = "gpt-4o-2024-08-06" + client = OpenAI( api_key=self.authentication_config.api_key, organization=self.authentication_config.organization_id, diff --git a/keep/providers/slack_provider/slack_provider.py b/keep/providers/slack_provider/slack_provider.py index 4e0040061..fe699db29 100644 --- a/keep/providers/slack_provider/slack_provider.py +++ b/keep/providers/slack_provider/slack_provider.py @@ -194,7 +194,7 @@ def _notify( method = "chat.postMessage" payload["thread_ts"] = thread_timestamp - if payload["attachments"]: + if payload.get("attachments", None): payload["attachments"] = attachments response = requests.post( f"{SlackProvider.SLACK_API}/{method}", diff --git a/keep/workflowmanager/workflowscheduler.py b/keep/workflowmanager/workflowscheduler.py index ee6754dd8..5f550b05f 100644 --- a/keep/workflowmanager/workflowscheduler.py +++ b/keep/workflowmanager/workflowscheduler.py @@ -42,6 +42,9 @@ def __init__(self, workflow_manager): self.workflows_to_run = [] self._stop = False self.lock = Lock() + self.interval_enabled = ( + config("WORKFLOWS_INTERVAL_ENABLED", default="true") == "true" + ) async def start(self): self.logger.info("Starting workflows scheduler") @@ -54,6 +57,11 @@ async def start(self): def _handle_interval_workflows(self): workflows = [] + + if not self.interval_enabled: + self.logger.debug("Interval workflows are disabled") + return + try: # get all workflows that should run due to interval workflows = get_workflows_that_should_run()