diff --git a/src/mavedb/lib/exceptions.py b/src/mavedb/lib/exceptions.py index 319e5f13..004cab6f 100644 --- a/src/mavedb/lib/exceptions.py +++ b/src/mavedb/lib/exceptions.py @@ -172,3 +172,7 @@ class NonexistentMappingReferenceError(ValueError): """Raised when score set mapping results do not contain a valid reference sequence""" pass + + +class MappingEnqueueError(ValueError): + """Raised when a mapping job fails to be enqueued despite appearing as if it should have been""" diff --git a/src/mavedb/worker/jobs.py b/src/mavedb/worker/jobs.py index 78802eac..eff306b9 100644 --- a/src/mavedb/worker/jobs.py +++ b/src/mavedb/worker/jobs.py @@ -15,7 +15,7 @@ from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Session -from mavedb.lib.exceptions import NonexistentMappingReferenceError, NonexistentMappingResultsError +from mavedb.lib.exceptions import NonexistentMappingReferenceError, NonexistentMappingResultsError, MappingEnqueueError from mavedb.lib.score_sets import ( columns_for_dataset, create_variants, @@ -199,6 +199,7 @@ async def create_variants_for_score_set( msg="Encountered an unhandled exception while creating variants for score set.", extra=logging_context ) + # Don't raise BaseExceptions so we may emit canonical logs (TODO: Perhaps they are so problematic we want to raise them anyway). return {"success": False} else: @@ -226,6 +227,7 @@ async def map_variants_for_score_set( ) -> dict: async with mapping_in_execution(redis=ctx["redis"], job_id=ctx["job_id"]): logging_context = {} + score_set = None try: db: Session = ctx["db"] redis: ArqRedis = ctx["redis"] @@ -234,19 +236,6 @@ async def map_variants_for_score_set( logging_context["attempt"] = attempt logger.info(msg="Started variant mapping", extra=logging_context) - except Exception as e: - # NOTE: can't update mapping state here because setup is necessary to update the db - send_slack_message(e) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="Variant mapper encountered an unexpected error during setup. This job will not be retried.", - extra=logging_context, - ) - - return {"success": False, "retried": False} - - score_set = None - try: score_set = db.scalars(select(ScoreSet).where(ScoreSet.urn == score_set_urn)).one() score_set.mapping_state = MappingState.processing score_set.mapping_errors = null() @@ -257,41 +246,26 @@ async def map_variants_for_score_set( logging_context["mapping_state"] = score_set.mapping_state logger.debug(msg="Fetched score set metadata for mapping job.", extra=logging_context) - except Exception as e: - db.rollback() - if score_set: - score_set.mapping_state = MappingState.failed - score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} - db.add(score_set) - db.commit() - send_slack_message(e) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="Variant mapper encountered an unexpected error while fetching score set metadata. This job will not be retried.", - extra=logging_context, - ) - - return {"success": False, "retried": False} - - try: # Do not block Worker event loop during mapping, see: https://arq-docs.helpmanual.io/#synchronous-jobs. vrs = vrs_mapper() blocking = functools.partial(vrs.map_score_set, score_set_urn) loop = asyncio.get_running_loop() except Exception as e: - db.rollback() - score_set.mapping_state = MappingState.failed - score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} - db.add(score_set) - db.commit() send_slack_message(e) logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} logger.error( - msg="Variant mapper encountered an unexpected error while preparing the mapping event loop. This job will not be retried.", + msg="Variant mapper encountered an unexpected error during setup. This job will not be retried.", extra=logging_context, ) + db.rollback() + if score_set: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} + db.add(score_set) + db.commit() + return {"success": False, "retried": False} mapping_results = None @@ -307,9 +281,10 @@ async def map_variants_for_score_set( } db.add(score_set) db.commit() + send_slack_message(e) logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.warn( + logger.warning( msg="Variant mapper encountered an unexpected error while mapping variants. This job will be retried.", extra=logging_context, ) @@ -500,7 +475,7 @@ async def map_variants_for_score_set( send_slack_message(e) logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.warn( + logger.warning( msg="An unexpected error occurred during variant mapping. This job will be attempted again.", extra=logging_context, ) @@ -566,6 +541,8 @@ async def variant_mapper_manager( ctx: dict, correlation_id: str, score_set_urn: str, updater_id: int, attempt: int = 0 ) -> dict: logging_context = {} + mapping_job_id = None + mapping_job_status = None try: redis: ArqRedis = ctx["redis"] db: Session = ctx["db"] @@ -574,21 +551,12 @@ async def variant_mapper_manager( logging_context["attempt"] = attempt logger.debug(msg="Variant mapping manager began execution", extra=logging_context) - except Exception as e: - send_slack_message(e) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error(msg="Variant mapper manager encountered an unexpected error during setup.", extra=logging_context) - return {"success": False, "enqueued_job": None} - - mapping_job_id = None - mapping_job_status = None - try: queued_urn = await redis.rpop(MAPPING_QUEUE_NAME) # type: ignore queue_length = await redis.llen(MAPPING_QUEUE_NAME) # type: ignore logging_context["variant_mapping_queue_length"] = queue_length # Setup the job id cache if it does not already exist. - if await redis.exists(MAPPING_CURRENT_ID_NAME): + if not await redis.exists(MAPPING_CURRENT_ID_NAME): await redis.set(MAPPING_CURRENT_ID_NAME, "") if not queued_urn: @@ -599,7 +567,6 @@ async def variant_mapper_manager( logging_context["current_mapping_resource"] = queued_urn logger.debug(msg="Found mapping job(s) still in queue.", extra=logging_context) - mapping_job_status = None mapping_job_id = await redis.get(MAPPING_CURRENT_ID_NAME) if mapping_job_id: mapping_job_id = mapping_job_id.decode("utf-8") @@ -611,10 +578,7 @@ async def variant_mapper_manager( except Exception as e: send_slack_message(e) logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="Variant mapper manager encountered an unexpected error while fetching the executing mapping job.", - extra=logging_context, - ) + logger.error(msg="Variant mapper manager encountered an unexpected error during setup.", extra=logging_context) return {"success": False, "enqueued_job": None} new_job = None @@ -662,25 +626,22 @@ async def variant_mapper_manager( # before the deferred time, these deferred jobs will still run once able. return {"success": True, "enqueued_job": new_job_id} - logger.warn( - msg="Unable to queue a new mapping job or defer mapping. This score set will not be mapped.", + raise MappingEnqueueError() + + except Exception as e: + send_slack_message(e) + logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} + logger.error( + msg="Variant mapper manager encountered an unexpected error while enqueing a mapping job. This job will not be retried.", extra=logging_context, ) - # TODO: If we end up here, we were unable to enqueue a new mapping job or a new manager job despite expecting we should have - # been able to do so. We should raise some sort of exception. + + db.rollback() score_set = db.scalars(select(ScoreSet).where(ScoreSet.urn == score_set_urn)).one_or_none() if score_set: score_set.mapping_state = MappingState.failed score_set.mapping_errors = "Unable to queue a new mapping job or defer score set mapping." db.add(score_set) + db.commit() return {"success": False, "enqueued_job": new_job_id} - - except Exception as e: - send_slack_message(e) - logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} - logger.error( - msg="Variant mapper manager encountered an unexpected error while enqueing a mapping job.", - extra=logging_context, - ) - return {"success": False, "enqueued_job": new_job_id}