From cc0d345e579c6f71cc8372566702f45de88b9be6 Mon Sep 17 00:00:00 2001
From: Jakob Nybo Nissen
Date: Thu, 28 Nov 2024 15:00:58 +0100
Subject: [PATCH] Use strobealign in AVAMB workflow

---
 workflow_avamb/avamb.snake.conda.smk          | 242 +++---------------
 workflow_avamb/config.json                    |   4 +-
 .../envs/{minimap2.yaml => strobealign.yaml}  |   4 +-
 workflow_avamb/src/abundance.py               |  54 ++++
 workflow_avamb/src/abundances_mask.py         |  40 ---
 workflow_avamb/src/create_abundances.py       |  36 ---
 workflow_avamb/src/write_abundances.py        |  34 ---
 7 files changed, 94 insertions(+), 320 deletions(-)
 rename workflow_avamb/envs/{minimap2.yaml => strobealign.yaml} (59%)
 create mode 100644 workflow_avamb/src/abundance.py
 delete mode 100644 workflow_avamb/src/abundances_mask.py
 delete mode 100644 workflow_avamb/src/create_abundances.py
 delete mode 100644 workflow_avamb/src/write_abundances.py

diff --git a/workflow_avamb/avamb.snake.conda.smk b/workflow_avamb/avamb.snake.conda.smk
index 6f301135..2688e317 100644
--- a/workflow_avamb/avamb.snake.conda.smk
+++ b/workflow_avamb/avamb.snake.conda.smk
@@ -25,8 +25,8 @@ MIN_BIN_SIZE = int(get_config("min_bin_size", "200000", r"[1-9]\d*$"))
 MIN_IDENTITY = float(get_config("min_identity", "0.95", r".*"))
 
-MM_MEM = get_config("minimap_mem", "35gb", r"[1-9]\d*GB$")
-MM_PPN = get_config("minimap_ppn", "10", r"[1-9]\d*$")
+SA_MEM = get_config("strobealign_mem", "35GB", r"[1-9]\d*GB$")
+SA_PPN = get_config("strobealign_ppn", "10", r"[1-9]\d*$")
 
 AVAMB_MEM = get_config("avamb_mem", "20gb", r"[1-9]\d*GB$")
 AVAMB_PPN = get_config("avamb_ppn", "10", r"[1-9]\d*(:gpus=[1-9]\d*)?$")
@@ -102,234 +102,64 @@ rule cat_contigs:
         "avamb"
     shell:
         "python {params.path} {output} {input} -m {MIN_CONTIG_SIZE}"
 
-# Index resulting contig-file with minimap2
-rule index:
+# Create abundance tables by aligning reads to the catalogue
+rule strobealign:
     input:
+        fq = lambda wildcards: sample2path[wildcards.sample],
         contigs = os.path.join(OUTDIR,"contigs.flt.fna.gz")
-    output:
-        mmi = os.path.join(OUTDIR,"contigs.flt.mmi")
-    params:
-        walltime="864000",
-        nodes="1",
-        ppn="1"
-    resources:
-        mem="90GB"
-    threads:
-        1
-    log:
-        out_ind = os.path.join(OUTDIR,"log/contigs/index.log"),
-        o = os.path.join(OUTDIR,"log/contigs/index.o"),
-        e = os.path.join(OUTDIR,"log/contigs/index.e")
-
-
-    conda:
-        "envs/minimap2.yaml"
-    shell:
-        "minimap2 -I {INDEX_SIZE} -d {output} {input} 2> {log.out_ind}"
-
-# This rule creates a SAM header from a FASTA file.
-# We need it because minimap2 for truly unknowable reasons will write
-# SAM headers INTERSPERSED in the output SAM file, making it unparseable.
-# To work around this mind-boggling bug, we remove all header lines from
-# minimap2's SAM output by grepping, then re-add the header created in this
-# rule.
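+    # Unlike the removed minimap2 pipeline, strobealign --aemb needs no index,
+    # BAM files or sorting: it writes per-contig abundance estimates directly.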
-rule dict:
-    input:
-        contigs = os.path.join(OUTDIR,"contigs.flt.fna.gz")
-    output:
-        dict = os.path.join(OUTDIR,"contigs.flt.dict")
-    params:
-        walltime="864000",
-        nodes="1",
-        ppn="1"
-    resources:
-        mem="10GB"
-    threads:
-        1
-    log:
-        out_dict= os.path.join(OUTDIR,"log/contigs/dict.log"),
-        o = os.path.join(OUTDIR,"log/contigs/dict.o"),
-        e = os.path.join(OUTDIR,"log/contigs/dict.e")
-
-    conda:
-        "envs/samtools.yaml"
-    shell:
-        "samtools dict {input} | cut -f1-3 > {output} 2> {log.out_dict}"
-
-# Generate bam files
-rule minimap:
-    input: fq = lambda wildcards: sample2path[wildcards.sample],
-        mmi = os.path.join(OUTDIR,"contigs.flt.mmi"),
-        dict = os.path.join(OUTDIR,"contigs.flt.dict")
-    output:
-        bam = temp(os.path.join(OUTDIR,"mapped/{sample}.bam"))
+    output: temp(os.path.join(OUTDIR, "mapped", "{sample}.aemb.tsv"))
     params:
         walltime="864000",
         nodes="1",
-        ppn=MM_PPN
+        ppn=SA_PPN
     resources:
-        mem=MM_MEM
+        mem=SA_MEM
     threads:
-        int(MM_PPN)
-    log:
-        out_minimap = os.path.join(OUTDIR,"log/map/{sample}.minimap.log"),
-        o = os.path.join(OUTDIR,"log/map/{sample}.minimap.o"),
-        e = os.path.join(OUTDIR,"log/map/{sample}.minimap.e")
-
-    conda:
-        "envs/minimap2.yaml"
-    shell:
-        # See comment over rule "dict" to understand what happens here
-        "minimap2 -t {threads} -ax sr {input.mmi} {input.fq} -N 5"
-        " | grep -v '^@'"
-        " | cat {input.dict} - "
-        " | samtools view -F 3584 -b - " # supplementary, duplicate read, fail QC check
-        " > {output.bam} 2> {log.out_minimap}"
-
-# Sort bam files
-rule sort:
-    input:
-        os.path.join(OUTDIR,"mapped/{sample}.bam")
-    output:
-        os.path.join(OUTDIR,"mapped/{sample}.sort.bam")
-    params:
-        walltime="864000",
-        nodes="1",
-        ppn="2",
-        prefix=os.path.join(OUTDIR,"mapped/tmp.{sample}")
-    resources:
-        mem="15GB"
-    threads:
-        2
+        int(SA_PPN)
     log:
-        out_sort = os.path.join(OUTDIR,"log/map/{sample}.sort.log"),
-        o = os.path.join(OUTDIR,"log/map/{sample}.sort.o"),
-        e = os.path.join(OUTDIR,"log/map/{sample}.sort.e")
-
+        out_sa = os.path.join(OUTDIR,"log/map/{sample}.strobealign.log"),
+        o = os.path.join(OUTDIR,"log/map/{sample}.strobealign.o"),
+        e = os.path.join(OUTDIR,"log/map/{sample}.strobealign.e")
     conda:
-        "envs/samtools.yaml"
+        "envs/strobealign.yaml"
     shell:
-        "samtools sort {input} -T {params.prefix} --threads 1 -m 3G -o {output} 2> {log.out_sort}"
+        "strobealign -t {threads} --aemb {input.contigs} {input.fq}"
+        " > {output} 2> {log.out_sa}"
 
-# Extract header lengths from a BAM file in order to determine which headers
-# to filter from the abundance (i.e. get the mask)
-rule get_headers:
+rule create_abundance_tsv:
     input:
-        os.path.join(OUTDIR, "mapped", f"{IDS[1]}.sort.bam")
-    output:
-        os.path.join(OUTDIR,"abundances/headers.txt")
+        # We don't actually use the files in the rule itself, but we need to
+        # list them as inputs, such that Snakemake will not execute this rule
+        # until all the files are present.
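+        # Each {sample}.aemb.tsv is the two-column output of strobealign --aemb:
+        # one tab-separated "<contigname>\t<abundance>" line per contig.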
+        files=expand(os.path.join(OUTDIR, "mapped", "{sample}.aemb.tsv"), sample=IDS),
+        directory=os.path.join(OUTDIR, "mapped")
+    output: os.path.join(OUTDIR, "abundance.tsv")
     params:
-        walltime = "86400",
-        nodes = "1",
-        ppn = "1"
+        path=os.path.join(SNAKEDIR, "src", "abundance.py"),
+        walltime="864000",
+        nodes="1",
+        ppn="1",
     resources:
-        mem = "4GB"
+        mem="16GB"
     threads:
         1
-    conda:
-        "envs/samtools.yaml"
-    log:
-        head = os.path.join(OUTDIR,"log/abundance/headers.log"),
-        o = os.path.join(OUTDIR,"log/abundance/get_headers.o"),
-        e = os.path.join(OUTDIR,"log/abundance/get_headers.e")
-
-    shell:
-        "samtools view -H {input}"
-        " | grep '^@SQ'"
-        " | cut -f 2,3"
-        " > {output} 2> {log.head} "
-
-# Using the headers above, compute the mask and the refhash
-rule abundance_mask:
-    input:
-        os.path.join(OUTDIR,"abundances/headers.txt")
-    output:
-        os.path.join(OUTDIR,"abundances/mask_refhash.npz")
-
-    log:
-        mask = os.path.join(OUTDIR,"log/abundance/mask.log"),
-        o = os.path.join(OUTDIR,"log/abundance/mask.o"),
-        e = os.path.join(OUTDIR,"log/abundance/mask.e")
-    params:
-        path = os.path.join(SNAKEDIR, "src", "abundances_mask.py"),
-        walltime = "86400",
-        nodes = "1",
-        ppn = "4"
-    resources:
-        mem = "1GB"
-    threads:
-        4
-    conda:
-        "avamb"
-
-    shell:
-        """
-        python {params.path} --h {input} --msk {output} --minsize {MIN_CONTIG_SIZE} 2> {log.mask}
-        """
-
-
-# For every sample, compute the abundances given the mask and refhash above
-rule bam_abundance:
-    input:
-        bampath=os.path.join(OUTDIR,"mapped/{sample}.sort.bam"),
-        mask_refhash=os.path.join(OUTDIR,"abundances/mask_refhash.npz")
-    output:
-        os.path.join(OUTDIR,"abundances/{sample}.npz")
-    params:
-        path = os.path.join(SNAKEDIR, "src", "write_abundances.py"),
-        walltime = "86400",
-        nodes = "1",
-        ppn = "4"
-    resources:
-        mem = "1GB"
-    threads:
-        4
-    conda:
-        "avamb"
     log:
-        bam = os.path.join(OUTDIR,"log/abundance/bam_abundance_{sample}.log"),
-        o = os.path.join(OUTDIR,"log/abundance/{sample}.bam_abundance.o"),
-        e = os.path.join(OUTDIR,"log/abundance/{sample}.bam_abundance.e")
-
-    shell:
-        """
-        python {params.path} --msk {input.mask_refhash} --b {input.bampath} --min_id {MIN_IDENTITY} --out {output} 2> {log.bam}
-        """
-
-# Merge the abundances to a single Abundance object and save it
-rule create_abundances:
-    input:
-        npzpaths=expand(os.path.join(OUTDIR,"abundances","{sample}.npz"), sample=IDS),
-        mask_refhash=os.path.join(OUTDIR,"abundances","mask_refhash.npz")
-    output:
-        os.path.join(OUTDIR,"abundance.npz")
-    params:
-        path = os.path.join(SNAKEDIR, "src", "create_abundances.py"),
-        abundance_dir = os.path.join(OUTDIR, "abundances"),
-        walltime = "86400",
-        nodes = "1",
-        ppn = "4"
-    resources:
-        mem = "1GB"
-    threads:
-        4
+        o = os.path.join(OUTDIR,"log/contigs/abundance.o"),
+        e = os.path.join(OUTDIR,"log/contigs/abundance.e")
     conda:
         "avamb"
-    log:
-        create_abs = os.path.join(OUTDIR,"log/abundance/create_abundances.log"),
-        o = os.path.join(OUTDIR,"log/abundance/create_abundances.o"),
-        e = os.path.join(OUTDIR,"log/abundance/create_abundances.e")
-
-    shell:
-        "python {params.path} --msk {input.mask_refhash} --ab {input.npzpaths} --min_id {MIN_IDENTITY} --out {output} 2> {log.create_abs} && "
-        "rm -r {params.abundance_dir}"
+    shell: "python {params.path} {output} {input.directory}"
 
 # Generate the 3 sets of clusters and bins
 rule run_avamb:
     input:
         contigs=os.path.join(OUTDIR,"contigs.flt.fna.gz"),
-        abundance=os.path.join(OUTDIR,"abundance.npz")
+        abundance=os.path.join(OUTDIR,"abundance.tsv")
     output:
         outdir_avamb=directory(os.path.join(OUTDIR,"avamb")),
         clusters_aae_z=os.path.join(OUTDIR,"avamb/aae_z_clusters_split.tsv"),
@@ -357,7 +187,7 @@ rule run_avamb:
         rm -f {OUTDIR}/contigs.flt.mmi
         rm -rf {output.outdir_avamb}
         {AVAMB_PRELOAD}
-        vamb --outdir {output.outdir_avamb} --fasta {input.contigs} -p {threads} --abundance {input.abundance} -m {MIN_CONTIG_SIZE} --minfasta {MIN_BIN_SIZE} {params.cuda} {AVAMB_PARAMS}
+        vamb --outdir {output.outdir_avamb} --fasta {input.contigs} -p {threads} --abundance_tsv {input.abundance} -m {MIN_CONTIG_SIZE} --minfasta {MIN_BIN_SIZE} {params.cuda} {AVAMB_PARAMS}
         mkdir -p {OUTDIR}/Final_bins
         mkdir -p {OUTDIR}/tmp/checkm2_all
         mkdir -p {OUTDIR}/tmp/ripped_bins
diff --git a/workflow_avamb/config.json b/workflow_avamb/config.json
index 20b36c82..62abf480 100644
--- a/workflow_avamb/config.json
+++ b/workflow_avamb/config.json
@@ -5,8 +5,8 @@
     "min_contig_size": "2000",
     "min_bin_size": "200000",
     "min_identity": "0.95",
-    "minimap_mem": "15GB",
-    "minimap_ppn": "15",
+    "strobealign_mem": "16GB",
+    "strobealign_ppn": "16",
     "avamb_mem": "15GB",
     "avamb_ppn": "30",
     "checkm2_mem": "15GB",
diff --git a/workflow_avamb/envs/minimap2.yaml b/workflow_avamb/envs/strobealign.yaml
similarity index 59%
rename from workflow_avamb/envs/minimap2.yaml
rename to workflow_avamb/envs/strobealign.yaml
index bbe81442..3bd3e5c6 100644
--- a/workflow_avamb/envs/minimap2.yaml
+++ b/workflow_avamb/envs/strobealign.yaml
@@ -1,6 +1,6 @@
-name: minimap2
+name: strobealign
 channels:
   - bioconda
 dependencies:
-  - minimap2
+  - strobealign
   - samtools
diff --git a/workflow_avamb/src/abundance.py b/workflow_avamb/src/abundance.py
new file mode 100644
index 00000000..b0f0b0c7
--- /dev/null
+++ b/workflow_avamb/src/abundance.py
@@ -0,0 +1,54 @@
+import argparse
+from pathlib import Path
+from typing import TextIO
+import numpy as np
+
+
+def main(output: Path, input_dir: Path):
+    files = sorted(input_dir.iterdir())  # sorted, so column order is deterministic
+    files = [i for i in files if i.suffix == ".tsv"]
+    names = [i.stem for i in files]
+    with open(output, "w") as outfile:
+        print("contigname", "\t".join(names), sep="\t", file=outfile)
+        write_files(outfile, files)
+
+
+def write_files(outfile: TextIO, files: list[Path]):
+    if len(files) == 0:
+        return
+    index_of_contig: dict[str, int] = dict()
+    names: list[str] = []
+    first_file = files[0]  # the first file fixes the contig (row) order
+    abundance: list[float] = []
+    with open(first_file) as file:
+        for line in file:
+            (contigname, ab) = line.split("\t")
+            index_of_contig[contigname] = len(index_of_contig)
+            names.append(contigname)
+            abundance.append(float(ab))
+    array = np.empty((len(abundance), len(files)), dtype=np.float32)
+    array[:, 0] = abundance
+    del abundance
+
+    for col_minus_one, filename in enumerate(files[1:]):
+        with open(filename) as file:
+            for line in file:
+                (contigname, ab) = line.split("\t")
+                index = index_of_contig[contigname]
+                array[index, col_minus_one + 1] = float(ab)
+
+    del index_of_contig
+    assert len(names) == len(array)
+    for name, row in zip(names, array):
+        print(name, "\t".join([f"{x:.6f}" for x in row]), sep="\t", file=outfile)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("output", help="Abundance TSV output path")
+    parser.add_argument(
+        "input_dir", help="Dir with single-sample abundance TSV files from strobealign"
+    )
+
+    args = parser.parse_args()
+    main(Path(args.output), Path(args.input_dir))
diff --git a/workflow_avamb/src/abundances_mask.py b/workflow_avamb/src/abundances_mask.py
deleted file mode 100644
index 85c7cc7f..00000000
---
a/workflow_avamb/src/abundances_mask.py +++ /dev/null @@ -1,40 +0,0 @@ -import numpy as np -import argparse -from vamb.vambtools import RefHasher -from pathlib import Path - - -def abundances_mask(headers: Path, mask_refhash: Path, min_contig_size: int): - """# Using the headers above, compute the mask and the refhash""" - - mask = [] - identifiers = [] - - with open(headers) as file: - for line in file: - # SN:S27C112075 LN:2239 - (sn, ln) = line.split("\t") - if sn[:3] != "SN:" or ln[:3] != "LN:": - raise ValueError("Unknown format") - passed = int(ln[3:]) >= min_contig_size - mask.append(passed) - if passed: - identifiers.append(sn[3:]) - - np.savez_compressed( - mask_refhash, - mask=np.array(mask, dtype=bool), - refhash=RefHasher.hash_refnames(identifiers), - ) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--h", type=Path, help=" Headers file") - parser.add_argument("--msk", type=Path, help="mask refhash") - - parser.add_argument("--minsize", type=int, help="min contig size") - - opt = parser.parse_args() - - abundances_mask(opt.h, opt.msk, opt.minsize) diff --git a/workflow_avamb/src/create_abundances.py b/workflow_avamb/src/create_abundances.py deleted file mode 100644 index d04a3c1b..00000000 --- a/workflow_avamb/src/create_abundances.py +++ /dev/null @@ -1,36 +0,0 @@ -import numpy as np -import argparse -import vamb -from pathlib import Path - - -def create_abundances( - abundances: list[Path], mask_refhash: Path, min_id: float, outfile: Path -): - """Merge the abundances to a single Abundance object and save it""" - refhash = np.load(mask_refhash)["refhash"] - - n_samples = len(abundances) - first = vamb.vambtools.read_npz(abundances[0]) - print(len(first), n_samples) - print(first.shape) - matrix = np.empty((len(first), n_samples), dtype=np.float32) - matrix[:, 0] = first - for i, path in enumerate(abundances[1:]): - matrix[:, i + 1] = vamb.vambtools.read_npz(path) - abundance = vamb.parsebam.Abundance( - matrix, [str(i) for i in abundances], min_id, refhash - ) - abundance.save(outfile) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--msk", type=Path, help="mask refhash") - parser.add_argument("--ab", type=Path, nargs="+", help=" abundancaes list of files") - parser.add_argument("--min_id", type=float, help="min identity for alignment") - parser.add_argument("--out", type=Path, help="abundances outfile") - - opt = parser.parse_args() - - create_abundances(opt.ab, opt.msk, opt.min_id, opt.out) diff --git a/workflow_avamb/src/write_abundances.py b/workflow_avamb/src/write_abundances.py deleted file mode 100644 index 5ab33606..00000000 --- a/workflow_avamb/src/write_abundances.py +++ /dev/null @@ -1,34 +0,0 @@ -import numpy as np -import argparse -import vamb -from pathlib import Path - - -def write_abundances( - mask_refhash: Path, bampath: Path, min_identity: float, outfile: Path -): - """For every sample, compute the abundances given the mask and refhashes""" - loadnpz = np.load(mask_refhash) - refhash = loadnpz["refhash"] - mask = loadnpz["mask"] - refhash = refhash.reshape(1)[0] - (abundance, _) = vamb.parsebam.Abundance.run_pycoverm( - paths=[bampath], - minid=min_identity, - target_refhash=refhash, - target_identifiers=None, - mask=mask, - ) - vamb.vambtools.write_npz(outfile, abundance.ravel()) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--msk", type=Path, help="mask refhash") - parser.add_argument("--b", type=Path, help=" bam path") - 
parser.add_argument("--min_id", type=float, help="min identity for alignment") - parser.add_argument("--out", type=Path, help="abundances outfile") - - opt = parser.parse_args() - - write_abundances(opt.msk, opt.b, opt.min_id, opt.out)
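
Reviewer note, not part of the patch: a minimal end-to-end sketch of the new
abundance flow. The sample names, contig names and values below are invented,
and it assumes the script is run from the repository root with `python` on
PATH; `strobealign --aemb` is expected to produce the same two-column
"<contigname>\t<abundance>" format that the toy files imitate.

    # sketch.py - merge two toy --aemb tables with workflow_avamb/src/abundance.py
    import subprocess
    import tempfile
    from pathlib import Path

    with tempfile.TemporaryDirectory() as tmp:
        mapped = Path(tmp, "mapped")
        mapped.mkdir()
        # One "<contigname>\t<abundance>" line per contig; every file must
        # contain the same contigs, as the merge script indexes by the first file.
        Path(mapped, "sample1.aemb.tsv").write_text("S27C112075\t1.5\nS27C112076\t0.0\n")
        Path(mapped, "sample2.aemb.tsv").write_text("S27C112075\t2.25\nS27C112076\t4.0\n")
        out = Path(tmp, "abundance.tsv")
        subprocess.run(
            ["python", "workflow_avamb/src/abundance.py", str(out), str(mapped)],
            check=True,
        )
        print(out.read_text(), end="")
        # Expected (tab-separated; column names are the file stems, so the
        # ".aemb" part of the file name is kept):
        # contigname    sample1.aemb    sample2.aemb
        # S27C112075    1.500000        2.250000
        # S27C112076    0.000000        4.000000

The printed table is exactly what rule run_avamb consumes via `vamb --abundance_tsv`.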