diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index fefbae03220..0e8e59bf5a6 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -255,7 +255,19 @@ def try_creating_indexing_task( custom_task_id = f"{rci.generator_task_id_prefix}_{uuid4()}" - # create the index attempt ... just for tracking purposes + # set a basic fence to start + fence_value = RedisConnectorIndexingFenceData( + index_attempt_id=None, + started=None, + submitted=datetime.now(timezone.utc), + celery_task_id=None, + ) + r.set(rci.fence_key, fence_value.model_dump_json()) + + # create the index attempt for tracking purposes + # code elsewhere checks for index attempts without an associated redis key + # and cleans them up + # therefore we must create the attempt and the task after the fence goes up index_attempt_id = create_index_attempt( cc_pair.id, search_settings.id, @@ -276,17 +288,19 @@ def try_creating_indexing_task( priority=DanswerCeleryPriority.MEDIUM, ) if not result: - return None + raise RuntimeError("send_task for connector_indexing_proxy_task failed.") - # set this only after all tasks have been added + # now fill out the fence with the rest of the data fence_value = RedisConnectorIndexingFenceData( index_attempt_id=index_attempt_id, started=None, submitted=datetime.now(timezone.utc), celery_task_id=result.id, ) + r.set(rci.fence_key, fence_value.model_dump_json()) except Exception: + r.delete(rci.fence_key) task_logger.exception("Unexpected exception") return None finally: @@ -371,6 +385,38 @@ def connector_indexing_task( rci = RedisConnectorIndexing(cc_pair_id, search_settings_id) + while True: + # read related data and evaluate/print task progress + fence_value = cast(bytes, r.get(rci.fence_key)) + if fence_value is None: + task_logger.info( + f"connector_indexing_task: fence_value not found: fence={rci.fence_key}" + 
) + raise + + try: + fence_json = fence_value.decode("utf-8") + fence_data = RedisConnectorIndexingFenceData.model_validate_json( + cast(str, fence_json) + ) + except ValueError: + task_logger.exception( + f"connector_indexing_task: fence_data not decodeable: fence={rci.fence_key}" + ) + raise + + if fence_data.index_attempt_id is None or fence_data.celery_task_id is None: + task_logger.info( + f"connector_indexing_task - Waiting for fence: fence={rci.fence_key}" + ) + sleep(1) + continue + + task_logger.info( + f"connector_indexing_task - Fence found, continuing...: fence={rci.fence_key}" + ) + break + lock = r.lock( rci.generator_lock_key, timeout=CELERY_INDEXING_LOCK_TIMEOUT, diff --git a/backend/danswer/background/celery/tasks/shared/tasks.py b/backend/danswer/background/celery/tasks/shared/tasks.py index 6fc0b5f1f67..26f9d1aac10 100644 --- a/backend/danswer/background/celery/tasks/shared/tasks.py +++ b/backend/danswer/background/celery/tasks/shared/tasks.py @@ -21,10 +21,10 @@ class RedisConnectorIndexingFenceData(BaseModel): - index_attempt_id: int + index_attempt_id: int | None started: datetime | None submitted: datetime - celery_task_id: str + celery_task_id: str | None @shared_task( diff --git a/backend/danswer/background/celery/tasks/vespa/tasks.py b/backend/danswer/background/celery/tasks/vespa/tasks.py index 9830d71f778..2d79045c44f 100644 --- a/backend/danswer/background/celery/tasks/vespa/tasks.py +++ b/backend/danswer/background/celery/tasks/vespa/tasks.py @@ -574,6 +574,10 @@ def monitor_ccpair_indexing_taskset( "monitor_ccpair_indexing_taskset: generator_progress_value is not an integer." 
) + if fence_data.index_attempt_id is None or fence_data.celery_task_id is None: + # the task is still setting up + return + + # Read result state BEFORE generator_complete_key to avoid a race condition result: AsyncResult = AsyncResult(fence_data.celery_task_id) result_state = result.state diff --git a/backend/danswer/connectors/web/connector.py b/backend/danswer/connectors/web/connector.py index 9e0671ea248..9e406b71674 100644 --- a/backend/danswer/connectors/web/connector.py +++ b/backend/danswer/connectors/web/connector.py @@ -373,7 +373,7 @@ def load_from_state(self) -> GenerateDocumentsOutput: page.close() except Exception as e: last_error = f"Failed to fetch '{current_url}': {e}" - logger.error(last_error) + logger.exception(last_error) playwright.stop() restart_playwright = True continue diff --git a/backend/danswer/document_index/vespa/indexing_utils.py b/backend/danswer/document_index/vespa/indexing_utils.py index 28ff31c8071..8ecdc22672b 100644 --- a/backend/danswer/document_index/vespa/indexing_utils.py +++ b/backend/danswer/document_index/vespa/indexing_utils.py @@ -118,7 +118,7 @@ def get_existing_documents_from_chunks( return document_ids -@retry(tries=3, delay=1, backoff=2) +@retry(tries=5, delay=1, backoff=2) def _index_vespa_chunk( chunk: DocMetadataAwareIndexChunk, index_name: str, diff --git a/backend/tests/integration/common_utils/managers/cc_pair.py b/backend/tests/integration/common_utils/managers/cc_pair.py index 99d8a82a2b3..94114298507 100644 --- a/backend/tests/integration/common_utils/managers/cc_pair.py +++ b/backend/tests/integration/common_utils/managers/cc_pair.py @@ -246,17 +246,17 @@ def wait_for_indexing( fetched_cc_pair.last_success and fetched_cc_pair.last_success > after ): - print(f"CC pair {cc_pair.id} indexing complete.") + print(f"Indexing complete: cc_pair={cc_pair.id}") return elapsed = time.monotonic() - start if elapsed > timeout: raise TimeoutError( - f"CC pair {cc_pair.id} indexing was not completed within {timeout} seconds" + f"Indexing wait timed out: cc_pair={cc_pair.id} timeout={timeout}s" ) print( - f"CC pair {cc_pair.id} indexing to complete. elapsed={elapsed:.2f} timeout={timeout}" + f"Indexing wait for completion: cc_pair={cc_pair.id} elapsed={elapsed:.2f} timeout={timeout}s" ) time.sleep(5)