Skip to content

Commit

Permalink
hotfix merge
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuo-danswer committed Nov 19, 2024
1 parent 574ef47 commit e860f15
Show file tree
Hide file tree
Showing 16 changed files with 532 additions and 107 deletions.
21 changes: 21 additions & 0 deletions backend/danswer/background/celery/apps/primary.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
import danswer.background.celery.apps.app_base as app_base
from danswer.background.celery.apps.app_base import task_logger
from danswer.background.celery.celery_utils import celery_is_worker_primary
from danswer.background.celery.tasks.vespa.tasks import get_unfenced_index_attempt_ids
from danswer.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT
from danswer.configs.constants import DanswerRedisLocks
from danswer.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME
from danswer.db.engine import get_session_with_default_tenant
from danswer.db.engine import SqlEngine
from danswer.db.index_attempt import get_index_attempt
from danswer.db.index_attempt import mark_attempt_failed
from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from danswer.redis.redis_connector_delete import RedisConnectorDelete
from danswer.redis.redis_connector_index import RedisConnectorIndex
Expand Down Expand Up @@ -134,6 +138,23 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:

RedisConnectorStop.reset_all(r)

# mark orphaned index attempts as failed
with get_session_with_default_tenant() as db_session:
unfenced_attempt_ids = get_unfenced_index_attempt_ids(db_session, r)
for attempt_id in unfenced_attempt_ids:
attempt = get_index_attempt(db_session, attempt_id)
if not attempt:
continue

failure_reason = (
f"Orphaned index attempt found on startup: "
f"index_attempt={attempt.id} "
f"cc_pair={attempt.connector_credential_pair_id} "
f"search_settings={attempt.search_settings_id}"
)
logger.warning(failure_reason)
mark_attempt_failed(attempt.id, db_session, failure_reason)


@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from datetime import datetime
from datetime import timezone

import redis
from celery import Celery
from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session

from danswer.background.celery.apps.app_base import task_logger
Expand Down Expand Up @@ -87,7 +87,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
cc_pair_id: int,
db_session: Session,
r: Redis,
lock_beat: redis.lock.Lock,
lock_beat: RedisLock,
tenant_id: str | None,
) -> int | None:
"""Returns an int if syncing is needed. The int represents the number of sync tasks generated.
Expand Down
31 changes: 24 additions & 7 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
from http import HTTPStatus
from time import sleep

import redis
import sentry_sdk
from celery import Celery
from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from redis import Redis
from redis.exceptions import LockError
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session

from danswer.background.celery.apps.app_base import task_logger
Expand Down Expand Up @@ -44,7 +45,7 @@
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
from danswer.redis.redis_connector import RedisConnector
from danswer.redis.redis_connector_index import RedisConnectorIndexingFenceData
from danswer.redis.redis_connector_index import RedisConnectorIndexPayload
from danswer.redis.redis_pool import get_redis_client
from danswer.utils.logger import setup_logger
from danswer.utils.variable_functionality import global_version
Expand All @@ -61,22 +62,38 @@ def __init__(
self,
stop_key: str,
generator_progress_key: str,
redis_lock: redis.lock.Lock,
redis_lock: RedisLock,
redis_client: Redis,
):
super().__init__()
self.redis_lock: redis.lock.Lock = redis_lock
self.redis_lock: RedisLock = redis_lock
self.stop_key: str = stop_key
self.generator_progress_key: str = generator_progress_key
self.redis_client = redis_client
self.started: datetime = datetime.now(timezone.utc)
self.redis_lock.reacquire()

self.last_lock_reacquire: datetime = datetime.now(timezone.utc)

def should_stop(self) -> bool:
if self.redis_client.exists(self.stop_key):
return True
return False

def progress(self, amount: int) -> None:
self.redis_lock.reacquire()
try:
self.redis_lock.reacquire()
self.last_lock_reacquire = datetime.now(timezone.utc)
except LockError:
logger.exception(
f"RunIndexingCallback - lock.reacquire exceptioned. "
f"lock_timeout={self.redis_lock.timeout} "
f"start={self.started} "
f"last_reacquired={self.last_lock_reacquire} "
f"now={datetime.now(timezone.utc)}"
)
raise

self.redis_client.incrby(self.generator_progress_key, amount)


Expand Down Expand Up @@ -325,7 +342,7 @@ def try_creating_indexing_task(
redis_connector_index.generator_clear()

# set a basic fence to start
payload = RedisConnectorIndexingFenceData(
payload = RedisConnectorIndexPayload(
index_attempt_id=None,
started=None,
submitted=datetime.now(timezone.utc),
Expand Down Expand Up @@ -368,7 +385,7 @@ def try_creating_indexing_task(
redis_connector_index.set_fence(payload)

except Exception:
redis_connector_index.set_fence(payload)
redis_connector_index.set_fence(None)
task_logger.exception(
f"Unexpected exception: "
f"tenant={tenant_id} "
Expand Down
Loading

0 comments on commit e860f15

Please sign in to comment.