Skip to content

Commit

Permalink
Add queued mapping state
Browse files Browse the repository at this point in the history
  • Loading branch information
sallybg committed Sep 13, 2024
1 parent e718cb4 commit 478671d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 2 deletions.
1 change: 1 addition & 0 deletions alembic/versions/d7e6f8c3b9dc_scoreset_mapping_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def upgrade():
"complete",
"pending_variant_processing",
"not_attempted",
"queued",
name="mappingstate",
native_enum=False,
create_constraint=True,
Expand Down
1 change: 1 addition & 0 deletions src/mavedb/models/enums/mapping_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ class MappingState(enum.Enum):
complete = "complete"
pending_variant_processing = "pending_variant_processing"
not_attempted = "not_attempted"
queued = "queued"
13 changes: 11 additions & 2 deletions src/mavedb/worker/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ async def create_variants_for_score_set(

await redis.lpush(MAPPING_QUEUE_NAME, score_set_urn) # type: ignore
await redis.enqueue_job("variant_mapper_manager", correlation_id, score_set_urn, updater_id)
score_set.mapping_state = MappingState.queued
finally:
db.add(score_set)
db.commit()
Expand Down Expand Up @@ -275,7 +276,6 @@ async def map_variants_for_score_set(

except Exception as e:
db.rollback()
score_set.mapping_state = MappingState.failed
score_set.mapping_errors = {
"error_message": f"Encountered an internal server error during mapping. Mapping will be automatically retried up to 5 times for this score set (attempt {attempt}/5)."
}
Expand Down Expand Up @@ -305,6 +305,10 @@ async def map_variants_for_score_set(
logging_context["backoff_job_id"] = new_job_id

except Exception as backoff_e:
score_set.mapping_state = MappingState.failed
score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"}
db.add(score_set)
db.commit()
send_slack_message(backoff_e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(backoff_e)}
logger.critical(
Expand All @@ -313,6 +317,9 @@ async def map_variants_for_score_set(
)
else:
if new_job_id and not max_retries_exceeded:
score_set.mapping_state = MappingState.queued
db.add(score_set)
db.commit()
logger.info(
msg="After encountering an error while mapping variants, another mapping job was queued.",
extra=logging_context,
Expand Down Expand Up @@ -463,7 +470,6 @@ async def map_variants_for_score_set(

except Exception as e:
db.rollback()
score_set.mapping_state = MappingState.failed
score_set.mapping_errors = {
"error_message": f"Encountered an unexpected error while parsing mapped variants. Mapping will be automatically retried up to 5 times for this score set (attempt {attempt}/5)."
}
Expand Down Expand Up @@ -505,6 +511,9 @@ async def map_variants_for_score_set(
)
else:
if new_job_id and not max_retries_exceeded:
score_set.mapping_state = MappingState.queued
db.add(score_set)
db.commit()
logger.info(
msg="After encountering an error while parsing mapped variants, another mapping job was queued.",
extra=logging_context,
Expand Down

0 comments on commit 478671d

Please sign in to comment.