Skip to content

Commit

Permalink
feat: add lazy record reader to vcf async reader
Browse files Browse the repository at this point in the history
  • Loading branch information
tshauck committed Aug 31, 2023
1 parent bbef74d commit ea04944
Showing 1 changed file with 107 additions and 1 deletion.
108 changes: 107 additions & 1 deletion noodles-vcf/src/async/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use noodles_csi as csi;
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncSeek};

use self::{header::read_header, query::query};
use crate::{reader::resolve_region, Header, Record};
use crate::{lazy, reader::resolve_region, Header, Record};

const LINE_FEED: char = '\n';
const CARRIAGE_RETURN: char = '\r';
Expand Down Expand Up @@ -152,6 +152,41 @@ where
}
}

/// Reads a single record without eagerly parsing its fields.
///
/// This reads the VCF record fields from the underlying stream into the given record's buffer
/// until a newline is reached. No fields are parsed, meaning the record is not necessarily
/// valid. However, the structure of the line is guaranteed to be record-like.
///
/// The stream is expected to be directly after the header or at the start of another record.
///
/// If successful, the number of bytes read is returned. If the number of bytes read is 0, the
/// stream reached EOF.
///
/// # Errors
///
/// Returns an error if reading from the underlying stream fails, or an error of kind
/// `InvalidData` if a field is not valid UTF-8.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// use noodles_vcf as vcf;
///
/// let data = b"##fileformat=VCFv4.3
/// #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
/// sq0\t1\t.\tA\t.\t.\tPASS\t.
/// ";
///
/// let mut reader = vcf::AsyncReader::new(&data[..]);
/// reader.read_header().await?;
///
/// let mut record = vcf::lazy::Record::default();
/// reader.read_lazy_record(&mut record).await?;
/// # Ok::<_, std::io::Error>(())
/// # }
/// ```
pub async fn read_lazy_record(&mut self, record: &mut lazy::Record) -> io::Result<usize> {
// Delegates to the free function of the same name, reading from the wrapped stream.
read_lazy_record(&mut self.inner, record).await
}

/// Returns an (async) stream over records starting from the current (input) stream position.
///
/// The (input) stream is expected to be directly after the header or at the start of another
Expand Down Expand Up @@ -330,6 +365,77 @@ where
}
}

/// Reads one raw VCF record line into `record` without parsing any field values.
///
/// The eight fixed columns (chromosome through info) are read one tab-delimited field at a
/// time. Field text is appended to `record.buf` *without* the tab delimiters, and after each
/// field the current buffer length is stored as that field's end bound. Whatever remains on the
/// line is then appended with `read_line`.
///
/// The record's buffer is cleared before reading so that the same `lazy::Record` can be reused
/// across calls. Without this, text from the previous record would stay in the buffer and every
/// bound would be offset by it.
///
/// Returns the total number of bytes consumed from `reader`. The field reads contribute 0 bytes
/// when the stream is already at EOF.
async fn read_lazy_record<R>(reader: &mut R, record: &mut lazy::Record) -> io::Result<usize>
where
    R: AsyncBufRead + Unpin,
{
    // Start from an empty buffer: all bounds below are absolute offsets into `record.buf`.
    record.buf.clear();

    let mut len = 0;

    len += read_field(reader, &mut record.buf).await?;
    record.bounds.chromosome_end = record.buf.len();

    len += read_field(reader, &mut record.buf).await?;
    record.bounds.position_end = record.buf.len();

    len += read_field(reader, &mut record.buf).await?;
    record.bounds.ids_end = record.buf.len();

    len += read_field(reader, &mut record.buf).await?;
    record.bounds.reference_bases_end = record.buf.len();

    len += read_field(reader, &mut record.buf).await?;
    record.bounds.alternate_bases_end = record.buf.len();

    len += read_field(reader, &mut record.buf).await?;
    record.bounds.quality_score_end = record.buf.len();

    len += read_field(reader, &mut record.buf).await?;
    record.bounds.filters_end = record.buf.len();

    len += read_field(reader, &mut record.buf).await?;
    record.bounds.info_end = record.buf.len();

    // The remainder of the line has no end bound of its own; it runs to the end of the buffer.
    len += read_line(reader, &mut record.buf).await?;

    Ok(len)
}

/// Reads a single tab-delimited field from `reader` and appends its text to `dst`.
///
/// Bytes are consumed up to and including the next tab (`\t`), or up to EOF if no tab is found.
/// The tab itself is consumed but not appended to `dst`.
///
/// Returns the number of bytes consumed from `reader`, including the delimiter. A return value
/// of 0 means the stream was already at EOF.
///
/// # Errors
///
/// Returns any I/O error raised while filling the reader's buffer, or an error of kind
/// `InvalidData` if the field's bytes are not valid UTF-8.
async fn read_field<R>(reader: &mut R, dst: &mut String) -> io::Result<usize>
where
    R: AsyncBufRead + Unpin,
{
    const DELIMITER: u8 = b'\t';

    /// Validates `bytes` as UTF-8 and appends them to `dst`.
    fn push_utf8(dst: &mut String, bytes: &[u8]) -> io::Result<()> {
        let s = std::str::from_utf8(bytes)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        dst.push_str(s);
        Ok(())
    }

    // Holds the bytes of a field that spans more than one buffer fill. Validation is deferred
    // until the whole field is available, because a multi-byte UTF-8 character may be split
    // across two fills and each half on its own is not valid UTF-8.
    let mut carry: Vec<u8> = Vec::new();
    let mut len = 0;

    loop {
        let src = reader.fill_buf().await?;

        // An empty buffer signals EOF.
        if src.is_empty() {
            break;
        }

        let (chunk, n, is_delimiter) = match src.iter().position(|&b| b == DELIMITER) {
            // `i + 1` also consumes the delimiter, which is not part of the field text.
            Some(i) => (&src[..i], i + 1, true),
            None => (src, src.len(), false),
        };

        if is_delimiter && carry.is_empty() {
            // Common case: the whole field sits in the current buffer, so no copy is needed.
            push_utf8(dst, chunk)?;
        } else {
            carry.extend_from_slice(chunk);
        }

        len += n;
        reader.consume(n);

        // Stop as soon as the delimiter is consumed; polling the reader again could trigger an
        // unnecessary read on the underlying stream.
        if is_delimiter {
            break;
        }
    }

    if !carry.is_empty() {
        push_utf8(dst, &carry)?;
    }

    Ok(len)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit ea04944

Please sign in to comment.