Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: co-assembling contigs from multiple samples (#22) #68

Merged
merged 16 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,9 @@ runinfo/

# Editor junk
.vscode

# additional
test_megahit.py
exported_*
reads/*
*.qza
2 changes: 2 additions & 0 deletions q2_assembly/_action_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"num_cpu_threads": "Number of CPU threads.",
"no_hw_accel": "Run MEGAHIT without BMI2 and POPCNT hardware instructions.",
"min_contig_len": "Minimum length of contigs to output.",
"coassemble": "Co-assemble reads into contigs from all samples."
}
# fmt: on
spades_params = {
Expand Down Expand Up @@ -106,6 +107,7 @@
"'off').",
"phred_offset": "PHRED quality offset in the input reads (33 or 64).",
"debug": "Runs SPAdes in debug mode.",
"coassemble": "Co-assemble reads into contigs from all samples.",
}
# fmt: on
quast_params = {
Expand Down
39 changes: 37 additions & 2 deletions q2_assembly/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
)


def run_command(cmd, verbose=True):
def run_command(cmd, verbose=True, concat=False):
if verbose:
print(EXTERNAL_CMD_WARNING)
print("\nCommand:", end=" ")
print(" ".join(cmd), end="\n\n")
subprocess.run(cmd, check=True)
if concat:
subprocess.run(cmd, shell=True, check=True)

Check warning on line 30 in q2_assembly/_utils.py

View check run for this annotation

Codecov / codecov/patch

q2_assembly/_utils.py#L30

Added line #L30 was not covered by tests
else:
subprocess.run(cmd, check=True)


def run_commands_with_pipe(cmd1, cmd2, verbose=True):
Expand Down Expand Up @@ -132,3 +135,35 @@
filename (str): The name of the file/dir we are trying to get
"""
return pkg_resources.resource_filename(package, f"data/{filename}")


def get_file_extension(filepath):
"""Extract the extension of a file to see if it is compressed
or not.

Args:
filepath (str): the path of the file to get the extension

"""
ext = ""
parts = filepath.split(".")

if parts[-1] == "gz":
ext += f".{parts[-2]}.gz"
else:
# take only the last part that
# will correspond to the extension
ext = f".{parts[-1]}"
return ext


def concatenate_files(input_files, output_file):
"""Concatenate the content of the files in input_files and
save the content in the output_file.

Args:
input_files (list): list of all files to be concatenated
output_file (str): the path to the resulting file
"""
cmd = ["cat", *input_files]
subprocess.run(cmd, stdout=open(output_file, "w"), check=True)
39 changes: 31 additions & 8 deletions q2_assembly/megahit/megahit.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,13 @@
no_hw_accel=False,
min_contig_len=200,
num_partitions=None,
coassemble=False,
):
kwargs = {
k: v for k, v in locals().items() if k not in ["seqs", "num_partitions", "ctx"]
# removing num_partitions from this dict to include it in the parameters
k: v
for k, v in locals().items()
if k not in ["seqs", "num_partitions", "ctx"]
}

_assemble_megahit = ctx.get_action("assembly", "_assemble_megahit")
Expand All @@ -140,6 +144,12 @@
else:
raise NotImplementedError()

if coassemble:
num_partitions = 1
print("WARNING: num_partitions set to 1 since co_assemble is used!")
(contig,) = _assemble_megahit(seqs, **kwargs)
return contig

Check warning on line 151 in q2_assembly/megahit/megahit.py

View check run for this annotation

Codecov / codecov/patch

q2_assembly/megahit/megahit.py#L148-L151

Added lines #L148 - L151 were not covered by tests

(partitioned_seqs,) = partition_method(seqs, num_partitions)

contigs = []
Expand Down Expand Up @@ -176,6 +186,7 @@
num_cpu_threads: int = 1,
no_hw_accel: bool = False,
min_contig_len: int = 200,
coassemble: bool = False,
) -> ContigSequencesDirFmt:
if max_tip_len == "auto":
max_tip_len = None
Expand All @@ -189,15 +200,17 @@
"then all must be explicitly set."
)

kwargs = {k: v for k, v in locals().items() if k not in ["seqs"]}
kwargs = {k: v for k, v in locals().items() if k not in ["seqs", "coassemble"]}
common_args = _process_common_input_params(
processing_func=_process_megahit_arg, params=kwargs
)

return assemble_megahit_helper(seqs=seqs, common_args=common_args)
return assemble_megahit_helper(
seqs=seqs, coassemble=coassemble, common_args=common_args
)


def assemble_megahit_helper(seqs, common_args) -> ContigSequencesDirFmt:
def assemble_megahit_helper(seqs, coassemble, common_args) -> ContigSequencesDirFmt:
"""Runs the assembly for all available samples.

Both, paired- and single-end reads can be processed - the output will
Expand All @@ -207,6 +220,8 @@
seqs: Sequences to be processed.
common_args: List of common flags and their values for
the MEGAHIT command.
coassemble: boolean variable that specifies whether
reads from all samples are co-assembled.

Returns:
result (ContigSequencesDirFmt): Assembled contigs.
Expand All @@ -217,9 +232,17 @@
manifest = seqs.manifest.view(pd.DataFrame)
result = ContigSequencesDirFmt()

for samp in list(manifest.index):
fwd = manifest.loc[samp, "forward"]
rev = manifest.loc[samp, "reverse"] if paired else None
if coassemble:
print("Co-assembling reads from all samples.")
fwd = ",".join(manifest["forward"])
rev = ",".join(manifest["reverse"]) if paired else None

_process_sample("all_contigs", fwd, rev, common_args, result)
else:
for samp in list(manifest.index):
fwd = manifest.loc[samp, "forward"]
rev = manifest.loc[samp, "reverse"] if paired else None

_process_sample(samp, fwd, rev, common_args, result)

_process_sample(samp, fwd, rev, common_args, result)
return result
132 changes: 119 additions & 13 deletions q2_assembly/megahit/tests/test_megahit.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,54 @@ def setUp(self):
"100",
]

def get_reads_path(self, kind="paired", sample_id=1, direction="fwd"):
def get_reads_path(
self, kind="paired", sample_id=1, direction="fwd", is_single_sample=False
):
d = 1 if direction == "fwd" else 2
if is_single_sample:
return self.get_data_path(
f"reads/single-sample/{kind}-end/reads{sample_id}_R{d}.fastq.gz"
)
return self.get_data_path(f"reads/{kind}-end/reads{sample_id}_R{d}.fastq.gz")

def generate_exp_calls(self, sample_ids, kind="paired"):
exp_calls = []
def get_fwd_rev_paths(self, kind, sample_id, is_single_sample=False):
rev = None
for s in sample_ids:
fwd = self.get_reads_path(kind, s, "fwd")
if kind == "paired":
rev = self.get_reads_path(kind, s, "rev")
exp_calls.append(call(f"sample{s}", fwd, rev, self.test_params_list, ANY))
fwd = self.get_reads_path(kind, sample_id, "fwd", is_single_sample)
if kind == "paired":
rev = self.get_reads_path(kind, sample_id, "rev", is_single_sample)

return fwd, rev

def generate_exp_calls_coassembly(
self, sample_ids, kind="paired", coassemble=False, is_single_sample=False
):
exp_calls = []
fwd = []
rev = []
if coassemble:
for s in sample_ids:
# collect paths in 2 lists, then make a single call
fwd.append(self.get_fwd_rev_paths(kind, s, is_single_sample)[0])
if kind == "paired":
rev.append(self.get_fwd_rev_paths(kind, s, is_single_sample)[1])
exp_calls.append(
call(
"all_contigs",
",".join(fwd),
(",".join(rev) if len(rev) != 0 else None),
self.test_params_list,
ANY,
)
)
else:
for s in sample_ids:
# make a list of calls, each call has separate
# samples and respective paths
fwd, rev = self.get_fwd_rev_paths(kind, s, is_single_sample)
exp_calls.append(
call(f"sample{s}", fwd, rev, self.test_params_list, ANY)
)

return exp_calls

def test_process_megahit_arg_simple1(self):
Expand Down Expand Up @@ -221,8 +257,12 @@ def test_assemble_megahit_paired(self, p):
input_files = self.get_data_path("reads/paired-end")
input = SingleLanePerSamplePairedEndFastqDirFmt(input_files, mode="r")

obs = assemble_megahit_helper(seqs=input, common_args=self.test_params_list)
exp_calls = self.generate_exp_calls(sample_ids=(1, 2), kind="paired")
obs = assemble_megahit_helper(
seqs=input, coassemble=False, common_args=self.test_params_list
)
exp_calls = self.generate_exp_calls_coassembly(
sample_ids=(1, 2), kind="paired", coassemble=False
)

p.assert_has_calls(exp_calls, any_order=False)
self.assertIsInstance(obs, ContigSequencesDirFmt)
Expand All @@ -232,8 +272,74 @@ def test_assemble_megahit_single(self, p):
input_files = self.get_data_path("reads/single-end")
input = SingleLanePerSampleSingleEndFastqDirFmt(input_files, mode="r")

obs = assemble_megahit_helper(seqs=input, common_args=self.test_params_list)
exp_calls = self.generate_exp_calls(sample_ids=(1, 2), kind="single")
obs = assemble_megahit_helper(
seqs=input,
coassemble=False,
common_args=self.test_params_list,
)
exp_calls = self.generate_exp_calls_coassembly(
sample_ids=(1, 2), kind="single", coassemble=False
)

p.assert_has_calls(exp_calls, any_order=False)
self.assertIsInstance(obs, ContigSequencesDirFmt)

@patch("q2_assembly.megahit._process_sample")
def test_assemble_megahit_paired_coassemble(self, p):
input_files = self.get_data_path("reads/paired-end")
input = SingleLanePerSamplePairedEndFastqDirFmt(input_files, mode="r")

obs = assemble_megahit_helper(
seqs=input, coassemble=True, common_args=self.test_params_list
)
exp_calls = self.generate_exp_calls_coassembly(
sample_ids=(1, 2), kind="paired", coassemble=True
)

p.assert_has_calls(exp_calls, any_order=False)
self.assertIsInstance(obs, ContigSequencesDirFmt)

@patch("q2_assembly.megahit._process_sample")
def test_assemble_megahit_single_coassemble(self, p):
input_files = self.get_data_path("reads/single-end")
input = SingleLanePerSampleSingleEndFastqDirFmt(input_files, mode="r")

obs = assemble_megahit_helper(
seqs=input, coassemble=True, common_args=self.test_params_list
)
exp_calls = self.generate_exp_calls_coassembly(
sample_ids=(1, 2), kind="single", coassemble=True
)

p.assert_has_calls(exp_calls, any_order=False)
self.assertIsInstance(obs, ContigSequencesDirFmt)

@patch("q2_assembly.megahit._process_sample")
def test_assemble_megahit_paired_single_sample_coassemble(self, p):
input_files = self.get_data_path("reads/single-sample/paired-end")
input = SingleLanePerSamplePairedEndFastqDirFmt(input_files, mode="r")

obs = assemble_megahit_helper(
seqs=input, coassemble=True, common_args=self.test_params_list
)
exp_calls = self.generate_exp_calls_coassembly(
sample_ids=(1,), kind="paired", coassemble=True, is_single_sample=True
)

p.assert_has_calls(exp_calls, any_order=False)
self.assertIsInstance(obs, ContigSequencesDirFmt)

@patch("q2_assembly.megahit._process_sample")
def test_assemble_megahit_single_single_sample_coassemble(self, p):
input_files = self.get_data_path("reads/single-sample/single-end")
input = SingleLanePerSampleSingleEndFastqDirFmt(input_files, mode="r")

obs = assemble_megahit_helper(
seqs=input, coassemble=True, common_args=self.test_params_list
)
exp_calls = self.generate_exp_calls_coassembly(
sample_ids=(1,), kind="single", coassemble=True, is_single_sample=True
)

p.assert_has_calls(exp_calls, any_order=False)
self.assertIsInstance(obs, ContigSequencesDirFmt)
Expand Down Expand Up @@ -279,7 +385,7 @@ def test_assemble_megahit_process_params(self, p):
"--min-contig-len",
"200",
]
p.assert_called_with(seqs=input, common_args=exp_args)
p.assert_called_with(seqs=input, coassemble=False, common_args=exp_args)

def test_assemble_megahit_parallel_paired(self):
input_files = self.get_data_path("formatted-reads/paired-end")
Expand Down
21 changes: 15 additions & 6 deletions q2_assembly/plugin_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# ----------------------------------------------------------------------------

from q2_types.feature_data import FeatureData, Sequence
from q2_types.feature_data_mag import Contig
from q2_types.feature_table import FeatureTable, Frequency
from q2_types.per_sample_sequences import (
Contigs,
Expand All @@ -18,6 +19,7 @@
)
from q2_types.per_sample_sequences._type import AlignmentMap
from q2_types.sample_data import SampleData
from qiime2.core.type import Bool, Choices, TypeMap
from qiime2.plugin import Citations, Collection, Int, List, Plugin, Range

import q2_assembly
Expand Down Expand Up @@ -52,11 +54,18 @@
short_description="QIIME 2 plugin for (meta)genome assembly.",
)

P_coassemble, T_coassembled_seqs = TypeMap(
{
Bool % Choices(True): FeatureData[Contig],
Bool % Choices(False): SampleData[Contigs],
}
)

plugin.pipelines.register_function(
function=q2_assembly.megahit.assemble_megahit,
inputs={"seqs": SampleData[SequencesWithQuality | PairedEndSequencesWithQuality]},
parameters={**megahit_params, **partition_params},
outputs=[("contigs", SampleData[Contigs])],
parameters={**megahit_params, "coassemble": P_coassemble, **partition_params},
outputs=[("contigs", T_coassembled_seqs)],
input_descriptions={"seqs": "The paired- or single-end sequences to be assembled."},
parameter_descriptions={
**megahit_param_descriptions,
Expand All @@ -72,8 +81,8 @@
plugin.methods.register_function(
function=q2_assembly.megahit._assemble_megahit,
inputs={"seqs": SampleData[SequencesWithQuality | PairedEndSequencesWithQuality]},
parameters=megahit_params,
outputs=[("contigs", SampleData[Contigs])],
parameters={**megahit_params, "coassemble": P_coassemble}, # megahit_params,
outputs=[("contigs", T_coassembled_seqs)],
input_descriptions={"seqs": "The paired- or single-end sequences to be assembled."},
parameter_descriptions=megahit_param_descriptions,
output_descriptions={"contigs": "The resulting assembled contigs."},
Expand Down Expand Up @@ -113,8 +122,8 @@
plugin.methods.register_function(
function=q2_assembly.spades.assemble_spades,
inputs={"seqs": SampleData[SequencesWithQuality | PairedEndSequencesWithQuality]},
parameters=spades_params,
outputs=[("contigs", SampleData[Contigs])],
parameters={**spades_params, "coassemble": P_coassemble},
outputs=[("contigs", T_coassembled_seqs)],
input_descriptions={"seqs": "The paired- or single-end sequences to be assembled."},
parameter_descriptions=spades_param_descriptions,
output_descriptions={"contigs": "The resulting assembled contigs."},
Expand Down
Loading
Loading