Skip to content

Commit

Permalink
avoid race condition that closes all worker thread
Browse files Browse the repository at this point in the history
  • Loading branch information
ekg committed Aug 2, 2024
1 parent 2719303 commit b764d79
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 17 deletions.
14 changes: 4 additions & 10 deletions src/align/include/computeAlignments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,14 +330,12 @@ void processor_thread(std::atomic<size_t>& total_alignments_queued,

while (!thread_should_exit.load()) {
std::string* line_ptr = nullptr;
//std::cerr << "size of line queue " << line_queue.was_size() << std::endl;
if (line_queue.try_pop(line_ptr)) {
MappingBoundaryRow currentRecord;
parseMashmapRow(*line_ptr, currentRecord);

// Process the record and create seq_record_t
seq_record_t* rec = createSeqRecord(currentRecord, *line_ptr, local_ref_faidx, local_query_faidx);
//std::cerr << "size of seq_queue " << seq_queue.was_size() << std::endl;

while (!seq_queue.try_push(rec)) {
if (thread_should_exit.load()) {
Expand Down Expand Up @@ -376,7 +374,6 @@ void processor_manager(seq_atomic_queue_t& seq_queue,
const size_t high_threshold = queue_capacity * 0.8;

auto spawn_processor = [&](size_t id) {
//std::cerr << "spawn_processor: " << id << std::endl;
thread_should_exit[id].store(false);
processor_threads.emplace_back([this, &total_alignments_queued, &reader_done, &line_queue, &seq_queue, &thread_should_exit, id]() {
this->processor_thread(total_alignments_queued, reader_done, line_queue, seq_queue, thread_should_exit[id]);
Expand All @@ -390,13 +387,9 @@ void processor_manager(seq_atomic_queue_t& seq_queue,
while (!reader_done.load() || !line_queue.was_empty() || !seq_queue.was_empty()) {
size_t queue_size = seq_queue.was_size();

//std::cerr << "queue_size: " << queue_size << std::endl;

if (queue_size < low_threshold && current_processors < max_processors) {
//std::cerr << "spawn_processor: " << current_processors << std::endl;
spawn_processor(current_processors++);
} else if (queue_size > high_threshold && current_processors > 1) {
//std::cerr << "kill_processor: " << current_processors << std::endl;
thread_should_exit[--current_processors].store(true);
}

Expand All @@ -420,6 +413,7 @@ void worker_thread(uint64_t tid,
seq_atomic_queue_t& seq_queue,
paf_atomic_queue_t& paf_queue,
std::atomic<bool>& reader_done,
std::atomic<bool>& processor_done,
progress_meter::ProgressMeter& progress,
std::atomic<uint64_t>& processed_alignment_length) {
is_working.store(true);
Expand All @@ -438,7 +432,7 @@ void worker_thread(uint64_t tid,
processed_alignment_length.fetch_add(alignment_length, std::memory_order_relaxed);

delete rec;
} else if (reader_done.load() && seq_queue.was_empty()) {
} else if (reader_done.load() && processor_done.load() && seq_queue.was_empty()) {
break;
} else {
is_working.store(false);
Expand Down Expand Up @@ -530,8 +524,8 @@ void computeAlignments() {
std::vector<std::thread> workers;
std::vector<std::atomic<bool>> worker_working(param.threads);
for (uint64_t t = 0; t < param.threads; ++t) {
workers.emplace_back([this, t, &worker_working, &seq_queue, &paf_queue, &reader_done, &progress, &processed_alignment_length]() {
this->worker_thread(t, worker_working[t], seq_queue, paf_queue, reader_done, progress, processed_alignment_length);
workers.emplace_back([this, t, &worker_working, &seq_queue, &paf_queue, &reader_done, &processor_done, &progress, &processed_alignment_length]() {
this->worker_thread(t, worker_working[t], seq_queue, paf_queue, reader_done, processor_done, progress, processed_alignment_length);
});
}

Expand Down
7 changes: 0 additions & 7 deletions src/map/include/computeMap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1524,13 +1524,6 @@ namespace skch
void adjustConsecutiveMappings(std::vector<MappingResult>& mappings) {
if (mappings.size() < 2) return;

// Sort mappings by query start position, then by reference start position
std::sort(mappings.begin(), mappings.end(),
[](const MappingResult& a, const MappingResult& b) {
return std::tie(a.queryStartPos, a.refStartPos) <
std::tie(b.queryStartPos, b.refStartPos);
});

// Define threshold for adjustment (e.g., 10 bases)
const int threshold = param.segLength;

Expand Down

0 comments on commit b764d79

Please sign in to comment.