diff --git a/alembic/versions/1d4933b4b6f7_merge_76e1e55bc5c1_and_d7e6f8c3b9dc.py b/alembic/versions/1d4933b4b6f7_merge_76e1e55bc5c1_and_d7e6f8c3b9dc.py new file mode 100644 index 00000000..91353f2a --- /dev/null +++ b/alembic/versions/1d4933b4b6f7_merge_76e1e55bc5c1_and_d7e6f8c3b9dc.py @@ -0,0 +1,24 @@ +"""merge 76e1e55bc5c1 and d7e6f8c3b9dc + +Revision ID: 1d4933b4b6f7 +Revises: 76e1e55bc5c1, d7e6f8c3b9dc +Create Date: 2024-09-04 16:17:20.875937 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '1d4933b4b6f7' +down_revision = ('76e1e55bc5c1', 'd7e6f8c3b9dc') +branch_labels = None +depends_on = None + + +def upgrade(): + pass + + +def downgrade(): + pass diff --git a/alembic/versions/d7e6f8c3b9dc_scoreset_mapping_columns.py b/alembic/versions/d7e6f8c3b9dc_scoreset_mapping_columns.py new file mode 100644 index 00000000..51f68187 --- /dev/null +++ b/alembic/versions/d7e6f8c3b9dc_scoreset_mapping_columns.py @@ -0,0 +1,48 @@ +"""scoreset_mapping_columns + +Revision ID: d7e6f8c3b9dc +Revises: f36cf612e029 +Create Date: 2024-08-28 09:54:08.249077 + +""" + +from alembic import op +from sqlalchemy.dialects import postgresql +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "d7e6f8c3b9dc" +down_revision = "f36cf612e029" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "scoresets", + sa.Column( + "mapping_state", + sa.Enum( + "incomplete", + "processing", + "failed", + "complete", + "pending_variant_processing", + "not_attempted", + "queued", + name="mappingstate", + native_enum=False, + create_constraint=True, + length=32 + ), + nullable=True, + ), + ) + op.add_column("scoresets", sa.Column("mapping_errors", postgresql.JSONB, nullable=True)) + + +def downgrade(): + op.drop_constraint("mappingstate", table_name="scoresets") + op.drop_column("scoresets", "mapping_state") + op.drop_column("scoresets", "mapping_errors") diff --git a/alembic/versions/f36cf612e029_additional_mapping_columns.py b/alembic/versions/f36cf612e029_additional_mapping_columns.py new file mode 100644 index 00000000..0141bc2a --- /dev/null +++ b/alembic/versions/f36cf612e029_additional_mapping_columns.py @@ -0,0 +1,102 @@ +"""Additional mapping columns + +Revision ID: f36cf612e029 +Revises: ec5d2787bec9 +Create Date: 2024-08-21 16:06:06.793541 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from datetime import datetime + +# revision identifiers, used by Alembic. +revision = "f36cf612e029" +down_revision = "ec5d2787bec9" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.add_column( + "mapped_variants", + sa.Column("vrs_version", sa.String(), nullable=True, server_default="1.3"), + ) + op.add_column("mapped_variants", sa.Column("error_message", sa.String(), nullable=True)) + op.add_column( + "mapped_variants", + sa.Column( + "modification_date", + sa.Date(), + nullable=False, + server_default = sa.func.current_date(), + ), + ) + op.add_column( + "mapped_variants", + sa.Column( + "mapped_date", + sa.Date(), + nullable=False, + server_default=datetime.fromisocalendar(2024, 1, 1).date().strftime("%Y-%m-%d"), + ), + ) + op.add_column( + "mapped_variants", + sa.Column("mapping_api_version", sa.String(), nullable=False, server_default="0.0.0"), + ) + op.add_column( + "mapped_variants", + sa.Column("current", sa.Boolean(), nullable=False, server_default=sa.false()), + ) + op.alter_column( + "mapped_variants", + "pre_mapped", + existing_type=postgresql.JSONB(astext_type=sa.Text()), + nullable=True, + ) + op.alter_column( + "mapped_variants", + "post_mapped", + existing_type=postgresql.JSONB(astext_type=sa.Text()), + nullable=True, + ) + + op.add_column( + "target_genes", + sa.Column("pre_mapped_metadata", postgresql.JSONB, nullable=True), + ) + op.add_column( + "target_genes", + sa.Column("post_mapped_metadata", postgresql.JSONB, nullable=True), + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column( + "mapped_variants", + "post_mapped", + existing_type=postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + ) + op.alter_column( + "mapped_variants", + "pre_mapped", + existing_type=postgresql.JSONB(astext_type=sa.Text()), + nullable=False, + ) + op.drop_column("mapped_variants", "current") + op.drop_column("mapped_variants", "mapping_api_version") + op.drop_column("mapped_variants", "mapped_date") + op.drop_column("mapped_variants", "modification_date") + op.drop_column("mapped_variants", "error_message") + op.drop_column("mapped_variants", "vrs_version") + + op.drop_column("target_genes", "pre_mapped_metadata") + op.drop_column("target_genes", "post_mapped_metadata") + # ### end Alembic commands ### diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 8f7723a8..294708d0 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -63,6 +63,24 @@ services: volumes: - mavedb-redis-dev:/data + dcd-mapping: + image: dcd-mapping:dev + command: bash -c "uvicorn api.server_main:app --host 0.0.0.0 --port 8000 --reload" + depends_on: + - db + - seqrepo + env_file: + - settings/.env.dev + ports: + - "8004:8000" + volumes: + - mavedb-seqrepo-dev:/usr/local/share/seqrepo + + seqrepo: + image: biocommons/seqrepo:2021-01-29 + volumes: + - mavedb-seqrepo-dev:/usr/local/share/seqrepo + # rabbitmq: # image: rabbitmq:3.8.3 # ports: @@ -71,3 +89,4 @@ services: volumes: mavedb-data-dev: mavedb-redis-dev: + mavedb-seqrepo-dev: diff --git a/pyproject.toml b/pyproject.toml index 8f454525..b1865dff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "mavedb" -version = "2024.3.0" +version = "2024.4.0" description = "API for MaveDB, the database of Multiplexed Assays of Variant Effect." 
license = "AGPL-3.0-only" readme = "README.md" diff --git a/settings/.env.template b/settings/.env.template index dc6592e5..32d693af 100644 --- a/settings/.env.template +++ b/settings/.env.template @@ -32,3 +32,30 @@ SLACK_WEBHOOK_URL= ORCID_CLIENT_ID=client-id ORCID_CLIENT_SECRET=secret ORCID_JWT_SIGNING_PUBLIC_KEY="-----BEGIN PUBLIC KEY-----\npublic key\n-----END PUBLIC KEY-----" + +#################################################################################################### +# Environment variables for vrs-mapping +#################################################################################################### + +GENE_NORM_DB_URL=postgres://postgres:postgres@db:5432/gene_normalizer +MAVEDB_STORAGE_DIR=/root/.local/share/dcd-mapping + +#################################################################################################### +# Environment variables for UTA connection via CoolSeqTool +#################################################################################################### + +UTA_DB_URL=postgresql://anonymous:anonymous@uta.biocommons.org:5432/uta/uta_20180821 + +#################################################################################################### +# Environment variables for seqrepo +#################################################################################################### + +SEQREPO_ROOT_DIR=/usr/local/share/seqrepo/2021-01-29 + +#################################################################################################### +# Environment variables for mapping MaveDB connection +#################################################################################################### + +MAVEDB_BASE_URL=http://app:8000 +MAVEDB_API_KEY=secret +DCD_MAPPING_URL=http://dcd-mapping:8000 diff --git a/src/mavedb/__init__.py b/src/mavedb/__init__.py index 4bfeff00..7308262e 100644 --- a/src/mavedb/__init__.py +++ b/src/mavedb/__init__.py @@ -6,6 +6,6 @@ logger = module_logging.getLogger(__name__) __project__ = "mavedb-api" -__version__ = "2024.3.0" +__version__ = "2024.4.0" logger.info(f"MaveDB {__version__}") diff --git a/src/mavedb/data_providers/services.py b/src/mavedb/data_providers/services.py index eb13922f..74e7bf45 100644 --- a/src/mavedb/data_providers/services.py +++ b/src/mavedb/data_providers/services.py @@ -1,3 +1,8 @@ +import os +import requests +from datetime import date +from typing import Optional, TypedDict + from cdot.hgvs.dataproviders import ChainedSeqFetcher, FastaSeqFetcher, RESTDataProvider GENOMIC_FASTA_FILES = [ @@ -5,6 +10,8 @@ "/data/GCF_000001405.25_GRCh37.p13_genomic.fna.gz", ] +DCD_MAP_URL = os.environ.get("DCD_MAPPING_URL", "http://dcd-mapping:8000") + def seqfetcher() -> ChainedSeqFetcher: return ChainedSeqFetcher(*[FastaSeqFetcher(file) for file in GENOMIC_FASTA_FILES]) @@ -12,3 +19,31 @@ def seqfetcher() -> ChainedSeqFetcher: def cdot_rest() -> RESTDataProvider: return RESTDataProvider(seqfetcher=seqfetcher()) + + +class VRSMap: + url: str + + class ScoreSetMappingResults(TypedDict): + metadata: Optional[dict[str, str]] + dcd_mapping_version: str + mapped_date_utc: date + computed_genomic_reference_sequence: Optional[dict[str, str]] + mapped_genomic_reference_sequence: Optional[dict[str, str]] + computed_protein_reference_sequence: Optional[dict[str, str]] + mapped_protein_reference_sequence: Optional[dict[str, str]] + mapped_scores: Optional[list[dict]] + error_message: Optional[str] + + def __init__(self, url: str) -> None: + self.url = url + + def map_score_set(self, score_set_urn: str) -> 
ScoreSetMappingResults: + uri = f"{self.url}/api/v1/map/{score_set_urn}" + response = requests.post(uri) + response.raise_for_status() + return response.json() + + +def vrs_mapper(url: Optional[str] = None) -> VRSMap: + return VRSMap(DCD_MAP_URL) if not url else VRSMap(url) diff --git a/src/mavedb/lib/exceptions.py b/src/mavedb/lib/exceptions.py index d2b10b6c..004cab6f 100644 --- a/src/mavedb/lib/exceptions.py +++ b/src/mavedb/lib/exceptions.py @@ -160,3 +160,19 @@ class NonexistentOrcidUserError(ValueError): """Raised when a user tries to create a contributor with a non-existent ORCID ID""" pass + + +class NonexistentMappingResultsError(ValueError): + """Raised when score set mapping results do not contain mapping results""" + + pass + + +class NonexistentMappingReferenceError(ValueError): + """Raised when score set mapping results do not contain a valid reference sequence""" + + pass + + +class MappingEnqueueError(ValueError): + """Raised when a mapping job fails to be enqueued despite appearing as if it should have been""" diff --git a/src/mavedb/lib/logging/canonical.py b/src/mavedb/lib/logging/canonical.py index 2e2a64a3..6d639a94 100644 --- a/src/mavedb/lib/logging/canonical.py +++ b/src/mavedb/lib/logging/canonical.py @@ -27,7 +27,6 @@ async def log_job(ctx: dict) -> None: if not result: logger.warning(msg=f"Job finished, but could not retrieve a job result for job {job_id}.", extra=log_context) - log_context.pop("message") else: log_context = { **log_context, @@ -55,10 +54,8 @@ async def log_job(ctx: dict) -> None: if result is None: logger.error(msg="Job result could not be found.", extra=log_context) - elif result.result == "success": - logger.info(msg="Job completed successfully.", extra=log_context) - elif result.result != "success": - logger.warning(msg="Job completed with handled exception.", extra=log_context) + elif result.result is not None: + logger.info(msg="Job completed with result.", extra=log_context) else: logger.error(msg="Job completed with unhandled exception.", extra=log_context) diff --git a/src/mavedb/lib/mave/utils.py b/src/mavedb/lib/mave/utils.py index f59a3fca..dd6b7591 100644 --- a/src/mavedb/lib/mave/utils.py +++ b/src/mavedb/lib/mave/utils.py @@ -1,5 +1,7 @@ import re +import pandas as pd + NA_VALUE = "NA" NULL_VALUES = ("", "na", "nan", "nil", "none", "null", "n/a", "undefined", NA_VALUE) @@ -22,6 +24,9 @@ def is_csv_null(value): """Return True if a string from a CSV file represents a NULL value.""" + # Avoid any boolean miscasts from comparisons by handling NA types up front. + if pd.isna(value): + return True # Number 0 is treated as False so that all 0 will be converted to NA value. if value == 0: return value diff --git a/src/mavedb/lib/permissions.py b/src/mavedb/lib/permissions.py index 972b91be..db3b05e1 100644 --- a/src/mavedb/lib/permissions.py +++ b/src/mavedb/lib/permissions.py @@ -90,13 +90,15 @@ def has_permission(user_data: Optional[UserData], item: Base, action: Action) -> if user_may_edit or not private: return PermissionResponse(True) # Roles which may perform this operation. - elif roles_permitted(active_roles, [UserRole.admin]): + elif roles_permitted(active_roles, [UserRole.admin, UserRole.mapper]): return PermissionResponse(True) elif private: # Do not acknowledge the existence of a private entity. 
return PermissionResponse(False, 404, f"experiment set with URN '{item.urn}' not found") + elif user_data is None or user_data.user is None: + return PermissionResponse(False, 401, f"insufficient permissions for URN '{item.urn}'") else: - return PermissionResponse(False) + return PermissionResponse(False, 403, f"insufficient permissions for URN '{item.urn}'") elif action == Action.UPDATE: if user_may_edit: return PermissionResponse(True) @@ -106,8 +108,10 @@ def has_permission(user_data: Optional[UserData], item: Base, action: Action) -> elif private: # Do not acknowledge the existence of a private entity. return PermissionResponse(False, 404, f"experiment set with URN '{item.urn}' not found") + elif user_data is None or user_data.user is None: + return PermissionResponse(False, 401, f"insufficient permissions for URN '{item.urn}'") else: - return PermissionResponse(False) + return PermissionResponse(False, 403, f"insufficient permissions for URN '{item.urn}'") elif action == Action.DELETE: # Owner may only delete an experiment set if it has not already been published. if user_may_edit: @@ -138,13 +142,15 @@ def has_permission(user_data: Optional[UserData], item: Base, action: Action) -> if user_may_edit or not private: return PermissionResponse(True) # Roles which may perform this operation. - elif roles_permitted(active_roles, [UserRole.admin]): + elif roles_permitted(active_roles, [UserRole.admin, UserRole.mapper]): return PermissionResponse(True) elif private: # Do not acknowledge the existence of a private entity. return PermissionResponse(False, 404, f"experiment with URN '{item.urn}' not found") + elif user_data is None or user_data.user is None: + return PermissionResponse(False, 401, f"insufficient permissions for URN '{item.urn}'") else: - return PermissionResponse(False) + return PermissionResponse(False, 403, f"insufficient permissions for URN '{item.urn}'") elif action == Action.UPDATE: if user_may_edit: return PermissionResponse(True) @@ -154,8 +160,10 @@ def has_permission(user_data: Optional[UserData], item: Base, action: Action) -> elif private: # Do not acknowledge the existence of a private entity. return PermissionResponse(False, 404, f"experiment with URN '{item.urn}' not found") + elif user_data is None or user_data.user is None: + return PermissionResponse(False, 401, f"insufficient permissions for URN '{item.urn}'") else: - return PermissionResponse(False) + return PermissionResponse(False, 403, f"insufficient permissions for URN '{item.urn}'") elif action == Action.DELETE: # Owner may only delete an experiment if it has not already been published. if user_may_edit: @@ -186,13 +194,15 @@ def has_permission(user_data: Optional[UserData], item: Base, action: Action) -> if user_may_edit or not private: return PermissionResponse(True) # Roles which may perform this operation. - elif roles_permitted(active_roles, [UserRole.admin]): + elif roles_permitted(active_roles, [UserRole.admin, UserRole.mapper]): return PermissionResponse(True) elif private: # Do not acknowledge the existence of a private entity. 
return PermissionResponse(False, 404, f"score set with URN '{item.urn}' not found") + elif user_data is None or user_data.user is None: + return PermissionResponse(False, 401, f"insufficient permissions for URN '{item.urn}'") else: - return PermissionResponse(False) + return PermissionResponse(False, 403, f"insufficient permissions for URN '{item.urn}'") elif action == Action.UPDATE: if user_may_edit: return PermissionResponse(True) @@ -202,8 +212,10 @@ def has_permission(user_data: Optional[UserData], item: Base, action: Action) -> elif private: # Do not acknowledge the existence of a private entity. return PermissionResponse(False, 404, f"score set with URN '{item.urn}' not found") + elif user_data is None or user_data.user is None: + return PermissionResponse(False, 401, f"insufficient permissions for URN '{item.urn}'") else: - return PermissionResponse(False) + return PermissionResponse(False, 403, f"insufficient permissions for URN '{item.urn}'") elif action == Action.DELETE: # Owner may only delete a score set if it has not already been published. if user_may_edit: @@ -247,7 +259,7 @@ def has_permission(user_data: Optional[UserData], item: Base, action: Action) -> elif roles_permitted(active_roles, [UserRole.admin]): return PermissionResponse(True) else: - return PermissionResponse(False) + return PermissionResponse(False, 403, "Insufficient permissions for user update.") elif action == Action.UPDATE: if user_is_self: return PermissionResponse(True) diff --git a/src/mavedb/models/enums/mapping_state.py b/src/mavedb/models/enums/mapping_state.py new file mode 100644 index 00000000..ec788258 --- /dev/null +++ b/src/mavedb/models/enums/mapping_state.py @@ -0,0 +1,11 @@ +import enum + + +class MappingState(enum.Enum): + incomplete = "incomplete" + processing = "processing" + failed = "failed" + complete = "complete" + pending_variant_processing = "pending_variant_processing" + not_attempted = "not_attempted" + queued = "queued" diff --git a/src/mavedb/models/enums/user_role.py b/src/mavedb/models/enums/user_role.py index 72f1597d..17f5fd35 100644 --- a/src/mavedb/models/enums/user_role.py +++ b/src/mavedb/models/enums/user_role.py @@ -3,3 +3,4 @@ class UserRole(enum.Enum): admin = "admin" + mapper = "mapper" diff --git a/src/mavedb/models/mapped_variant.py b/src/mavedb/models/mapped_variant.py index d74da89b..50bba4f4 100644 --- a/src/mavedb/models/mapped_variant.py +++ b/src/mavedb/models/mapped_variant.py @@ -1,4 +1,6 @@ -from sqlalchemy import Column, ForeignKey, Integer +from datetime import date + +from sqlalchemy import Boolean, Column, Date, ForeignKey, Integer, String from sqlalchemy.orm import relationship, backref, Mapped from sqlalchemy.dialects.postgresql import JSONB @@ -11,8 +13,14 @@ class MappedVariant(Base): id = Column(Integer, primary_key=True) - pre_mapped = Column(JSONB, nullable=False) - post_mapped = Column(JSONB, nullable=False) + pre_mapped = Column(JSONB, nullable=True) + post_mapped = Column(JSONB, nullable=True) + vrs_version = Column(String, nullable=True) + error_message = Column(String, nullable=True) + modification_date = Column(Date, nullable=False, default=date.today, onupdate=date.today) + mapped_date = Column(Date, nullable=False) + mapping_api_version = Column(String, nullable=False) + current = Column(Boolean, nullable=False) variant_id = Column(Integer, ForeignKey("variants.id"), index=True, nullable=False) variant: Mapped[Variant] = relationship("Variant", backref=backref("mapped_variants", cascade="all,delete-orphan")) diff --git 
a/src/mavedb/models/score_set.py b/src/mavedb/models/score_set.py
index fd115095..d7bea1dc 100644
--- a/src/mavedb/models/score_set.py
+++ b/src/mavedb/models/score_set.py
@@ -8,6 +8,7 @@
 from typing import List, TYPE_CHECKING, Optional
 
 from mavedb.db.base import Base
+from mavedb.models.enums.mapping_state import MappingState
 from mavedb.models.enums.processing_state import ProcessingState
 import mavedb.models.score_set_publication_identifier
 
@@ -97,6 +98,12 @@ class ScoreSet(Base):
     num_variants = Column(Integer, nullable=False, default=0)
     variants: Mapped[list["Variant"]] = relationship(back_populates="score_set", cascade="all, delete-orphan")
 
+    mapping_state = Column(
+        Enum(MappingState, create_constraint=True, length=32, native_enum=False, validate_strings=True),
+        nullable=True,
+    )
+    mapping_errors = Column(JSONB, nullable=True)
+
     experiment_id = Column(Integer, ForeignKey("experiments.id"), index=True, nullable=False)
     experiment: Mapped["Experiment"] = relationship(back_populates="score_sets")
diff --git a/src/mavedb/models/target_gene.py b/src/mavedb/models/target_gene.py
index 99f14e50..540c8a26 100644
--- a/src/mavedb/models/target_gene.py
+++ b/src/mavedb/models/target_gene.py
@@ -1,5 +1,6 @@
 from datetime import date
 from sqlalchemy import Column, Date, ForeignKey, Integer, String
+from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy.orm import backref, relationship, Mapped
 from typing import TYPE_CHECKING
 
@@ -40,6 +41,9 @@ class TargetGene(Base):
         single_parent=True,
     )
 
+    pre_mapped_metadata: Mapped[JSONB] = Column("pre_mapped_metadata", JSONB, nullable=True)
+    post_mapped_metadata: Mapped[JSONB] = Column("post_mapped_metadata", JSONB, nullable=True)
+
     creation_date = Column(Date, nullable=False, default=date.today)
     modification_date = Column(Date, nullable=False, default=date.today, onupdate=date.today)
diff --git a/src/mavedb/routers/mapped_variant.py b/src/mavedb/routers/mapped_variant.py
index d1ff0d1d..d29883bb 100644
--- a/src/mavedb/routers/mapped_variant.py
+++ b/src/mavedb/routers/mapped_variant.py
@@ -1,5 +1,6 @@
 from typing import Any, List, Optional
 
+from arq import ArqRedis
 from fastapi import APIRouter, Depends
 from fastapi.exceptions import HTTPException
 from sqlalchemy.orm import Session
@@ -10,6 +11,8 @@
 from mavedb.models.mapped_variant import MappedVariant
 from mavedb.models.variant import Variant
 from mavedb.view_models import mapped_variant
+from mavedb.worker.jobs import MAPPING_QUEUE_NAME
+
 
 async def fetch_mapped_variant_by_variant_urn(db, urn: str) -> Optional[MappedVariant]:
     """
@@ -22,21 +25,41 @@ async def fetch_mapped_variant_by_variant_urn(db, urn: str) -> Optional[MappedVa
     user.
     """
     try:
-        item = db.query(MappedVariant).filter(Variant.urn == urn).filter(MappedVariant.variant_id == Variant.id).one_or_none()
+        item = (
+            db.query(MappedVariant)
+            .filter(Variant.urn == urn)
+            .filter(MappedVariant.variant_id == Variant.id)
+            .filter(MappedVariant.current.is_(True))
+            .one_or_none()
+        )
     except MultipleResultsFound:
         raise HTTPException(status_code=500, detail=f"Multiple variants with URN {urn} were found.")
     if not item:
         raise HTTPException(status_code=404, detail=f"Mapped variant with URN {urn} not found")
     return item
 
-router = APIRouter(prefix="/api/v1/mapped-variants", tags=["mapped variants"], responses={404: {"description": "Not found"}})
+
+router = APIRouter(
+    prefix="/api/v1/mapped-variants", tags=["mapped variants"], responses={404: {"description": "Not found"}}
+)
+
 
 @router.get("/{urn}", status_code=200, response_model=mapped_variant.MappedVariant, responses={404: {}, 500: {}})
-async def show_mapped_variant(
-    *, urn: str, db: Session = Depends(deps.get_db)
-) -> Any:
+async def show_mapped_variant(*, urn: str, db: Session = Depends(deps.get_db)) -> Any:
     """
     Fetch a mapped variant by URN.
     """
-    return await fetch_mapped_variant_by_variant_urn(db, urn)
\ No newline at end of file
+    return await fetch_mapped_variant_by_variant_urn(db, urn)
+
+
+# for testing only
+# @router.post("/map/{urn}", status_code=200, responses={404: {}, 500: {}})
+# async def map_score_set(*, urn: str, worker: ArqRedis = Depends(deps.get_worker)) -> Any:
+#     await worker.lpush(MAPPING_QUEUE_NAME, urn)  # type: ignore
+#     await worker.enqueue_job(
+#         "variant_mapper_manager",
+#         None,
+#         None,
+#         None
+#     )
diff --git a/src/mavedb/routers/score_sets.py b/src/mavedb/routers/score_sets.py
index 24c602ca..44fd2a30 100644
--- a/src/mavedb/routers/score_sets.py
+++ b/src/mavedb/routers/score_sets.py
@@ -854,9 +854,13 @@ async def update_score_set(
         scores_data = pd.DataFrame(
             variants_to_csv_rows(item.variants, columns=score_columns, dtype="score_data")
         ).replace("NA", pd.NA)
-        count_data = pd.DataFrame(
-            variants_to_csv_rows(item.variants, columns=count_columns, dtype="count_data")
-        ).replace("NA", pd.NA)
+
+        if item.dataset_columns["count_columns"]:
+            count_data = pd.DataFrame(
+                variants_to_csv_rows(item.variants, columns=count_columns, dtype="count_data")
+            ).replace("NA", pd.NA)
+        else:
+            count_data = None
 
         # Although this is also updated within the variant creation job, update it here
         # as well so that we can display the proper UI components (queue invocation delay
diff --git a/src/mavedb/routers/statistics.py b/src/mavedb/routers/statistics.py
index 53a3e8fc..29445fe2 100644
--- a/src/mavedb/routers/statistics.py
+++ b/src/mavedb/routers/statistics.py
@@ -332,6 +332,11 @@ def record_object_statistics(
     Model names and fields should be members of the Enum classes defined above. Providing an invalid model name or model field
     will yield a 422 Unprocessable Entity error with details about valid enum values.
     """
+    # Validation to ensure 'keywords' is only used with 'experiment'.
+ if model == RecordNames.scoreSet and field == RecordFields.keywords: + raise HTTPException(status_code=422, + detail="The 'keywords' field can only be used with the 'experiment' model.") + count_data = _record_from_field_and_model(db, model, field) return {field_val: count for field_val, count in count_data if field_val is not None} diff --git a/src/mavedb/view_models/mapped_variant.py b/src/mavedb/view_models/mapped_variant.py index 5c3be930..7bf98300 100644 --- a/src/mavedb/view_models/mapped_variant.py +++ b/src/mavedb/view_models/mapped_variant.py @@ -1,12 +1,19 @@ -from typing import Any +from typing import Any, Optional +from datetime import date from .base.base import BaseModel class MappedVariantBase(BaseModel): - pre_mapped: Any - post_mapped: Any + pre_mapped: Optional[Any] + post_mapped: Optional[Any] variant_id: int + vrs_version: Optional[str] + error_message: Optional[str] + modification_date: date + mapped_date: date + mapping_api_version: str + current: bool class MappedVariantCreate(MappedVariantBase): @@ -16,6 +23,7 @@ class MappedVariantCreate(MappedVariantBase): class MappedVariantUpdate(MappedVariantBase): pass + # Properties shared by models stored in DB class SavedMappedVariant(MappedVariantBase): id: int @@ -26,4 +34,4 @@ class Config: # Properties to return to non-admin clients class MappedVariant(SavedMappedVariant): - pass \ No newline at end of file + pass diff --git a/src/mavedb/view_models/score_set.py b/src/mavedb/view_models/score_set.py index 0c244480..b6cc9064 100644 --- a/src/mavedb/view_models/score_set.py +++ b/src/mavedb/view_models/score_set.py @@ -10,6 +10,7 @@ from mavedb.lib.validation.exceptions import ValidationError from mavedb.lib.validation.utilities import is_null from mavedb.models.enums.processing_state import ProcessingState +from mavedb.models.enums.mapping_state import MappingState from mavedb.models.target_sequence import TargetSequence from mavedb.view_models import PublicationIdentifiersGetter from mavedb.view_models.base.base import BaseModel, validator @@ -269,6 +270,8 @@ class ScoreSet(SavedScoreSet): private: bool processing_state: Optional[ProcessingState] processing_errors: Optional[dict] + mapping_state: Optional[MappingState] + mapping_errors: Optional[dict] class ScoreSetWithVariants(ScoreSet): @@ -299,3 +302,5 @@ class ScoreSetPublicDump(SavedScoreSet): private: bool processing_state: Optional[ProcessingState] processing_errors: Optional[Dict] + mapping_state: Optional[MappingState] + mapping_errors: Optional[Dict] diff --git a/src/mavedb/worker/jobs.py b/src/mavedb/worker/jobs.py index 7ba6e0e1..66a6e82d 100644 --- a/src/mavedb/worker/jobs.py +++ b/src/mavedb/worker/jobs.py @@ -1,10 +1,21 @@ +import asyncio +import functools import logging +from contextlib import asynccontextmanager +from datetime import timedelta, date +from typing import Any, Optional + import pandas as pd +from arq import ArqRedis +from arq.jobs import Job, JobStatus from cdot.hgvs.dataproviders import RESTDataProvider -from sqlalchemy import delete, select, null +from fqfa.util.translate import translate_dna +from sqlalchemy import cast, delete, select, null +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Session +from mavedb.lib.exceptions import NonexistentMappingReferenceError, NonexistentMappingResultsError, MappingEnqueueError from mavedb.lib.score_sets import ( columns_for_dataset, create_variants, @@ -16,13 +27,32 @@ from mavedb.lib.validation.dataframe import ( validate_and_standardize_dataframe_pair, ) +from 
mavedb.models.enums.mapping_state import MappingState from mavedb.models.enums.processing_state import ProcessingState +from mavedb.models.mapped_variant import MappedVariant from mavedb.models.score_set import ScoreSet +from mavedb.models.target_gene import TargetGene +from mavedb.models.target_sequence import TargetSequence from mavedb.models.user import User from mavedb.models.variant import Variant +from mavedb.data_providers.services import vrs_mapper logger = logging.getLogger(__name__) +MAPPING_QUEUE_NAME = "vrs_mapping_queue" +MAPPING_CURRENT_ID_NAME = "vrs_mapping_current_job_id" +BACKOFF_LIMIT = 5 +BACKOFF_IN_SECONDS = 15 + + +@asynccontextmanager +async def mapping_in_execution(redis: ArqRedis, job_id: str): + await redis.set(MAPPING_CURRENT_ID_NAME, job_id) + try: + yield + finally: + await redis.set(MAPPING_CURRENT_ID_NAME, "") + def setup_job_state(ctx, invoker: int, resource: str, correlation_id: str): ctx["state"][ctx["job_id"]] = { @@ -34,6 +64,31 @@ def setup_job_state(ctx, invoker: int, resource: str, correlation_id: str): return ctx["state"][ctx["job_id"]] +async def enqueue_job_with_backoff( + redis: ArqRedis, job_name: str, attempt: int, *args +) -> tuple[Optional[str], bool, Any]: + new_job_id = None + backoff = None + limit_reached = attempt > BACKOFF_LIMIT + if not limit_reached: + limit_reached = True + backoff = BACKOFF_IN_SECONDS * (2**attempt) + attempt = attempt + 1 + + # NOTE: for jobs supporting backoff, `attempt` should be the final argument. + new_job = await redis.enqueue_job( + job_name, + *args, + attempt, + _defer_by=timedelta(seconds=backoff), + ) + + if new_job: + new_job_id = new_job.job_id + + return (new_job_id, not limit_reached, backoff) + + async def create_variants_for_score_set( ctx, correlation_id: str, score_set_urn: str, updater_id: int, scores: pd.DataFrame, counts: pd.DataFrame ): @@ -42,19 +97,23 @@ async def create_variants_for_score_set( On any raised exception, ensure ProcessingState of score set is set to `failed` prior to exiting. 
""" + logging_context = {} try: logging_context = setup_job_state(ctx, updater_id, score_set_urn, correlation_id) logger.info(msg="Began processing of score set variants.", extra=logging_context) db: Session = ctx["db"] hdp: RESTDataProvider = ctx["hdp"] + redis: ArqRedis = ctx["redis"] score_set = db.scalars(select(ScoreSet).where(ScoreSet.urn == score_set_urn)).one() updated_by = db.scalars(select(User).where(User.id == updater_id)).one() score_set.modified_by = updated_by score_set.processing_state = ProcessingState.processing + score_set.mapping_state = MappingState.pending_variant_processing logging_context["processing_state"] = score_set.processing_state.name + logging_context["mapping_state"] = score_set.mapping_state.name db.add(score_set) db.commit() @@ -67,16 +126,6 @@ async def create_variants_for_score_set( ) raise ValueError("Can't create variants when score set has no targets.") - if score_set.variants: - db.execute(delete(Variant).where(Variant.score_set_id == score_set.id)) - logging_context["deleted_variants"] = score_set.num_variants - score_set.num_variants = 0 - - logger.info(msg="Deleted existing variants from score set.", extra=logging_context) - - db.commit() - db.refresh(score_set) - validated_scores, validated_counts = validate_and_standardize_dataframe_pair( scores, counts, score_set.target_genes, hdp ) @@ -86,6 +135,19 @@ async def create_variants_for_score_set( "count_columns": columns_for_dataset(validated_counts), } + # Delete variants after validation occurs so we don't overwrite them in the case of a bad update. + if score_set.variants: + existing_variants = db.scalars(select(Variant.id).where(Variant.score_set_id == score_set.id)).all() + db.execute(delete(MappedVariant).where(MappedVariant.variant_id.in_(existing_variants))) + db.execute(delete(Variant).where(Variant.id.in_(existing_variants))) + logging_context["deleted_variants"] = score_set.num_variants + score_set.num_variants = 0 + + logger.info(msg="Deleted existing variants from score set.", extra=logging_context) + + db.commit() + db.refresh(score_set) + variants_data = create_variants_data(validated_scores, validated_counts, None) create_variants(db, score_set, variants_data) @@ -95,37 +157,50 @@ async def create_variants_for_score_set( db.rollback() score_set.processing_state = ProcessingState.failed score_set.processing_errors = {"exception": str(e), "detail": e.triggering_exceptions} + score_set.mapping_state = MappingState.not_attempted logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} logging_context["processing_state"] = score_set.processing_state.name + logging_context["mapping_state"] = score_set.mapping_state.name + logging_context["created_variants"] = 0 logger.warning(msg="Encountered a validation error while processing variants.", extra=logging_context) + return {"success": False} + # NOTE: Since these are likely to be internal errors, it makes less sense to add them to the DB and surface them to the end user. # Catch all non-system exiting exceptions. 
    except Exception as e:
         db.rollback()
         score_set.processing_state = ProcessingState.failed
         score_set.processing_errors = {"exception": str(e), "detail": []}
+        score_set.mapping_state = MappingState.not_attempted
 
         logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
         logging_context["processing_state"] = score_set.processing_state.name
+        logging_context["mapping_state"] = score_set.mapping_state.name
+        logging_context["created_variants"] = 0
         logger.warning(msg="Encountered an internal exception while processing variants.", extra=logging_context)
 
         send_slack_message(err=e)
+        return {"success": False}
 
-    # Catch all other exceptions and raise them. The exceptions caught here will be system exiting.
+    # Catch all other exceptions. The exceptions caught here were intended to be system-exiting.
     except BaseException as e:
         db.rollback()
         score_set.processing_state = ProcessingState.failed
+        score_set.mapping_state = MappingState.not_attempted
         db.commit()
 
         logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)}
         logging_context["processing_state"] = score_set.processing_state.name
+        logging_context["mapping_state"] = score_set.mapping_state.name
+        logging_context["created_variants"] = 0
         logger.error(
             msg="Encountered an unhandled exception while creating variants for score set.", extra=logging_context
         )
-        raise e
+        # Don't raise BaseExceptions so we may emit canonical logs (TODO: perhaps they are so problematic that we want to raise them anyway).
+        return {"success": False}
 
     else:
         score_set.processing_state = ProcessingState.success
@@ -135,6 +210,9 @@
         logging_context["processing_state"] = score_set.processing_state.name
         logger.info(msg="Finished creating variants in score set.", extra=logging_context)
 
+        await redis.lpush(MAPPING_QUEUE_NAME, score_set_urn)  # type: ignore
+        await redis.enqueue_job("variant_mapper_manager", correlation_id, score_set_urn, updater_id)
+        score_set.mapping_state = MappingState.queued
     finally:
         db.add(score_set)
         db.commit()
@@ -142,4 +220,436 @@
         logger.info(msg="Committed new variants to score set.", extra=logging_context)
 
     ctx["state"][ctx["job_id"]] = logging_context.copy()
-    return score_set.processing_state.name
+    return {"success": True}
+
+
+async def map_variants_for_score_set(
+    ctx: dict, correlation_id: str, score_set_urn: str, updater_id: int, attempt: int = 1
+) -> dict:
+    async with mapping_in_execution(redis=ctx["redis"], job_id=ctx["job_id"]):
+        logging_context = {}
+        score_set = None
+        try:
+            db: Session = ctx["db"]
+            redis: ArqRedis = ctx["redis"]
+
+            logging_context = setup_job_state(ctx, updater_id, score_set_urn, correlation_id)
+            logging_context["attempt"] = attempt
+            logger.info(msg="Started variant mapping", extra=logging_context)
+
+            score_set = db.scalars(select(ScoreSet).where(ScoreSet.urn == score_set_urn)).one()
+            score_set.mapping_state = MappingState.processing
+            score_set.mapping_errors = null()
+            db.add(score_set)
+            db.commit()
+
+            logging_context["current_mapping_resource"] = score_set.urn
+            logging_context["mapping_state"] = score_set.mapping_state
+            logger.debug(msg="Fetched score set metadata for mapping job.", extra=logging_context)
+
+            # Do not block Worker event loop during mapping, see: https://arq-docs.helpmanual.io/#synchronous-jobs.
+ vrs = vrs_mapper() + blocking = functools.partial(vrs.map_score_set, score_set_urn) + loop = asyncio.get_running_loop() + + except Exception as e: + send_slack_message(e) + logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} + logger.error( + msg="Variant mapper encountered an unexpected error during setup. This job will not be retried.", + extra=logging_context, + ) + + db.rollback() + if score_set: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} + db.add(score_set) + db.commit() + + return {"success": False, "retried": False} + + mapping_results = None + try: + mapping_results = await loop.run_in_executor(ctx["pool"], blocking) + logger.debug(msg="Done mapping variants.", extra=logging_context) + + except Exception as e: + db.rollback() + score_set.mapping_errors = { + "error_message": f"Encountered an internal server error during mapping. Mapping will be automatically retried up to 5 times for this score set (attempt {attempt}/5)." + } + db.add(score_set) + db.commit() + + send_slack_message(e) + logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} + logger.warning( + msg="Variant mapper encountered an unexpected error while mapping variants. This job will be retried.", + extra=logging_context, + ) + + new_job_id = None + max_retries_exceeded = None + try: + await redis.lpush(MAPPING_QUEUE_NAME, score_set_urn) # type: ignore + new_job_id, max_retries_exceeded, backoff_time = await enqueue_job_with_backoff( + redis, "variant_mapper_manager", attempt, correlation_id, score_set_urn, updater_id + ) + # If we fail to enqueue a mapping manager for this score set, evict it from the queue. + if new_job_id is None: + await redis.lpop(MAPPING_QUEUE_NAME, score_set_urn) # type: ignore + + logging_context["backoff_limit_exceeded"] = max_retries_exceeded + logging_context["backoff_deferred_in_seconds"] = backoff_time + logging_context["backoff_job_id"] = new_job_id + + except Exception as backoff_e: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} + db.add(score_set) + db.commit() + send_slack_message(backoff_e) + logging_context = {**logging_context, **format_raised_exception_info_as_dict(backoff_e)} + logger.critical( + msg="While attempting to re-enqueue a mapping job that exited in error, another exception was encountered. This score set will not be mapped.", + extra=logging_context, + ) + else: + if new_job_id and not max_retries_exceeded: + score_set.mapping_state = MappingState.queued + db.add(score_set) + db.commit() + logger.info( + msg="After encountering an error while mapping variants, another mapping job was queued.", + extra=logging_context, + ) + elif new_job_id is None and not max_retries_exceeded: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} + db.add(score_set) + db.commit() + logger.error( + msg="After encountering an error while mapping variants, another mapping job was unable to be queued. 
This score set will not be mapped.", + extra=logging_context, + ) + else: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} + db.add(score_set) + db.commit() + logger.error( + msg="After encountering an error while mapping variants, the maximum retries for this job were exceeded. This score set will not be mapped.", + extra=logging_context, + ) + finally: + return {"success": False, "retried": (not max_retries_exceeded and new_job_id is not None)} + + try: + if mapping_results: + mapped_scores = mapping_results.get("mapped_scores") + if not mapped_scores: + # if there are no mapped scores, the score set failed to map. + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": mapping_results.get("error_message")} + else: + # TODO(VariantEffect/dcd-mapping2#2) after adding multi target mapping support: + # this assumes single-target mapping, will need to be changed to support multi-target mapping + # just in case there are multiple target genes in the db for a score set (this point shouldn't be reached + # while we only support single-target mapping), match up the target sequence with the one in the computed genomic reference sequence. + # TODO(VariantEffect/dcd-mapping2#3) after adding accession-based score set mapping support: + # this also assumes that the score set is based on a target sequence, not a target accession + + computed_genomic_ref = mapping_results.get("computed_genomic_reference_sequence") + mapped_genomic_ref = mapping_results.get("mapped_genomic_reference_sequence") + computed_protein_ref = mapping_results.get("computed_protein_reference_sequence") + mapped_protein_ref = mapping_results.get("mapped_protein_reference_sequence") + + if computed_genomic_ref: + target_sequence = computed_genomic_ref["sequence"] + elif computed_protein_ref: + target_sequence = computed_protein_ref["sequence"] + else: + raise NonexistentMappingReferenceError() + + target_gene = db.scalars( + select(TargetGene) + .join(ScoreSet) + .join(TargetSequence) + .where( + ScoreSet.urn == str(score_set_urn), + # TargetSequence.sequence == target_sequence, + ) + ).one() + + excluded_pre_mapped_keys = {"sequence"} + if computed_genomic_ref and mapped_genomic_ref: + pre_mapped_metadata = computed_genomic_ref + target_gene.pre_mapped_metadata = cast( + { + "genomic": { + k: pre_mapped_metadata[k] + for k in set(list(pre_mapped_metadata.keys())) - excluded_pre_mapped_keys + } + }, + JSONB, + ) + target_gene.post_mapped_metadata = cast( + {"genomic": mapped_genomic_ref}, JSONB + ) + elif computed_protein_ref and mapped_protein_ref: + pre_mapped_metadata = computed_protein_ref + target_gene.pre_mapped_metadata = cast( + { + "protein": { + k: pre_mapped_metadata[k] + for k in set(list(pre_mapped_metadata.keys())) - excluded_pre_mapped_keys + } + }, + JSONB, + ) + target_gene.post_mapped_metadata = cast( + {"protein": mapped_protein_ref}, JSONB + ) + else: + raise NonexistentMappingReferenceError() + + total_variants = 0 + successful_mapped_variants = 0 + for mapped_score in mapped_scores: + total_variants += 1 + variant_urn = mapped_score.get("mavedb_id") + variant = db.scalars(select(Variant).where(Variant.urn == variant_urn)).one() + + # there should only be one current mapped variant per variant id, so update old mapped variant to current = false + existing_mapped_variant = ( + db.query(MappedVariant) + .filter(MappedVariant.variant_id == variant.id, 
MappedVariant.current.is_(True)) + .one_or_none() + ) + + if existing_mapped_variant: + existing_mapped_variant.current = False + db.add(existing_mapped_variant) + + if mapped_score.get("pre_mapped") and mapped_score.get("post_mapped"): + successful_mapped_variants += 1 + + mapped_variant = MappedVariant( + pre_mapped=mapped_score.get("pre_mapped", null()), + post_mapped=mapped_score.get("post_mapped", null()), + variant_id=variant.id, + modification_date=date.today(), + mapped_date=mapping_results["mapped_date_utc"], + vrs_version=mapped_score.get("vrs_version", null()), + mapping_api_version=mapping_results["dcd_mapping_version"], + error_message=mapped_score.get("error_message", null()), + current=True, + ) + db.add(mapped_variant) + + if successful_mapped_variants == 0: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": "All variants failed to map"} + elif successful_mapped_variants < total_variants: + score_set.mapping_state = MappingState.incomplete + else: + score_set.mapping_state = MappingState.complete + + logging_context["mapped_variants_inserted_db"] = len(mapped_scores) + logging_context["variants_successfully_mapped"] = successful_mapped_variants + logging_context["mapping_state"] = score_set.mapping_state.name + logging_context["mapping_errors"] = score_set.mapping_errors + logger.info(msg="Inserted mapped variants into db.", extra=logging_context) + + else: + raise NonexistentMappingResultsError() + + db.add(score_set) + db.commit() + + except Exception as e: + db.rollback() + score_set.mapping_errors = { + "error_message": f"Encountered an unexpected error while parsing mapped variants. Mapping will be automatically retried up to 5 times for this score set (attempt {attempt}/5)." + } + db.add(score_set) + db.commit() + + send_slack_message(e) + logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} + logger.warning( + msg="An unexpected error occurred during variant mapping. This job will be attempted again.", + extra=logging_context, + ) + + new_job_id = None + max_retries_exceeded = None + try: + await redis.lpush(MAPPING_QUEUE_NAME, score_set_urn) # type: ignore + new_job_id, max_retries_exceeded, backoff_time = await enqueue_job_with_backoff( + redis, "variant_mapper_manager", attempt, correlation_id, score_set_urn, updater_id + ) + # If we fail to enqueue a mapping manager for this score set, evict it from the queue. + if new_job_id is None: + await redis.lpop(MAPPING_QUEUE_NAME, score_set_urn) # type: ignore + + logging_context["backoff_limit_exceeded"] = max_retries_exceeded + logging_context["backoff_deferred_in_seconds"] = backoff_time + logging_context["backoff_job_id"] = new_job_id + + except Exception as backoff_e: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} + db.add(score_set) + db.commit() + send_slack_message(backoff_e) + logging_context = {**logging_context, **format_raised_exception_info_as_dict(backoff_e)} + logger.critical( + msg="While attempting to re-enqueue a mapping job that exited in error, another exception was encountered. 
This score set will not be mapped.", + extra=logging_context, + ) + else: + if new_job_id and not max_retries_exceeded: + score_set.mapping_state = MappingState.queued + db.add(score_set) + db.commit() + logger.info( + msg="After encountering an error while parsing mapped variants, another mapping job was queued.", + extra=logging_context, + ) + elif new_job_id is None and not max_retries_exceeded: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} + db.add(score_set) + db.commit() + logger.error( + msg="After encountering an error while parsing mapped variants, another mapping job was unable to be queued. This score set will not be mapped.", + extra=logging_context, + ) + else: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = {"error_message": "Encountered an internal server error during mapping"} + db.add(score_set) + db.commit() + logger.error( + msg="After encountering an error while parsing mapped variants, the maximum retries for this job were exceeded. This score set will not be mapped.", + extra=logging_context, + ) + finally: + return {"success": False, "retried": (not max_retries_exceeded and new_job_id is not None)} + + ctx["state"][ctx["job_id"]] = logging_context.copy() + return {"success": True} + + +async def variant_mapper_manager( + ctx: dict, correlation_id: str, score_set_urn: str, updater_id: int, attempt: int = 1 +) -> dict: + logging_context = {} + mapping_job_id = None + mapping_job_status = None + try: + redis: ArqRedis = ctx["redis"] + db: Session = ctx["db"] + + logging_context = setup_job_state(ctx, updater_id, score_set_urn, correlation_id) + logging_context["attempt"] = attempt + logger.debug(msg="Variant mapping manager began execution", extra=logging_context) + + queue_length = await redis.llen(MAPPING_QUEUE_NAME) # type: ignore + queued_urn = await redis.rpop(MAPPING_QUEUE_NAME) # type: ignore + logging_context["variant_mapping_queue_length"] = queue_length + + # Setup the job id cache if it does not already exist. 
+ if not await redis.exists(MAPPING_CURRENT_ID_NAME): + await redis.set(MAPPING_CURRENT_ID_NAME, "") + + if not queued_urn: + logger.debug(msg="No mapping jobs exist in the queue.", extra=logging_context) + return {"success": True, "enqueued_job": None} + else: + queued_urn = queued_urn.decode("utf-8") + logging_context["current_mapping_resource"] = queued_urn + logger.debug(msg="Found mapping job(s) still in queue.", extra=logging_context) + + mapping_job_id = await redis.get(MAPPING_CURRENT_ID_NAME) + if mapping_job_id: + mapping_job_id = mapping_job_id.decode("utf-8") + mapping_job_status = (await Job(job_id=mapping_job_id, redis=redis).status()).value + + logging_context["existing_mapping_job_status"] = mapping_job_status + logging_context["existing_mapping_job_id"] = mapping_job_id + + except Exception as e: + send_slack_message(e) + logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} + logger.error(msg="Variant mapper manager encountered an unexpected error during setup.", extra=logging_context) + return {"success": False, "enqueued_job": None} + + new_job = None + new_job_id = None + score_set = None + try: + if not mapping_job_id or mapping_job_status in (JobStatus.not_found, JobStatus.complete): + logger.debug(msg="No mapping jobs are running, queuing a new one.", extra=logging_context) + + # NOTE: the score_set_urn provided to this function is only used for logging context; + # get the urn from the queue and pass that urn to map_variants_for_score_set + new_job = await redis.enqueue_job( + "map_variants_for_score_set", correlation_id, queued_urn, updater_id, attempt + ) + + if new_job: + new_job_id = new_job.job_id + + logging_context["new_mapping_job_id"] = new_job_id + logger.info(msg="Queued a new mapping job.", extra=logging_context) + + return {"success": True, "enqueued_job": new_job_id} + + logger.info( + msg="A mapping job is already running, or a new job was unable to be enqueued. Deferring mapping by 5 minutes.", + extra=logging_context, + ) + + new_job = await redis.enqueue_job( + "variant_mapper_manager", + correlation_id, + score_set_urn, + updater_id, + attempt, + _defer_by=timedelta(minutes=5), + ) + + if new_job: + # Ensure this score set remains in the front of the queue. + queued_urn = await redis.rpush(MAPPING_QUEUE_NAME, score_set_urn) # type: ignore + new_job_id = new_job.job_id + + logging_context["new_mapping_manager_job_id"] = new_job_id + logger.info(msg="Deferred a new mapping manager job.", extra=logging_context) + + # Our persistent Redis queue and ARQ's execution rules ensure that even if the worker is stopped and not restarted + # before the deferred time, these deferred jobs will still run once able. + return {"success": True, "enqueued_job": new_job_id} + + raise MappingEnqueueError() + + except Exception as e: + send_slack_message(e) + logging_context = {**logging_context, **format_raised_exception_info_as_dict(e)} + logger.error( + msg="Variant mapper manager encountered an unexpected error while enqueing a mapping job. This job will not be retried.", + extra=logging_context, + ) + + db.rollback() + score_set = db.scalars(select(ScoreSet).where(ScoreSet.urn == score_set_urn)).one_or_none() + if score_set: + score_set.mapping_state = MappingState.failed + score_set.mapping_errors = "Unable to queue a new mapping job or defer score set mapping." 
+ db.add(score_set) + db.commit() + + return {"success": False, "enqueued_job": new_job_id} diff --git a/src/mavedb/worker/settings.py b/src/mavedb/worker/settings.py index ac488fe6..1b181123 100644 --- a/src/mavedb/worker/settings.py +++ b/src/mavedb/worker/settings.py @@ -1,17 +1,23 @@ import os +from concurrent import futures from typing import Callable from arq.connections import RedisSettings -from arq import cron +from arq.cron import CronJob +from arq import ArqRedis, cron from mavedb.lib.logging.canonical import log_job -from mavedb.worker.jobs import create_variants_for_score_set +from mavedb.worker.jobs import create_variants_for_score_set, map_variants_for_score_set, variant_mapper_manager from mavedb.db.session import SessionLocal from mavedb.data_providers.services import cdot_rest # ARQ requires at least one task on startup. -BACKGROUND_FUNCTIONS: list[Callable] = [create_variants_for_score_set] -BACKGROUND_CRONJOBS: list[Callable] = [] +BACKGROUND_FUNCTIONS: list[Callable] = [ + create_variants_for_score_set, + variant_mapper_manager, + map_variants_for_score_set, +] +BACKGROUND_CRONJOBS: list[CronJob] = [] REDIS_IP = os.getenv("REDIS_IP") or "localhost" REDIS_PORT = int(os.getenv("REDIS_PORT") or 6379) @@ -22,7 +28,7 @@ async def startup(ctx): - pass + ctx["pool"] = futures.ProcessPoolExecutor() async def shutdown(ctx): diff --git a/tests/conftest.py b/tests/conftest.py index fe03b0b4..954903d9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ import os import sys +from concurrent import futures from inspect import getsourcefile from os.path import abspath from unittest.mock import patch @@ -26,7 +27,7 @@ from mavedb.lib.authentication import get_current_user, UserData from mavedb.models.user import User from mavedb.server_main import app -from mavedb.worker.jobs import create_variants_for_score_set +from mavedb.worker.jobs import create_variants_for_score_set, map_variants_for_score_set, variant_mapper_manager sys.path.append(".") @@ -152,7 +153,7 @@ async def on_job(ctx): ctx["hdp"] = data_provider worker_ = Worker( - functions=[create_variants_for_score_set], + functions=[create_variants_for_score_set, map_variants_for_score_set, variant_mapper_manager], redis_pool=arq_redis, burst=True, poll_delay=0, @@ -168,8 +169,8 @@ async def on_job(ctx): @pytest.fixture -def standalone_worker_context(session, data_provider): - yield {"db": session, "hdp": data_provider, "state": {}, "job_id": "test_job"} +def standalone_worker_context(session, data_provider, arq_redis): + yield {"db": session, "hdp": data_provider, "state": {}, "job_id": "test_job", "redis": arq_redis, "pool": futures.ProcessPoolExecutor()} @pytest.fixture() diff --git a/tests/helpers/constants.py b/tests/helpers/constants.py index f15b6284..f5ba6d28 100644 --- a/tests/helpers/constants.py +++ b/tests/helpers/constants.py @@ -523,3 +523,17 @@ } }, } + + +TEST_VARIANT_MAPPING_SCAFFOLD = { + "metadata": {}, + "computed_reference_sequence": {"sequence_type": "dna", "sequence_id": "ga4gh:SQ.ref_test", "sequence": "ACGTTT"}, + "mapped_reference_sequence": { + "sequence_type": "dna", + "sequence_id": "ga4gh:SQ.map_test", + "sequence_accessions": ["NC_000001.11"], + }, + "mapped_scores": [], + "vrs_version": "2.0", + "api_version": "0.0.0", +} diff --git a/tests/worker/test_jobs.py b/tests/worker/test_jobs.py index 4be3fcf1..d4b6a1e6 100644 --- a/tests/worker/test_jobs.py +++ b/tests/worker/test_jobs.py @@ -1,20 +1,32 @@ +from datetime import date + +from asyncio.unix_events import 
_UnixSelectorEventLoop from copy import deepcopy +from requests import HTTPError from uuid import uuid4 from unittest.mock import patch +import arq.jobs import cdot.hgvs.dataproviders import jsonschema import pandas as pd import pytest + from mavedb.lib.mave.constants import HGVS_NT_COLUMN from mavedb.lib.score_sets import csv_data_to_df from mavedb.lib.validation.exceptions import ValidationError from mavedb.models.enums.processing_state import ProcessingState from mavedb.models.score_set import ScoreSet as ScoreSetDbModel from mavedb.models.variant import Variant +from mavedb.models.mapped_variant import MappedVariant from mavedb.view_models.experiment import Experiment, ExperimentCreate from mavedb.view_models.score_set import ScoreSet, ScoreSetCreate -from mavedb.worker.jobs import create_variants_for_score_set +from mavedb.worker.jobs import ( + create_variants_for_score_set, + map_variants_for_score_set, + variant_mapper_manager, + MAPPING_QUEUE_NAME, +) from sqlalchemy import select from tests.helpers.constants import ( @@ -22,6 +34,7 @@ TEST_MINIMAL_ACC_SCORESET, TEST_MINIMAL_EXPERIMENT, TEST_MINIMAL_SEQ_SCORESET, + TEST_VARIANT_MAPPING_SCAFFOLD, ) @@ -53,6 +66,43 @@ async def setup_records_and_files(async_client, data_files, input_score_set): return score_set["urn"], scores, counts +async def setup_records_files_and_variants(async_client, data_files, input_score_set, worker_ctx): + urn, scores, counts = await setup_records_and_files(async_client, data_files, input_score_set) + + # Patch CDOT `_get_transcript`, in the event this function is called on an accesssion based scoreset. + with patch.object(cdot.hgvs.dataproviders.RESTDataProvider, "_get_transcript", return_value=TEST_CDOT_TRANSCRIPT): + score_set = await create_variants_for_score_set(worker_ctx, urn, 1, scores, counts) + + assert score_set.processing_state is ProcessingState.success + assert score_set.num_variants == 3 + + return score_set + + +async def setup_mapping_output(async_client, session, score_set, empty=False): + score_set_response = await async_client.get(f"/api/v1/score-set/{score_set.urn}") + + mapping_output = deepcopy(TEST_VARIANT_MAPPING_SCAFFOLD) + mapping_output["metadata"] = score_set_response.json() + + if empty: + return mapping_output + + variants = session.scalars(select(Variant).join(ScoreSetDbModel).where(ScoreSetDbModel.urn == score_set.urn)).all() + for variant in variants: + mapped_score = { + "pre_mapped": {"test": "pre_mapped_output"}, + "pre_mapped_2_0": {"test": "pre_mapped_output (2.0)"}, + "post_mapped": {"test": "post_mapped_output"}, + "post_mapped_2_0": {"test": "post_mapped_output (2.0)"}, + "mavedb_id": variant.urn, + } + + mapping_output["mapped_scores"].append(mapped_score) + + return mapping_output + + @pytest.mark.asyncio @pytest.mark.parametrize( "input_score_set,validation_error", @@ -143,18 +193,17 @@ async def test_create_variants_for_score_set_with_caught_exception( @pytest.mark.asyncio @pytest.mark.parametrize("input_score_set", (TEST_MINIMAL_SEQ_SCORESET, TEST_MINIMAL_ACC_SCORESET)) -async def test_create_variants_for_score_set_with_raised_exception( +async def test_create_variants_for_score_set_with_caught_base_exception( input_score_set, setup_worker_db, async_client, standalone_worker_context, session, data_files ): score_set_urn, scores, counts = await setup_records_and_files(async_client, data_files, input_score_set) # This is somewhat (extra) dumb and wouldn't actually happen like this, but it serves as an effective way to guarantee - # some exception will be raised 
@@ -143,18 +193,17 @@ async def test_create_variants_for_score_set_with_caught_exception(
 @pytest.mark.asyncio
 @pytest.mark.parametrize("input_score_set", (TEST_MINIMAL_SEQ_SCORESET, TEST_MINIMAL_ACC_SCORESET))
-async def test_create_variants_for_score_set_with_raised_exception(
+async def test_create_variants_for_score_set_with_caught_base_exception(
     input_score_set, setup_worker_db, async_client, standalone_worker_context, session, data_files
 ):
     score_set_urn, scores, counts = await setup_records_and_files(async_client, data_files, input_score_set)
     # This is somewhat (extra) dumb and wouldn't actually happen like this, but it serves as an effective way to guarantee
-    # some exception will be raised no matter what in the async job.
+    # some base exception will be handled no matter what in the async job.
     with (patch.object(pd.DataFrame, "isnull", side_effect=BaseException),):
-        with pytest.raises(BaseException):
-            success = await create_variants_for_score_set(
-                standalone_worker_context, uuid4().hex, score_set_urn, 1, scores, counts
-            )
+        success = await create_variants_for_score_set(
+            standalone_worker_context, uuid4().hex, score_set_urn, 1, scores, counts
+        )
     db_variants = session.scalars(select(Variant)).all()
     score_set = session.query(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set_urn).one()
@@ -299,3 +348,195 @@ async def test_create_variants_for_score_set(
     # Have to commit at the end of async tests for DB threads to be released. Otherwise pytest
     # thinks we are still using the session fixture and will hang indefinitely.
     session.commit()
+
+
+# NOTE: These tests operate under the assumption that mapping output is consistent between accession based and sequence based score sets. If
+# this assumption changes in the future, tests reflecting this difference in output should be added for accession based score sets.
+
+
+@pytest.mark.skip
+@pytest.mark.asyncio
+async def test_create_mapped_variants_for_scoreset(
+    setup_worker_db,
+    async_client,
+    standalone_worker_context,
+    session,
+    data_files,
+):
+    score_set = await setup_records_files_and_variants(
+        async_client, data_files, TEST_MINIMAL_SEQ_SCORESET, standalone_worker_context
+    )
+
+    # Do not await; we need a coroutine object to be the return value of our `run_in_executor` mock.
+    mapping_test_output_for_score_set = setup_mapping_output(async_client, session, score_set)
+
+    # We seem unable to mock requests via requests_mock when they occur inside another event loop. Work around
+    # this limitation by instead patching the _UnixSelectorEventLoop's executor function with a coroutine
+    # object that sets up test mapping output.
+    #
+    # TODO: Does this work on non-Unix based machines?
+    # TODO: Is it even a safe operation to patch this event loop method?
+    with patch.object(_UnixSelectorEventLoop, "run_in_executor", return_value=mapping_test_output_for_score_set):
+        await map_variants_for_score_set(standalone_worker_context, uuid4().hex, score_set.urn, 1)
+
+    mapped_variants_for_score_set = session.scalars(
+        select(MappedVariant).join(Variant).join(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set.urn)
+    ).all()
+    assert len(mapped_variants_for_score_set) == score_set.num_variants
+
+    # Have to commit at the end of async tests for DB threads to be released. Otherwise pytest
+    # thinks we are still using the session fixture and will hang indefinitely.
+    session.commit()
+
+
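# Not part of the PR diff: a standalone illustration of why the run_in_executor patch used in the test
# above works. The caller awaits whatever run_in_executor returns, so a pre-built coroutine object can
# stand in for the real thread-pool future. All names here (fake_mapping_output, call_blocking_mapper,
# demo) are hypothetical; like the tests, this relies on the Unix-only _UnixSelectorEventLoop and shares
# the portability caveat flagged in the TODOs.
import asyncio
from asyncio.unix_events import _UnixSelectorEventLoop
from unittest.mock import patch


async def fake_mapping_output():
    # Plays the role of the canned output built by setup_mapping_output().
    return {"metadata": {}, "mapped_scores": []}


async def call_blocking_mapper():
    # Normally this would run a blocking HTTP call in a thread pool; with the patch below,
    # awaiting the mocked return value simply resolves the injected coroutine.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: None)


def demo():
    with patch.object(_UnixSelectorEventLoop, "run_in_executor", return_value=fake_mapping_output()):
        return asyncio.run(call_blocking_mapper())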
+@pytest.mark.skip
+@pytest.mark.asyncio
+async def test_create_mapped_variants_for_scoreset_with_existing_mapped_variants(
+    setup_worker_db, async_client, standalone_worker_context, session, data_files
+):
+    score_set = await setup_records_files_and_variants(
+        async_client, data_files, TEST_MINIMAL_SEQ_SCORESET, standalone_worker_context
+    )
+
+    # Do not await; we need a coroutine object to be the return value of our `run_in_executor` mock.
+    mapping_test_output_for_score_set = setup_mapping_output(async_client, session, score_set)
+
+    # We seem unable to mock requests via requests_mock when they occur inside another event loop. Work around
+    # this limitation by instead patching the _UnixSelectorEventLoop's executor function with a coroutine
+    # object that sets up test mapping output.
+    #
+    # TODO: Does this work on non-Unix based machines?
+    # TODO: Is it even a safe operation to patch this event loop method?
+    with patch.object(_UnixSelectorEventLoop, "run_in_executor", return_value=mapping_test_output_for_score_set):
+        existing_variant = session.scalars(select(Variant)).first()
+
+        if not existing_variant:
+            raise ValueError("Expected at least one existing variant for this score set.")
+
+        session.add(
+            MappedVariant(
+                pre_mapped={"preexisting": "variant"},
+                post_mapped={"preexisting": "variant"},
+                variant_id=existing_variant.id,
+                modification_date=date.today(),
+                mapped_date=date.today(),
+                vrs_version="2.0",
+                mapping_api_version="0.0.0",
+            )
+        )
+        session.commit()
+
+        await map_variants_for_score_set(standalone_worker_context, uuid4().hex, score_set.urn, 1)
+
+    mapped_variants_for_score_set = session.scalars(
+        select(MappedVariant).join(Variant).join(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set.urn)
+    ).all()
+    assert len(mapped_variants_for_score_set) == score_set.num_variants
+
+    # Have to commit at the end of async tests for DB threads to be released. Otherwise pytest
+    # thinks we are still using the session fixture and will hang indefinitely.
+    session.commit()
+
+
+@pytest.mark.skip
+@pytest.mark.asyncio
+async def test_create_mapped_variants_for_scoreset_mapping_exception(
+    setup_worker_db, async_client, standalone_worker_context, session, data_files
+):
+    async def awaitable_http_error():
+        raise HTTPError
+
+    score_set = await setup_records_files_and_variants(
+        async_client, data_files, TEST_MINIMAL_SEQ_SCORESET, standalone_worker_context
+    )
+
+    # Do not await; we need a coroutine object which raises an HTTP error once awaited.
+    mapping_test_output_for_score_set = awaitable_http_error()
+
+    # We seem unable to mock requests via requests_mock when they occur inside another event loop. Work around
+    # this limitation by instead patching the _UnixSelectorEventLoop's executor function with a coroutine
+    # object that sets up test mapping output.
+    #
+    # TODO: Does this work on non-Unix based machines?
+    # TODO: Is it even a safe operation to patch this event loop method?
+    with patch.object(_UnixSelectorEventLoop, "run_in_executor", return_value=mapping_test_output_for_score_set):
+        await map_variants_for_score_set(standalone_worker_context, uuid4().hex, score_set.urn, 1)
+
+    # TODO: How are errors persisted? Test persistence mechanism.
+    mapped_variants_for_score_set = session.scalars(
+        select(MappedVariant).join(Variant).join(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set.urn)
+    ).all()
+    assert len(mapped_variants_for_score_set) == 0
+
+    # Have to commit at the end of async tests for DB threads to be released. Otherwise pytest
+    # thinks we are still using the session fixture and will hang indefinitely.
+    session.commit()
+
+
+@pytest.mark.skip
+@pytest.mark.asyncio
+async def test_create_mapped_variants_for_scoreset_no_mapping_output(
+    setup_worker_db, async_client, standalone_worker_context, session, data_files
+):
+    score_set = await setup_records_files_and_variants(
+        async_client, data_files, TEST_MINIMAL_SEQ_SCORESET, standalone_worker_context
+    )
+
+    # Do not await; we need a coroutine object to be the return value of our `run_in_executor` mock.
+    mapping_test_output_for_score_set = setup_mapping_output(async_client, session, score_set, empty=True)
+
+    # We seem unable to mock requests via requests_mock when they occur inside another event loop. Work around
+    # this limitation by instead patching the _UnixSelectorEventLoop's executor function with a coroutine
+    # object that sets up test mapping output.
+    #
+    # TODO: Does this work on non-Unix based machines?
+    # TODO: Is it even a safe operation to patch this event loop method?
+    with patch.object(_UnixSelectorEventLoop, "run_in_executor", return_value=mapping_test_output_for_score_set):
+        await map_variants_for_score_set(standalone_worker_context, uuid4().hex, score_set.urn, 1)
+
+    mapped_variants_for_score_set = session.scalars(
+        select(MappedVariant).join(Variant).join(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set.urn)
+    ).all()
+    assert len(mapped_variants_for_score_set) == 0
+
+    # Have to commit at the end of async tests for DB threads to be released. Otherwise pytest
+    # thinks we are still using the session fixture and will hang indefinitely.
+    session.commit()
+
+
+@pytest.mark.skip
+@pytest.mark.asyncio
+async def test_mapping_manager_empty_queue(setup_worker_db, standalone_worker_context, session):
+    queued_job = await variant_mapper_manager(standalone_worker_context)
+
+    # No new jobs should have been created if nothing is in the queue.
+    assert queued_job is None
+    session.commit()
+
+
+@pytest.mark.skip
+@pytest.mark.asyncio
+async def test_mapping_manager_occupied_queue_mapping_in_progress(setup_worker_db, standalone_worker_context, session):
+    await standalone_worker_context["redis"].lpush(MAPPING_QUEUE_NAME, "mavedb:test-urn")
+
+    with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.in_progress):
+        queued_job = await variant_mapper_manager(standalone_worker_context)
+
+    # Execution should be deferred if a job is in progress.
+    assert await queued_job.status() is arq.jobs.JobStatus.deferred
+    session.commit()
+
+
+@pytest.mark.skip
+@pytest.mark.asyncio
+async def test_mapping_manager_occupied_queue_mapping_not_in_progress(
+    setup_worker_db, standalone_worker_context, session
+):
+    await standalone_worker_context["redis"].lpush(MAPPING_QUEUE_NAME, "mavedb:test-urn")
+
+    with patch.object(arq.jobs.Job, "status", return_value=arq.jobs.JobStatus.not_found):
+        queued_job = await variant_mapper_manager(standalone_worker_context)
+
+    # VRS mapping jobs have the same ID.
+    assert queued_job.job_id == "vrs_map"
+    session.commit()
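# Not part of the PR diff: an assumption-based sketch of the manager behavior the three tests above
# encode, inferred only from their assertions. The real variant_mapper_manager lives in
# mavedb.worker.jobs and is not shown here; the job names, delay value, and enqueue calls below are
# hypothetical stand-ins, and the mapping job's arguments are omitted for brevity. It appears to
# return None when MAPPING_QUEUE_NAME is empty, defer itself while a mapping job is running, and
# otherwise enqueue the mapping job under the shared ID "vrs_map".
from arq.jobs import Job, JobStatus

from mavedb.worker.jobs import MAPPING_QUEUE_NAME


async def sketch_variant_mapper_manager(ctx):
    redis = ctx["redis"]

    # Nothing queued: no job is created (test_mapping_manager_empty_queue).
    if await redis.llen(MAPPING_QUEUE_NAME) == 0:
        return None

    # A mapping job is already queued or running: re-enqueue the manager with a delay so the
    # returned job reports JobStatus.deferred (test_mapping_manager_occupied_queue_mapping_in_progress).
    if await Job("vrs_map", redis).status() in (JobStatus.queued, JobStatus.in_progress):
        return await redis.enqueue_job("variant_mapper_manager", _defer_by=5)

    # Otherwise start the mapping job itself; all VRS mapping jobs share the ID "vrs_map"
    # (test_mapping_manager_occupied_queue_mapping_not_in_progress).
    return await redis.enqueue_job("map_variants_for_score_set", _job_id="vrs_map")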