diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index 7d2e6e0b3dd..cd6126223c4 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -173,7 +173,9 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: ) if attempt_id: task_logger.info( - f"Indexing queued: cc_pair={cc_pair.id} index_attempt={attempt_id}" + f"Indexing queued: index_attempt={attempt_id} " + f"cc_pair={cc_pair.id} " + f"search_settings={search_settings_instance.id} " ) tasks_created += 1 except SoftTimeLimitExceeded: @@ -489,7 +491,7 @@ def connector_indexing_task( f"search_settings={search_settings_id}" ) - attempt = None + attempt_found = False n_final_progress: int | None = None redis_connector = RedisConnector(tenant_id, cc_pair_id) @@ -529,6 +531,13 @@ def connector_indexing_task( sleep(1) continue + if payload.index_attempt_id != index_attempt_id: + raise ValueError( + f"connector_indexing_task - id mismatch. Task may be left over from previous run.: " + f"task_index_attempt={index_attempt_id} " + f"payload_index_attempt={payload.index_attempt_id}" + ) + logger.info( f"connector_indexing_task - Fence found, continuing...: fence={redis_connector_index.fence_key}" ) @@ -557,6 +566,7 @@ def connector_indexing_task( raise ValueError( f"Index attempt not found: index_attempt={index_attempt_id}" ) + attempt_found = True cc_pair = get_connector_credential_pair_from_id( cc_pair_id=cc_pair_id, @@ -576,32 +586,32 @@ def connector_indexing_task( f"Credential not found: cc_pair={cc_pair_id} credential={cc_pair.credential_id}" ) - # define a callback class - callback = RunIndexingCallback( - redis_connector.stop.fence_key, - redis_connector_index.generator_progress_key, - lock, - r, - ) + # define a callback class + callback = RunIndexingCallback( + redis_connector.stop.fence_key, + redis_connector_index.generator_progress_key, + lock, + r, + ) - logger.info( - f"Indexing spawned task running entrypoint: attempt={index_attempt_id} " - f"tenant={tenant_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" - ) + logger.info( + f"Indexing spawned task running entrypoint: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) - run_indexing_entrypoint( - index_attempt_id, - tenant_id, - cc_pair_id, - is_ee, - callback=callback, - ) + run_indexing_entrypoint( + index_attempt_id, + tenant_id, + cc_pair_id, + is_ee, + callback=callback, + ) - # get back the total number of indexed docs and return it - n_final_progress = redis_connector_index.get_progress() - redis_connector_index.set_generator_complete(HTTPStatus.OK.value) + # get back the total number of indexed docs and return it + n_final_progress = redis_connector_index.get_progress() + redis_connector_index.set_generator_complete(HTTPStatus.OK.value) except Exception as e: logger.exception( f"Indexing spawned task failed: attempt={index_attempt_id} " @@ -609,11 +619,10 @@ def connector_indexing_task( f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id}" ) - if attempt: + if attempt_found: with get_session_with_tenant(tenant_id) as db_session: - mark_attempt_failed(attempt, db_session, failure_reason=str(e)) + mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e)) - redis_connector_index.reset() raise e finally: if lock.owned(): diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index 944750c7a78..b01a0eac815 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -610,7 +610,7 @@ def monitor_ccpair_indexing_taskset( index_attempt = get_index_attempt(db_session, payload.index_attempt_id) if index_attempt: mark_attempt_failed( - index_attempt=index_attempt, + index_attempt_id=payload.index_attempt_id, db_session=db_session, failure_reason="Connector indexing aborted or exceptioned.", ) @@ -690,13 +690,18 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool: for a in attempts: # if attempts exist in the db but we don't detect them in redis, mark them as failed - failure_reason = f"Unknown index attempt {a.id}. Might be left over from a process restart." - if not r.exists( - RedisConnectorIndex.fence_key_with_ids( - a.connector_credential_pair_id, a.search_settings_id + fence_key = RedisConnectorIndex.fence_key_with_ids( + a.connector_credential_pair_id, a.search_settings_id + ) + if not r.exists(fence_key): + failure_reason = ( + f"Unknown index attempt. Might be left over from a process restart: " + f"index_attempt={a.id} " + f"cc_pair={a.connector_credential_pair_id} " + f"search_settings={a.search_settings_id}" ) - ): - mark_attempt_failed(a, db_session, failure_reason=failure_reason) + task_logger.warning(failure_reason) + mark_attempt_failed(a.id, db_session, failure_reason=failure_reason) lock_beat.reacquire() if r.exists(RedisConnectorCredentialPair.get_fence_key()): diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index 252309e3faa..35cb080b903 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -337,7 +337,7 @@ def _run_indexing( or index_attempt.status != IndexingStatus.IN_PROGRESS ): mark_attempt_failed( - index_attempt, + index_attempt.id, db_session, failure_reason=str(e), full_exception_trace=traceback.format_exc(), @@ -372,7 +372,7 @@ def _run_indexing( and index_attempt_md.num_exceptions >= batch_num ): mark_attempt_failed( - index_attempt, + index_attempt.id, db_session, failure_reason="All batches exceptioned.", ) diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index 45a07387949..b9c3d9d4ca2 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -219,7 +219,7 @@ def mark_attempt_partially_succeeded( def mark_attempt_failed( - index_attempt: IndexAttempt, + index_attempt_id: int, db_session: Session, failure_reason: str = "Unknown", full_exception_trace: str | None = None, @@ -227,7 +227,7 @@ def mark_attempt_failed( try: attempt = db_session.execute( select(IndexAttempt) - .where(IndexAttempt.id == index_attempt.id) + .where(IndexAttempt.id == index_attempt_id) .with_for_update() ).scalar_one()