Skip to content

Commit

Permalink
correct synchronization issues (race) leading to incomplete processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ekg committed Sep 25, 2024
1 parent 4267b9c commit 6fb2fa9
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions src/map/include/computeMap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ namespace skch
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
workers_done.store(true);
}

void writer_thread(query_output_atomic_queue_t& output_queue,
Expand All @@ -331,9 +330,11 @@ namespace skch
std::ofstream& outstrm,
progress_meter::ProgressMeter& progress,
MappingResultsVector_t& allReadMappings) {
int wait_count = 0;
while (true) {
QueryMappingOutput* output = nullptr;
if (output_queue.try_pop(output)) {
wait_count = 0;
if(output->results.size() > 0)
totalReadsMapped++;
if (param.filterMode == filter::ONETOONE) {
Expand All @@ -343,7 +344,12 @@ namespace skch
}
delete output;
} else if (workers_done.load() && output_queue.was_empty()) {
break;
++wait_count;
if (wait_count < 5) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} else {
break;
}
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
Expand Down Expand Up @@ -442,8 +448,17 @@ namespace skch
for (auto& worker : workers) {
worker.join();
}

workers_done.store(true);

writer.join();

// Check if the writer queue was empty at the end of the run, if not, something bad has happened
if (!output_queue.was_empty()) {
std::cerr << "[mashmap::skch::Map::mapQuery] ERROR, writer queue was not empty at the end of the run" << std::endl;
exit(1);
}

//Filter over reference axis and report the mappings
if (param.filterMode == filter::ONETOONE)
{
Expand Down Expand Up @@ -661,6 +676,8 @@ namespace skch
*/
QueryMappingOutput* mapModule (InputSeqProgContainer* input)
{
//ok
auto output = new QueryMappingOutput{input->seqName};
//save query sequence name and length
bool split_mapping = true;
std::vector<IntervalPoint> intervalPoints;
Expand All @@ -669,7 +686,7 @@ namespace skch
2 * param.sketchSize * refSketch.minmerIndex.size() / refSketch.minmerPosLookupIndex.size());
std::vector<L1_candidateLocus_t> l1Mappings;
MappingResultsVector_t l2Mappings;
MappingResultsVector_t unfilteredMappings;
auto& unfilteredMappings = output->results;
int refGroup = this->getRefGroup(input->seqName);

if (!param.split || input->len <= param.segLength)
Expand Down Expand Up @@ -787,7 +804,7 @@ namespace skch

sparsifyMappings(unfilteredMappings);

return new QueryMappingOutput{input->seqName, std::move(unfilteredMappings)};
return output;
}

/**
Expand Down

0 comments on commit 6fb2fa9

Please sign in to comment.