Skip to content

Commit

Permalink
Merge pull request #20 from glrs/feature/couchdb
Browse files Browse the repository at this point in the history
Enhance document and DB interaction, add new functionality and improv…
  • Loading branch information
glrs authored Nov 19, 2024
2 parents 240a58a + 7218c51 commit 8f99a58
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 31 deletions.
69 changes: 53 additions & 16 deletions lib/couchdb/document.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import datetime
from typing import Any, Dict, List, Optional

from lib.core_utils.logging_utils import custom_logger

logging = custom_logger(__name__.split(".")[-1])


class YggdrasilDocument:
"""Represents a Yggdrasil project document.
Expand Down Expand Up @@ -74,22 +78,27 @@ def to_dict(self) -> Dict[str, Any]:
}

def add_sample(
self, sample_id: str, lib_prep_option: str, status: str = "pending"
self,
sample_id: str,
lib_prep_option: str,
status: str = "pending",
flowcell_ids_processed_for: Optional[List[str]] = None,
) -> None:
"""Adds a new sample to the document.
Args:
sample_id (str): The sample ID.
lib_prep_option (str): The library preparation option.
status (str, optional): The status of the sample. Defaults to "pending".
flowcell_ids_processed_for (List[str], optional): Flowcell IDs the sample has been processed for.
"""
sample = {
"sample_id": sample_id,
"status": status,
"lib_prep_option": lib_prep_option,
"start_time": "",
"end_time": "",
"flowcell_ids_processed_for": [],
"flowcell_ids_processed_for": flowcell_ids_processed_for or [],
}
self.samples.append(sample)

Expand All @@ -100,14 +109,24 @@ def update_sample_status(self, sample_id: str, status: str) -> None:
sample_id (str): The sample ID to update.
status (str): The new status of the sample.
"""
for sample in self.samples:
if sample["sample_id"] == sample_id:
sample["status"] = status
if status == "running":
sample["start_time"] = datetime.datetime.now().isoformat()
elif status in ["completed", "failed"]:
sample["end_time"] = datetime.datetime.now().isoformat()
break
sample = self.get_sample(sample_id)
if sample:
sample["status"] = status
current_time = datetime.datetime.now().isoformat()
if status in ["processing", "running"]:
sample["start_time"] = current_time
elif status in [
"completed",
"processing_failed",
"post_processing_failed",
"aborted",
]:
sample["end_time"] = current_time
else:
logging.error(
f"Sample with ID '{sample_id}' not "
f"found in project '{self.project_id}'."
)

# Check if the project status needs to be updated
self.check_project_completion()
Expand All @@ -126,21 +145,39 @@ def get_sample(self, sample_id: str) -> Optional[Dict[str, Any]]:
return sample
return None

def update_project_status(self, status: str) -> None:
"""Updates the status of the project.
Args:
status (str): The new status of the project.
"""
self.status = status
if status == "completed":
if not self.end_date:
self.end_date = datetime.datetime.now().isoformat()
elif status in ["processing", "failed"]:
self.end_date = ""

def check_project_completion(self) -> None:
"""Checks if all samples are completed and updates the project status.
Samples with status "completed" or "aborted" are considered completed.
Samples with status "completed" or "aborted" are considered finished.
Note:
There will be cases where samples are "aborted". These samples
should be considered as "completed" for the project status.
should be considered "completed" for the project status.
"""
# if all(sample["status"] == "completed" for sample in self.samples):
if all(sample["status"] in ["completed", "aborted"] for sample in self.samples):
# List of statuses indicating a sample is finished
finished_statuses = [
"completed",
"aborted",
] # , "processing_failed", "post_processing_failed"]

if all(sample["status"] in finished_statuses for sample in self.samples):
self.status = "completed"
self.end_date = datetime.datetime.now().isoformat()
else:
# If any sample is not completed (or aborted), set the project status to ongoing
self.status = "ongoing"
# If any sample is not completed (or aborted), set the project status to "processing"
self.status = "processing"
# Clear the end date since the project is not completed
self.end_date = ""
60 changes: 45 additions & 15 deletions lib/couchdb/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ async def get_changes(
"Received `None` for last_processed_seq. Skipping save."
)

yield doc
if doc is not None:
yield doc
else:
logging.warning(f"Document with ID {change['id']} is None.")
except Exception as e:
logging.warning(f"Error processing change: {e}")
logging.debug(f"Data causing the error: {change}")
Expand Down Expand Up @@ -221,7 +224,12 @@ def create_project(

def save_document(self, document: YggdrasilDocument) -> None:
try:
self.db.save(document.to_dict())
existing_doc = self.db.get(document._id)
doc_dict = document.to_dict()
if existing_doc:
# Preserve the _rev field to avoid update conflicts
doc_dict["_rev"] = existing_doc["_rev"]
self.db.save(doc_dict)
logging.info(
f"Document with ID '{document._id}' saved successfully in 'yggdrasil' DB."
)
Expand Down Expand Up @@ -258,20 +266,10 @@ def update_sample_status(
status (str): The new status for the sample.
"""
try:
document = self.get_document_by_project_id(project_id)
if document:
# TODO: Implement .from_dict() method in YggdrasilDocument
ygg_doc = YggdrasilDocument.from_dict(document)
######## Replace the following with the above line ########
# ygg_doc = YggdrasilDocument(
# project_id=document["project_id"],
# projects_reference=document["projects_reference"],
# method=document["method"]
# )
# ygg_doc.samples = document["samples"]
##########################################################
document_dict = self.get_document_by_project_id(project_id)
if document_dict:
ygg_doc = YggdrasilDocument.from_dict(document_dict)
ygg_doc.update_sample_status(sample_id, status)
ygg_doc.check_project_completion()
self.save_document(ygg_doc)
logging.info(
f"Updated status of sample '{sample_id}' in project '{project_id}' to '{status}'."
Expand All @@ -297,3 +295,35 @@ def check_project_exists(self, project_id: str) -> Optional[Dict[str, Any]]:
else:
logging.info(f"Project with ID '{project_id}' does not exist.")
return None

def get_sample_status(self, project_id: str, sample_id: str) -> Optional[str]:
"""Retrieves the status of a specific sample.
Args:
project_id (str): The project ID.
sample_id (str): The sample ID.
Returns:
Optional[str]: The status of the sample if found, else None.
"""
document_dict = self.get_document_by_project_id(project_id)
if document_dict:
ygg_doc = YggdrasilDocument.from_dict(document_dict)
sample = ygg_doc.get_sample(sample_id)
if sample:
return sample["status"]
return None

def get_project_status(self, project_id: str) -> Optional[str]:
"""Retrieves the status of a project.
Args:
project_id (str): The project ID.
Returns:
Optional[str]: The status of the project if found, else None.
"""
document_dict = self.get_document_by_project_id(project_id)
if document_dict:
return document_dict.get("status")
return None

0 comments on commit 8f99a58

Please sign in to comment.