Skip to content

Commit

Permalink
Merge pull request #3016 from danswer-ai/hotfix/v0.11-indexing-logs
Browse files Browse the repository at this point in the history
Merge hotfix/v0.11-indexing-logs into release/v0.11
  • Loading branch information
rkuo-danswer authored Oct 31, 2024
2 parents 8f063e1 + 1df9170 commit 06c35d9
Showing 1 changed file with 76 additions and 16 deletions.
92 changes: 76 additions & 16 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,12 @@ def connector_indexing_proxy_task(
tenant_id: str | None,
) -> None:
"""celery tasks are forked, but forking is unstable. This proxies work to a spawned task."""

task_logger.info(
f"Indexing proxy - starting: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
client = SimpleJobClient()

job = client.submit(
Expand All @@ -402,29 +407,56 @@ def connector_indexing_proxy_task(
)

if not job:
task_logger.info(
f"Indexing proxy - spawn failed: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
return

task_logger.info(
f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)

while True:
sleep(10)
with get_session_with_tenant(tenant_id) as db_session:
index_attempt = get_index_attempt(
db_session=db_session, index_attempt_id=index_attempt_id
)

# do nothing for ongoing jobs that haven't been stopped
if not job.done():
# do nothing for ongoing jobs that haven't been stopped
if not job.done():
with get_session_with_tenant(tenant_id) as db_session:
index_attempt = get_index_attempt(
db_session=db_session, index_attempt_id=index_attempt_id
)

if not index_attempt:
continue

if not index_attempt.is_finished():
continue

if job.status == "error":
logger.error(job.exception())
if job.status == "error":
task_logger.error(
f"Indexing proxy - spawned task exceptioned: "
f"attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"error={job.exception()}"
)

job.release()
break
job.release()
break

task_logger.info(
f"Indexing proxy - finished: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
return


Expand All @@ -446,7 +478,17 @@ def connector_indexing_task(
Returns None if the task did not run (possibly due to a conflict).
Otherwise, returns an int >= 0 representing the number of indexed docs.
NOTE: if an exception is raised out of this task, the primary worker will detect
that the task transitioned to a "READY" state but the generator_complete_key doesn't exist.
This will cause the primary worker to abort the indexing attempt and clean up.
"""
logger.info(
f"Indexing spawned task starting: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)

attempt = None
n_final_progress = 0
Expand Down Expand Up @@ -485,19 +527,19 @@ def connector_indexing_task(
cast(str, fence_json)
)
except ValueError:
task_logger.exception(
logger.exception(
f"connector_indexing_task: fence_data not decodeable: fence={rci.fence_key}"
)
raise

if fence_data.index_attempt_id is None or fence_data.celery_task_id is None:
task_logger.info(
logger.info(
f"connector_indexing_task - Waiting for fence: fence={rci.fence_key}"
)
sleep(1)
continue

task_logger.info(
logger.info(
f"connector_indexing_task - Fence found, continuing...: fence={rci.fence_key}"
)
break
Expand All @@ -509,7 +551,7 @@ def connector_indexing_task(

acquired = lock.acquire(blocking=False)
if not acquired:
task_logger.warning(
logger.warning(
f"Indexing task already running, exiting...: "
f"cc_pair={cc_pair_id} search_settings={search_settings_id}"
)
Expand Down Expand Up @@ -552,6 +594,13 @@ def connector_indexing_task(
rcs.fence_key, rci.generator_progress_key, lock, r
)

logger.info(
f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)

run_indexing_entrypoint(
index_attempt_id,
tenant_id,
Expand All @@ -570,7 +619,12 @@ def connector_indexing_task(

r.set(rci.generator_complete_key, HTTPStatus.OK.value)
except Exception as e:
task_logger.exception(f"Indexing failed: cc_pair={cc_pair_id}")
logger.exception(
f"Indexing spawned task failed: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
if attempt:
with get_session_with_tenant(tenant_id) as db_session:
mark_attempt_failed(attempt, db_session, failure_reason=str(e))
Expand All @@ -584,4 +638,10 @@ def connector_indexing_task(
if lock.owned():
lock.release()

logger.info(
f"Indexing spawned task finished: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
return n_final_progress

0 comments on commit 06c35d9

Please sign in to comment.