Skip to content

Commit

Permalink
Merge pull request #21 from PacificBiosciences/churn_patch
Browse files Browse the repository at this point in the history
changes for v1.1.0
  • Loading branch information
holtjma authored Dec 4, 2023
2 parents bddb15a + a13177f commit abfb8b1
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 56 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
# v1.1.0
## Changes
- Updated the way unphaseable variants are placed into artificial phase blocks. Instead of unphased singletons, these are now grouped into larger unphased blocks. Phased variants are unaffected by this change.
- This has a major impact on I/O churning, especially in centromeric and telomeric regions. Internal tests show up to 60% reduction in wall clock time with 16 threads accompanied by up to 30% reduction in CPU time depending on the phasing mode.
- There are now fewer total phase blocks reported due to the singleton collapse. This primarily impacts the statistic tracking optional outputs of HiPhase.

## Fixed
- Fixed a related issue where unphased singleton blocks were falsely tagged with `TR_OVERLAP`
- Fixed an issue where the default `--io-threads` frequently caused issues on long-running jobs. This now defaults to `min(--threads, 4)`, but can still be specified with `--io-threads`.

# v1.0.0
## Changes
- Added support for tandem repeat calls from [TRGT](https://github.com/PacificBiosciences/trgt); minimum supported version - v0.5.0
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "hiphase"
version = "1.0.0"
version = "1.1.0"
authors = ["J. Matthew Holt <[email protected]>"]
description = "A tool for phasing HiFi VCF files."
description = "A tool for jointly phasing small, structural, and tandem repeat variants for PacBio sequencing data"
edition = "2021"
license-file="LICENSE.md"

Expand Down
4 changes: 2 additions & 2 deletions LICENSE-THIRDPARTY.json
Original file line number Diff line number Diff line change
Expand Up @@ -496,12 +496,12 @@
},
{
"name": "hiphase",
"version": "1.0.0",
"version": "1.1.0",
"authors": "J. Matthew Holt <[email protected]>",
"repository": null,
"license": null,
"license_file": "LICENSE.md",
"description": "A tool for phasing HiFi VCF files."
"description": "A tool for jointly phasing small, structural, and tandem repeat variants for PacBio sequencing data"
},
{
"name": "hts-sys",
Expand Down
122 changes: 101 additions & 21 deletions src/block_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,14 @@ pub struct PhaseBlock {
end: u64,
/// The total number of variants in the block so far.
num_variants: usize,
/// The VCF index of the first variant in the block
first_variant_vcf: usize,
/// The number of variants from each VCF in this block
vcf_index_counts: Vec<usize>,
/// The minimum quality of variants that were included
min_quality: i32,
/// The sample name within the VCF that this block corresponds to
sample_name: String
sample_name: String,
/// if True, then the variants in this block are meant to be skipped and left unphased
unphased_block: bool
}

impl std::fmt::Debug for PhaseBlock {
Expand All @@ -354,17 +356,19 @@ impl PhaseBlock {
/// * `chrom_index` - the chromosome index in the VCF file, for ordering
/// * `min_quality` - the minimum quality to include a variant in this phase block
/// * `sample_name` - the name of the sample in the VCF(s) that this block info corresponds to
pub fn new(block_index: usize, chrom: String, chrom_index: u32, min_quality: i32, sample_name: String) -> PhaseBlock {
/// * `num_vcfs` - the number of VCFs, used to generate initial empty counts Vec
pub fn new(block_index: usize, chrom: String, chrom_index: u32, min_quality: i32, sample_name: String, num_vcfs: usize) -> PhaseBlock {
PhaseBlock {
block_index,
chrom,
chrom_index,
start: 0,
end: 0,
num_variants: 0,
first_variant_vcf: 0,
vcf_index_counts: vec![0; num_vcfs],
min_quality,
sample_name
sample_name,
unphased_block: false
}
}

Expand Down Expand Up @@ -396,8 +400,8 @@ impl PhaseBlock {
self.num_variants
}

pub fn get_first_variant_vcf(&self) -> usize {
self.first_variant_vcf
pub fn vcf_index_counts(&self) -> &[usize] {
&self.vcf_index_counts
}

pub fn get_min_quality(&self) -> i32 {
Expand All @@ -413,6 +417,14 @@ impl PhaseBlock {
&self.sample_name
}

pub fn set_unphased_block(&mut self) {
self.unphased_block = true;
}

pub fn unphased_block(&self) -> bool {
self.unphased_block
}

/// Add a single-position variant to the phase block, will panic if the chromosome does not match
/// # Arguments
/// * `chrom` - the chromosome string
Expand All @@ -429,10 +441,7 @@ impl PhaseBlock {
self.end = pos;
}
self.num_variants += 1;

if self.num_variants == 1 {
self.first_variant_vcf = vcf_index;
}
self.vcf_index_counts[vcf_index] += 1;
}

/// Checks if a given start/end overlaps the existing phase block
Expand Down Expand Up @@ -640,15 +649,61 @@ impl PhaseBlockIterator {
}

if span_list.len() < self.min_spanning_reads {
// we don't have enough reads to reach our minimum threshold, so return this position+1
// the +1 is because we are returning the end of a half-open range that only includes pos
pos + 1
// this is a sentinel indicating that the range is effectively empty
pos
} else {
span_list.sort();
span_list[span_list.len() - self.min_spanning_reads]
}
}

/// Returns the next position after `pos` such that _at least_ `min_read_count` reads have been found.
/// # Arguments
/// * `chrom` - the chromosome of the locus
/// * `pos` - the position of the locus
fn get_next_mapped(&self, chrom: &str, pos: u64) -> u64 {
use rust_htslib::bam::Read;

// TODO: This method may over-generate blocks when a user uses something other than `min_read_count = 1`.
// If this becomes an issue, we will likely need to rework this code a little bit to read the cigar.

let mut next_positions: Vec<i64> = vec![];
// iterate over each bam, we will cache the next `min_read_count` positions for each BAM
for bam_ref in self.bam_readers.iter() {
let mut bam = bam_ref.borrow_mut();
bam.fetch((chrom, pos, i64::MAX)).unwrap();
let mut counted: usize = 0;

// calling .records() is what is triggering the URL warning
for read_entry in bam.records() {
let read = read_entry.unwrap();

//make sure we care about the alignment
if filter_out_alignment_record(&read, self.min_mapq) {
continue;
}

let start_position: i64 = read.pos();
next_positions.push(start_position);
counted += 1;
if counted >= self.min_spanning_reads {
// we found enough reads from this BAM
break;
}
}
}

if next_positions.len() >= self.min_spanning_reads {
// we found enough reads, sort them by position, then return the one at the correct location
next_positions.sort();
next_positions[self.min_spanning_reads-1] as u64
} else {
// we did not find enough reads on the rest of the chromosome, so return the max
u64::MAX
}

}

/// Returns true if there are at least `min_read_count` reads that connect from the given position back into the current phase block.
/// # Arguments
/// * `chrom` - the chromosome of the locus
Expand Down Expand Up @@ -749,7 +804,8 @@ impl Iterator for PhaseBlockIterator {

// initialize with an empty block containing just this chromosome
let mut phase_block: PhaseBlock = PhaseBlock::new(
self.next_block_index, chrom_name.clone(), self.chrom_index, self.min_quality, self.sample_name.clone()
self.next_block_index, chrom_name.clone(), self.chrom_index, self.min_quality, self.sample_name.clone(),
self.ref_vcf_readers.len()
);
self.next_block_index += 1;

Expand Down Expand Up @@ -795,6 +851,7 @@ impl Iterator for PhaseBlockIterator {

let mut previous_pos: u64 = 0;
let mut max_span: u64 = 0;
let mut next_valid_read_pos: u64 = 0;

while !variant_queue.is_empty() {
// get the source of the next variant to process
Expand Down Expand Up @@ -823,15 +880,38 @@ impl Iterator for PhaseBlockIterator {
// second condition is for variants that overlap but are before our start position
if include_variant {
//heterozygous variant found
if phase_block.get_num_variants() == 0 || max_span > variant_pos {
//either:
//1 - this is a new block OR
//2 - we already found enough reads that spans _past_ this variant
if phase_block.get_num_variants() == 0 {
//this is a new block, first add the variant
phase_block.add_locus_variant(&chrom_name, variant_pos, pop_index);

// go ahead and run the max span calculation
max_span = self.get_longest_multispan(&chrom_name, variant_pos);
if max_span == variant_pos {
// there are not enough reads overlapping this position, it will be unphased
phase_block.set_unphased_block();
next_valid_read_pos = self.get_next_mapped(&chrom_name, variant_pos);

// this just insures than any overlapping variants get in to match previous run pattern
max_span += 1;
}
}
else if max_span > variant_pos {
// we already found enough reads that spans _past_ this variant, so just add it
phase_block.add_locus_variant(&chrom_name, variant_pos, pop_index);
} else if phase_block.unphased_block() {
// if we get here, we are using logic for an unphased phase block
if variant_pos < next_valid_read_pos {
// this variant is before the next valid read, so it's automatically unphased as well
phase_block.add_locus_variant(&chrom_name, variant_pos, pop_index);
} else {
// this variant potentially has reads to use, so we return our unphased block
self.chrom_position = variant_pos;
return Some(Ok(phase_block));
}
} else {
//we check the reads from the most recent locus
//max_span = self.get_longest_span(&chrom_name, previous_pos);
max_span = self.get_longest_multispan(&chrom_name, previous_pos);
assert!(max_span != previous_pos);
if max_span > variant_pos {
//new max span connects
phase_block.add_locus_variant(&chrom_name, variant_pos, pop_index);
Expand Down
6 changes: 4 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub struct Settings {
#[clap(help_heading = Some("Input/Output"))]
pub haplotag_filename: Option<PathBuf>,

/// Number of threads for BAM I/O (default: copy `--threads`)
/// Number of threads for BAM I/O (default: minimum of `--threads` or `4`)
#[clap(long = "io-threads")]
#[clap(value_name = "THREADS")]
#[clap(help_heading = Some("Input/Output"))]
Expand Down Expand Up @@ -311,7 +311,9 @@ pub fn check_settings(mut settings: Settings) -> Settings {

// if this is not specified, then set it to the same as processing
if settings.io_threads.is_none() {
settings.io_threads = Some(settings.threads);
// setting to the same as threads generates some issues with no real benefit
// 4 is a happy default, users can override if needed
settings.io_threads = Some(settings.threads.min(4));
}

// dump stuff to the logger
Expand Down
2 changes: 1 addition & 1 deletion src/data_types/variants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub enum Zygosity {

/// A variant definition structure.
/// It currently assumes that chromosome is fixed and that the variant is a SNP.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Variant {
/// The vcf index from the input datasets
vcf_index: usize,
Expand Down
15 changes: 9 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use hiphase::block_gen::{MultiPhaseBlockIterator, PhaseBlockIterator, get_vcf_samples, get_sample_bams};
use hiphase::cli::{Settings,check_settings,get_raw_settings};
use hiphase::data_types::reference_genome::ReferenceGenome;
use hiphase::phaser::{HaplotagResult, PhaseResult, solve_block, singleton_block};
use hiphase::phaser::{HaplotagResult, PhaseResult, solve_block, create_unphased_result};
use hiphase::writers::block_stats::BlockStatsCollector;
use hiphase::writers::haplotag_writer::HaplotagWriter;
use hiphase::writers::ordered_bam_writer::OrderedBamWriter;
Expand Down Expand Up @@ -255,6 +255,7 @@ fn main() {

// values related to printing
const UPDATE_SPEED: u64 = 100;
info!("Phase block generation starting...");

if cli_settings.threads <= 1 {
for (i, block_result) in block_iterator.by_ref().enumerate().skip(skip_count).take(take_count) {
Expand All @@ -269,7 +270,7 @@ fn main() {

// we likely need to separate out the phase result from the haplotag result
let sample_bams = sample_to_bams.get(block.sample_name()).unwrap();
let (phase_result, haplotag_result): (PhaseResult, HaplotagResult) = if phase_singletons || block.get_num_variants() > 1 {
let (phase_result, haplotag_result): (PhaseResult, HaplotagResult) = if !block.unphased_block() && (phase_singletons || block.get_num_variants() > 1) {
match solve_block(
&block,
&cli_settings.vcf_filenames,
Expand All @@ -291,7 +292,7 @@ fn main() {
}
}
} else {
singleton_block(&block)
create_unphased_result(&block)
};

// this is only for printing
Expand Down Expand Up @@ -365,7 +366,7 @@ fn main() {
info!("Generated {} phase blocks, latest block: {:?}", jobs_queued, block);
}

if phase_singletons || block.get_num_variants() > 1 {
if !block.unphased_block() && (phase_singletons || block.get_num_variants() > 1) {
let tx = tx.clone();
let arc_cli_settings = arc_cli_settings.clone();
let arc_reference_genome = arc_reference_genome.clone();
Expand Down Expand Up @@ -397,9 +398,9 @@ fn main() {
tx.send(all_results).expect("channel will be there waiting for the pool");
});
} else {
// this is a singleton we can short-circuit here
// this is a unphased block we can short-circuit here
let (phase_result, haplotag_result): (PhaseResult, HaplotagResult) =
singleton_block(&block);
create_unphased_result(&block);

// this is only for printing
total_variants += phase_result.phase_block.get_num_variants() as u64;
Expand Down Expand Up @@ -451,6 +452,8 @@ fn main() {
}
}

info!("All phase blocks analyzed, finalizing output files...");

// if we are only doing partial files, this will not behave, so skip it
if !debug_run {
// we call this once at the end
Expand Down
Loading

0 comments on commit abfb8b1

Please sign in to comment.