From 25ec2614afcd9a73c2986af0829942c4ab5a8937 Mon Sep 17 00:00:00 2001
From: DOH-JDJ0303
Date: Fri, 3 May 2024 15:11:24 -0700
Subject: [PATCH] consensus assemblies are now condensed when closer than the maximum distance threshold

---
 bin/cluster.R                  |  2 -
 bin/condense.R                 | 78 ++++++++++++++++++++++++++++++++++
 bin/summary.R                  | 43 ++++++++-----------
 conf/modules.config            | 46 ++++++++++++++++++--
 modules/local/bind-clusters.nf |  2 +-
 modules/local/condense.nf      | 43 +++++++++++++++++++
 modules/local/consensus.nf     |  5 +--
 modules/local/mash.nf          |  3 +-
 modules/local/summary.nf       |  7 +--
 workflows/epitome.nf           | 78 ++++++++++++++++++++++++----------
 10 files changed, 244 insertions(+), 63 deletions(-)
 create mode 100755 bin/condense.R
 create mode 100644 modules/local/condense.nf

diff --git a/bin/cluster.R b/bin/cluster.R
index c7a26e2..7709962 100755
--- a/bin/cluster.R
+++ b/bin/cluster.R
@@ -24,8 +24,6 @@ if(dist_path == "version"){
 library(tidyverse)
 library(ggtree)
 library(ape)
-install.packages("bigmemory")
-library(bigmemory)
 
 # set output file name
 file.name <- paste(taxa_name,segment_name,iteration,sep="-")
diff --git a/bin/condense.R b/bin/condense.R
new file mode 100755
index 0000000..6ff80a9
--- /dev/null
+++ b/bin/condense.R
@@ -0,0 +1,78 @@
+#!/usr/bin/env Rscript
+version <- "1.0"
+
+# condense.R
+# Author: Jared Johnson, jared.johnson@doh.wa.gov
+
+#---- ARGUMENTS ----#
+args <- commandArgs(trailingOnly = T)
+dist_path <- args[1]
+lengths_path <- args[2]
+clusters_path <- args[3]
+taxa_name <- args[4]
+segment_name <- args[5]
+threshold <- args[6]
+
+#---- VERSION ----#
+if(dist_path == "version"){
+  cat(version, sep = "\n")
+  quit(status=0)
+}
+
+#---- LIBRARIES ----#
+library(tidyverse)
+library(ggtree)
+library(ape)
+
+file.base <- paste(taxa_name,segment_name,sep="-")
+
+#---- LOAD CLUSTER SET & GET COUNT ----#
+clusters.df <- read_csv(clusters_path) %>%
+  mutate(seq = paste(taxa,segment,cluster,sep = "-")) %>%
+  group_by(seq,taxa,segment) %>%
+  count()
+
+#---- LOAD SEQ LENGTHS ----#
+len.df <- read_csv(lengths_path, col_names = c("seq","length"))
+
+#---- LOAD PAIRWISE DISTANCES ----#
+dist.df <- read_tsv(dist_path, col_names = c("ID1","ID2","DIST", "PVAL", "HASH")) %>%
+  select(ID1, ID2, DIST)
+dist.mat <- dist.df %>%
+  pivot_wider(names_from="ID2", values_from="DIST") %>%
+  column_to_rownames(var="ID1") %>%
+  as.matrix() %>%
+  as.dist()
+
+#---- HIERARCHICAL CLUSTER ----#
+# create tree
+tree <- hclust(dist.mat, method = "complete") %>%
+  as.phylo()
+# test that tree is ultrametric and rooted (required for cutree)
+if(! is.ultrametric(tree)){
+  cat("Error: Tree is not ultrametric!\n")
+  quit(status=1)
+}
+if(! is.rooted(tree)){
+  cat("Error: Tree is not rooted!\n")
+  quit(status=1)
+}
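+# NOTE: complete linkage means the height at which two sequences merge is at
+# least their pairwise distance, so cutting at `threshold` (below) yields
+# clusters in which every pair of sequences is within `threshold` of each other.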
+
+#---- CUT DENDROGRAM AT DISTANCE THRESHOLD ----#
+clusters.refs.df <- cutree(as.hclust(tree), h = as.numeric(threshold)) %>%
+  data.frame() %>%
+  rownames_to_column(var = "seq") %>%
+  rename(cluster = 2) %>%
+  left_join(clusters.df, by = "seq") %>%
+  left_join(len.df, by = "seq") %>%
+  group_by(cluster) %>%
+  mutate(n2 = n()) %>%
+  filter(!(n < 10 & n2 > 1)) %>%
+  filter(length == max(length)) %>%
+  ungroup() %>%
+  select(seq,taxa,segment,cluster,n,n2,length)
+
+write.csv(x= clusters.refs.df, file = paste0(file.base,".condensed.csv"), quote = F, row.names = F)
+
+
+
diff --git a/bin/summary.R b/bin/summary.R
index 38ec49e..98c989b 100755
--- a/bin/summary.R
+++ b/bin/summary.R
@@ -6,14 +6,13 @@ version <- "1.0"
 
 #---- ARGUMENTS ----#
 args <- commandArgs(trailingOnly = T)
-clusters_file <- args[1]
-lengths_file <- args[2]
-fastani_ava_file <- args[3]
-fastani_seeds_file <- args[4]
-seeds_file <- args[5]
+summary_file <- args[1]
+fastani_ava_file <- args[2]
+fastani_seeds_file <- args[3]
+seeds_file <- args[4]
 
 #---- VERSION ----#
-if(clusters_file == "version"){
+if(summary_file == "version"){
   cat(version, sep = "\n")
   quit(status=0)
 }
@@ -29,17 +28,12 @@ basename_fa <- function(path){
 }
 
 
-#---- LOAD & JOIN CLUSTER & LENGTH DATA ---- #
-clusters <- read.csv(clusters_file, header = F, col.names = c("seq","taxa","segment","cluster")) %>%
-  group_by(taxa, segment, cluster) %>%
-  count() %>%
-  ungroup() %>%
-  mutate(seq = paste(taxa,segment,cluster,sep="-")) %>%
-  select(seq, taxa, segment, cluster, n)
-
-lengths <- read.csv(lengths_file, header = F, col.names = c("seq","length"))
-
-cluster_lengths <- full_join(clusters, lengths, by = "seq")
+#---- LOAD SUMMARIES ---- #
+clusters <- read.csv(summary_file, header = F, col.names = c("seq","taxa","segment","cluster","n","n2","length")) %>%
+  mutate(condensed = case_when(n2 > 1 ~ TRUE,
+                               n2 == 1 ~ FALSE)
+         ) %>%
+  select(-n2)
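+# NOTE: n is the number of input sequences behind each consensus; n2 is the
+# number of consensus sequences merged during condensing, so condensed = TRUE
+# marks representatives that absorbed at least one other cluster.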
 
 #---- LOAD FASTANI AVA RESULTS & ADD MISSING ----#
 fastani_ava <- read_tsv(fastani_ava_file, col_names = c("query","ref","ani","mapped","total")) %>%
@@ -47,15 +41,15 @@ fastani_ava <- read_tsv(fastani_ava_file, col_names = c("query","ref","ani","map
   mutate(query = basename_fa(query),
          ref = basename_fa(ref))
 
 # get list of pairwise comparisons, add back to fastani_ava, filter by taxa & segment, then calculate ID
-q_filt <- cluster_lengths %>%
+q_filt <- clusters %>%
   select(seq, taxa, segment, length) %>%
   rename(query = seq)
 
-r_filt <- cluster_lengths %>%
+r_filt <- clusters %>%
   mutate(ref = seq,
          staxa=taxa,
          sseg=segment) %>%
   select(ref, staxa, sseg)
 
-fastani_ava <- cluster_lengths %>%
+fastani_ava <- clusters %>%
   select(seq) %>%
   mutate(query=seq,
          tmp='this works') %>%
@@ -87,7 +81,7 @@ plot_matrix <- function(ts){
   }else{
     dims <- 5
   }
-  if(n>100){
+  if(n<100){
     ggsave(p, filename = paste0(ts,".jpg"), dpi = 300, height = dims, width = dims, limitsize = FALSE)
   }else(cat("Matrix image not saved. Too many references!\n"))
@@ -112,9 +106,8 @@ fastani_ava <- fastani_ava %>%
 ## WITHOUT SEEDS
 clusters %>%
   full_join(fastani_ava, by = "seq") %>%
-  full_join(lengths, by = "seq") %>%
-  select(seq,taxa,segment,cluster,n,length,min_ani,max_ani) %>%
-  write.csv(file = "summary.csv", quote = F, row.names = F)
+  select(seq,taxa,segment,cluster,n,condensed,length,min_ani,max_ani) %>%
+  write.csv(file = paste0(format(Sys.Date(), "%s"),"-epitome.csv"), quote = F, row.names = F)
 
 ## WITH SEEDS
 if(file.exists(fastani_seeds_file) & file.exists(seeds_file)){
@@ -136,5 +129,5 @@ if(file.exists(fastani_seeds_file) & file.exists(seeds_file)){
     full_join(clusters, by = "seq") %>%
     left_join(seeds, by = "ref") %>%
     select(seq,taxa,segment,cluster,n,length,min_ani,max_ani, seed, seed_ani) %>%
-    write.csv(file = "summary.csv", quote = F, row.names = F)
+    write.csv(file = paste0(format(Sys.Date(), "%s"),"-epitome.csv"), quote = F, row.names = F)
 }
\ No newline at end of file
diff --git a/conf/modules.config b/conf/modules.config
index 341fb8f..1a16721 100644
--- a/conf/modules.config
+++ b/conf/modules.config
@@ -24,7 +24,25 @@
             pattern: "*.csv"
         ]
     }
-    withName: MASH {
+    withName: MASH_TOP {
+        publishDir = [
+            path: { "${params.outdir}/${taxa}/${segment}/clusters" },
+            pattern: "*.txt.gz"
+        ]
+    }
+    withName: MASH_REMAINDER {
+        publishDir = [
+            path: { "${params.outdir}/${taxa}/${segment}/clusters" },
+            pattern: "*"
+        ]
+    }
+    withName: MASH_LOOSEENDS {
+        publishDir = [
+            path: { "${params.outdir}/${taxa}/${segment}/clusters" },
+            pattern: "*.txt.gz"
+        ]
+    }
+    withName: MASH_CONDENSE {
         publishDir = [
             path: { "${params.outdir}/${taxa}/${segment}/clusters" },
             pattern: "*.txt.gz"
@@ -35,14 +53,30 @@
             path: { "${params.outdir}/${taxa}/${segment}/clusters" }
         ]
     }
-    withName: CLUSTER_LARGE {
+    withName: CLUSTER_LOOSEENDS {
         publishDir = [
             path: { "${params.outdir}/${taxa}/${segment}/clusters" }
         ]
     }
+    withName: ASSIGN_REMAINDER {
+        publishDir = [
+            path: { "${params.outdir}/${taxa}/${segment}/clusters" }
+        ]
+    }
+    withName: BIND_CLUSTERS {
+        publishDir = [
+            path: { "${params.outdir}/${taxa}/${segment}/clusters" }
+        ]
+    }
     withName: SEQTK_SUBSEQ {
         publishDir = [
-            path: { "${params.outdir}/${taxa}/${segment}/clusters" },
+            path: { "${params.outdir}/" },
+            pattern: "none"
+        ]
+    }
+    withName: SEQTK_LOOSEENDS {
+        publishDir = [
+            path: { "${params.outdir}/" },
             pattern: "none"
         ]
     }
@@ -53,6 +87,12 @@
         ]
     }
     withName: CONSENSUS {
+        publishDir = [
+            path: { "${params.outdir}/${taxa}/${segment}/consensus" },
+            pattern: "none"
+        ]
+    }
+    withName: CONDENSE {
         publishDir = [
             path: { "${params.outdir}/${taxa}/${segment}/consensus" },
             pattern: "*.fa"
diff --git a/modules/local/bind-clusters.nf b/modules/local/bind-clusters.nf
index eabae05..66ae1cd 100644
--- a/modules/local/bind-clusters.nf
+++ b/modules/local/bind-clusters.nf
@@ -7,7 +7,7 @@ process BIND_CLUSTERS {
     tuple val(taxa), val(segment), path(clusters)
 
     output:
-    path "*.csv", emit: results
+    tuple val(taxa), val(segment), path("*.csv"), emit: results
 
 
     when:
diff --git a/modules/local/condense.nf b/modules/local/condense.nf
new file mode 100644
index 0000000..fa41854
--- /dev/null
+++ b/modules/local/condense.nf
@@ -0,0 +1,43 @@
+process CONDENSE {
+    tag "${prefix}"
+    label "process_high"
+    container 'docker.io/johnjare/spree:1.0'
+    stageInMode 'copy'
+
+    input:
+    tuple val(taxa), val(segment), path(dist), path(consensus), path(clusters)
+
+    output:
+    tuple val(taxa), val(segment), path("${prefix}.condensed.csv"), path("*.fa", includeInputs: true), env(min_len), emit: results
+    //path "versions.yml", emit: versions
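+    // NOTE: env(min_len) is populated by the `min_len=` line the script writes
+    // to .command.env; it carries the shortest retained sequence length (minus 1)
+    // forward for FastANI.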
+
+
+    when:
+    task.ext.when == null || task.ext.when
+
+    script:
+    prefix = "${taxa}-${segment}"
+    """
+    # get seq lengths
+    cat ${consensus} | paste - - | tr -d '>' | sed 's/.fa//g' | awk -v OFS=',' '{print \$1,length(\$2)}' > lengths.csv
+
+    # run script
+    condense.R ${dist} lengths.csv ${clusters} "${taxa}" "${segment}" ${params.dist_threshold}
+
+    # keep only the sequences retained in the condensed summary
+    mkdir tmp
+    for s in \$(cat ${prefix}.condensed.csv | tail -n +2 | cut -f 1 -d ',')
+    do
+        mv \${s}.fa tmp/
+    done
+    rm *.fa && mv tmp/*.fa ./ && rm -r tmp
+
+    # get min seq length - for FastANI
+    cat ${prefix}.condensed.csv | cut -f 7 -d ',' | tail -n +2 | sort -n | sed -n 1p | awk '{print "min_len="\$1-1}' > .command.env
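+    # NOTE: the `-1` presumably keeps the FastANI fragment length strictly below
+    # the shortest retained sequence; see the env(min_len) output declared above.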
+
+    cat <<-END_VERSIONS > versions.yml
+    "${task.process}":
+        condense.R: \$(condense.R version)
+    END_VERSIONS
+    """
+}
\ No newline at end of file
diff --git a/modules/local/consensus.nf b/modules/local/consensus.nf
index 1f55e3e..20f1802 100644
--- a/modules/local/consensus.nf
+++ b/modules/local/consensus.nf
@@ -7,9 +7,8 @@ process CONSENSUS {
    tuple val(taxa), val(segment), val(cluster), path(aln)
 
    output:
-    tuple val(taxa), val(segment), val(cluster), path("${prefix}.fa"), env(length), emit: fa
-    path "${prefix}_length.csv", emit: len
-    path "versions.yml", emit: versions
+    tuple val(taxa), val(segment), path("${prefix}.fa"), emit: fa
+    path "versions.yml", emit: versions
 
    when:
    task.ext.when == null || task.ext.when
diff --git a/modules/local/mash.nf b/modules/local/mash.nf
index d203729..55742b4 100644
--- a/modules/local/mash.nf
+++ b/modules/local/mash.nf
@@ -21,13 +21,14 @@ process MASH_TOP {
     prefix = "${taxa}-${segment}"
 
     """
+    cat ${sequences} > seqs.fa
     # create mash sketch
     mash \\
         sketch \\
         $args \\
        -p $task.cpus \\
        -o sketch \\
-       -i ${sequences}
+       -i seqs.fa
 
     # calculate distance
     mash \\
diff --git a/modules/local/summary.nf b/modules/local/summary.nf
index 343777d..41ec458 100755
--- a/modules/local/summary.nf
+++ b/modules/local/summary.nf
@@ -3,8 +3,7 @@ process SUMMARY {
     container 'docker.io/johnjare/spree:1.0'
 
     input:
-    path clusters
-    path lengths
+    path summary
     path ani_ava
     path ani_seeds
     path seeds
@@ -21,10 +20,8 @@ process SUMMARY {
 
     script:
     """
-    # remove unwanted headers in cluster dataset
-    cat ${clusters} | grep -v 'seq,taxa,segment,cluster' > clusters-no-header.csv
     # run script
-    summary.R clusters-no-header.csv ${lengths} ${ani_ava} ${ani_seeds} ${seeds}
+    summary.R ${summary} ${ani_ava} ${ani_seeds} ${seeds}
 
     cat <<-END_VERSIONS > versions.yml
     "${task.process}":
diff --git a/workflows/epitome.nf b/workflows/epitome.nf
index 3448f46..186b7eb 100644
--- a/workflows/epitome.nf
+++ b/workflows/epitome.nf
@@ -31,21 +31,23 @@ ch_multiqc_custom_methods_description = params.multiqc_methods_description ? fil
     IMPORT LOCAL MODULES/SUBWORKFLOWS
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 */
-include { INPUT_QC } from '../modules/local/input-qc'
-include { MASH_TOP } from '../modules/local/mash'
-include { CLUSTER } from '../modules/local/cluster'
-include { MASH_REMAINDER } from '../modules/local/mash'
-include { ASSIGN_REMAINDER } from '../modules/local/assign-remainder'
-include { SEQTK_LOOSEENDS } from '../modules/local/seqtk_subseq'
-include { MASH_TOP as MASH_LOOSEENDS } from '../modules/local/mash'
-include { CLUSTER as CLUSTER_LOOSEENDS } from '../modules/local/cluster'
-include { BIND_CLUSTERS } from '../modules/local/bind-clusters.nf'
-include { SEQTK_SUBSEQ } from '../modules/local/seqtk_subseq'
-include { MAFFT } from '../modules/local/mafft'
-include { CONSENSUS } from '../modules/local/consensus'
-include { FASTANI_AVA } from '../modules/local/fastani'
-include { FASTANI_SEEDS } from '../modules/local/fastani'
-include { SUMMARY } from '../modules/local/summary'
+include { INPUT_QC                     } from '../modules/local/input-qc'
+include { MASH_TOP                     } from '../modules/local/mash'
+include { CLUSTER                      } from '../modules/local/cluster'
+include { MASH_REMAINDER               } from '../modules/local/mash'
+include { ASSIGN_REMAINDER             } from '../modules/local/assign-remainder'
+include { SEQTK_LOOSEENDS              } from '../modules/local/seqtk_subseq'
+include { MASH_TOP as MASH_LOOSEENDS   } from '../modules/local/mash'
+include { CLUSTER as CLUSTER_LOOSEENDS } from '../modules/local/cluster'
+include { BIND_CLUSTERS                } from '../modules/local/bind-clusters.nf'
+include { SEQTK_SUBSEQ                 } from '../modules/local/seqtk_subseq'
+include { MAFFT                        } from '../modules/local/mafft'
+include { CONSENSUS                    } from '../modules/local/consensus'
+include { MASH_TOP as MASH_CONDENSE    } from '../modules/local/mash'
+include { CONDENSE                     } from '../modules/local/condense'
+include { FASTANI_AVA                  } from '../modules/local/fastani'
+include { FASTANI_SEEDS                } from '../modules/local/fastani'
+include { SUMMARY                      } from '../modules/local/summary'
 
 //
 // SUBWORKFLOW: Consisting of a mix of local and nf-core/modules
@@ -101,7 +103,7 @@ workflow EPITOME {
 
     /*
     =============================================================================================================================
-        CLUSTER SEQUENCES: ROUND 1
+        CLUSTER SEQUENCES: CLUSTER SUBSET
     =============================================================================================================================
     */
     // MODULE: Determine pairwise Mash distances of sequence subset (number determined by `--max_clusters`; will include all if lower than the assigned threshold.)
@@ -119,7 +121,7 @@
 
     /*
     =============================================================================================================================
-        CLUSTER SEQUENCES: ROUND 2
+        CLUSTER SEQUENCES: ASSIGN REMAINING USING SUBSET CLASSIFIER
     =============================================================================================================================
     */
     // MODULE: Run Mash on the remainder of sequences compared to representatives of each cluster identified in round 1
@@ -136,6 +138,11 @@ workflow EPITOME {
         MASH_REMAINDER.out.results
     )
 
+    /*
+    =============================================================================================================================
+        CLUSTER SEQUENCES: CLUSTER LOOSE-ENDS
+    =============================================================================================================================
+    */
     SEQTK_LOOSEENDS (
         ASSIGN_REMAINDER
             .out
@@ -145,15 +152,23 @@ workflow EPITOME {
             .join(INPUT_QC.out.seqs.map{ taxa, segment, top, remainder, remainder_count -> [ taxa, segment, remainder ] }, by: [0,1])
     )
 
+    // MODULE: Determine pairwise Mash distances of loose-ends.
     MASH_LOOSEENDS (
         SEQTK_LOOSEENDS.out.sequences
     )
 
+    // MODULE: Cluster loose-ends with hclust & cutree
     CLUSTER_LOOSEENDS (
         MASH_LOOSEENDS.out.dist,
         "looseends"
     )
 
+    /*
+    =============================================================================================================================
+        CLUSTER SEQUENCES: COMBINE ALL CLUSTERS
+    =============================================================================================================================
+    */
+    // MODULE: Combine all cluster channels. Loose-end cluster numbers are adjusted based on the largest subset cluster.
     BIND_CLUSTERS (
         CLUSTER.out.results.concat(ASSIGN_REMAINDER.out.assigned).concat(CLUSTER_LOOSEENDS.out.results).groupTuple(by: [0,1])
     )
@@ -161,6 +176,7 @@
     BIND_CLUSTERS
         .out
         .results
+        .map{ taxa, segment, result -> result }
         .splitCsv(header: true)
         .map{ tuple(it.taxa, it.segment, it.cluster, it.seq) }
         .groupTuple(by: [0,1,2])
@@ -180,7 +196,6 @@
         ALIGN SEQUENCE CLUSTERS
     =============================================================================================================================
     */
-
     // MODULE: Align clustered sequences with mafft - only performed on clusters containing more than one sequence
     MAFFT(
         SEQTK_SUBSEQ
@@ -211,14 +226,32 @@
     )
     //ch_versions = ch_versions.mix(CONSENSUS.out.versions.first())
 
+    CONSENSUS.out.fa.groupTuple(by: [0,1]).set{ ch_consensus }
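+    // ch_consensus emits [ taxa, segment, [ cluster consensus fastas ] ], one element per taxa/segment pair.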
+
+    /*
+    =============================================================================================================================
+        CONDENSE CONSENSUS SEQS
+    =============================================================================================================================
+    */
+
+    // MODULE: Determine pairwise Mash distances of consensus sequences.
+    MASH_CONDENSE (
+        ch_consensus
+    )
+
+    // MODULE: Condense consensus sequences whose pairwise Mash distance is below `--dist_threshold`
+    CONDENSE (
+        MASH_CONDENSE.out.dist.join(ch_consensus, by: [0,1]).join(BIND_CLUSTERS.out.results, by: [0,1])
+    )
+
     /*
     =============================================================================================================================
-        GATHER DATA ON CONSENSUS SEQUENCES
+        GATHER DATA ON FINAL SEQUENCES
     =============================================================================================================================
     */
     // MODULE: Determine average nucleotide identity between consensus sequences
     FASTANI_AVA (
-        CONSENSUS.out.fa.groupTuple(by: [0,1]).map{ taxa, segment, cluster, assembly, length -> [ taxa, segment, assembly, length.min() ] }
+        CONDENSE.out.results.map{ taxa, segment, summary, consensus, length -> [ taxa, segment, consensus, length ] }
     )
     //ch_versions = ch_versions.mix(FASTANI_AVA.out.versions.first())
     // Classify consensus sequences based on supplied seed sequences - if supplied
@@ -230,7 +263,7 @@
         .set{ seeds }
     // MODULE: Determine average nucleotide identity between the consensus sequences and seed sequences
     FASTANI_SEEDS (
-        CONSENSUS.out.fa.map{ taxa, segment, cluster, assembly, length -> assembly }.collect(),
+        CONDENSE.out.results.map{ taxa, segment, summary, assembly, length -> assembly }.collect(),
         seeds.map{ ref, assembly -> assembly }.collect()
     )
     //ch_versions = ch_versions.mix(FASTANI_SEEDS.out.versions.first())
@@ -243,8 +276,7 @@
     */
     // MODULE: Create summary
     SUMMARY(
-        BIND_CLUSTERS.out.results.splitText().collectFile(name: "all-clusters.csv"),
-        CONSENSUS.out.len.splitText().collectFile(name: "all-lengths.csv"),
+        CONDENSE.out.results.map{taxa, segment, summary, assembly, length -> summary}.splitText(keepHeader: true).collectFile(name: 'all-summaries.csv'),
         FASTANI_AVA.out.ani.map{ taxa, segment, ani -> ani }.splitText().collectFile(name: "all-ani.tsv"),
         params.seeds ? FASTANI_SEEDS.out.ani : [],
         params.seeds ? file(params.seeds) : []