Skip to content

Commit

Permalink
Merge pull request #3071 from danswer-ai/hotfix/v0.12-stale-tasks
Browse files Browse the repository at this point in the history
Hotfix/v0.12 stale tasks
  • Loading branch information
rkuo-danswer authored Nov 6, 2024
2 parents b2c55eb + 93ec2a6 commit 593b18f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 39 deletions.
65 changes: 37 additions & 28 deletions backend/danswer/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
)
if attempt_id:
task_logger.info(
f"Indexing queued: cc_pair={cc_pair.id} index_attempt={attempt_id}"
f"Indexing queued: index_attempt={attempt_id} "
f"cc_pair={cc_pair.id} "
f"search_settings={search_settings_instance.id} "
)
tasks_created += 1
except SoftTimeLimitExceeded:
Expand Down Expand Up @@ -489,7 +491,7 @@ def connector_indexing_task(
f"search_settings={search_settings_id}"
)

attempt = None
attempt_found = False
n_final_progress: int | None = None

redis_connector = RedisConnector(tenant_id, cc_pair_id)
Expand Down Expand Up @@ -529,6 +531,13 @@ def connector_indexing_task(
sleep(1)
continue

if payload.index_attempt_id != index_attempt_id:
raise ValueError(
f"connector_indexing_task - id mismatch. Task may be left over from previous run.: "
f"task_index_attempt={index_attempt_id} "
f"payload_index_attempt={payload.index_attempt_id}"
)

logger.info(
f"connector_indexing_task - Fence found, continuing...: fence={redis_connector_index.fence_key}"
)
Expand Down Expand Up @@ -557,6 +566,7 @@ def connector_indexing_task(
raise ValueError(
f"Index attempt not found: index_attempt={index_attempt_id}"
)
attempt_found = True

cc_pair = get_connector_credential_pair_from_id(
cc_pair_id=cc_pair_id,
Expand All @@ -576,44 +586,43 @@ def connector_indexing_task(
f"Credential not found: cc_pair={cc_pair_id} credential={cc_pair.credential_id}"
)

# define a callback class
callback = RunIndexingCallback(
redis_connector.stop.fence_key,
redis_connector_index.generator_progress_key,
lock,
r,
)
# define a callback class
callback = RunIndexingCallback(
redis_connector.stop.fence_key,
redis_connector_index.generator_progress_key,
lock,
r,
)

logger.info(
f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
logger.info(
f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)

run_indexing_entrypoint(
index_attempt_id,
tenant_id,
cc_pair_id,
is_ee,
callback=callback,
)
run_indexing_entrypoint(
index_attempt_id,
tenant_id,
cc_pair_id,
is_ee,
callback=callback,
)

# get back the total number of indexed docs and return it
n_final_progress = redis_connector_index.get_progress()
redis_connector_index.set_generator_complete(HTTPStatus.OK.value)
# get back the total number of indexed docs and return it
n_final_progress = redis_connector_index.get_progress()
redis_connector_index.set_generator_complete(HTTPStatus.OK.value)
except Exception as e:
logger.exception(
f"Indexing spawned task failed: attempt={index_attempt_id} "
f"tenant={tenant_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id}"
)
if attempt:
if attempt_found:
with get_session_with_tenant(tenant_id) as db_session:
mark_attempt_failed(attempt, db_session, failure_reason=str(e))
mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e))

redis_connector_index.reset()
raise e
finally:
if lock.owned():
Expand Down
19 changes: 12 additions & 7 deletions backend/danswer/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ def monitor_ccpair_indexing_taskset(
index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
if index_attempt:
mark_attempt_failed(
index_attempt=index_attempt,
index_attempt_id=payload.index_attempt_id,
db_session=db_session,
failure_reason="Connector indexing aborted or exceptioned.",
)
Expand Down Expand Up @@ -690,13 +690,18 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:

for a in attempts:
# if attempts exist in the db but we don't detect them in redis, mark them as failed
failure_reason = f"Unknown index attempt {a.id}. Might be left over from a process restart."
if not r.exists(
RedisConnectorIndex.fence_key_with_ids(
a.connector_credential_pair_id, a.search_settings_id
fence_key = RedisConnectorIndex.fence_key_with_ids(
a.connector_credential_pair_id, a.search_settings_id
)
if not r.exists(fence_key):
failure_reason = (
f"Unknown index attempt. Might be left over from a process restart: "
f"index_attempt={a.id} "
f"cc_pair={a.connector_credential_pair_id} "
f"search_settings={a.search_settings_id}"
)
):
mark_attempt_failed(a, db_session, failure_reason=failure_reason)
task_logger.warning(failure_reason)
mark_attempt_failed(a.id, db_session, failure_reason=failure_reason)

lock_beat.reacquire()
if r.exists(RedisConnectorCredentialPair.get_fence_key()):
Expand Down
4 changes: 2 additions & 2 deletions backend/danswer/background/indexing/run_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ def _run_indexing(
or index_attempt.status != IndexingStatus.IN_PROGRESS
):
mark_attempt_failed(
index_attempt,
index_attempt.id,
db_session,
failure_reason=str(e),
full_exception_trace=traceback.format_exc(),
Expand Down Expand Up @@ -372,7 +372,7 @@ def _run_indexing(
and index_attempt_md.num_exceptions >= batch_num
):
mark_attempt_failed(
index_attempt,
index_attempt.id,
db_session,
failure_reason="All batches exceptioned.",
)
Expand Down
4 changes: 2 additions & 2 deletions backend/danswer/db/index_attempt.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,15 @@ def mark_attempt_partially_succeeded(


def mark_attempt_failed(
index_attempt: IndexAttempt,
index_attempt_id: int,
db_session: Session,
failure_reason: str = "Unknown",
full_exception_trace: str | None = None,
) -> None:
try:
attempt = db_session.execute(
select(IndexAttempt)
.where(IndexAttempt.id == index_attempt.id)
.where(IndexAttempt.id == index_attempt_id)
.with_for_update()
).scalar_one()

Expand Down

0 comments on commit 593b18f

Please sign in to comment.