feat: Workflow refactoring, only async #2878

Status: Open

Matvey-Kuk wants to merge 75 commits into main from Matvey-Kuk/workflow-refactoring-v2. Changes are shown from 65 of the 75 commits.

Commits
48b60c8
feat: Workflow refactoring, only async
Matvey-Kuk Dec 22, 2024
317c9d3
Fix
Matvey-Kuk Dec 22, 2024
c363c08
Fix imports
Matvey-Kuk Dec 22, 2024
1068568
Fix
Matvey-Kuk Dec 22, 2024
8a251b5
Async connection string
Matvey-Kuk Dec 22, 2024
fb56fdd
aiosqlite
Matvey-Kuk Dec 22, 2024
779b69b
poetry.lock
Matvey-Kuk Dec 22, 2024
a2b8de2
Fix docs
Matvey-Kuk Dec 22, 2024
f19aa04
fix
Matvey-Kuk Dec 22, 2024
6ea1a77
Fix docs
Matvey-Kuk Dec 22, 2024
8e2701e
fix tests
Matvey-Kuk Dec 22, 2024
995d1e9
Tests
Matvey-Kuk Dec 22, 2024
dcd7216
So...
Matvey-Kuk Dec 24, 2024
fafba1a
Merge branch 'main' into Matvey-Kuk/workflow-refactoring-v2
Matvey-Kuk Dec 24, 2024
ce6f807
Fix tests
Matvey-Kuk Dec 24, 2024
be41f41
Merge branch 'main' into Matvey-Kuk/workflow-refactoring-v2
Matvey-Kuk Dec 24, 2024
077fb52
asyncio_default_fixture_loop_scope
Matvey-Kuk Dec 24, 2024
84028ed
Fix
Matvey-Kuk Dec 24, 2024
f210edf
Merge branch 'main' into Matvey-Kuk/workflow-refactoring-v2
Matvey-Kuk Dec 24, 2024
5810e9c
Session...
Matvey-Kuk Dec 24, 2024
e1bac08
Fix
Matvey-Kuk Dec 24, 2024
956d5f8
@pytest.mark.asyncio
Matvey-Kuk Dec 24, 2024
3db5d51
function
Matvey-Kuk Dec 24, 2024
881e5cf
mock_session.db_session = db_session
Matvey-Kuk Dec 24, 2024
87ad9f6
Unbelievable
Matvey-Kuk Dec 24, 2024
da66c68
fix more
Matvey-Kuk Dec 24, 2024
f82b86b
More fixes.
Matvey-Kuk Dec 24, 2024
8be0a68
All???!!
Matvey-Kuk Dec 24, 2024
aeef731
FIx.
Matvey-Kuk Dec 24, 2024
0f9f092
Fix
Matvey-Kuk Dec 24, 2024
0811007
Fix?
Matvey-Kuk Dec 24, 2024
18c297b
Fix?
Matvey-Kuk Dec 24, 2024
182933b
Fix
Matvey-Kuk Dec 25, 2024
a7e7dca
Please?
Matvey-Kuk Dec 25, 2024
294da80
Fix?
Matvey-Kuk Dec 25, 2024
8629ed6
Fix?
Matvey-Kuk Dec 25, 2024
7e9f104
Fix
Matvey-Kuk Dec 25, 2024
be5e818
No "api-ref/root",
Matvey-Kuk Dec 25, 2024
a3adc06
Skip?
Matvey-Kuk Dec 25, 2024
fd74522
More timeout?
Matvey-Kuk Dec 25, 2024
361465e
fix
Matvey-Kuk Dec 25, 2024
2b66a89
Fix
Matvey-Kuk Dec 25, 2024
0f38b77
60 sec?
Matvey-Kuk Dec 25, 2024
9528c9f
Meow
Matvey-Kuk Dec 25, 2024
33b89a5
Fix
Matvey-Kuk Dec 25, 2024
7edae22
Un-skip
Matvey-Kuk Dec 25, 2024
7c0a54a
Fix
Matvey-Kuk Dec 25, 2024
8a63919
Fix?
Matvey-Kuk Dec 25, 2024
aa67e52
Fix?
Matvey-Kuk Dec 25, 2024
06e3fdb
Fix
Matvey-Kuk Dec 25, 2024
177800d
Docs
Matvey-Kuk Dec 25, 2024
3331e14
Lock
Matvey-Kuk Dec 25, 2024
802e22c
Fix
Matvey-Kuk Dec 25, 2024
1db3309
Fix
Matvey-Kuk Dec 25, 2024
ae82b42
Fix
Matvey-Kuk Dec 25, 2024
e3c8dff
fix
Matvey-Kuk Dec 25, 2024
9655032
Fix
Matvey-Kuk Dec 25, 2024
e278dbb
lock
Matvey-Kuk Dec 25, 2024
e4e00cf
Fix
Matvey-Kuk Dec 25, 2024
c1bb9bb
Fix
Matvey-Kuk Dec 25, 2024
57edee7
Sync back
Matvey-Kuk Dec 25, 2024
80fe298
Extra import
Matvey-Kuk Dec 25, 2024
7a819e1
Vladimir's fix
Matvey-Kuk Dec 25, 2024
8522df9
Merge branch 'main' into Matvey-Kuk/workflow-refactoring-v2
Matvey-Kuk Dec 25, 2024
2663341
Fix
Matvey-Kuk Dec 25, 2024
42abec3
poetry lock
Matvey-Kuk Dec 25, 2024
7efd63d
Fix
Matvey-Kuk Dec 26, 2024
78ec00b
Polishing
Matvey-Kuk Dec 26, 2024
d42a1ee
Fix
Matvey-Kuk Dec 26, 2024
1c1da2f
Merge branch 'main' into Matvey-Kuk/workflow-refactoring-v2
Matvey-Kuk Dec 26, 2024
62f1ad9
Merge branch 'main' into Matvey-Kuk/workflow-refactoring-v2
Matvey-Kuk Dec 30, 2024
efbbb0e
Fix
Matvey-Kuk Dec 30, 2024
5e5bd2c
Fix
Matvey-Kuk Dec 30, 2024
0554ab8
Fix
Matvey-Kuk Dec 30, 2024
daca2e9
Fix
Matvey-Kuk Dec 30, 2024
2 changes: 1 addition & 1 deletion .github/workflows/test-pr.yml
@@ -95,7 +95,7 @@ jobs:
LOG_LEVEL: DEBUG
SQLALCHEMY_WARN_20: 1
run: |
poetry run coverage run --branch -m pytest --timeout 20 -n auto --non-integration --ignore=tests/e2e_tests/
poetry run coverage run --branch -m pytest -n auto --non-integration --ignore=tests/e2e_tests/
Review comment from Matvey-Kuk (author): Moving the timeout to pyproject.toml (pytest-timeout can pick up `timeout` from `[tool.pytest.ini_options]` there).

- name: Run integration tests and report coverage
run: |
1 change: 1 addition & 0 deletions docs/mint.json
@@ -121,6 +121,7 @@
"providers/documentation/checkmk-provider",
"providers/documentation/cilium-provider",
"providers/documentation/clickhouse-provider",
"providers/documentation/clickhouse-http-provider",
"providers/documentation/cloudwatch-provider",
"providers/documentation/console-provider",
"providers/documentation/coralogix-provider",
7 changes: 7 additions & 0 deletions docs/providers/documentation/clickhouse-http-provider.mdx
@@ -0,0 +1,7 @@
---
title: 'ClickHouse HTTP'
sidebarTitle: 'ClickHouse HTTP Provider'
description: 'The ClickHouse HTTP provider allows you to interact with a ClickHouse database.'
---

This provider is an async (more performant) analog of [clickhouse-provider](providers/documentation/clickhouse-provider.mdx). It uses the HTTP protocol to interact with ClickHouse.
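
For orientation, here is a minimal sketch of what an async query against ClickHouse's HTTP interface looks like in general. This is illustrative only — not the provider's actual code — and the host, port, and function name are assumptions:

```python
# Illustrative only: a generic async ClickHouse query over HTTP, not Keep's provider code.
import asyncio

import aiohttp


async def run_query(query: str, base_url: str = "http://localhost:8123/") -> str:
    # ClickHouse's HTTP interface accepts the SQL statement as the POST body.
    async with aiohttp.ClientSession() as session:
        async with session.post(base_url, data=query) as resp:
            resp.raise_for_status()
            return await resp.text()


if __name__ == "__main__":
    print(asyncio.run(run_query("SELECT version()")))
```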
6 changes: 6 additions & 0 deletions docs/providers/overview.mdx
@@ -156,6 +156,12 @@ By leveraging Keep Providers, users are able to deeply integrate Keep with the t
}
></Card>

<Card
title="ClickHouse HTTP"
href="/providers/documentation/clickhouse-http-provider"
icon={ <img src="https://img.logo.dev/clickhouse.com?token=pk_dfXfZBoKQMGDTIgqu7LvYg" /> }
></Card>

<Card
title="ClickHouse"
href="/providers/documentation/clickhouse-provider"
4 changes: 3 additions & 1 deletion keep/api/bl/incidents_bl.py
@@ -1,3 +1,4 @@
import asyncio
import logging
import os
import pathlib
@@ -153,7 +154,7 @@ def update_client_on_incident_change(self, incident_id: Optional[UUID] = None):
def send_workflow_event(self, incident_dto: IncidentDto, action: str) -> None:
try:
workflow_manager = WorkflowManager.get_instance()
workflow_manager.insert_incident(self.tenant_id, incident_dto, action)
asyncio.run(workflow_manager.insert_incident(self.tenant_id, incident_dto, action))
except Exception:
self.logger.exception(
"Failed to run workflows based on incident",
@@ -231,6 +232,7 @@ def delete_incident(self, incident_id: UUID) -> None:
self.update_client_on_incident_change()
self.send_workflow_event(incident_dto, "deleted")


def update_incident(
self,
incident_id: UUID,
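The `asyncio.run(...)` wrapper added in `send_workflow_event` above is the standard bridge from a synchronous call site into a now-async API. A minimal sketch of the pattern and its main caveat, with stand-in names rather than Keep's classes:

```python
# Stand-in sketch of bridging sync code to an async API; not Keep's actual classes.
import asyncio


async def insert_incident(tenant_id: str, incident: dict, action: str) -> None:
    ...  # async DB / queue work would happen here


def send_workflow_event(tenant_id: str, incident: dict, action: str) -> None:
    # asyncio.run() creates a fresh event loop, runs the coroutine, and closes
    # the loop. It raises RuntimeError if called while a loop is already running
    # in this thread, so it only suits purely synchronous call sites.
    asyncio.run(insert_incident(tenant_id, incident, action))
```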
159 changes: 86 additions & 73 deletions keep/api/core/db.py
@@ -39,6 +39,7 @@
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import joinedload, selectinload, subqueryload
from sqlalchemy.sql import exists, expression
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel import Session, SQLModel, col, or_, select, text

from keep.api.consts import STATIC_PRESETS
@@ -82,6 +83,8 @@


engine = create_db_engine()
engine_async = create_db_engine(_async=True)

SQLAlchemyInstrumentor().instrument(enable_commenter=True, engine=engine)
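
`create_db_engine` itself is not part of this diff, so the following is only a plausible sketch of a factory that returns either a sync or an async engine. The URL is a placeholder, and the aiosqlite driver is taken from the commit history:

```python
# Hypothetical sketch of create_db_engine; the real implementation is not in this diff.
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine


def create_db_engine(_async: bool = False):
    if _async:
        # Async engines need an async driver, e.g. sqlite+aiosqlite or mysql+aiomysql.
        return create_async_engine("sqlite+aiosqlite:///./keep.db")
    return create_engine("sqlite:///./keep.db")
```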


@@ -145,7 +148,7 @@ def __convert_to_uuid(value: str) -> UUID | None:
return None


def create_workflow_execution(
async def create_workflow_execution(
workflow_id: str,
tenant_id: str,
triggered_by: str,
@@ -155,7 +158,7 @@ def create_workflow_execution(
execution_id: str = None,
event_type: str = "alert",
) -> str:
with Session(engine) as session:
async with AsyncSession(engine_async) as session:
try:
if len(triggered_by) > 255:
triggered_by = triggered_by[:255]
@@ -170,7 +173,7 @@
)
session.add(workflow_execution)
# Ensure the object has an id
session.flush()
await session.flush()
execution_id = workflow_execution.id
if KEEP_AUDIT_EVENTS_ENABLED:
if fingerprint and event_type == "alert":
@@ -188,10 +191,10 @@
)
session.add(workflow_to_incident_execution)

session.commit()
await session.commit()
return execution_id
except IntegrityError:
session.rollback()
await session.rollback()
logger.debug(
f"Failed to create a new execution for workflow {workflow_id}. Constraint is met."
)
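
This hunk shows the conversion pattern applied throughout this file: `with Session(engine)` becomes `async with AsyncSession(engine_async)`, and `flush()`, `commit()`, and `rollback()` all gain an `await`, while `session.add()` stays synchronous. A condensed, self-contained sketch of that lifecycle, using a stand-in model rather than Keep's real one:

```python
# Condensed, illustrative AsyncSession lifecycle; the model is a stand-in, not Keep's.
from typing import Optional

from sqlalchemy.exc import IntegrityError
from sqlmodel import Field, SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession


class Execution(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    workflow_id: str = Field(unique=True)


async def create_execution(engine_async, workflow_id: str) -> Optional[int]:
    async with AsyncSession(engine_async) as session:
        try:
            execution = Execution(workflow_id=workflow_id)
            session.add(execution)   # add() is still synchronous
            await session.flush()    # awaited: assigns execution.id
            execution_id = execution.id
            await session.commit()
            return execution_id
        except IntegrityError:
            await session.rollback()
            return None
```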
@@ -226,12 +229,12 @@ def get_last_completed_execution(
).first()


def get_workflows_that_should_run():
with Session(engine) as session:
async def get_workflows_that_should_run():
async with AsyncSession(engine_async) as session:
logger.debug("Checking for workflows that should run")
workflows_with_interval = []
try:
result = session.exec(
result = await session.exec(
select(Workflow)
.filter(Workflow.is_deleted == False)
.filter(Workflow.is_disabled == False)
@@ -252,7 +255,7 @@ def get_workflows_that_should_run():
if not last_execution:
try:
# try to get the lock
workflow_execution_id = create_workflow_execution(
workflow_execution_id = await create_workflow_execution(
workflow.id, workflow.tenant_id, "scheduler"
)
# we succeed to get the lock on this execution number :)
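
The scheduler's locking here is optimistic: inserting the execution row is itself the lock, and a database uniqueness constraint makes every competing scheduler instance lose with an IntegrityError (handled in the `except` branch below). A schematic, self-contained sketch of the idea, with stand-in names:

```python
# Schematic sketch of the scheduler's insert-as-lock pattern; all names are stand-ins.
from sqlalchemy.exc import IntegrityError
from sqlmodel import Field, SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession


class ExecutionLock(SQLModel, table=True):  # stand-in, not Keep's real table
    workflow_id: str = Field(primary_key=True)
    execution_number: int = Field(primary_key=True)  # composite PK acts as UNIQUE


async def try_acquire_lock(engine_async, workflow_id: str, number: int) -> bool:
    async with AsyncSession(engine_async) as session:
        try:
            session.add(ExecutionLock(workflow_id=workflow_id, execution_number=number))
            await session.commit()
            return True  # this scheduler instance holds the lock
        except IntegrityError:
            await session.rollback()  # another instance inserted the row first
            return False
```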
@@ -274,7 +277,7 @@
):
try:
# try to get the lock with execution_number + 1
workflow_execution_id = create_workflow_execution(
workflow_execution_id = await create_workflow_execution(
workflow.id,
workflow.tenant_id,
"scheduler",
@@ -294,10 +297,10 @@
# some other thread/instance has already started to work on it
except IntegrityError:
# we need to verify the locking is still valid and not timeouted
session.rollback()
await session.rollback()
pass
# get the ongoing execution
ongoing_execution = session.exec(
ongoing_execution = await session.exec(
select(WorkflowExecution)
.where(WorkflowExecution.workflow_id == workflow.id)
.where(
@@ -319,10 +322,10 @@
# if the ongoing execution runs for more than 60 minutes, then it has timed out
elif ongoing_execution.started + timedelta(minutes=60) <= current_time:
ongoing_execution.status = "timeout"
session.commit()
await session.commit()
# re-create the execution and try to get the lock
try:
workflow_execution_id = create_workflow_execution(
workflow_execution_id = await create_workflow_execution(
workflow.id,
workflow.tenant_id,
"scheduler",
@@ -479,25 +482,29 @@ def get_last_workflow_workflow_to_alert_executions(
return latest_workflow_to_alert_executions


def get_last_workflow_execution_by_workflow_id(
async def get_last_workflow_execution_by_workflow_id(
tenant_id: str, workflow_id: str, status: str = None
) -> Optional[WorkflowExecution]:
with Session(engine) as session:
query = (
session.query(WorkflowExecution)
.filter(WorkflowExecution.workflow_id == workflow_id)
.filter(WorkflowExecution.tenant_id == tenant_id)
.filter(WorkflowExecution.started >= datetime.now() - timedelta(days=1))
.order_by(WorkflowExecution.started.desc())
async with AsyncSession(engine_async) as session:
await session.flush()
q = select(WorkflowExecution).filter(
WorkflowExecution.workflow_id == workflow_id
).filter(WorkflowExecution.tenant_id == tenant_id).filter(
WorkflowExecution.started >= datetime.now() - timedelta(days=1)
).order_by(
WorkflowExecution.started.desc()
)

if status:
query = query.filter(WorkflowExecution.status == status)
if status is not None:
q = q.filter(WorkflowExecution.status == status)

workflow_execution = query.first()
workflow_execution = (
(await session.exec(q)).first()
)
return workflow_execution



def get_workflows_with_last_execution(tenant_id: str) -> List[dict]:
with Session(engine) as session:
latest_execution_cte = (
@@ -582,30 +589,32 @@ def get_all_workflows_yamls(tenant_id: str) -> List[str]:
return workflows


def get_workflow(tenant_id: str, workflow_id: str) -> Workflow:
with Session(engine) as session:
async def get_workflow(tenant_id: str, workflow_id: str) -> Workflow:
async with AsyncSession(engine_async) as session:
# if the workflow id is uuid:
if validators.uuid(workflow_id):
workflow = session.exec(
workflow = await session.exec(
select(Workflow)
.where(Workflow.tenant_id == tenant_id)
.where(Workflow.id == workflow_id)
.where(Workflow.is_deleted == False)
).first()
)
workflow = workflow.first()
else:
workflow = session.exec(
workflow = await session.exec(
select(Workflow)
.where(Workflow.tenant_id == tenant_id)
.where(Workflow.name == workflow_id)
.where(Workflow.is_deleted == False)
).first()
)
workflow = workflow.first()
if not workflow:
return None
return workflow


def get_raw_workflow(tenant_id: str, workflow_id: str) -> str:
workflow = get_workflow(tenant_id, workflow_id)
async def get_raw_workflow(tenant_id: str, workflow_id: str) -> str:
workflow = await get_workflow(tenant_id, workflow_id)
if not workflow:
return None
return workflow.workflow_raw
@@ -653,33 +662,36 @@ def get_consumer_providers() -> List[Provider]:
return providers


def finish_workflow_execution(tenant_id, workflow_id, execution_id, status, error):
with Session(engine) as session:
workflow_execution = session.exec(
async def finish_workflow_execution(tenant_id, workflow_id, execution_id, status, error):
async with AsyncSession(engine_async) as session:
random_number = random.randint(1, 2147483647 - 1) # max int

workflow_execution_old = (await session.exec(
select(WorkflowExecution).where(WorkflowExecution.id == execution_id)
).first()
# some random number to avoid collisions
if not workflow_execution:
)).first()

# Perform the update query
result = await session.exec(
update(WorkflowExecution)
.where(WorkflowExecution.id == execution_id)
.values(
is_running=random_number,
status=status,
error=error[:255] if error else None,
execution_time=(datetime.utcnow() - workflow_execution_old.started).total_seconds()
)
)

# Check if the update affected any rows
if result.rowcount == 0:
logger.warning(
f"Failed to finish workflow execution {execution_id} for workflow {workflow_id}. Execution not found.",
extra={
"tenant_id": tenant_id,
"workflow_id": workflow_id,
"execution_id": execution_id,
},
f"Failed to finish workflow execution {execution_id} for workflow {workflow_id}. Execution not found."
)
raise ValueError("Execution not found")
workflow_execution.is_running = random.randint(1, 2147483647 - 1) # max int
workflow_execution.status = status
# TODO: we had a bug with the error field, it was too short so some customers may fail over it.
# we need to fix it in the future, create a migration that increases the size of the error field
# and then we can remove the [:511] from here
workflow_execution.error = error[:511] if error else None
workflow_execution.execution_time = (
datetime.utcnow() - workflow_execution.started
).total_seconds()
# TODO: logs
session.commit()

# Commit the transaction
await session.commit()
await session.flush()


def get_workflow_executions(
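
Both `finish_workflow_execution` above and `save_workflow_results` further below now issue a single `UPDATE ... WHERE` statement instead of loading the row and mutating its attributes, and check `rowcount` to detect a missing row. A self-contained sketch of that pattern, with a stand-in model:

```python
# Illustrative single-statement UPDATE with a rowcount check; the model is a stand-in.
from sqlalchemy import update
from sqlmodel import Field, SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession


class ExecutionRow(SQLModel, table=True):  # stand-in for Keep's WorkflowExecution
    id: str = Field(primary_key=True)
    status: str = ""


async def mark_finished(engine_async, execution_id: str, status: str) -> None:
    async with AsyncSession(engine_async) as session:
        result = await session.exec(
            update(ExecutionRow)
            .where(ExecutionRow.id == execution_id)
            .values(status=status)
        )
        if result.rowcount == 0:  # no row matched the WHERE clause
            raise ValueError("Execution not found")
        await session.commit()
```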
@@ -787,14 +799,14 @@ def delete_workflow_by_provisioned_file(tenant_id, provisioned_file):
session.commit()


def get_workflow_id(tenant_id, workflow_name):
with Session(engine) as session:
workflow = session.exec(
async def get_workflow_id(tenant_id, workflow_name):
async with AsyncSession(engine_async) as session:
workflow = (await session.exec(
select(Workflow)
.where(Workflow.tenant_id == tenant_id)
.where(Workflow.name == workflow_name)
.where(Workflow.is_deleted == False)
).first()
)).first()

if workflow:
return workflow.id
@@ -1606,16 +1618,16 @@ def update_user_role(tenant_id, username, role):
return user


def save_workflow_results(tenant_id, workflow_execution_id, workflow_results):
with Session(engine) as session:
workflow_execution = session.exec(
select(WorkflowExecution)
async def save_workflow_results(tenant_id, workflow_execution_id, workflow_results):
async with AsyncSession(engine_async) as session:
await session.exec(
update(WorkflowExecution)
.where(WorkflowExecution.tenant_id == tenant_id)
.where(WorkflowExecution.id == workflow_execution_id)
).one()

workflow_execution.results = workflow_results
session.commit()
.values(results=workflow_results)
)
await session.commit()
await session.flush()


def get_workflow_by_name(tenant_id, workflow_name):
@@ -1629,10 +1641,10 @@ def get_workflow_by_name(tenant_id, workflow_name):

return workflow


def get_previous_execution_id(tenant_id, workflow_id, workflow_execution_id):
with Session(engine) as session:
previous_execution = session.exec(
async def get_previous_execution_id(tenant_id, workflow_id, workflow_execution_id):
async with AsyncSession(engine_async) as session:
previous_execution = (await session.exec(
select(WorkflowExecution)
.where(WorkflowExecution.tenant_id == tenant_id)
.where(WorkflowExecution.workflow_id == workflow_id)
@@ -1642,13 +1654,14 @@ def get_previous_execution_id(tenant_id, workflow_id, workflow_execution_id):
) # no need to check more than 1 day ago
.order_by(WorkflowExecution.started.desc())
.limit(1)
).first()
)).first()
if previous_execution:
return previous_execution
else:
return None



def create_rule(
tenant_id,
name,