Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/core #21

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions lib/base/abstract_project.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from abc import ABC, abstractmethod
from typing import Any

from lib.core_utils.logging_utils import custom_logger
from lib.module_utils.sjob_manager import SlurmJobManager

logging = custom_logger(__name__.split(".")[-1])


class AbstractProject(ABC):
"""Abstract base class for realm module projects in the Yggdrasil application.
Expand All @@ -18,7 +21,7 @@ class AbstractProject(ABC):
Attributes:
sjob_manager (SlurmJobManager): Manages submission and monitoring of Slurm jobs.
doc (Any): The document representing the project or data to be processed.
yggdrasil_db_manager (Any): The database manager for Yggdrasil-specific database operations.
ydm (Any): The database manager (yggdrasil_db_manager) for Yggdrasil-specific database operations.
"""

def __init__(self, doc: Any, yggdrasil_db_manager: Any) -> None:
Expand All @@ -30,7 +33,66 @@ def __init__(self, doc: Any, yggdrasil_db_manager: Any) -> None:
"""
self.sjob_manager: SlurmJobManager = SlurmJobManager()
self.doc: Any = doc
self.yggdrasil_db_manager: Any = yggdrasil_db_manager
self.ydm: Any = yggdrasil_db_manager
self.project_id: str = self.doc.get("project_id")
self.doc_id: str = self.doc.get("_id")
self.method: str = self.doc.get("details", {}).get(
"library_construction_method", ""
)
self.status: str = "ongoing"
self.project_info: dict = {}
self.samples: list = []
self.proceed: bool = False # Default to False; subclasses can override

# def setup_project(self):
# """Template method defining the steps for project setup."""
# self.proceed = self.check_required_fields()
# if self.proceed:
# self.initialize_project_in_db()
# # self._extract_project_specific_info()
# # self.extract_samples()
# else:
# logging.error("Cannot proceed due to missing required fields.")

def initialize_project_in_db(self):
"""Initialize the project in the Yggdrasil database."""
existing_document = self.ydm.check_project_exists(self.project_id)
if existing_document is None:
# Create the project in YggdrasilDB
self.ydm.create_project(self.project_id, self.doc_id, self.method)
logging.info(f"Project {self.project_id} created in YggdrasilDB.")
else:
logging.info(f"Project {self.project_id} already exists in YggdrasilDB.")
self.status = existing_document.get("status")
if self.status == "completed":
logging.info(
f"Project with ID {self.project_id} is already completed. Skipping processing."
)
self.proceed = False
else:
logging.info(
f"Project with ID {self.project_id} is ongoing and will be processed."
)
self.proceed = True

def add_samples_to_project_in_db(self):
"""Add samples to the project in the Yggdrasil database."""
for sample in self.samples:
self.ydm.add_sample(
project_id=self.project_id,
sample_id=sample.id,
# lib_prep_option=sample.project_info.get("library_prep_option", ""),
status=sample.status,
)

@abstractmethod
def check_required_fields(self) -> bool:
"""Check if the document contains all required fields.

Returns:
bool: True if all required fields are present, False otherwise.
"""
pass

@abstractmethod
async def launch(self) -> None:
Expand Down
67 changes: 21 additions & 46 deletions ygg-mule.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,53 +34,28 @@ def process_document(doc_id):

project_id = document.get("project_id")

# Check if the project exists in the yggdrasil database
existing_document = ydm.check_project_exists(project_id)

if existing_document is None:
projects_reference = document.get("_id")
method = document.get("details", {}).get("library_construction_method")

# Create a new project in the yggdrasil database
ydm.create_project(project_id, projects_reference, method)
process_project = True
else:
# Check the status of the existing project
status = existing_document.get("status")
if status == "completed":
logging.info(
f"Project with ID {project_id} is already completed. Skipping processing."
)
process_project = False
else:
logging.info(
f"Project with ID {project_id} is ongoing and will be processed."
)
process_project = True

if process_project:
# Determine the appropriate module to load
module_loc = get_module_location(document)
if not module_loc:
logging.warning(f"No module found for document with ID {doc_id}.")
return

# Load and execute the module
try:
RealmClass = Ygg.load_realm_class(module_loc)
if RealmClass:
realm = RealmClass(document, ydm)
if realm.proceed:
asyncio.run(realm.launch())
logging.info("Processing complete.")
else:
logging.info(
f"Skipping processing due to missing required information for project: {project_id}"
)
# Determine the appropriate module to load
module_loc = get_module_location(document)
if not module_loc:
logging.warning(f"No module found for document with ID {doc_id}.")
return

# Load and execute the module
try:
RealmClass = Ygg.load_realm_class(module_loc)
if RealmClass:
realm = RealmClass(document, ydm)
if realm.proceed:
asyncio.run(realm.launch())
logging.info("Processing complete.")
else:
logging.warning(f"Failed to load module '{module_loc}'.")
except Exception as e:
logging.error(f"Error while processing document: {e}", exc_info=True)
logging.info(
f"Skipping processing due to missing required information for project: {project_id}"
)
else:
logging.warning(f"Failed to load module '{module_loc}'.")
except Exception as e:
logging.error(f"Error while processing document: {e}", exc_info=True)


# TODO: If the module registry doesn’t change often, consider caching it to avoid reloading it every time
Expand Down
90 changes: 45 additions & 45 deletions ygg_trunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,54 +22,54 @@ async def process_couchdb_changes():
# Fetch data from CouchDB and call the appropriate module
async for data, module_loc in pdm.fetch_changes():
try:
project_id = data.get("project_id")

# Check if the project exists
existing_document = ydm.check_project_exists(project_id)

if existing_document is None:
projects_reference = data.get("_id")
method = data.get("details", {}).get(
"library_construction_method"
)

# Create a new project if it doesn't exist
ydm.create_project(project_id, projects_reference, method)
process_project = True
else:
# If the project exists, check if it is completed
if existing_document.get("status") == "completed":
logging.info(
f"Project with ID {project_id} is already completed. Skipping further processing."
)
process_project = False
# project_id = data.get("project_id")

# # Check if the project exists
# existing_document = ydm.check_project_exists(project_id)

# if existing_document is None:
# projects_reference = data.get("_id")
# method = data.get("details", {}).get(
# "library_construction_method"
# )

# # Create a new project if it doesn't exist
# ydm.create_project(project_id, projects_reference, method)
# process_project = True
# else:
# # If the project exists, check if it is completed
# if existing_document.get("status") == "completed":
# logging.info(
# f"Project with ID {project_id} is already completed. Skipping further processing."
# )
# process_project = False
# else:
# logging.info(
# f"Project with ID {project_id} is ongoing and will be processed."
# )
# process_project = True

# if process_project:
# Dynamically load the module
# module = Ygg.load_module(module_loc)
print(f">>> Module location: {module_loc}")
RealmClass = Ygg.load_realm_class(module_loc)

if RealmClass:
# Call the module's launch function
realm = RealmClass(data, ydm)
if realm.proceed:
task = asyncio.create_task(realm.launch())
tasks.append(task)
# print(f"Tasks ({realm.project_info['project_id']}): {tasks}")
else:
logging.info(
f"Project with ID {project_id} is ongoing and will be processed."
)
process_project = True

if process_project:
# Dynamically load the module
# module = Ygg.load_module(module_loc)
print(f">>> Module location: {module_loc}")
RealmClass = Ygg.load_realm_class(module_loc)

if RealmClass:
# Call the module's launch function
realm = RealmClass(data, ydm)
if realm.proceed:
task = asyncio.create_task(realm.launch())
tasks.append(task)
# print(f"Tasks ({realm.project_info['project_id']}): {tasks}")
else:
logging.info(
f"Skipping task creation due to missing required information. {data.get('project_id')}"
)
else:
logging.warning(
f"Failed to load module '{module_loc}' for '{data['details']['library_construction_method']}'."
f"Skipping task creation due to missing required information. {data.get('project_id')}"
)
else:
logging.warning(
f"Failed to load module '{module_loc}' for '{data['details']['library_construction_method']}'."
)
except Exception as e:
logging.warning(
f"Error while trying to load module: {e}", exc_info=True
Expand Down
Loading