From db84ed8a96b20799e8aac9e832501e97f0ffb5bf Mon Sep 17 00:00:00 2001
From: glrs <5999366+glrs@users.noreply.github.com>
Date: Tue, 29 Oct 2024 16:06:01 +0100
Subject: [PATCH 1/3] Move things around. Move SS3 sample class to a new file

---
 lib/realms/smartseq3/smartseq3.py  | 376 +----------------------------
 lib/realms/smartseq3/ss3_sample.py | 373 ++++++++++++++++++++++++++++
 2 files changed, 376 insertions(+), 373 deletions(-)
 create mode 100644 lib/realms/smartseq3/ss3_sample.py

diff --git a/lib/realms/smartseq3/smartseq3.py b/lib/realms/smartseq3/smartseq3.py
index b4ed9cd..20df672 100644
--- a/lib/realms/smartseq3/smartseq3.py
+++ b/lib/realms/smartseq3/smartseq3.py
@@ -1,25 +1,16 @@
-# import glob
 import asyncio
 from pathlib import Path
 
 from lib.base.abstract_project import AbstractProject
-from lib.base.abstract_sample import AbstractSample
 from lib.core_utils.config_loader import ConfigLoader
 from lib.core_utils.logging_utils import custom_logger
 from lib.module_utils.ngi_report_generator import generate_ngi_report
 
 # from datetime import datetime
 # from lib.couchdb.manager import YggdrasilDBManager
-from lib.module_utils.sjob_manager import SlurmJobManager
-from lib.module_utils.slurm_utils import generate_slurm_script
-from lib.realms.smartseq3.report.report_generator import Smartseq3ReportGenerator
-from lib.realms.smartseq3.utils.sample_file_handler import SampleFileHandler
-from lib.realms.smartseq3.utils.ss3_utils import SS3Utils
-from lib.realms.smartseq3.utils.yaml_utils import write_yaml
-from tests.mocks.mock_sjob_manager import MockSlurmJobManager
+from lib.realms.smartseq3.ss3_sample import SS3Sample
 
-DEBUG = True
-logging = custom_logger("SmartSeq3")
+logging = custom_logger("SS3 Project")
 
 
 class SmartSeq3(AbstractProject):
@@ -200,7 +191,7 @@ def _generate_ngi_report(self):
         user_name = "Anastasios Glaros"
         sample_list = [sample.id for sample in self.samples]
         project_path = str(self.project_dir)
-        project_id = self.project_info.get("project_id")
+        project_id = self.project_info.get("project_id", "Unknown_Project")
 
         report_success = generate_ngi_report(
             project_path, project_id, user_name, sample_list
         )
@@ -245,364 +236,3 @@ def post_process(self, result):
             result: Result to post-process.
         """
         pass
-
-
-class SS3Sample(AbstractSample):
-    """
-    Class representing a sample in a SmartSeq3 project.
-
-    Attributes:
-        id (str): Sample ID.
-        sample_data (dict): Data related to the sample.
-        project_info (dict): Information about the parent project.
-        barcode (str): Barcode of the sample.
-        flowcell_id (str): ID of the latest flowcell.
-        config (dict): Configuration settings.
-        status (str): Current status of the sample.
-        metadata (dict): Metadata for the sample.
-        project_dir (Path): Path to the parent project directory.
-        sample_dir (Path): Path to the sample directory.
-        sjob_manager (SlurmJobManager): Manager for submitting and monitoring Slurm jobs.
-        file_handler (SampleFileHandler): Handler for sample files.
-    """
-
-    def __init__(self, sample_id, sample_data, project_info, config):
-        """
-        Initialize a SmartSeq3 sample instance.
-
-        Args:
-            sample_id (str): ID of the sample.
-            sample_data (dict): Data related to the sample.
-            project_info (dict): Information about the parent project.
-            config (dict): Configuration settings.
-        """
-        # TODO: self.id must be demanded by a template class
-        self._id = sample_id
-        self.sample_data = sample_data
-        self.project_info = project_info
-
-        # Initialize barcode
-        self.barcode = self.get_barcode()
-
-        # Collect flowcell ID
-        self.flowcell_id = self._get_latest_flowcell()
-
-        self.config = config
-        # self.job_id = None
-        # TODO: Currently not used much, but should be used if we write to a database
-        self._status = "pending"  # other statuses: "processing", "completed", "failed"
-        self.metadata = None
-
-        # Define the parent project directory
-        self.project_dir = self.project_info.get("project_dir")
-
-        # Define the sample directory
-        # NOTE: This directory is not created yet. To be verified in the post_process method.
-        self.sample_dir = self.project_dir / self.id
-
-        if DEBUG:
-            self.sjob_manager = MockSlurmJobManager()
-        else:
-            self.sjob_manager = SlurmJobManager()
-
-        # Initialize SampleFileHandler
-        self.file_handler = SampleFileHandler(self)
-
-    @property
-    def id(self):
-        return self._id
-
-    @property
-    def status(self):
-        return self._status
-
-    @status.setter
-    def status(self, value):
-        self._status = value
-
-    async def process(self):
-        """
-        Process the sample by collecting metadata, creating YAML files, generating Slurm scripts, and submitting jobs.
-        """
-        logging.info(f"Processing sample {self.id}")
-        yaml_metadata = self._collect_yaml_metadata()
-        if not yaml_metadata:
-            logging.warning(f"Metadata missing for sample {self.id}")
-            return None
-
-        logging.debug("Metadata collected. Creating YAML file")
-
-        self.create_yaml_file(yaml_metadata)
-
-        # TODO: Check if the YAML file was created successfully
-        logging.debug("YAML file created.")
-
-        logging.debug("Creating Slurm script")
-        slurm_metadata = self._collect_slurm_metadata()
-        if not slurm_metadata:
-            logging.warning(f"Slurm metadata missing for sample {self.id}")
-            return None
-
-        # Create Slurm script and submit job
-        slurm_template_path = self.config["slurm_template"]
-        if not generate_slurm_script(
-            slurm_metadata, slurm_template_path, self.file_handler.slurm_script_path
-        ):
-            logging.error(f"Failed to create Slurm script for sample {self.id}.")
-            return None
-
-        # Submit the job
-        logging.debug("Slurm script created. Submitting job")
-        self.job_id = await self.sjob_manager.submit_job(
-            self.file_handler.slurm_script_path
-        )
-
-        # if self.job_id:
-        #     logging.debug(f"[{self.id}] Job submitted with ID: {self.job_id}")
-
-        #     asyncio.create_task(self.sjob_manager.monitor_job(self.job_id, self))
-        #     logging.debug(f"[{self.id}] Job {self.job_id} submitted for monitoring.")
-        # else:
-        #     logging.error(f"[{self.id}] Failed to submit job.")
-        #     return None
-
-        if self.job_id:
-            logging.debug(f"[{self.id}] Job submitted with ID: {self.job_id}")
-            # Wait here for the monitoring to complete before exiting the process method
-            await self.sjob_manager.monitor_job(self.job_id, self)
-            logging.debug(f"[{self.id}] Job {self.job_id} monitoring complete.")
-        else:
-            logging.error(f"[{self.id}] Failed to submit job.")
-            return None
-
-    def get_barcode(self):
-        """
-        Retrieve and validate the barcode from sample data.
-
-        Returns:
-            str: The barcode of the sample.
-        """
-        barcode = self.sample_data["library_prep"]["A"].get("barcode", None)
-        if barcode:
-            return barcode.split("-")[-1]
-        else:
-            logging.warning(f"No barcode found in StatusDB for sample {self.id}.")
-            return None  # Or handle more appropriately based on your application's requirements
-
-    def _collect_yaml_metadata(self):
-        """
-        Collect metadata necessary for creating a YAML file for the sample.
-
-        Returns:
-            dict: The collected metadata or None if necessary data is missing.
-        """
-        # NOTE: zUMIs does not support multiple flowcells per sample
-        # Potential solutions:
-        #   1. SmartSeq3 sample libraries should not be sequenced across multiple flowcells
-        #      SmartSeq3 libraries should not be re-sequenced in the same project
-        #   2. Merge fastq files from multiple flowcells
-
-        # Select the latest flowcell for analysis
-        if self.flowcell_id:
-            fastqs = self.file_handler.locate_fastq_files()
-            if fastqs is None:
-                logging.warning(
-                    f"No FASTQ files found for sample '{self.id}' in flowcell '{self.flowcell_id}'. Ensure files are correctly named and located."
-                )
-                return None
-        else:
-            logging.warning(f"No flowcell found for sample {self.id}")
-            return None
-
-        # if not all(fastqs.values()):
-        #     logging.warning(f"Not all fastq files found at {fastq_path}")
-        #     return None
-
-        seq_setup = self.project_info.get("sequencing_setup", "")
-        if seq_setup:
-            read_setup = SS3Utils.transform_seq_setup(seq_setup)
-
-        # ref_gen = self.project_info.get('ref_genome', '')
-
-        # NOTE: Might break if the reference genome naming format is odd.
-        # TODO: Might need to make more robust or even map the ref genomes to their paths
-        ref_paths = self.file_handler.locate_ref_paths()
-        if not ref_paths:
-            logging.warning(
-                f"Reference paths not found for sample {self.id}. Skipping..."
-            )
-            return None
-
-        if self.barcode is None:
-            logging.warning(f"Barcode not available for sample {self.id}")
-            return None
-
-        if not self.file_handler.ensure_barcode_file():
-            logging.error(
-                f"Failed to create barcode file for sample {self.id}. Skipping..."
-            )
-            return None
-
-        try:
-            metadata = {
-                "plate": self.id,  # NOTE: Temporarily not used, but might be used when we name everything after ngi
-                # 'plate': self.sample_data.get('customer_name', ''),
-                "barcode": self.barcode,
-                "bc_file": self.file_handler.barcode_fpath,
-                "fastqs": {k: str(v) for k, v in fastqs.items() if v},
-                "read_setup": read_setup,
-                "ref": ref_paths,
-                "outdir": str(self.sample_dir),
-                "out_yaml": self.project_dir / f"{self.id}.yaml",
-            }
-        except Exception as e:
-            logging.error(f"Error constructing metadata for sample {self.id}: {e}")
-            return None
-
-        self.metadata = metadata
-
-        return metadata
-
-    def _get_latest_flowcell(self):
-        """
-        Selects the latest flowcell for the current sample.
-
-        Returns:
-            The latest flowcell ID or None if no valid flowcells are found.
-        """
-        try:
-            latest_fc = None
-            latest_date = None
-            if "library_prep" in self.sample_data:
-                for prep_info in self.sample_data["library_prep"].values():
-                    for fc_id in prep_info.get("sequenced_fc", []):
-                        fc_date = SS3Utils.parse_fc_date(fc_id)
-                        if fc_date and (not latest_date or fc_date > latest_date):
-                            latest_date = fc_date
-                            latest_fc = fc_id
-
-            if not latest_fc:
-                logging.warning(f"No valid flowcells found for sample {self.id}.")
-            return latest_fc
-
-        except Exception as e:
-            logging.error(
-                f"Error extracting latest flowcell info for sample '{self.id}': {e}",
-                exc_info=True,
-            )
-            return None
-
-    def _collect_slurm_metadata(self):
-        """
-        Collect metadata necessary for creating a Slurm job script.
-
-        Returns:
-            dict: The collected metadata or None if necessary data is missing.
-        """
-        try:
-            metadata = {
-                "project_name": self.project_info["project_name"],
-                "project_dir": self.project_dir,
-                # 'sample_id': self.id, # Temporarily not used, but might be used when we name everything after ngi
-                "plate_id": self.id,  # self.sample_data.get('customer_name', ''),
-                "yaml_settings_path": self.project_dir / f"{self.id}.yaml",
-                "zumis_path": self.config["zumis_path"],
-            }
-        except Exception as e:
-            logging.error(f"Error constructing metadata for sample {self.id}: {e}")
-            return None
-
-        return metadata
-
-    def _transform_seq_setup(self, seq_setup_str):
-        """
-        Transforms a sequencing setup string into a detailed format for each read type.
-
-        Args:
-            seq_setup_str (str): Sequencing setup string in the format "R1-I1-I2-R2".
-
-        Returns:
-            dict: A dictionary with formatted strings for each read type.
-        """
-        r1, i1, i2, r2 = seq_setup_str.split("-")
-
-        return {
-            "R1": (f"cDNA(23-{r1})", "UMI(12-19)"),
-            "R2": f"cDNA(1-{r2})",
-            "I1": f"BC(1-{i1})",
-            "I2": f"BC(1-{i2})",
-        }
-
-    def _get_ref_paths(self, ref_gen, config):
-        """
-        Maps a reference genome to its STAR index and GTF file paths.
-
-        Args:
-            ref_gen (str): Reference genome string, e.g., "Zebrafish (Danio rerio, GRCz10)".
-            config (dict): Configuration object containing the mapping.
-
-        Returns:
-            tuple: A tuple containing the STAR index path and GTF file path, or None if not found.
-        """
-        try:
-            # Extract species name before the first comma
-            species_key = ref_gen.split(",")[0].split("(")[1].strip().lower()
-            idx_path = config["gen_refs"][species_key]["idx_path"]
-            gtf_path = config["gen_refs"][species_key]["gtf_path"]
-            return idx_path, gtf_path
-        except KeyError as e:
-            logging.warning(
-                f"Reference for {e} species not found in config. Handle {self.id} manually."
-            )
-            return None, None
-
-    def create_yaml_file(self, metadata):
-        """
-        Create a YAML file with the provided metadata.
-
-        Args:
-            metadata (dict): Metadata to write to the YAML file.
-        """
-        write_yaml(self.config, metadata)
-
-    def post_process(self):
-        """
-        Post-process the sample after job completion.
-        """
-        logging.info(f"Post-processing sample {self.id}...")
-
-        # Check if sample output is valid
-        if not self.file_handler.is_output_valid():
-            # TODO: Send a notification (Slack?) for manual intervention
-            logging.error(
-                f"[{self.id}] Pipeline output is invalid. Skipping post-processing."
-            )
-            return
-
-        self.file_handler.create_directories()
-
-        # Create symlinks for the fastq files
-        if not self.file_handler.symlink_fastq_files():
-            logging.error("Failed to manage symlinks and auxiliary files.")
-        else:
-            logging.info("Successfully managed symlinks and auxiliary files.")
-
-        # Instantiate report generator
-        report_generator = Smartseq3ReportGenerator(self)
-
-        # Collect stats
-        if not report_generator.collect_stats():
-            logging.error(
-                f"[{self.id}] Error collecting stats. Skipping report generation."
-            )
-            return
-
-        # Create Plots
-        if not report_generator.create_graphs():
-            logging.error(
-                f"[{self.id}] Error creating plots. Skipping report generation."
-            )
-            return
-
-        # Generate Report
-        report_generator.render(format="PDF")
diff --git a/lib/realms/smartseq3/ss3_sample.py b/lib/realms/smartseq3/ss3_sample.py
new file mode 100644
index 0000000..05b2855
--- /dev/null
+++ b/lib/realms/smartseq3/ss3_sample.py
@@ -0,0 +1,373 @@
+from lib.base.abstract_sample import AbstractSample
+from lib.core_utils.logging_utils import custom_logger
+from lib.module_utils.sjob_manager import SlurmJobManager
+from lib.module_utils.slurm_utils import generate_slurm_script
+from lib.realms.smartseq3.report.report_generator import Smartseq3ReportGenerator
+from lib.realms.smartseq3.utils.sample_file_handler import SampleFileHandler
+from lib.realms.smartseq3.utils.ss3_utils import SS3Utils
+from lib.realms.smartseq3.utils.yaml_utils import write_yaml
+from tests.mocks.mock_sjob_manager import MockSlurmJobManager
+
+logging = custom_logger("SS3 Sample")
+DEBUG = True
+
+
+class SS3Sample(AbstractSample):
+    """
+    Class representing a sample in a SmartSeq3 project.
+
+    Attributes:
+        id (str): Sample ID.
+        sample_data (dict): Data related to the sample.
+        project_info (dict): Information about the parent project.
+        barcode (str): Barcode of the sample.
+        flowcell_id (str): ID of the latest flowcell.
+        config (dict): Configuration settings.
+        status (str): Current status of the sample.
+        metadata (dict): Metadata for the sample.
+        project_dir (Path): Path to the parent project directory.
+        sample_dir (Path): Path to the sample directory.
+        sjob_manager (SlurmJobManager): Manager for submitting and monitoring Slurm jobs.
+        file_handler (SampleFileHandler): Handler for sample files.
+    """
+
+    def __init__(self, sample_id, sample_data, project_info, config):
+        """
+        Initialize a SmartSeq3 sample instance.
+
+        Args:
+            sample_id (str): ID of the sample.
+            sample_data (dict): Data related to the sample.
+            project_info (dict): Information about the parent project.
+            config (dict): Configuration settings.
+        """
+        # TODO: self.id must be demanded by a template class
+        self._id = sample_id
+        self.sample_data = sample_data
+        self.project_info = project_info
+
+        # Initialize barcode
+        self.barcode = self.get_barcode()
+
+        # Collect flowcell ID
+        self.flowcell_id = self._get_latest_flowcell()
+
+        self.config = config
+        # self.job_id = None
+        # TODO: Currently not used much, but should be used if we write to a database
+        self._status = "pending"  # other statuses: "processing", "completed", "failed"
+        self.metadata = None
+
+        # Define the parent project directory
+        self.project_dir = self.project_info.get("project_dir")
+
+        # Define the sample directory
+        # NOTE: This directory is not created yet. To be verified in the post_process method.
+        self.sample_dir = self.project_dir / self.id
+
+        if DEBUG:
+            self.sjob_manager = MockSlurmJobManager()
+        else:
+            self.sjob_manager = SlurmJobManager()
+
+        # Initialize SampleFileHandler
+        self.file_handler = SampleFileHandler(self)
+
+    @property
+    def id(self):
+        return self._id
+
+    @property
+    def status(self):
+        return self._status
+
+    @status.setter
+    def status(self, value):
+        self._status = value
+
+    async def process(self):
+        """
+        Process the sample by collecting metadata, creating YAML files, generating Slurm scripts, and submitting jobs.
+        """
+        logging.info(f"Processing sample {self.id}")
+        yaml_metadata = self._collect_yaml_metadata()
+        if not yaml_metadata:
+            logging.warning(f"Metadata missing for sample {self.id}")
+            return None
+
+        logging.debug("Metadata collected. Creating YAML file")
+
+        self.create_yaml_file(yaml_metadata)
+
+        # TODO: Check if the YAML file was created successfully
+        logging.debug("YAML file created.")
+
+        logging.debug("Creating Slurm script")
+        slurm_metadata = self._collect_slurm_metadata()
+        if not slurm_metadata:
+            logging.warning(f"Slurm metadata missing for sample {self.id}")
+            return None
+
+        # Create Slurm script and submit job
+        slurm_template_path = self.config["slurm_template"]
+        if not generate_slurm_script(
+            slurm_metadata, slurm_template_path, self.file_handler.slurm_script_path
+        ):
+            logging.error(f"Failed to create Slurm script for sample {self.id}.")
+            return None
+
+        # Submit the job
+        logging.debug("Slurm script created. Submitting job")
+        self.job_id = await self.sjob_manager.submit_job(
+            self.file_handler.slurm_script_path
+        )
+
+        # if self.job_id:
+        #     logging.debug(f"[{self.id}] Job submitted with ID: {self.job_id}")
+
+        #     asyncio.create_task(self.sjob_manager.monitor_job(self.job_id, self))
+        #     logging.debug(f"[{self.id}] Job {self.job_id} submitted for monitoring.")
+        # else:
+        #     logging.error(f"[{self.id}] Failed to submit job.")
+        #     return None
+
+        if self.job_id:
+            logging.debug(f"[{self.id}] Job submitted with ID: {self.job_id}")
+            # Wait here for the monitoring to complete before exiting the process method
+            await self.sjob_manager.monitor_job(self.job_id, self)
+            logging.debug(f"[{self.id}] Job {self.job_id} monitoring complete.")
+        else:
+            logging.error(f"[{self.id}] Failed to submit job.")
+            return None
+
+    def get_barcode(self):
+        """
+        Retrieve and validate the barcode from sample data.
+
+        Returns:
+            str: The barcode of the sample.
+        """
+        barcode = self.sample_data["library_prep"]["A"].get("barcode", None)
+        if barcode:
+            return barcode.split("-")[-1]
+        else:
+            logging.warning(f"No barcode found in StatusDB for sample {self.id}.")
+            return None  # Or handle more appropriately based on your application's requirements
+
+    def _collect_yaml_metadata(self):
+        """
+        Collect metadata necessary for creating a YAML file for the sample.
+
+        Returns:
+            dict: The collected metadata or None if necessary data is missing.
+        """
+        # NOTE: zUMIs does not support multiple flowcells per sample
+        # Potential solutions:
+        #   1. SmartSeq3 sample libraries should not be sequenced across multiple flowcells
+        #      SmartSeq3 libraries should not be re-sequenced in the same project
+        #   2. Merge fastq files from multiple flowcells
+
+        # Select the latest flowcell for analysis
+        if self.flowcell_id:
+            fastqs = self.file_handler.locate_fastq_files()
+            if fastqs is None:
+                logging.warning(
+                    f"No FASTQ files found for sample '{self.id}' in flowcell '{self.flowcell_id}'. Ensure files are correctly named and located."
+                )
+                return None
+        else:
+            logging.warning(f"No flowcell found for sample {self.id}")
+            return None
+
+        # if not all(fastqs.values()):
+        #     logging.warning(f"Not all fastq files found at {fastq_path}")
+        #     return None
+
+        seq_setup = self.project_info.get("sequencing_setup", "")
+        if seq_setup:
+            read_setup = SS3Utils.transform_seq_setup(seq_setup)
+
+        # ref_gen = self.project_info.get('ref_genome', '')
+
+        # NOTE: Might break if the reference genome naming format is odd.
+        # TODO: Might need to make more robust or even map the ref genomes to their paths
+        ref_paths = self.file_handler.locate_ref_paths()
+        if not ref_paths:
+            logging.warning(
+                f"Reference paths not found for sample {self.id}. Skipping..."
+            )
+            return None
+
+        if self.barcode is None:
+            logging.warning(f"Barcode not available for sample {self.id}")
+            return None
+
+        if not self.file_handler.ensure_barcode_file():
+            logging.error(
+                f"Failed to create barcode file for sample {self.id}. Skipping..."
+            )
+            return None
+
+        try:
+            metadata = {
+                "plate": self.id,  # NOTE: Temporarily not used, but might be used when we name everything after ngi
+                # 'plate': self.sample_data.get('customer_name', ''),
+                "barcode": self.barcode,
+                "bc_file": self.file_handler.barcode_fpath,
+                "fastqs": {k: str(v) for k, v in fastqs.items() if v},
+                "read_setup": read_setup,
+                "ref": ref_paths,
+                "outdir": str(self.sample_dir),
+                "out_yaml": self.project_dir / f"{self.id}.yaml",
+            }
+        except Exception as e:
+            logging.error(f"Error constructing metadata for sample {self.id}: {e}")
+            return None
+
+        self.metadata = metadata
+
+        return metadata
+
+    def _get_latest_flowcell(self):
+        """
+        Selects the latest flowcell for the current sample.
+
+        Returns:
+            The latest flowcell ID or None if no valid flowcells are found.
+        """
+        try:
+            latest_fc = None
+            latest_date = None
+            if "library_prep" in self.sample_data:
+                for prep_info in self.sample_data["library_prep"].values():
+                    for fc_id in prep_info.get("sequenced_fc", []):
+                        fc_date = SS3Utils.parse_fc_date(fc_id)
+                        if fc_date and (not latest_date or fc_date > latest_date):
+                            latest_date = fc_date
+                            latest_fc = fc_id
+
+            if not latest_fc:
+                logging.warning(f"No valid flowcells found for sample {self.id}.")
+            return latest_fc
+
+        except Exception as e:
+            logging.error(
+                f"Error extracting latest flowcell info for sample '{self.id}': {e}",
+                exc_info=True,
+            )
+            return None
+
+    def _collect_slurm_metadata(self):
+        """
+        Collect metadata necessary for creating a Slurm job script.
+
+        Returns:
+            dict: The collected metadata or None if necessary data is missing.
+        """
+        try:
+            metadata = {
+                "project_name": self.project_info["project_name"],
+                "project_dir": self.project_dir,
+                # 'sample_id': self.id, # Temporarily not used, but might be used when we name everything after ngi
+                "plate_id": self.id,  # self.sample_data.get('customer_name', ''),
+                "yaml_settings_path": self.project_dir / f"{self.id}.yaml",
+                "zumis_path": self.config["zumis_path"],
+            }
+        except Exception as e:
+            logging.error(f"Error constructing metadata for sample {self.id}: {e}")
+            return None
+
+        return metadata
+
+    def _transform_seq_setup(self, seq_setup_str):
+        """
+        Transforms a sequencing setup string into a detailed format for each read type.
+
+        Args:
+            seq_setup_str (str): Sequencing setup string in the format "R1-I1-I2-R2".
+
+        Returns:
+            dict: A dictionary with formatted strings for each read type.
+        """
+        r1, i1, i2, r2 = seq_setup_str.split("-")
+
+        return {
+            "R1": (f"cDNA(23-{r1})", "UMI(12-19)"),
+            "R2": f"cDNA(1-{r2})",
+            "I1": f"BC(1-{i1})",
+            "I2": f"BC(1-{i2})",
+        }
+
+    def _get_ref_paths(self, ref_gen, config):
+        """
+        Maps a reference genome to its STAR index and GTF file paths.
+
+        Args:
+            ref_gen (str): Reference genome string, e.g., "Zebrafish (Danio rerio, GRCz10)".
+            config (dict): Configuration object containing the mapping.
+
+        Returns:
+            tuple: A tuple containing the STAR index path and GTF file path, or None if not found.
+        """
+        try:
+            # Extract species name before the first comma
+            species_key = ref_gen.split(",")[0].split("(")[1].strip().lower()
+            idx_path = config["gen_refs"][species_key]["idx_path"]
+            gtf_path = config["gen_refs"][species_key]["gtf_path"]
+            return idx_path, gtf_path
+        except KeyError as e:
+            logging.warning(
+                f"Reference for {e} species not found in config. Handle {self.id} manually."
+            )
+            return None, None
+
+    def create_yaml_file(self, metadata):
+        """
+        Create a YAML file with the provided metadata.
+
+        Args:
+            metadata (dict): Metadata to write to the YAML file.
+        """
+        write_yaml(self.config, metadata)
+
+    def post_process(self):
+        """
+        Post-process the sample after job completion.
+        """
+        logging.info(f"Post-processing sample {self.id}...")
+
+        # Check if sample output is valid
+        if not self.file_handler.is_output_valid():
+            # TODO: Send a notification (Slack?) for manual intervention
+            logging.error(
+                f"[{self.id}] Pipeline output is invalid. Skipping post-processing."
+            )
+            return
+
+        self.file_handler.create_directories()
+
+        # Create symlinks for the fastq files
+        if not self.file_handler.symlink_fastq_files():
+            logging.error("Failed to manage symlinks and auxiliary files.")
+        else:
+            logging.info("Successfully managed symlinks and auxiliary files.")
+
+        # Instantiate report generator
+        report_generator = Smartseq3ReportGenerator(self)
+
+        # Collect stats
+        if not report_generator.collect_stats():
+            logging.error(
+                f"[{self.id}] Error collecting stats. Skipping report generation."
+            )
+            return
+
+        # Create Plots
+        if not report_generator.create_graphs():
+            logging.error(
+                f"[{self.id}] Error creating plots. Skipping report generation."
+            )
+            return
+
+        # Generate Report
+        report_generator.render(format="PDF")

From 2f54fe8711cf000fa002efd6f5fe923860ec5eca Mon Sep 17 00:00:00 2001
From: glrs <5999366+glrs@users.noreply.github.com>
Date: Tue, 29 Oct 2024 16:07:14 +0100
Subject: [PATCH 2/3] Rename `smartseq3.py` to `ss3_project.py`

---
 lib/realms/smartseq3/{smartseq3.py => ss3_project.py} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename lib/realms/smartseq3/{smartseq3.py => ss3_project.py} (100%)

diff --git a/lib/realms/smartseq3/smartseq3.py b/lib/realms/smartseq3/ss3_project.py
similarity index 100%
rename from lib/realms/smartseq3/smartseq3.py
rename to lib/realms/smartseq3/ss3_project.py

From 1901a5737a4331c2879ec81bbd07830620775843 Mon Sep 17 00:00:00 2001
From: glrs <5999366+glrs@users.noreply.github.com>
Date: Tue, 29 Oct 2024 16:18:41 +0100
Subject: [PATCH 3/3] Move pre-processing tasks from `process` to `pre_process`
 method and update accordingly

---
 lib/realms/smartseq3/ss3_project.py |  8 ++++++++
 lib/realms/smartseq3/ss3_sample.py  | 24 ++++++++----------------
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/lib/realms/smartseq3/ss3_project.py b/lib/realms/smartseq3/ss3_project.py
index 20df672..f4406b8 100644
--- a/lib/realms/smartseq3/ss3_project.py
+++ b/lib/realms/smartseq3/ss3_project.py
@@ -152,6 +152,14 @@ async def launch(self):
         if not self.samples:
             logging.warning("No samples found for processing. Returning...")
             return
+
+        # Pre-process samples
+        pre_tasks = [sample.pre_process() for sample in self.samples]
+        await asyncio.gather(*pre_tasks)
+
+        # NOTE: Could control whether to proceed with processing based on config or parameters
+
+        # Process samples
         tasks = [sample.process() for sample in self.samples]
         logging.debug(f"Sample tasks created. Waiting for completion...: {tasks}")
         await asyncio.gather(*tasks)
diff --git a/lib/realms/smartseq3/ss3_sample.py b/lib/realms/smartseq3/ss3_sample.py
index 05b2855..2c59afb 100644
--- a/lib/realms/smartseq3/ss3_sample.py
+++ b/lib/realms/smartseq3/ss3_sample.py
@@ -85,11 +85,9 @@ def status(self):
     def status(self, value):
         self._status = value
 
-    async def process(self):
-        """
-        Process the sample by collecting metadata, creating YAML files, generating Slurm scripts, and submitting jobs.
-        """
-        logging.info(f"Processing sample {self.id}")
+    async def pre_process(self):
+        """Pre-process the sample by collecting metadata and creating YAML files."""
+        logging.info(f"Pre-processing sample {self.id}")
         yaml_metadata = self._collect_yaml_metadata()
         if not yaml_metadata:
             logging.warning(f"Metadata missing for sample {self.id}")
             return None
@@ -115,22 +113,16 @@ def process(self):
         ):
             logging.error(f"Failed to create Slurm script for sample {self.id}.")
             return None
+        else:
+            logging.debug(f"Slurm script created for sample {self.id}")
 
-        # Submit the job
-        logging.debug("Slurm script created. Submitting job")
+    async def process(self):
+        """Process the sample by submitting and monitoring its Slurm job."""
+        logging.debug(f"Submitting job for sample {self.id}")
         self.job_id = await self.sjob_manager.submit_job(
             self.file_handler.slurm_script_path
         )
 
-        # if self.job_id:
-        #     logging.debug(f"[{self.id}] Job submitted with ID: {self.job_id}")
-
-        #     asyncio.create_task(self.sjob_manager.monitor_job(self.job_id, self))
-        #     logging.debug(f"[{self.id}] Job {self.job_id} submitted for monitoring.")
-        # else:
-        #     logging.error(f"[{self.id}] Failed to submit job.")
-        #     return None
-
         if self.job_id:
             logging.debug(f"[{self.id}] Job submitted with ID: {self.job_id}")
            # Wait here for the monitoring to complete before exiting the process method
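
Note: the two-phase flow that PATCH 3/3 introduces in `SmartSeq3.launch()` -- pre-process
every sample first, then process them -- can be exercised in isolation. The sketch below
is illustrative only: the `Sample` class and the sleep calls are hypothetical stand-ins
for `SS3Sample` and the real Slurm round-trips, not part of the patches.

    import asyncio


    class Sample:
        """Hypothetical stand-in for SS3Sample."""

        def __init__(self, sample_id):
            self.id = sample_id
            self.status = "pending"

        async def pre_process(self):
            # Stands in for metadata collection, YAML creation and
            # Slurm script generation; nothing is submitted here.
            await asyncio.sleep(0.1)
            print(f"[{self.id}] pre-processed")

        async def process(self):
            # Stands in for submitting the Slurm job and awaiting
            # monitor_job() until it completes.
            await asyncio.sleep(0.2)
            self.status = "completed"
            print(f"[{self.id}] processed")


    async def launch(samples):
        # Phase 1: pre-process all samples before any job is submitted.
        await asyncio.gather(*(s.pre_process() for s in samples))
        # NOTE: a config flag could gate phase 2, as the patch comment suggests.
        # Phase 2: submit and monitor all jobs concurrently.
        await asyncio.gather(*(s.process() for s in samples))


    if __name__ == "__main__":
        asyncio.run(launch([Sample("P1_101"), Sample("P1_102")]))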
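
Note: the read-setup transformation used in `_collect_yaml_metadata` (via
`SS3Utils.transform_seq_setup`, documented by the moved `_transform_seq_setup` helper)
is easiest to follow with a worked input. A standalone sketch, assuming a hypothetical
"151-10-10-151" setup string (R1-I1-I2-R2 lengths):

    def transform_seq_setup(seq_setup_str):
        # Same logic as the moved helper: split the "R1-I1-I2-R2" lengths
        # into per-read base-range strings for the zUMIs settings YAML.
        r1, i1, i2, r2 = seq_setup_str.split("-")
        return {
            "R1": (f"cDNA(23-{r1})", "UMI(12-19)"),
            "R2": f"cDNA(1-{r2})",
            "I1": f"BC(1-{i1})",
            "I2": f"BC(1-{i2})",
        }


    print(transform_seq_setup("151-10-10-151"))
    # {'R1': ('cDNA(23-151)', 'UMI(12-19)'), 'R2': 'cDNA(1-151)',
    #  'I1': 'BC(1-10)', 'I2': 'BC(1-10)'}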
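
Note: the species-key parsing in `_get_ref_paths` is compact enough to be easy to
misread. With the docstring's own example it behaves as below (a sketch only; the
real method then uses the key to index config["gen_refs"]):

    def species_key(ref_gen):
        # Mirrors _get_ref_paths: keep the text after the first "(" and
        # before the first ",", lower-cased.
        return ref_gen.split(",")[0].split("(")[1].strip().lower()


    print(species_key("Zebrafish (Danio rerio, GRCz10)"))  # -> "danio rerio"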