Skip to content

Commit

Permalink
feat: async
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl committed Feb 8, 2024
1 parent ade3ebc commit 65ad092
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 88 deletions.
131 changes: 46 additions & 85 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,32 @@ def __get_conn_impersonate() -> pymysql.connections.Connection:
"mysql+pymysql://",
creator=__get_conn,
)
async_engine = create_async_engine(
"mysql+aiomysql://",
creator=__get_conn,
)
elif db_connection_string == "impersonate":
engine = create_engine(
"mysql+pymysql://",
creator=__get_conn_impersonate,
)
async_engine = create_async_engine(
"mysql+aiomysql://",
creator=__get_conn_impersonate,
)
elif db_connection_string:
db_connection_string_async = db_connection_string.replace(
"sqlite", "sqlite+aiosqlite"
)
"sqlite", "sqlite+aiosqlite", 1
).replace("pymysql", "aiomysql", 1)
engine = create_engine(db_connection_string)
async_engine = create_async_engine(db_connection_string_async)
else:
engine = create_engine(
"sqlite:///./keep.db", connect_args={"check_same_thread": False}
)
async_engine = create_async_engine(
"sqlite+aiosqlite:///./keep.db", connect_args={"check_same_thread": False}
)

SQLAlchemyInstrumentor().instrument(enable_commenter=True, engine=engine)

Expand Down Expand Up @@ -1041,102 +1052,52 @@ def delete_rule(tenant_id, rule_id):
return False


# def assign_alert_to_group(tenant_id, alert_id, rule_id, group_fingerprint) -> Group:
# # checks if group with the group critiria exists, if not it creates it
# # and then assign the alert to the group
# tracer = trace.get_tracer(__name__)
# with Session(engine, expire_on_commit=False) as session:
# group = session.exec(
# select(Group)
# .options(selectinload(Group.alerts))
# .where(Group.tenant_id == tenant_id)
# .where(Group.rule_id == rule_id)
# .where(Group.group_fingerprint == group_fingerprint)
# ).first()

# if not group:
# # Create and add a new group if it doesn't exist
# group = Group(
# tenant_id=tenant_id,
# rule_id=rule_id,
# group_fingerprint=group_fingerprint,
# )
# session.add(group)
# session.commit()
# # Re-query the group with selectinload to set up future automatic loading of alerts
# group = session.exec(
# select(Group)
# .options(selectinload(Group.alerts))
# .where(Group.id == group.id)
# ).first()


# # Create a new AlertToGroup instance and add it
# with tracer.start_as_current_span("alert_to_group"):
# alert_group = AlertToGroup(
# tenant_id=tenant_id,
# alert_id=str(alert_id),
# group_id=str(group.id),
# )
# with tracer.start_as_current_span("session_add"):
# session.add(alert_group)
# with tracer.start_as_current_span("session_commit"):
# session.commit()
# # To reflect the newly added alert we expire its state to force a refresh on access
# with tracer.start_as_current_span("session_expire"):
# session.expire(group, ["alerts"])
# with tracer.start_as_current_span("session_refresh"):
# session.refresh(group)
# return group
async def assign_alert_to_group(
tenant_id, alert_id, rule_id, group_fingerprint
) -> Group:
tracer = trace.get_tracer(__name__)

# Ensure that `async_engine` is an instance of `create_async_engine`
async with AsyncSession(async_engine, expire_on_commit=False) as session:
async with session.begin():
result = await session.execute(
select(Group)
.options(selectinload(Group.alerts))
.where(Group.tenant_id == tenant_id)
.where(Group.rule_id == rule_id)
.where(Group.group_fingerprint == group_fingerprint)
)
group = result.scalars().first()

if not group:
# Create a new group if it doesn't exist
group = Group(
tenant_id=tenant_id,
rule_id=rule_id,
group_fingerprint=group_fingerprint,
)
session.add(group)
await session.commit()

# Re-query the group with selectinload to set up future automatic loading of alerts
result = await session.execute(
select(Group)
.options(selectinload(Group.alerts))
.where(Group.tenant_id == tenant_id)
.where(Group.rule_id == rule_id)
.where(Group.group_fingerprint == group_fingerprint)
.where(Group.id == group.id)
)
group = result.scalars().first()

if not group:
# Create a new group if it doesn't exist
group = Group(
tenant_id=tenant_id,
rule_id=rule_id,
group_fingerprint=group_fingerprint,
)
session.add(group)
await session.commit()

# Re-query the group with selectinload to set up future automatic loading of alerts
result = await session.execute(
select(Group)
.options(selectinload(Group.alerts))
.where(Group.id == group.id)
)
group = result.scalars().first()

# Create a new AlertToGroup instance and add it
with tracer.start_as_current_span("alert_to_group"):
alert_group = AlertToGroup(
tenant_id=tenant_id,
alert_id=str(alert_id),
group_id=str(group.id),
)
with tracer.start_as_current_span("session_add"):
session.add(alert_group)
with tracer.start_as_current_span("session_commit"):
# Commit inside the session's context manager will automatically commit on exit
await session.commit()

# Since we're using context managers, explicit session.commit() is not required here
# Create a new AlertToGroup instance and add it
with tracer.start_as_current_span("alert_to_group"):
alert_group = AlertToGroup(
tenant_id=tenant_id,
alert_id=str(alert_id),
group_id=str(group.id),
)
with tracer.start_as_current_span("session_add"):
session.add(alert_group)
with tracer.start_as_current_span("session_commit"):
# Commit inside the session's context manager will automatically commit on exit
await session.commit()

# Refresh and expire need to be awaited as well
with tracer.start_as_current_span("session_expire_and_refresh"):
Expand Down
73 changes: 73 additions & 0 deletions keep/api/core/db_debugger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# This file is for debugging purposes.
# Its code is written to profile the SQL queries.

import inspect
import logging

# It basically logs the SQL queries and the time it took to execute them
# to a file called sqlalchemy_queries.log.
import os
import threading
import time
from logging.handlers import RotatingFileHandler

from sqlalchemy import event
from sqlalchemy.engine import Engine

logger = logging.getLogger(__name__)


if os.environ.get("DEBUG_SQL", False):
logger.setLevel(logging.DEBUG)
# Configure logging
logger = logging.getLogger("profiler")
logger.setLevel(logging.INFO)
file_handler = RotatingFileHandler(
"sqlalchemy_queries.log", maxBytes=1024 * 1024 * 5, backupCount=5, mode="a"
)
file_handler.setLevel(logging.INFO)
# Create a formatter and set it to the handler
formatter = logging.Formatter("%(asctime)s - %(message)s")
file_handler.setFormatter(formatter)
# Add the handler to the logger
logger.addHandler(file_handler)
# Prevent the logger from propagating messages to the root logger
logger.propagate = False

def get_callee():
try:
# Inspect the stack and find the callee outside of this module
stack = inspect.stack()
for frame_info in stack:
# Inspecting the stack frame to find the first caller outside of this script
# You might need to adjust the conditions based on your project structure
if (
frame_info.function
not in [
"get_callee",
"after_cursor_execute",
"before_cursor_execute",
]
and __file__ in frame_info.filename
):
return f"{frame_info.function} in {frame_info.filename}:{frame_info.lineno}"
except Exception:
return "Unknown callee"
return "Callee not found"

# Function to track query start time
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(
conn, cursor, statement, parameters, context, executemany
):
context._query_start_time = time.time()

# Function to log the query and execution time
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total = time.time() - context._query_start_time
callee = get_callee()
thread_name = threading.current_thread().name
logger.critical(
f"Thread: {thread_name}, Callee: {callee}, Query: {statement}, Parameters: {parameters}, Time: {total:.5f} seconds"
)
4 changes: 2 additions & 2 deletions keep/api/routes/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ async def receive_event(
bg_tasks: BackgroundTasks,
provider_id: str | None = None,
authenticated_entity: AuthenticatedEntity = Depends(AuthVerifier(["write:alert"])),
session: Session = Depends(get_session),
async_session: AsyncSession = Depends(get_async_session),
pusher_client: Pusher = Depends(get_pusher_client),
) -> dict[str, str]:
tenant_id = authenticated_entity.tenant_id
Expand Down Expand Up @@ -701,7 +701,7 @@ async def receive_event(
handle_formatted_events,
tenant_id,
provider_type,
session,
async_session,
event_copy if isinstance(event_copy, list) else [event_copy],
formatted_events,
pusher_client,
Expand Down
21 changes: 20 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pymongo = "^4.6.1"
google-cloud-trace = "1.11.3"
hvac = "^2.1.0"
aiosqlite = "^0.19.0"
aiomysql = "^0.2.0"


[tool.poetry.group.dev.dependencies]
Expand Down
2 changes: 2 additions & 0 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ services:
keep-database:
image: mysql:latest
container_name: keep-database-tests
command: --local-infile=1
environment:
- MYSQL_ROOT_PASSWORD=keep
- MYSQL_DATABASE=keep
volumes:
- mysql-data:/var/lib/mysql
- ./initdb:/docker-entrypoint-initdb.d
ports:
- "0.0.0.0:3306:3306"

Expand Down
7 changes: 7 additions & 0 deletions tests/initdb/rule.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"{""sql"": ""((source = :source_1 and annotations.summary like :annotations.summary_1) and (source = :source_2 and labels.monitor = :labels.monitor_1))"", ""params"": {""source_1"": ""prometheus"", ""annotations.summary_1"": ""%latency%"", ""source_2"": ""grafana"", ""labels.monitor_1"": ""server1""}}",[],f9e5ffd0cfc14757a8e566f9fe7cbd82,keep,"Rule Name","(source == ""prometheus"" && annotations.summary.contains(""latency"")) && (source == ""grafana"" && labels.monitor == ""server1"")",600,keep,"2024-02-01 14:18:20.002416",,,,
"{}","[""labels.instance""]",3d8e3773ab224604bfa58c930bcc5e5a,keep,"CPU (group by labels.instance)","(labels.alertname.contains(""cpu""))",600,keep,"2024-02-01 14:30:13.830655",,,"CPU usage exceeded on {{ group_attributes.num_of_alerts }} pods of {{ labels.instance }} || {{ group_attributes.start_time }} | {{ group_attributes.last_update_time }}",
"{}",[],f4e9ffacc26c4a079afec0e0e149bbbe,keep,"CPU (no grouping)","(labels.alertname.contains(""cpu""))",600,keep,"2024-02-01 14:30:13.837035",,,,
"{}","[""labels.queue""]",632fae98b12c4f9da996dedb3abd0b92,keep,"MQ (group by labels.queue)","(name == ""mq_third_full"")",600,keep,"2024-02-01 14:30:13.839077",,,"The {{ labels.queue }} is more than third full on {{ group_attributes.num_of_alerts }} queue managers | {{ group_attributes.start_time }} || {{ group_attributes.last_update_time }}",
"{}","[""labels.instance""]",5c043777828c42af97697bbaac135c81,keep,"CPU (group by labels.instance)","(labels.alertname.contains(""cpu""))",600,keep,"2024-02-01 14:32:54.560574",,,"CPU usage exceeded on {{ group_attributes.num_of_alerts }} pods of {{ labels.instance }} || {{ group_attributes.start_time }} | {{ group_attributes.last_update_time }}",
"{}",[],923d63ba385a4f719c5789087eb4a921,keep,"CPU (no grouping)","(labels.alertname.contains(""cpu""))",600,keep,"2024-02-01 14:32:54.567197",,,,
"{}","[""labels.queue""]",dd38939c47764a97ac420a71f1287585,keep,"MQ (group by labels.queue)","(name == ""mq_third_full"")",600,keep,"2024-02-01 14:32:54.569420",,,"The {{ labels.queue }} is more than third full on {{ group_attributes.num_of_alerts }} queue managers | {{ group_attributes.start_time }} || {{ group_attributes.last_update_time }}",

0 comments on commit 65ad092

Please sign in to comment.