Skip to content

Commit

Permalink
fix line breaker
Browse files Browse the repository at this point in the history
  • Loading branch information
ziadbkh committed Mar 29, 2024
1 parent a816ab5 commit e425659
Show file tree
Hide file tree
Showing 19 changed files with 239 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ glob = "0.3.1"
log = "0.4"
env_logger = "0.10.1"
sysinfo = "0.24.0"
fastq = "0.6.0"

[dev-dependencies]
md5 = "0.7.0"
Expand Down
25 changes: 22 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::sequence_utils::*;
mod index_dic;
use crate::index_dic::*;

mod thread_reader;
use crate::thread_reader::*;


const BUFFER_SIZE: usize = 1 << 19;
Expand Down Expand Up @@ -629,7 +631,7 @@ fn get_read_parts(reader: &mut dyn BufRead) -> (String, String, String, String){
(header, seq, info, quality)
}

fn read_bytes(reader: &mut Box< dyn Read>, buffer: &mut[u8], minimum:usize, last_byte: &mut usize){
fn read_bytes<R: Read>(reader: &mut R, buffer: &mut[u8], minimum:usize, last_byte: &mut usize){
let mut curr_bytes: usize;
loop{
curr_bytes = reader.read(&mut buffer[*last_byte..]).unwrap();
Expand Down Expand Up @@ -1105,9 +1107,12 @@ pub fn demultiplex(
let start = Instant::now();
let dur;


let bufsize: usize = 1 << 22;
let queuelen: usize = 2;

let mut reader_barcode_read = get_reader(&read_barcode_file_path_final);
let mut reader_barcode_read_tmp = flate2::read::MultiGzDecoder::new(std::fs::File::open(&read_barcode_file_path_final).unwrap());
let mut reader_barcode_read = thread_reader::ThreadReader::new(reader_barcode_read_tmp, bufsize, queuelen);

let mut reader_paired_read = if !single_read_input {
let r1 = get_reader(&paired_read_file_path_final);
Some(r1)
Expand Down Expand Up @@ -1753,6 +1758,20 @@ pub fn demultiplex(

}




let handle = reader_barcode_read.handle_take().unwrap();
::std::mem::drop(reader_barcode_read);
match handle.join() {
Ok(_) => println!("Done!"),
Err(e) => panic!("Error"),
};





let max_mismatches = if all_index_error {allowed_mismatches + 1} else {allowed_mismatches * 2 + 1};


Expand Down
205 changes: 205 additions & 0 deletions src/thread_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
//! Wrap a reader in a background thread.
use std::io::{Cursor, Error, ErrorKind, Read, Result};
use std::rc::Rc;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

struct BufferMessage {
buffer: Box<[u8]>,
written: Result<usize>,
}

fn buffer_channel(bufsize: usize, queuelen: usize) -> (BufferSender, BufferReceiver) {
assert!(queuelen >= 1);
let (full_send, full_recv) = sync_channel(queuelen);
let (empty_send, empty_recv) = sync_channel(queuelen);
for _ in 0..queuelen {
empty_send
.send(vec![0u8; bufsize].into_boxed_slice())
.unwrap();
}

let buffer_sender = BufferSender {
full_send: full_send,
empty_recv: empty_recv,
};
let buffer_receiver = BufferReceiver {
full_recv: full_recv,
empty_send: Rc::new(empty_send),
};
(buffer_sender, buffer_receiver)
}

struct BufferSender {
full_send: SyncSender<BufferMessage>,
empty_recv: Receiver<Box<[u8]>>,
}

impl BufferSender {
fn serve<F>(&mut self, mut func: F)
where
F: FnMut(Box<[u8]>) -> BufferMessage,
{
while let Ok(buffer) = self.empty_recv.recv() {
let reply = func(buffer);
if self.full_send.send(reply).is_err() {
break;
}
}
}
}

struct BufferReceiver {
full_recv: Receiver<BufferMessage>,
empty_send: Rc<SyncSender<Box<[u8]>>>,
}

impl BufferReceiver {
fn next_reader(&mut self) -> Result<PartialReader> {
let data = self
.full_recv
.recv()
.map_err(|e| Error::new(ErrorKind::BrokenPipe, e))?;
Ok(PartialReader {
available: data.written?,
written: 0,
sender: self.empty_send.clone(),
data: Some(data.buffer),
})
}
}

struct PartialReader {
sender: Rc<SyncSender<Box<[u8]>>>,
data: Option<Box<[u8]>>,
available: usize,
written: usize,
}

impl Drop for PartialReader {
fn drop(&mut self) {
if let Some(data) = self.data.take() {
// An error indicates that the other end has hung up.
// In this case we don't need to do anything.
let _ = self.sender.send(data);
}
}
}

impl Read for PartialReader {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
let data = &self.data.as_ref().unwrap()[self.written..self.available];
let res = Cursor::new(data).read(buffer)?;
self.written += res;
Ok(res)
}
}

impl PartialReader {
fn finished(&self) -> bool {
self.available == self.written
}
}

pub struct ThreadReader {
handle: Option<thread::JoinHandle<()>>,
receiver: BufferReceiver,
reader: Option<PartialReader>,
}

impl Read for ThreadReader {
fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
if self.reader.is_none() || self.reader.as_ref().unwrap().finished() {
// We drop the old partial reader manually before waiting for a new one
// to prevent a deadlock if the queuelen is 1.
::std::mem::drop(self.reader.take());
self.reader = Some(self.receiver.next_reader()?)
}

let reader = self.reader.as_mut().unwrap();
reader.read(buffer)
}
}

impl ThreadReader {
pub fn new<R>(mut reader: R, buffsize: usize, queuelen: usize) -> ThreadReader
where
R: Read + Send + 'static,
{
let (mut bufsend, bufrecv) = buffer_channel(buffsize, queuelen);
let handle = thread::Builder::new()
.name("reader-thread".into())
.spawn(move || {
bufsend.serve(|mut buffer| BufferMessage {
written: reader.read(&mut buffer),
buffer: buffer,
})
})
.unwrap();

ThreadReader {
handle: Some(handle),
receiver: bufrecv,
reader: None,
}
}

pub fn handle_take(&mut self) -> Option<thread::JoinHandle<()>>
{
self.handle.take()
}
}

/// Wrap a reader in a background thread.
///
/// This is only useful for readers that do expensive operations (eg decompression).
///
/// The thread precomputes `queuelen` many reads with the given buffer size, and
/// then waits for the consumer to catch up.
///
/// If the reader panics, reads to the consumer in the main thread returns
/// `ErrorKind::BrokenPipe` errors and the return value of `thread_reader` contains
/// the panic from the reader thread.
///
/// # Examples
///
/// ```rust,no_run
/// extern crate niffler;
/// extern crate fastq;
///
/// use std::io::stdin;
/// use fastq::thread_reader;
///
/// # fn main() {
/// // Compressed file reader is faster with a buffer size equal to the block size (default 4MB)
/// const BUFSIZE: usize = 1 << 22;
/// // The number of buffers the background thread fills, before it waits for
/// // a consumer to catch up.
/// const QUEUELEN: usize = 2;
///
/// let (file, compression_format) = niffler::send::get_reader(Box::new(stdin())).unwrap();
/// let out = thread_reader(BUFSIZE, QUEUELEN, file, |reader| {
/// // do something with the reader
/// });
/// # }
/// ```
pub fn thread_reader<R, F, O>(
bufsize: usize,
queuelen: usize,
reader: R,
func: F,
) -> thread::Result<O>
where
F: FnOnce(&mut ThreadReader) -> O,
R: Read + Send + 'static,
{
let mut inner = ThreadReader::new(reader, bufsize, queuelen);
let out = func(&mut inner);
let handle = inner.handle.take().unwrap();
::std::mem::drop(inner);
match handle.join() {
Ok(_) => Ok(out),
Err(e) => Err(e),
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
job_number sample_id r1_qc_30 r2_qc_30 r3_qc_30 r1_bases r2_bases r3_bases r1_qc r2_qc r3_qc all_reads 0-mismatches
. sample1 167 0 0 200 200 0 6162 5600 0 4 4
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
job_number sample_id r1_qc_30 r2_qc_30 r3_qc_30 r1_bases r2_bases r3_bases r1_qc r2_qc r3_qc all_reads 0-mismatches
. sample2 167 0 32 200 200 32 6162 5600 1120 4 4
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
job_number sample_id r1_qc_30 r2_qc_30 r3_qc_30 r1_bases r2_bases r3_bases r1_qc r2_qc r3_qc all_reads 0-mismatches
. sample3 56 0 0 56 180 40 1848 2340 520 2 2
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
job_number sample_id r1_qc_30 r2_qc_30 r3_qc_30 r1_bases r2_bases r3_bases r1_qc r2_qc r3_qc all_reads 0-mismatches
. sample1 167 0 0 200 200 0 6162 5600 0 4 4
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
job_number sample_id r1_qc_30 r2_qc_30 r3_qc_30 r1_bases r2_bases r3_bases r1_qc r2_qc r3_qc all_reads 0-mismatches 1-mismatches
. sample1 167 0 0 200 200 0 6162 5600 0 4 0 4
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn testing_template() {

#[test]
fn testing_demultiplex() {
for ds_itr_tmp in 1..2{//15{
for ds_itr_tmp in 1..15{
let mut disable_illumina_format = false;
let ds_itr_in = match ds_itr_tmp{
6 => 1,
Expand Down

0 comments on commit e425659

Please sign in to comment.