Skip to content

Commit

Permalink
update snakemake.api calls, use submodules, wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tedil committed Apr 10, 2024
1 parent c49b61e commit 9c2f707
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 33 deletions.
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies:
- git-lfs =3.5

# Snakemake is used for providing the actual wrapper calling functionality
- snakemake =8
- snakemake >=8.10.6

# Additional libraries used by snappy
- ruamel.yaml =0.18 # Nice, round-trip enabled YAML parsing
Expand Down
2 changes: 1 addition & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ biomedsheets >=0.11.7
termcolor>=1.1.0

# Snakemake is used for providing the actual wrapper calling functionality
snakemake>=8.8.0
snakemake>=8.10.6
# Snakemake needs manual install of PyYAML to make YAML configuration loading work
#PyYAML>=6.0

Expand Down
22 changes: 17 additions & 5 deletions snappy_pipeline/workflows/abstract/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from snappy_pipeline.base import (
MissingConfiguration,
UnsupportedActionException,
expand_ref,
merge_dicts,
merge_kwargs,
print_config,
Expand Down Expand Up @@ -799,13 +800,24 @@ def register_sub_workflow(self, step_name, workdir, sub_workflow_name=None):
abs_workdir = workdir
else:
abs_workdir = os.path.realpath(os.path.join(os.getcwd(), workdir))
self.workflow.subworkflow(
sub_workflow_name,
workdir=abs_workdir,

config_path = abs_workdir + "/" + "config.yaml"
config, *_ = expand_ref(config_path, self.w_config)

from pprint import pprint

pprint(config)
pprint(_)
print(snakefile_path(step_name))
print(abs_workdir)
self.workflow.module(
name=sub_workflow_name,
# workdir=abs_workdir,
prefix=step_name,
snakefile=snakefile_path(step_name),
configfile=abs_workdir + "/" + "config.yaml",
config=config,
)
self.sub_workflows[sub_workflow_name] = self.workflow.globals[sub_workflow_name]
self.sub_workflows[sub_workflow_name] = self.workflow.modules[sub_workflow_name]

def get_args(self, sub_step, action):
"""Return arguments for action of substep with given wildcards
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,13 @@ def _get_input_files_run(self, wildcards):
:return: Returns dictionary with input files for rule 'run', BAM and BAI files.
"""
# Get shortcut to Snakemake sub workflow
ngs_mapping = self.parent.sub_workflows["ngs_mapping"]
module_info = self.parent.sub_workflows["ngs_mapping"]
module_info.use_rules(rules="*")
_parent_path = self.parent.workflow.overwrite_workdir.parent

def ngs_mapping(path: str) -> str:
return "../" + "ngs_mapping" + "/" + path

# Get names of primary libraries of the selected cancer bio sample and the
# corresponding primary normal sample
normal_base_path = "output/{mapper}.{normal_library}/out/{mapper}.{normal_library}".format(
Expand Down
53 changes: 30 additions & 23 deletions snappy_wrappers/wrapper_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import textwrap
import time
from collections.abc import MutableMapping, MutableSequence
from pathlib import Path

from snakemake.api import ResourceSettings, SnakemakeApi
from snakemake.cli import get_profile_dir
Expand Down Expand Up @@ -260,12 +261,14 @@ def run_snakemake(
profile=None,
):
"""Given a pipeline step's configuration, launch sequential or parallel Snakemake"""
snakefile = Path(snakefile)
if config["use_profile"]:
workdir = Path(os.getcwd())
print(
f"Running with Snakemake profile on {num_jobs or config['num_jobs']} "
f"cores in directory {os.getcwd()}"
f"cores in directory {workdir}"
)
os.mkdir(os.path.join(os.getcwd(), "slurm_log"))
os.mkdir(os.path.join(workdir, "slurm_log"))
if partition:
os.environ["SNAPPY_PIPELINE_DEFAULT_PARTITION"] = partition

Expand All @@ -281,33 +284,37 @@ def run_snakemake(
),
)

result = SnakemakeApi.workflow(
snakefile=snakefile,
workdir=os.getcwd(),
resource_settings=ResourceSettings(cores=cores, nodes=num_jobs or config["num_jobs"]),
# TODO properly choose remaining *_settings, if needed
# config_settings=None,
# storage_settings=None,
# workflow_settings=None,
# deployment_settings=None,
# storage_provider_settings=None,
)
with SnakemakeApi() as api:
result = api.workflow(
snakefile=snakefile,
workdir=workdir,
resource_settings=ResourceSettings(
cores=cores, nodes=num_jobs or config["num_jobs"]
),
# TODO properly choose remaining *_settings, if needed
# config_settings=None,
# storage_settings=None,
# workflow_settings=None,
# deployment_settings=None,
# storage_provider_settings=None,
)
else:
print(
"Running locally with {num_jobs} jobs in directory {cwd}".format(
num_jobs=config["num_jobs"], cwd=os.getcwd()
)
)
result = SnakemakeApi.workflow(
snakefile=snakefile,
resource_settings=ResourceSettings(cores=config["num_jobs"]),
# TODO properly choose remaining *_settings, if needed
# config_settings=None,
# storage_settings=None,
# workflow_settings=None,
# deployment_settings=None,
# storage_provider_settings=None,
)
with SnakemakeApi() as api:
result = api.workflow(
snakefile=snakefile,
resource_settings=ResourceSettings(cores=config["num_jobs"]),
# TODO properly choose remaining *_settings, if needed
# config_settings=None,
# storage_settings=None,
# workflow_settings=None,
# deployment_settings=None,
# storage_provider_settings=None,
)
if not result:
raise SnakemakeExecutionFailed("Could not perform nested Snakemake call")

Expand Down
6 changes: 4 additions & 2 deletions snappy_wrappers/wrappers/mutect2/environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ channels:
- bioconda

dependencies:
- python =3.9
- python =3.11
- gatk4 =4.3
- htslib >=1.9
- bcftools >=1.9
# gawk is needed because the awk expression uses a gawk-specific version of `match(…)`
- gawk
- datrie
# snakemake needed by the parallel wrapper
- snakemake-minimal =7
- snakemake-minimal =8.10
# needed by snakemake
- cffi
- numpy
Expand Down

0 comments on commit 9c2f707

Please sign in to comment.