diff --git a/backend/danswer/background/celery/apps/primary.py b/backend/danswer/background/celery/apps/primary.py
index 44080337650..cab251c6fb3 100644
--- a/backend/danswer/background/celery/apps/primary.py
+++ b/backend/danswer/background/celery/apps/primary.py
@@ -39,7 +39,6 @@ from danswer.utils.logger import setup_logger
 from shared_configs.configs import MULTI_TENANT
 
-
 logger = setup_logger()
 
 celery_app = Celery(__name__)
 
@@ -117,9 +116,13 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
     # it is planned to use this lock to enforce singleton behavior on the primary
     # worker, since the primary worker does redis cleanup on startup, but this isn't
     # implemented yet.
+
+    # set thread_local=False since we don't control what thread the periodic task might
+    # reacquire the lock with
     lock: RedisLock = r.lock(
         DanswerRedisLocks.PRIMARY_WORKER,
         timeout=CELERY_PRIMARY_WORKER_LOCK_TIMEOUT,
+        thread_local=False,
     )
 
     logger.info("Primary worker lock: Acquire starting.")
diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py
index 57bca336f09..ed370c5bcdb 100644
--- a/backend/danswer/background/celery/tasks/indexing/tasks.py
+++ b/backend/danswer/background/celery/tasks/indexing/tasks.py
@@ -789,9 +789,12 @@ def connector_indexing_task(
             )
             break
 
+    # set thread_local=False since we don't control what thread the indexing/pruning
+    # might run our callback with
     lock: RedisLock = r.lock(
         redis_connector_index.generator_lock_key,
         timeout=CELERY_INDEXING_LOCK_TIMEOUT,
+        thread_local=False,
     )
 
     acquired = lock.acquire(blocking=False)
diff --git a/backend/danswer/background/celery/tasks/pruning/tasks.py b/backend/danswer/background/celery/tasks/pruning/tasks.py
index be2ee4ba418..5497f1211a3 100644
--- a/backend/danswer/background/celery/tasks/pruning/tasks.py
+++ b/backend/danswer/background/celery/tasks/pruning/tasks.py
@@ -8,6 +8,7 @@
 from celery import Task
 from celery.exceptions import SoftTimeLimitExceeded
 from redis import Redis
+from redis.lock import Lock as RedisLock
 from sqlalchemy.orm import Session
 
 from danswer.background.celery.apps.app_base import task_logger
@@ -239,9 +240,12 @@ def connector_pruning_generator_task(
 
     r = get_redis_client(tenant_id=tenant_id)
 
-    lock = r.lock(
+    # set thread_local=False since we don't control what thread the indexing/pruning
+    # might run our callback with
+    lock: RedisLock = r.lock(
         DanswerRedisLocks.PRUNING_LOCK_PREFIX + f"_{redis_connector.id}",
         timeout=CELERY_PRUNING_LOCK_TIMEOUT,
+        thread_local=False,
     )
 
     acquired = lock.acquire(blocking=False)
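
Note (illustrative sketch, not part of the patch): redis-py's Lock keeps its token in
thread-local storage by default (thread_local=True), so a thread other than the one
that acquired the lock cannot reacquire() or release() it. The snippet below, using a
hypothetical local Redis instance and lock name, shows the cross-thread refresh that
thread_local=False enables for the locks changed above.

    import threading

    import redis
    from redis.lock import Lock as RedisLock

    # hypothetical local Redis instance for the demo
    r = redis.Redis(host="localhost", port=6379)

    # thread_local=False stores the token on the Lock instance rather than in
    # thread-local storage, so any thread holding this object can refresh or
    # release the lock
    lock: RedisLock = r.lock("demo_lock", timeout=60, thread_local=False)

    if lock.acquire(blocking=False):
        try:
            # simulates a periodic task / callback refreshing the lock TTL from
            # another thread; with the default thread_local=True this raises
            # LockError because the token is not visible in that thread
            refresher = threading.Thread(target=lock.reacquire)
            refresher.start()
            refresher.join()
        finally:
            if lock.owned():
                lock.release()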