From dfc86b3e1bfc464f3a0ef792389016588e215f6f Mon Sep 17 00:00:00 2001 From: Carlos Segarra Date: Fri, 26 Apr 2024 05:41:26 +0000 Subject: [PATCH] planner: introduce SPOT VMs policy --- .env | 4 +- .github/workflows/tests.yml | 12 +- VERSION | 2 +- .../faabric/batch-scheduler/BatchScheduler.h | 5 + .../faabric/batch-scheduler/SpotScheduler.h | 33 ++ include/faabric/planner/Planner.h | 13 + include/faabric/planner/PlannerState.h | 18 + include/faabric/util/func.h | 9 + src/batch-scheduler/BatchScheduler.cpp | 3 + src/batch-scheduler/CMakeLists.txt | 1 + src/batch-scheduler/CompactScheduler.cpp | 8 +- src/batch-scheduler/SpotScheduler.cpp | 329 +++++++++++++ src/executor/Executor.cpp | 29 +- src/planner/Planner.cpp | 293 +++++++++++- src/planner/PlannerEndpointHandler.cpp | 51 ++ src/planner/planner.proto | 18 +- src/scheduler/Scheduler.cpp | 9 + tests/dist/mpi/mpi_native.cpp | 37 +- tests/dist/mpi/test_multiple_mpi_worlds.cpp | 164 +++++++ .../batch-scheduler/test_spot_scheduler.cpp | 450 ++++++++++++++++++ tests/test/planner/test_planner_endpoint.cpp | 67 +++ tests/test/scheduler/test_scheduler.cpp | 2 +- .../test_message_endpoint_client.cpp | 12 - tests/utils/faabric_utils.h | 5 + tests/utils/fixtures.h | 6 + tests/utils/planner_utils.cpp | 30 ++ 26 files changed, 1560 insertions(+), 50 deletions(-) create mode 100644 include/faabric/batch-scheduler/SpotScheduler.h create mode 100644 src/batch-scheduler/SpotScheduler.cpp create mode 100644 tests/test/batch-scheduler/test_spot_scheduler.cpp diff --git a/.env b/.env index a17db033a..a379ae624 100644 --- a/.env +++ b/.env @@ -1,4 +1,4 @@ -FAABRIC_VERSION=0.18.0 -FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.18.0 +FAABRIC_VERSION=0.19.0 +FAABRIC_CLI_IMAGE=faasm.azurecr.io/faabric:0.19.0 COMPOSE_PROJECT_NAME=faabric-dev CONAN_CACHE_MOUNT_SOURCE=./conan-cache/ diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ecac5e483..cb924bb6b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -20,7 +20,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.18.0 + image: faasm.azurecr.io/faabric:0.19.0 env: DEPLOYMENT_TYPE: gha-ci steps: @@ -34,7 +34,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.18.0 + image: faasm.azurecr.io/faabric:0.19.0 steps: - name: "Check out code" uses: actions/checkout@v4 @@ -45,7 +45,7 @@ jobs: if: github.event.pull_request.draft == false runs-on: ubuntu-latest container: - image: faasm.azurecr.io/faabric:0.18.0 + image: faasm.azurecr.io/faabric:0.19.0 steps: - name: "Check out code" uses: actions/checkout@v4 @@ -65,7 +65,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.18.0 + image: faasm.azurecr.io/faabric:0.19.0 options: --privileged services: redis: @@ -104,7 +104,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.18.0 + image: faasm.azurecr.io/faabric:0.19.0 options: --privileged services: redis: @@ -156,7 +156,7 @@ jobs: REDIS_QUEUE_HOST: redis REDIS_STATE_HOST: redis container: - image: faasm.azurecr.io/faabric:0.18.0 + image: faasm.azurecr.io/faabric:0.19.0 services: redis: image: redis diff --git a/VERSION b/VERSION index 66333910a..1cf0537c3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.18.0 +0.19.0 diff --git a/include/faabric/batch-scheduler/BatchScheduler.h 
b/include/faabric/batch-scheduler/BatchScheduler.h index fd9974e7c..be3ceef02 100644 --- a/include/faabric/batch-scheduler/BatchScheduler.h +++ b/include/faabric/batch-scheduler/BatchScheduler.h @@ -12,6 +12,11 @@ #define NOT_ENOUGH_SLOTS_DECISION \ faabric::batch_scheduler::SchedulingDecision(NOT_ENOUGH_SLOTS, \ NOT_ENOUGH_SLOTS) +#define MUST_FREEZE -97 +#define MUST_FREEZE_DECISION \ + faabric::batch_scheduler::SchedulingDecision(MUST_FREEZE, MUST_FREEZE) + +#define MUST_EVICT_IP "E.VI.CT.ME" namespace faabric::batch_scheduler { diff --git a/include/faabric/batch-scheduler/SpotScheduler.h b/include/faabric/batch-scheduler/SpotScheduler.h new file mode 100644 index 000000000..ff4db3e92 --- /dev/null +++ b/include/faabric/batch-scheduler/SpotScheduler.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include + +namespace faabric::batch_scheduler { + +// This batch scheduler behaves in the same way than BinPack for NEW and +// SCALE_CHANGE requests, but for DIST_CHANGE it considers if any of the +// hosts in the Host Map have been tainted with the eviction mark. In which +// case it first tries to migrate them to other running hosts and, if not +// enough hosts are available, freezes the messages. +class SpotScheduler final : public BatchScheduler +{ + public: + std::shared_ptr makeSchedulingDecision( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req) override; + + private: + bool isFirstDecisionBetter( + std::shared_ptr decisionA, + std::shared_ptr decisionB) override; + + std::vector getSortedHosts( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req, + const DecisionType& decisionType) override; +}; +} diff --git a/include/faabric/planner/Planner.h b/include/faabric/planner/Planner.h index 0c28aeeee..597ac9fd2 100644 --- a/include/faabric/planner/Planner.h +++ b/include/faabric/planner/Planner.h @@ -33,6 +33,8 @@ class Planner void printConfig() const; + std::string getPolicy(); + void setPolicy(const std::string& newPolicy); // ---------- @@ -87,10 +89,21 @@ class Planner // the planner was last reset int getNumMigrations(); + // Helper method to get the next host that will be evicted + std::string getNextEvictedHostIp(); + + std::map> getEvictedReqs(); + // Main entrypoint to request the execution of batches std::shared_ptr callBatch( std::shared_ptr req); + // ---------- + // API exclusive to SPOT policy mode + // ---------- + + void setNextEvictedVm(const std::string& vmIp); + private: // There's a singleton instance of the planner running, but it must allow // concurrent requests diff --git a/include/faabric/planner/PlannerState.h b/include/faabric/planner/PlannerState.h index 92f77006a..0b5e7d97b 100644 --- a/include/faabric/planner/PlannerState.h +++ b/include/faabric/planner/PlannerState.h @@ -12,6 +12,10 @@ namespace faabric::planner { */ struct PlannerState { + // Policy to operate the planner in. Mostly determins the batch scheduler + // behaviour, but also the planner's in some cases + std::string policy; + // Accounting of the hosts that are registered in the system and responsive // We deliberately use the host's IP as unique key, but assign a unique host // id for redundancy @@ -36,5 +40,19 @@ struct PlannerState // Helper coutner of the total number of migrations std::atomic numMigrations = 0; + + // ----- + // Data structures used only under the SPOT policy + // ----- + + // Map containing the BER that have been evicted due to a SPOT VM eviction. 
+ // All messages in the VM have been checkpointed, are in the snapshot + // registry in the planner, and are ready to be scheduled when capacity + // appears + std::map> evictedRequests; + + // This variable simulates the values we would get from a cloud provider's + // API indicating the (set of) VM to be evicted next + std::string nextEvictedHostIp; }; } diff --git a/include/faabric/util/func.h b/include/faabric/util/func.h index e8ec5b93e..35aa1ef1b 100644 --- a/include/faabric/util/func.h +++ b/include/faabric/util/func.h @@ -6,9 +6,18 @@ #include #define MIGRATED_FUNCTION_RETURN_VALUE -99 +#define FROZEN_FUNCTION_RETURN_VALUE -98 namespace faabric::util { +class FunctionFrozenException : public faabric::util::FaabricException +{ + public: + explicit FunctionFrozenException(std::string message) + : FaabricException(std::move(message)) + {} +}; + class FunctionMigratedException : public faabric::util::FaabricException { public: diff --git a/src/batch-scheduler/BatchScheduler.cpp b/src/batch-scheduler/BatchScheduler.cpp index 58dc27d35..416a595d4 100644 --- a/src/batch-scheduler/BatchScheduler.cpp +++ b/src/batch-scheduler/BatchScheduler.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -23,6 +24,8 @@ std::shared_ptr getBatchScheduler() batchScheduler = std::make_shared(); } else if (mode == "compact") { batchScheduler = std::make_shared(); + } else if (mode == "spot") { + batchScheduler = std::make_shared(); } else { SPDLOG_ERROR("Unrecognised batch scheduler mode: {}", mode); throw std::runtime_error("Unrecognised batch scheduler mode"); diff --git a/src/batch-scheduler/CMakeLists.txt b/src/batch-scheduler/CMakeLists.txt index 7a73ddcfa..79ebdd789 100644 --- a/src/batch-scheduler/CMakeLists.txt +++ b/src/batch-scheduler/CMakeLists.txt @@ -7,6 +7,7 @@ faabric_lib(batch_scheduler BatchScheduler.cpp BinPackScheduler.cpp CompactScheduler.cpp + SpotScheduler.cpp ) target_link_libraries(batch_scheduler PRIVATE diff --git a/src/batch-scheduler/CompactScheduler.cpp b/src/batch-scheduler/CompactScheduler.cpp index f37270dc4..b623afe18 100644 --- a/src/batch-scheduler/CompactScheduler.cpp +++ b/src/batch-scheduler/CompactScheduler.cpp @@ -98,7 +98,7 @@ bool CompactScheduler::isFirstDecisionBetter( throw std::runtime_error("Method not supported for COMPACT scheduler"); } -HostMap deepCopyHostMap(const HostMap& hostMap) +static HostMap deepCopyHostMap(const HostMap& hostMap) { HostMap newHostMap; @@ -173,9 +173,9 @@ bool CompactScheduler::isFirstDecisionBetter( // Filter-out from the host map all nodes that are executing requests from a // different user -void filterHosts(HostMap& hostMap, - const InFlightReqs& inFlightReqs, - std::shared_ptr req) +static void filterHosts(HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req) { // We temporarily use the request subtype field to attach a user id for our // multi-tenant simulations diff --git a/src/batch-scheduler/SpotScheduler.cpp b/src/batch-scheduler/SpotScheduler.cpp new file mode 100644 index 000000000..2b21ea41a --- /dev/null +++ b/src/batch-scheduler/SpotScheduler.cpp @@ -0,0 +1,329 @@ +#include +#include +#include +#include + +namespace faabric::batch_scheduler { + +static std::map getHostFreqCount( + std::shared_ptr decision) +{ + std::map hostFreqCount; + for (auto host : decision->hosts) { + hostFreqCount[host] += 1; + } + + return hostFreqCount; +} + +// Given a new decision that improves on an old decision (i.e. 
to migrate), we +// want to make sure that we minimise the number of migration requests we send. +// This is, we want to keep as many host-message scheduling in the old decision +// as possible, and also have the overall locality of the new decision (i.e. +// the host-message histogram) +// NOTE: keep in mind that the newDecision has the right host histogram, but +// the messages may be completely out-of-order +static std::shared_ptr minimiseNumOfMigrations( + std::shared_ptr newDecision, + std::shared_ptr oldDecision) +{ + auto decision = std::make_shared(oldDecision->appId, + oldDecision->groupId); + + // We want to maintain the new decision's host-message histogram + auto hostFreqCount = getHostFreqCount(newDecision); + + // Helper function to find the next host in the histogram with slots + auto nextHostWithSlots = [&hostFreqCount]() -> std::string { + for (auto [ip, slots] : hostFreqCount) { + if (slots > 0) { + return ip; + } + } + + // Unreachable (in this context) + throw std::runtime_error("No next host with slots found!"); + }; + + assert(newDecision->hosts.size() == oldDecision->hosts.size()); + + // First we try to allocate to each message the same host they used to have + for (int i = 0; i < oldDecision->hosts.size(); i++) { + auto oldHost = oldDecision->hosts.at(i); + + if (hostFreqCount.contains(oldHost) && hostFreqCount.at(oldHost) > 0) { + decision->addMessageInPosition(i, + oldHost, + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i), + oldDecision->mpiPorts.at(i)); + + hostFreqCount.at(oldHost) -= 1; + } + } + + // Second we allocate the rest + for (int i = 0; i < oldDecision->hosts.size(); i++) { + if (decision->nFunctions <= i || decision->hosts.at(i).empty()) { + + auto nextHost = nextHostWithSlots(); + decision->addMessageInPosition(i, + nextHost, + oldDecision->messageIds.at(i), + oldDecision->appIdxs.at(i), + oldDecision->groupIdxs.at(i), + -1); + + hostFreqCount.at(nextHost) -= 1; + } + } + + // Assert that we have preserved the new decision's host-message histogram + // (use the pre-processor macro as we assert repeatedly in the loop, so we + // want to avoid having an empty loop in non-debug mode) +#ifndef NDEBUG + for (auto [host, freq] : hostFreqCount) { + assert(freq == 0); + } +#endif + + return decision; +} + +bool SpotScheduler::isFirstDecisionBetter( + std::shared_ptr decisionA, + std::shared_ptr decisionB) +{ + throw std::runtime_error("Method not supported for COMPACT scheduler"); +} + +// Filter-out from the host map the next VM that will be evicted +static std::string filterHosts(HostMap& hostMap) +{ + std::string ipToRemove; + + for (const auto& [hostIp, host] : hostMap) { + if (host->ip == MUST_EVICT_IP) { + ipToRemove = hostIp; + } + } + + hostMap.erase(ipToRemove); + + return ipToRemove; +} + +std::vector SpotScheduler::getSortedHosts( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req, + const DecisionType& decisionType) +{ + std::vector sortedHosts; + for (auto [ip, host] : hostMap) { + sortedHosts.push_back(host); + } + + std::shared_ptr oldDecision = nullptr; + std::map hostFreqCount; + if (decisionType != DecisionType::NEW) { + oldDecision = inFlightReqs.at(req->appid()).second; + hostFreqCount = getHostFreqCount(oldDecision); + } + + auto isFirstHostLarger = [&](const Host& hostA, const Host& hostB) -> bool { + // The SPOT scheduler sorts hosts by number of available slots + int nAvailableA = numSlotsAvailable(hostA); + int nAvailableB = numSlotsAvailable(hostB); + if 
(nAvailableA != nAvailableB) { + return nAvailableA > nAvailableB; + } + + // In case of a tie, it will pick larger hosts first + int nSlotsA = numSlots(hostA); + int nSlotsB = numSlots(hostB); + if (nSlotsA != nSlotsB) { + return nSlotsA > nSlotsB; + } + + // Lastly, in case of a tie, return the largest host alphabetically + return getIp(hostA) > getIp(hostB); + }; + + auto isFirstHostLargerWithFreq = [&](auto hostA, auto hostB) -> bool { + // When updating an existing scheduling decision (SCALE_CHANGE or + // DIST_CHANGE), the SPOT scheduler takes into consideration the + // existing host-message histogram (i.e. how many messages for this app + // does each host _already_ run). This behaviour is the same than the + // BIN_PACK and COMPACT policies + + int numInHostA = hostFreqCount.contains(getIp(hostA)) + ? hostFreqCount.at(getIp(hostA)) + : 0; + int numInHostB = hostFreqCount.contains(getIp(hostB)) + ? hostFreqCount.at(getIp(hostB)) + : 0; + + // If at least one of the hosts has messages for this request, return + // the host with the more messages for this request (note that it is + // possible that this host has no available slots at all, in this case + // we will just pack 0 messages here but we still want to sort it first + // nontheless) + if (numInHostA != numInHostB) { + return numInHostA > numInHostB; + } + + // In case of a tie, use the same criteria than NEW + return isFirstHostLarger(hostA, hostB); + }; + + switch (decisionType) { + case DecisionType::NEW: { + // For a NEW decision type, the SPOT scheduler just sorts the + // hosts in decreasing order of capacity, and bin-packs messages + // to hosts in this order. This has one caveat that it skips the + // next VM that we know will be evicted + std::sort( + sortedHosts.begin(), sortedHosts.end(), isFirstHostLarger); + + break; + } + case DecisionType::SCALE_CHANGE: { + // If we are changing the scale of a running app (i.e. via chaining + // or thread/process forking) we want to prioritise co-locating + // as much as possible. This means that we will sort first by the + // frequency of messages of the running app, and second with the + // same criteria than NEW + // IMPORTANT: a SCALE_CHANGE request with 4 messages means that we + // want to add 4 NEW messages to the running app (not that the new + // total count is 4) + std::sort(sortedHosts.begin(), + sortedHosts.end(), + isFirstHostLargerWithFreq); + + break; + } + case DecisionType::DIST_CHANGE: { + // A DIST_CHANGE with the SPOT scheduler means that, if the app + // is running any messages of the to-be-evicted VM, we must move + // them from there. Two things may happen: + // * We have slots to move them-to (equivalent to re-scheduling + // from scratch without the tainted VM) + // * We do not have slots to move them-to, in which case all + // messages need to freeze until there is capacity in the cluster + // again + + auto oldDecision = inFlightReqs.at(req->appid()).second; + auto hostFreqCount = getHostFreqCount(oldDecision); + + // First remove the slots the app occupies to have a fresh new + // shot at the scheduling + for (auto host : sortedHosts) { + if (hostFreqCount.contains(getIp(host))) { + freeSlots(host, hostFreqCount.at(getIp(host))); + } + } + + // Try to schedule again without the tainted VM. Note that this + // app may not be using the tainted VM _at all_ in which case we + // will just discard the suggested migration. 
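            // (Illustrative note, not part of this patch.) A concrete
            // example of the two DIST_CHANGE outcomes described above,
            // assuming the app is the only one running: hosts A (4 slots)
            // and B (2 slots), the app runs 2 messages on each, and B is
            // marked for eviction. B is filtered out of the host map, and
            // after freeing the app's slots A has 4 free slots, so the
            // bin-pack in makeSchedulingDecision places all 4 messages on
            // A and the 2 messages on B are migrated. If A instead had only
            // 2 slots in total, 2 messages could not be placed and the
            // decision would be MUST_FREEZE_DECISION.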
+ std::sort(sortedHosts.begin(), + sortedHosts.end(), + isFirstHostLargerWithFreq); + + break; + } + default: { + SPDLOG_ERROR("Unrecognised decision type: {}", decisionType); + throw std::runtime_error("Unrecognised decision type"); + } + } + + return sortedHosts; +} + +// The BinPack's scheduler decision algorithm is very simple. It first sorts +// hosts (i.e. bins) in a specific order (depending on the scheduling type), +// and then starts filling bins from begining to end, until it runs out of +// messages to schedule. The SPOT scheduler behaves as the BinPack for +// NEW and SCALE_CHANGE requests, with two caveats: +// - it avoids setting any messages to a host that is going to be evicted +// - when migrating, it will check if the migration candidate has any messages +// running in the to-be-evicted VM. If so, it will try to migrate messages +// away from the evicted-to-VM. If it cannot, it will request the app to +// INTERRUPT +std::shared_ptr SpotScheduler::makeSchedulingDecision( + HostMap& hostMap, + const InFlightReqs& inFlightReqs, + std::shared_ptr req) +{ + auto decision = std::make_shared(req->appid(), 0); + + // Filter the hosts removing the VM that will be evicted next + std::string evictedHostIp = filterHosts(hostMap); + + // Get the sorted list of hosts + auto decisionType = getDecisionType(inFlightReqs, req); + auto sortedHosts = getSortedHosts(hostMap, inFlightReqs, req, decisionType); + + // Assign slots from the list (i.e. bin-pack) + auto itr = sortedHosts.begin(); + int numLeftToSchedule = req->messages_size(); + int msgIdx = 0; + while (itr < sortedHosts.end()) { + // Calculate how many slots can we assign to this host (assign as many + // as possible) + int numOnThisHost = + std::min(numLeftToSchedule, numSlotsAvailable(*itr)); + for (int i = 0; i < numOnThisHost; i++) { + decision->addMessage(getIp(*itr), req->messages(msgIdx)); + msgIdx++; + } + + // Update the number of messages left to schedule + numLeftToSchedule -= numOnThisHost; + + // If there are no more messages to schedule, we are done + if (numLeftToSchedule == 0) { + break; + } + + // Otherwise, it means that we have exhausted this host, and need to + // check in the next one + itr++; + } + + bool isDistChange = decisionType == DecisionType::DIST_CHANGE; + + // If we still have enough slots to schedule, we are out of slots + if (numLeftToSchedule > 0 && !isDistChange) { + return std::make_shared(NOT_ENOUGH_SLOTS_DECISION); + } + + if (isDistChange) { + // If we ran out of slots whilst processing a migration request it + // means that we have some messages running in the to-be-evicted VM + // and we can not migrate them elsewhere. In this case we must FREEZE + // all messages + if (numLeftToSchedule > 0) { + return std::make_shared(MUST_FREEZE_DECISION); + } + + // Check if we are running any messages in the to-be evicted VM. 
Only + // migrate if we are + auto oldDecision = inFlightReqs.at(req->appid()).second; + for (const auto& hostIp : oldDecision->hosts) { + if (hostIp == evictedHostIp) { + // If we are requesting a migration, make sure that we minimise + // the number of messages to actuall migrate + return minimiseNumOfMigrations(decision, oldDecision); + } + } + + return std::make_shared(DO_NOT_MIGRATE_DECISION); + } + + return decision; +} +} diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp index 32a91d776..c58f2093b 100644 --- a/src/executor/Executor.cpp +++ b/src/executor/Executor.cpp @@ -414,6 +414,32 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) } } } + } catch (const faabric::util::FunctionFrozenException& ex) { + SPDLOG_DEBUG( + "Task {} frozen, shutting down executor {}", msg.id(), id); + + returnValue = FROZEN_FUNCTION_RETURN_VALUE; + + // TODO: maybe we do not need this here as we know this VM will be + // destroyed soon, but we do it nontheless just in case + if (msg.ismpi()) { + auto& mpiWorldRegistry = faabric::mpi::getMpiWorldRegistry(); + if (mpiWorldRegistry.worldExists(msg.mpiworldid())) { + bool mustClear = + mpiWorldRegistry.getWorld(msg.mpiworldid()).destroy(); + + if (mustClear) { + SPDLOG_DEBUG("{}:{}:{} clearing world {} from host {}", + msg.appid(), + msg.groupid(), + msg.groupidx(), + msg.mpiworldid(), + msg.executedhost()); + + mpiWorldRegistry.clearWorld(msg.mpiworldid()); + } + } + } } catch (const std::exception& ex) { returnValue = 1; @@ -482,8 +508,7 @@ void Executor::threadPoolThread(std::stop_token st, int threadPoolIdx) // main host we still have the zero-th thread executing) auto mainThreadSnapKey = faabric::util::getMainThreadSnapshotKey(msg); std::vector diffs; - // FIXME: thread 0 locally is not part of this batch, but is still - // in the same executor + bool isRemoteThread = task.req->messages(0).mainhost() != conf.endpointHost; if (isLastThreadInBatch && doDirtyTracking && isRemoteThread) { diff --git a/src/planner/Planner.cpp b/src/planner/Planner.cpp index f2a4aba02..21530b984 100644 --- a/src/planner/Planner.cpp +++ b/src/planner/Planner.cpp @@ -132,11 +132,19 @@ void Planner::printConfig() const SPDLOG_INFO("HTTP_SERVER_THREADS {}", config.numthreadshttpserver()); } +std::string Planner::getPolicy() +{ + faabric::util::SharedLock lock(plannerMx); + + return state.policy; +} + void Planner::setPolicy(const std::string& newPolicy) { // Acquire lock to prevent any changes in state whilst we change the policy faabric::util::FullLock lock(plannerMx); + state.policy = newPolicy; faabric::batch_scheduler::resetBatchScheduler(newPolicy); } @@ -193,10 +201,16 @@ void Planner::flushSchedulingState() { faabric::util::FullLock lock(plannerMx); + state.policy = "bin-pack"; + state.inFlightReqs.clear(); state.appResults.clear(); state.appResultWaiters.clear(); + state.numMigrations = 0; + + state.evictedRequests.clear(); + state.nextEvictedHostIp = ""; } std::vector> Planner::getAvailableHosts() @@ -349,14 +363,57 @@ void Planner::setMessageResult(std::shared_ptr msg) msg->groupid(), msg->groupidx()); + // If we are setting the result for a frozen message, it is important + // that we store the message itself in the evicted BER as it contains + // information like the function pointer and snapshot key to eventually + // un-freeze from. 
In addition, we want to skip setting the message result + // as we will set it when the message finally succeeds + bool isFrozenMsg = msg->returnvalue() == FROZEN_FUNCTION_RETURN_VALUE; + if (isFrozenMsg) { + if (!state.evictedRequests.contains(msg->appid())) { + SPDLOG_ERROR("Message {} is frozen but app (id: {}) not in map!", + msg->id(), + msg->appid()); + throw std::runtime_error("Orphaned frozen message!"); + } + + auto ber = state.evictedRequests.at(msg->appid()); + bool found = false; + for (int i = 0; i < ber->messages_size(); i++) { + if (ber->messages(i).id() == msg->id()) { + SPDLOG_DEBUG("Setting message {} in the forzen BER for app {}", + msg->id(), + appId); + + // Propagate the fields that we set during migration + ber->mutable_messages(i)->set_funcptr(msg->funcptr()); + ber->mutable_messages(i)->set_inputdata(msg->inputdata()); + ber->mutable_messages(i)->set_snapshotkey(msg->snapshotkey()); + ber->mutable_messages(i)->set_returnvalue(msg->returnvalue()); + + found = true; + break; + } + } + + if (!found) { + SPDLOG_ERROR( + "Error trying to set message {} in the frozen BER for app {}", + msg->id(), + appId); + } + } + // Release the slot only once assert(state.hostMap.contains(msg->executedhost())); - if (!state.appResults[appId].contains(msgId)) { + if (!state.appResults[appId].contains(msgId) || isFrozenMsg) { releaseHostSlots(state.hostMap.at(msg->executedhost())); } // Set the result - state.appResults[appId][msgId] = msg; + if (!isFrozenMsg) { + state.appResults[appId][msgId] = msg; + } // Remove the message from the in-flight requests if (!state.inFlightReqs.contains(appId)) { @@ -414,6 +471,12 @@ void Planner::setMessageResult(std::shared_ptr msg) } } + // When setting a frozen's message result, we can skip notifying waiting + // hosts + if (isFrozenMsg) { + return; + } + // Finally, dispatch an async message to all hosts that are waiting once // all planner accounting is updated if (state.appResultWaiters.find(msgId) != state.appResultWaiters.end()) { @@ -537,20 +600,77 @@ std::shared_ptr Planner::getBatchResults( { auto berStatus = faabric::util::batchExecStatusFactory(appId); + // When querying for the result of a batch we always check if it has been + // evicted, as it is one of the triggers to try and re-schedule it again + bool isFrozen = false; + std::shared_ptr frozenBer = nullptr; + // Acquire a read lock to copy all the results we have for this batch { faabric::util::SharedLock lock(plannerMx); - if (!state.appResults.contains(appId)) { - return nullptr; + if (state.evictedRequests.contains(appId)) { + isFrozen = true; + + // To prevent race conditions, before treating an app as frozen + // we require all messages to have reported as frozen + for (const auto& msg : + state.evictedRequests.at(appId)->messages()) { + if (msg.returnvalue() != FROZEN_FUNCTION_RETURN_VALUE) { + isFrozen = false; + } + } + + if (isFrozen) { + frozenBer = state.evictedRequests.at(appId); + + // If the app is frozen (i.e. 
all messages have frozen) it + // should not be fully in the in-flight map anymore + if (state.inFlightReqs.contains(appId) && + frozenBer->messages_size() == + state.inFlightReqs.at(appId).first->messages_size()) { + SPDLOG_ERROR("Inconsistent planner state: app {} is both " + "frozen and in-flight!", + appId); + return nullptr; + } + } + } + + if (!isFrozen) { + if (!state.appResults.contains(appId)) { + return nullptr; + } + + for (auto msgResultPair : state.appResults.at(appId)) { + *berStatus->add_messageresults() = *(msgResultPair.second); + } + + // Set the finished condition + berStatus->set_finished(!state.inFlightReqs.contains(appId)); } + } - for (auto msgResultPair : state.appResults.at(appId)) { - *berStatus->add_messageresults() = *(msgResultPair.second); + // Only try to un-freeze when the app is fully frozen and not in-flight. + // Note that when we un-freeze it may be that the app is not still + // fully in-flight, and hence we have not removed it from the evicted map + if (isFrozen && !state.inFlightReqs.contains(appId)) { + SPDLOG_DEBUG("Planner trying to un-freeze app {}", appId); + + // This should trigger a NEW decision. We make a deep-copy of the BER + // to avoid changing the values in the evicted map + auto newBer = std::make_shared(); + *newBer = *frozenBer; + auto decision = callBatch(newBer); + + // This means that there are not enough free slots to schedule the + // decision, we must just return a keep-alive to the poller thread + if (*decision == NOT_ENOUGH_SLOTS_DECISION) { + SPDLOG_DEBUG("Can not un-freeze app {}: not enough slots!", appId); } - // Set the finished condition - berStatus->set_finished(!state.inFlightReqs.contains(appId)); + // In any case, the app is in-flight and so not finished + berStatus->set_finished(false); } return berStatus; @@ -592,14 +712,41 @@ int Planner::getNumMigrations() return state.numMigrations.load(std::memory_order_acquire); } +std::string Planner::getNextEvictedHostIp() +{ + faabric::util::SharedLock lock(plannerMx); + + return state.nextEvictedHostIp; +} + +std::map> +Planner::getEvictedReqs() +{ + faabric::util::SharedLock lock(plannerMx); + + std::map> evictedReqs; + + for (const auto& [appId, ber] : state.evictedRequests) { + evictedReqs[appId] = std::make_shared(); + *evictedReqs.at(appId) = *ber; + } + + return evictedReqs; +} + static faabric::batch_scheduler::HostMap convertToBatchSchedHostMap( - std::map> hostMapIn) + std::map> hostMapIn, + const std::string& nextEvictedHostIp) { faabric::batch_scheduler::HostMap hostMap; for (const auto& [ip, host] : hostMapIn) { hostMap[ip] = std::make_shared( host->ip(), host->slots(), host->usedslots()); + + if (ip == nextEvictedHostIp) { + hostMap.at(ip)->ip = MUST_EVICT_IP; + } } return hostMap; @@ -620,7 +767,8 @@ Planner::callBatch(std::shared_ptr req) // Make a copy of the host-map state to make sure the scheduling process // does not modify it - auto hostMapCopy = convertToBatchSchedHostMap(state.hostMap); + auto hostMapCopy = + convertToBatchSchedHostMap(state.hostMap, state.nextEvictedHostIp); bool isDistChange = decisionType == faabric::batch_scheduler::DecisionType::DIST_CHANGE; @@ -652,17 +800,19 @@ Planner::callBatch(std::shared_ptr req) if (!isDistChange && state.preloadedSchedulingDecisions.contains(appId)) { decision = getPreloadedSchedulingDecision(appId, req); } else if (isNew && isMpi) { - mpiReq = faabric::util::batchExecFactory( - req->user(), req->function(), req->messages(0).mpiworldsize()); - // Propagate the subtype for multi-tenant runs - 
mpiReq->set_subtype(req->subtype()); - - // Populate the temporary request - mpiReq->mutable_messages()->at(0) = req->messages(0); - faabric::util::updateBatchExecAppId(mpiReq, appId); - for (int i = 0; i < mpiReq->messages_size(); i++) { - mpiReq->mutable_messages()->at(i).set_groupidx(i); + mpiReq = std::make_shared(); + *mpiReq = *req; + + // Deep-copy as many messages we can from the original BER, and mock + // the rest + for (int i = req->messages_size(); i < req->messages(0).mpiworldsize(); + i++) { + auto* newMpiMsg = mpiReq->add_messages(); + + newMpiMsg->set_appid(req->appid()); + newMpiMsg->set_groupidx(i); } + assert(mpiReq->messages_size() == req->messages(0).mpiworldsize()); decision = batchScheduler->makeSchedulingDecision( hostMapCopy, state.inFlightReqs, mpiReq); @@ -678,7 +828,9 @@ Planner::callBatch(std::shared_ptr req) "Not enough free slots to schedule app: {} (requested: {})", appId, req->messages_size()); +#ifndef NDEBUG printHostState(state.hostMap, "error"); +#endif return decision; } @@ -687,6 +839,67 @@ Planner::callBatch(std::shared_ptr req) return decision; } + if (*decision == MUST_FREEZE_DECISION) { + SPDLOG_INFO("Decided to FREEZE app: {}", appId); + + // Note that the app will be naturally removed from in-flight as the + // messages throw an exception and finish, so here we only need to + // add the request to the evicted requests. Also, given that the + // app will be removed from in-flight, we want to deep copy the BER + state.evictedRequests[appId] = + std::make_shared(); + *state.evictedRequests.at(appId) = *state.inFlightReqs.at(appId).first; + + return decision; + } + + // If we have managed to schedule a frozen request, un-freeze it by + // removing it from the evicted request map + if (state.evictedRequests.contains(appId)) { + // When un-freezing an MPI app, we treat it as a NEW request, and thus + // we will go through the two-step initialisation with a preloaded + // decision + if (isNew && isMpi) { + // During the first step, and to make the downstream assertions + // pass, it is safe to remove all messages greater than zero from + // here + SPDLOG_INFO("Decided to un-FREEZE app {}", appId); + + auto firstMessage = req->messages(0); + req->clear_messages(); + *req->add_messages() = firstMessage; + } else if (isMpi && !isDistChange) { + // During the second step, we amend the messages provided by MPI + // (as part of MPI_Init) with the fields that we require for a + // successful restore + assert(req->messages_size() == req->messages(0).mpiworldsize() - 1); + + auto evictedBer = state.evictedRequests.at(appId); + for (int i = 0; i < req->messages_size(); i++) { + for (int j = 1; j < evictedBer->messages_size(); j++) { + // We match by groupidx and not by message id, as the + // message id is set by MPI and we need to overwrite it + if (req->messages(i).groupidx() == + evictedBer->messages(j).groupidx()) { + + req->mutable_messages()->at(i).set_id( + evictedBer->messages(j).id()); + req->mutable_messages()->at(i).set_funcptr( + evictedBer->messages(j).funcptr()); + req->mutable_messages()->at(i).set_inputdata( + evictedBer->messages(j).inputdata()); + req->mutable_messages()->at(i).set_snapshotkey( + evictedBer->messages(j).snapshotkey()); + + break; + } + } + } + + state.evictedRequests.erase(appId); + } + } + // Skip claiming slots and ports if we have preemptively allocated them bool skipClaim = decision->groupId == MPI_PRELOADED_DECISION_GROUPID; @@ -967,6 +1180,30 @@ void Planner::dispatchSchedulingDecision( } } + // In an un-FREEZE request, 
we need to first push the snapshots to + // the destination host. This snapshots correspond to the messages + // that were FROZEN + if (!isThreads && !hostReq->messages(0).snapshotkey().empty()) { + // Unlike in a THREADS request, each un-forzen message has a + // different snapshot + // TODO: consider ways to optimise this transferring + for (int i = 0; i < hostReq->messages_size(); i++) { + auto snapshotKey = hostReq->messages(i).snapshotkey(); + try { + auto snap = snapshotRegistry.getSnapshot(snapshotKey); + + // TODO: could we push only the diffs? + faabric::snapshot::getSnapshotClient(hostIp)->pushSnapshot( + snapshotKey, snap); + } catch (std::runtime_error& e) { + // Catch errors, but don't let them crash the planner. Let + // the worker crash instead + SPDLOG_ERROR("Snapshot {} not regsitered in planner!", + snapshotKey); + } + } + } + faabric::scheduler::getFunctionCallClient(hostIp)->executeFunctions( hostReq); } @@ -975,6 +1212,22 @@ void Planner::dispatchSchedulingDecision( req->messages_size()); } +// TODO: should check if the VM is in the host map! +void Planner::setNextEvictedVm(const std::string& vmIp) +{ + faabric::util::FullLock lock(plannerMx); + + if (state.policy != "spot") { + SPDLOG_ERROR("Error setting evicted VM (ip: {}) with policy {}", + vmIp, + state.policy); + SPDLOG_ERROR("To set the next evicted VM policy must be: spot"); + throw std::runtime_error("Error setting the next evicted VM!"); + } + + state.nextEvictedHostIp = vmIp; +} + Planner& getPlanner() { static Planner planner; diff --git a/src/planner/PlannerEndpointHandler.cpp b/src/planner/PlannerEndpointHandler.cpp index bb18e870e..d120c95a9 100644 --- a/src/planner/PlannerEndpointHandler.cpp +++ b/src/planner/PlannerEndpointHandler.cpp @@ -181,6 +181,12 @@ void PlannerEndpointHandler::onRequest( auto* inFlightAppResp = inFlightAppsResponse.add_apps(); inFlightAppResp->set_appid(appId); inFlightAppResp->set_subtype(inFlightPair.first->subtype()); + + if (inFlightPair.first->messages(0).ismpi()) { + inFlightAppResp->set_size( + inFlightPair.first->messages(0).mpiworldsize()); + } + for (const auto& hostIp : decision->hosts) { inFlightAppResp->add_hostips(hostIp); } @@ -191,6 +197,23 @@ void PlannerEndpointHandler::onRequest( faabric::planner::getPlanner().getNumMigrations(); inFlightAppsResponse.set_nummigrations(numMigrations); + // Include the next VM that will be evicted + std::string nextEvictedHostIp = + faabric::planner::getPlanner().getNextEvictedHostIp(); + inFlightAppsResponse.set_nextevictedvmip(nextEvictedHostIp); + + // Include the currently frozen apps + auto evictedApps = faabric::planner::getPlanner().getEvictedReqs(); + for (const auto& [appId, ber] : evictedApps) { + auto* frozenApp = inFlightAppsResponse.add_frozenapps(); + + frozenApp->set_appid(appId); + + if (ber->messages(0).ismpi()) { + frozenApp->set_size(ber->messages(0).mpiworldsize()); + } + } + response.result(beast::http::status::ok); response.body() = faabric::util::messageToJson(inFlightAppsResponse); @@ -328,6 +351,34 @@ void PlannerEndpointHandler::onRequest( return ctx.sendFunction(std::move(response)); } + case faabric::planner::HttpMessage_Type_GET_POLICY: { + SPDLOG_DEBUG("Planner received GET_POLICY request"); + + // Prepare the response + response.result(beast::http::status::ok); + response.body() = faabric::planner::getPlanner().getPolicy(); + + return ctx.sendFunction(std::move(response)); + } + case faabric::planner::HttpMessage_Type_SET_NEXT_EVICTED_VM: { + SPDLOG_DEBUG("Planner received SET_NEXT_EVICTED_VM 
request"); + + std::string vmIp = msg.payloadjson(); + try { + faabric::planner::getPlanner().setNextEvictedVm(vmIp); + } catch (std::exception& e) { + response.result(beast::http::status::bad_request); + response.body() = std::string( + "Next evicted VM must only be set in 'spot' policy"); + return ctx.sendFunction(std::move(response)); + } + + // Prepare the response + response.result(beast::http::status::ok); + response.body() = std::string("Next evicted VM set"); + + return ctx.sendFunction(std::move(response)); + } default: { SPDLOG_ERROR("Unrecognised message type {}", msg.type()); response.result(beast::http::status::bad_request); diff --git a/src/planner/planner.proto b/src/planner/planner.proto index 6744eaa4b..e67f03375 100644 --- a/src/planner/planner.proto +++ b/src/planner/planner.proto @@ -46,6 +46,11 @@ message HttpMessage { EXECUTE_BATCH_STATUS = 11; PRELOAD_SCHEDULING_DECISION = 12; SET_POLICY = 13; + GET_POLICY = 14; + // This endpoint is only used with the SPOT planner policy. In a real + // deployment we would get this value from a cloud-provider-specific + // API + SET_NEXT_EVICTED_VM = 15; } Type type = 1 [json_name = "http_type"]; @@ -64,11 +69,22 @@ message GetInFlightAppsResponse { message InFlightApp { int32 appId = 1; int32 subType = 2; - repeated string hostIps = 3; + int32 size = 3; + repeated string hostIps = 4; + } + + message FrozenApp { + int32 appId = 1; + int32 subType = 2; + int32 size = 3; } repeated InFlightApp apps = 1; + + // Auxiliary fields to visualise the state of the cluster int32 numMigrations = 2; + string nextEvictedVmIp = 3; + repeated FrozenApp frozenApps = 4; } message NumMigrationsResponse { diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index ae7ac7f72..dcccff791 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -454,6 +454,8 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg, // Update the group ID if we want to migrate if (decision == DO_NOT_MIGRATE_DECISION) { newGroupId = groupId; + } else if (decision == MUST_FREEZE_DECISION) { + newGroupId = MUST_FREEZE; } else { newGroupId = decision.groupId; } @@ -475,6 +477,13 @@ Scheduler::checkForMigrationOpportunities(faabric::Message& msg, newGroupId = overwriteNewGroupId; } + if (newGroupId == MUST_FREEZE) { + auto migration = std::make_shared(); + migration->set_appid(MUST_FREEZE); + + return migration; + } + bool appMustMigrate = newGroupId != groupId; if (!appMustMigrate) { return nullptr; diff --git a/tests/dist/mpi/mpi_native.cpp b/tests/dist/mpi/mpi_native.cpp index 95c0fc9de..fda78b96f 100644 --- a/tests/dist/mpi/mpi_native.cpp +++ b/tests/dist/mpi/mpi_native.cpp @@ -1,5 +1,6 @@ #include "mpi_native.h" +#include #include #include #include @@ -15,6 +16,7 @@ #include #include #include +#include using namespace faabric::mpi; @@ -785,7 +787,40 @@ void mpiMigrationPoint(int entrypointFuncArg) // Detect if there is a pending migration for the current app auto migration = sch.checkForMigrationOpportunities(*call); - bool appMustMigrate = migration != nullptr; + bool appMustFreeze = + migration != nullptr && migration->appid() == MUST_FREEZE; + + // Short-cut for when all messages need to freeze. 
We only need to send + // a snapshot to the planner, and throw an exception + if (appMustFreeze) { + std::string argStr = std::to_string(entrypointFuncArg); + std::vector inputData(argStr.begin(), argStr.end()); + std::string snapKey = "migration_" + std::to_string(call->id()); + + call->set_inputdata(inputData.data(), inputData.size()); + call->set_snapshotkey(snapKey); + + auto* exec = faabric::executor::ExecutorContext::get()->getExecutor(); + auto snap = + std::make_shared(exec->getMemoryView()); + auto& reg = faabric::snapshot::getSnapshotRegistry(); + reg.registerSnapshot(snapKey, snap); + + auto plannerIp = faabric::util::getIPFromHostname( + faabric::util::getSystemConfig().plannerHost); + faabric::snapshot::getSnapshotClient(plannerIp)->pushSnapshot(snapKey, + snap); + + SPDLOG_INFO("{}:{}:{} Freezing message!", + call->appid(), + call->groupid(), + call->groupidx()); + + // Throw an exception to be caught by the executor and terminate + throw faabric::util::FunctionFrozenException("Freezing MPI rank"); + } + + bool appMustMigrate = migration != nullptr && !appMustFreeze; // Detect if this particular function needs to be migrated or not bool funcMustMigrate = false; diff --git a/tests/dist/mpi/test_multiple_mpi_worlds.cpp b/tests/dist/mpi/test_multiple_mpi_worlds.cpp index 74d9990ff..d711d6afa 100644 --- a/tests/dist/mpi/test_multiple_mpi_worlds.cpp +++ b/tests/dist/mpi/test_multiple_mpi_worlds.cpp @@ -243,4 +243,168 @@ TEST_CASE_METHOD(MpiDistTestsFixture, checkAllocationAndResult(req1, hostsAfter1); checkAllocationAndResult(req2, hostsAfter2); } + +TEST_CASE_METHOD( + MpiDistTestsFixture, + "Test migrating an MPI app as a consequence of an eviction (SPOT)", + "[mpi]") +{ + updatePlannerPolicy("spot"); + + int worldSize = 4; + + // Prepare both requests: + // - The first will do work, sleep for five seconds, and do work again + // - The second will do work and check for migration opportunities + auto req1 = setRequest("alltoall-sleep"); + auto req2 = setRequest("migration"); + auto& msg = req2->mutable_messages()->at(0); + msg.set_inputdata(std::to_string(NUM_MIGRATION_LOOPS)); + + updateLocalSlots(8); + updateRemoteSlots(8); + + std::vector hostsBefore1 = { + getMasterIP(), getMasterIP(), getMasterIP(), getMasterIP() + }; + std::vector hostsBefore2; + std::vector hostsAfterMigration; + + std::string evictedVmIp; + + SECTION("Migrate main rank") + { + hostsBefore2 = { + getWorkerIP(), getWorkerIP(), getMasterIP(), getMasterIP() + }; + evictedVmIp = getWorkerIP(); + hostsAfterMigration = + std::vector(worldSize, getMasterIP()); + } + + SECTION("Don't migrate main rank") + { + hostsBefore2 = { + getMasterIP(), getMasterIP(), getWorkerIP(), getWorkerIP() + }; + evictedVmIp = getWorkerIP(); + hostsAfterMigration = + std::vector(worldSize, getMasterIP()); + } + + SECTION("Migrate all ranks") + { + hostsBefore2 = { + getMasterIP(), getMasterIP(), getMasterIP(), getMasterIP() + }; + evictedVmIp = getMasterIP(); + hostsAfterMigration = + std::vector(worldSize, getWorkerIP()); + } + + // Preload decisions to force sub-optimal scheduling + auto preloadDec1 = std::make_shared( + req1->appid(), req1->groupid()); + auto preloadDec2 = std::make_shared( + req2->appid(), req2->groupid()); + for (int i = 0; i < worldSize; i++) { + preloadDec1->addMessage(hostsBefore1.at(i), 0, 0, i); + preloadDec2->addMessage(hostsBefore2.at(i), 0, 0, i); + } + plannerCli.preloadSchedulingDecision(preloadDec1); + plannerCli.preloadSchedulingDecision(preloadDec2); + + // Preload should overwrite the evicted IP, so 
we can set it before we + // call callFunctions + setNextEvictedVmIp(evictedVmIp); + + plannerCli.callFunctions(req1); + auto actualHostsBefore1 = waitForMpiMessagesInFlight(req1); + REQUIRE(hostsBefore1 == actualHostsBefore1); + + plannerCli.callFunctions(req2); + + checkAllocationAndResult(req1, hostsBefore1); + checkAllocationAndResult(req2, hostsAfterMigration); + + updatePlannerPolicy("bin-pack"); +} + +TEST_CASE_METHOD(MpiDistTestsFixture, + "Test stopping and resuming an MPI application (SPOT)", + "[mpi]") +{ + updatePlannerPolicy("spot"); + + int worldSize = 4; + + // Prepare both requests: + // - The first will do work, sleep for five seconds, and do work again + // - The second will do work and check for migration opportunities + auto req1 = setRequest("alltoall-sleep"); + auto req2 = setRequest("migration"); + auto& msg = req2->mutable_messages()->at(0); + msg.set_inputdata(std::to_string(NUM_MIGRATION_LOOPS)); + + // Make it so that there is not enough slots to migrate. We will have to + // wait for the first request to finish to be able to resume the app + updateLocalSlots(4); + updateRemoteSlots(4); + + auto hostsBefore1 = std::vector(worldSize, getMasterIP()); + // This app will realise it is running on a VM that will be evicted, so + // it will FREEZE + auto hostsBefore2 = std::vector(worldSize, getWorkerIP()); + std::string evictedVmIp = getWorkerIP(); + + // Preload decisions to force the allocation we want + auto preloadDec1 = std::make_shared( + req1->appid(), req1->groupid()); + auto preloadDec2 = std::make_shared( + req2->appid(), req2->groupid()); + for (int i = 0; i < worldSize; i++) { + preloadDec1->addMessage(hostsBefore1.at(i), 0, 0, i); + preloadDec2->addMessage(hostsBefore2.at(i), 0, 0, i); + } + plannerCli.preloadSchedulingDecision(preloadDec1); + plannerCli.preloadSchedulingDecision(preloadDec2); + + // Mark the worker VM as evicted (note that preload takes preference over + // eviction marks) + setNextEvictedVmIp(evictedVmIp); + + plannerCli.callFunctions(req1); + auto actualHostsBefore1 = waitForMpiMessagesInFlight(req1); + REQUIRE(hostsBefore1 == actualHostsBefore1); + + plannerCli.callFunctions(req2); + + // First, if we try to get the batch results it shoud say that the app + // is not finished (even though it will try to re-schedule it again). 
To + // the eyes of the client, a FROZEN app is still running + auto batchResults2 = plannerCli.getBatchResults(req2); + REQUIRE(!batchResults2->finished()); + + // Second, let's wait for the first request to finish so that more + // slots free up + checkAllocationAndResult(req1, hostsBefore1); + + // Third, no apps are currently in-flight (1 finished, 1 frozen) + auto inFlightApps = getInFlightApps(); + REQUIRE(inFlightApps.apps_size() == 0); + REQUIRE(inFlightApps.frozenapps_size() == 1); + REQUIRE(inFlightApps.frozenapps(0) == req2->appid()); + + // Fourth, try to get the batch results for the FROZEN app again to trigger + // an un-FREEZE + batchResults2 = plannerCli.getBatchResults(req2); + REQUIRE(!batchResults2->finished()); + + // Finally, we should be able to wait on + auto hostsAfterMigration = + std::vector(worldSize, getMasterIP()); + checkAllocationAndResult(req2, hostsAfterMigration); + + updatePlannerPolicy("bin-pack"); +} } diff --git a/tests/test/batch-scheduler/test_spot_scheduler.cpp b/tests/test/batch-scheduler/test_spot_scheduler.cpp new file mode 100644 index 000000000..13f63b219 --- /dev/null +++ b/tests/test/batch-scheduler/test_spot_scheduler.cpp @@ -0,0 +1,450 @@ +#include + +#include "fixtures.h" + +#include +#include + +using namespace faabric::batch_scheduler; + +namespace tests { + +class SpotSchedulerTestFixture : public BatchSchedulerFixture +{ + public: + SpotSchedulerTestFixture() + { + conf.batchSchedulerMode = "spot"; + batchScheduler = getBatchScheduler(); + } +}; + +// SPOT should behave the same as COMPACT and BIN-PACK for NEW requests +TEST_CASE_METHOD(SpotSchedulerTestFixture, + "Test scheduling of new requests with Spot", + "[batch-scheduler]") +{ + // To mock new requests (i.e. DecisionType::NEW), we always set the + // InFlightReqs map to an empty map + BatchSchedulerConfig config = { + .hostMap = {}, + .inFlightReqs = {}, + .expectedDecision = SchedulingDecision(appId, groupId), + }; + + SECTION("Compact scheduler gives up if not enough slots are available") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 1, 1 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION; + } + + SECTION("Scheduling fits in one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo" }); + } + + SECTION("Scheduling is exactly one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo", "foo" }); + } + + // The bin-pack scheduler will pick hosts with larger empty slots first + SECTION("Scheduling spans two hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "foo", "bar", "bar" }); + } + + SECTION("Scheduling spans exactly two hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 7); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "foo", "bar", "bar", "bar" }); + } + + // In particular, it will prioritise hosts with overall less capacity if + // they have more free resources + 
SECTION("Scheduling spans two hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 3, 4 }, { 0, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo", "bar" }); + } + + // In case of a tie in free resources, the Compact scheduler will pick + // hosts with larger overall capacity first + SECTION("Scheduling spans two hosts with same free resources") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "bar", "bar", "bar" }); + } + + // If there's still a tie, the Compact scheduler will solve the tie by + // sorting the hosts alphabetically (from larger to smaller) + SECTION("Scheduling spans two hosts with same free resources and size") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 3, 3 }, { 0, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.expectedDecision = buildExpectedDecision( + ber, { "foo", "foo", "foo", "bar", "bar", "bar" }); + } + + SECTION("Scheduling spans an arbitrarily large number of hosts") + { + config.hostMap = buildHostMap({ "foo", "bar", "baz", "bip", "bup" }, + { 4, 6, 2, 3, 1 }, + { 0, 2, 2, 2, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 10); + config.expectedDecision = buildExpectedDecision(ber, + { "bar", + "bar", + "bar", + "bar", + "foo", + "foo", + "foo", + "foo", + "bip", + "bup" }); + } + + actualDecision = *batchScheduler->makeSchedulingDecision( + config.hostMap, config.inFlightReqs, ber); + compareSchedulingDecisions(actualDecision, config.expectedDecision); +} + +// SPOT should behave the same as COMPACT and BIN-PACK for SCALE_CHANGE requests +TEST_CASE_METHOD(SpotSchedulerTestFixture, + "Test scheduling of scale-change requests with SPOT", + "[batch-scheduler]") +{ + // To mock a scale-change request (i.e. DecisionType::SCALE_CHANGE), we + // need to have one in-flight request in the map with the same app id + // (and not of type MIGRATION) + BatchSchedulerConfig config = { + .hostMap = {}, + .inFlightReqs = {}, + .expectedDecision = SchedulingDecision(appId, groupId), + }; + + // The configs in this test must be read as follows: + // - the host map's used slots contains the current distribution for the app + // (i.e. 
the number of used slots matches the number in in-flight reqs) + // - the host map's slots contain the total slots + // - the ber contains the NEW messages we are going to add + // - the expected decision includes the expected scheduling decision for + // the new messages + + SECTION("Compact scheduler gives up if not enough slots are available") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 2, 1 }, { 1, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 6); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" }); + config.expectedDecision = NOT_ENOUGH_SLOTS_DECISION; + } + + // When scheduling a SCALE_CHANGE request, we always try to colocate as + // much as possible + SECTION("Scheduling fits in one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 0 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "foo" }); + config.expectedDecision = + buildExpectedDecision(ber, { "foo", "foo", "foo" }); + } + + // We prefer hosts with less capacity if they are already running requests + // for the same app + SECTION("Scheduling fits in one host and prefers known hosts") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 5, 4 }, { 0, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "bar" }); + } + + // Like with `NEW` requests, we can also spill to other hosts + SECTION("Scheduling spans more than one host") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 0, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 4); + config.inFlightReqs = buildInFlightReqs(ber, 1, { "bar" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "foo", "foo" }); + } + + // If two hosts are already executing the app, we pick the one that is + // running the largest number of messages + SECTION("Scheduler prefers hosts with more running messages") + { + config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 3 }, { 1, 2 }); + ber = faabric::util::batchExecFactory("bat", "man", 1); + config.inFlightReqs = + buildInFlightReqs(ber, 3, { "bar", "bar", "foo" }); + config.expectedDecision = buildExpectedDecision(ber, { "bar" }); + } + + // Again, when picking a new host to spill to, we priorities hosts that + // are already running requests for this app + SECTION("Scheduling always picks known hosts first") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 3, 2 }, + { 0, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 5); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "baz", "foo", "foo" }); + } + + // Sometimes the preferred hosts just don't have slots. 
They will be sorted + // first but the scheduler will skip them when bin-packing + SECTION("Scheduler ignores preferred but full hosts") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 2, 2 }, + { 0, 2, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = + buildInFlightReqs(ber, 3, { "bar", "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "baz", "foo", "foo" }); + } + + // In case of a tie of the number of runing messages, we revert to `NEW`- + // like tie breaking + SECTION("In case of a tie of preferred hosts, fall-back to known " + "tie-breaks (free slots)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 3, 2 }, + { 0, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "bar", "baz" }); + } + + SECTION("In case of a tie of preferred hosts, fall-back to known " + "tie-breaks (size)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 3, 2 }, + { 0, 2, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "bar", "baz", "foo" }); + } + + SECTION("In case of a tie of preferred hosts, fall-back to known " + "tie-breaks (alphabetical)") + { + config.hostMap = buildHostMap( + { + "foo", + "bar", + "baz", + }, + { 4, 2, 2 }, + { 0, 1, 1 }); + ber = faabric::util::batchExecFactory("bat", "man", 3); + config.inFlightReqs = buildInFlightReqs(ber, 2, { "bar", "baz" }); + config.expectedDecision = + buildExpectedDecision(ber, { "baz", "bar", "foo" }); + } + + actualDecision = *batchScheduler->makeSchedulingDecision( + config.hostMap, config.inFlightReqs, ber); + compareSchedulingDecisions(actualDecision, config.expectedDecision); +} + +// DIST_CHANGE requests in the SPOT scheduler only concern with tainted VMs +// (i.e. VMs that are going to be evicted): +// - If nothing is going to be evicted, there should be no migration happening +// - If we know one VM is going to be evicted: +// - We migrate out of it if there are slots +// - We FREEZE if there are no slots to migrate to +TEST_CASE_METHOD(SpotSchedulerTestFixture, + "Test scheduling of dist-change requests with SPOT", + "[batch-scheduler]") +{ + // To mock a dist-change request (i.e. 
+    // need to have one in-flight request in the map with the same app id and
+    // the same size (and of type MIGRATION)
+    BatchSchedulerConfig config = {
+        .hostMap = {},
+        .inFlightReqs = {},
+        .expectedDecision = SchedulingDecision(appId, groupId),
+    };
+
+    // Note: the way we let the SPOT scheduler know which VMs will be evicted
+    // is by setting the IP field in the HostMap **value** (not key)
+    // to MUST_EVICT_IP
+
+    SECTION("SPOT returns nothing if there are no tainted VMs")
+    {
+        config.hostMap = buildHostMap({ "foo" }, { 4 }, { 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 2);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "foo", "foo" });
+        config.expectedDecision = DO_NOT_MIGRATE_DECISION;
+    }
+
+    SECTION("SPOT returns nothing if there are no tainted VMs (multiple)")
+    {
+        config.hostMap = buildHostMap({ "foo", "bar" }, { 4, 2 }, { 4, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 5);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 5, { "foo", "foo", "foo", "foo", "bar" });
+        config.expectedDecision = DO_NOT_MIGRATE_DECISION;
+    }
+
+    SECTION("SPOT ignores opportunities to free-up hosts")
+    {
+        config.hostMap =
+          buildHostMap({ "foo", "bar", "baz" }, { 4, 4, 4 }, { 2, 2, 4 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "baz", "baz", "baz", "baz" });
+        config.expectedDecision = DO_NOT_MIGRATE_DECISION;
+    }
+
+    SECTION("SPOT deallocates an app if the VM is tainted and not enough slots "
+            "(single-host)")
+    {
+        config.hostMap = buildHostMap({ "foo" }, { 4 }, { 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 2);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs = buildInFlightReqs(ber, 2, { "foo", "foo" });
+        config.expectedDecision = MUST_FREEZE_DECISION;
+        // Must evict host foo
+        markHostAsEvicted(config.hostMap, "foo");
+    }
+
+    SECTION("SPOT deallocates an app if the VM is tainted and not enough slots "
+            "(multiple-hosts)")
+    {
+        config.hostMap = buildHostMap(
+          { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 4, 2, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "bar" });
+        config.expectedDecision = MUST_FREEZE_DECISION;
+        // Must evict host foo
+        markHostAsEvicted(config.hostMap, "foo");
+    }
+
+    SECTION("SPOT migrates an app if there are enough slots")
+    {
+        config.hostMap = buildHostMap(
+          { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 2, 2, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "baz", "baz", "lol", "lol" });
+
+        // Must evict host baz
+        markHostAsEvicted(config.hostMap, "baz");
+
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "foo", "foo", "lol", "lol" });
+    }
+
+    SECTION("SPOT ignores evicted hosts if no messages are running on them")
+    {
+        config.hostMap = buildHostMap(
+          { "foo", "bar", "baz", "lol" }, { 4, 4, 2, 2 }, { 2, 2, 2, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "baz", "baz", "lol", "lol" });
+
+        // Must evict host foo (which runs no messages for this app)
+        markHostAsEvicted(config.hostMap, "foo");
+
+        config.expectedDecision = DO_NOT_MIGRATE_DECISION;
+    }
+
+    SECTION("SPOT prefers hosts running more messages")
+    {
+        config.hostMap = buildHostMap(
+          {
+              "foo",
+              "bar",
+              "baz",
+          },
+          { 3, 2, 1 },
+          { 2, 1, 1 });
+        ber = faabric::util::batchExecFactory("bat", "man", 4);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs =
+          buildInFlightReqs(ber, 4, { "foo", "foo", "bar", "baz" });
+
+        markHostAsEvicted(config.hostMap, "bar");
+
+        config.expectedDecision =
+          buildExpectedDecision(ber, { "foo", "foo", "foo", "baz" });
+    }
+
+    SECTION("SPOT will minimise the number of messages to migrate")
+    {
+        config.hostMap =
+          buildHostMap({ "foo", "bar", "baz" }, { 5, 4, 2 }, { 3, 4, 2 });
+        ber = faabric::util::batchExecFactory("bat", "man", 9);
+        ber->set_type(BatchExecuteRequest_BatchExecuteType_MIGRATION);
+        config.inFlightReqs = buildInFlightReqs(
+          ber,
+          9,
+          { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "baz", "baz" });
+
+        markHostAsEvicted(config.hostMap, "baz");
+
+        config.expectedDecision = buildExpectedDecision(
+          ber,
+          { "foo", "foo", "foo", "bar", "bar", "bar", "bar", "foo", "foo" });
+    }
+
+    actualDecision = *batchScheduler->makeSchedulingDecision(
+      config.hostMap, config.inFlightReqs, ber);
+    compareSchedulingDecisions(actualDecision, config.expectedDecision);
+}
+}
diff --git a/tests/test/planner/test_planner_endpoint.cpp b/tests/test/planner/test_planner_endpoint.cpp
index 62578f67d..8813aa2be 100644
--- a/tests/test/planner/test_planner_endpoint.cpp
+++ b/tests/test/planner/test_planner_endpoint.cpp
@@ -662,9 +662,14 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture,
     // in-flight apps (we poll the planner until all messages have finished)
     waitForBerToFinish(ber);
 
+    // We also switch to the 'spot' policy and mark this host as evicted
+    updatePlannerPolicy("spot");
+    setNextEvictedVmIp(conf.endpointHost);
+
     // Once we are sure the batch has finished, check again that there are
     // zero apps in-flight
     GetInFlightAppsResponse emptyExpectedResponse;
+    emptyExpectedResponse.set_nextevictedvmip(conf.endpointHost);
     msgJsonStr = faabric::util::messageToJson(inFlightMsg);
     expectedResponseBody = faabric::util::messageToJson(emptyExpectedResponse);
     result = doPost(msgJsonStr);
@@ -748,6 +753,13 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture,
         expectedResponseBody = "Policy set correctly";
     }
 
+    SECTION("Valid request (spot)")
+    {
+        policy = "spot";
+        expectedReturnCode = beast::http::status::ok;
+        expectedResponseBody = "Policy set correctly";
+    }
+
     SECTION("Invalid request")
     {
         policy = "foo-bar";
@@ -762,5 +774,60 @@ TEST_CASE_METHOD(PlannerEndpointExecTestFixture,
     REQUIRE(boost::beast::http::int_to_status(result.first) ==
             expectedReturnCode);
     REQUIRE(result.second == expectedResponseBody);
+
+    if (expectedReturnCode == beast::http::status::ok) {
+        // Second, get the policy we just set
+        msg.set_type(HttpMessage_Type_GET_POLICY);
+        msg.clear_payloadjson();
+        msgJsonStr = faabric::util::messageToJson(msg);
+
+        std::pair<int, std::string> result = doPost(msgJsonStr);
+        REQUIRE(boost::beast::http::int_to_status(result.first) ==
+                expectedReturnCode);
+        REQUIRE(result.second == policy);
+    }
+}
+
+TEST_CASE_METHOD(PlannerEndpointExecTestFixture,
+                 "Test setting the next evicted VM",
+                 "[planner]")
+{
+    HttpMessage policyMsg;
+    policyMsg.set_type(HttpMessage_Type_SET_POLICY);
+    HttpMessage msg;
+    msg.set_type(HttpMessage_Type_SET_NEXT_EVICTED_VM);
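+    // A dummy IP is enough here: the test only checks the endpoint's return
+    // code and response body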
msg.set_payloadjson("1.1.1.1"); + msgJsonStr = faabric::util::messageToJson(msg); + + std::string policy; + + SECTION("Valid request") + { + policy = "spot"; + expectedReturnCode = beast::http::status::ok; + expectedResponseBody = "Next evicted VM set"; + } + + SECTION("Invalid request") + { + policy = "compact"; + expectedReturnCode = beast::http::status::bad_request; + expectedResponseBody = + "Next evicted VM must only be set in 'spot' policy"; + } + + policyMsg.set_payloadjson(policy); + std::string policyMsgJsonStr = faabric::util::messageToJson(policyMsg); + + // First set the policy + std::pair result = doPost(policyMsgJsonStr); + REQUIRE(boost::beast::http::int_to_status(result.first) == + boost::beast::http::status::ok); + + // Second set the next evicted VM + result = doPost(msgJsonStr); + REQUIRE(boost::beast::http::int_to_status(result.first) == + expectedReturnCode); + REQUIRE(result.second == expectedResponseBody); } } diff --git a/tests/test/scheduler/test_scheduler.cpp b/tests/test/scheduler/test_scheduler.cpp index 00a589dc0..14ed0b1e4 100644 --- a/tests/test/scheduler/test_scheduler.cpp +++ b/tests/test/scheduler/test_scheduler.cpp @@ -195,7 +195,7 @@ TEST_CASE_METHOD(SlowExecutorTestFixture, SECTION("Processes") { execMode = faabric::BatchExecuteRequest::PROCESSES; - expectedSnapshot = "procSnap"; + expectedSnapshot = ""; expectedSubType = 345; expectedContextData = "proc context"; diff --git a/tests/test/transport/test_message_endpoint_client.cpp b/tests/test/transport/test_message_endpoint_client.cpp index 493c6a35b..8483bef73 100644 --- a/tests/test/transport/test_message_endpoint_client.cpp +++ b/tests/test/transport/test_message_endpoint_client.cpp @@ -315,18 +315,6 @@ TEST_CASE_METHOD(SchedulerFixture, } REQUIRE(success.load(std::memory_order_acquire)); - - for (auto& t : senders) { - if (t.joinable()) { - t.join(); - } - } - - for (auto& t : receivers) { - if (t.joinable()) { - t.join(); - } - } } #endif // End ThreadSanitizer exclusion diff --git a/tests/utils/faabric_utils.h b/tests/utils/faabric_utils.h index 2e7c777c7..0120defc9 100644 --- a/tests/utils/faabric_utils.h +++ b/tests/utils/faabric_utils.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -97,5 +98,9 @@ void flushPlannerWorkers(); void resetPlanner(); +faabric::planner::GetInFlightAppsResponse getInFlightApps(); + void updatePlannerPolicy(const std::string& newPolicy); + +void setNextEvictedVmIp(const std::string& evictedVmIp); } diff --git a/tests/utils/fixtures.h b/tests/utils/fixtures.h index 86b4776f5..f40e2405f 100644 --- a/tests/utils/fixtures.h +++ b/tests/utils/fixtures.h @@ -646,6 +646,12 @@ class BatchSchedulerFixture : public ConfFixture return decision; } + static void markHostAsEvicted(faabric::batch_scheduler::HostMap& hostMap, + const std::string& hostIp) + { + hostMap.at(hostIp)->ip = MUST_EVICT_IP; + } + static void compareSchedulingDecisions( const faabric::batch_scheduler::SchedulingDecision& decisionA, const faabric::batch_scheduler::SchedulingDecision& decisionB) diff --git a/tests/utils/planner_utils.cpp b/tests/utils/planner_utils.cpp index 2956f06ad..39c2a794b 100644 --- a/tests/utils/planner_utils.cpp +++ b/tests/utils/planner_utils.cpp @@ -29,6 +29,36 @@ void updatePlannerPolicy(const std::string& newPolicy) assert(result.first == 200); } +void setNextEvictedVmIp(const std::string& evictedVmIp) +{ + faabric::planner::HttpMessage msg; + msg.set_type(faabric::planner::HttpMessage_Type_SET_NEXT_EVICTED_VM); + msg.set_payloadjson(evictedVmIp); + 
+    std::string jsonStr = faabric::util::messageToJson(msg);
+
+    faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();
+    std::pair<int, std::string> result =
+      postToUrl(conf.plannerHost, conf.plannerPort, jsonStr);
+    assert(result.first == 200);
+}
+
+faabric::planner::GetInFlightAppsResponse getInFlightApps()
+{
+    faabric::planner::HttpMessage msg;
+    msg.set_type(faabric::planner::HttpMessage_Type_GET_IN_FLIGHT_APPS);
+    std::string jsonStr = faabric::util::messageToJson(msg);
+
+    faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();
+    std::pair<int, std::string> result =
+      postToUrl(conf.plannerHost, conf.plannerPort, jsonStr);
+    assert(result.first == 200);
+
+    faabric::planner::GetInFlightAppsResponse response;
+    faabric::util::jsonToMessage(result.second, &response);
+
+    return response;
+}
+
 void flushPlannerWorkers()
 {
     faabric::planner::HttpMessage msg;