Nextflow
This guide explains how to create a new step or module in the Cohesive system using Nextflow version 24.04.2.5914.
$ nextflow -v
nextflow version 24.04.2.5914
To create a new step or module in the Cohesive system, follow the instructions below.
Clone the ngsmanager repository at https://github.com/genpat-it/cohesive-ngsmanager
$ git clone https://github.com/genpat-it/cohesive-ngsmanager
The `nextflow.config` file defines numerous global parameters that can be customized to adapt the pipeline to different environments or specific computational requirements.
For instance, the following lines define the maximum amount of memory and CPU resources available to the pipeline:
def MAX_MEMORY = System.getenv("NXF_EXECUTOR_MEMORY") ?: '200.GB'
def MAX_CPU = System.getenv("NXF_EXECUTOR_CPU") ?: '64'
In this setup:
- `MAX_MEMORY` sets a default maximum memory allocation of 200 GB, which can be overridden by setting the `NXF_EXECUTOR_MEMORY` environment variable.
- `MAX_CPU` sets the default maximum number of CPU cores to 64, which can similarly be overridden by setting the `NXF_EXECUTOR_CPU` environment variable.
These parameters allow flexibility for scaling the pipeline to various computational resources while maintaining efficient usage.
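As a hypothetical sketch of how such defaults could be exposed to the rest of the pipeline (the exact wiring in the repository may differ), the computed limits can be assigned to pipeline-level parameters such as `params.max_memory` and `params.max_cpus`, which scripts and command-line options elsewhere in this guide refer to:

```groovy
// Hypothetical nextflow.config fragment: read the environment overrides,
// falling back to the hard-coded defaults, and expose them as params.
def MAX_MEMORY = System.getenv("NXF_EXECUTOR_MEMORY") ?: '200.GB'
def MAX_CPU = System.getenv("NXF_EXECUTOR_CPU") ?: '64'

params.max_memory = MAX_MEMORY
params.max_cpus = MAX_CPU as int
```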
You can also change these settings using system environment variables:
export NXF_EXECUTOR_MEMORY='200.GB'
export NXF_EXECUTOR_CPU=64
You can also switch the Nextflow version globally by setting the `NXF_VER` environment variable:
export NXF_VER=24.04.2.5914
`nextflow.config` defines two profiles from scratch. You can add new profiles or override global parameters in the existing ones.
Remember: the `nextflow.config` file is always read and executed by Nextflow when a workflow is started.
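For example, a new profile could be appended to `nextflow.config` along these lines (a hypothetical sketch — the profile name and overrides shown here are illustrative, not taken from the repository):

```groovy
// Hypothetical addition to nextflow.config: a profile that overrides a few
// global parameters for local development runs.
profiles {
    development {
        process.executor = 'local'
        params.max_memory = '16.GB'
        params.max_cpus = 4
    }
}
```

Profiles are then selected at launch time with `-profile <name>`.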
Try the following `test.nf` script:
nextflow.enable.dsl=2
process printProfileVariables {
debug true
script:
"""
echo "=== Profile Variables ==="
echo "Max memory: ${params.max_memory ?: 'undefined'}"
echo "Max CPUs: ${params.max_cpus ?: 'undefined'}"
"""
}
workflow {
printProfileVariables()
}
using this command:
nextflow run test.nf -profile cohesive
This command is launched by Jenkins through the `process-nextflow` job and represents the execution of an NGS analysis pipeline. The configuration files `meta.config` and `jenkins.config` are generated within the Jenkins job and contain information about nodes and other system configurations.
Important:
- Jenkins considers the `WORK_FOLDER` and the `SAMPLES_FOLDER` set in the `.env` file for samples, logs, reports and outputs.
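A minimal `.env` sketch (the paths below are purely illustrative assumptions, not defaults from the repository):

```shell
# Hypothetical .env fragment read by Jenkins; adjust paths to your deployment.
WORK_FOLDER=/data/cohesive/work
SAMPLES_FOLDER=/data/cohesive/samples
```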
nextflow [options] run [workflow-options] <workflow-file> [parameters]
- `-q`: Quiet mode, reduces verbose output
- `-log [file]`: Specifies the log file for the execution (e.g., `${WORK_FOLDER}/[timestamp]/pipeline_info/nxf-0.log`)
- `-c meta.config`: Contains execution metadata (node, build tag, commit ID, etc.)
- `-c jenkins.config`: Contains Jenkins-specific configurations
- `-params-file [file]`: JSON file containing additional parameters (e.g., `${WORK_FOLDER}/[timestamp]/params.json`)
- `-work-dir`: Directory for temporary work files (e.g., `${WORK_FOLDER}/[timestamp]/work/0`)
- `--tracedir`: Directory for execution trace files (e.g., `${WORK_FOLDER}/[timestamp]/pipeline_info`)
- `--report_dir`: Directory for reports (e.g., `${WORK_FOLDER}/[timestamp]/pipeline_info`)
- `--timeline_dir`: Directory for timeline files (e.g., `${WORK_FOLDER}/[timestamp]/pipeline_info`)
- `--outdir`: Directory for final results (e.g., `${WORK_FOLDER}/[timestamp]/results`)
The file containing the workflow to execute:
${WORK_FOLDER}/[timestamp]/ngsmanager/steps/step_1PP_trimming__trimmomatic.nf
- `--cmp`: Sample ID (e.g., `2024.EXT.0726.1343.110`)
- `--riscd`: Dataset ID (e.g., `240726-020240726134311352-0SQ_rawreads-external`)
- `--genus_species`: Target species (e.g., `Brucella_abortus`)
- `--analysis_date`: Analysis date (e.g., `241111103036919`)
- `-with-weblog`: Enables web logging
- `--report_prefix`: Prefix for report files (e.g., `2024.EXT.0726.1343.110_`)
- `--report_suffix false`: Disables the report suffix
- `--monochrome_logs`: Colorless log output
- `--max_memory`: Maximum allocatable memory (e.g., `123089.MB`)
- `--max_cpus`: Maximum number of usable CPUs (e.g., `192`)
${WORK_FOLDER}/[timestamp]/
├── pipeline_info/ # Directory for logs and reports
│ ├── nxf-0.log # Main log file
│ └── ... # Other report files
├── work/ # Temporary work directory
│ └── 0/ # Subdirectory for current execution
├── results/ # Directory for final results
└── params.json # Parameters file
- The directory structure follows a precise temporal pattern
- Configuration files (`meta.config` and `jenkins.config`) are dynamically generated
- Resources (memory and CPU) are calculated based on the execution node
- Reports and logs are organized in separate directories for better management
nextflow -q run workflow.nf --cmp SAMPLE_ID --riscd DATASET_ID
nextflow run workflow.nf --max_memory '64.GB' --max_cpus 16
nextflow run workflow.nf --inputdir /custom/input --outdir /custom/output --work-dir /custom/work
nextflow -q -log pipeline_info/run.log run \
-c meta.config \
-c jenkins.config \
-params-file params.json \
-work-dir work/current \
--tracedir pipeline_info \
--outdir results \
workflow.nf \
--cmp SAMPLE_ID \
--riscd DATASET_ID \
--genus_species SPECIES \
-with-weblog \
--max_memory '64.GB' \
--max_cpus 16
Requirements:
- A Sample has been uploaded to the Cohesive platform

Steps:
- Create a new profile `development.config` in the `profiles` folder and copy the content of `jenkins.config` into it
- Grab the ID of the uploaded Sample in Jenkins (example: `2024.EXT.0625.0943.26105`, using the field `Code` in the Samples page)
- Grab the riscd (example: `240625-10520240625094326613-2AS_import-external`, using the field `Code` in the Analysis Results page, filtering by the ID of the sample)

Remember: in Cohesive, any `riscd` ending with `-external` indicates the upload step.

Tip: The riscd parameter generally serves as the input for the analysis. Initially, this is often a FASTA or FASTQ file, so you'll typically use the upload or import `riscd` for the input.
Execute the step with the following command:
nextflow -q -c profiles/development.config run steps/step_4TY_lineage__westnile.nf --cmp 2024.EXT.0625.0943.26105 --riscd 240625-10520240625094326613-2AS_import-external
The output should be:
[~/cohesive-ngsmanager]$ nextflow -c profiles/development.config run steps/step_4TY_lineage__westnile.nf --cmp 2024.EXT.0625.0943.26105 --riscd 240625-10520240625094326613-2AS_import-external
N E X T F L O W ~ version 24.04.2
Launching `steps/step_4TY_lineage__westnile.nf` [suspicious_carlsson] DSL2 - revision: 1d56183756
executor > local (19)
[cb/d89298] ste…le:snippy (2024.EXT.0625.0943.26105/DS10520240625094326613/DT240625) [100%] 9 of 9 ✔
[9a/fa2c6d] ste…ols_depth (2024.EXT.0625.0943.26105/DS10520240625094326613/DT241112) [100%] 9 of 9 ✔
[cb/3aece4] ste…selection (2024.EXT.0625.0943.26105/DS10520240625094326613/DT241112) [100%] 1 of 1 ✔
[~/cohesive-ngsmanager]$ ls work/cb/3aece49fec907ebdcba16913efc3f8/
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_AY277251.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_AY765264.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_FJ483548.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_GQ851605.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_GU047875.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_HQ537483.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_KJ831223.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_KY703855.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_KY703856.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_lineage.csv
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_lineage.log
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_lineage_summary.csv
Start by copying a Nextflow file (`step*.nf`) from an existing step within the same class (e.g., `1PP`, `2AS`, etc.) if possible, or take one with similar input requirements.
Rename the file following the naming convention: `step_[class]_[assessment]__[method].nf`.
For example, `step_1PP_hostdepl__minimap2.nf` breaks down as:
- STEP = `1PP_hostdepl` → step class and assessment name
- METHOD = `minimap2` → primary tool used in the step
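The naming convention can be taken apart mechanically with shell parameter expansion; this is just an illustrative helper, not part of the repository:

```shell
# Hypothetical helper: split a step filename into STEP and METHOD.
# Convention: step_[class]_[assessment]__[method].nf
f="step_1PP_hostdepl__minimap2.nf"
name="${f#step_}"     # drop the "step_" prefix
name="${name%.nf}"    # drop the ".nf" extension
STEP="${name%%__*}"   # everything before the double underscore
METHOD="${name##*__}" # everything after the double underscore
echo "STEP=$STEP METHOD=$METHOD"
# prints: STEP=1PP_hostdepl METHOD=minimap2
```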
At the start of the script, include useful general functions for handling input data and setting parameters. Below is an example:
nextflow.enable.dsl=2
include { parseMetadataFromFileName; executionMetadata; taskMemory; stepInputs; extractKey } from '../functions/common.nf'
include { getSingleInput; isCompatibleWithSeqType; getReference; getRisCd } from '../functions/parameters.nf'
Next, define some global variables. If you copied a previous workflow, modify and rename the variables accordingly:
def ex = executionMetadata()
def STEP = '1PP_filtering'
def METHOD = 'minimap2'
def ENTRYPOINT = "step_${STEP}__${METHOD}"
If the step uses a database or schema with a specific path, define these paths at the beginning of the workflow:
def SPECIES_SCHEMA = [
listeria_monocytogenes : ['listeria_monocytogenes_chewie_220623_180354']
]
def SCHEMAS = [
listeria_monocytogenes_chewie_220623_180354 : "/mnt/biowork/databases/PROGRAMS/chewie-ns Listeria_monocytogenes_Pasteur_cgMLST_2022-06-23T18_03_54.613576",
]
def TRANSCODING_READY = [
'listeria_monocytogenes_chewie_220623_180354'
]
def CHEWBBACA_ION_PARAMS = [
'listeria_monocytogenes': [
size_threshold: 0.1,
blast_score_ratio: 0.6
]
]
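These maps are typically chained at runtime: the species key selects a schema identifier, which in turn resolves to a filesystem path. A hypothetical sketch of the lookup, using the maps above (the repository's actual resolution logic may differ):

```groovy
// Hypothetical lookup chain: genus_species -> schema id -> schema path.
def genus_species = 'listeria_monocytogenes'
def schemaId = SPECIES_SCHEMA[genus_species]?.first()
def schemaPath = SCHEMAS[schemaId]
def transcodable = TRANSCODING_READY.contains(schemaId)
```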
In the last part of the file (the first part executed), invoke the workflows that handle input data. These can also retrieve accessory inputs like Host or Reference.
workflow {
step_2AS_denovo__spades(getSingleInput())
}
workflow {
input = getSingleInput()
.combine(getHostUnkeyed())
step_1PP_hostdepl__bowtie(input)
}
workflow {
getSingleInput().cross(getReference('fa')) { extractKey(it) }
.multiMap {
reads: it[0] // riscd, R[]
refs: it[1][1..3] // riscd, code, path
}.set { input }
step_2AS_mapping__medaka(input.reads, input.refs)
}
workflow {
step_2AS_hybrid__unicycler(getSingleInput(), getLongReads())
}
Rename the workflow following the naming convention and define the take, main, and emit sections. Here’s an example:
workflow step_1PP_hostdepl__minimap2 {
take:
reads
host
main:
minimap2(reads, host).sam | samtools
emit:
samtools.out.depleted
}
workflow step_4TY_cgMLST__chewbbacaefsa {
take:
assembly
genus_species
schema
main:
chewbbaca_result = chewbbaca(assembly, genus_species, schema)
hashing(chewbbaca_result.alleles)
chewbbaca_check(chewbbaca_result.stats)
chewbbaca_transcoding(chewbbaca_result.alleles_with_new)
}
By convention, name the process after the tool you are using.
process minimap2 {
container 'quay.io/biocontainers/minimap2:2.17--h5bf99c6_3'
memory { taskMemory(200.MB, task.attempt) }
cpus 16
tag "${md?.cmp}/${md?.ds}/${md?.dt}"
when:
isCompatibleWithSeqType(reads, ['nanopore'], task.process)
The string `"${md?.cmp}/${md?.ds}/${md?.dt}"` follows a specific convention used by Cohesive to store metadata about the current task. Here's a breakdown of each part:
- `md?.cmp`: the component of the metadata, i.e. the Sample ID passed via `--cmp` (e.g., `2024.EXT.0625.0943.26105`).
- `md?.ds`: the dataset associated with the task. This provides a way to track which dataset or sample the current operation is being applied to; it is a unique identifier of the dataset.
- `md?.dt`: the date/time (timestamp) of the task execution. It allows tracking when the task was run and ensures the correct sequencing of operations.
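Putting it together, for the sample run shown earlier the tag resolves roughly as follows (an illustrative sketch only; the real `md` map is built by `parseMetadataFromFileName`):

```groovy
// Illustrative only: the md map behind one of the task tags shown above.
def md = [
    cmp: '2024.EXT.0625.0943.26105', // sample component
    ds : 'DS10520240625094326613',   // dataset identifier
    dt : 'DT240625'                  // date/time of execution
]
assert "${md?.cmp}/${md?.ds}/${md?.dt}" ==
       '2024.EXT.0625.0943.26105/DS10520240625094326613/DT240625'
```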
This metadata structure helps ensure that logs, outputs, and temporary files can be identified, organized, and traced back to their originating step, dataset, and time of execution.
Define the inputs passed from the workflow's main function (usually as a tuple).
input:
tuple val(riscd_input), path(reads), val(host)
Define the expected outputs. Outputs given an `emit:` label can be referenced later from the workflow body (e.g., `samtools.out.depleted` in the workflow example above).
output:
tuple val(riscd), path('*.fastq.gz'), emit: depleted
path '*'
path '{*.sh,*.log}', hidden: true
By convention, logs and the executed command scripts are saved in the `meta` folder.
publishDir mode: 'rellink', "${params.outdir}/${md.anno}/${md.cmp}/${STEP}/${md.ds}-${ex.dt}_${METHOD}/meta", pattern: '.command.log', saveAs: { "${base}_samtools.log" }
publishDir mode: 'rellink', "${params.outdir}/${md.anno}/${md.cmp}/${STEP}/${md.ds}-${ex.dt}_${METHOD}/meta", pattern: '.command.sh', saveAs: { "${base}_samtools.cfg" }
Define the variables for the command line and specify the command within triple quotes.
script:
(t1,t2) = (reads instanceof java.util.Collection) ? reads : [reads, null]
md = parseMetadataFromFileName(t1.getName())
base = "${md.ds}-${ex.dt}_${md.cmp}_${METHOD}"
riscd = getRisCd(md, ex, STEP, METHOD)
"""
minimap2 -a ${host} ${t1} | samtools view -b -o ${base}.bam -
"""
This structure allows for flexibility when designing workflows or steps in Nextflow, adhering to the Cohesive system standards for script organization and execution.