From 1901a5737a4331c2879ec81bbd07830620775843 Mon Sep 17 00:00:00 2001
From: glrs <5999366+glrs@users.noreply.github.com>
Date: Tue, 29 Oct 2024 16:18:41 +0100
Subject: [PATCH] Move pre-processing tasks from `process` to 'pre_process' method and update accordingly

---
Review notes (this section is ignored by `git am`):
- Added the missing `f` prefix to the "Submitting job for sample {self.id}"
  debug message in `process`; without it the braces are logged literally.
- Expanded the `pre_process` docstring to mention Slurm script generation,
  which the method still performs.
- NOTE(review): `launch` does not inspect the results of `pre_process`, so
  `process` is still awaited for samples whose pre-processing returned early.
  Confirm this is intended (see the NOTE comment added in `launch`).
- NOTE(review): indentation was reconstructed from a whitespace-collapsed copy
  of this patch; verify context/removed lines against the target files.

 lib/realms/smartseq3/ss3_project.py |  8 ++++++++
 lib/realms/smartseq3/ss3_sample.py  | 24 ++++++++----------------
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/lib/realms/smartseq3/ss3_project.py b/lib/realms/smartseq3/ss3_project.py
index 20df672..f4406b8 100644
--- a/lib/realms/smartseq3/ss3_project.py
+++ b/lib/realms/smartseq3/ss3_project.py
@@ -152,6 +152,14 @@ async def launch(self):
         if not self.samples:
             logging.warning("No samples found for processing. Returning...")
             return
+
+        # Pre-process samples
+        pre_tasks = [sample.pre_process() for sample in self.samples]
+        await asyncio.gather(*pre_tasks)
+
+        # NOTE: Could control whether to proceed with processing based on config or parameters
+
+        # Process samples
         tasks = [sample.process() for sample in self.samples]
         logging.debug(f"Sample tasks created. Waiting for completion...: {tasks}")
         await asyncio.gather(*tasks)
diff --git a/lib/realms/smartseq3/ss3_sample.py b/lib/realms/smartseq3/ss3_sample.py
index 05b2855..2c59afb 100644
--- a/lib/realms/smartseq3/ss3_sample.py
+++ b/lib/realms/smartseq3/ss3_sample.py
@@ -85,11 +85,9 @@ def status(self):
     def status(self, value):
         self._status = value
 
-    async def process(self):
-        """
-        Process the sample by collecting metadata, creating YAML files, generating Slurm scripts, and submitting jobs.
-        """
-        logging.info(f"Processing sample {self.id}")
+    async def pre_process(self):
+        """Pre-process the sample by collecting metadata, creating YAML files, and generating the Slurm script."""
+        logging.info(f"Pre-processing sample {self.id}")
         yaml_metadata = self._collect_yaml_metadata()
         if not yaml_metadata:
             logging.warning(f"Metadata missing for sample {self.id}")
@@ -115,22 +113,16 @@ async def process(self):
         ):
             logging.error(f"Failed to create Slurm script for sample {self.id}.")
             return None
+        else:
+            logging.debug(f"Slurm script created for sample {self.id}")
 
-        # Submit the job
-        logging.debug("Slurm script created. Submitting job")
+    async def process(self):
+        """Process the sample by submitting its job."""
+        logging.debug(f"Submitting job for sample {self.id}")
         self.job_id = await self.sjob_manager.submit_job(
             self.file_handler.slurm_script_path
         )
 
-        # if self.job_id:
-        #     logging.debug(f"[{self.id}] Job submitted with ID: {self.job_id}")
-
-        #     asyncio.create_task(self.sjob_manager.monitor_job(self.job_id, self))
-        #     logging.debug(f"[{self.id}] Job {self.job_id} submitted for monitoring.")
-        # else:
-        #     logging.error(f"[{self.id}] Failed to submit job.")
-        #     return None
-
         if self.job_id:
             logging.debug(f"[{self.id}] Job submitted with ID: {self.job_id}")
             # Wait here for the monitoring to complete before exiting the process method