Skip to content

Commit

Permalink
Merge branch 'main' into victoriametrics
Browse files Browse the repository at this point in the history
  • Loading branch information
shahargl authored Sep 7, 2024
2 parents 88bfb09 + 2327073 commit 1dc2fb4
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 23 deletions.
25 changes: 22 additions & 3 deletions keep/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@
import keep.api.logging
import keep.api.observability
from keep.api.arq_worker import get_arq_worker
from keep.api.consts import KEEP_ARQ_TASK_POOL, KEEP_ARQ_TASK_POOL_NONE
from keep.api.consts import (
KEEP_ARQ_QUEUE_AI,
KEEP_ARQ_QUEUE_BASIC,
KEEP_ARQ_TASK_POOL,
KEEP_ARQ_TASK_POOL_AI,
KEEP_ARQ_TASK_POOL_ALL,
KEEP_ARQ_TASK_POOL_BASIC_PROCESSING,
KEEP_ARQ_TASK_POOL_NONE,
)
from keep.api.core.config import AuthenticationType
from keep.api.core.db import get_api_key
from keep.api.core.dependencies import SINGLE_TENANT_UUID
Expand Down Expand Up @@ -262,8 +270,19 @@ async def on_startup():
logger.info("Consumer started successfully")
if KEEP_ARQ_TASK_POOL != KEEP_ARQ_TASK_POOL_NONE:
event_loop = asyncio.get_event_loop()
arq_worker = get_arq_worker()
event_loop.create_task(arq_worker.async_run())
if KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_ALL:
basic_worker = get_arq_worker(KEEP_ARQ_QUEUE_BASIC)
ai_worker = get_arq_worker(KEEP_ARQ_QUEUE_AI)
event_loop.create_task(basic_worker.async_run())
event_loop.create_task(ai_worker.async_run())
elif KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_AI:
arq_worker = get_arq_worker(KEEP_ARQ_QUEUE_AI)
event_loop.create_task(arq_worker.async_run())
elif KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_BASIC_PROCESSING:
arq_worker = get_arq_worker(KEEP_ARQ_QUEUE_BASIC)
event_loop.create_task(arq_worker.async_run())
else:
raise ValueError(f"Invalid task pool: {KEEP_ARQ_TASK_POOL}")
logger.info("Services started successfully")

@app.exception_handler(Exception)
Expand Down
62 changes: 42 additions & 20 deletions keep/api/arq_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,57 @@
from starlette.datastructures import CommaSeparatedStrings

import keep.api.logging
from keep.api.core.config import config
from keep.api.tasks.process_background_ai_task import process_background_ai_task
from keep.api.tasks.healthcheck_task import healthcheck_task
from keep.api.consts import (
KEEP_ARQ_QUEUE_AI,
KEEP_ARQ_QUEUE_BASIC,
KEEP_ARQ_TASK_POOL,
KEEP_ARQ_TASK_POOL_AI,
KEEP_ARQ_TASK_POOL_ALL,
KEEP_ARQ_TASK_POOL_BASIC_PROCESSING,
)
from keep.api.core.config import config
from keep.api.tasks.healthcheck_task import healthcheck_task
from keep.api.tasks.process_background_ai_task import process_background_ai_task

keep.api.logging.setup_logging()
logger = logging.getLogger(__name__)

# Current worker will pick up tasks only according to its execution pool.
# Each entry is a (dotted task path, queue name) pair; the healthcheck task
# carries None as its queue (presumably queue-agnostic — confirm against
# create_worker's routing).
all_tasks_for_the_worker = [("keep.api.tasks.healthcheck_task.healthcheck_task", None)]

if KEEP_ARQ_TASK_POOL in [KEEP_ARQ_TASK_POOL_ALL, KEEP_ARQ_TASK_POOL_BASIC_PROCESSING]:
    all_tasks_for_the_worker += [
        ("keep.api.tasks.process_event_task.async_process_event", KEEP_ARQ_QUEUE_BASIC),
        (
            "keep.api.tasks.process_topology_task.async_process_topology",
            KEEP_ARQ_QUEUE_BASIC,
        ),
    ]

if KEEP_ARQ_TASK_POOL in [KEEP_ARQ_TASK_POOL_ALL, KEEP_ARQ_TASK_POOL_AI]:
    all_tasks_for_the_worker += [
        (
            "keep.api.tasks.process_background_ai_task.process_background_ai_task",
            KEEP_ARQ_QUEUE_AI,
        ),
        (
            "keep.api.tasks.process_background_ai_task.process_correlation",
            KEEP_ARQ_QUEUE_AI,
        ),
        (
            "keep.api.tasks.process_background_ai_task.process_summary_generation",
            KEEP_ARQ_QUEUE_AI,
        ),
        (
            "keep.api.tasks.process_background_ai_task.process_name_generation",
            KEEP_ARQ_QUEUE_AI,
        ),
    ]

# Registered background functions may be overridden via the
# ARQ_BACKGROUND_FUNCTIONS env var; the default strips the queue from each
# (task, queue) pair selected above.
ARQ_BACKGROUND_FUNCTIONS: Optional[CommaSeparatedStrings] = config(
    "ARQ_BACKGROUND_FUNCTIONS",
    cast=CommaSeparatedStrings,
    default=[task for task, _ in all_tasks_for_the_worker],
)

FUNCTIONS: list = (
Expand All @@ -64,15 +79,18 @@ async def shutdown(ctx):
pass


def get_arq_worker(queue_name: str) -> Worker:
    """Create an ARQ worker bound to a single queue.

    Args:
        queue_name: The queue this worker consumes from
            (e.g. ``KEEP_ARQ_QUEUE_BASIC`` or ``KEEP_ARQ_QUEUE_AI``).

    Returns:
        A configured arq ``Worker`` instance.
    """
    # Duration (seconds) to keep job results for.
    keep_result = config("ARQ_KEEP_RESULT", cast=int, default=3600)
    # Extra time from when a job is expected to start after which it
    # expires; kept short to avoid clogging the queue.
    expires = config("ARQ_EXPIRES", cast=int, default=3600)
    return create_worker(
        WorkerSettings,
        keep_result=keep_result,
        expires_extra_ms=expires,
        queue_name=queue_name,
    )


Expand All @@ -99,6 +117,11 @@ class WorkerSettings:
# Only if it's an AI-dedicated worker, we can set large timeout, otherwise keeping low to avoid clogging
timeout = 60 * 15 if KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_AI else 30
functions: list = FUNCTIONS
queue_name: str

def __init__(self, queue_name: str):
self.queue_name = queue_name

cron_jobs = [
cron(
healthcheck_task,
Expand All @@ -109,8 +132,7 @@ class WorkerSettings:
run_at_startup=True,
),
]
if KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_ALL or \
KEEP_ARQ_TASK_POOL == KEEP_ARQ_TASK_POOL_AI:
if KEEP_ARQ_TASK_POOL in [KEEP_ARQ_TASK_POOL_ALL, KEEP_ARQ_TASK_POOL_AI]:
cron_jobs.append(
cron(
process_background_ai_task,
Expand Down
3 changes: 3 additions & 0 deletions keep/api/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
KEEP_ARQ_TASK_POOL_ALL = "all" # All arq workers enabled for this service
KEEP_ARQ_TASK_POOL_BASIC_PROCESSING = "basic_processing" # Everything except AI
KEEP_ARQ_TASK_POOL_AI = "ai" # Only AI
# Define queues for different task types
KEEP_ARQ_QUEUE_BASIC = "basic_processing"
KEEP_ARQ_QUEUE_AI = "ai_processing"

REDIS = os.environ.get("REDIS", "false") == "true"
KEEP_ARQ_TASK_POOL = os.environ.get("KEEP_ARQ_TASK_POOL", None)
Expand Down
5 changes: 5 additions & 0 deletions keep/api/routes/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from keep.api.arq_pool import get_pool
from keep.api.bl.enrichments_bl import EnrichmentsBl
from keep.api.consts import KEEP_ARQ_QUEUE_BASIC
from keep.api.core.config import config
from keep.api.core.db import get_alert_audit as get_alert_audit_db
from keep.api.core.db import get_alerts_by_fingerprint, get_enrichment, get_last_alerts
Expand Down Expand Up @@ -286,12 +287,14 @@ async def receive_generic_event(
authenticated_entity.api_key_name,
request.state.trace_id,
event,
_queue_name=KEEP_ARQ_QUEUE_BASIC,
)
logger.info(
"Enqueued job",
extra={
"job_id": job.job_id,
"tenant_id": authenticated_entity.tenant_id,
"queue": KEEP_ARQ_QUEUE_BASIC,
},
)
else:
Expand Down Expand Up @@ -365,12 +368,14 @@ async def receive_event(
authenticated_entity.api_key_name,
trace_id,
event,
_queue_name=KEEP_ARQ_QUEUE_BASIC,
)
logger.info(
"Enqueued job",
extra={
"job_id": job.job_id,
"tenant_id": authenticated_entity.tenant_id,
"queue": KEEP_ARQ_QUEUE_BASIC,
},
)
else:
Expand Down

0 comments on commit 1dc2fb4

Please sign in to comment.