From bb6452fefe44e6f74f8da5a35f07254d14c16472 Mon Sep 17 00:00:00 2001 From: Matvey Kukuy Date: Wed, 20 Nov 2024 17:03:37 +0200 Subject: [PATCH] Wrapping background tasks --- docker/Dockerfile.api | 5 +- keep/api/background_server_jobs.py | 58 +++++++++++++ keep/api/core/db.py | 20 +++++ .../{demo_mode_runner.py => demo_mode.py} | 85 +++++++------------ keep/api/core/posthog.py | 5 +- keep/api/core/report_uptime.py | 7 +- .../versions/2024-11-20-15-50_192157fd5788.py | 38 +++++++++ keep/api/models/db/system.py | 8 ++ keep/cli/cli.py | 11 --- keep/entrypoint_backend.sh | 6 +- scripts/simulate_alerts.py | 25 ++++-- 11 files changed, 190 insertions(+), 78 deletions(-) create mode 100644 keep/api/background_server_jobs.py rename keep/api/core/{demo_mode_runner.py => demo_mode.py} (88%) create mode 100644 keep/api/models/db/migrations/versions/2024-11-20-15-50_192157fd5788.py create mode 100644 keep/api/models/db/system.py diff --git a/docker/Dockerfile.api b/docker/Dockerfile.api index 291210d25..095b0f8a5 100644 --- a/docker/Dockerfile.api +++ b/docker/Dockerfile.api @@ -35,4 +35,7 @@ RUN chgrp -R 0 /app && chmod -R g=u /app RUN chown -R keep:keep /app RUN chown -R keep:keep /venv USER keep -ENTRYPOINT ["gunicorn", "keep.api.api:get_app", "--bind" , "0.0.0.0:8080" , "--workers", "4" , "-k" , "uvicorn.workers.UvicornWorker", "-c", "/venv/lib/python3.11/site-packages/keep/api/config.py"] + +ENTRYPOINT ["/venv/lib/python3.11/site-packages/keep/entrypoint_backend.sh"] + +CMD ["gunicorn", "keep.api.api:get_app", "--bind" , "0.0.0.0:8080" , "--workers", "4" , "-k" , "uvicorn.workers.UvicornWorker", "-c", "/venv/lib/python3.11/site-packages/keep/api/config.py"] diff --git a/keep/api/background_server_jobs.py b/keep/api/background_server_jobs.py new file mode 100644 index 000000000..bfd25be0e --- /dev/null +++ b/keep/api/background_server_jobs.py @@ -0,0 +1,58 @@ + + +import sys + +# It's a dirty hack to exclude current directory from sys.path +# to avoid importing "logging.py" located in the same directory +# instead of the standard library "logging" module. +# TODO: rename logging.py +for i in range(0, len(sys.path)): + if "keep/api" in sys.path[i]: + sys.path.pop(i) + break + +import os +import time +import logging +import requests + +from keep.api.core.demo_mode import launch_demo_mode +from keep.api.core.report_uptime import launch_uptime_reporting + +logger = logging.getLogger(__name__) + + +def main(): + logger.info("Starting background server jobs.") + + # We intentionally don't use KEEP_API_URL here to avoid going through the internet. + # Demo mode should be launched in the same environment as the server. + keep_api_url = "http://localhost:" + str(os.environ.get("PORT", 8080)) + + while True: + try: + logger.info(f"Checking if server is up at {keep_api_url}...") + response = requests.get(keep_api_url) + response.raise_for_status() + break + except requests.exceptions.RequestException: + logger.info("API is not up yet. Waiting...") + time.sleep(5) + + threads = [] + threads.append(launch_demo_mode(keep_api_url)) + threads.append(launch_uptime_reporting()) + + for thread in threads: + if thread is not None: + thread.join() + + logger.info("Background server jobs started.") + + +if __name__ == "__main__": + """ + This script should be executed alongside to the server. + Running it in the same process as the server may (and most probably will) cause issues. + """ + main() diff --git a/keep/api/core/db.py b/keep/api/core/db.py index 2032f1cbb..d226d227f 100644 --- a/keep/api/core/db.py +++ b/keep/api/core/db.py @@ -58,6 +58,7 @@ from keep.api.models.db.preset import * # pylint: disable=unused-wildcard-import from keep.api.models.db.provider import * # pylint: disable=unused-wildcard-import from keep.api.models.db.rule import * # pylint: disable=unused-wildcard-import +from keep.api.models.db.system import * # pylint: disable=unused-wildcard-import from keep.api.models.db.tenant import * # pylint: disable=unused-wildcard-import from keep.api.models.db.topology import * # pylint: disable=unused-wildcard-import from keep.api.models.db.workflow import * # pylint: disable=unused-wildcard-import @@ -4482,3 +4483,22 @@ def get_resource_ids_by_resource_type( # Execute the query and return results result = session.exec(query) return result.all() + +def get_or_creat_posthog_instance_id( + session: Optional[Session] = None + ): + POSTHOG_INSTANCE_ID_KEY = "posthog_instance_id" + with Session(engine) as session: + system = session.exec(select(System).where(System.name == POSTHOG_INSTANCE_ID_KEY)).first() + if system: + return system.value + + system = System( + id=str(uuid4()), + name=POSTHOG_INSTANCE_ID_KEY, + value=str(uuid4()), + ) + session.add(system) + session.commit() + session.refresh(system) + return system.value diff --git a/keep/api/core/demo_mode_runner.py b/keep/api/core/demo_mode.py similarity index 88% rename from keep/api/core/demo_mode_runner.py rename to keep/api/core/demo_mode.py index b3edac446..9b5626559 100644 --- a/keep/api/core/demo_mode_runner.py +++ b/keep/api/core/demo_mode.py @@ -22,6 +22,8 @@ logger = logging.getLogger(__name__) +KEEP_LIVE_DEMO_MODE = os.environ.get("KEEP_LIVE_DEMO_MODE", "false").lower() == "true" + correlation_rules_to_create = [ { "sqlQuery": {"sql": "((name like :name_1))", "params": {"name_1": "%mq%"}}, @@ -335,16 +337,6 @@ def simulate_alerts( for provider in providers } - while True: - try: - logger.info(f"Demo thread: Checking if server is up at {keep_api_url}...") - response = requests.get(keep_api_url) - response.raise_for_status() - break - except requests.exceptions.RequestException: - logger.info("Demo thread: API is not up yet. Waiting...") - time.sleep(5) - existing_installed_providers = get_existing_installed_providers(keep_api_key, keep_api_url) logger.info(f"Existing installed providers: {existing_installed_providers}") existing_providers_to_their_ids = {} @@ -429,49 +421,36 @@ def simulate_alerts( time.sleep(sleep_interval) -def launch_demo_mode(use_thread: bool = True): - """ - Running async demo in the backgound. - """ - logger.info("Demo mode launched.") - - keep_api_url = os.environ.get( - "KEEP_API_URL", "http://localhost:" + str(os.environ.get("PORT", 8080)) - ) - keep_api_key = os.environ.get("KEEP_READ_ONLY_BYPASS_KEY") - keep_sleep_interval = int(os.environ.get("KEEP_SLEEP_INTERVAL", 5)) - if keep_api_key is None: - with get_session_sync() as session: - keep_api_key = get_or_create_api_key( - session=session, - tenant_id=SINGLE_TENANT_UUID, - created_by="system", - unique_api_key_id="simulate_alerts", - system_description="Simulate Alerts API key", - ) - if use_thread: - thread = threading.Thread( - target=simulate_alerts, - kwargs={ - "keep_api_key": keep_api_key, - "keep_api_url": keep_api_url, - "sleep_interval": keep_sleep_interval, - "demo_correlation_rules": True, - "demo_topology": True, - }, - ) - thread.daemon = True - thread.start() - else: - simulate_alerts( - keep_api_key=keep_api_key, - keep_api_url=keep_api_url, - sleep_interval=keep_sleep_interval, - demo_correlation_rules=True, - demo_topology=True, +def launch_demo_mode(keep_api_url=None) -> threading.Thread | None: + if not KEEP_LIVE_DEMO_MODE: + logger.info("Not launching the demo mode.") + return + + logger.info("Launching demo mode.") + + with get_session_sync() as session: + keep_api_key = get_or_create_api_key( + session=session, + tenant_id=SINGLE_TENANT_UUID, + created_by="system", + unique_api_key_id="simulate_alerts", + system_description="Simulate Alerts API key", ) - logger.info("Demo mode initialized.") + sleep_interval = 5 -if __name__ == "__main__": - launch_demo_mode(use_thread=False) + thread = threading.Thread( + target=simulate_alerts, + kwargs={ + "keep_api_key": keep_api_key, + "keep_api_url": keep_api_url, + "sleep_interval": sleep_interval, + "demo_correlation_rules": True, + "demo_topology": True, + }, + ) + thread.daemon = True + thread.start() + + logger.info("Demo mode launched.") + return thread diff --git a/keep/api/core/posthog.py b/keep/api/core/posthog.py index b21b24096..3a6982e14 100644 --- a/keep/api/core/posthog.py +++ b/keep/api/core/posthog.py @@ -5,6 +5,8 @@ from posthog import Posthog from importlib import metadata +from keep.api.core.db import get_or_creat_posthog_instance_id + try: KEEP_VERSION = metadata.version("keep") except metadata.PackageNotFoundError: @@ -14,7 +16,6 @@ KEEP_VERSION = os.environ.get("KEEP_VERSION", "unknown") POSTHOG_DISABLED = os.getenv("POSTHOG_DISABLED", "false") == "true" -RANDOM_TENANT_ID_PERSISTENT_WITHIN_LAUNCH = uuid.uuid4() if POSTHOG_DISABLED: posthog.disabled = True @@ -37,7 +38,7 @@ def is_posthog_reachable(): feature_flags_request_timeout_seconds=3, sync_mode=True # Explicitly to trigger exception if it's not reachable. ).capture( - RANDOM_TENANT_ID_PERSISTENT_WITHIN_LAUNCH, + get_or_creat_posthog_instance_id(), "connectivity_check", ) return True diff --git a/keep/api/core/report_uptime.py b/keep/api/core/report_uptime.py index 257922c5c..7cb59cc6c 100644 --- a/keep/api/core/report_uptime.py +++ b/keep/api/core/report_uptime.py @@ -1,12 +1,12 @@ import asyncio import logging import threading +from keep.api.core.db import get_or_creat_posthog_instance_id from keep.api.core.posthog import ( posthog_client, is_posthog_reachable, KEEP_VERSION, POSTHOG_DISABLED, - RANDOM_TENANT_ID_PERSISTENT_WITHIN_LAUNCH ) logger = logging.getLogger(__name__) @@ -19,7 +19,7 @@ async def report_uptime_to_posthog(): """ while True: posthog_client.capture( - RANDOM_TENANT_ID_PERSISTENT_WITHIN_LAUNCH, + get_or_creat_posthog_instance_id(), "backend_status", properties={ "status": "up", @@ -30,7 +30,7 @@ async def report_uptime_to_posthog(): # Important to keep it async, otherwise will clog main gunicorn thread and cause timeouts. await asyncio.sleep(UPTIME_REPORTING_CADENCE) -def launch_uptime_reporting(): +def launch_uptime_reporting() -> threading.Thread | None: """ Running async uptime reporting as a sub-thread. """ @@ -39,6 +39,7 @@ def launch_uptime_reporting(): thread = threading.Thread(target=asyncio.run, args=(report_uptime_to_posthog(), )) thread.start() logger.info("Uptime Reporting to Posthog launched.") + return thread else: logger.info("Reporting to Posthog not launched because it's not reachable.") else: diff --git a/keep/api/models/db/migrations/versions/2024-11-20-15-50_192157fd5788.py b/keep/api/models/db/migrations/versions/2024-11-20-15-50_192157fd5788.py new file mode 100644 index 000000000..33e6cbad6 --- /dev/null +++ b/keep/api/models/db/migrations/versions/2024-11-20-15-50_192157fd5788.py @@ -0,0 +1,38 @@ +"""system table + +Revision ID: 192157fd5788 +Revises: 620b6c048091 +Create Date: 2024-11-20 15:50:29.500867 + +""" + +import sqlalchemy as sa +import sqlalchemy_utils +import sqlmodel +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "192157fd5788" +down_revision = "620b6c048091" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "system", + sa.Column("id", sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.Column("value", sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.PrimaryKeyConstraint("id"), + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("system") + # ### end Alembic commands ### diff --git a/keep/api/models/db/system.py b/keep/api/models/db/system.py new file mode 100644 index 000000000..71296e358 --- /dev/null +++ b/keep/api/models/db/system.py @@ -0,0 +1,8 @@ + +from sqlmodel import Field, SQLModel + + +class System(SQLModel, table=True): + id: str = Field(primary_key=True) + name: str + value: str diff --git a/keep/cli/cli.py b/keep/cli/cli.py index 1a989e6f1..4307d6e02 100644 --- a/keep/cli/cli.py +++ b/keep/cli/cli.py @@ -4,7 +4,6 @@ import os import sys import typing -import time import uuid from collections import OrderedDict from importlib import metadata @@ -1627,15 +1626,5 @@ def start(self): # kills the server also, great success os._exit(0) - -@cli.command() -@pass_info -def background_server_jobs(info: Info): - """Run background jobs, expected to be running in parallel with the server.""" - while True: - time.sleep(1) - logger.info("Background job running") - logger.info(os.environ) - if __name__ == "__main__": cli(auto_envvar_prefix="KEEP") diff --git a/keep/entrypoint_backend.sh b/keep/entrypoint_backend.sh index 65f4eef72..1a09cf612 100755 --- a/keep/entrypoint_backend.sh +++ b/keep/entrypoint_backend.sh @@ -6,8 +6,10 @@ set -e # Print commands and their arguments as they are executed set -x -# Execute background taasks -poetry run python keep/cli/cli.py background-server-jobs & +# Get the directory of the current script +SCRIPT_DIR=$(dirname "$0") + +python "$SCRIPT_DIR/api/background_server_jobs.py" & # Execute the CMD provided in the Dockerfile or as arguments exec "$@" \ No newline at end of file diff --git a/scripts/simulate_alerts.py b/scripts/simulate_alerts.py index 56f273efb..3bdca28ff 100644 --- a/scripts/simulate_alerts.py +++ b/scripts/simulate_alerts.py @@ -1,7 +1,8 @@ -import logging import os +import logging +import argparse -from keep.api.core.demo_mode_runner import simulate_alerts +from keep.api.core.demo_mode import simulate_alerts logging.basicConfig( level=logging.DEBUG, @@ -13,18 +14,30 @@ def main(): + parser = argparse.ArgumentParser(description="Simulate alerts for Keep API.") + parser.add_argument( + "--full-demo", + action="store_true", + help="Run the full demo including correlation rules and topology.", + ) + args = parser.parse_args() + + default_sleep_interval = 0.2 + if args.full_demo: + default_sleep_interval = 5 + SLEEP_INTERVAL = float( - os.environ.get("SLEEP_INTERVAL", 0.2) - ) # Configurable sleep interval from env variable + os.environ.get("SLEEP_INTERVAL", default_sleep_interval) + ) keep_api_key = os.environ.get("KEEP_API_KEY") keep_api_url = os.environ.get("KEEP_API_URL") or "http://localhost:8080" simulate_alerts( keep_api_key=keep_api_key, keep_api_url=keep_api_url, sleep_interval=SLEEP_INTERVAL, - demo_correlation_rules=False, + demo_correlation_rules=args.full_demo, + demo_topology=args.full_demo, ) - if __name__ == "__main__": main()