Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: (#405) somatic variant qc #407

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions snappy_pipeline/workflows/somatic_variant_checking/Snakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
"""CUBI Pipeline tumor mutational burden step Snakefile"""

import os

from snappy_pipeline import expand_ref
from snappy_pipeline.workflows.somatic_variant_checking import (
SomaticVariantQCWorkflow,
)

__author__ = "Gia Cuong Pham"
__email__ = "[email protected]"


# Configuration ===============================================================


configfile: "config.yaml"


# Expand "$ref" JSON pointers in configuration (also works for YAML)
config, lookup_paths, config_paths = expand_ref("config.yaml", config)

# WorkflowImpl Object Setup ===================================================
wf = SomaticVariantQCWorkflow(workflow, config, lookup_paths, config_paths, os.getcwd())


localrules:
# Linking files from work/ to output/ should be done locally
somatic_variant_qc_link_out_run,


rule all:
input:
wf.get_result_files(),


# Generic linking out ---------------------------------------------------------


rule somatic_variant_qc_link_out_run:
input:
wf.get_input_files("link_out", "run"),
output:
wf.get_output_files("link_out", "run"),
run:
shell(wf.get_shell_cmd("link_out", "run", wildcards))


rule somatic_variant_qc:
input:
**wf.get_input_files("SVQC_step", "run"),
output:
**wf.get_output_files("SVQC_step", "run"),
threads: wf.get_resource("SVQC_step", "run", "threads")
resources:
time=wf.get_resource("SVQC_step", "run", "time"),
memory=wf.get_resource("SVQC_step", "run", "memory"),
partition=wf.get_resource("SVQC_step", "run", "partition"),
tmpdir=wf.get_resource("SVQC_step", "run", "tmpdir"),
log:
**wf.get_log_file("SVQC_step", "run"),
wrapper:
wf.wrapper_path("somatic_variants_checking")
259 changes: 211 additions & 48 deletions snappy_pipeline/workflows/somatic_variant_checking/__init__.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,219 @@
# -*- coding: utf-8 -*-
"""Implementation of the germline ``somatic_variant_checking`` step
import os
import sys

The ``somatic_variant_checking`` step takes as the input the results of the
``somatic_variant_annotation`` step. It then executes various tools computing statistics on the
result files and consistency checks with the pedigrees.
from biomedsheets.shortcuts import CancerCaseSheet, CancerCaseSheetOptions, is_not_background
from snakemake.io import expand

.. note::
from snappy_pipeline.base import UnsupportedActionException
from snappy_pipeline.utils import dictify, listify
from snappy_pipeline.workflows.abstract import BaseStep, BaseStepPart, LinkOutStepPart
from snappy_pipeline.workflows.ngs_mapping import NgsMappingWorkflow, ResourceUsage
from snappy_pipeline.workflows.somatic_variant_calling import (
SOMATIC_VARIANT_CALLERS_MATCHED,
SomaticVariantCallingWorkflow,
)

Status: not implemented yet
#: Extensions of files to create as main payload
EXT_VALUES = (".json", ".json.md5")

==========
Step Input
==========
#: Names of the files to create for the extension
EXT_NAMES = ("json", "json_md5")

The variant calling step uses Snakemake sub workflows for using the result of the
``somatic_variant_annotation`` step.

===========
Step Output
===========

.. note:: TODO

====================
Global Configuration
====================

.. note:: TODO

=====================
Default Configuration
=====================

The default configuration is as follows.

.. include:: DEFAULT_CONFIG_somatic_variant_checking.rst

=======
Reports
=======

Currently, no reports are generated.
"""

__author__ = "Manuel Holtgrewe <[email protected]>"

#: Available tools for checking variants
VARIANT_CHECKERS = ("bcftools_stats", "peddy")

#: Default configuration for the somatic_gene_fusion_calling step
#: Default configuration for the tmb calculation step
DEFAULT_CONFIG = r"""
step_config:
somatic_variant_checking:
path_somatic_variant_calling: ../somatic_variant_calling # REQUIRED
somatic_variant_checking:
path_somatic_variant_calling: ../somatic_variant_calling # REQUIRED
tools_ngs_mapping: [] # default to those configured for ngs_mapping
tools_somatic_variant_calling: [] # default to those configured for somatic_variant_calling
target_regions: # REQUIRED
padding: 0 #Used for count the number of variants outside of exom + padding
ignore_regions: []
minimal_support_read: 1
limited_support_read: 5
"""


class SomaticVariantQCStepPart(BaseStepPart):
""""""

name = "SVQC_step"

actions = ("run",)

def __init__(self, parent):
super().__init__(parent)

@dictify
def get_input_files(self, action):
self._validate_action(action)
tpl = (
"output/{mapper}.{var_caller}.{tumor_library}/out/"
"{mapper}.{var_caller}.{tumor_library}"
)
key_ext = {
"full_vcf": ".full.vcf.gz",
"full_vcf_tbi": ".full.vcf.gz.tbi",
"passed_vcf": ".vcf.gz",
"passed_vcf_tbi": ".vcf.gz.tbi",
}
variant_calling = self.parent.sub_workflows["somatic_variant_calling"]
for key, ext in key_ext.items():
yield key, variant_calling(tpl + ext)

@dictify
def get_output_files(self, action):
# Validate action
self._validate_action(action)
prefix = (
"work/{mapper}.{var_caller}.variantsqc.{tumor_library}/out/"
"{mapper}.{var_caller}.variantsqc.{tumor_library}"
)
key_ext = {"json": ".json"}
for key, ext in key_ext.items():
yield key, prefix + ext
yield key + "_md5", prefix + ext + ".md5"

@dictify
def _get_log_file(self, action):
assert action == "run"
giacuong171 marked this conversation as resolved.
Show resolved Hide resolved
prefix = (
"work/{mapper}.{var_caller}.variantsqc.{tumor_library}/log/"
"{mapper}.{var_caller}.variantsqc.{tumor_library}"
)

key_ext = (
("log", ".log"),
("conda_info", ".conda_info.txt"),
("conda_list", ".conda_list.txt"),
)
for key, ext in key_ext:
yield key, prefix + ext

def get_resource_usage(self, action):
"""Get Resource Usage
:param action: Action (i.e., step) in the workflow, example: 'run'.
:type action: str
:return: Returns ResourceUsage for step.
:raises UnsupportedActionException: if action not in class defined list of valid actions.
"""
if action not in self.actions:
giacuong171 marked this conversation as resolved.
Show resolved Hide resolved
actions_str = ", ".join(self.actions)
error_message = f"Action '{action}' is not supported. Valid options: {actions_str}"
raise UnsupportedActionException(error_message)
mem_mb = 4 * 1024 # 4GB
return ResourceUsage(
threads=2,
time="1:00:00", # 1 hour
memory=f"{mem_mb}M",
)


class SomaticVariantQCWorkflow(BaseStep):
"""Perform gathering variant information"""

name = "somaticvariantchecking"
sheet_shortcut_class = CancerCaseSheet
sheet_shortcut_kwargs = {
"options": CancerCaseSheetOptions(allow_missing_normal=True, allow_missing_tumor=True)
}

@classmethod
def default_config_yaml(cls):
"""Return default config YAML, to be overwritten by project-specific one."""
return DEFAULT_CONFIG

def __init__(self, workflow, config, config_lookup_paths, config_paths, workdir):
super().__init__(
workflow,
config,
config_lookup_paths,
config_paths,
workdir,
(SomaticVariantCallingWorkflow, NgsMappingWorkflow),
)
# Register sub step classes so the sub steps are available
self.register_sub_step_classes((SomaticVariantQCStepPart, LinkOutStepPart))
# Register sub workflows
self.register_sub_workflow(
"somatic_variant_calling",
self.w_config["step_config"]["somatic_variant_checking"][
"path_somatic_variant_calling"
],
)
# Copy over "tools" setting from somatic_variant_calling/ngs_mapping if not set here
if not self.w_config["step_config"]["somatic_variant_checking"]["tools_ngs_mapping"]:
self.w_config["step_config"]["somatic_variant_checking"][
"tools_ngs_mapping"
] = self.w_config["step_config"]["ngs_mapping"]["tools"]["dna"]
if not self.w_config["step_config"]["somatic_variant_checking"][
"tools_somatic_variant_calling"
]:
self.w_config["step_config"]["somatic_variant_checking"][
"tools_somatic_variant_calling"
] = self.w_config["step_config"]["somatic_variant_calling"]["tools"]

@listify
def get_result_files(self):
callers = set(
self.w_config["step_config"]["somatic_variant_checking"][
"tools_somatic_variant_calling"
]
)
name_pattern = "{mapper}.{caller}.variantsqc.{tumor_library.name}"
yield from self._yield_result_files_matched(
os.path.join("output", name_pattern, "out", name_pattern + "{ext}"),
mapper=self.w_config["step_config"]["somatic_variant_checking"]["tools_ngs_mapping"],
caller=callers & set(SOMATIC_VARIANT_CALLERS_MATCHED),
ext=EXT_VALUES,
)
yield from self._yield_result_files_matched(
os.path.join("output", name_pattern, "log", name_pattern + "{ext}"),
mapper=self.w_config["step_config"]["somatic_variant_checking"]["tools_ngs_mapping"],
caller=callers & set(SOMATIC_VARIANT_CALLERS_MATCHED),
ext=(
".log",
".log.md5",
".conda_info.txt",
".conda_info.txt.md5",
".conda_list.txt",
".conda_list.txt.md5",
),
)

def _yield_result_files_matched(self, tpl, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All tumor sample with DNA data should be used here.

Looping over tumor samples paired with a normal sample makes sense when calling somatic variants, because the method requires paired samples.
But we need to move away with the paired normal/tumor requirements is other somatic steps, because there will be panel data with only tumor samples

"""Build output paths from path template and extension list.

This function returns the results from the matched somatic variant callers such as
Mutect.
"""
for sheet in filter(is_not_background, self.shortcut_sheets):
for sample_pair in sheet.all_sample_pairs:
if (
not sample_pair.tumor_sample.dna_ngs_library
or not sample_pair.normal_sample.dna_ngs_library
):
msg = (
"INFO: sample pair for cancer bio sample {} has is missing primary"
"normal or primary cancer NGS library"
)
print(msg.format(sample_pair.tumor_sample.name), file=sys.stderr)
continue
yield from expand(
tpl,
tumor_library=[sample_pair.tumor_sample.dna_ngs_library],
**kwargs,
)

def check_config(self):
"""Check that the path to the NGS mapping is present"""
self.ensure_w_config(
("step_config", "somatic_variant_checking", "path_somatic_variant_calling"),
"Path to variant calling not configured but required for somatic variant qc",
)

self.ensure_w_config(
("step_config", "somatic_variant_checking", "target_regions"),
giacuong171 marked this conversation as resolved.
Show resolved Hide resolved
"Path to target_regions file (bed format)"
"not configured but required for somatic variant qc",
)
64 changes: 64 additions & 0 deletions snappy_pipeline/workflows/somatic_variant_qc/Snakefile
giacuong171 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
"""CUBI Pipeline tumor mutational burden step Snakefile"""

import os

from snappy_pipeline import expand_ref
from snappy_pipeline.workflows.somatic_variant_qc import (
SomaticVariantQCWorkflow,
)

__author__ = "Gia Cuong Pham"
__email__ = "[email protected]"


# Configuration ===============================================================


configfile: "config.yaml"


# Expand "$ref" JSON pointers in configuration (also works for YAML)
config, lookup_paths, config_paths = expand_ref("config.yaml", config)

# WorkflowImpl Object Setup ===================================================
wf = SomaticVariantQCWorkflow(workflow, config, lookup_paths, config_paths, os.getcwd())


localrules:
# Linking files from work/ to output/ should be done locally
somatic_variant_qc_link_out_run,


rule all:
input:
wf.get_result_files(),


# Generic linking out ---------------------------------------------------------


rule somatic_variant_qc_link_out_run:
input:
wf.get_input_files("link_out", "run"),
output:
wf.get_output_files("link_out", "run"),
run:
shell(wf.get_shell_cmd("link_out", "run", wildcards))


rule somatic_variant_qc:
input:
**wf.get_input_files("SVQC_step", "run"),
output:
**wf.get_output_files("SVQC_step", "run"),
threads: wf.get_resource("SVQC_step", "run", "threads")
resources:
time=wf.get_resource("SVQC_step", "run", "time"),
memory=wf.get_resource("SVQC_step", "run", "memory"),
partition=wf.get_resource("SVQC_step", "run", "partition"),
tmpdir=wf.get_resource("SVQC_step", "run", "tmpdir"),
log:
**wf.get_log_file("SVQC_step", "run"),
wrapper:
wf.wrapper_path("somatic_variants_qc")
Loading