From 7218c5189bc084cb08034bf9d39045b22ea8ae6e Mon Sep 17 00:00:00 2001 From: glrs <5999366+glrs@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:16:50 +0100 Subject: [PATCH] Enhance document and DB interaction, add new functionality and improve existing code --- lib/couchdb/document.py | 69 +++++++++++++++++++++++++++++++---------- lib/couchdb/manager.py | 60 ++++++++++++++++++++++++++--------- 2 files changed, 98 insertions(+), 31 deletions(-) diff --git a/lib/couchdb/document.py b/lib/couchdb/document.py index 38e0130..61efdef 100644 --- a/lib/couchdb/document.py +++ b/lib/couchdb/document.py @@ -1,6 +1,10 @@ import datetime from typing import Any, Dict, List, Optional +from lib.core_utils.logging_utils import custom_logger + +logging = custom_logger(__name__.split(".")[-1]) + class YggdrasilDocument: """Represents a Yggdrasil project document. @@ -74,7 +78,11 @@ def to_dict(self) -> Dict[str, Any]: } def add_sample( - self, sample_id: str, lib_prep_option: str, status: str = "pending" + self, + sample_id: str, + lib_prep_option: str, + status: str = "pending", + flowcell_ids_processed_for: Optional[List[str]] = None, ) -> None: """Adds a new sample to the document. @@ -82,6 +90,7 @@ def add_sample( sample_id (str): The sample ID. lib_prep_option (str): The library preparation option. status (str, optional): The status of the sample. Defaults to "pending". + flowcell_ids_processed_for (List[str], optional): Flowcell IDs the sample has been processed for. """ sample = { "sample_id": sample_id, @@ -89,7 +98,7 @@ def add_sample( "lib_prep_option": lib_prep_option, "start_time": "", "end_time": "", - "flowcell_ids_processed_for": [], + "flowcell_ids_processed_for": flowcell_ids_processed_for or [], } self.samples.append(sample) @@ -100,14 +109,24 @@ def update_sample_status(self, sample_id: str, status: str) -> None: sample_id (str): The sample ID to update. status (str): The new status of the sample. """ - for sample in self.samples: - if sample["sample_id"] == sample_id: - sample["status"] = status - if status == "running": - sample["start_time"] = datetime.datetime.now().isoformat() - elif status in ["completed", "failed"]: - sample["end_time"] = datetime.datetime.now().isoformat() - break + sample = self.get_sample(sample_id) + if sample: + sample["status"] = status + current_time = datetime.datetime.now().isoformat() + if status in ["processing", "running"]: + sample["start_time"] = current_time + elif status in [ + "completed", + "processing_failed", + "post_processing_failed", + "aborted", + ]: + sample["end_time"] = current_time + else: + logging.error( + f"Sample with ID '{sample_id}' not " + f"found in project '{self.project_id}'." + ) # Check if the project status needs to be updated self.check_project_completion() @@ -126,21 +145,39 @@ def get_sample(self, sample_id: str) -> Optional[Dict[str, Any]]: return sample return None + def update_project_status(self, status: str) -> None: + """Updates the status of the project. + + Args: + status (str): The new status of the project. + """ + self.status = status + if status == "completed": + if not self.end_date: + self.end_date = datetime.datetime.now().isoformat() + elif status in ["processing", "failed"]: + self.end_date = "" + def check_project_completion(self) -> None: """Checks if all samples are completed and updates the project status. - Samples with status "completed" or "aborted" are considered completed. + Samples with status "completed" or "aborted" are considered finished. Note: There will be cases where samples are "aborted". These samples - should be considered as "completed" for the project status. + should be considered "completed" for the project status. """ - # if all(sample["status"] == "completed" for sample in self.samples): - if all(sample["status"] in ["completed", "aborted"] for sample in self.samples): + # List of statuses indicating a sample is finished + finished_statuses = [ + "completed", + "aborted", + ] # , "processing_failed", "post_processing_failed"] + + if all(sample["status"] in finished_statuses for sample in self.samples): self.status = "completed" self.end_date = datetime.datetime.now().isoformat() else: - # If any sample is not completed (or aborted), set the project status to ongoing - self.status = "ongoing" + # If any sample is not completed (or aborted), set the project status to "processing" + self.status = "processing" # Clear the end date since the project is not completed self.end_date = "" diff --git a/lib/couchdb/manager.py b/lib/couchdb/manager.py index f06a15c..ed24be0 100644 --- a/lib/couchdb/manager.py +++ b/lib/couchdb/manager.py @@ -168,7 +168,10 @@ async def get_changes( "Received `None` for last_processed_seq. Skipping save." ) - yield doc + if doc is not None: + yield doc + else: + logging.warning(f"Document with ID {change['id']} is None.") except Exception as e: logging.warning(f"Error processing change: {e}") logging.debug(f"Data causing the error: {change}") @@ -221,7 +224,12 @@ def create_project( def save_document(self, document: YggdrasilDocument) -> None: try: - self.db.save(document.to_dict()) + existing_doc = self.db.get(document._id) + doc_dict = document.to_dict() + if existing_doc: + # Preserve the _rev field to avoid update conflicts + doc_dict["_rev"] = existing_doc["_rev"] + self.db.save(doc_dict) logging.info( f"Document with ID '{document._id}' saved successfully in 'yggdrasil' DB." ) @@ -258,20 +266,10 @@ def update_sample_status( status (str): The new status for the sample. """ try: - document = self.get_document_by_project_id(project_id) - if document: - # TODO: Implement .from_dict() method in YggdrasilDocument - ygg_doc = YggdrasilDocument.from_dict(document) - ######## Replace the following with the above line ######## - # ygg_doc = YggdrasilDocument( - # project_id=document["project_id"], - # projects_reference=document["projects_reference"], - # method=document["method"] - # ) - # ygg_doc.samples = document["samples"] - ########################################################## + document_dict = self.get_document_by_project_id(project_id) + if document_dict: + ygg_doc = YggdrasilDocument.from_dict(document_dict) ygg_doc.update_sample_status(sample_id, status) - ygg_doc.check_project_completion() self.save_document(ygg_doc) logging.info( f"Updated status of sample '{sample_id}' in project '{project_id}' to '{status}'." @@ -297,3 +295,35 @@ def check_project_exists(self, project_id: str) -> Optional[Dict[str, Any]]: else: logging.info(f"Project with ID '{project_id}' does not exist.") return None + + def get_sample_status(self, project_id: str, sample_id: str) -> Optional[str]: + """Retrieves the status of a specific sample. + + Args: + project_id (str): The project ID. + sample_id (str): The sample ID. + + Returns: + Optional[str]: The status of the sample if found, else None. + """ + document_dict = self.get_document_by_project_id(project_id) + if document_dict: + ygg_doc = YggdrasilDocument.from_dict(document_dict) + sample = ygg_doc.get_sample(sample_id) + if sample: + return sample["status"] + return None + + def get_project_status(self, project_id: str) -> Optional[str]: + """Retrieves the status of a project. + + Args: + project_id (str): The project ID. + + Returns: + Optional[str]: The status of the project if found, else None. + """ + document_dict = self.get_document_by_project_id(project_id) + if document_dict: + return document_dict.get("status") + return None