Skip to content

Commit

Permalink
Consolidate Exception Handling for Exceptions Handled Similarly
Browse files Browse the repository at this point in the history
  • Loading branch information
bencap committed Sep 13, 2024
1 parent 4f86f27 commit 1a7921c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 67 deletions.
4 changes: 4 additions & 0 deletions src/mavedb/lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ class NonexistentMappingReferenceError(ValueError):
"""Raised when score set mapping results do not contain a valid reference sequence"""

pass


class MappingEnqueueError(ValueError):
"""Raised when a mapping job fails to be enqueued despite appearing as if it should have been"""
95 changes: 28 additions & 67 deletions src/mavedb/worker/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Session

from mavedb.lib.exceptions import NonexistentMappingReferenceError, NonexistentMappingResultsError
from mavedb.lib.exceptions import NonexistentMappingReferenceError, NonexistentMappingResultsError, MappingEnqueueError
from mavedb.lib.score_sets import (
columns_for_dataset,
create_variants,
Expand Down Expand Up @@ -199,6 +199,7 @@ async def create_variants_for_score_set(
msg="Encountered an unhandled exception while creating variants for score set.", extra=logging_context
)

# Don't raise BaseExceptions so we may emit canonical logs (TODO: Perhaps they are so problematic we want to raise them anyway).
return {"success": False}

else:
Expand Down Expand Up @@ -226,6 +227,7 @@ async def map_variants_for_score_set(
) -> dict:
async with mapping_in_execution(redis=ctx["redis"], job_id=ctx["job_id"]):
logging_context = {}
score_set = None
try:
db: Session = ctx["db"]
redis: ArqRedis = ctx["redis"]
Expand All @@ -234,19 +236,6 @@ async def map_variants_for_score_set(
logging_context["attempt"] = attempt
logger.info(msg="Started variant mapping", extra=logging_context)

except Exception as e:
# NOTE: can't update mapping state here because setup is necessary to update the db
send_slack_message(e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
logger.error(
msg="Variant mapper encountered an unexpected error during setup. This job will not be retried.",
extra=logging_context,
)

return {"success": False, "retried": False}

score_set = None
try:
score_set = db.scalars(select(ScoreSet).where(ScoreSet.urn == score_set_urn)).one()
score_set.mapping_state = MappingState.processing
score_set.mapping_errors = null()
Expand All @@ -257,41 +246,26 @@ async def map_variants_for_score_set(
logging_context["mapping_state"] = score_set.mapping_state
logger.debug(msg="Fetched score set metadata for mapping job.", extra=logging_context)

except Exception as e:
db.rollback()
if score_set:
score_set.mapping_state = MappingState.failed
score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"}
db.add(score_set)
db.commit()
send_slack_message(e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
logger.error(
msg="Variant mapper encountered an unexpected error while fetching score set metadata. This job will not be retried.",
extra=logging_context,
)

return {"success": False, "retried": False}

try:
# Do not block Worker event loop during mapping, see: https://arq-docs.helpmanual.io/#synchronous-jobs.
vrs = vrs_mapper()
blocking = functools.partial(vrs.map_score_set, score_set_urn)
loop = asyncio.get_running_loop()

except Exception as e:
db.rollback()
score_set.mapping_state = MappingState.failed
score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"}
db.add(score_set)
db.commit()
send_slack_message(e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
logger.error(
msg="Variant mapper encountered an unexpected error while preparing the mapping event loop. This job will not be retried.",
msg="Variant mapper encountered an unexpected error during setup. This job will not be retried.",
extra=logging_context,
)

db.rollback()
if score_set:
score_set.mapping_state = MappingState.failed
score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"}
db.add(score_set)
db.commit()

return {"success": False, "retried": False}

mapping_results = None
Expand All @@ -307,9 +281,10 @@ async def map_variants_for_score_set(
}
db.add(score_set)
db.commit()

send_slack_message(e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
logger.warn(
logger.warning(
msg="Variant mapper encountered an unexpected error while mapping variants. This job will be retried.",
extra=logging_context,
)
Expand Down Expand Up @@ -500,7 +475,7 @@ async def map_variants_for_score_set(

send_slack_message(e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
logger.warn(
logger.warning(
msg="An unexpected error occurred during variant mapping. This job will be attempted again.",
extra=logging_context,
)
Expand Down Expand Up @@ -566,6 +541,8 @@ async def variant_mapper_manager(
ctx: dict, correlation_id: str, score_set_urn: str, updater_id: int, attempt: int = 0
) -> dict:
logging_context = {}
mapping_job_id = None
mapping_job_status = None
try:
redis: ArqRedis = ctx["redis"]
db: Session = ctx["db"]
Expand All @@ -574,21 +551,12 @@ async def variant_mapper_manager(
logging_context["attempt"] = attempt
logger.debug(msg="Variant mapping manager began execution", extra=logging_context)

except Exception as e:
send_slack_message(e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
logger.error(msg="Variant mapper manager encountered an unexpected error during setup.", extra=logging_context)
return {"success": False, "enqueued_job": None}

mapping_job_id = None
mapping_job_status = None
try:
queued_urn = await redis.rpop(MAPPING_QUEUE_NAME) # type: ignore
queue_length = await redis.llen(MAPPING_QUEUE_NAME) # type: ignore
logging_context["variant_mapping_queue_length"] = queue_length

# Setup the job id cache if it does not already exist.
if await redis.exists(MAPPING_CURRENT_ID_NAME):
if not await redis.exists(MAPPING_CURRENT_ID_NAME):
await redis.set(MAPPING_CURRENT_ID_NAME, "")

if not queued_urn:
Expand All @@ -599,7 +567,6 @@ async def variant_mapper_manager(
logging_context["current_mapping_resource"] = queued_urn
logger.debug(msg="Found mapping job(s) still in queue.", extra=logging_context)

mapping_job_status = None
mapping_job_id = await redis.get(MAPPING_CURRENT_ID_NAME)
if mapping_job_id:
mapping_job_id = mapping_job_id.decode("utf-8")
Expand All @@ -611,10 +578,7 @@ async def variant_mapper_manager(
except Exception as e:
send_slack_message(e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
logger.error(
msg="Variant mapper manager encountered an unexpected error while fetching the executing mapping job.",
extra=logging_context,
)
logger.error(msg="Variant mapper manager encountered an unexpected error during setup.", extra=logging_context)
return {"success": False, "enqueued_job": None}

new_job = None
Expand Down Expand Up @@ -662,25 +626,22 @@ async def variant_mapper_manager(
# before the deferred time, these deferred jobs will still run once able.
return {"success": True, "enqueued_job": new_job_id}

logger.warn(
msg="Unable to queue a new mapping job or defer mapping. This score set will not be mapped.",
raise MappingEnqueueError()

except Exception as e:
send_slack_message(e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
logger.error(
msg="Variant mapper manager encountered an unexpected error while enqueing a mapping job. This job will not be retried.",
extra=logging_context,
)
# TODO: If we end up here, we were unable to enqueue a new mapping job or a new manager job despite expecting we should have
# been able to do so. We should raise some sort of exception.

db.rollback()
score_set = db.scalars(select(ScoreSet).where(ScoreSet.urn == score_set_urn)).one_or_none()
if score_set:
score_set.mapping_state = MappingState.failed
score_set.mapping_errors = "Unable to queue a new mapping job or defer score set mapping."
db.add(score_set)
db.commit()

return {"success": False, "enqueued_job": new_job_id}

except Exception as e:
send_slack_message(e)
logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
logger.error(
msg="Variant mapper manager encountered an unexpected error while enqueing a mapping job.",
extra=logging_context,
)
return {"success": False, "enqueued_job": new_job_id}

0 comments on commit 1a7921c

Please sign in to comment.