Skip to content

Commit

Permalink
Merge pull request #3334 from danswer-ai/hotfix/v0.16-redis-thread-local
Browse files · Browse the repository at this point in the history
Merge hotfix/v0.16-redis-thread-local into release/v0.16
  • Branch information:
rkuo-danswer authored Dec 4, 2024
2 parents 6c2269e + 55b9514 commit c216406
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
5 changes: 4 additions & 1 deletion backend/danswer/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from danswer.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT


logger = setup_logger()

celery_app = Celery(__name__)
Expand Down Expand Up @@ -117,9 +116,13 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
# it is planned to use this lock to enforce singleton behavior on the primary
# worker, since the primary worker does redis cleanup on startup, but this isn't
# implemented yet.

# set thread_local=False since we don't control what thread the periodic task might
# reacquire the lock with
lock: RedisLock = r.lock(
DanswerRedisLocks.PRIMARY_WORKER,
timeout=CELERY_PRIMARY_WORKER_LOCK_TIMEOUT,
thread_local=False,
)

logger.info("Primary worker lock: Acquire starting.")
Expand Down
3 changes: 3 additions & 0 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,9 +789,12 @@ def connector_indexing_task(
)
break

# set thread_local=False since we don't control what thread the indexing/pruning
# might run our callback with
lock: RedisLock = r.lock(
redis_connector_index.generator_lock_key,
timeout=CELERY_INDEXING_LOCK_TIMEOUT,
thread_local=False,
)

acquired = lock.acquire(blocking=False)
Expand Down
6 changes: 5 additions & 1 deletion backend/danswer/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session

from danswer.background.celery.apps.app_base import task_logger
Expand Down Expand Up @@ -239,9 +240,12 @@ def connector_pruning_generator_task(

r = get_redis_client(tenant_id=tenant_id)

lock = r.lock(
# set thread_local=False since we don't control what thread the indexing/pruning
# might run our callback with
lock: RedisLock = r.lock(
DanswerRedisLocks.PRUNING_LOCK_PREFIX + f"_{redis_connector.id}",
timeout=CELERY_PRUNING_LOCK_TIMEOUT,
thread_local=False,
)

acquired = lock.acquire(blocking=False)
Expand Down

0 comments on commit c216406

Please sign in to comment.