Skip to content

Commit

Permalink
Split schedule many
Browse files Browse the repository at this point in the history
  • Loading branch information
marioskogias committed Sep 27, 2024
1 parent 9514989 commit a88d2c4
Showing 1 changed file with 134 additions and 81 deletions.
215 changes: 134 additions & 81 deletions src/rt/sched/behaviourcore.h
Original file line number Diff line number Diff line change
Expand Up @@ -664,9 +664,10 @@ namespace verona::rt
Logging::cout() << "BehaviourCore::schedule_many" << body_count
<< Logging::endl;

size_t count = 0;
// non-unique cowns count
size_t cown_count = 0;
for (size_t i = 0; i < body_count; i++)
count += bodies[i]->count;
cown_count += bodies[i]->count;

// Execution count - we will remove at least
// one from the execution count on finishing phase 2 of the
Expand All @@ -676,19 +677,23 @@ namespace verona::rt
for (size_t i = 0; i < body_count; i++)
ec[i] = 1;

// This array includes an entry for each of the requested cowns.
// The entry has an index into the bodies array and a Slot * for that
// cown inside the behaviour body
// Need to sort the cown requests across the co-scheduled collection of
// cowns We first construct an array that represents pairs of behaviour
// number and slot pointer. Note: Really want a dynamically sized stack
// allocation here.
StackArray<std::tuple<size_t, Slot*>> indexes(count);
StackArray<std::tuple<size_t, Slot*>> cown_to_behaviour_slot_map(
cown_count);
size_t idx = 0;
for (size_t i = 0; i < body_count; i++)
{
auto slots = bodies[i]->get_slots();
for (size_t j = 0; j < bodies[i]->count; j++)
{
std::get<0>(indexes[idx]) = i;
std::get<1>(indexes[idx]) = &slots[j];
std::get<0>(cown_to_behaviour_slot_map[idx]) = i;
std::get<1>(cown_to_behaviour_slot_map[idx]) = &slots[j];
idx++;
}
}
Expand Down Expand Up @@ -735,178 +740,226 @@ namespace verona::rt
return std::get<1>(i)->cown() < std::get<1>(j)->cown();
#endif
};
if (count > 1)
std::sort(indexes.get(), indexes.get() + count, compare);

// First phase - Acquire phase.
if (cown_count > 1)
std::sort(
cown_to_behaviour_slot_map.get(),
cown_to_behaviour_slot_map.get() + cown_count,
compare);

// First phase - Prepare phase
// For each unique chain you need to keep the cown, the first index,
// the Slot for that cown in the last body in the chain
// the transfer count for reference counting, and a bool that will be
// used later to indicate that the chain had no predecessor
size_t i = 0;
while (i < count)
size_t chain_count = 0;
StackArray<std::tuple<Cown*, size_t, Slot*, size_t, bool>> chain_info(
cown_count);

while (i < cown_count)
{
auto cown = std::get<1>(indexes[i])->cown();
auto body = bodies[std::get<0>(indexes[i])];
auto curr_slot = std::get<1>(indexes[i]);
auto first_body = body;
size_t first_chain_index = i;
auto cown = std::get<1>(cown_to_behaviour_slot_map[i])->cown();
auto body = bodies[std::get<0>(cown_to_behaviour_slot_map[i])];
auto last_slot = std::get<1>(cown_to_behaviour_slot_map[i]);
size_t first_body_index = i;

// The number of RCs provided for the current cown by the when.
// I.e. how many moves of cown_refs there were.
size_t transfer_count = curr_slot->is_move();
size_t transfer_count = last_slot->is_move();

Logging::cout() << "Processing " << cown << " " << body << " "
<< curr_slot << " Index " << i << Logging::endl;
<< last_slot << " Index " << i << Logging::endl;

// Detect duplicates for this cown.
// This is required in two cases:
// * overlaps within a single behaviour.
while (((++i) < count) && (cown == std::get<1>(indexes[i])->cown()))
while (((++i) < cown_count) &&
(cown == std::get<1>(cown_to_behaviour_slot_map[i])->cown()))
{
// If the body is the same, then we have an overlap within a single
// behaviour.
auto body_next = bodies[std::get<0>(indexes[i])];
auto body_next = bodies[std::get<0>(cown_to_behaviour_slot_map[i])];
if (body_next == body)
{
// Check if the caller passed an RC and add to the total.
transfer_count += std::get<1>(indexes[i])->is_move();
transfer_count +=
std::get<1>(cown_to_behaviour_slot_map[i])->is_move();

Logging::cout() << "Duplicate " << cown << " for " << body
<< " Index " << i << Logging::endl;
// We need to reduce the execution count by one, as we can't wait
// for ourselves.
ec[std::get<0>(indexes[i])]++;
ec[std::get<0>(cown_to_behaviour_slot_map[i])]++;

// We need to mark the slot as not having a cown associated to it.
std::get<1>(indexes[i])->set_cown_null();
std::get<1>(cown_to_behaviour_slot_map[i])->set_cown_null();
continue;
}

// For writers, create a chain of behaviours
if (!std::get<1>(indexes[i])->is_read_only())
if (!std::get<1>(cown_to_behaviour_slot_map[i])->is_read_only())
{
body = body_next;

// Extend the chain of behaviours linking on this behaviour
curr_slot->set_next_slot_writer(body);
curr_slot = std::get<1>(indexes[i]);
last_slot->set_next_slot_writer(body);
last_slot = std::get<1>(cown_to_behaviour_slot_map[i]);
continue;
}

// TODO: Chain with reads and writes is not implemented.
abort();
}

// For each chain you need the cown, the first and the last body of the
// chain
std::get<0>(chain_info[chain_count]) = cown;
std::get<1>(chain_info[chain_count]) = first_body_index;
std::get<2>(chain_info[chain_count]) = last_slot;
std::get<3>(chain_info[chain_count]) = transfer_count;
std::get<4>(chain_info[chain_count++]) = false;

// Mark the slot as ready for scheduling
curr_slot->reset_status();
yield();
if (curr_slot->is_read_only())
curr_slot->set_behaviour(body);
last_slot->reset_status();
yield();
if (last_slot->is_read_only())
last_slot->set_behaviour(body);
}

// Second phase - Acquire phase
for (size_t i = 0; i < chain_count; i++)
{
auto* cown = std::get<0>(chain_info[i]);
auto first_body_index = std::get<1>(chain_info[i]);
auto* first_body = bodies[first_body_index];
auto* new_slot = std::get<2>(chain_info[i]);

auto prev_slot =
cown->last_slot.exchange(curr_slot, std::memory_order_acq_rel);
cown->last_slot.exchange(new_slot, std::memory_order_acq_rel);

yield();

if (prev_slot == nullptr)
{
if (curr_slot->is_read_only())
{
yield();
bool first_reader = cown->read_ref_count.add_read();
Logging::cout() << "Reader at head of queue and got the cown "
<< *curr_slot << Logging::endl;
yield();
curr_slot->set_read_available();
ec[std::get<0>(indexes[first_chain_index])]++;
yield();
acquire_with_transfer(cown, transfer_count, 1 + first_reader);
continue;
}

yield();

acquire_with_transfer(cown, transfer_count, 1);

yield();

if (cown->read_ref_count.try_write())
{
Logging::cout() << " Writer at head of queue and got the cown "
<< *curr_slot << Logging::endl;
ec[std::get<0>(indexes[first_chain_index])]++;
yield();
continue;
}

Logging::cout() << " Writer waiting for previous readers cown "
<< *curr_slot << Logging::endl;
yield();
cown->next_writer = body;
std::get<4>(chain_info[i]) = true;
continue;
}

// Release any transferred count
acquire_with_transfer(cown, transfer_count, 0);

yield();
Logging::cout() << " Someone in queue cown " << *curr_slot
<< " previous " << *prev_slot << Logging::endl;

while (prev_slot->is_wait_2pl())
{
Systematic::yield_until(
[prev_slot]() { return !prev_slot->is_wait_2pl(); });
Aal::pause();
}

if (curr_slot->is_read_only())
if (new_slot->is_read_only())
{
if (prev_slot->set_next_slot_reader(curr_slot))
if (prev_slot->set_next_slot_reader(new_slot))
{
Logging::cout()
<< " Previous slot is a writer or blocked reader cown "
<< *curr_slot << Logging::endl;
<< *new_slot << Logging::endl;
yield();
continue;
}

yield();
bool first_reader = cown->read_ref_count.add_read();
Logging::cout() << " Reader got the cown " << *curr_slot
Logging::cout() << " Reader got the cown " << *new_slot
<< Logging::endl;
yield();
curr_slot->set_read_available();
ec[std::get<0>(indexes[first_chain_index])]++;
new_slot->set_read_available();
ec[first_body_index]++;
if (first_reader)
{
Logging::cout()
<< "Acquiring reference count for first reader on cown "
<< *curr_slot << Logging::endl;
<< *new_slot << Logging::endl;
Cown::acquire(cown);
}
continue;
}

Logging::cout()
<< " Writer waiting for cown. Set next of previous slot cown "
<< *curr_slot << " previous " << *prev_slot << Logging::endl;
<< *new_slot << " previous " << *prev_slot << Logging::endl;
prev_slot->set_next_slot_writer(first_body);
yield();
}

// Second phase - Release phase.
// Third phase - Release phase.
for (size_t i = 0; i < body_count; i++)
{
Logging::cout() << "Release phase for behaviour " << bodies[i]
<< Logging::endl;
}
for (size_t i = 0; i < count; i++)
// FIXME: Shouldn't this be only over chains?
// Do we avoid the if this way?
for (size_t i = 0; i < cown_count; i++)
{
yield();
auto slot = std::get<1>(indexes[i]);
auto slot = std::get<1>(cown_to_behaviour_slot_map[i]);
Logging::cout() << "Setting slot " << slot << " to ready"
<< Logging::endl;
if (slot->is_wait_2pl())
slot->set_ready();
}

// Fourth phase - Process & Resolve

// Process first reference count and the chains with no predecessor
for (size_t i = 0; i < chain_count; i++)
{
auto* cown = std::get<0>(chain_info[i]);
auto first_body_index = std::get<1>(chain_info[i]);
auto* first_body = bodies[first_body_index];
auto* curr_slot = std::get<2>(chain_info[i]);
auto chain_had_predecessor = std::get<4>(chain_info[i]);
auto transfer_count = std::get<3>(chain_info[i]);

if (chain_had_predecessor)
{
acquire_with_transfer(cown, transfer_count, 0);
}
else
{
if (curr_slot->is_read_only())
{
yield();
bool first_reader = cown->read_ref_count.add_read();
Logging::cout() << "Reader at head of queue and got the cown "
<< *curr_slot << Logging::endl;
yield();
curr_slot->set_read_available();
ec[first_body_index]++;
yield();
acquire_with_transfer(cown, transfer_count, 1 + first_reader);
continue;
}

yield();

acquire_with_transfer(cown, transfer_count, 1);

yield();

if (cown->read_ref_count.try_write())
{
Logging::cout() << " Writer at head of queue and got the cown "
<< *curr_slot << Logging::endl;
ec[first_body_index]++;
yield();
continue;
}

Logging::cout() << " Writer waiting for previous readers cown "
<< *curr_slot << Logging::endl;
yield();
// Was this a bug in the previous implementation?
cown->next_writer = first_body;
}
}

for (size_t i = 0; i < body_count; i++)
{
yield();
Expand Down

0 comments on commit a88d2c4

Please sign in to comment.