Skip to content

Commit

Permalink
Merge pull request #18 from glrs/feature/ss3-dev
Browse files Browse the repository at this point in the history
Feature/ss3 dev
  • Loading branch information
glrs authored Nov 15, 2024
2 parents 071dfc3 + 49744e4 commit 57ccc60
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 32 deletions.
3 changes: 2 additions & 1 deletion lib/module_utils/sjob_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ def check_status(job_id: str, status: str, sample: Any) -> None:
status (str): The status of the job.
sample (object): The sample object with `id` and `status` attributes.
"""
logging.debug(f"Job {job_id} status: {status}")
logging.info("\n")
logging.debug(f"[{sample.id}] Job {job_id} status: {status}")
if status == "COMPLETED":
logging.info(f"[{sample.id}] Job completed successfully.")
sample.status = "processed"
Expand Down
31 changes: 28 additions & 3 deletions lib/realms/smartseq3/ss3_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,11 @@ def ensure_project_directory(self):

async def launch(self):
"""Launch the SmartSeq3 Realm to handle its samples."""
self.status = "processing"
logging.info(
f"Processing SmartSeq3 project {self.project_info['project_name']}"
)
self.status = "processing"

self.samples = self.extract_samples()
if not self.samples:
logging.warning("No samples found for processing. Returning...")
Expand All @@ -159,11 +160,35 @@ async def launch(self):

# NOTE: Could control whether to proceed with processing based on config or parameters

# Filter samples that passed pre-processing
pre_processed_samples = [
sample for sample in self.samples if sample.status == "pre_processed"
]

if not pre_processed_samples:
logging.warning("No samples passed pre-processing. Exiting...")
return

logging.info("\n")
logging.info(
f"Samples that passed pre-processing:"
f"{[sample.id for sample in pre_processed_samples]}"
)

# Process samples
tasks = [sample.process() for sample in self.samples]
tasks = [sample.process() for sample in pre_processed_samples]
logging.debug(f"Sample tasks created. Waiting for completion...: {tasks}")
await asyncio.gather(*tasks)
logging.info("All samples processed. Finalizing project...")

# Log samples that passed processing
processed_samples = [
sample for sample in pre_processed_samples if sample.status == "completed"
]
logging.info("\n")
logging.info(
f"Samples that finished successfully: "
f"{[sample.id for sample in processed_samples]}\n"
)
self.finalize_project()

def extract_samples(self):
Expand Down
64 changes: 45 additions & 19 deletions lib/realms/smartseq3/ss3_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ def __init__(self, sample_id, sample_data, project_info, config):

self.config = config
# self.job_id = None

# TODO: Currently not used much, but should be used if we write to a database
self._status = "pending" # other statuses: "processing", "completed", "failed"
# self._status = "initialized"
self.metadata = None

if DEBUG:
Expand All @@ -65,6 +66,8 @@ def __init__(self, sample_id, sample_data, project_info, config):
# Initialize SampleFileHandler
self.file_handler = SampleFileHandler(self)

self._status = "initialized"

@property
def id(self):
return self._id
Expand All @@ -83,37 +86,45 @@ async def pre_process(self):
logging.info(f"[{self.id}] Pre-processing...")
yaml_metadata = self._collect_yaml_metadata()
if not yaml_metadata:
logging.warning(f"Metadata missing for sample {self.id}")
return None

logging.debug("Metadata collected. Creating YAML file")

self.create_yaml_file(yaml_metadata)
logging.error(f"[{self.id}] Metadata missing. Pre-processing failed.")
self.status = "pre_processing_failed"
return

# TODO: Check if the YAML file was created successfully
logging.debug("YAML file created.")
logging.info(f"[{self.id}] Metadata collected. Creating YAML file")
if not self.create_yaml_file(yaml_metadata):
logging.error(f"[{self.id}] Failed to create YAML file.")
self.status = "pre_processing_failed"
return
logging.debug(f"[{self.id}] YAML file created.")

logging.debug("Creating Slurm script")
logging.debug(f"[{self.id}] Creating Slurm script")
slurm_metadata = self._collect_slurm_metadata()
if not slurm_metadata:
logging.warning(f"Slurm metadata missing for sample {self.id}")
return None
logging.error(f"[{self.id}] Slurm metadata missing. Pre-processing failed.")
self.status = "pre_processing_failed"
return

# Create Slurm script and submit job
slurm_template_path = self.config["slurm_template"]
# TODO: Move slurm_template_path to SampleFileHandler
slurm_template_path = self.config.get("slurm_template", "")
if not generate_slurm_script(
slurm_metadata, slurm_template_path, self.file_handler.slurm_script_path
):
logging.error(f"Failed to create Slurm script for sample {self.id}.")
return None
logging.error(f"[{self.id}] Failed to create Slurm script.")
self.status = "pre_processing_failed"
return
else:
logging.debug(f"Slurm script created for sample {self.id}")
logging.debug(f"[{self.id}] Slurm script created.")

# If all pre-processing steps succeeded
self.status = "pre_processed"

async def process(self):
"""Process the sample by submitting its job."""
logging.info("\n")
logging.info(f"[{self.id}] Processing...")
logging.debug(f"[{self.id}] Submitting job...")
self.status = "processing"
self.job_id = await self.sjob_manager.submit_job(
self.file_handler.slurm_script_path
)
Expand All @@ -125,7 +136,7 @@ async def process(self):
logging.debug(f"[{self.id}] Job {self.job_id} monitoring complete.")
else:
logging.error(f"[{self.id}] Failed to submit job.")
return None
self.status = "processing_failed"

def get_barcode(self):
"""
Expand Down Expand Up @@ -308,35 +319,42 @@ def _get_ref_paths(self, ref_gen, config):
)
return None, None

def create_yaml_file(self, metadata):
def create_yaml_file(self, metadata) -> bool:
"""
Create a YAML file with the provided metadata.
Args:
metadata (dict): Metadata to write to the YAML file.
Returns:
bool: True if the YAML file was created successfully, False otherwise.
"""
write_yaml(self.config, metadata)
return write_yaml(self.config, metadata)

def post_process(self):
"""
Post-process the sample after job completion.
"""
logging.info("\n")
logging.info(f"[{self.id}] Post-processing...")
self.status = "post_processing"

# Check if sample output is valid
if not self.file_handler.is_output_valid():
# TODO: Send a notification (Slack?) for manual intervention
logging.error(
f"[{self.id}] Pipeline output is invalid. Skipping post-processing."
)
self.status = "post_processing_failed"
return

self.file_handler.create_directories()

# Create symlinks for the fastq files
if not self.file_handler.symlink_fastq_files():
logging.error(f"[{self.id}] Failed to manage symlinks and auxiliary files.")
self.status = "post_processing_failed"
return
else:
logging.info(
f"[{self.id}] Successfully managed symlinks and auxiliary files."
Expand All @@ -350,13 +368,15 @@ def post_process(self):
logging.error(
f"[{self.id}] Error collecting stats. Skipping report generation."
)
self.status = "post_processing_failed"
return

# Create Plots
if not report_generator.create_graphs():
logging.error(
f"[{self.id}] Error creating plots. Skipping report generation."
)
self.status = "post_processing_failed"
return

# Generate Report
Expand All @@ -367,6 +387,7 @@ def post_process(self):
logging.error(
f"[{self.id}] Report not found at {self.file_handler.report_fpath}"
)
self.status = "post_processing_failed"
return

if transfer_report(
Expand All @@ -377,3 +398,8 @@ def post_process(self):
logging.info(f"[{self.id}] Report transferred successfully.")
else:
logging.error(f"[{self.id}] Failed to transfer report.")
self.status = "post_processing_failed"
return

# If all post-processing steps succeeded
self.status = "completed"
12 changes: 9 additions & 3 deletions lib/realms/smartseq3/utils/yaml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def parse_yaml(file_path):
return yaml.load(Path(file_path))


def write_yaml(config, args):
def write_yaml(config, args) -> bool:
"""
Write data to a YAML file based on a template and provided arguments.
Expand Down Expand Up @@ -74,5 +74,11 @@ def write_yaml(config, args):
logging.debug(f"Path: {args['out_yaml']}")
logging.warning("Continuing to overwrite the file...")

with open(args["out_yaml"], "w") as outfile:
yaml.dump(template, outfile)
try:
with open(args["out_yaml"], "w") as outfile:
yaml.dump(template, outfile)
logging.debug(f"YAML file written successfully at {args['out_yaml']}")
return True
except Exception as e:
logging.error(f"Failed to write YAML file {args['out_yaml']}: {e}")
return False
17 changes: 11 additions & 6 deletions tests/mocks/mock_sjob_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,16 @@ def check_status(job_id, status, sample):
status (str): The status of the job.
sample (object): The sample object (must have a post_process method and id attribute).
"""
logging.debug(f"Job {job_id} status: {status}")
logging.info("\n")
logging.debug(f"[{sample.id}] Job {job_id} status: {status}")
if status == "COMPLETED":
logging.info(f"Sample {sample.id} processing completed.")
logging.info(f"[{sample.id}] Job completed successfully.")
sample.status = "processed"
sample.post_process()
sample.status = "completed"
elif status in ["FAILED", "CANCELLED"]:
sample.status = "failed"
logging.info(f"Sample {sample.id} processing failed.")
# sample.status = "completed"
elif status in ["FAILED", "CANCELLED", "TIMEOUT", "OUT_OF_ME+"]:
sample.status = "processing_failed"
logging.info(f"[{sample.id}] Job failed.")
else:
            logging.warning(f"[{sample.id}] Job ended with unexpected status: {status}")
sample.status = "processing_failed"

0 comments on commit 57ccc60

Please sign in to comment.