Commit

update subworkflow taxid_bam_fasta
LilyAnderssonLee committed Nov 25, 2024
1 parent 4fb5fb7 commit 4c84e3e
Showing 8 changed files with 95 additions and 82 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -13,7 +13,8 @@ Initial release of genomic-medicine-sweden/meta-val, created with the [nf-core](
- Extract Kraken2 reads with KrakenTools
- Extract Centrifuge reads
- Extract DIAMOND reads
- Screen pathogens via mapping against genomes from a list of pethogens: Bowtie2 for short reads and Minimap2 for long reads
- Screen pathogens via mapping against genomes from a list of pathogens: Bowtie2 for short reads and Minimap2 for long reads
- Call consensus sequences for reads mapped to the pathogen genomes.

### `Fixed`

File renamed without changes.
6 changes: 3 additions & 3 deletions conf/modules.config
@@ -171,7 +171,7 @@ process {
]
}

withName: 'SUBSET_BAM' {
withName: '.*TAXID_BAM_FASTA_SHORTREAD:SUBSET_BAM_PASS' {
ext.prefix = { "${meta.id}_${meta.taxid}"}
ext.args = '-bh'
publishDir = [
@@ -180,7 +180,7 @@ process {
]
}

withName: '.*TAXID_BAM_FASTA_SHORTREAD:SAMTOOLS_SORT' {
withName: '.*TAXID_BAM_FASTA_SHORTREAD:SAMTOOLS_SORT_PASS' {
ext.prefix = { "${meta.id}_${meta.taxid}_sorted"}
publishDir = [
path: { "$params.outdir/pathogens/taxid_bam/" },
@@ -214,7 +214,7 @@ process {
]
}

withName: '.*TAXID_BAM_FASTA_LONGREAD:SAMTOOLS_SORT' {
withName: '.*TAXID_BAM_FASTA_LONGREAD:SAMTOOLS_SORT_PASS' {
ext.prefix = { "${meta.id}_${meta.taxid}_sorted"}
publishDir = [
path: { "$params.outdir/pathogens/taxid_bam/" },
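The renamed selectors must match the aliased process names on their full execution path, i.e. the subworkflow name followed by the include alias. A minimal sketch of the pattern, assuming 'copy' as the publish mode (the mode is not visible in this hunk):

    process {
        // The selector matches the include alias (SUBSET_BAM as SUBSET_BAM_PASS),
        // qualified by the subworkflow it runs in
        withName: '.*TAXID_BAM_FASTA_SHORTREAD:SUBSET_BAM_PASS' {
            ext.prefix = { "${meta.id}_${meta.taxid}" }  // one BAM per sample/taxID pair
            ext.args   = '-bh'                           // samtools view: BAM output, keep header
            publishDir = [
                path: { "${params.outdir}/pathogens/taxid_bam/" },
                mode: 'copy'                             // assumption: not shown in the hunk
            ]
        }
    }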
8 changes: 4 additions & 4 deletions conf/test_screenpathogen.config
@@ -24,8 +24,8 @@ params {
config_profile_description = 'Minimal test dataset to check pipeline function'

// Input data
input = '../final_test_data/samplesheet_v3.csv'
pathogens_genomes = 'assets/test_data/reference/reference.fna'
input = '../final_test_data/samplesheet_v3_origin.csv'
pathogens_genomes = 'assets/test_data/reference/reference.fasta'
accession2taxid = 'assets/test_data/reference/accession2taxid.map'

// Extract reads
@@ -42,8 +42,8 @@

// Screen pathogens
perform_screen_pathogens = true
perform_longread_consensus = true
perform_shortread_consensus = true
perform_longread_consensus = false
perform_shortread_consensus = false
//longread_consensus_tool = 'medaka'
longread_consensus_tool = 'samtools'

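A test config like this is normally activated through a profile. A minimal sketch of the wiring, assuming nextflow.config declares a profile with the same name as the file (the profile block itself is not part of this commit):

    profiles {
        test_screenpathogen {
            includeConfig 'conf/test_screenpathogen.config'
        }
    }

The pipeline would then be run with `-profile test_screenpathogen` to pick up these parameters.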
2 changes: 1 addition & 1 deletion nextflow_schema.json
@@ -106,7 +106,7 @@
"type": "string",
"enum": ["medaka", "samtools"],
"description": "Specify the tool used for consensus calling of long reads.",
"help_text": "`medaka` is a tool designed for generating consensus sequences from nanopore sequencing data, and it uses a neural network to correct sequence errors, improving the accuracy of consensus sequences. However `samtools consensus` does not have a specific algorithm tailored for nanopore reads..",
"help_text": "`medaka` is a tool designed for generating consensus sequences from nanopore sequencing data, and it uses a neural network to correct sequence errors, improving the accuracy of consensus sequences. However `samtools consensus` does not have a specific algorithm tailored for nanopore reads.",
"fa_icon": "fas fa-tools"
},
"perform_shortread_consensus": {
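The enum restricts --longread_consensus_tool to the two supported values, which nf-core schema validation enforces before the pipeline starts. An equivalent hand-rolled guard would look like this sketch (the error message is illustrative):

    // Reject any value outside the schema enum before running the workflow
    def valid_tools = ['medaka', 'samtools']
    if ( !valid_tools.contains(params.longread_consensus_tool) ) {
        error "Invalid --longread_consensus_tool '${params.longread_consensus_tool}': must be one of ${valid_tools.join(', ')}"
    }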
6 changes: 5 additions & 1 deletion subworkflows/local/longread_consensus.nf
@@ -14,17 +14,21 @@ workflow LONGREAD_CONSENSUS {
main:
ch_versions = Channel.empty()

// Choose consensus calling tool based on params.longread_consensus_tool
if ( params.longread_consensus_tool == 'medaka' ) {
// Prepare input for MEDAKA by combining BAM and reference channels
input_medaka = bam.combine(reference).map{ meta_bam, bam, meta_ref, reference -> [ meta_bam, bam, reference ]}

MEDAKA ( input_medaka )
ch_consensus_gz = MEDAKA.out.assembly
ch_versions = ch_versions.mix(MEDAKA.out.versions)

// Unzip MEDAKA output
GUNZIP ( ch_consensus_gz )
ch_consensus = GUNZIP.out.gunzip
ch_versions = ch_versions.mix(GUNZIP.out.versions)
} else if ( params.longread_consensus_tool == 'samtools' ) {
// Combine bam and reference channels
// Run SAMTOOLS_CONSENSUS for consensus calling
SAMTOOLS_CONSENSUS ( bam )
ch_consensus = SAMTOOLS_CONSENSUS.out.fasta
ch_versions = ch_versions.mix(SAMTOOLS_CONSENSUS.out.versions)
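The MEDAKA branch pairs every per-taxID BAM with the single reference via combine, then uses map to drop the reference's meta map. A self-contained sketch of that reshaping, with illustrative channel contents:

    workflow {
        ch_bam = Channel.of( [ [id:'sample1', taxid:'573'], file('sample1_573.bam') ] )
        ch_ref = Channel.of( [ [id:'pathogens'], file('reference.fasta') ] )

        // combine() emits [ meta_bam, bam, meta_ref, reference ];
        // map() keeps only the BAM's meta, so MEDAKA receives [ meta, bam, reference ]
        ch_bam.combine( ch_ref )
              .map { meta_bam, bam, meta_ref, reference -> [ meta_bam, bam, reference ] }
              .view()
    }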
145 changes: 75 additions & 70 deletions subworkflows/local/taxid_bam_fasta.nf
@@ -1,110 +1,115 @@
//
// Prepare an indivudual BAM/FASTA file for each pathogen with mapped reads
// Prepare an individual BAM/FASTA file for each pathogen with mapped reads
//

include { SUBSET_BAM } from '../../modules/local/subset_bam'
include { SAMTOOLS_SORT } from '../../modules/nf-core/samtools/sort/main'
include { SUBSET_BAM as SUBSET_BAM_PASS } from '../../modules/local/subset_bam'
include { SUBSET_BAM as SUBSET_BAM_FAIL } from '../../modules/local/subset_bam'
include { SAMTOOLS_SORT as SAMTOOLS_SORT_PASS } from '../../modules/nf-core/samtools/sort/main'
include { SAMTOOLS_SORT as SAMTOOLS_SORT_FAIL } from '../../modules/nf-core/samtools/sort/main'
include { SAMTOOLS_INDEX } from '../../modules/nf-core/samtools/index/main'
include { SAMTOOLS_IDXSTATS } from '../../modules/nf-core/samtools/idxstats/main'
include { SAMTOOLS_IDXSTATS as SAMTOOLS_IDXSTATS_FILTER } from '../../modules/nf-core/samtools/idxstats/main'
include { SAMTOOLS_FASTA } from '../../modules/nf-core/samtools/fasta/main'
include { GUNZIP as GUNZIP_SINGLE } from '../../modules/nf-core/gunzip/main'
include { GUNZIP as GUNZIP_PAIRED } from '../../modules/nf-core/gunzip/main'
include { GUNZIP } from '../../modules/nf-core/gunzip/main'

workflow TAXID_BAM_FASTA {
take:
bam
bai
accession2taxid
min_read_counts
bam // Channel: [ val(meta), path(bam) ]
bai // Channel: [ val(meta), path(bai) ]
accession2taxid // Channel: path(accession2taxid)
min_read_counts // Value: minimum number of reads to keep a BAM file

main:
ch_versions = Channel.empty()
ch_multiqc_files = Channel.empty()

input_bam = bam.combine( bai,by: 0 )
SAMTOOLS_IDXSTATS( input_bam )
// Combine BAM and BAI files
input_bam = bam.combine(bai, by: 0)

// Get idxstats for input BAM
SAMTOOLS_IDXSTATS(input_bam)
ch_accession = SAMTOOLS_IDXSTATS.out.idxstats
.map { it[1] }
.splitCsv( header: false,sep:"\t" )
.filter { it -> it[0]!= "*" }
.splitCsv(header: false, sep: "\t")
.filter { it -> it[0] != "*" } // Drop the unmapped ('*') row

ch_versions.mix( SAMTOOLS_IDXSTATS.out.versions.first() )
ch_versions.mix(SAMTOOLS_IDXSTATS.out.versions.first())

// Load accession2taxid.map
ch_accession2taxidmap = accession2taxid.splitCsv( header: false,sep:"\t" )
// Load accession2taxid map
ch_accession2taxidmap = accession2taxid.splitCsv(header: false, sep: "\t")

// Join accessions with taxids and group by taxid
ch_accession_taxid = ch_accession2taxidmap
.join( ch_accession )
.filter { it -> it[3] != "0" }
.map { [ it[0], it[1] ] }
.groupTuple( by: 1 )
.join(ch_accession)
.map { it ->
def num_reads = it[3].toInteger()
[it[0], it[1], num_reads]
}
.filter { accessions, taxid, num_reads -> num_reads > 0 }
.groupTuple(by: 1)
.map { accession_list, taxid, num_reads ->
def total_reads = num_reads.sum()
[accession_list, taxid, total_reads]
}
.branch {
pass: it[2] >= min_read_counts // taxIDs with at least params.min_read_counts mapped reads
return [it[0], it[1]]
fail: it[2] < min_read_counts // taxIDs with fewer than params.min_read_counts mapped reads
return [it[0], it[1]]
}

ch_samtools_view = ch_accession_taxid
// Prepare an individual BAM file for each taxID with at least params.min_read_counts mapped reads
ch_consensus_input = ch_accession_taxid.pass
.combine(input_bam)
.map {accession_list, taxid, meta, bam, bam_index ->
.map { accession_list, taxid, meta, bam, bam_index ->
def new_meta = meta.clone()
new_meta.taxid = taxid
return [ new_meta, bam, bam_index, accession_list ]
return [new_meta, bam, bam_index, accession_list]
}
.multiMap {
meta, bam, bam_index, accession_list ->
bam: [meta, bam, bam_index]
bam: [meta, bam, bam_index]
accession: accession_list.flatten()
}

// Individual BAM file for each taxID
SUBSET_BAM ( ch_samtools_view.bam, ch_samtools_view.accession )
ch_versions = ch_versions.mix( SUBSET_BAM.out.versions.first() )

SAMTOOLS_SORT ( SUBSET_BAM.out.bam, [[],[]] )
ch_versions = ch_versions.mix(SAMTOOLS_SORT.out.versions.first())
// BAM files will be used to call consensus sequences
SUBSET_BAM_PASS(ch_consensus_input.bam, ch_consensus_input.accession)
ch_versions = ch_versions.mix(SUBSET_BAM_PASS.out.versions.first())
SAMTOOLS_SORT_PASS(SUBSET_BAM_PASS.out.bam, [[],[]])
ch_versions = ch_versions.mix(SAMTOOLS_SORT_PASS.out.versions.first())
SAMTOOLS_INDEX(SAMTOOLS_SORT_PASS.out.bam)
ch_versions = ch_versions.mix(SAMTOOLS_INDEX.out.versions.first())

SAMTOOLS_INDEX ( SAMTOOLS_SORT.out.bam )
ch_versions = ch_versions.mix( SAMTOOLS_INDEX.out.versions.first() )


// Count reads in each BAM file
SAMTOOLS_IDXSTATS_FILTER ( SAMTOOLS_SORT.out.bam.join(SAMTOOLS_INDEX.out.bai) )
ch_versions = ch_versions.mix( SAMTOOLS_IDXSTATS_FILTER.out.versions.first() )
// Process IDXSTATS output to get total read count
ch_bam_counts = SAMTOOLS_IDXSTATS_FILTER.out.idxstats
.map { meta, stats_file ->
def total_reads = 0
stats_file.eachLine { line ->
def fields = line.split('\t')
total_reads += fields[2].toInteger() + fields[3].toInteger()
}
[ meta, total_reads ]
// Prepare an individual FASTA file for each taxID with fewer than params.min_read_counts mapped reads
ch_blast_input = ch_accession_taxid.fail
.combine(input_bam)
.map { accession_list, taxid, meta, bam, bam_index ->
def new_meta = meta.clone()
new_meta.taxid = taxid
return [new_meta, bam, bam_index, accession_list]
}
.multiMap {
meta, bam, bam_index, accession_list ->
bam: [meta, bam, bam_index]
accession: accession_list.flatten()
}
// Filter BAM files based on read count: only call
ch_filtered_bam = SAMTOOLS_SORT.out.bam
.join(ch_bam_counts)
.filter { meta, bam, count -> count >= min_read_counts }
.map { meta, bam, count -> [meta, bam] }

// Convert filtered BAM files to FASTA format
SAMTOOLS_FASTA ( SAMTOOLS_SORT.out.bam, false )
ch_versions = ch_versions.mix( SAMTOOLS_FASTA.out.versions.first() )

// Separate single-end and paired-end FASTA files
ch_fasta_single = SAMTOOLS_FASTA.out.fasta.filter { meta, fasta -> meta.single_end }
ch_fasta_paired = SAMTOOLS_FASTA.out.fasta.filter { meta, fasta -> !meta.single_end }

// Unzip single-end fasta files
GUNZIP_SINGLE ( ch_fasta_single )
ch_versions = ch_versions.mix( GUNZIP_SINGLE.out.versions )
// FASTA files will be used as BLAST input
SUBSET_BAM_FAIL(ch_blast_input.bam, ch_blast_input.accession)
ch_versions = ch_versions.mix(SUBSET_BAM_FAIL.out.versions.first())
SAMTOOLS_SORT_FAIL(SUBSET_BAM_FAIL.out.bam, [[],[]])
ch_versions = ch_versions.mix(SAMTOOLS_SORT_FAIL.out.versions.first())

// Unzip paired-end fasta files
GUNZIP_PAIRED ( ch_fasta_paired.transpose() )
ch_versions = ch_versions.mix( GUNZIP_PAIRED.out.versions )
SAMTOOLS_FASTA(SAMTOOLS_SORT_FAIL.out.bam, false)
ch_versions = ch_versions.mix(SAMTOOLS_FASTA.out.versions.first())

ch_unzipped_fasta = GUNZIP_SINGLE.out.gunzip.mix(GUNZIP_PAIRED.out.gunzip.groupTuple())
// Unzip FASTA files
GUNZIP(SAMTOOLS_FASTA.out.fasta.transpose())
ch_unzipped_fasta = GUNZIP.out.gunzip.groupTuple()
ch_versions = ch_versions.mix(GUNZIP.out.versions)

emit:
versions = ch_versions
taxid_bam = ch_filtered_bam
taxid_bam = SAMTOOLS_SORT_PASS.out.bam
taxid_bai = SAMTOOLS_INDEX.out.bai
taxid_fasta = ch_unzipped_fasta
taxid_accession = ch_samtools_view.accession

}
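The core of the rewrite is the pass/fail split on per-taxID read counts: idxstats rows are joined to the accession2taxid map, counts are summed per taxID, and the totals are branched against min_read_counts. A condensed, runnable sketch of just that logic, with made-up accessions and counts:

    workflow {
        def min_read_counts = 10

        ch_branched = Channel.of(
                [ 'NC_000913.3', '562', 25 ],  // [ accession, taxid, mapped read count ]
                [ 'NC_003197.2', '590',  3 ]
            )
            .groupTuple( by: 1 )               // collect accessions and counts per taxID
            .map { accs, taxid, counts -> [ accs, taxid, counts.sum() ] }
            .branch {
                pass: it[2] >= min_read_counts // enough reads: subset BAM for consensus calling
                    return [ it[0], it[1] ]
                fail: it[2] < min_read_counts  // too few reads: convert to FASTA for BLAST
                    return [ it[0], it[1] ]
            }

        ch_branched.pass.view { "consensus: $it" }
        ch_branched.fail.view { "blast: $it" }
    }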
7 changes: 5 additions & 2 deletions workflows/metaval.nf
@@ -170,21 +170,24 @@ workflow METAVAL {
BOWTIE2_BUILD_PATHOGEN.out.index, // ch_index
false, // save unaligned
false, // sort bam
ch_reference // ch_fasta
ch_reference // ch_fasta
)


ch_versions = ch_versions.mix( FASTQ_ALIGN_BOWTIE2.out.versions )
// Map long reads to the pathogen genomes
LONGREAD_SCREENPATHOGEN ( ch_input.long_reads, ch_reference )
ch_versions = ch_versions.mix( LONGREAD_SCREENPATHOGEN.out.versions )

// Subset bam file for each taxID
// Subset BAM file for each taxID
accession2taxid_map = Channel.fromPath ( params.accession2taxid, checkIfExists: true )

TAXID_BAM_FASTA_SHORTREAD ( FASTQ_ALIGN_BOWTIE2.out.bam, FASTQ_ALIGN_BOWTIE2.out.bai, accession2taxid_map, params.min_read_counts )
ch_versions = ch_versions.mix( TAXID_BAM_FASTA_SHORTREAD.out.versions )

TAXID_BAM_FASTA_LONGREAD( LONGREAD_SCREENPATHOGEN.out.bam, LONGREAD_SCREENPATHOGEN.out.bai, accession2taxid_map, params.min_read_counts )
ch_versions = ch_versions.mix( TAXID_BAM_FASTA_LONGREAD.out.versions )

// Call consensus for BAM files with at least params.min_read_counts mapped reads
if (params.perform_shortread_consensus) {
SHORTREAD_SAMTOOLS_CONSENSUS ( TAXID_BAM_FASTA_SHORTREAD.out.taxid_bam )
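Only the pass-branch BAMs (those meeting min_read_counts) reach consensus calling, gated by the two toggles seen in the test config above. A sketch of that wiring, assuming LONGREAD_CONSENSUS takes the reference as its second input, as in the subworkflow earlier in this commit:

    // Short reads: the call below appears verbatim in the hunk above
    if ( params.perform_shortread_consensus ) {
        SHORTREAD_SAMTOOLS_CONSENSUS ( TAXID_BAM_FASTA_SHORTREAD.out.taxid_bam )
        ch_versions = ch_versions.mix( SHORTREAD_SAMTOOLS_CONSENSUS.out.versions )
    }
    // Long reads: medaka or samtools, selected via params.longread_consensus_tool
    if ( params.perform_longread_consensus ) {
        LONGREAD_CONSENSUS ( TAXID_BAM_FASTA_LONGREAD.out.taxid_bam, ch_reference )
        ch_versions = ch_versions.mix( LONGREAD_CONSENSUS.out.versions )
    }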
