Skip to content

Commit

Permalink
Wrapping background tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Matvey-Kuk committed Nov 20, 2024
1 parent d90654b commit bb6452f
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 78 deletions.
5 changes: 4 additions & 1 deletion docker/Dockerfile.api
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ RUN chgrp -R 0 /app && chmod -R g=u /app
RUN chown -R keep:keep /app
RUN chown -R keep:keep /venv
USER keep
ENTRYPOINT ["gunicorn", "keep.api.api:get_app", "--bind" , "0.0.0.0:8080" , "--workers", "4" , "-k" , "uvicorn.workers.UvicornWorker", "-c", "/venv/lib/python3.11/site-packages/keep/api/config.py"]

ENTRYPOINT ["/venv/lib/python3.11/site-packages/keep/entrypoint_backend.sh"]

CMD ["gunicorn", "keep.api.api:get_app", "--bind" , "0.0.0.0:8080" , "--workers", "4" , "-k" , "uvicorn.workers.UvicornWorker", "-c", "/venv/lib/python3.11/site-packages/keep/api/config.py"]
58 changes: 58 additions & 0 deletions keep/api/background_server_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@


import sys

# It's a dirty hack to exclude current directory from sys.path
# to avoid importing "logging.py" located in the same directory
# instead of the standard library "logging" module.
# TODO: rename logging.py
for i in range(0, len(sys.path)):
if "keep/api" in sys.path[i]:
sys.path.pop(i)
break

import os
import time
import logging
import requests

from keep.api.core.demo_mode import launch_demo_mode
from keep.api.core.report_uptime import launch_uptime_reporting

logger = logging.getLogger(__name__)


def main():
logger.info("Starting background server jobs.")

# We intentionally don't use KEEP_API_URL here to avoid going through the internet.
# Demo mode should be launched in the same environment as the server.
keep_api_url = "http://localhost:" + str(os.environ.get("PORT", 8080))

while True:
try:
logger.info(f"Checking if server is up at {keep_api_url}...")
response = requests.get(keep_api_url)
response.raise_for_status()
break
except requests.exceptions.RequestException:
logger.info("API is not up yet. Waiting...")
time.sleep(5)

threads = []
threads.append(launch_demo_mode(keep_api_url))
threads.append(launch_uptime_reporting())

for thread in threads:
if thread is not None:
thread.join()

logger.info("Background server jobs started.")


if __name__ == "__main__":
"""
This script should be executed alongside to the server.
Running it in the same process as the server may (and most probably will) cause issues.
"""
main()
20 changes: 20 additions & 0 deletions keep/api/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from keep.api.models.db.preset import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.provider import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.rule import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.system import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.tenant import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.topology import * # pylint: disable=unused-wildcard-import
from keep.api.models.db.workflow import * # pylint: disable=unused-wildcard-import
Expand Down Expand Up @@ -4482,3 +4483,22 @@ def get_resource_ids_by_resource_type(
# Execute the query and return results
result = session.exec(query)
return result.all()

def get_or_creat_posthog_instance_id(
session: Optional[Session] = None
):
POSTHOG_INSTANCE_ID_KEY = "posthog_instance_id"
with Session(engine) as session:
system = session.exec(select(System).where(System.name == POSTHOG_INSTANCE_ID_KEY)).first()
if system:
return system.value

system = System(
id=str(uuid4()),
name=POSTHOG_INSTANCE_ID_KEY,
value=str(uuid4()),
)
session.add(system)
session.commit()
session.refresh(system)
return system.value
85 changes: 32 additions & 53 deletions keep/api/core/demo_mode_runner.py → keep/api/core/demo_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

logger = logging.getLogger(__name__)

KEEP_LIVE_DEMO_MODE = os.environ.get("KEEP_LIVE_DEMO_MODE", "false").lower() == "true"

correlation_rules_to_create = [
{
"sqlQuery": {"sql": "((name like :name_1))", "params": {"name_1": "%mq%"}},
Expand Down Expand Up @@ -335,16 +337,6 @@ def simulate_alerts(
for provider in providers
}

while True:
try:
logger.info(f"Demo thread: Checking if server is up at {keep_api_url}...")
response = requests.get(keep_api_url)
response.raise_for_status()
break
except requests.exceptions.RequestException:
logger.info("Demo thread: API is not up yet. Waiting...")
time.sleep(5)

existing_installed_providers = get_existing_installed_providers(keep_api_key, keep_api_url)
logger.info(f"Existing installed providers: {existing_installed_providers}")
existing_providers_to_their_ids = {}
Expand Down Expand Up @@ -429,49 +421,36 @@ def simulate_alerts(
time.sleep(sleep_interval)


def launch_demo_mode(use_thread: bool = True):
"""
Running async demo in the backgound.
"""
logger.info("Demo mode launched.")

keep_api_url = os.environ.get(
"KEEP_API_URL", "http://localhost:" + str(os.environ.get("PORT", 8080))
)
keep_api_key = os.environ.get("KEEP_READ_ONLY_BYPASS_KEY")
keep_sleep_interval = int(os.environ.get("KEEP_SLEEP_INTERVAL", 5))
if keep_api_key is None:
with get_session_sync() as session:
keep_api_key = get_or_create_api_key(
session=session,
tenant_id=SINGLE_TENANT_UUID,
created_by="system",
unique_api_key_id="simulate_alerts",
system_description="Simulate Alerts API key",
)
if use_thread:
thread = threading.Thread(
target=simulate_alerts,
kwargs={
"keep_api_key": keep_api_key,
"keep_api_url": keep_api_url,
"sleep_interval": keep_sleep_interval,
"demo_correlation_rules": True,
"demo_topology": True,
},
)
thread.daemon = True
thread.start()
else:
simulate_alerts(
keep_api_key=keep_api_key,
keep_api_url=keep_api_url,
sleep_interval=keep_sleep_interval,
demo_correlation_rules=True,
demo_topology=True,
def launch_demo_mode(keep_api_url=None) -> threading.Thread | None:
if not KEEP_LIVE_DEMO_MODE:
logger.info("Not launching the demo mode.")
return

logger.info("Launching demo mode.")

with get_session_sync() as session:
keep_api_key = get_or_create_api_key(
session=session,
tenant_id=SINGLE_TENANT_UUID,
created_by="system",
unique_api_key_id="simulate_alerts",
system_description="Simulate Alerts API key",
)
logger.info("Demo mode initialized.")

sleep_interval = 5

if __name__ == "__main__":
launch_demo_mode(use_thread=False)
thread = threading.Thread(
target=simulate_alerts,
kwargs={
"keep_api_key": keep_api_key,
"keep_api_url": keep_api_url,
"sleep_interval": sleep_interval,
"demo_correlation_rules": True,
"demo_topology": True,
},
)
thread.daemon = True
thread.start()

logger.info("Demo mode launched.")
return thread
5 changes: 3 additions & 2 deletions keep/api/core/posthog.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from posthog import Posthog
from importlib import metadata

from keep.api.core.db import get_or_creat_posthog_instance_id

try:
KEEP_VERSION = metadata.version("keep")
except metadata.PackageNotFoundError:
Expand All @@ -14,7 +16,6 @@
KEEP_VERSION = os.environ.get("KEEP_VERSION", "unknown")

POSTHOG_DISABLED = os.getenv("POSTHOG_DISABLED", "false") == "true"
RANDOM_TENANT_ID_PERSISTENT_WITHIN_LAUNCH = uuid.uuid4()

if POSTHOG_DISABLED:
posthog.disabled = True
Expand All @@ -37,7 +38,7 @@ def is_posthog_reachable():
feature_flags_request_timeout_seconds=3,
sync_mode=True # Explicitly to trigger exception if it's not reachable.
).capture(
RANDOM_TENANT_ID_PERSISTENT_WITHIN_LAUNCH,
get_or_creat_posthog_instance_id(),
"connectivity_check",
)
return True
Expand Down
7 changes: 4 additions & 3 deletions keep/api/core/report_uptime.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import asyncio
import logging
import threading
from keep.api.core.db import get_or_creat_posthog_instance_id
from keep.api.core.posthog import (
posthog_client,
is_posthog_reachable,
KEEP_VERSION,
POSTHOG_DISABLED,
RANDOM_TENANT_ID_PERSISTENT_WITHIN_LAUNCH
)

logger = logging.getLogger(__name__)
Expand All @@ -19,7 +19,7 @@ async def report_uptime_to_posthog():
"""
while True:
posthog_client.capture(
RANDOM_TENANT_ID_PERSISTENT_WITHIN_LAUNCH,
get_or_creat_posthog_instance_id(),
"backend_status",
properties={
"status": "up",
Expand All @@ -30,7 +30,7 @@ async def report_uptime_to_posthog():
# Important to keep it async, otherwise will clog main gunicorn thread and cause timeouts.
await asyncio.sleep(UPTIME_REPORTING_CADENCE)

def launch_uptime_reporting():
def launch_uptime_reporting() -> threading.Thread | None:
"""
Running async uptime reporting as a sub-thread.
"""
Expand All @@ -39,6 +39,7 @@ def launch_uptime_reporting():
thread = threading.Thread(target=asyncio.run, args=(report_uptime_to_posthog(), ))
thread.start()
logger.info("Uptime Reporting to Posthog launched.")
return thread
else:
logger.info("Reporting to Posthog not launched because it's not reachable.")
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""system table
Revision ID: 192157fd5788
Revises: 620b6c048091
Create Date: 2024-11-20 15:50:29.500867
"""

import sqlalchemy as sa
import sqlalchemy_utils
import sqlmodel
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "192157fd5788"
down_revision = "620b6c048091"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"system",
sa.Column("id", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("value", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("system")
# ### end Alembic commands ###
8 changes: 8 additions & 0 deletions keep/api/models/db/system.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

from sqlmodel import Field, SQLModel


class System(SQLModel, table=True):
id: str = Field(primary_key=True)
name: str
value: str
11 changes: 0 additions & 11 deletions keep/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import os
import sys
import typing
import time
import uuid
from collections import OrderedDict
from importlib import metadata
Expand Down Expand Up @@ -1627,15 +1626,5 @@ def start(self):
# kills the server also, great success
os._exit(0)


@cli.command()
@pass_info
def background_server_jobs(info: Info):
"""Run background jobs, expected to be running in parallel with the server."""
while True:
time.sleep(1)
logger.info("Background job running")
logger.info(os.environ)

if __name__ == "__main__":
cli(auto_envvar_prefix="KEEP")
6 changes: 4 additions & 2 deletions keep/entrypoint_backend.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ set -e
# Print commands and their arguments as they are executed
set -x

# Execute background taasks
poetry run python keep/cli/cli.py background-server-jobs &
# Get the directory of the current script
SCRIPT_DIR=$(dirname "$0")

python "$SCRIPT_DIR/api/background_server_jobs.py" &

# Execute the CMD provided in the Dockerfile or as arguments
exec "$@"
Loading

0 comments on commit bb6452f

Please sign in to comment.