diff --git a/backend/danswer/background/celery/tasks/indexing/tasks.py b/backend/danswer/background/celery/tasks/indexing/tasks.py index 7c1460cbddf..9f791aec3ba 100644 --- a/backend/danswer/background/celery/tasks/indexing/tasks.py +++ b/backend/danswer/background/celery/tasks/indexing/tasks.py @@ -388,7 +388,12 @@ def connector_indexing_proxy_task( tenant_id: str | None, ) -> None: """celery tasks are forked, but forking is unstable. This proxies work to a spawned task.""" - + task_logger.info( + f"Indexing proxy - starting: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) client = SimpleJobClient() job = client.submit( @@ -402,29 +407,56 @@ def connector_indexing_proxy_task( ) if not job: + task_logger.info( + f"Indexing proxy - spawn failed: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) return + task_logger.info( + f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + while True: sleep(10) - with get_session_with_tenant(tenant_id) as db_session: - index_attempt = get_index_attempt( - db_session=db_session, index_attempt_id=index_attempt_id - ) - # do nothing for ongoing jobs that haven't been stopped - if not job.done(): + # do nothing for ongoing jobs that haven't been stopped + if not job.done(): + with get_session_with_tenant(tenant_id) as db_session: + index_attempt = get_index_attempt( + db_session=db_session, index_attempt_id=index_attempt_id + ) + if not index_attempt: continue if not index_attempt.is_finished(): continue - if job.status == "error": - logger.error(job.exception()) + if job.status == "error": + task_logger.error( + f"Indexing proxy - spawned task exceptioned: " + f"attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id} " + f"error={job.exception()}" + ) - job.release() - break + job.release() + break + task_logger.info( + f"Indexing proxy - finished: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) return @@ -446,7 +478,17 @@ def connector_indexing_task( Returns None if the task did not run (possibly due to a conflict). Otherwise, returns an int >= 0 representing the number of indexed docs. + + NOTE: if an exception is raised out of this task, the primary worker will detect + that the task transitioned to a "READY" state but the generator_complete_key doesn't exist. + This will cause the primary worker to abort the indexing attempt and clean up. """ + logger.info( + f"Indexing spawned task starting: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) attempt = None n_final_progress = 0 @@ -485,19 +527,19 @@ def connector_indexing_task( cast(str, fence_json) ) except ValueError: - task_logger.exception( + logger.exception( f"connector_indexing_task: fence_data not decodeable: fence={rci.fence_key}" ) raise if fence_data.index_attempt_id is None or fence_data.celery_task_id is None: - task_logger.info( + logger.info( f"connector_indexing_task - Waiting for fence: fence={rci.fence_key}" ) sleep(1) continue - task_logger.info( + logger.info( f"connector_indexing_task - Fence found, continuing...: fence={rci.fence_key}" ) break @@ -509,7 +551,7 @@ def connector_indexing_task( acquired = lock.acquire(blocking=False) if not acquired: - task_logger.warning( + logger.warning( f"Indexing task already running, exiting...: " f"cc_pair={cc_pair_id} search_settings={search_settings_id}" ) @@ -552,6 +594,13 @@ def connector_indexing_task( rcs.fence_key, rci.generator_progress_key, lock, r ) + logger.info( + f"Indexing spawned task running entrypoint: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + run_indexing_entrypoint( index_attempt_id, tenant_id, @@ -570,7 +619,12 @@ def connector_indexing_task( r.set(rci.generator_complete_key, HTTPStatus.OK.value) except Exception as e: - task_logger.exception(f"Indexing failed: cc_pair={cc_pair_id}") + logger.exception( + f"Indexing spawned task failed: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) if attempt: with get_session_with_tenant(tenant_id) as db_session: mark_attempt_failed(attempt, db_session, failure_reason=str(e)) @@ -584,4 +638,10 @@ def connector_indexing_task( if lock.owned(): lock.release() + logger.info( + f"Indexing spawned task finished: attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) return n_final_progress