diff --git a/backend/danswer/background/celery/apps/beat.py b/backend/danswer/background/celery/apps/beat.py
index 8ddc17efc52..c896760cb99 100644
--- a/backend/danswer/background/celery/apps/beat.py
+++ b/backend/danswer/background/celery/apps/beat.py
@@ -21,6 +21,8 @@
 @beat_init.connect
 def on_beat_init(sender: Any, **kwargs: Any) -> None:
     logger.info("beat_init signal received.")
+
+    # celery beat shouldn't touch the db at all. But just setting a low minimum here.
     SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
     SqlEngine.init_engine(pool_size=2, max_overflow=0)
     app_base.wait_for_redis(sender, **kwargs)
diff --git a/backend/danswer/background/celery/apps/heavy.py b/backend/danswer/background/celery/apps/heavy.py
index ba53776bedb..4bd95162b3a 100644
--- a/backend/danswer/background/celery/apps/heavy.py
+++ b/backend/danswer/background/celery/apps/heavy.py
@@ -58,7 +58,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
 
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
-    SqlEngine.init_engine(pool_size=8, max_overflow=0)
+    SqlEngine.init_engine(pool_size=4, max_overflow=12)
 
     app_base.wait_for_redis(sender, **kwargs)
     app_base.on_secondary_worker_init(sender, **kwargs)
diff --git a/backend/danswer/background/celery/apps/primary.py b/backend/danswer/background/celery/apps/primary.py
index 983b76773ed..a8ebb161cec 100644
--- a/backend/danswer/background/celery/apps/primary.py
+++ b/backend/danswer/background/celery/apps/primary.py
@@ -166,19 +166,6 @@
         r.delete(key)
 
 
-# @worker_process_init.connect
-# def on_worker_process_init(sender: Any, **kwargs: Any) -> None:
-#     """This only runs inside child processes when the worker is in pool=prefork mode.
-#     This may be technically unnecessary since we're finding prefork pools to be
-#     unstable and currently aren't planning on using them."""
-#     logger.info("worker_process_init signal received.")
-#     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
-#     SqlEngine.init_engine(pool_size=5, max_overflow=0)
-
-#     # https://stackoverflow.com/questions/43944787/sqlalchemy-celery-with-scoped-session-error
-#     SqlEngine.get_engine().dispose(close=False)
-
-
 @worker_ready.connect
 def on_worker_ready(sender: Any, **kwargs: Any) -> None:
     app_base.on_worker_ready(sender, **kwargs)
diff --git a/backend/danswer/background/indexing/job_client.py b/backend/danswer/background/indexing/job_client.py
index 68d706895fd..6808a52c5ca 100644
--- a/backend/danswer/background/indexing/job_client.py
+++ b/backend/danswer/background/indexing/job_client.py
@@ -11,7 +11,8 @@
 from typing import Literal
 from typing import Optional
 
-from danswer.db.engine import get_sqlalchemy_engine
+from danswer.configs.constants import POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME
+from danswer.db.engine import SqlEngine
 from danswer.utils.logger import setup_logger
 
 logger = setup_logger()
@@ -37,7 +38,9 @@ def _initializer(
     if kwargs is None:
         kwargs = {}
 
-    get_sqlalchemy_engine().dispose(close=False)
+    logger.info("Initializing spawned worker child process.")
+    SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
+    SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
 
     return func(*args, **kwargs)
 