diff --git a/lib/module_utils/sjob_manager.py b/lib/module_utils/sjob_manager.py
index f9917f6..a1f1a25 100644
--- a/lib/module_utils/sjob_manager.py
+++ b/lib/module_utils/sjob_manager.py
@@ -257,7 +257,8 @@ def check_status(job_id: str, status: str, sample: Any) -> None:
             status (str): The status of the job.
             sample (object): The sample object with `id` and `status` attributes.
         """
-        logging.debug(f"Job {job_id} status: {status}")
+        logging.info("\n")
+        logging.debug(f"[{sample.id}] Job {job_id} status: {status}")
         if status == "COMPLETED":
             logging.info(f"[{sample.id}] Job completed successfully.")
             sample.status = "processed"
diff --git a/lib/realms/smartseq3/ss3_project.py b/lib/realms/smartseq3/ss3_project.py
index f4406b8..2e07147 100644
--- a/lib/realms/smartseq3/ss3_project.py
+++ b/lib/realms/smartseq3/ss3_project.py
@@ -144,10 +144,11 @@ def ensure_project_directory(self):
 
     async def launch(self):
         """Launch the SmartSeq3 Realm to handle its samples."""
-        self.status = "processing"
         logging.info(
             f"Processing SmartSeq3 project {self.project_info['project_name']}"
         )
+        self.status = "processing"
+
         self.samples = self.extract_samples()
         if not self.samples:
             logging.warning("No samples found for processing. Returning...")
@@ -159,11 +160,35 @@ async def launch(self):
 
         # NOTE: Could control whether to proceed with processing based on config or parameters
 
+        # Filter samples that passed pre-processing
+        pre_processed_samples = [
+            sample for sample in self.samples if sample.status == "pre_processed"
+        ]
+
+        if not pre_processed_samples:
+            logging.warning("No samples passed pre-processing. Exiting...")
+            return
+
+        logging.info("\n")
+        logging.info(
+            f"Samples that passed pre-processing: "
+            f"{[sample.id for sample in pre_processed_samples]}"
+        )
+
         # Process samples
-        tasks = [sample.process() for sample in self.samples]
+        tasks = [sample.process() for sample in pre_processed_samples]
         logging.debug(f"Sample tasks created. Waiting for completion...: {tasks}")
         await asyncio.gather(*tasks)
-        logging.info("All samples processed. Finalizing project...")
+
+        # Log samples that passed processing
+        processed_samples = [
+            sample for sample in pre_processed_samples if sample.status == "completed"
+        ]
+        logging.info("\n")
+        logging.info(
+            f"Samples that finished successfully: "
+            f"{[sample.id for sample in processed_samples]}\n"
+        )
         self.finalize_project()
 
     def extract_samples(self):
diff --git a/lib/realms/smartseq3/ss3_sample.py b/lib/realms/smartseq3/ss3_sample.py
index d362a69..cde756a 100644
--- a/lib/realms/smartseq3/ss3_sample.py
+++ b/lib/realms/smartseq3/ss3_sample.py
@@ -53,8 +53,9 @@ def __init__(self, sample_id, sample_data, project_info, config):
         self.config = config
 
         # self.job_id = None
+        # TODO: Currently not used much, but should be used if we write to a database
 
-        self._status = "pending"  # other statuses: "processing", "completed", "failed"
+        # self._status = "initialized"
 
         self.metadata = None
         if DEBUG:
@@ -65,6 +66,8 @@ def __init__(self, sample_id, sample_data, project_info, config):
         # Initialize SampleFileHandler
         self.file_handler = SampleFileHandler(self)
 
+        self._status = "initialized"
+
     @property
     def id(self):
         return self._id
@@ -83,37 +86,45 @@ async def pre_process(self):
         logging.info(f"[{self.id}] Pre-processing...")
         yaml_metadata = self._collect_yaml_metadata()
         if not yaml_metadata:
-            logging.warning(f"Metadata missing for sample {self.id}")
-            return None
-
-        logging.debug("Metadata collected. Creating YAML file")
-
-        self.create_yaml_file(yaml_metadata)
+            logging.error(f"[{self.id}] Metadata missing. Pre-processing failed.")
+            self.status = "pre_processing_failed"
+            return
 
-        # TODO: Check if the YAML file was created successfully
-        logging.debug("YAML file created.")
+        logging.info(f"[{self.id}] Metadata collected. Creating YAML file")
+        if not self.create_yaml_file(yaml_metadata):
+            logging.error(f"[{self.id}] Failed to create YAML file.")
+            self.status = "pre_processing_failed"
+            return
+        logging.debug(f"[{self.id}] YAML file created.")
 
-        logging.debug("Creating Slurm script")
+        logging.debug(f"[{self.id}] Creating Slurm script")
         slurm_metadata = self._collect_slurm_metadata()
         if not slurm_metadata:
-            logging.warning(f"Slurm metadata missing for sample {self.id}")
-            return None
+            logging.error(f"[{self.id}] Slurm metadata missing. Pre-processing failed.")
+            self.status = "pre_processing_failed"
+            return
 
         # Create Slurm script and submit job
-        slurm_template_path = self.config["slurm_template"]
+        # TODO: Move slurm_template_path to SampleFileHandler
+        slurm_template_path = self.config.get("slurm_template", "")
         if not generate_slurm_script(
             slurm_metadata, slurm_template_path, self.file_handler.slurm_script_path
         ):
-            logging.error(f"Failed to create Slurm script for sample {self.id}.")
-            return None
+            logging.error(f"[{self.id}] Failed to create Slurm script.")
+            self.status = "pre_processing_failed"
+            return
         else:
-            logging.debug(f"Slurm script created for sample {self.id}")
+            logging.debug(f"[{self.id}] Slurm script created.")
+
+        # If all pre-processing steps succeeded
+        self.status = "pre_processed"
 
     async def process(self):
         """Process the sample by submitting its job."""
         logging.info("\n")
         logging.info(f"[{self.id}] Processing...")
         logging.debug(f"[{self.id}] Submitting job...")
+        self.status = "processing"
         self.job_id = await self.sjob_manager.submit_job(
             self.file_handler.slurm_script_path
         )
@@ -125,7 +136,7 @@ async def process(self):
             logging.debug(f"[{self.id}] Job {self.job_id} monitoring complete.")
         else:
             logging.error(f"[{self.id}] Failed to submit job.")
-            return None
+            self.status = "processing_failed"
 
     def get_barcode(self):
         """
@@ -308,14 +319,17 @@ def _get_ref_paths(self, ref_gen, config):
             )
             return None, None
 
-    def create_yaml_file(self, metadata):
+    def create_yaml_file(self, metadata) -> bool:
         """
         Create a YAML file with the provided metadata.
 
         Args:
             metadata (dict): Metadata to write to the YAML file.
+
+        Returns:
+            bool: True if the YAML file was created successfully, False otherwise.
         """
-        write_yaml(self.config, metadata)
+        return write_yaml(self.config, metadata)
 
     def post_process(self):
         """
@@ -323,6 +337,7 @@ def post_process(self):
         """
         logging.info("\n")
         logging.info(f"[{self.id}] Post-processing...")
+        self.status = "post_processing"
 
         # Check if sample output is valid
         if not self.file_handler.is_output_valid():
@@ -330,6 +345,7 @@ def post_process(self):
             logging.error(
                 f"[{self.id}] Pipeline output is invalid. Skipping post-processing."
             )
+            self.status = "post_processing_failed"
             return
 
         self.file_handler.create_directories()
@@ -337,6 +353,8 @@ def post_process(self):
         # Create symlinks for the fastq files
         if not self.file_handler.symlink_fastq_files():
             logging.error(f"[{self.id}] Failed to manage symlinks and auxiliary files.")
+            self.status = "post_processing_failed"
+            return
         else:
             logging.info(
                 f"[{self.id}] Successfully managed symlinks and auxiliary files."
@@ -350,6 +368,7 @@ def post_process(self):
             logging.error(
                 f"[{self.id}] Error collecting stats. Skipping report generation."
             )
+            self.status = "post_processing_failed"
             return
 
         # Create Plots
@@ -357,6 +376,7 @@ def post_process(self):
             logging.error(
                 f"[{self.id}] Error creating plots. Skipping report generation."
             )
+            self.status = "post_processing_failed"
             return
 
         # Generate Report
@@ -367,6 +387,7 @@ def post_process(self):
             logging.error(
                 f"[{self.id}] Report not found at {self.file_handler.report_fpath}"
             )
+            self.status = "post_processing_failed"
             return
 
         if transfer_report(
@@ -377,3 +398,8 @@ def post_process(self):
             logging.info(f"[{self.id}] Report transferred successfully.")
         else:
             logging.error(f"[{self.id}] Failed to transfer report.")
+            self.status = "post_processing_failed"
+            return
+
+        # If all post-processing steps succeeded
+        self.status = "completed"
diff --git a/lib/realms/smartseq3/utils/yaml_utils.py b/lib/realms/smartseq3/utils/yaml_utils.py
index c51e9a2..fadc3e8 100644
--- a/lib/realms/smartseq3/utils/yaml_utils.py
+++ b/lib/realms/smartseq3/utils/yaml_utils.py
@@ -24,7 +24,7 @@ def parse_yaml(file_path):
     return yaml.load(Path(file_path))
 
 
-def write_yaml(config, args):
+def write_yaml(config, args) -> bool:
     """
     Write data to a YAML file based on a template and provided arguments.
 
@@ -74,5 +74,11 @@ def write_yaml(config, args):
         logging.debug(f"Path: {args['out_yaml']}")
         logging.warning("Continuing to overwrite the file...")
 
-    with open(args["out_yaml"], "w") as outfile:
-        yaml.dump(template, outfile)
+    try:
+        with open(args["out_yaml"], "w") as outfile:
+            yaml.dump(template, outfile)
+        logging.debug(f"YAML file written successfully at {args['out_yaml']}")
+        return True
+    except Exception as e:
+        logging.error(f"Failed to write YAML file {args['out_yaml']}: {e}")
+        return False
diff --git a/tests/mocks/mock_sjob_manager.py b/tests/mocks/mock_sjob_manager.py
index 820a9fd..da858be 100644
--- a/tests/mocks/mock_sjob_manager.py
+++ b/tests/mocks/mock_sjob_manager.py
@@ -52,11 +52,16 @@ def check_status(job_id, status, sample):
             status (str): The status of the job.
             sample (object): The sample object (must have a post_process method and id attribute).
         """
-        logging.debug(f"Job {job_id} status: {status}")
+        logging.info("\n")
+        logging.debug(f"[{sample.id}] Job {job_id} status: {status}")
         if status == "COMPLETED":
-            logging.info(f"Sample {sample.id} processing completed.")
+            logging.info(f"[{sample.id}] Job completed successfully.")
+            sample.status = "processed"
             sample.post_process()
-            sample.status = "completed"
-        elif status in ["FAILED", "CANCELLED"]:
-            sample.status = "failed"
-            logging.info(f"Sample {sample.id} processing failed.")
+            # sample.status = "completed"
+        elif status in ["FAILED", "CANCELLED", "TIMEOUT", "OUT_OF_ME+"]:  # "OUT_OF_ME+" is sacct's truncated OUT_OF_MEMORY
+            sample.status = "processing_failed"
+            logging.info(f"[{sample.id}] Job failed.")
+        else:
+            logging.warning(f"[{sample.id}] Job ended with unexpected status: {status}")
+            sample.status = "processing_failed"
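
Reviewer note (not part of the patch): the status lifecycle this changeset threads through pre_process(), process()/check_status(), and post_process() spans five files, so below is a minimal, runnable sketch of just the state machine. StubSample and the launch() driver are hypothetical stand-ins for SS3Sample/SS3Project; they reproduce only the status transitions and the before/after filtering that launch() now performs.

import asyncio
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")


class StubSample:
    """Just enough state to walk the statuses the patch introduces."""

    def __init__(self, sample_id, fail_at=None):
        self.id = sample_id
        self.fail_at = fail_at  # name of the step this stub should fail at
        self.status = "initialized"

    async def pre_process(self):
        # Any failed step sets a *_failed status and returns early,
        # mirroring the early returns added to SS3Sample.pre_process().
        if self.fail_at == "pre_process":
            self.status = "pre_processing_failed"
            return
        self.status = "pre_processed"

    async def process(self):
        # Mirrors SS3Sample.process() plus check_status():
        # "processing" -> "processed" -> post_process().
        self.status = "processing"
        await asyncio.sleep(0)  # stands in for job submission + monitoring
        if self.fail_at == "process":
            self.status = "processing_failed"
            return
        self.status = "processed"
        self.post_process()

    def post_process(self):
        self.status = "post_processing"
        if self.fail_at == "post_process":
            self.status = "post_processing_failed"
            return
        self.status = "completed"  # only reached when every step succeeded


async def launch(samples):
    # Same filtering shape as SS3Project.launch() in the patch: only
    # "pre_processed" samples are gathered, only "completed" ones reported.
    await asyncio.gather(*(s.pre_process() for s in samples))
    pre_processed = [s for s in samples if s.status == "pre_processed"]
    await asyncio.gather(*(s.process() for s in pre_processed))
    completed = [s.id for s in pre_processed if s.status == "completed"]
    logging.info(f"Samples that finished successfully: {completed}")


asyncio.run(
    launch(
        [
            StubSample("P1_101"),
            StubSample("P1_102", fail_at="pre_process"),
            StubSample("P1_103", fail_at="post_process"),
        ]
    )
)
# -> INFO: Samples that finished successfully: ['P1_101']

One behavioral detail worth flagging: in the patch itself, launch() still calls finalize_project() unconditionally after the gather; the "completed" filter currently drives only the summary log line.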