diff --git a/keep/api/core/db.py b/keep/api/core/db.py
index 077305614..321f38500 100644
--- a/keep/api/core/db.py
+++ b/keep/api/core/db.py
@@ -42,6 +42,7 @@
 from sqlmodel import Session, SQLModel, col, or_, select, text
 
 from keep.api.consts import STATIC_PRESETS
+from keep.api.core.config import config
 from keep.api.core.db_utils import create_db_engine, get_json_extract_field
 from keep.api.core.dependencies import SINGLE_TENANT_UUID
 
@@ -91,6 +92,7 @@
     "affected_services",
     "assignee",
 ]
+KEEP_AUDIT_EVENTS_ENABLED = config("KEEP_AUDIT_EVENTS_ENABLED", cast=bool, default=True)
 
 
 def dispose_session():
@@ -170,20 +172,21 @@ def create_workflow_execution(
         # Ensure the object has an id
         session.flush()
         execution_id = workflow_execution.id
-        if fingerprint and event_type == "alert":
-            workflow_to_alert_execution = WorkflowToAlertExecution(
-                workflow_execution_id=execution_id,
-                alert_fingerprint=fingerprint,
-                event_id=event_id,
-            )
-            session.add(workflow_to_alert_execution)
-        elif event_type == "incident":
-            workflow_to_incident_execution = WorkflowToIncidentExecution(
-                workflow_execution_id=execution_id,
-                alert_fingerprint=fingerprint,
-                incident_id=event_id,
-            )
-            session.add(workflow_to_incident_execution)
+        if KEEP_AUDIT_EVENTS_ENABLED:
+            if fingerprint and event_type == "alert":
+                workflow_to_alert_execution = WorkflowToAlertExecution(
+                    workflow_execution_id=execution_id,
+                    alert_fingerprint=fingerprint,
+                    event_id=event_id,
+                )
+                session.add(workflow_to_alert_execution)
+            elif event_type == "incident":
+                workflow_to_incident_execution = WorkflowToIncidentExecution(
+                    workflow_execution_id=execution_id,
+                    alert_fingerprint=fingerprint,
+                    incident_id=event_id,
+                )
+                session.add(workflow_to_incident_execution)
         session.commit()
         return execution_id
 
@@ -484,7 +487,7 @@ def get_last_workflow_execution_by_workflow_id(
             session.query(WorkflowExecution)
             .filter(WorkflowExecution.workflow_id == workflow_id)
             .filter(WorkflowExecution.tenant_id == tenant_id)
-            .filter(WorkflowExecution.started >= datetime.now() - timedelta(days=7))
+            .filter(WorkflowExecution.started >= datetime.now() - timedelta(days=1))
            .filter(WorkflowExecution.status == "success")
             .order_by(WorkflowExecution.started.desc())
             .first()
@@ -650,15 +653,17 @@ def get_consumer_providers() -> List[Provider]:
 def finish_workflow_execution(tenant_id, workflow_id, execution_id, status, error):
     with Session(engine) as session:
         workflow_execution = session.exec(
-            select(WorkflowExecution)
-            .where(WorkflowExecution.tenant_id == tenant_id)
-            .where(WorkflowExecution.workflow_id == workflow_id)
-            .where(WorkflowExecution.id == execution_id)
+            select(WorkflowExecution).where(WorkflowExecution.id == execution_id)
         ).first()
         # some random number to avoid collisions
         if not workflow_execution:
             logger.warning(
-                f"Failed to finish workflow execution {execution_id} for workflow {workflow_id}. Execution not found."
+                f"Failed to finish workflow execution {execution_id} for workflow {workflow_id}. Execution not found.",
+                extra={
+                    "tenant_id": tenant_id,
+                    "workflow_id": workflow_id,
+                    "execution_id": execution_id,
+                },
             )
             raise ValueError("Execution not found")
         workflow_execution.is_running = random.randint(1, 2147483647 - 1)  # max int
@@ -1616,6 +1621,9 @@ def get_previous_execution_id(tenant_id, workflow_id, workflow_execution_id):
             .where(WorkflowExecution.tenant_id == tenant_id)
             .where(WorkflowExecution.workflow_id == workflow_id)
             .where(WorkflowExecution.id != workflow_execution_id)
+            .where(
+                WorkflowExecution.started >= datetime.now() - timedelta(days=1)
+            )  # no need to check more than 1 day ago
             .order_by(WorkflowExecution.started.desc())
             .limit(1)
         ).first()
@@ -2193,25 +2201,22 @@ def get_linked_providers(tenant_id: str) -> List[Tuple[str, str, datetime]]:
     LIMIT_BY_ALERTS = 10000
 
     with Session(engine) as session:
-        alerts_subquery = select(Alert).filter(
-            Alert.tenant_id == tenant_id,
-            Alert.provider_type != "group"
-        ).limit(LIMIT_BY_ALERTS).subquery()
+        alerts_subquery = (
+            select(Alert)
+            .filter(Alert.tenant_id == tenant_id, Alert.provider_type != "group")
+            .limit(LIMIT_BY_ALERTS)
+            .subquery()
+        )
 
         providers = session.exec(
             select(
                 alerts_subquery.c.provider_type,
                 alerts_subquery.c.provider_id,
-                func.max(alerts_subquery.c.timestamp).label("last_alert_timestamp")
+                func.max(alerts_subquery.c.timestamp).label("last_alert_timestamp"),
             )
             .select_from(alerts_subquery)
-            .filter(
-                ~exists().where(Provider.id == alerts_subquery.c.provider_id)
-            )
-            .group_by(
-                alerts_subquery.c.provider_type,
-                alerts_subquery.c.provider_id
-            )
+            .filter(~exists().where(Provider.id == alerts_subquery.c.provider_id))
+            .group_by(alerts_subquery.c.provider_type, alerts_subquery.c.provider_id)
         ).all()
 
     return providers
diff --git a/keep/api/models/db/migrations/versions/2024-12-17-12-48_3d20d954e058.py b/keep/api/models/db/migrations/versions/2024-12-17-12-48_3d20d954e058.py
new file mode 100644
index 000000000..14faa149c
--- /dev/null
+++ b/keep/api/models/db/migrations/versions/2024-12-17-12-48_3d20d954e058.py
@@ -0,0 +1,52 @@
+"""Add index to WorkflowExecution
+
+Revision ID: 3d20d954e058
+Revises: 55cc64020f6d
+Create Date: 2024-12-17 12:48:04.713649
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "3d20d954e058"
+down_revision = "55cc64020f6d"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table("workflowexecution", schema=None) as batch_op:
+        batch_op.create_index(
+            "idx_workflowexecution_tenant_workflow_id_timestamp",
+            ["tenant_id", "workflow_id", sa.desc("started")],
+            unique=False,
+        )
+        if op.get_bind().dialect.name == "mysql":
+            batch_op.create_index(
+                "idx_workflowexecution_workflow_tenant_started_status",
+                [
+                    "workflow_id",
+                    "tenant_id",
+                    sa.desc("started"),
+                    sa.text("status(255)"),
+                ],
+                unique=False,
+            )
+        else:
+            batch_op.create_index(
+                "idx_workflowexecution_workflow_tenant_started_status",
+                ["workflow_id", "tenant_id", sa.desc("started"), "status"],
+                unique=False,
+            )
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table("workflowexecution", schema=None) as batch_op:
+        batch_op.drop_index("idx_workflowexecution_workflow_tenant_started_status")
+        batch_op.drop_index("idx_workflowexecution_tenant_workflow_id_timestamp")
+    # ### end Alembic commands ###
diff --git a/keep/api/models/db/workflow.py b/keep/api/models/db/workflow.py
index f243b51f0..a6b434e2c 100644
--- a/keep/api/models/db/workflow.py
+++ b/keep/api/models/db/workflow.py
@@ -1,7 +1,9 @@
+import os
 from datetime import datetime
 from typing import List, Optional
 
-from sqlalchemy import TEXT
+import sqlalchemy
+from sqlalchemy import TEXT, Index
 from sqlmodel import JSON, Column, Field, Relationship, SQLModel, UniqueConstraint
 
 
@@ -26,9 +28,37 @@ class Config:
         orm_mode = True
 
 
+def get_status_column():
+    backend = (
+        sqlalchemy.engine.url.make_url(
+            os.environ.get("DATABASE_CONNECTION_STRING")
+        ).get_backend_name()
+        if os.environ.get("DATABASE_CONNECTION_STRING")
+        else None
+    )
+    return (
+        sqlalchemy.text("status(255)")
+        if backend == "mysql"
+        else sqlalchemy.text("status")
+    )
+
+
 class WorkflowExecution(SQLModel, table=True):
     __table_args__ = (
         UniqueConstraint("workflow_id", "execution_number", "is_running", "timeslot"),
+        Index(
+            "idx_workflowexecution_tenant_workflow_id_timestamp",
+            "tenant_id",
+            "workflow_id",
+            "started",
+        ),
+        Index(
+            "idx_workflowexecution_workflow_tenant_started_status",
+            "workflow_id",
+            "tenant_id",
+            "started",
+            get_status_column(),
+        ),
     )
 
     id: str = Field(default=None, primary_key=True)
diff --git a/keep/contextmanager/contextmanager.py b/keep/contextmanager/contextmanager.py
index 2dad2563b..95b51cb8f 100644
--- a/keep/contextmanager/contextmanager.py
+++ b/keep/contextmanager/contextmanager.py
@@ -2,6 +2,7 @@
 import logging
 
 import click
+import json5
 from pympler.asizeof import asizeof
 
 from keep.api.core.db import get_last_workflow_execution_by_workflow_id, get_session
@@ -10,7 +11,13 @@
 
 
 class ContextManager:
-    def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None):
+    def __init__(
+        self,
+        tenant_id,
+        workflow_id=None,
+        workflow_execution_id=None,
+        workflow: dict | None = None,
+    ):
         self.logger = logging.getLogger(__name__)
         self.logger_adapter = WorkflowLoggerAdapter(
             self.logger, self, tenant_id, workflow_id, workflow_execution_id
@@ -37,16 +44,25 @@ def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None):
         # last workflow context
         self.last_workflow_execution_results = {}
         self.last_workflow_run_time = None
-        if self.workflow_id:
+        if self.workflow_id and workflow:
             try:
-                last_workflow_execution = get_last_workflow_execution_by_workflow_id(
-                    tenant_id, workflow_id
+                # @tb: try to understand if the workflow tries to use last_workflow_results
+                # if so, we need to get the last workflow execution and load it into the context
+                workflow_str = json5.dumps(workflow)
+                last_workflow_results_in_workflow = (
+                    "last_workflow_results" in workflow_str
                 )
-                if last_workflow_execution is not None:
-                    self.last_workflow_execution_results = (
-                        last_workflow_execution.results
+                if last_workflow_results_in_workflow:
+                    last_workflow_execution = (
+                        get_last_workflow_execution_by_workflow_id(
+                            tenant_id, workflow_id
+                        )
                     )
-                self.last_workflow_run_time = last_workflow_execution.started
+                    if last_workflow_execution is not None:
+                        self.last_workflow_execution_results = (
+                            last_workflow_execution.results
+                        )
+                        self.last_workflow_run_time = last_workflow_execution.started
             except Exception:
                 self.logger.exception("Failed to get last workflow execution")
                 pass
diff --git a/keep/parser/parser.py b/keep/parser/parser.py
index 1fadd594f..d10e4d2ad 100644
--- a/keep/parser/parser.py
+++ b/keep/parser/parser.py
@@ -137,8 +137,7 @@ def _parse_workflow(
         self.logger.debug("Parsing workflow")
         workflow_id = self._get_workflow_id(tenant_id, workflow)
         context_manager = ContextManager(
-            tenant_id=tenant_id,
-            workflow_id=workflow_id,
+            tenant_id=tenant_id, workflow_id=workflow_id, workflow=workflow
         )
         # Parse the providers (from the workflow yaml or from the providers directory)
         self._load_providers_config(
diff --git a/keep/workflowmanager/workflowscheduler.py b/keep/workflowmanager/workflowscheduler.py
index 5f550b05f..2fe05e1ed 100644
--- a/keep/workflowmanager/workflowscheduler.py
+++ b/keep/workflowmanager/workflowscheduler.py
@@ -17,6 +17,7 @@
 from keep.api.core.db import get_workflow as get_workflow_db
 from keep.api.core.db import get_workflows_that_should_run
 from keep.api.models.alert import AlertDto, IncidentDto
+from keep.api.utils.email_utils import KEEP_EMAILS_ENABLED, EmailTemplates, send_email
 from keep.providers.providers_factory import ProviderConfigurationException
 from keep.workflowmanager.workflow import Workflow, WorkflowStrategy
 from keep.workflowmanager.workflowstore import WorkflowStore
@@ -588,42 +589,41 @@ def _finish_workflow_execution(
             status=status.value,
             error=error,
         )
-        # get the previous workflow execution id
-        previous_execution = get_previous_execution_id(
-            tenant_id, workflow_id, workflow_execution_id
-        )
-        # if error, send an email
-        if status == WorkflowStatus.ERROR and (
-            previous_execution
-            is None  # this means this is the first execution, for example
-            or previous_execution.status != WorkflowStatus.ERROR.value
-        ):
-            workflow = get_workflow_db(tenant_id=tenant_id, workflow_id=workflow_id)
-            try:
-                from keep.api.core.config import config
-                from keep.api.utils.email_utils import EmailTemplates, send_email
-                keep_platform_url = config(
-                    "KEEP_PLATFORM_URL", default="https://platform.keephq.dev"
-                )
-                error_logs_url = f"{keep_platform_url}/workflows/{workflow_id}/runs/{workflow_execution_id}"
-                self.logger.debug(
-                    f"Sending email to {workflow.created_by} for failed workflow {workflow_id}"
-                )
-                email_sent = send_email(
-                    to_email=workflow.created_by,
-                    template_id=EmailTemplates.WORKFLOW_RUN_FAILED,
-                    workflow_id=workflow_id,
-                    workflow_name=workflow.name,
-                    workflow_execution_id=workflow_execution_id,
-                    error=error,
-                    url=error_logs_url,
-                )
-                if email_sent:
-                    self.logger.info(
-                        f"Email sent to {workflow.created_by} for failed workflow {workflow_id}"
+        if KEEP_EMAILS_ENABLED:
+            # get the previous workflow execution id
+            previous_execution = get_previous_execution_id(
+                tenant_id, workflow_id, workflow_execution_id
+            )
+            # if error, send an email
+            if status == WorkflowStatus.ERROR and (
+                previous_execution
+                is None  # this means this is the first execution, for example
+                or previous_execution.status != WorkflowStatus.ERROR.value
+            ):
+                workflow = get_workflow_db(tenant_id=tenant_id, workflow_id=workflow_id)
+                try:
+                    keep_platform_url = config(
+                        "KEEP_PLATFORM_URL", default="https://platform.keephq.dev"
+                    )
+                    error_logs_url = f"{keep_platform_url}/workflows/{workflow_id}/runs/{workflow_execution_id}"
+                    self.logger.debug(
+                        f"Sending email to {workflow.created_by} for failed workflow {workflow_id}"
+                    )
+                    email_sent = send_email(
+                        to_email=workflow.created_by,
+                        template_id=EmailTemplates.WORKFLOW_RUN_FAILED,
+                        workflow_id=workflow_id,
+                        workflow_name=workflow.name,
+                        workflow_execution_id=workflow_execution_id,
+                        error=error,
+                        url=error_logs_url,
+                    )
+                    if email_sent:
+                        self.logger.info(
+                            f"Email sent to {workflow.created_by} for failed workflow {workflow_id}"
+                        )
+                except Exception as e:
+                    self.logger.error(
+                        f"Failed to send email to {workflow.created_by} for failed workflow {workflow_id}: {e}"
                     )
-            except Exception as e:
-                self.logger.error(
-                    f"Failed to send email to {workflow.created_by} for failed workflow {workflow_id}: {e}"
-                )
diff --git a/pyproject.toml b/pyproject.toml
index 97541c9ec..8fc497f7d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "keep"
-version = "0.32.0"
+version = "0.32.1"
 description = "Alerting. for developers, by developers."
 authors = ["Keep Alerting LTD"]
 packages = [{include = "keep"}]
diff --git a/tests/conftest.py b/tests/conftest.py
index aeaaf101d..18422f869 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -157,7 +157,7 @@ def mysql_container(docker_ip, docker_services):
 
 
 @pytest.fixture
-def db_session(request):
+def db_session(request, monkeypatch):
     # Create a database connection
     os.environ["DB_ECHO"] = "true"
     if (
@@ -168,6 +168,24 @@ def db_session(request):
     ):
         db_type = request.param.get("db")
         db_connection_string = request.getfixturevalue(f"{db_type}_container")
+        monkeypatch.setenv("DATABASE_CONNECTION_STRING", db_connection_string)
+        t = SQLModel.metadata.tables["workflowexecution"]
+        curr_index = next(
+            (
+                index
+                for index in t.indexes
+                if index.name == "idx_workflowexecution_workflow_tenant_started_status"
+            )
+        )
+        t.indexes.remove(curr_index)
+        status_index = Index(
+            "idx_workflowexecution_workflow_tenant_started_status",
+            "workflow_id",
+            "tenant_id",
+            "started",
+            sqlalchemy.text("status(255)"),
+        )
+        t.append_constraint(status_index)
         mock_engine = create_engine(db_connection_string)
     # sqlite
     else: