Skip to content

Commit

Permalink
Merge pull request #13 from glrs/feature/report-transfer
Browse files Browse the repository at this point in the history
Add Report Transfer utilities [used in 10x for now]
  • Loading branch information
glrs authored Nov 12, 2024
2 parents 1aec84f + 70faec0 commit 7dd3897
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 8 deletions.
57 changes: 57 additions & 0 deletions lib/module_utils/report_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import subprocess
from pathlib import Path
from typing import Optional

from lib.core_utils.config_loader import configs
from lib.core_utils.logging_utils import custom_logger

# Module-level logger obtained from the project's `custom_logger`, labelled with
# the last component of this module's dotted name.
# NOTE: within this module the name `logging` is bound to that object, not to
# the standard-library `logging` module.
logging = custom_logger(__name__.split(".")[-1])


def transfer_report(
    report_path: Path, project_id: str, sample_id: Optional[str] = None
) -> bool:
    """Copy a report to the configured remote server with rsync over SSH.

    Connection settings are read from ``configs["report_transfer"]``. The
    ``server``, ``user`` and ``destination`` keys are required; ``ssh_key``
    is optional and, when present, is passed to ssh with ``-i``.

    Args:
        report_path: Local path of the report to send.
        project_id: Project identifier, used as a directory component of the
            remote destination.
        sample_id: Optional sample identifier. When given, the report is sent
            to a sample sub-directory below the project directory.

    Returns:
        True if rsync exited successfully. False if a required configuration
        key is missing, rsync exited with an error, or the command could not
        be run at all.
    """
    try:
        report_transfer_config = configs["report_transfer"]
        server = report_transfer_config["server"]
        user = report_transfer_config["user"]
        destination_path = report_transfer_config["destination"]
        ssh_key = report_transfer_config.get("ssh_key")
    except KeyError as e:
        missing_key = e.args[0]
        logging.error(f"Missing configuration for report transfer: '{missing_key}'")
        logging.warning("Report transfer will not be attempted. Handle manually...")
        return False

    remote_path = f"{user}@{server}:{destination_path}/{project_id}/"
    if sample_id:
        remote_path += f"{sample_id}/"

    rsync_command = [
        "rsync",
        "-avz",
        "-e",
        f"ssh -i {ssh_key}" if ssh_key else "ssh",
        str(report_path),
        remote_path,
    ]

    try:
        subprocess.run(
            rsync_command,
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        # stderr is captured as text; guard against it being absent anyway
        logging.error(f"Failed to transfer report:\n{(e.stderr or '').strip()}")
        return False
    except Exception as e:
        # Reached when the command could not be run at all (for example the
        # rsync executable is missing). No process output exists at this
        # point, so only the exception itself is reported.
        logging.error(f"Unexpected error during report transfer: {e}")
        return False

    logging.info(f"Report transferred successfully to {remote_path}")
    return True
39 changes: 35 additions & 4 deletions lib/realms/tenx/run_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from lib.base.abstract_sample import AbstractSample
from lib.core_utils.logging_utils import custom_logger
from lib.module_utils.report_transfer import transfer_report
from lib.module_utils.sjob_manager import SlurmJobManager
from lib.module_utils.slurm_utils import generate_slurm_script
from lib.realms.tenx.utils.sample_file_handler import SampleFileHandler
Expand Down Expand Up @@ -46,7 +47,6 @@ def __init__(
"feature_to_library_type", {}
)
self._status: str = "initialized"
self.file_handler: SampleFileHandler = SampleFileHandler(self)

self.features: List[str] = self._collect_features()
self.pipeline_info: Optional[Dict[str, Any]] = self._get_pipeline_info() or {}
Expand All @@ -62,6 +62,8 @@ def __init__(
else:
self.sjob_manager = SlurmJobManager()

self.file_handler: SampleFileHandler = SampleFileHandler(self)

@property
def id(self) -> str:
"""Get the run sample ID.
Expand Down Expand Up @@ -197,7 +199,9 @@ async def pre_process(self):
slurm_metadata = {
"sample_id": self.run_sample_id,
"project_name": self.project_info.get("project_name", ""),
"output_dir": str(self.file_handler.project_dir),
"project_dir": str(self.file_handler.project_dir),
"output_log": str(self.file_handler.slurm_output_path),
"error_log": str(self.file_handler.slurm_error_path),
"cellranger_command": cellranger_command,
}

Expand All @@ -213,7 +217,14 @@ async def process(self):
logging.info("\n")
logging.info(f"[{self.run_sample_id}] Processing...")

# Step 4: Submit the SLURM script
if self.pipeline_info is None:
logging.error(
f"[{self.run_sample_id}] Pipeline information is missing. Skipping..."
)
self.status = "failed"
return

# Submit the SLURM script
if not self.pipeline_info.get("submit", False):
logging.info(
f"[{self.run_sample_id}] According to decision table, we should not submit. "
Expand Down Expand Up @@ -405,5 +416,25 @@ def generate_multi_sample_csv(self) -> None:

def post_process(self) -> None:
    """Validate the finished run and hand its report over for transfer.

    The sample status is set to "failed" and processing stops when the file
    handler reports that the run did not succeed, or that no report path
    could be extracted. Otherwise a transfer of the report is attempted and
    the outcome is logged.
    """
    logging.info("\n")
    logging.info(f"[{self.run_sample_id}] Post-processing...")

    handler = self.file_handler

    # Run check first, report lookup second; `or` short-circuits so the
    # lookup is skipped when the run itself did not succeed.
    if not handler.check_run_success() or not handler.extract_report_path():
        self.status = "failed"
        return

    # Only attempt the transfer when a report path is actually available
    report = handler.report_path
    transferred = False
    if report:
        transferred = transfer_report(
            report_path=report,
            project_id=self.project_info.get("project_id", ""),
            sample_id=self.id,
        )

    if not transferred:
        logging.error(f"Failed to transfer report for sample {self.id}.")
        return
    logging.info(f"Report for sample {self.id} transferred successfully.")
69 changes: 65 additions & 4 deletions lib/realms/tenx/utils/sample_file_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
from pathlib import Path
from typing import Any, Dict
from typing import Any, Dict, Optional

from lib.core_utils.logging_utils import custom_logger

Expand Down Expand Up @@ -39,6 +40,7 @@ def __init__(self, sample: Any) -> None:
self.sample_ref: str = sample.project_info.get("ref_genome", "")
self.organism: str = sample.project_info.get("organism", "")
self.config: Dict[str, Any] = sample.config
self.pipeline_info: Dict[str, Any] = sample.pipeline_info

# Define sample folder structure
self.project_dir: Path = sample.project_info.get("project_dir", "")
Expand All @@ -57,9 +59,68 @@ def init_file_paths(self) -> None:
self.project_dir / f"{self.sample_id}_slurm_script.sh"
)

# Report output files
# NOTE: Different pipelines may produce summaries in different locations
self.summary_fpath: Path = self.sample_dir / "outs" / "web_summary.html"
self.slurm_output_path: Path = self.project_dir / f"{self.sample_id}.out"
self.slurm_error_path: Path = self.project_dir / f"{self.sample_id}.err"

# Report file path / Will be set after parsing the output file
self._report_path: Optional[Path] = None

@property
def report_path(self):
    """Path of the report, looked up on first access.

    When no path has been stored yet, ``extract_report_path()`` is called to
    find one. Returns None if that lookup fails.
    """
    needs_lookup = self._report_path is None
    if needs_lookup and not self.extract_report_path():
        return None
    return self._report_path

def check_run_success(self) -> bool:
    """Tell whether the SLURM output log shows a completed CellRanger run.

    Returns:
        True if the log file exists and contains the
        "Pipestance completed successfully!" marker; False if the file is
        missing or the marker is absent.
    """
    log_file = self.slurm_output_path
    if not log_file.exists():
        logging.error(f"CellRanger output file not found: {self.slurm_output_path}")
        return False

    with open(log_file) as handle:
        log_text = handle.read()

    succeeded = "Pipestance completed successfully!" in log_text
    if not succeeded:
        logging.error(
            f"CellRanger did not complete successfully for sample {self.sample_id}"
        )
        return False

    logging.info(
        f"CellRanger run completed successfully for sample {self.sample_id}"
    )
    return True

def extract_report_path(self) -> bool:
    """Find the report path in the Cell Ranger output log and store it.

    The SLURM output log is searched with a list of patterns. The first
    pattern that matches supplies the path, which is stored in
    ``self._report_path`` only if that file exists on disk.

    Returns:
        True if a report path was found in the log and the file exists.
        False if the log is missing, no pattern matches, or the path named
        in the log does not exist.
    """
    if not self.slurm_output_path.exists():
        logging.error(f"CellRanger output file not found: {self.slurm_output_path}")
        return False

    with open(self.slurm_output_path) as f:
        content = f.read()

    # Patterns to match different pipelines; the first one that matches wins
    patterns = (r"Run summary HTML:\s+(\S+)", r"web_summary:\s+(\S+)")
    match = next(
        (m for m in (re.search(p, content) for p in patterns) if m),
        None,
    )

    if match is None:
        logging.error(
            f"Report path not found in CellRanger output for sample {self.sample_id}"
        )
        return False

    report_path = Path(match.group(1))
    if not report_path.exists():
        # The log names a report, but the file is missing. Report this
        # separately from "no path in the log" so the two causes are not
        # confused.
        logging.error(
            f"Report file listed in CellRanger output does not exist "
            f"for sample {self.sample_id}: {report_path}"
        )
        return False

    self._report_path = report_path
    logging.info(f"Report path found: {report_path}")
    return True

def get_libraries_csv_path(self) -> Path:
    """Return the path of this sample's libraries CSV inside the project directory."""
    csv_name = f"{self.sample_id}_libraries.csv"
    return self.project_dir / csv_name
Expand Down

0 comments on commit 7dd3897

Please sign in to comment.