Skip to content

Commit

Permalink
remove query sequence processing from computeAlignments reader_thread…
Browse files Browse the repository at this point in the history
…, as it is now done in worker_thread
  • Loading branch information
ekg committed Jun 1, 2024
1 parent 6fcddc2 commit 70e896a
Showing 1 changed file with 20 additions and 78 deletions.
98 changes: 20 additions & 78 deletions src/align/include/computeAlignments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,84 +215,26 @@ namespace align
w.store(true);
}

// reader picks up candidate alignments from input
auto reader_thread =
[&]() {
//Parse query sequences
for(const auto &fileName : param.querySequences)
{
//#define DEBUG true
#ifdef DEBUG
std::cerr << "INFO, align::Aligner::computeAlignments, parsing query sequences in file " << fileName << std::endl;
#endif

//Open mashmap output file
std::ifstream mappingListStream(param.mashmapPafFile);
std::string mappingRecordLine;
MappingBoundaryRow currentRecord;

seqiter::for_each_seq_in_file(
fileName, {}, "",
[&](const std::string& qSeqId,
const std::string& _seq) {
++total_seqs;

uint64_t rank_mapping = 0;

// copy our input into a shared ptr
std::shared_ptr<std::string> seq(new std::string(_seq));
// todo: offset_t is an 32-bit integer, which could cause problems
skch::offset_t len = seq->length();
// upper-case our input and make sure it's canonical DNA (for WFA)
skch::CommonFunc::makeUpperCaseAndValidDNA((char*)seq->c_str(), len);
// todo maybe this should change to some kind of unique pointer?
// something where we can GC it when we're done aligning to it
//std::string qSequence = seq;
//std::cerr << seq << std::endl;

//Check if all mapping records are processed already
while(!mappingListStream.eof() && mappingRecordLine.empty()) {
//Read first record from mashmap output file during first iteration
std::getline(mappingListStream, mappingRecordLine);
}

if( !mappingRecordLine.empty() ) {
parseMashmapRow(mappingRecordLine, currentRecord);

//Check if mapping query id matches current query sequence id
if(currentRecord.qId == qSeqId)
{
//Queue up this query record
auto q = new seq_record_t(currentRecord, mappingRecordLine, seq);
q->currentRecord.rankMapping = rank_mapping++;
seq_queue.push(q);

//Check if more mappings have same query sequence id
while(std::getline(mappingListStream, mappingRecordLine))
{
parseMashmapRow(mappingRecordLine, currentRecord);

if(currentRecord.qId != qSeqId)
{
//Break the inner loop to read query sequence
break;
}
else
{
auto q = new seq_record_t(currentRecord, mappingRecordLine, seq);
q->currentRecord.rankMapping = rank_mapping++;
seq_queue.push(q);
}
}
}
}
});

mappingListStream.close();

}
reader_done.store(true);
};
// reader picks up candidate alignments from input
auto reader_thread =
[&]() {
std::ifstream mappingListStream(param.mashmapPafFile);
std::string mappingRecordLine;
MappingBoundaryRow currentRecord;

while (!mappingListStream.eof()) {
std::getline(mappingListStream, mappingRecordLine);
if (!mappingRecordLine.empty()) {
parseMashmapRow(mappingRecordLine, currentRecord);
total_alignment_length += currentRecord.qEndPos - currentRecord.qStartPos;
auto q = new seq_record_t(currentRecord, mappingRecordLine, nullptr);
seq_queue.push(q);
}
}

mappingListStream.close();
reader_done.store(true);
};

// helper to check if we're still aligning
auto still_working =
Expand Down

0 comments on commit 70e896a

Please sign in to comment.