Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance document and DB interaction, add new functionality and improv… #20

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 53 additions & 16 deletions lib/couchdb/document.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import datetime
from typing import Any, Dict, List, Optional

from lib.core_utils.logging_utils import custom_logger

logging = custom_logger(__name__.split(".")[-1])


class YggdrasilDocument:
"""Represents a Yggdrasil project document.
Expand Down Expand Up @@ -74,22 +78,27 @@ def to_dict(self) -> Dict[str, Any]:
}

def add_sample(
self, sample_id: str, lib_prep_option: str, status: str = "pending"
self,
sample_id: str,
lib_prep_option: str,
status: str = "pending",
flowcell_ids_processed_for: Optional[List[str]] = None,
) -> None:
"""Adds a new sample to the document.

Args:
sample_id (str): The sample ID.
lib_prep_option (str): The library preparation option.
status (str, optional): The status of the sample. Defaults to "pending".
flowcell_ids_processed_for (List[str], optional): Flowcell IDs the sample has been processed for.
"""
sample = {
"sample_id": sample_id,
"status": status,
"lib_prep_option": lib_prep_option,
"start_time": "",
"end_time": "",
"flowcell_ids_processed_for": [],
"flowcell_ids_processed_for": flowcell_ids_processed_for or [],
}
self.samples.append(sample)

Expand All @@ -100,14 +109,24 @@ def update_sample_status(self, sample_id: str, status: str) -> None:
sample_id (str): The sample ID to update.
status (str): The new status of the sample.
"""
for sample in self.samples:
if sample["sample_id"] == sample_id:
sample["status"] = status
if status == "running":
sample["start_time"] = datetime.datetime.now().isoformat()
elif status in ["completed", "failed"]:
sample["end_time"] = datetime.datetime.now().isoformat()
break
sample = self.get_sample(sample_id)
if sample:
sample["status"] = status
current_time = datetime.datetime.now().isoformat()
if status in ["processing", "running"]:
sample["start_time"] = current_time
elif status in [
"completed",
"processing_failed",
"post_processing_failed",
"aborted",
]:
sample["end_time"] = current_time
else:
logging.error(
f"Sample with ID '{sample_id}' not "
f"found in project '{self.project_id}'."
)

# Check if the project status needs to be updated
self.check_project_completion()
Expand All @@ -126,21 +145,39 @@ def get_sample(self, sample_id: str) -> Optional[Dict[str, Any]]:
return sample
return None

def update_project_status(self, status: str) -> None:
"""Updates the status of the project.

Args:
status (str): The new status of the project.
"""
self.status = status
if status == "completed":
if not self.end_date:
self.end_date = datetime.datetime.now().isoformat()
elif status in ["processing", "failed"]:
self.end_date = ""

def check_project_completion(self) -> None:
"""Checks if all samples are completed and updates the project status.

Samples with status "completed" or "aborted" are considered completed.
Samples with status "completed" or "aborted" are considered finished.

Note:
There will be cases where samples are "aborted". These samples
should be considered as "completed" for the project status.
should be considered "completed" for the project status.
"""
# if all(sample["status"] == "completed" for sample in self.samples):
if all(sample["status"] in ["completed", "aborted"] for sample in self.samples):
# List of statuses indicating a sample is finished
finished_statuses = [
"completed",
"aborted",
] # , "processing_failed", "post_processing_failed"]

if all(sample["status"] in finished_statuses for sample in self.samples):
self.status = "completed"
self.end_date = datetime.datetime.now().isoformat()
else:
# If any sample is not completed (or aborted), set the project status to ongoing
self.status = "ongoing"
# If any sample is not completed (or aborted), set the project status to "processing"
self.status = "processing"
# Clear the end date since the project is not completed
self.end_date = ""
60 changes: 45 additions & 15 deletions lib/couchdb/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ async def get_changes(
"Received `None` for last_processed_seq. Skipping save."
)

yield doc
if doc is not None:
yield doc
else:
logging.warning(f"Document with ID {change['id']} is None.")
except Exception as e:
logging.warning(f"Error processing change: {e}")
logging.debug(f"Data causing the error: {change}")
Expand Down Expand Up @@ -221,7 +224,12 @@ def create_project(

def save_document(self, document: YggdrasilDocument) -> None:
try:
self.db.save(document.to_dict())
existing_doc = self.db.get(document._id)
doc_dict = document.to_dict()
if existing_doc:
# Preserve the _rev field to avoid update conflicts
doc_dict["_rev"] = existing_doc["_rev"]
self.db.save(doc_dict)
logging.info(
f"Document with ID '{document._id}' saved successfully in 'yggdrasil' DB."
)
Expand Down Expand Up @@ -258,20 +266,10 @@ def update_sample_status(
status (str): The new status for the sample.
"""
try:
document = self.get_document_by_project_id(project_id)
if document:
# TODO: Implement .from_dict() method in YggdrasilDocument
ygg_doc = YggdrasilDocument.from_dict(document)
######## Replace the following with the above line ########
# ygg_doc = YggdrasilDocument(
# project_id=document["project_id"],
# projects_reference=document["projects_reference"],
# method=document["method"]
# )
# ygg_doc.samples = document["samples"]
##########################################################
document_dict = self.get_document_by_project_id(project_id)
if document_dict:
ygg_doc = YggdrasilDocument.from_dict(document_dict)
ygg_doc.update_sample_status(sample_id, status)
ygg_doc.check_project_completion()
self.save_document(ygg_doc)
logging.info(
f"Updated status of sample '{sample_id}' in project '{project_id}' to '{status}'."
Expand All @@ -297,3 +295,35 @@ def check_project_exists(self, project_id: str) -> Optional[Dict[str, Any]]:
else:
logging.info(f"Project with ID '{project_id}' does not exist.")
return None

def get_sample_status(self, project_id: str, sample_id: str) -> Optional[str]:
"""Retrieves the status of a specific sample.

Args:
project_id (str): The project ID.
sample_id (str): The sample ID.

Returns:
Optional[str]: The status of the sample if found, else None.
"""
document_dict = self.get_document_by_project_id(project_id)
if document_dict:
ygg_doc = YggdrasilDocument.from_dict(document_dict)
sample = ygg_doc.get_sample(sample_id)
if sample:
return sample["status"]
return None

def get_project_status(self, project_id: str) -> Optional[str]:
"""Retrieves the status of a project.

Args:
project_id (str): The project ID.

Returns:
Optional[str]: The status of the project if found, else None.
"""
document_dict = self.get_document_by_project_id(project_id)
if document_dict:
return document_dict.get("status")
return None
Loading