Skip to content

Commit

Permalink
Merge pull request #209 from mgcam/optimise_qc_state_retrieval
Browse files Browse the repository at this point in the history
Optimise QC state retrieval
  • Loading branch information
nerdstrike authored Mar 5, 2024
2 parents ad384c8 + f5f79b7 commit 392bdbc
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 99 deletions.
54 changes: 20 additions & 34 deletions lang_qc/db/helper/qc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.

from collections import defaultdict
from datetime import datetime

from sqlalchemy import and_, func, select
Expand Down Expand Up @@ -95,10 +96,13 @@ def get_qc_states_by_id_product_list(
`sequencing_outcomes_only`- a boolean flag, False by default.
"""

return _map_to_qc_state_models(
seq_products=_get_seq_product_by_id_list(session, ids),
sequencing_outcomes_only=sequencing_outcomes_only,
)
qc_states = _get_qc_state_by_id_list(session, ids, sequencing_outcomes_only)

response = defaultdict(list)
for state in qc_states:
response[state.seq_product.id_product].append(QcState.from_orm(state))

return dict(response)


def product_has_qc_state(
Expand Down Expand Up @@ -385,49 +389,31 @@ def assign_qc_state_to_product(
return qc_state_db


def _get_seq_product_by_id_list(
session: Session, ids: list[ChecksumSHA256]
) -> list[SeqProduct]:
def _get_qc_state_by_id_list(
session: Session, ids: list[ChecksumSHA256], sequencing_outcomes_only: bool
) -> list[QcStateDb]:
"""
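Generates and executes a query for the QC states of the products
identified by a list of product IDs. Prefetches the related products,
QC types, users and QC state dictionary entries. If
`sequencing_outcomes_only` is true, only sequencing QC states are returned.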
"""
query = (
select(SeqProduct)
.join(QcStateDb)
select(QcStateDb)
.join(QcStateDb.seq_product)
.join(QcType)
.join(QcStateDict)
.join(User)
.where(SeqProduct.id_product.in_(ids))
.options(
selectinload(SeqProduct.qc_state).options(
selectinload(QcStateDb.qc_type),
selectinload(QcStateDb.user),
selectinload(QcStateDb.qc_state_dict),
)
selectinload(QcStateDb.seq_product),
selectinload(QcStateDb.qc_type),
selectinload(QcStateDb.user),
selectinload(QcStateDb.qc_state_dict),
)
)
return session.execute(query).scalars().all()


def _map_to_qc_state_models(
seq_products: list[SeqProduct], sequencing_outcomes_only: bool = False
) -> dict[ChecksumSHA256, list[QcState]]:
"""
Given a list of SeqProducts, convert all related QC states into
QcState response format and hashes them by their product ID.
if sequencing_outcomes_only is True:
query = query.where(QcType.qc_type == SEQUENCING_QC_TYPE)

If only sequencing type QC states are required, an optional
argument, sequencing_outcomes_only, should be set to True.
"""
response = dict()
for product in seq_products:
response[product.id_product] = []
for qc in product.qc_state:
if sequencing_outcomes_only and (qc.qc_type.qc_type != "sequencing"):
continue
response[product.id_product].append(QcState.from_orm(qc))
return response
return session.execute(query).scalars().all()


def _get_qc_type_row(session: Session, qc_type: str) -> QcType:
Expand Down
90 changes: 31 additions & 59 deletions lang_qc/db/helper/wells.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def create_for_run(self, run_name: str) -> PacBioPagedWells:
page_number=self.page_number,
page_size=self.page_size,
total_number_of_items=total_number_of_wells,
wells=self._well_models(wells, True),
wells=self._well_models(wells),
)

def _build_query4status(self, qc_flow_status: QcFlowStatusEnum):
Expand Down Expand Up @@ -350,40 +350,18 @@ def _upcoming_wells(self):
)
)

wells = self.session.execute(query).scalars().all()
ids_with_qc_state = products_have_qc_state(
session=self.qcdb_session, ids=[w.id_pac_bio_product for w in wells]
)
wells = [w for w in wells if w.id_pac_bio_product not in ids_with_qc_state]

recent_wells = self.session.execute(query).scalars().all()
wells = self._wells_without_seq_qc_state(recent_wells)
self.total_number_of_items = len(wells) # Save the number of retrieved wells.

return self._well_models(self.slice_data(wells), False)
return self._well_models(self.slice_data(wells))

def _recent_inbox_wells(self, recent_wells):

inbox_wells_indexes = []
for index, db_well in enumerate(recent_wells):
id_product = db_well.id_pac_bio_product
# TODO: Create a method for retrieving a seq. QC state for a product.
qced_products = get_qc_states_by_id_product_list(
session=self.qcdb_session,
ids=[id_product],
sequencing_outcomes_only=True,
).get(id_product)
if qced_products is None:
inbox_wells_indexes.append(index)

# Save the number of retrieved rows.
self.total_number_of_items = len(inbox_wells_indexes)

inbox_wells = []
# Iterate over indexes of records we want for this page and retrieve data
# for this page.
for index in self.slice_data(inbox_wells_indexes):
inbox_wells.append(recent_wells[index])

return self._well_models(inbox_wells)
wells = self._wells_without_seq_qc_state(recent_wells)
self.total_number_of_items = len(wells)

return self._well_models(self.slice_data(wells))

def _aborted_and_unknown_wells(self, qc_flow_status: QcFlowStatusEnum):

Expand All @@ -401,34 +379,22 @@ def _aborted_and_unknown_wells(self, qc_flow_status: QcFlowStatusEnum):
.all()
)

qc_state_applicable = True
if qc_flow_status == QcFlowStatusEnum.UNKNOWN:
# Remove the wells that the QC team has dealt with.
ids_with_qc_state = products_have_qc_state(
session=self.qcdb_session,
ids=[w.id_pac_bio_product for w in wells],
sequencing_outcomes_only=True,
)
wells = [w for w in wells if w.id_pac_bio_product not in ids_with_qc_state]
qc_state_applicable = False

# Save the number of retrieved rows.
wells = self._wells_without_seq_qc_state(wells)
self.total_number_of_items = len(wells)

return self._well_models(self.slice_data(wells), qc_state_applicable)
return self._well_models(self.slice_data(wells))

def _well_models(
self,
db_wells_list: List[PacBioRunWellMetrics],
qc_state_applicable: bool = False,
):

# Normally QC data is not available for the inbox, aborted, etc.
# wells. If some well with a non-inbox status has QC state assigned,
# the same well will also be retrieved by the 'in progress' or
# 'on hold' or 'qc complete' queries. However, it is useful to display
# the QC state if it is available. The `qc_state_applicable` argument
# is a hint to fetch QC state.
qced_products = get_qc_states_by_id_product_list(
session=self.qcdb_session,
ids=[db_well.id_pac_bio_product for db_well in db_wells_list],
sequencing_outcomes_only=True,
)
pb_wells = []
for db_well in db_wells_list:
id_product = db_well.id_pac_bio_product
Expand All @@ -438,18 +404,24 @@ def _well_models(
"plate_number": db_well.plate_number,
"label": db_well.well_label,
}
if qc_state_applicable:
# TODO: Query by all IDs at once.
qced_products = get_qc_states_by_id_product_list(
session=self.qcdb_session,
ids=[id_product],
sequencing_outcomes_only=True,
).get(id_product)
# A well can have only one or zero current sequencing outcomes.
if qced_products is not None and (len(qced_products) > 0):
attrs["qc_state"] = qced_products[0]
if id_product in qced_products:
attrs["qc_state"] = qced_products[id_product][0]
pb_well = PacBioWell.model_validate(attrs)
pb_well.copy_run_tracking_info(db_well)
pb_wells.append(pb_well)

return pb_wells

def _wells_without_seq_qc_state(
    self,
    db_wells_list: List[PacBioRunWellMetrics],
):
    """
    Returns the wells from the argument list that are not reported as
    having a sequencing QC state. The relative order of the wells is
    preserved; the input list is not modified.

    The lookup is done once for all wells via `products_have_qc_state`,
    called with `sequencing_outcomes_only=True`. A well is kept when its
    `id_pac_bio_product` is not contained in the value that call returns
    (presumably a collection of product IDs - confirm against the helper).
    """
    qc_session = self.qcdb_session
    product_ids = [well.id_pac_bio_product for well in db_wells_list]
    assessed_ids = products_have_qc_state(
        session=qc_session,
        ids=product_ids,
        sequencing_outcomes_only=True,
    )

    remaining_wells = []
    for well in db_wells_list:
        if well.id_pac_bio_product in assessed_ids:
            # The product already has a sequencing QC outcome, drop it.
            continue
        remaining_wells.append(well)
    return remaining_wells
12 changes: 6 additions & 6 deletions lang_qc/models/pacbio/well.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy.orm import Session

from lang_qc.db.helper.qc import get_qc_states_by_id_product_list
from lang_qc.db.helper.qc import get_qc_state_for_product
from lang_qc.db.mlwh_schema import PacBioRunWellMetrics
from lang_qc.models.pacbio.experiment import PacBioExperiment
from lang_qc.models.pacbio.qc_data import QCDataWell
Expand Down Expand Up @@ -151,10 +151,10 @@ def from_orm(cls, mlwh_db_row: PacBioRunWellMetrics, qc_session: Session):
if len(experiment_info):
obj.experiment_tracking = PacBioExperiment.from_orm(experiment_info)

qced_products = get_qc_states_by_id_product_list(
session=qc_session, ids=[id_product], sequencing_outcomes_only=True
).get(id_product)
if qced_products is not None:
obj.qc_state = qced_products[0]
qc_state_db = get_qc_state_for_product(
session=qc_session, id_product=id_product
)
if qc_state_db is not None:
obj.qc_state = QcState.from_orm(qc_state_db)

return obj

0 comments on commit 392bdbc

Please sign in to comment.