From 232707334ade3c6624c2ad1ba1e247bdbd97232b Mon Sep 17 00:00:00 2001
From: Shahar Glazner
Date: Sat, 7 Sep 2024 17:32:40 +0300
Subject: [PATCH] fix: make redis great again (#1856)

---
 keep/api/api.py           | 25 ++++++++++++++--
 keep/api/arq_worker.py    | 62 ++++++++++++++++++++++++++++----------
 keep/api/consts.py        |  3 ++
 keep/api/routes/alerts.py |  5 ++++
 4 files changed, 72 insertions(+), 23 deletions(-)

diff --git a/keep/api/api.py b/keep/api/api.py
index f12699494..b5ef08752 100644
--- a/keep/api/api.py
+++ b/keep/api/api.py
@@ -19,7 +19,15 @@
 import keep.api.logging
 import keep.api.observability
 from keep.api.arq_worker import get_arq_worker
-from keep.api.consts import KEEP_ARQ_TASK_POOL, KEEP_ARQ_TASK_POOL_NONE
+from keep.api.consts import (
+    KEEP_ARQ_QUEUE_AI,
+    KEEP_ARQ_QUEUE_BASIC,
+    KEEP_ARQ_TASK_POOL,
+    KEEP_ARQ_TASK_POOL_AI,
+    KEEP_ARQ_TASK_POOL_ALL,
+    KEEP_ARQ_TASK_POOL_BASIC_PROCESSING,
+    KEEP_ARQ_TASK_POOL_NONE,
+)
 from keep.api.core.config import AuthenticationType
 from keep.api.core.db import get_api_key
 from keep.api.core.dependencies import SINGLE_TENANT_UUID
@@ -262,8 +270,19 @@ async def on_startup():
             logger.info("Consumer started successfully")
         if KEEP_ARQ_TASK_POOL != KEEP_ARQ_TASK_POOL_NONE:
             event_loop = asyncio.get_event_loop()
-            arq_worker = get_arq_worker()
-            event_loop.create_task(arq_worker.async_run())
+            if KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_ALL:
+                basic_worker = get_arq_worker(KEEP_ARQ_QUEUE_BASIC)
+                ai_worker = get_arq_worker(KEEP_ARQ_QUEUE_AI)
+                event_loop.create_task(basic_worker.async_run())
+                event_loop.create_task(ai_worker.async_run())
+            elif KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_AI:
+                arq_worker = get_arq_worker(KEEP_ARQ_QUEUE_AI)
+                event_loop.create_task(arq_worker.async_run())
+            elif KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_BASIC_PROCESSING:
+                arq_worker = get_arq_worker(KEEP_ARQ_QUEUE_BASIC)
+                event_loop.create_task(arq_worker.async_run())
+            else:
+                raise ValueError(f"Invalid task pool: {KEEP_ARQ_TASK_POOL}")
         logger.info("Services started successfully")
 
     @app.exception_handler(Exception)
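Note on the api.py hunk above: the new on_startup branching is, in effect, a
pool-to-queues dispatch table. The following minimal sketch is illustrative
only and not part of the patch (the POOL_TO_QUEUES mapping and the
start_arq_workers helper are hypothetical names); it assumes the constants and
the get_arq_worker signature introduced by this patch:

import asyncio

from keep.api.arq_worker import get_arq_worker
from keep.api.consts import (
    KEEP_ARQ_QUEUE_AI,
    KEEP_ARQ_QUEUE_BASIC,
    KEEP_ARQ_TASK_POOL_AI,
    KEEP_ARQ_TASK_POOL_ALL,
    KEEP_ARQ_TASK_POOL_BASIC_PROCESSING,
)

# Hypothetical mapping: which queues each task pool should consume.
POOL_TO_QUEUES = {
    KEEP_ARQ_TASK_POOL_ALL: (KEEP_ARQ_QUEUE_BASIC, KEEP_ARQ_QUEUE_AI),
    KEEP_ARQ_TASK_POOL_BASIC_PROCESSING: (KEEP_ARQ_QUEUE_BASIC,),
    KEEP_ARQ_TASK_POOL_AI: (KEEP_ARQ_QUEUE_AI,),
}


def start_arq_workers(task_pool: str) -> None:
    """Start one arq worker per queue on the running event loop."""
    queues = POOL_TO_QUEUES.get(task_pool)
    if queues is None:
        raise ValueError(f"Invalid task pool: {task_pool}")
    event_loop = asyncio.get_event_loop()
    for queue_name in queues:
        # Each worker is bound to exactly one queue (see arq_worker.py below).
        worker = get_arq_worker(queue_name)
        event_loop.create_task(worker.async_run())

Either shape behaves the same; the table form just keeps the pool-to-queue
wiring in one place.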
"keep.api.tasks.process_event_task.async_process_event", - "keep.api.tasks.process_topology_task.async_process_topology", + ("keep.api.tasks.process_event_task.async_process_event", KEEP_ARQ_QUEUE_BASIC), + ( + "keep.api.tasks.process_topology_task.async_process_topology", + KEEP_ARQ_QUEUE_BASIC, + ), ] -if KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_ALL or \ - KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_AI: +if KEEP_ARQ_TASK_POOL in [KEEP_ARQ_TASK_POOL_ALL, KEEP_ARQ_TASK_POOL_AI]: all_tasks_for_the_worker += [ - "keep.api.tasks.process_background_ai_task.process_background_ai_task", - "keep.api.tasks.process_background_ai_task.process_correlation", - "keep.api.tasks.process_background_ai_task.process_summary_generation", - "keep.api.tasks.process_background_ai_task.process_name_generation", + ( + "keep.api.tasks.process_background_ai_task.process_background_ai_task", + KEEP_ARQ_QUEUE_AI, + ), + ( + "keep.api.tasks.process_background_ai_task.process_correlation", + KEEP_ARQ_QUEUE_AI, + ), + ( + "keep.api.tasks.process_background_ai_task.process_summary_generation", + KEEP_ARQ_QUEUE_AI, + ), + ( + "keep.api.tasks.process_background_ai_task.process_name_generation", + KEEP_ARQ_QUEUE_AI, + ), ] ARQ_BACKGROUND_FUNCTIONS: Optional[CommaSeparatedStrings] = config( "ARQ_BACKGROUND_FUNCTIONS", cast=CommaSeparatedStrings, - default=all_tasks_for_the_worker, + default=[task for task, _ in all_tasks_for_the_worker], ) FUNCTIONS: list = ( @@ -64,7 +79,7 @@ async def shutdown(ctx): pass -def get_arq_worker() -> Worker: +def get_arq_worker(queue_name: str) -> Worker: keep_result = config( "ARQ_KEEP_RESULT", cast=int, default=3600 ) # duration to keep job results for @@ -72,7 +87,10 @@ def get_arq_worker() -> Worker: "ARQ_EXPIRES", cast=int, default=3600 ) # the default length of time from when a job is expected to start after which the job expires, making it shorter to avoid clogging return create_worker( - WorkerSettings, keep_result=keep_result, expires_extra_ms=expires + WorkerSettings, + keep_result=keep_result, + expires_extra_ms=expires, + queue_name=queue_name, ) @@ -99,6 +117,11 @@ class WorkerSettings: # Only if it's an AI-dedicated worker, we can set large timeout, otherwise keeping low to avoid clogging timeout = 60 * 15 if KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_AI else 30 functions: list = FUNCTIONS + queue_name: str + + def __init__(self, queue_name: str): + self.queue_name = queue_name + cron_jobs = [ cron( healthcheck_task, @@ -109,8 +132,7 @@ class WorkerSettings: run_at_startup=True, ), ] - if KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_ALL or \ - KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_AI: + if KEEP_ARQ_TASK_POOL in [KEEP_ARQ_TASK_POOL_ALL, KEEP_ARQ_TASK_POOL_AI]: cron_jobs.append( cron( process_background_ai_task, diff --git a/keep/api/consts.py b/keep/api/consts.py index f69187c5c..ecb82d9de 100644 --- a/keep/api/consts.py +++ b/keep/api/consts.py @@ -52,6 +52,9 @@ KEEP_ARQ_TASK_POOL_ALL = "all" # All arq workers enabled for this service KEEP_ARQ_TASK_POOL_BASIC_PROCESSING = "basic_processing" # Everything except AI KEEP_ARQ_TASK_POOL_AI = "ai" # Only AI +# Define queues for different task types +KEEP_ARQ_QUEUE_BASIC = "basic_processing" +KEEP_ARQ_QUEUE_AI = "ai_processing" REDIS = os.environ.get("REDIS", "false") == "true" KEEP_ARQ_TASK_POOL = os.environ.get("KEEP_ARQ_TASK_POOL", None) diff --git a/keep/api/routes/alerts.py b/keep/api/routes/alerts.py index 2c40c717c..f14b63f23 100644 --- a/keep/api/routes/alerts.py +++ b/keep/api/routes/alerts.py @@ -22,6 +22,7 @@ from keep.api.arq_pool 
diff --git a/keep/api/routes/alerts.py b/keep/api/routes/alerts.py
index 2c40c717c..f14b63f23 100644
--- a/keep/api/routes/alerts.py
+++ b/keep/api/routes/alerts.py
@@ -22,6 +22,7 @@
 from keep.api.arq_pool import get_pool
 from keep.api.bl.enrichments_bl import EnrichmentsBl
+from keep.api.consts import KEEP_ARQ_QUEUE_BASIC
 from keep.api.core.config import config
 from keep.api.core.db import get_alert_audit as get_alert_audit_db
 from keep.api.core.db import get_alerts_by_fingerprint, get_enrichment, get_last_alerts
@@ -286,12 +287,14 @@ async def receive_generic_event(
             authenticated_entity.api_key_name,
             request.state.trace_id,
             event,
+            _queue_name=KEEP_ARQ_QUEUE_BASIC,
         )
         logger.info(
             "Enqueued job",
             extra={
                 "job_id": job.job_id,
                 "tenant_id": authenticated_entity.tenant_id,
+                "queue": KEEP_ARQ_QUEUE_BASIC,
             },
         )
     else:
@@ -365,12 +368,14 @@ async def receive_event(
             authenticated_entity.api_key_name,
             trace_id,
             event,
+            _queue_name=KEEP_ARQ_QUEUE_BASIC,
         )
         logger.info(
             "Enqueued job",
             extra={
                 "job_id": job.job_id,
                 "tenant_id": authenticated_entity.tenant_id,
+                "queue": KEEP_ARQ_QUEUE_BASIC,
             },
         )
     else:
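On the producer side, arq's ArqRedis.enqueue_job accepts a _queue_name
keyword, which the alerts routes now set so events land on the basic queue
that a matching worker consumes. A minimal sketch (assumes a Redis reachable
on localhost; the real call sites pass tenant, trace, and event arguments,
elided here):

import asyncio

from arq.connections import RedisSettings, create_pool


async def main():
    pool = await create_pool(RedisSettings(host="localhost", port=6379))
    # The function name is the dotted path the worker registered it under;
    # the job is only picked up by workers started with the same queue name.
    job = await pool.enqueue_job(
        "keep.api.tasks.process_event_task.async_process_event",
        _queue_name="basic_processing",  # i.e. KEEP_ARQ_QUEUE_BASIC
    )
    print("Enqueued job", job.job_id)


asyncio.run(main())

Keeping producers and workers on the shared constants in consts.py avoids the
two sides disagreeing on the queue name, in which case enqueued jobs would
sit unclaimed.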