From 758da83cd7bb71c96741feb43745a3d1a63fe6c4 Mon Sep 17 00:00:00 2001 From: Marios Kogias Date: Wed, 6 Nov 2024 18:00:00 +0000 Subject: [PATCH] Split schedule many (#54) Refactor the `schedule_many` function into four different phases. This refactor simplifies the code for read-only behaviours and atomic scheduling of multiple behaviours. --- src/rt/sched/behaviourcore.h | 271 ++++++++++++++++++++++------------- 1 file changed, 171 insertions(+), 100 deletions(-) diff --git a/src/rt/sched/behaviourcore.h b/src/rt/sched/behaviourcore.h index 4a523c22..379a2d8d 100644 --- a/src/rt/sched/behaviourcore.h +++ b/src/rt/sched/behaviourcore.h @@ -519,6 +519,40 @@ namespace verona::rt Cown::acquire(cown); } + static std::tuple<size_t, size_t> + handle_read_only_enqueue(Slot* prev_slot, Slot* new_slot, Cown* cown) + { + size_t ref_count = 0, ex_count = 0; + bool first_reader; + + if (prev_slot && (prev_slot->set_next_slot_reader(new_slot))) + { + Logging::cout() << " Previous slot is a writer or blocked reader cown " + << *new_slot << Logging::endl; + yield(); + goto fn_out; + } + + yield(); + first_reader = cown->read_ref_count.add_read(); + Logging::cout() << " Reader got the cown " << *new_slot << Logging::endl; + yield(); + + // TODO: This will not be correct in the multi-schedule cases. + // There needs to be a check that ensures the chain contains only + // reads. This will be calculated in the prepare phase. + new_slot->set_read_available(); + + ex_count = 1; + if (first_reader) + { + ref_count = 1; + } + + fn_out: + return {ref_count, ex_count}; + } + /** * @brief Constructs a behaviour. Leaves space for the closure. 
* @@ -664,9 +698,10 @@ namespace verona::rt Logging::cout() << "BehaviourCore::schedule_many" << body_count << Logging::endl; - size_t count = 0; + // non-unique cowns count + size_t cown_count = 0; for (size_t i = 0; i < body_count; i++) - count += bodies[i]->count; + cown_count += bodies[i]->count; // Execution count - we will remove at least // one from the execution count on finishing phase 2 of the @@ -676,23 +711,32 @@ namespace verona::rt for (size_t i = 0; i < body_count; i++) ec[i] = 1; + // This array includes an entry for each of the requested cowns. + // The entry has an index into the bodies array and a Slot * for that + // cown inside the behaviour body // Need to sort the cown requests across the co-scheduled collection of // cowns We first construct an array that represents pairs of behaviour // number and slot pointer. Note: Really want a dynamically sized stack // allocation here. - StackArray<std::tuple<size_t, Slot*>> indexes(count); + StackArray<std::tuple<size_t, Slot*>> cown_to_behaviour_slot_map( + cown_count); size_t idx = 0; for (size_t i = 0; i < body_count; i++) { auto slots = bodies[i]->get_slots(); for (size_t j = 0; j < bodies[i]->count; j++) { - std::get<0>(indexes[idx]) = i; - std::get<1>(indexes[idx]) = &slots[j]; + cown_to_behaviour_slot_map[idx] = {i, &slots[j]}; idx++; } } + // First phase - Prepare phase + // For each unique cown, build a chain of behaviours that need to be + // scheduled. + // First order the cowns to find the unique ones and then prepare the + // chains + // Sort the indexing array so we make the requests in the correct order // across the whole set of behaviours. A consistent order is required to // avoid deadlock. @@ -735,58 +779,78 @@ namespace verona::rt return std::get<1>(i)->cown() < std::get<1>(j)->cown(); #endif }; - if (count > 1) - std::sort(indexes.get(), indexes.get() + count, compare); - - // First phase - Acquire phase. 
+ if (cown_count > 1) + std::sort( + cown_to_behaviour_slot_map.get(), + cown_to_behaviour_slot_map.get() + cown_count, + compare); + + // Helper struct to be used after building the chains in the next phases + struct ChainInfo + { + Cown* cown; + size_t first_body_index; + Slot* last_slot; + size_t transfer_count; + bool had_no_predecessor; + // The last two are only used for read-only chains + size_t ref_count; + size_t ex_count; + }; size_t i = 0; - while (i < count) + size_t chain_count = 0; + StackArray<ChainInfo> chain_info(cown_count); + + while (i < cown_count) { - auto cown = std::get<1>(indexes[i])->cown(); - auto body = bodies[std::get<0>(indexes[i])]; - auto curr_slot = std::get<1>(indexes[i]); - auto first_body = body; - size_t first_chain_index = i; + auto cown = std::get<1>(cown_to_behaviour_slot_map[i])->cown(); + auto body = bodies[std::get<0>(cown_to_behaviour_slot_map[i])]; + auto last_slot = std::get<1>(cown_to_behaviour_slot_map[i]); + size_t first_body_index = std::get<0>(cown_to_behaviour_slot_map[i]); // The number of RCs provided for the current cown by the when. // I.e. how many moves of cown_refs there were. - size_t transfer_count = curr_slot->is_move(); + size_t transfer_count = last_slot->is_move(); Logging::cout() << "Processing " << cown << " " << body << " " - << curr_slot << " Index " << i << Logging::endl; + << last_slot << " Index " << i << Logging::endl; // Detect duplicates for this cown. // This is required in two cases: // * overlaps within a single behaviour. - while (((++i) < count) && (cown == std::get<1>(indexes[i])->cown())) + while (((++i) < cown_count) && + (cown == std::get<1>(cown_to_behaviour_slot_map[i])->cown())) { // If the body is the same, then we have an overlap within a single // behaviour. - auto body_next = bodies[std::get<0>(indexes[i])]; + auto body_next = bodies[std::get<0>(cown_to_behaviour_slot_map[i])]; if (body_next == body) { // Check if the caller passed an RC and add to the total. 
- transfer_count += std::get<1>(indexes[i])->is_move(); + transfer_count += + std::get<1>(cown_to_behaviour_slot_map[i])->is_move(); Logging::cout() << "Duplicate " << cown << " for " << body << " Index " << i << Logging::endl; // We need to reduce the execution count by one, as we can't wait // for ourselves. - ec[std::get<0>(indexes[i])]++; + ec[std::get<0>(cown_to_behaviour_slot_map[i])]++; // We need to mark the slot as not having a cown associated to it. - std::get<1>(indexes[i])->set_cown_null(); + std::get<1>(cown_to_behaviour_slot_map[i])->set_cown_null(); continue; } // For writers, create a chain of behaviours - if (!std::get<1>(indexes[i])->is_read_only()) + if (!std::get<1>(cown_to_behaviour_slot_map[i])->is_read_only()) { body = body_next; // Extend the chain of behaviours linking on this behaviour - curr_slot->set_next_slot_writer(body); - curr_slot = std::get<1>(indexes[i]); + last_slot->set_next_slot_writer(body); + last_slot->set_ready(); + + last_slot = std::get<1>(cown_to_behaviour_slot_map[i]); continue; } @@ -794,61 +858,43 @@ namespace verona::rt abort(); } + // For each chain you need the cown, the first and the last body of the + // chain + chain_info[chain_count++] = { + cown, first_body_index, last_slot, transfer_count, false, 0, 0}; + // Mark the slot as ready for scheduling - curr_slot->reset_status(); - yield(); - if (curr_slot->is_read_only()) - curr_slot->set_behaviour(body); + last_slot->reset_status(); yield(); + if (last_slot->is_read_only()) + last_slot->set_behaviour(body); + } + + // Second phase - Acquire phase + for (size_t i = 0; i < chain_count; i++) + { + auto* cown = chain_info[i].cown; + auto first_body_index = chain_info[i].first_body_index; + auto* first_body = bodies[first_body_index]; + auto* new_slot = chain_info[i].last_slot; + auto prev_slot = - cown->last_slot.exchange(curr_slot, std::memory_order_acq_rel); + cown->last_slot.exchange(new_slot, std::memory_order_acq_rel); + yield(); if (prev_slot == nullptr) { - if 
(curr_slot->is_read_only()) + chain_info[i].had_no_predecessor = true; + if (new_slot->is_read_only()) { - yield(); - bool first_reader = cown->read_ref_count.add_read(); - Logging::cout() << "Reader at head of queue and got the cown " - << *curr_slot << Logging::endl; - yield(); - curr_slot->set_read_available(); - ec[std::get<0>(indexes[first_chain_index])]++; - yield(); - acquire_with_transfer(cown, transfer_count, 1 + first_reader); - continue; + auto counts = handle_read_only_enqueue(prev_slot, new_slot, cown); + chain_info[i].ref_count = std::get<0>(counts); + chain_info[i].ex_count = std::get<1>(counts); } - - yield(); - - acquire_with_transfer(cown, transfer_count, 1); - - yield(); - - if (cown->read_ref_count.try_write()) - { - Logging::cout() << " Writer at head of queue and got the cown " - << *curr_slot << Logging::endl; - ec[std::get<0>(indexes[first_chain_index])]++; - yield(); - continue; - } - - Logging::cout() << " Writer waiting for previous readers cown " - << *curr_slot << Logging::endl; - yield(); - cown->next_writer = body; continue; } - // Release any transferred count - acquire_with_transfer(cown, transfer_count, 0); - - yield(); - Logging::cout() << " Someone in queue cown " << *curr_slot - << " previous " << *prev_slot << Logging::endl; - while (prev_slot->is_wait_2pl()) { Systematic::yield_until( @@ -856,55 +902,80 @@ namespace verona::rt Aal::pause(); } - if (curr_slot->is_read_only()) + if (new_slot->is_read_only()) { - if (prev_slot->set_next_slot_reader(curr_slot)) - { - Logging::cout() - << " Previous slot is a writer or blocked reader cown " - << *curr_slot << Logging::endl; - yield(); - continue; - } - - yield(); - bool first_reader = cown->read_ref_count.add_read(); - Logging::cout() << " Reader got the cown " << *curr_slot - << Logging::endl; - yield(); - curr_slot->set_read_available(); - ec[std::get<0>(indexes[first_chain_index])]++; - if (first_reader) - { - Logging::cout() - << "Acquiring reference count for first reader on 
cown " - << *curr_slot << Logging::endl; - Cown::acquire(cown); - } + auto counts = handle_read_only_enqueue(prev_slot, new_slot, cown); + chain_info[i].ref_count = std::get<0>(counts); + chain_info[i].ex_count = std::get<1>(counts); continue; } Logging::cout() << " Writer waiting for cown. Set next of previous slot cown " - << *curr_slot << " previous " << *prev_slot << Logging::endl; + << *new_slot << " previous " << *prev_slot << Logging::endl; prev_slot->set_next_slot_writer(first_body); yield(); } - // Second phase - Release phase. + // Third phase - Release phase. for (size_t i = 0; i < body_count; i++) { Logging::cout() << "Release phase for behaviour " << bodies[i] << Logging::endl; } - for (size_t i = 0; i < count; i++) + + for (size_t i = 0; i < chain_count; i++) { yield(); - auto slot = std::get<1>(indexes[i]); + auto slot = chain_info[i].last_slot; Logging::cout() << "Setting slot " << slot << " to ready" << Logging::endl; - if (slot->is_wait_2pl()) - slot->set_ready(); + slot->set_ready(); + + // TODO: We could also set the READ_AVAILABLE here + } + + // Fourth phase - Process & Resolve + + for (size_t i = 0; i < chain_count; i++) + { + auto* cown = chain_info[i].cown; + auto first_body_index = chain_info[i].first_body_index; + auto* first_body = bodies[first_body_index]; + auto* curr_slot = chain_info[i].last_slot; + auto chain_had_no_predecessor = chain_info[i].had_no_predecessor; + auto transfer_count = chain_info[i].transfer_count; + auto ref_count = chain_info[i].ref_count; + auto ex_count = chain_info[i].ex_count; + + // Process reference count + if (chain_had_no_predecessor) + { + ref_count++; + } + acquire_with_transfer(cown, transfer_count, ref_count); + + // Process writes without predecessor + if ((chain_had_no_predecessor) && (!curr_slot->is_read_only())) + { + if (cown->read_ref_count.try_write()) + { + Logging::cout() << " Writer at head of queue and got the cown " + << *curr_slot << Logging::endl; + ex_count++; + yield(); + } + else + { 
+ Logging::cout() << " Writer waiting for previous readers cown " + << *curr_slot << Logging::endl; + yield(); + cown->next_writer = first_body; + } + } + + // Process execution count + ec[first_body_index] += ex_count; } for (size_t i = 0; i < body_count; i++) @@ -1002,8 +1073,6 @@ namespace verona::rt { Logging::cout() << "Release slot " << *this << Logging::endl; - assert(!is_wait_2pl()); - // This slot represents a duplicate cown, so we can ignore releasing it. if (cown() == nullptr) { @@ -1011,6 +1080,8 @@ namespace verona::rt return; } + assert(!is_wait_2pl()); + if (no_successor()) { auto slot_addr = this;