Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/minimal state assignment #25

Merged
10 commits merged into from Jul 25, 2022
119 changes: 119 additions & 0 deletions lang_qc/endpoints/qc_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Copyright (c) 2022 Genome Research Ltd.
#
# Author: Adam Blanchet <[email protected]>
#
# This file is part of npg_langqc.
#
# npg_langqc is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.

from fastapi import APIRouter, Depends, HTTPException
from ml_warehouse.schema import PacBioRunWellMetrics
from sqlalchemy import and_, select
from sqlalchemy.orm import Session

from lang_qc.db.mlwh_connection import get_mlwh_db
from lang_qc.db.qc_connection import get_qc_db
from lang_qc.db.qc_schema import (
QcState,
QcStateHist,
)
from lang_qc.models.inbox_models import QcStatus
from lang_qc.util.qc_state_helpers import (
get_seq_product_for_well,
get_qc_state_for_well,
construct_seq_product_for_well,
qc_status_json,
update_qc_state,
NotFoundInDatabaseException,
)

# Router on which the QC state endpoint(s) declared below are registered.
router = APIRouter()


# POST endpoint that assigns a QC state to the well identified by run_name and
# well_label, using the values in the request body.
#
# - No QcState yet for the well: a new QcState is built, and a SeqProduct is
#   created first if none exists (only after checking the well is in MLWH).
# - QcState already exists: its current values are copied into a QcStateHist
#   row before it is updated.
#
# Responds with 404 if a well with no SeqProduct is absent from MLWH, and with
# 400 if update_qc_state raises NotFoundInDatabaseException. On success the
# resulting state is returned as a QcStatus.
@router.post(
"/run/{run_name}/well/{well_label}/qc_assign",
tags=["Well level QC operations"],
response_model=QcStatus,
)
def assign_qc_status(
run_name: str,
well_label: str,
request_body: QcStatus,
qcdb_session: Session = Depends(get_qc_db),
mlwhdb_session: Session = Depends(get_mlwh_db),
) -> QcStatus:

qc_state = get_qc_state_for_well(run_name, well_label, qcdb_session)
nerdstrike marked this conversation as resolved.
Show resolved Hide resolved

with qcdb_session.no_autoflush:

if qc_state is None:
# The first time a QC state is being set (e.g claiming a well)

seq_product = get_seq_product_for_well(run_name, well_label, qcdb_session)

if seq_product is None:
# Check that well exists in mlwh
mlwh_well = mlwhdb_session.execute(
select(PacBioRunWellMetrics).where(
and_(
PacBioRunWellMetrics.pac_bio_run_name == run_name,
PacBioRunWellMetrics.well_label == well_label,
)
)
).scalar()
if mlwh_well is None:
raise HTTPException(
status_code=404,
detail=f"Well {well_label} from run {run_name} is"
" not in the MLWH database.",
)

# Create a SeqProduct and related things for the well.
# Note that construct_seq_product_for_well commits the session itself.
seq_product = construct_seq_product_for_well(
run_name, well_label, qcdb_session
)

# Only the product link is set here; update_qc_state fills in the rest.
qc_state = QcState(
seq_product=seq_product,
)

else:
# time to add a historical entry
# The history row is a copy of the state as it is before this request.
qcdb_session.add(
QcStateHist(
id_seq_product=qc_state.id_seq_product,
id_user=qc_state.id_user,
id_qc_state_dict=qc_state.id_qc_state_dict,
id_qc_type=qc_state.id_qc_type,
created_by=qc_state.created_by,
date_created=qc_state.date_created,
date_updated=qc_state.date_updated,
is_preliminary=qc_state.is_preliminary,
)
)
# NOTE(review): this commit happens before update_qc_state validates the
# request, so a request rejected with 400 below has already stored a
# history row - confirm whether that is intended.
qcdb_session.commit()

try:
update_qc_state(request_body, qc_state, qcdb_session)
except NotFoundInDatabaseException as e:
# Unknown state, user or QC type in the request is reported as a client error.
raise HTTPException(
status_code=400,
detail=f"An error occured: {str(e)}\nRequest body was: {request_body.json()}",
)

qcdb_session.merge(qc_state)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a much more complex operation than you need here. Look into using .with_for_update or similar so that we can row-lock the already existing QC states and prevent two users altering the same well state.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #27 to not forget about this.

qcdb_session.commit()

return qc_status_json(qc_state)
3 changes: 2 additions & 1 deletion lang_qc/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from lang_qc.endpoints.inbox import router as pacbio_run_router
from lang_qc.endpoints.pacbio_run import router as inbox_router

from lang_qc.endpoints.qc_state import router as qc_state_router

# Get origins from environment, must be a comma-separated list of origins
# for example, set CORS_ORIGINS=http://localhost:300,https://example.com:443
Expand All @@ -35,6 +35,7 @@
app = FastAPI(title="LangQC")
app.include_router(pacbio_run_router, prefix="/pacbio")
app.include_router(inbox_router, prefix="/pacbio")
app.include_router(qc_state_router, prefix="/pacbio")
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
Expand Down
239 changes: 239 additions & 0 deletions lang_qc/util/qc_state_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
# Copyright (c) 2022 Genome Research Ltd.
#
# Author: Adam Blanchet <[email protected]>
#
# This file is part of npg_langqc.
#
# npg_langqc is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
import json
from datetime import datetime
from typing import Optional

from product_id.main import PacBioWell
nerdstrike marked this conversation as resolved.
Show resolved Hide resolved
from sqlalchemy.orm import Session
from sqlalchemy import select, and_

from lang_qc.db.qc_schema import (
ProductLayout,
QcState,
QcStateDict,
QcType,
SeqPlatform,
SeqProduct,
SubProduct,
SubProductAttr,
User,
)
from lang_qc.models.inbox_models import QcStatus


class NotFoundInDatabaseException(Exception):
    """Signals that a value looked up in the database has no matching record."""


def create_id_product(run_name, well_label):
    """Return the product ID hash that PacBioWell computes for a run/well pair."""
    well = PacBioWell(run_name=run_name, well_label=well_label)
    return well.hash_product_id()


def create_well_properties(run_name, well_label):
    """Serialise the run name and well label of a well as a JSON object string."""
    properties = {"run_name": run_name, "well_label": well_label}
    return json.dumps(properties)


def create_well_properties_digest(run_name, well_label):
    """Return the digest stored alongside a well's properties.

    The value is the PacBioWell product ID hash of the run name and well
    label; it is not computed from the JSON string produced by
    create_well_properties.
    """
    well = PacBioWell(run_name=run_name, well_label=well_label)
    return well.hash_product_id()


def get_seq_product_for_well(run_name: str, well_label: str, qcdb_session: Session):
    """Look up the SeqProduct recorded for a well in the QC database.

    The lookup relies on a 1-1 mapping between SubProduct and SeqProduct.

    Args:
        run_name: The run name.
        well_label: The well label.
        qcdb_session: A SQLAlchemy Session connected to the QC database.

    Returns:
        The SeqProduct for the well, or None when no row matches.
    """

    is_requested_well = and_(
        SubProduct.value_attr_one == run_name,
        SubProduct.value_attr_two == well_label,
    )
    query = (
        select(SeqProduct)
        .join(ProductLayout)
        .join(SubProduct)
        .where(is_requested_well)
    )
    return qcdb_session.execute(query).scalar_one_or_none()


def get_qc_state_for_well(
    run_name: str, well_label: str, qcdb_session: Session
) -> Optional[QcState]:
    """Fetch the current QcState of a well from the QC database.

    Args:
        run_name: The run name.
        well_label: The well label.
        qcdb_session: A SQLAlchemy Session connected to the QC database.

    Returns:
        The QcState of the well, or None if the well has none.
    """

    statement = select(QcState).join(SeqProduct).join(ProductLayout).join(SubProduct)
    statement = statement.where(
        and_(
            SubProduct.value_attr_one == run_name,
            SubProduct.value_attr_two == well_label,
        )
    )
    result = qcdb_session.execute(statement)
    return result.scalars().one_or_none()


def construct_seq_product_for_well(
    run_name: str, well_label: str, qcdb_session: Session
) -> SeqProduct:
    """Construct a SeqProduct for a well and commit it to the database.

    The SeqProduct is created together with a single ProductLayout and
    SubProduct, i.e. this assumes a 1-1 mapping between SeqProduct and
    SubProduct.

    Args:
        run_name: The run name.
        well_label: The well label.
        qcdb_session: A SQLAlchemy Session connected to the QC database.

    Returns:
        The SeqProduct which has been committed to the QC database.

    Raises:
        Exception: If the "PacBio" SeqPlatform, or the "run_name" or
            "well_label" SubProductAttr, is missing from the QC database.
    """

    # The platform and attribute rows are reference data that must already
    # exist; they are looked up here rather than created.
    seq_platform = qcdb_session.execute(
        select(SeqPlatform).where(SeqPlatform.name == "PacBio")
    ).scalar_one_or_none()
    if seq_platform is None:
        raise Exception("PacBio SeqPlatform is not in the QC database.")

    run_name_product_attr = qcdb_session.execute(
        select(SubProductAttr).where(SubProductAttr.attr_name == "run_name")
    ).scalar_one_or_none()
    if run_name_product_attr is None:
        raise Exception("PacBio run name SubProductAttr is not in the QC database.")

    well_label_product_attr = qcdb_session.execute(
        select(SubProductAttr).where(SubProductAttr.attr_name == "well_label")
    ).scalar_one_or_none()
    if well_label_product_attr is None:
        raise Exception("PacBio well label SubProductAttr is not in the QC database.")

    seq_product = SeqProduct(
        id_product=create_id_product(run_name, well_label),
        seq_platform=seq_platform,
        product_layout=[
            ProductLayout(
                sub_product=SubProduct(
                    # Attribute one is the run name, attribute two the well label.
                    sub_product_attr=run_name_product_attr,
                    sub_product_attr_=well_label_product_attr,
                    value_attr_one=run_name,
                    value_attr_two=well_label,
                    properties=create_well_properties(run_name, well_label),
                    properties_digest=create_well_properties_digest(
                        run_name, well_label
                    ),
                )
            )
        ],
    )

    # Only seq_product is added explicitly. NOTE(review): the nested
    # ProductLayout and SubProduct are presumably stored via relationship
    # cascade - confirm against the schema.
    qcdb_session.add(seq_product)
    qcdb_session.commit()

    return seq_product


# Raises NotFoundInDatabaseException when the QC state, the user or the QC type
# named in qc_status_post has no matching row in the QC database. All three
# lookups run before the first assignment, so qc_state_db is left untouched
# when one of them fails.
def update_qc_state(
qc_status_post: QcStatus, qc_state_db: QcState, qcdb_session: Session
):
"""Update the properties of the QcState, without pushing the changes.

Args:
qc_status_post: The object containing the new properties to update.
qc_state_db: The object on which to apply the updates.
qcdb_session: A SQLAlchemy Session connected to the QC database.

Returns:
None
"""

# Check that values are in the DB.
# Only the id column is selected, so the result is a one-column row (or None).
desired_qc_state_dict = qcdb_session.execute(
select(QcStateDict.id_qc_state_dict).where(
QcStateDict.state == qc_status_post.qc_state
)
).one_or_none()
if desired_qc_state_dict is None:
raise NotFoundInDatabaseException(
"Desired QC state is not in the QC database. It might not be allowed."
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, you could put the qc_status_post.qc_state into this error and the user would be certain of what they asked for. It sounds self-evident but it's really useful when the client is confused.

)

# The user is fetched as a full User object because it is assigned to the
# relationship attribute qc_state_db.user below.
user = qcdb_session.execute(
select(User).where(User.username == qc_status_post.user)
).scalar_one_or_none()
if user is None:
raise NotFoundInDatabaseException(
"User has not been found in the QC database. Have they been registered?"
)

# As with the QC state, only the id column is selected here.
qc_type = qcdb_session.execute(
select(QcType.id_qc_type).where(QcType.qc_type == qc_status_post.qc_type)
).one_or_none()
if qc_type is None:
raise NotFoundInDatabaseException("QC type is not in the QC database.")

# Apply the new values to the in-memory object; nothing is committed here.
qc_state_db.user = user
# NOTE(review): datetime.now() without a tz argument is a naive local
# timestamp - confirm this matches what the date_updated column expects.
qc_state_db.date_updated = datetime.now()
# Index 0 extracts the id from the one-column rows fetched above.
qc_state_db.id_qc_state_dict = desired_qc_state_dict[0]
qc_state_db.id_qc_type = qc_type[0]
# NOTE(review): created_by is overwritten on every update, not only on creation.
qc_state_db.created_by = "LangQC"
qc_state_db.is_preliminary = qc_status_post.is_preliminary


def qc_status_json(db_qc_state: QcState) -> QcStatus:
    """Build the Pydantic QcStatus that mirrors a QcState database record.

    Args:
        db_qc_state: the DB QcState object

    Returns:
        A QcStatus populated from the record and its related user, QC type
        and QC state dictionary entries.
    """
    # The dict literal is evaluated top to bottom, so the record's attributes
    # are read in the same order as the keyword arguments they feed.
    status_fields = {
        "user": db_qc_state.user.username,
        "date_created": db_qc_state.date_created,
        "date_updated": db_qc_state.date_updated,
        "qc_type": db_qc_state.qc_type.qc_type,
        "qc_type_description": db_qc_state.qc_type.description,
        "qc_state": db_qc_state.qc_state_dict.state,
        "is_preliminary": db_qc_state.is_preliminary,
        "created_by": db_qc_state.created_by,
    }
    return QcStatus(**status_fields)
Loading