diff --git a/lib/base/abstract_project.py b/lib/base/abstract_project.py index 852cf7b..9c371ce 100644 --- a/lib/base/abstract_project.py +++ b/lib/base/abstract_project.py @@ -1,8 +1,11 @@ from abc import ABC, abstractmethod from typing import Any +from lib.core_utils.logging_utils import custom_logger from lib.module_utils.sjob_manager import SlurmJobManager +logging = custom_logger(__name__.split(".")[-1]) + class AbstractProject(ABC): """Abstract base class for realm module projects in the Yggdrasil application. @@ -18,7 +21,7 @@ class AbstractProject(ABC): Attributes: sjob_manager (SlurmJobManager): Manages submission and monitoring of Slurm jobs. doc (Any): The document representing the project or data to be processed. - yggdrasil_db_manager (Any): The database manager for Yggdrasil-specific database operations. + ydm (Any): The database manager (yggdrasil_db_manager) for Yggdrasil-specific database operations. """ def __init__(self, doc: Any, yggdrasil_db_manager: Any) -> None: @@ -30,7 +33,66 @@ def __init__(self, doc: Any, yggdrasil_db_manager: Any) -> None: """ self.sjob_manager: SlurmJobManager = SlurmJobManager() self.doc: Any = doc - self.yggdrasil_db_manager: Any = yggdrasil_db_manager + self.ydm: Any = yggdrasil_db_manager + self.project_id: str = self.doc.get("project_id") + self.doc_id: str = self.doc.get("_id") + self.method: str = self.doc.get("details", {}).get( + "library_construction_method", "" + ) + self.status: str = "ongoing" + self.project_info: dict = {} + self.samples: list = [] + self.proceed: bool = False # Default to False; subclasses can override + + # def setup_project(self): + # """Template method defining the steps for project setup.""" + # self.proceed = self.check_required_fields() + # if self.proceed: + # self.initialize_project_in_db() + # # self._extract_project_specific_info() + # # self.extract_samples() + # else: + # logging.error("Cannot proceed due to missing required fields.") + + def initialize_project_in_db(self): + """Initialize the project in the Yggdrasil database.""" + existing_document = self.ydm.check_project_exists(self.project_id) + if existing_document is None: + # Create the project in YggdrasilDB + self.ydm.create_project(self.project_id, self.doc_id, self.method) + logging.info(f"Project {self.project_id} created in YggdrasilDB.") + else: + logging.info(f"Project {self.project_id} already exists in YggdrasilDB.") + self.status = existing_document.get("status") + if self.status == "completed": + logging.info( + f"Project with ID {self.project_id} is already completed. Skipping processing." + ) + self.proceed = False + else: + logging.info( + f"Project with ID {self.project_id} is ongoing and will be processed." + ) + self.proceed = True + + def add_samples_to_project_in_db(self): + """Add samples to the project in the Yggdrasil database.""" + for sample in self.samples: + self.ydm.add_sample( + project_id=self.project_id, + sample_id=sample.id, + # lib_prep_option=sample.project_info.get("library_prep_option", ""), + status=sample.status, + ) + + @abstractmethod + def check_required_fields(self) -> bool: + """Check if the document contains all required fields. + + Returns: + bool: True if all required fields are present, False otherwise. + """ + pass @abstractmethod async def launch(self) -> None: diff --git a/ygg-mule.py b/ygg-mule.py index 7b7179b..de4d63c 100644 --- a/ygg-mule.py +++ b/ygg-mule.py @@ -34,53 +34,28 @@ def process_document(doc_id): project_id = document.get("project_id") - # Check if the project exists in the yggdrasil database - existing_document = ydm.check_project_exists(project_id) - - if existing_document is None: - projects_reference = document.get("_id") - method = document.get("details", {}).get("library_construction_method") - - # Create a new project in the yggdrasil database - ydm.create_project(project_id, projects_reference, method) - process_project = True - else: - # Check the status of the existing project - status = existing_document.get("status") - if status == "completed": - logging.info( - f"Project with ID {project_id} is already completed. Skipping processing." - ) - process_project = False - else: - logging.info( - f"Project with ID {project_id} is ongoing and will be processed." - ) - process_project = True - - if process_project: - # Determine the appropriate module to load - module_loc = get_module_location(document) - if not module_loc: - logging.warning(f"No module found for document with ID {doc_id}.") - return - - # Load and execute the module - try: - RealmClass = Ygg.load_realm_class(module_loc) - if RealmClass: - realm = RealmClass(document, ydm) - if realm.proceed: - asyncio.run(realm.launch()) - logging.info("Processing complete.") - else: - logging.info( - f"Skipping processing due to missing required information for project: {project_id}" - ) + # Determine the appropriate module to load + module_loc = get_module_location(document) + if not module_loc: + logging.warning(f"No module found for document with ID {doc_id}.") + return + + # Load and execute the module + try: + RealmClass = Ygg.load_realm_class(module_loc) + if RealmClass: + realm = RealmClass(document, ydm) + if realm.proceed: + asyncio.run(realm.launch()) + logging.info("Processing complete.") else: - logging.warning(f"Failed to load module '{module_loc}'.") - except Exception as e: - logging.error(f"Error while processing document: {e}", exc_info=True) + logging.info( + f"Skipping processing due to missing required information for project: {project_id}" + ) + else: + logging.warning(f"Failed to load module '{module_loc}'.") + except Exception as e: + logging.error(f"Error while processing document: {e}", exc_info=True) # TODO: If the module registry doesn’t change often, consider caching it to avoid reloading it every time diff --git a/ygg_trunk.py b/ygg_trunk.py index f5b0710..d47de6d 100644 --- a/ygg_trunk.py +++ b/ygg_trunk.py @@ -22,54 +22,54 @@ async def process_couchdb_changes(): # Fetch data from CouchDB and call the appropriate module async for data, module_loc in pdm.fetch_changes(): try: - project_id = data.get("project_id") - - # Check if the project exists - existing_document = ydm.check_project_exists(project_id) - - if existing_document is None: - projects_reference = data.get("_id") - method = data.get("details", {}).get( - "library_construction_method" - ) - - # Create a new project if it doesn't exist - ydm.create_project(project_id, projects_reference, method) - process_project = True - else: - # If the project exists, check if it is completed - if existing_document.get("status") == "completed": - logging.info( - f"Project with ID {project_id} is already completed. Skipping further processing." - ) - process_project = False + # project_id = data.get("project_id") + + # # Check if the project exists + # existing_document = ydm.check_project_exists(project_id) + + # if existing_document is None: + # projects_reference = data.get("_id") + # method = data.get("details", {}).get( + # "library_construction_method" + # ) + + # # Create a new project if it doesn't exist + # ydm.create_project(project_id, projects_reference, method) + # process_project = True + # else: + # # If the project exists, check if it is completed + # if existing_document.get("status") == "completed": + # logging.info( + # f"Project with ID {project_id} is already completed. Skipping further processing." + # ) + # process_project = False + # else: + # logging.info( + # f"Project with ID {project_id} is ongoing and will be processed." + # ) + # process_project = True + + # if process_project: + # Dynamically load the module + # module = Ygg.load_module(module_loc) + print(f">>> Module location: {module_loc}") + RealmClass = Ygg.load_realm_class(module_loc) + + if RealmClass: + # Call the module's launch function + realm = RealmClass(data, ydm) + if realm.proceed: + task = asyncio.create_task(realm.launch()) + tasks.append(task) + # print(f"Tasks ({realm.project_info['project_id']}): {tasks}") else: logging.info( - f"Project with ID {project_id} is ongoing and will be processed." - ) - process_project = True - - if process_project: - # Dynamically load the module - # module = Ygg.load_module(module_loc) - print(f">>> Module location: {module_loc}") - RealmClass = Ygg.load_realm_class(module_loc) - - if RealmClass: - # Call the module's launch function - realm = RealmClass(data, ydm) - if realm.proceed: - task = asyncio.create_task(realm.launch()) - tasks.append(task) - # print(f"Tasks ({realm.project_info['project_id']}): {tasks}") - else: - logging.info( - f"Skipping task creation due to missing required information. {data.get('project_id')}" - ) - else: - logging.warning( - f"Failed to load module '{module_loc}' for '{data['details']['library_construction_method']}'." + f"Skipping task creation due to missing required information. {data.get('project_id')}" ) + else: + logging.warning( + f"Failed to load module '{module_loc}' for '{data['details']['library_construction_method']}'." + ) except Exception as e: logging.warning( f"Error while trying to load module: {e}", exc_info=True