Skip to content

Commit

Permalink
chore: opt-out for workflow email (#2851)
Browse files Browse the repository at this point in the history
  • Loading branch information
talboren authored Dec 17, 2024
1 parent d822d7d commit d41adfe
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 82 deletions.
69 changes: 37 additions & 32 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from sqlmodel import Session, SQLModel, col, or_, select, text

from keep.api.consts import STATIC_PRESETS
from keep.api.core.config import config
from keep.api.core.db_utils import create_db_engine, get_json_extract_field
from keep.api.core.dependencies import SINGLE_TENANT_UUID

Expand Down Expand Up @@ -91,6 +92,7 @@
"affected_services",
"assignee",
]
KEEP_AUDIT_EVENTS_ENABLED = config("KEEP_AUDIT_EVENTS_ENABLED", cast=bool, default=True)


def dispose_session():
Expand Down Expand Up @@ -170,20 +172,21 @@ def create_workflow_execution(
# Ensure the object has an id
session.flush()
execution_id = workflow_execution.id
if fingerprint and event_type == "alert":
workflow_to_alert_execution = WorkflowToAlertExecution(
workflow_execution_id=execution_id,
alert_fingerprint=fingerprint,
event_id=event_id,
)
session.add(workflow_to_alert_execution)
elif event_type == "incident":
workflow_to_incident_execution = WorkflowToIncidentExecution(
workflow_execution_id=execution_id,
alert_fingerprint=fingerprint,
incident_id=event_id,
)
session.add(workflow_to_incident_execution)
if KEEP_AUDIT_EVENTS_ENABLED:
if fingerprint and event_type == "alert":
workflow_to_alert_execution = WorkflowToAlertExecution(
workflow_execution_id=execution_id,
alert_fingerprint=fingerprint,
event_id=event_id,
)
session.add(workflow_to_alert_execution)
elif event_type == "incident":
workflow_to_incident_execution = WorkflowToIncidentExecution(
workflow_execution_id=execution_id,
alert_fingerprint=fingerprint,
incident_id=event_id,
)
session.add(workflow_to_incident_execution)

session.commit()
return execution_id
Expand Down Expand Up @@ -484,7 +487,7 @@ def get_last_workflow_execution_by_workflow_id(
session.query(WorkflowExecution)
.filter(WorkflowExecution.workflow_id == workflow_id)
.filter(WorkflowExecution.tenant_id == tenant_id)
.filter(WorkflowExecution.started >= datetime.now() - timedelta(days=7))
.filter(WorkflowExecution.started >= datetime.now() - timedelta(days=1))
.filter(WorkflowExecution.status == "success")
.order_by(WorkflowExecution.started.desc())
.first()
Expand Down Expand Up @@ -650,15 +653,17 @@ def get_consumer_providers() -> List[Provider]:
def finish_workflow_execution(tenant_id, workflow_id, execution_id, status, error):
with Session(engine) as session:
workflow_execution = session.exec(
select(WorkflowExecution)
.where(WorkflowExecution.tenant_id == tenant_id)
.where(WorkflowExecution.workflow_id == workflow_id)
.where(WorkflowExecution.id == execution_id)
select(WorkflowExecution).where(WorkflowExecution.id == execution_id)
).first()
# some random number to avoid collisions
if not workflow_execution:
logger.warning(
f"Failed to finish workflow execution {execution_id} for workflow {workflow_id}. Execution not found."
f"Failed to finish workflow execution {execution_id} for workflow {workflow_id}. Execution not found.",
extra={
"tenant_id": tenant_id,
"workflow_id": workflow_id,
"execution_id": execution_id,
},
)
raise ValueError("Execution not found")
workflow_execution.is_running = random.randint(1, 2147483647 - 1) # max int
Expand Down Expand Up @@ -1616,6 +1621,9 @@ def get_previous_execution_id(tenant_id, workflow_id, workflow_execution_id):
.where(WorkflowExecution.tenant_id == tenant_id)
.where(WorkflowExecution.workflow_id == workflow_id)
.where(WorkflowExecution.id != workflow_execution_id)
.where(
WorkflowExecution.started >= datetime.now() - timedelta(days=1)
) # no need to check more than 1 day ago
.order_by(WorkflowExecution.started.desc())
.limit(1)
).first()
Expand Down Expand Up @@ -2193,25 +2201,22 @@ def get_linked_providers(tenant_id: str) -> List[Tuple[str, str, datetime]]:
LIMIT_BY_ALERTS = 10000

with Session(engine) as session:
alerts_subquery = select(Alert).filter(
Alert.tenant_id == tenant_id,
Alert.provider_type != "group"
).limit(LIMIT_BY_ALERTS).subquery()
alerts_subquery = (
select(Alert)
.filter(Alert.tenant_id == tenant_id, Alert.provider_type != "group")
.limit(LIMIT_BY_ALERTS)
.subquery()
)

providers = session.exec(
select(
alerts_subquery.c.provider_type,
alerts_subquery.c.provider_id,
func.max(alerts_subquery.c.timestamp).label("last_alert_timestamp")
func.max(alerts_subquery.c.timestamp).label("last_alert_timestamp"),
)
.select_from(alerts_subquery)
.filter(
~exists().where(Provider.id == alerts_subquery.c.provider_id)
)
.group_by(
alerts_subquery.c.provider_type,
alerts_subquery.c.provider_id
)
.filter(~exists().where(Provider.id == alerts_subquery.c.provider_id))
.group_by(alerts_subquery.c.provider_type, alerts_subquery.c.provider_id)
).all()

return providers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Add index to WorkflowExecution
Revision ID: 3d20d954e058
Revises: 55cc64020f6d
Create Date: 2024-12-17 12:48:04.713649
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "3d20d954e058"
down_revision = "55cc64020f6d"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create the two WorkflowExecution lookup indexes.

    On MySQL the status column is indexed with a 255-character prefix
    ("status(255)"), since MySQL cannot index an unbounded string column
    in full; every other backend indexes the plain column.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    dialect_name = op.get_bind().dialect.name
    # MySQL needs an explicit prefix length for the status key part.
    status_key = sa.text("status(255)") if dialect_name == "mysql" else "status"
    with op.batch_alter_table("workflowexecution", schema=None) as batch_op:
        batch_op.create_index(
            "idx_workflowexecution_tenant_workflow_id_timestamp",
            ["tenant_id", "workflow_id", sa.desc("started")],
            unique=False,
        )
        batch_op.create_index(
            "idx_workflowexecution_workflow_tenant_started_status",
            ["workflow_id", "tenant_id", sa.desc("started"), status_key],
            unique=False,
        )
    # ### end Alembic commands ###


def downgrade() -> None:
    """Drop the two WorkflowExecution indexes created by upgrade()."""
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("workflowexecution", schema=None) as batch_op:
        # Drop in reverse creation order, mirroring upgrade().
        for index_name in (
            "idx_workflowexecution_workflow_tenant_started_status",
            "idx_workflowexecution_tenant_workflow_id_timestamp",
        ):
            batch_op.drop_index(index_name)
    # ### end Alembic commands ###
32 changes: 31 additions & 1 deletion keep/api/models/db/workflow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
from datetime import datetime
from typing import List, Optional

from sqlalchemy import TEXT
import sqlalchemy
from sqlalchemy import TEXT, Index
from sqlmodel import JSON, Column, Field, Relationship, SQLModel, UniqueConstraint


Expand All @@ -26,9 +28,37 @@ class Config:
orm_mode = True


def get_status_column():
    """Return the SQL expression used to index WorkflowExecution.status.

    MySQL cannot index an unbounded string column in full, so there the
    index key uses a 255-character prefix ("status(255)"); any other
    backend — or the case where DATABASE_CONNECTION_STRING is unset —
    falls back to indexing the plain column.
    """
    connection_string = os.environ.get("DATABASE_CONNECTION_STRING")
    if not connection_string:
        # No connection string at import time: assume a non-MySQL backend.
        return sqlalchemy.text("status")
    backend = sqlalchemy.engine.url.make_url(connection_string).get_backend_name()
    if backend == "mysql":
        return sqlalchemy.text("status(255)")
    return sqlalchemy.text("status")


class WorkflowExecution(SQLModel, table=True):
__table_args__ = (
UniqueConstraint("workflow_id", "execution_number", "is_running", "timeslot"),
Index(
"idx_workflowexecution_tenant_workflow_id_timestamp",
"tenant_id",
"workflow_id",
"started",
),
Index(
"idx_workflowexecution_workflow_tenant_started_status",
"workflow_id",
"tenant_id",
"started",
get_status_column(),
),
)

id: str = Field(default=None, primary_key=True)
Expand Down
32 changes: 24 additions & 8 deletions keep/contextmanager/contextmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging

import click
import json5
from pympler.asizeof import asizeof

from keep.api.core.db import get_last_workflow_execution_by_workflow_id, get_session
Expand All @@ -10,7 +11,13 @@


class ContextManager:
def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None):
def __init__(
self,
tenant_id,
workflow_id=None,
workflow_execution_id=None,
workflow: dict | None = None,
):
self.logger = logging.getLogger(__name__)
self.logger_adapter = WorkflowLoggerAdapter(
self.logger, self, tenant_id, workflow_id, workflow_execution_id
Expand All @@ -37,16 +44,25 @@ def __init__(self, tenant_id, workflow_id=None, workflow_execution_id=None):
# last workflow context
self.last_workflow_execution_results = {}
self.last_workflow_run_time = None
if self.workflow_id:
if self.workflow_id and workflow:
try:
last_workflow_execution = get_last_workflow_execution_by_workflow_id(
tenant_id, workflow_id
# @tb: try to understand if the workflow tries to use last_workflow_results
# if so, we need to get the last workflow execution and load it into the context
workflow_str = json5.dumps(workflow)
last_workflow_results_in_workflow = (
"last_workflow_results" in workflow_str
)
if last_workflow_execution is not None:
self.last_workflow_execution_results = (
last_workflow_execution.results
if last_workflow_results_in_workflow:
last_workflow_execution = (
get_last_workflow_execution_by_workflow_id(
tenant_id, workflow_id
)
)
self.last_workflow_run_time = last_workflow_execution.started
if last_workflow_execution is not None:
self.last_workflow_execution_results = (
last_workflow_execution.results
)
self.last_workflow_run_time = last_workflow_execution.started
except Exception:
self.logger.exception("Failed to get last workflow execution")
pass
Expand Down
3 changes: 1 addition & 2 deletions keep/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ def _parse_workflow(
self.logger.debug("Parsing workflow")
workflow_id = self._get_workflow_id(tenant_id, workflow)
context_manager = ContextManager(
tenant_id=tenant_id,
workflow_id=workflow_id,
tenant_id=tenant_id, workflow_id=workflow_id, workflow=workflow
)
# Parse the providers (from the workflow yaml or from the providers directory)
self._load_providers_config(
Expand Down
74 changes: 37 additions & 37 deletions keep/workflowmanager/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from keep.api.core.db import get_workflow as get_workflow_db
from keep.api.core.db import get_workflows_that_should_run
from keep.api.models.alert import AlertDto, IncidentDto
from keep.api.utils.email_utils import KEEP_EMAILS_ENABLED, EmailTemplates, send_email
from keep.providers.providers_factory import ProviderConfigurationException
from keep.workflowmanager.workflow import Workflow, WorkflowStrategy
from keep.workflowmanager.workflowstore import WorkflowStore
Expand Down Expand Up @@ -588,42 +589,41 @@ def _finish_workflow_execution(
status=status.value,
error=error,
)
# get the previous workflow execution id
previous_execution = get_previous_execution_id(
tenant_id, workflow_id, workflow_execution_id
)
# if error, send an email
if status == WorkflowStatus.ERROR and (
previous_execution
is None # this means this is the first execution, for example
or previous_execution.status != WorkflowStatus.ERROR.value
):
workflow = get_workflow_db(tenant_id=tenant_id, workflow_id=workflow_id)
try:
from keep.api.core.config import config
from keep.api.utils.email_utils import EmailTemplates, send_email

keep_platform_url = config(
"KEEP_PLATFORM_URL", default="https://platform.keephq.dev"
)
error_logs_url = f"{keep_platform_url}/workflows/{workflow_id}/runs/{workflow_execution_id}"
self.logger.debug(
f"Sending email to {workflow.created_by} for failed workflow {workflow_id}"
)
email_sent = send_email(
to_email=workflow.created_by,
template_id=EmailTemplates.WORKFLOW_RUN_FAILED,
workflow_id=workflow_id,
workflow_name=workflow.name,
workflow_execution_id=workflow_execution_id,
error=error,
url=error_logs_url,
)
if email_sent:
self.logger.info(
f"Email sent to {workflow.created_by} for failed workflow {workflow_id}"
if KEEP_EMAILS_ENABLED:
# get the previous workflow execution id
previous_execution = get_previous_execution_id(
tenant_id, workflow_id, workflow_execution_id
)
# if error, send an email
if status == WorkflowStatus.ERROR and (
previous_execution
is None # this means this is the first execution, for example
or previous_execution.status != WorkflowStatus.ERROR.value
):
workflow = get_workflow_db(tenant_id=tenant_id, workflow_id=workflow_id)
try:
keep_platform_url = config(
"KEEP_PLATFORM_URL", default="https://platform.keephq.dev"
)
error_logs_url = f"{keep_platform_url}/workflows/{workflow_id}/runs/{workflow_execution_id}"
self.logger.debug(
f"Sending email to {workflow.created_by} for failed workflow {workflow_id}"
)
email_sent = send_email(
to_email=workflow.created_by,
template_id=EmailTemplates.WORKFLOW_RUN_FAILED,
workflow_id=workflow_id,
workflow_name=workflow.name,
workflow_execution_id=workflow_execution_id,
error=error,
url=error_logs_url,
)
if email_sent:
self.logger.info(
f"Email sent to {workflow.created_by} for failed workflow {workflow_id}"
)
except Exception as e:
self.logger.error(
f"Failed to send email to {workflow.created_by} for failed workflow {workflow_id}: {e}"
)
except Exception as e:
self.logger.error(
f"Failed to send email to {workflow.created_by} for failed workflow {workflow_id}: {e}"
)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "keep"
version = "0.32.0"
version = "0.32.1"
description = "Alerting. for developers, by developers."
authors = ["Keep Alerting LTD"]
packages = [{include = "keep"}]
Expand Down
Loading

0 comments on commit d41adfe

Please sign in to comment.