Skip to content

Commit

Permalink
edf: adding a construction option with a given number of pre-picks (e…
Browse files Browse the repository at this point in the history
…nvoyproxy#31592)

This is a follow up on envoyproxy#29953, with some insights about an additional scenario that has a similar problem.
The EDF scheduler is used in 2 levels of Zone-Aware LB policies:

1. For choosing the locality to use (here for healthy hosts). Note that as opposed to the problem in (2), this also impacts equal-weighted localities.
2. For choosing the host within the chosen locality (here).

Prior suggestions (WRSQ, envoyproxy#29953) focused on fixing level (2), and required converting the weights to integer-precision before performing the LB policy.
While this may work for (2), assuming one can convert the weights to integers by multiplying by some factor and truncating, for (1) this approach is more challenging as when computing the "effective locality weight" its value is dependent on the over-provisioning factor and the ratio of available hosts to all hosts. An additional benefit is that the current approach can also work with slow-start.

Thus, this PR suggests a different mechanism to "perform" some random number of picks when creating the EDF scheduler. The creation process is split into two steps:

1. Estimate a lower-bound on the number of picks each entry will be chosen, and initialize the internal EDF priority-queue accordingly.
2. Perform up to N "pickAndAdd" operations, where N is the number of localities/hosts.

Note that this approach is ~equal to performing some P picks from the scheduler where P >> N (~equal up to double precision computation differences and entries order of entries with the same weight).

After this PR, the next thing is to plumb the PRNG (or a random value) into the locality-scheduler creation process, and fix the call site for (1).

Here's a short doc providing the rationale behind this work: https://docs.google.com/document/d/1i53RYOy8sJUwL6_PSIF4yN39Xc1LvJtZ-18aSBZq0vM/edit?usp=sharing

Risk Level: low - code not used by Envoy's main codebase
Testing: Added tests to validate equivalence with prior approach.
Docs Changes: N/A
Release Notes:N/A - will be updated when the
Platform Specific Features: N/A

Signed-off-by: Adi Suissa-Peleg <[email protected]>
  • Loading branch information
adisuissa authored Jan 25, 2024
1 parent d15fee5 commit 45ab9cf
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 10 deletions.
110 changes: 110 additions & 0 deletions source/common/upstream/edf_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ namespace Upstream {
// weights and an O(log n) pick time.
template <class C> class EdfScheduler : public Scheduler<C> {
public:
EdfScheduler() {}

// See scheduler.h for an explanation of each public method.
std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
std::shared_ptr<C> ret = popEntry();
Expand Down Expand Up @@ -64,7 +66,110 @@ template <class C> class EdfScheduler : public Scheduler<C> {

bool empty() const override { return queue_.empty(); }

// Creates an EdfScheduler with the given weights and their corresponding
// entries, and emulating a number of initial picks to be performed. Note that
// the internal state of the scheduler will be very similar to creating an empty
// scheduler, adding the entries one after the other, and then performing
// "picks" pickAndAdd operation without modifying the entries' weights.
// The only thing that may be different is that entries with the same weight
// may be chosen a bit differently (the order_offset_ values may be different).
// Breaking the ties of same weight entries will be kept in future picks from
// the scheduler.
static EdfScheduler<C> createWithPicks(const std::vector<std::shared_ptr<C>>& entries,
std::function<double(const C&)> calculate_weight,
uint32_t picks) {
// Limiting the number of picks, as over 400M picks should be sufficient
// for most scenarios.
picks = picks % 429496729; // % UINT_MAX/10
EDF_TRACE("Creating an EDF-scheduler with {} weights and {} pre-picks.", entries.size(), picks);
// Assume no non-positive weights.
ASSERT(std::none_of(entries.cbegin(), entries.cend(),
[&calculate_weight](const std::shared_ptr<C>& entry) {
return calculate_weight(*entry) <= 0;
}));

// Nothing to do if there are no entries.
if (entries.size() == 0) {
return EdfScheduler<C>();
}

// Augment the weight computation to add some epsilon to each entry's
// weight to avoid cases where weights are multiplies of each other. For
// example if there are 2 weights: 25 and 75, and picks=23, then the
// floor_picks will be {5, 17} (respectively), and the deadlines will be
// {0.24000000000000002 and 0.24} (respectively). This small difference will
// cause a "wrong" pick compared to when starting from an empty scheduler
// and picking 23 times. Adding a small value to each weight circumvents
// this problem. This was added as a result of the following comment:
// https://github.com/envoyproxy/envoy/pull/31592#issuecomment-1877663769.
auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double {
return calculate_weight(entry) + 1e-13;
};

// Let weights {w_1, w_2, ..., w_N} be the per-entry weight where (w_i > 0),
// W = sum(w_i), and P be the number of times to "pick" from the scheduler.
// Let p'_i = floor(P * w_i/W), then the number of times each entry is being
// picked is p_i >= p'_i. Note that 0 <= P - sum(p'_i) < N.
//
// The following code does P picks, by first emulating p'_i picks for each
// entry, and then executing the leftover P - sum(p'_i) picks.
double weights_sum = std::accumulate(
entries.cbegin(), entries.cend(), 0.0,
[&aug_calculate_weight](double sum_so_far, const std::shared_ptr<C>& entry) {
return sum_so_far + aug_calculate_weight(*entry);
});
std::vector<uint32_t> floor_picks;
floor_picks.reserve(entries.size());
std::transform(entries.cbegin(), entries.cend(), std::back_inserter(floor_picks),
[picks, weights_sum, &aug_calculate_weight](const std::shared_ptr<C>& entry) {
// Getting the lower-bound by casting to an integer.
return static_cast<uint32_t>(aug_calculate_weight(*entry) * picks /
weights_sum);
});

// Pre-compute the priority-queue entries to use an O(N) initialization c'tor.
std::vector<EdfEntry> scheduler_entries;
scheduler_entries.reserve(entries.size());
uint32_t picks_so_far = 0;
double max_pick_time = 0.0;
// Emulate a per-entry addition to a deadline that is applicable to N picks.
for (size_t i = 0; i < entries.size(); ++i) {
// Add the entry with p'_i picks. As there were p'_i picks, the entry's
// next deadline is (p'_i + 1) / w_i.
const double weight = aug_calculate_weight(*entries[i]);
// While validating the algorithm there were a few cases where the math
// and floating-point arithmetic did not agree (specifically floor(A*B)
// was greater than A*B). The following if statement solves the problem by
// reducing floor-picks for the entry, which may result in more iterations
// in the code after the loop.
if ((floor_picks[i] > 0) && (floor_picks[i] / weight >= picks / weights_sum)) {
floor_picks[i]--;
}
const double pick_time = floor_picks[i] / weight;
const double deadline = (floor_picks[i] + 1) / weight;
EDF_TRACE("Insertion {} in queue with emualted {} picks, deadline {} and weight {}.",
static_cast<const void*>(entries[i].get()), floor_picks[i], deadline, weight);
scheduler_entries.emplace_back(EdfEntry{deadline, i, entries[i]});
max_pick_time = std::max(max_pick_time, pick_time);
picks_so_far += floor_picks[i];
}
// The scheduler's current_time_ needs to be the largest time that some entry was picked.
EdfScheduler<C> scheduler(std::move(scheduler_entries), max_pick_time, entries.size());
ASSERT(scheduler.queue_.top().deadline_ >= scheduler.current_time_);

// Left to do some picks, execute them one after the other.
EDF_TRACE("Emulated {} picks in init step, {} picks remaining for one after the other step",
picks_so_far, picks - picks_so_far);
while (picks_so_far < picks) {
scheduler.pickAndAdd(calculate_weight);
picks_so_far++;
}
return scheduler;
}

private:
friend class EdfSchedulerTest;

/**
* Clears expired entries and pops the next unexpired entry in the queue.
*/
Expand Down Expand Up @@ -106,6 +211,11 @@ template <class C> class EdfScheduler : public Scheduler<C> {
}
};

EdfScheduler(std::vector<EdfEntry>&& scheduler_entries, double current_time,
uint32_t order_offset)
: current_time_(current_time), order_offset_(order_offset),
queue_(scheduler_entries.cbegin(), scheduler_entries.cend()) {}

// Current time in EDF scheduler.
// TODO(htuch): Is it worth the small extra complexity to use integer time for performance
// reasons?
Expand Down
5 changes: 4 additions & 1 deletion test/common/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ envoy_cc_test(
envoy_cc_test(
name = "edf_scheduler_test",
srcs = ["edf_scheduler_test.cc"],
deps = ["//source/common/upstream:scheduler_lib"],
deps = [
"//source/common/upstream:scheduler_lib",
"//test/test_common:utility_lib",
],
)

envoy_cc_test_library(
Expand Down
198 changes: 189 additions & 9 deletions test/common/upstream/edf_scheduler_test.cc
Original file line number Diff line number Diff line change
@@ -1,19 +1,60 @@
#include "source/common/upstream/edf_scheduler.h"

#include "test/test_common/test_random_generator.h"

#include "gtest/gtest.h"

namespace Envoy {
namespace Upstream {
namespace {

TEST(EdfSchedulerTest, Empty) {
class EdfSchedulerTest : public testing::Test {
public:
template <typename T>
static void compareEdfSchedulers(EdfScheduler<T>& scheduler1, EdfScheduler<T>& scheduler2) {
// Compares that the given EdfSchedulers internal queues are equal up
// (ignoring the order_offset_ values).
EXPECT_EQ(scheduler1.queue_.size(), scheduler2.queue_.size());
// Cannot iterate over std::priority_queue directly, so need to copy the
// contents to a vector first.
auto copyFunc = [](EdfScheduler<T>& scheduler) {
std::vector<typename EdfScheduler<T>::EdfEntry> result;
result.reserve(scheduler.queue_.size());
while (!scheduler.empty()) {
result.emplace_back(std::move(scheduler.queue_.top()));
scheduler.queue_.pop();
}
// Re-add all elements so the contents of the input scheduler isn't
// changed.
for (auto& entry : result) {
scheduler.queue_.push(entry);
}
return result;
};
std::vector<typename EdfScheduler<T>::EdfEntry> contents1 = copyFunc(scheduler1);
std::vector<typename EdfScheduler<T>::EdfEntry> contents2 = copyFunc(scheduler2);
for (size_t i = 0; i < contents1.size(); ++i) {
// Given 2 queues and some number of picks P, where one queue is created empty
// and P picks are performed, and the other queue is created using
// `EdfScheduler::createWithPicks()` their deadlines may be a bit different
// due to floating point arithmetic differences. The comparison code uses
// a NEAR comparison to account for such differences.
EXPECT_NEAR(contents1[i].deadline_, contents2[i].deadline_, 1e-5)
<< "inequal deadline in element " << i;
std::shared_ptr<T> entry1 = contents1[i].entry_.lock();
std::shared_ptr<T> entry2 = contents2[i].entry_.lock();
EXPECT_EQ(*entry1, *entry2) << "inequal entry in element " << i;
}
}
};

TEST_F(EdfSchedulerTest, Empty) {
EdfScheduler<uint32_t> sched;
EXPECT_EQ(nullptr, sched.peekAgain([](const double&) { return 0; }));
EXPECT_EQ(nullptr, sched.pickAndAdd([](const double&) { return 0; }));
}

// Validate we get regular RR behavior when all weights are the same.
TEST(EdfSchedulerTest, Unweighted) {
TEST_F(EdfSchedulerTest, Unweighted) {
EdfScheduler<uint32_t> sched;
constexpr uint32_t num_entries = 128;
std::shared_ptr<uint32_t> entries[num_entries];
Expand All @@ -34,7 +75,7 @@ TEST(EdfSchedulerTest, Unweighted) {
}

// Validate we get weighted RR behavior when weights are distinct.
TEST(EdfSchedulerTest, Weighted) {
TEST_F(EdfSchedulerTest, Weighted) {
EdfScheduler<uint32_t> sched;
constexpr uint32_t num_entries = 128;
std::shared_ptr<uint32_t> entries[num_entries];
Expand All @@ -59,7 +100,7 @@ TEST(EdfSchedulerTest, Weighted) {
}

// Validate that expired entries are ignored.
TEST(EdfSchedulerTest, Expired) {
TEST_F(EdfSchedulerTest, Expired) {
EdfScheduler<uint32_t> sched;

auto second_entry = std::make_shared<uint32_t>(42);
Expand All @@ -77,7 +118,7 @@ TEST(EdfSchedulerTest, Expired) {
}

// Validate that expired entries are not peeked.
TEST(EdfSchedulerTest, ExpiredPeek) {
TEST_F(EdfSchedulerTest, ExpiredPeek) {
EdfScheduler<uint32_t> sched;

{
Expand All @@ -93,7 +134,7 @@ TEST(EdfSchedulerTest, ExpiredPeek) {
}

// Validate that expired entries are ignored.
TEST(EdfSchedulerTest, ExpiredPeekedIsNotPicked) {
TEST_F(EdfSchedulerTest, ExpiredPeekedIsNotPicked) {
EdfScheduler<uint32_t> sched;

{
Expand All @@ -110,7 +151,7 @@ TEST(EdfSchedulerTest, ExpiredPeekedIsNotPicked) {
EXPECT_TRUE(sched.pickAndAdd([](const double&) { return 1; }) == nullptr);
}

TEST(EdfSchedulerTest, ManyPeekahead) {
TEST_F(EdfSchedulerTest, ManyPeekahead) {
EdfScheduler<uint32_t> sched1;
EdfScheduler<uint32_t> sched2;
constexpr uint32_t num_entries = 128;
Expand All @@ -134,6 +175,145 @@ TEST(EdfSchedulerTest, ManyPeekahead) {
}
}

} // namespace
// Validates that creating a scheduler using the createWithPicks (with 0 picks)
// is equal to creating an empty scheduler and adding entries one after the other.
TEST_F(EdfSchedulerTest, SchedulerWithZeroPicksEqualToEmptyWithAddedEntries) {
constexpr uint32_t num_entries = 128;
std::vector<std::shared_ptr<uint32_t>> entries;
entries.reserve(num_entries);

// Populate sched1 one entry after the other.
EdfScheduler<uint32_t> sched1;
for (uint32_t i = 0; i < num_entries; ++i) {
entries.emplace_back(std::make_shared<uint32_t>(i + 1));
sched1.add(i + 1, entries.back());
}

EdfScheduler<uint32_t> sched2 = EdfScheduler<uint32_t>::createWithPicks(
entries, [](const double& w) { return w; }, 0);

compareEdfSchedulers(sched1, sched2);
}

// Validates that creating a scheduler using the createWithPicks (with 5 picks)
// is equal to creating an empty scheduler and adding entries one after the other,
// and then performing some number of picks.
TEST_F(EdfSchedulerTest, SchedulerWithSomePicksEqualToEmptyWithAddedEntries) {
constexpr uint32_t num_entries = 128;
// Use double-precision weights from the range [0.01, 100.5].
// Using different weights to avoid a case where entries with the same weight
// will be chosen in different order.
std::vector<std::shared_ptr<double>> entries;
entries.reserve(num_entries);
for (uint32_t i = 0; i < num_entries; ++i) {
const double entry_weight = (100.5 - 0.01) / num_entries * i + 0.01;
entries.emplace_back(std::make_shared<double>(entry_weight));
}

const std::vector<uint32_t> all_picks{5, 140, 501, 123456, 894571};
for (const auto picks : all_picks) {
// Populate sched1 one entry after the other.
EdfScheduler<double> sched1;
for (uint32_t i = 0; i < num_entries; ++i) {
sched1.add(*entries[i], entries[i]);
}
// Perform the picks on sched1.
for (uint32_t i = 0; i < picks; ++i) {
sched1.pickAndAdd([](const double& w) { return w; });
}

// Create sched2 with pre-built and pre-picked entries.
EdfScheduler<double> sched2 = EdfScheduler<double>::createWithPicks(
entries, [](const double& w) { return w; }, picks);

compareEdfSchedulers(sched1, sched2);
}
}

// Emulates first-pick scenarios by creating a scheduler with the given
// weights and a random number of pre-picks, and validates that the next pick
// of all the weights is close to the given weights.
void firstPickTest(const std::vector<double> weights) {
TestRandomGenerator rand;
ASSERT(std::accumulate(weights.begin(), weights.end(), 0.) == 100.0);
// To be able to converge to the expected weights, a decent number of iterations
// should be used. If the number of weights is large, the number of iterations
// should be larger than 10000.
constexpr uint64_t iterations = 4e5;
// The expected range of the weights is [0,100). If this is no longer the
// case, this value may need to be updated.
constexpr double tolerance_pct = 1.0;

// Set up the entries as simple integers.
std::vector<std::shared_ptr<size_t>> entries;
entries.reserve(weights.size());
for (size_t i = 0; i < weights.size(); ++i) {
entries.emplace_back(std::make_shared<size_t>(i));
}

absl::flat_hash_map<size_t, int> sched_picks;
auto calc_weight = [&weights](const size_t& i) -> double { return weights[i]; };

for (uint64_t i = 0; i < iterations; ++i) {
// Create a scheduler with the given weights with a random number of
// emulated pre-picks.
uint32_t r = rand.random();
auto sched = EdfScheduler<size_t>::createWithPicks(entries, calc_weight, r);

// Perform a "first-pick" from that scheduler, and increase the counter for
// that entry.
sched_picks[*sched.pickAndAdd(calc_weight)]++;
}

// Validate that the observed distribution and expected weights are close.
ASSERT_EQ(weights.size(), sched_picks.size());
for (const auto& it : sched_picks) {
const double expected = calc_weight(it.first);
const double observed = 100 * static_cast<double>(it.second) / iterations;
EXPECT_NEAR(expected, observed, tolerance_pct);
}
}

// Validates that after creating schedulers using the createWithPicks (with random picks)
// and then performing a "first-pick", the distribution of the "first-picks" is
// equal to the weights.
TEST_F(EdfSchedulerTest, SchedulerWithRandomPicksFirstPickDistribution) {
firstPickTest({25.0, 75.0});
firstPickTest({1.0, 99.0});
firstPickTest({50.0, 50.0});
firstPickTest({1.0, 20.0, 79.0});
}

constexpr uint64_t BATCH_SIZE = 50;
static std::vector<uint64_t> picksStarts() {
std::vector<uint64_t> start_idxs;
// Add the first range, as it starts at 1 (and not 0).
start_idxs.emplace_back(1);
// The weight delta between iterations is 0.001, so to cover the range
// from 0 to 100, the largest start_idx will be 100 / 0.001.
for (uint64_t i = 50; i < 100 * 1000; i += BATCH_SIZE) {
start_idxs.emplace_back(i);
}
return start_idxs;
}

class EdfSchedulerSpecialTest : public testing::TestWithParam<uint64_t> {};
// Validates that after creating schedulers using the createWithPicks (with random picks)
// and then performing a "first-pick", the distribution of the "first-picks" is
// equal to the weights. Trying the case of 2 weights between 0 to 100, in steps
// of 0.001. This test takes too long, and therefore it is disabled by default.
// If the EDF scheduler is enable, it can be manually executed.
TEST_P(EdfSchedulerSpecialTest, DISABLED_ExhustiveValidator) {
const uint64_t start_idx = GetParam();
for (uint64_t i = start_idx; i < start_idx + BATCH_SIZE; ++i) {
const double w1 = 0.001 * i;
ENVOY_LOG_MISC(trace, "Testing weights: w1={}, w2={}", w1, 100.0 - w1);
firstPickTest({w1, 100.0 - w1});
}
}

INSTANTIATE_TEST_SUITE_P(ExhustiveValidator, EdfSchedulerSpecialTest,
testing::ValuesIn(picksStarts()));

} // namespace Upstream
} // namespace Envoy

0 comments on commit 45ab9cf

Please sign in to comment.