From 9da9d06020d34a4092fe5555bcd7ee9614412ee2 Mon Sep 17 00:00:00 2001 From: glrs <5999366+glrs@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:27:46 +0100 Subject: [PATCH] Improve status tracking and logging --- lib/realms/tenx/run_sample.py | 48 +++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/lib/realms/tenx/run_sample.py b/lib/realms/tenx/run_sample.py index 1770b23..30d5564 100644 --- a/lib/realms/tenx/run_sample.py +++ b/lib/realms/tenx/run_sample.py @@ -127,7 +127,7 @@ def collect_reference_genomes(self) -> Optional[Dict[str, str]]: ) logging.error( f"Conflicting reference genomes found for reference key '{ref_key}' " - f"in sample '{self.run_sample_id}'" + f"in sample '{self.id}'" ) self.status = "failed" return None @@ -220,6 +220,7 @@ async def pre_process(self): # If all pre-processing steps succeeded self.status = "pre_processed" + logging.info(f"[{self.id}] Pre-processing completed successfully.") async def process(self): """Process the sample.""" @@ -277,12 +278,12 @@ def assemble_cellranger_command(self) -> str: command_parts = [f"{pipeline_exec} {pipeline}"] - logging.debug(f"[{self.run_sample_id}] Pipeline: {pipeline}") - logging.debug(f"[{self.run_sample_id}] Pipeline executable: {pipeline_exec}") + logging.debug(f"[{self.id}] Pipeline: {pipeline}") + logging.debug(f"[{self.id}] Pipeline executable: {pipeline_exec}") # Mapping of argument names to their values arg_values: Dict[str, Any] = { - "--id": self.run_sample_id, + "--id": self.id, "--csv": str(self.file_handler.get_multi_csv_path()), "--transcriptome": self.reference_genomes["gex"], "--fastqs": ",".join( @@ -312,9 +313,7 @@ def assemble_cellranger_command(self) -> str: if value: command_parts.append(f"{arg}={value}") else: - logging.error( - f"[{self.run_sample_id}] Missing value for required argument {arg}" - ) + logging.error(f"[{self.id}] Missing value for required argument {arg}") # Include additional arguments command_parts.extend(additional_args) @@ -333,7 +332,7 @@ def collect_libraries_data(self) -> List[Dict[str, str]]: feature_type = self.feature_to_library_type.get(lab_sample.feature) if not feature_type: logging.error( - f"[{self.run_sample_id}] Feature type not found for feature " + f"[{self.id}] Feature type not found for feature " f"'{lab_sample.feature}' in sample '{lab_sample.sample_id}'" ) continue @@ -351,7 +350,7 @@ def collect_libraries_data(self) -> List[Dict[str, str]]: def generate_libraries_csv(self) -> None: """Generate the libraries CSV file required for processing.""" - logging.info(f"[{self.run_sample_id}] Generating library CSV") + logging.info(f"[{self.id}] Generating library CSV") library_csv_path = self.file_handler.get_libraries_csv_path() # Ensure the directory exists @@ -367,19 +366,17 @@ def generate_libraries_csv(self) -> None: for lib in libraries_data: writer.writerow(lib) - logging.info( - f"[{self.run_sample_id}] Libraries CSV generated at {library_csv_path}" - ) + logging.info(f"[{self.id}] Libraries CSV generated at {library_csv_path}") def generate_feature_reference_csv(self) -> None: """Generate the feature reference CSV file required for processing.""" - logging.info(f"[{self.run_sample_id}] Generating feature reference CSV") + logging.info(f"[{self.id}] Generating feature reference CSV") # feature_ref_csv_path = self.file_handler.get_feature_reference_csv_path() pass def generate_multi_sample_csv(self) -> None: """Generate the multi-sample CSV file required for processing.""" - logging.info(f"[{self.run_sample_id}] Generating multi-sample CSV") + logging.info(f"[{self.id}] Generating multi-sample CSV") multi_csv_path = self.file_handler.get_multi_csv_path() # Ensure the directory exists @@ -417,23 +414,24 @@ def generate_multi_sample_csv(self) -> None: f"{lib['sample']},{lib['fastqs']},{lib['library_type']}\n" ) - logging.info( - f"[{self.run_sample_id}] Multi-sample CSV generated at {multi_csv_path}" - ) + logging.info(f"[{self.id}] Multi-sample CSV generated at {multi_csv_path}") def post_process(self) -> None: """Perform post-processing steps after job completion.""" logging.info("\n") - logging.info(f"[{self.run_sample_id}] Post-processing...") + logging.info(f"[{self.id}] Post-processing...") + self.status = "post_processing" # Check if the run was successful if not self.file_handler.check_run_success(): - self.status = "failed" + logging.error(f"[{self.id}] CellRanger run was not successful.") + self.status = "post_processing_failed" return # Extract the report path if not self.file_handler.extract_report_path(): - self.status = "failed" + logging.error(f"[{self.id}] Failed to extract report path.") + self.status = "post_processing_failed" return # Transfer the report @@ -442,6 +440,12 @@ def post_process(self) -> None: project_id=self.project_info.get("project_id", ""), sample_id=self.id, ): - logging.info(f"Report for sample {self.id} transferred successfully.") + logging.info(f"[{self.id}] Report transferred successfully.") else: - logging.error(f"Failed to transfer report for sample {self.id}.") + logging.error(f"[{self.id}] Failed to transfer report.") + self.status = "post_processing_failed" + return + + # If all post-processing steps succeeded + self.status = "completed" + logging.info(f"[{self.id}] Post-processing completed successfully.")