Skip to content

Nextflow

Andrea de Ruvo edited this page Nov 12, 2024 · 20 revisions

Introduction

This guide explains how to create a new step or module in the Cohesive system using Nextflow version 24.04.2.5914.

$ nextflow -v
nextflow version 24.04.2.5914

To create a new step or module in the Cohesive system, follow the instructions below.

Preliminary Operations

Clone the Repository

Clone the ngsmanager repository at https://github.com/genpat-it/cohesive-ngsmanager

$ git clone https://github.com/genpat-it/cohesive-ngsmanager

Configuration

The nextflow.config file in Nextflow defines numerous global parameters that can be customized to adapt the pipeline to different environments or specific computational requirements.

For instance, the following lines define the maximum amount of memory and CPU resources available to the pipeline:

def MAX_MEMORY = System.getenv("NXF_EXECUTOR_MEMORY") ?: '200.GB'
def MAX_CPU = System.getenv("NXF_EXECUTOR_CPU") ?: '64'

In this setup:

  • MAX_MEMORY sets a default maximum memory allocation of 200 GB, which can be overridden by setting the NXF_EXECUTOR_MEMORY environment variable.
  • MAX_CPU sets the default maximum number of CPU cores to 64, which can similarly be overridden by setting the NXF_EXECUTOR_CPU environment variable.

These parameters allow flexibility for scaling the pipeline to various computational resources while maintaining efficient usage.

You can change these settings also using system environment variables:

export NXF_EXECUTOR_MEMORY='200.GB'
export NXF_EXECUTOR_CPU=64

You can switch also NXF version globally as described here:

export NXF_VER=24.04.2.5914

Global Variables

Profiles

nextflow.config defines two profiles from scratch. You can add new profiles or override global parameters in the existing ones.

Remember: the nextflow.config file is always read and executed by Nextflow when a workflow is started.

Try test.nf script:

nextflow.enable.dsl=2

process printProfileVariables {
    debug true
    
    script:
    """
    echo "=== Profile Variables ==="
    echo "Max memory: ${params.max_memory ?: 'undefined'}"
    echo "Max CPUs: ${params.max_cpus ?: 'undefined'}"
    """
}

workflow {
    printProfileVariables()
}

using this command:

nextflow run test.nf -profile cohesive

Inspect a nextflow run by Jenkins

Overview

This command is launched by Jenkins through the process-nextflow job and represents the execution of an NGS analysis pipeline. The configuration files meta.config and jenkins.config are generated within the Jenkins job and contain information about nodes and other system configurations.

Important:

  • Jenkins considers the WORK_FOLDER and the SAMPLES_FOLDER set in the .env file for samples, logs, reports and outputs.

Command Structure

nextflow [options] run [workflow-options] <workflow-file> [parameters]

Base Options

  • -q: Quiet mode, reduces verbose output
  • -log [file]: Specifies the log file for execution
    • Example: ${WORK_FOLDER}/[timestamp]/pipeline_info/nxf-0.log

Configuration Files

  • -c meta.config: Contains execution metadata (node, build tag, commit ID, etc.)
  • -c jenkins.config: Contains Jenkins-specific configurations
  • -params-file [file]: JSON file containing additional parameters
    • Example: ${WORK_FOLDER}/[timestamp]/params.json

Directory Settings

  • -work-dir: Directory for temporary work files
    • Example: ${WORK_FOLDER}/[timestamp]/work/0
  • --tracedir: Directory for execution trace files
    • Example: ${WORK_FOLDER}/[timestamp]/pipeline_info
  • --report_dir: Directory for reports
    • Example: ${WORK_FOLDER}/[timestamp]/pipeline_info
  • --timeline_dir: Directory for timeline files
    • Example: ${WORK_FOLDER}/[timestamp]/pipeline_info
  • --outdir: Directory for final results
    • Example: ${WORK_FOLDER}/[timestamp]/results

Workflow File

The file containing the workflow to execute:

${WORK_FOLDER}/[timestamp]/ngsmanager/steps/step_1PP_trimming__trimmomatic.nf

Analysis Parameters

  • --cmp: Sample ID (e.g., 2024.EXT.0726.1343.110)
  • --riscd: Dataset ID (e.g., 240726-020240726134311352-0SQ_rawreads-external)
  • --genus_species: Target species (e.g., Brucella_abortus)
  • --analysis_date: Analysis date (e.g., 241111103036919)

Report Options

  • -with-weblog: Enables web logging
  • --report_prefix: Prefix for report files (e.g., 2024.EXT.0726.1343.110_)
  • --report_suffix false: Disables report suffix
  • --monochrome_logs: Colorless log output

Resource Limits

  • --max_memory: Maximum allocatable memory (e.g., 123089.MB)
  • --max_cpus: Maximum number of usable CPUs (e.g., 192)

Directory Structure

${WORK_FOLDER}/[timestamp]/
├── pipeline_info/           # Directory for logs and reports
│   ├── nxf-0.log           # Main log file
│   └── ...                 # Other report files
├── work/                   # Temporary work directory
│   └── 0/                 # Subdirectory for current execution
├── results/               # Directory for final results
└── params.json           # Parameters file

Important Notes

  1. The directory structure follows a precise temporal pattern
  2. Configuration files (meta.config and jenkins.config) are dynamically generated
  3. Resources (memory and CPU) are calculated based on the execution node
  4. Reports and logs are organized in separate directories for better management

Usage Examples

Basic Execution
nextflow -q run workflow.nf --cmp SAMPLE_ID --riscd DATASET_ID
With Custom Resources
nextflow run workflow.nf --max_memory '64.GB' --max_cpus 16
With Custom Directories
nextflow run workflow.nf --inputdir /custom/input --outdir /custom/output --work-dir /custom/work
Full Example
nextflow -q -log pipeline_info/run.log run \
    -c meta.config \
    -c jenkins.config \
    -params-file params.json \
    -work-dir work/current \
    --tracedir pipeline_info \
    --outdir results \
    workflow.nf \
    --cmp SAMPLE_ID \
    --riscd DATASET_ID \
    --genus_species SPECIES \
    -with-weblog \
    --max_memory '64.GB' \
    --max_cpus 16

Run an existing step

Requirements:

  1. There's an uploaded Sample in the Cohesive platform

Steps:

  1. create a new profile development.config in the profiles folder and copy the content of the jenkins.config
  2. Grab the Id of the uploaded Sample in Jenkins (example: 2024.EXT.0625.0943.26105, using the field Code in the Samples page)
  3. Grab the riscd (example: 240625-10520240625094326613-2AS_import-external, using the field Code in the Analysis Results page filtering by the Id of the sample)

Remember: In Cohesive, any riscd ending with -external indicates the upload step.

Tip: The riscd parameter generally serves as the input for the analysis. Initially, this is often a FASTA or FASTQ file, so you’ll typically use the upload or import riscd for the input.

Execute the step with the following command:

nextflow -q -c profiles/development.config run steps/step_4TY_lineage__westnile.nf --cmp 2024.EXT.0625.0943.26105 --riscd 240625-10520240625094326613-2AS_import-external

The output should be:

[~/cohesive-ngsmanager]$ nextflow -c profiles/development.config run steps/step_4TY_lineage__westnile.nf --cmp 2024.EXT.0625.0943.26105 --riscd 240625-10520240625094326613-2AS_import-external

 N E X T F L O W   ~  version 24.04.2

Launching `steps/step_4TY_lineage__westnile.nf` [suspicious_carlsson] DSL2 - revision: 1d56183756

executor >  local (19)
[cb/d89298] ste…le:snippy (2024.EXT.0625.0943.26105/DS10520240625094326613/DT240625) [100%] 9 of 9 ✔
[9a/fa2c6d] ste…ols_depth (2024.EXT.0625.0943.26105/DS10520240625094326613/DT241112) [100%] 9 of 9 ✔
[cb/3aece4] ste…selection (2024.EXT.0625.0943.26105/DS10520240625094326613/DT241112) [100%] 1 of 1 ✔

[~/cohesive-ngsmanager]$ ls work/cb/3aece49fec907ebdcba16913efc3f8/
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_AY277251.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_AY765264.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_FJ483548.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_GQ851605.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_GU047875.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_HQ537483.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_KJ831223.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_KY703855.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_KY703856.1.coverage
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_lineage.csv
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_lineage.log
DS10520240625094326613-DT241112_2024.EXT.0625.0943.26105_westnile_lineage_summary.csv

Use a template

Start by copying a Nextflow file (step*.nf) from an existing step within the same class (e.g., 1PP, 2AS, etc.) if possible, or take one with similar input requirements.

Rename the file following the naming convention: step_[class]_[assessment]__[method].nf.

Example
step_1PP_hostdepl__minimap2.nf
  • STEP = 1PP_hostdepl → Step Class and Accertamento name
  • METHOD = minimap2 → Primary tool used in the step

Including Common Functions and Defining Global Variables

At the start of the script, include useful general functions for handling input data and setting parameters. Below is an example:

nextflow.enable.dsl=2

include { parseMetadataFromFileName;executionMetadata;taskMemory;stepInputs;extractKey } from '../functions/common.nf'
include { getSingleInput;isCompatibleWithSeqType;getReference;getRisCd} from '../functions/parameters.nf'

Next, define some global variables. If you copied a previous workflow, modify and rename the variables accordingly:

def ex = executionMetadata()

def STEP = '1PP_filtering'
def METHOD = 'minimap2'
def ENTRYPOINT = "step_${STEP}__${METHOD}"

If the step uses a database or schema with a specific path, define these paths at the beginning of the workflow:

def SPECIES_SCHEMA = [
  listeria_monocytogenes : ['listeria_monocytogenes_chewie_220623_180354']
]

def SCHEMAS = [
  listeria_monocytogenes_chewie_220623_180354 : "/mnt/biowork/databases/PROGRAMS/chewie-ns Listeria_monocytogenes_Pasteur_cgMLST_2022-06-23T18_03_54.613576",
]

def TRANSCODING_READY = [
  'listeria_monocytogenes_chewie_220623_180354'
]

CHEWBBACA_ION_PARAMS = [
  'listeria_monocytogenes': [
    size_threshold: 0.1,
    blast_score_ratio: 0.6
  ]
]

Defining Workflow Inputs

In the last part of the file (the first part executed), invoke the workflows that handle input data. These can also retrieve accessory inputs like Host or Reference.

Example for a step processing a single sample:

workflow {
    step_2AS_denovo__spades(getSingleInput())
}

Example with host input:

workflow {    
    input = getSingleInput()
      .combine(getHostUnkeyed())
    step_1PP_hostdepl__bowtie(input)    
}

Example with reference input:

workflow {
    getSingleInput().cross(getReference('fa')) { extractKey(it) }
      .multiMap { 
          reads: it[0], // riscd, R[]
          refs:  it[1][1..3] // riscd, code, path
      }.set { input }
    step_2AS_mapping__medaka(input.reads, input.refs)
}

Example with long reads:

workflow {  
    step_2AS_hybrid__unicycler(getSingleInput(), getLongReads())
}

Defining the Main Workflow

Rename the workflow following the naming convention and define the take, main, and emit sections. Here’s an example:

workflow step_1PP_hostdepl__minimap2 {
    take: 
      reads
      host
    main:
      minimap2(reads, host).sam | samtools
    emit:
      samtools.out.depleted 
}

Example with multiple main functions:

workflow step_4TY_cgMLST__chewbbacaefsa {
    take: 
      assembly
      genus_species
      schema
    main:
      chewbbaca_result = chewbbaca(assembly, genus_species, schema)
      hashing(chewbbaca_result.alleles)
      chewbbaca_check(chewbbaca_result.stats)
      chewbbaca_transcoding(chewbbaca_result.alleles_with_new)
}

Process to Run a Tool/Command

By convention, name the process after the tool you are using.

Example:

process minimap2 {
    container 'quay.io/biocontainers/minimap2:2.17--h5bf99c6_3'

    memory { taskMemory(200.MB, task.attempt) }
    cpus 16

    tag "${md?.cmp}/${md?.ds}/${md?.dt}"
    
    when: 
      isCompatibleWithSeqType(reads, ['nanopore'], task.process)

Convention for Storing Data in Memory

The string "${md?.cmp}/${md?.ds}/${md?.dt}" follows a specific convention used by Cohesive to store metadata about the current task. Here’s a breakdown of each part:

  • md?.cmp:
    Represents the component of the metadata. It identifies the specific type of component or operation being performed. For example, this could indicate a particular tool or method.

  • md?.ds:
    Refers to the dataset associated with the task. This provides a way to track which dataset or sample the current operation is being applied to. It might be a unique identifier or name of the dataset.

  • md?.dt:
    Refers to the date/time (or timestamp) of the task execution. It allows tracking when the task was run and ensures the correct sequencing of operations.

This metadata structure helps ensure that logs, outputs, and temporary files can be identified, organized, and traced back to their originating step, dataset, and time of execution.


Main Process Sections:

Input Block

Define the inputs passed from the workflow's main function (usually as a tuple).

    input:
      tuple val(riscd_input), path(reads), val(host)

Output Block

Define the expected output. Outputs listed under emit are stored in a variable for later use in other processes or modules.

    output:
      tuple val(riscd), path('*.fastq.gz'), emit: depleted
      path '*'
      path '{*.sh,*.log}', hidden: true

Publish Directory

By convention, logs and executables are saved in the meta folder.

    publishDir mode: 'rellink', "${params.outdir}/${md.anno}/${md.cmp}/${STEP}/${md.ds}-${ex.dt}_${METHOD}/meta", pattern: '.command.log', saveAs: { "${base}_samtools.log" }
    publishDir mode: 'rellink', "${params.outdir}/${md.anno}/${md.cmp}/${STEP}/${md.ds}-${ex.dt}_${METHOD}/meta", pattern: '.command.sh', saveAs: { "${base}_samtools.cfg" }

Script Block

Define the variables for the command line and specify the command within triple quotes.

    script:
      (t1,t2) = (reads instanceof java.util.Collection) ? reads : [reads, null]
      md = parseMetadataFromFileName(t1.getName())
      base = "${md.ds}-${ex.dt}_${md.cmp}_${METHOD}"
      riscd = getRisCd(md, ex, STEP, METHOD)      
      '''
        minimap2 -a ${host} ${t1} | samtools view -b -o ${base}.bam -
      '''

This structure allows for flexibility when designing workflows or steps in Nextflow, adhering to the Cohesive system standards for script organization and execution.

Clone this wiki locally