diff --git a/source/common/upstream/edf_scheduler.h b/source/common/upstream/edf_scheduler.h index 57098be6b2f7..a625d72a98df 100644 --- a/source/common/upstream/edf_scheduler.h +++ b/source/common/upstream/edf_scheduler.h @@ -27,6 +27,8 @@ namespace Upstream { // weights and an O(log n) pick time. template class EdfScheduler : public Scheduler { public: + EdfScheduler() {} + // See scheduler.h for an explanation of each public method. std::shared_ptr peekAgain(std::function calculate_weight) override { std::shared_ptr ret = popEntry(); @@ -64,7 +66,110 @@ template class EdfScheduler : public Scheduler { bool empty() const override { return queue_.empty(); } + // Creates an EdfScheduler with the given weights and their corresponding + // entries, and emulating a number of initial picks to be performed. Note that + // the internal state of the scheduler will be very similar to creating an empty + // scheduler, adding the entries one after the other, and then performing + // "picks" pickAndAdd operation without modifying the entries' weights. + // The only thing that may be different is that entries with the same weight + // may be chosen a bit differently (the order_offset_ values may be different). + // Breaking the ties of same weight entries will be kept in future picks from + // the scheduler. + static EdfScheduler createWithPicks(const std::vector>& entries, + std::function calculate_weight, + uint32_t picks) { + // Limiting the number of picks, as over 400M picks should be sufficient + // for most scenarios. + picks = picks % 429496729; // % UINT_MAX/10 + EDF_TRACE("Creating an EDF-scheduler with {} weights and {} pre-picks.", entries.size(), picks); + // Assume no non-positive weights. + ASSERT(std::none_of(entries.cbegin(), entries.cend(), + [&calculate_weight](const std::shared_ptr& entry) { + return calculate_weight(*entry) <= 0; + })); + + // Nothing to do if there are no entries. + if (entries.size() == 0) { + return EdfScheduler(); + } + + // Augment the weight computation to add some epsilon to each entry's + // weight to avoid cases where weights are multiplies of each other. For + // example if there are 2 weights: 25 and 75, and picks=23, then the + // floor_picks will be {5, 17} (respectively), and the deadlines will be + // {0.24000000000000002 and 0.24} (respectively). This small difference will + // cause a "wrong" pick compared to when starting from an empty scheduler + // and picking 23 times. Adding a small value to each weight circumvents + // this problem. This was added as a result of the following comment: + // https://github.com/envoyproxy/envoy/pull/31592#issuecomment-1877663769. + auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double { + return calculate_weight(entry) + 1e-13; + }; + + // Let weights {w_1, w_2, ..., w_N} be the per-entry weight where (w_i > 0), + // W = sum(w_i), and P be the number of times to "pick" from the scheduler. + // Let p'_i = floor(P * w_i/W), then the number of times each entry is being + // picked is p_i >= p'_i. Note that 0 <= P - sum(p'_i) < N. + // + // The following code does P picks, by first emulating p'_i picks for each + // entry, and then executing the leftover P - sum(p'_i) picks. + double weights_sum = std::accumulate( + entries.cbegin(), entries.cend(), 0.0, + [&aug_calculate_weight](double sum_so_far, const std::shared_ptr& entry) { + return sum_so_far + aug_calculate_weight(*entry); + }); + std::vector floor_picks; + floor_picks.reserve(entries.size()); + std::transform(entries.cbegin(), entries.cend(), std::back_inserter(floor_picks), + [picks, weights_sum, &aug_calculate_weight](const std::shared_ptr& entry) { + // Getting the lower-bound by casting to an integer. + return static_cast(aug_calculate_weight(*entry) * picks / + weights_sum); + }); + + // Pre-compute the priority-queue entries to use an O(N) initialization c'tor. + std::vector scheduler_entries; + scheduler_entries.reserve(entries.size()); + uint32_t picks_so_far = 0; + double max_pick_time = 0.0; + // Emulate a per-entry addition to a deadline that is applicable to N picks. + for (size_t i = 0; i < entries.size(); ++i) { + // Add the entry with p'_i picks. As there were p'_i picks, the entry's + // next deadline is (p'_i + 1) / w_i. + const double weight = aug_calculate_weight(*entries[i]); + // While validating the algorithm there were a few cases where the math + // and floating-point arithmetic did not agree (specifically floor(A*B) + // was greater than A*B). The following if statement solves the problem by + // reducing floor-picks for the entry, which may result in more iterations + // in the code after the loop. + if ((floor_picks[i] > 0) && (floor_picks[i] / weight >= picks / weights_sum)) { + floor_picks[i]--; + } + const double pick_time = floor_picks[i] / weight; + const double deadline = (floor_picks[i] + 1) / weight; + EDF_TRACE("Insertion {} in queue with emualted {} picks, deadline {} and weight {}.", + static_cast(entries[i].get()), floor_picks[i], deadline, weight); + scheduler_entries.emplace_back(EdfEntry{deadline, i, entries[i]}); + max_pick_time = std::max(max_pick_time, pick_time); + picks_so_far += floor_picks[i]; + } + // The scheduler's current_time_ needs to be the largest time that some entry was picked. + EdfScheduler scheduler(std::move(scheduler_entries), max_pick_time, entries.size()); + ASSERT(scheduler.queue_.top().deadline_ >= scheduler.current_time_); + + // Left to do some picks, execute them one after the other. + EDF_TRACE("Emulated {} picks in init step, {} picks remaining for one after the other step", + picks_so_far, picks - picks_so_far); + while (picks_so_far < picks) { + scheduler.pickAndAdd(calculate_weight); + picks_so_far++; + } + return scheduler; + } + private: + friend class EdfSchedulerTest; + /** * Clears expired entries and pops the next unexpired entry in the queue. */ @@ -106,6 +211,11 @@ template class EdfScheduler : public Scheduler { } }; + EdfScheduler(std::vector&& scheduler_entries, double current_time, + uint32_t order_offset) + : current_time_(current_time), order_offset_(order_offset), + queue_(scheduler_entries.cbegin(), scheduler_entries.cend()) {} + // Current time in EDF scheduler. // TODO(htuch): Is it worth the small extra complexity to use integer time for performance // reasons? diff --git a/test/common/upstream/BUILD b/test/common/upstream/BUILD index 12fe0396553a..d2bcf46fb45c 100644 --- a/test/common/upstream/BUILD +++ b/test/common/upstream/BUILD @@ -168,7 +168,10 @@ envoy_cc_test( envoy_cc_test( name = "edf_scheduler_test", srcs = ["edf_scheduler_test.cc"], - deps = ["//source/common/upstream:scheduler_lib"], + deps = [ + "//source/common/upstream:scheduler_lib", + "//test/test_common:utility_lib", + ], ) envoy_cc_test_library( diff --git a/test/common/upstream/edf_scheduler_test.cc b/test/common/upstream/edf_scheduler_test.cc index 7eac13bc8153..1619093aa077 100644 --- a/test/common/upstream/edf_scheduler_test.cc +++ b/test/common/upstream/edf_scheduler_test.cc @@ -1,19 +1,60 @@ #include "source/common/upstream/edf_scheduler.h" +#include "test/test_common/test_random_generator.h" + #include "gtest/gtest.h" namespace Envoy { namespace Upstream { -namespace { -TEST(EdfSchedulerTest, Empty) { +class EdfSchedulerTest : public testing::Test { +public: + template + static void compareEdfSchedulers(EdfScheduler& scheduler1, EdfScheduler& scheduler2) { + // Compares that the given EdfSchedulers internal queues are equal up + // (ignoring the order_offset_ values). + EXPECT_EQ(scheduler1.queue_.size(), scheduler2.queue_.size()); + // Cannot iterate over std::priority_queue directly, so need to copy the + // contents to a vector first. + auto copyFunc = [](EdfScheduler& scheduler) { + std::vector::EdfEntry> result; + result.reserve(scheduler.queue_.size()); + while (!scheduler.empty()) { + result.emplace_back(std::move(scheduler.queue_.top())); + scheduler.queue_.pop(); + } + // Re-add all elements so the contents of the input scheduler isn't + // changed. + for (auto& entry : result) { + scheduler.queue_.push(entry); + } + return result; + }; + std::vector::EdfEntry> contents1 = copyFunc(scheduler1); + std::vector::EdfEntry> contents2 = copyFunc(scheduler2); + for (size_t i = 0; i < contents1.size(); ++i) { + // Given 2 queues and some number of picks P, where one queue is created empty + // and P picks are performed, and the other queue is created using + // `EdfScheduler::createWithPicks()` their deadlines may be a bit different + // due to floating point arithmetic differences. The comparison code uses + // a NEAR comparison to account for such differences. + EXPECT_NEAR(contents1[i].deadline_, contents2[i].deadline_, 1e-5) + << "inequal deadline in element " << i; + std::shared_ptr entry1 = contents1[i].entry_.lock(); + std::shared_ptr entry2 = contents2[i].entry_.lock(); + EXPECT_EQ(*entry1, *entry2) << "inequal entry in element " << i; + } + } +}; + +TEST_F(EdfSchedulerTest, Empty) { EdfScheduler sched; EXPECT_EQ(nullptr, sched.peekAgain([](const double&) { return 0; })); EXPECT_EQ(nullptr, sched.pickAndAdd([](const double&) { return 0; })); } // Validate we get regular RR behavior when all weights are the same. -TEST(EdfSchedulerTest, Unweighted) { +TEST_F(EdfSchedulerTest, Unweighted) { EdfScheduler sched; constexpr uint32_t num_entries = 128; std::shared_ptr entries[num_entries]; @@ -34,7 +75,7 @@ TEST(EdfSchedulerTest, Unweighted) { } // Validate we get weighted RR behavior when weights are distinct. -TEST(EdfSchedulerTest, Weighted) { +TEST_F(EdfSchedulerTest, Weighted) { EdfScheduler sched; constexpr uint32_t num_entries = 128; std::shared_ptr entries[num_entries]; @@ -59,7 +100,7 @@ TEST(EdfSchedulerTest, Weighted) { } // Validate that expired entries are ignored. -TEST(EdfSchedulerTest, Expired) { +TEST_F(EdfSchedulerTest, Expired) { EdfScheduler sched; auto second_entry = std::make_shared(42); @@ -77,7 +118,7 @@ TEST(EdfSchedulerTest, Expired) { } // Validate that expired entries are not peeked. -TEST(EdfSchedulerTest, ExpiredPeek) { +TEST_F(EdfSchedulerTest, ExpiredPeek) { EdfScheduler sched; { @@ -93,7 +134,7 @@ TEST(EdfSchedulerTest, ExpiredPeek) { } // Validate that expired entries are ignored. -TEST(EdfSchedulerTest, ExpiredPeekedIsNotPicked) { +TEST_F(EdfSchedulerTest, ExpiredPeekedIsNotPicked) { EdfScheduler sched; { @@ -110,7 +151,7 @@ TEST(EdfSchedulerTest, ExpiredPeekedIsNotPicked) { EXPECT_TRUE(sched.pickAndAdd([](const double&) { return 1; }) == nullptr); } -TEST(EdfSchedulerTest, ManyPeekahead) { +TEST_F(EdfSchedulerTest, ManyPeekahead) { EdfScheduler sched1; EdfScheduler sched2; constexpr uint32_t num_entries = 128; @@ -134,6 +175,145 @@ TEST(EdfSchedulerTest, ManyPeekahead) { } } -} // namespace +// Validates that creating a scheduler using the createWithPicks (with 0 picks) +// is equal to creating an empty scheduler and adding entries one after the other. +TEST_F(EdfSchedulerTest, SchedulerWithZeroPicksEqualToEmptyWithAddedEntries) { + constexpr uint32_t num_entries = 128; + std::vector> entries; + entries.reserve(num_entries); + + // Populate sched1 one entry after the other. + EdfScheduler sched1; + for (uint32_t i = 0; i < num_entries; ++i) { + entries.emplace_back(std::make_shared(i + 1)); + sched1.add(i + 1, entries.back()); + } + + EdfScheduler sched2 = EdfScheduler::createWithPicks( + entries, [](const double& w) { return w; }, 0); + + compareEdfSchedulers(sched1, sched2); +} + +// Validates that creating a scheduler using the createWithPicks (with 5 picks) +// is equal to creating an empty scheduler and adding entries one after the other, +// and then performing some number of picks. +TEST_F(EdfSchedulerTest, SchedulerWithSomePicksEqualToEmptyWithAddedEntries) { + constexpr uint32_t num_entries = 128; + // Use double-precision weights from the range [0.01, 100.5]. + // Using different weights to avoid a case where entries with the same weight + // will be chosen in different order. + std::vector> entries; + entries.reserve(num_entries); + for (uint32_t i = 0; i < num_entries; ++i) { + const double entry_weight = (100.5 - 0.01) / num_entries * i + 0.01; + entries.emplace_back(std::make_shared(entry_weight)); + } + + const std::vector all_picks{5, 140, 501, 123456, 894571}; + for (const auto picks : all_picks) { + // Populate sched1 one entry after the other. + EdfScheduler sched1; + for (uint32_t i = 0; i < num_entries; ++i) { + sched1.add(*entries[i], entries[i]); + } + // Perform the picks on sched1. + for (uint32_t i = 0; i < picks; ++i) { + sched1.pickAndAdd([](const double& w) { return w; }); + } + + // Create sched2 with pre-built and pre-picked entries. + EdfScheduler sched2 = EdfScheduler::createWithPicks( + entries, [](const double& w) { return w; }, picks); + + compareEdfSchedulers(sched1, sched2); + } +} + +// Emulates first-pick scenarios by creating a scheduler with the given +// weights and a random number of pre-picks, and validates that the next pick +// of all the weights is close to the given weights. +void firstPickTest(const std::vector weights) { + TestRandomGenerator rand; + ASSERT(std::accumulate(weights.begin(), weights.end(), 0.) == 100.0); + // To be able to converge to the expected weights, a decent number of iterations + // should be used. If the number of weights is large, the number of iterations + // should be larger than 10000. + constexpr uint64_t iterations = 4e5; + // The expected range of the weights is [0,100). If this is no longer the + // case, this value may need to be updated. + constexpr double tolerance_pct = 1.0; + + // Set up the entries as simple integers. + std::vector> entries; + entries.reserve(weights.size()); + for (size_t i = 0; i < weights.size(); ++i) { + entries.emplace_back(std::make_shared(i)); + } + + absl::flat_hash_map sched_picks; + auto calc_weight = [&weights](const size_t& i) -> double { return weights[i]; }; + + for (uint64_t i = 0; i < iterations; ++i) { + // Create a scheduler with the given weights with a random number of + // emulated pre-picks. + uint32_t r = rand.random(); + auto sched = EdfScheduler::createWithPicks(entries, calc_weight, r); + + // Perform a "first-pick" from that scheduler, and increase the counter for + // that entry. + sched_picks[*sched.pickAndAdd(calc_weight)]++; + } + + // Validate that the observed distribution and expected weights are close. + ASSERT_EQ(weights.size(), sched_picks.size()); + for (const auto& it : sched_picks) { + const double expected = calc_weight(it.first); + const double observed = 100 * static_cast(it.second) / iterations; + EXPECT_NEAR(expected, observed, tolerance_pct); + } +} + +// Validates that after creating schedulers using the createWithPicks (with random picks) +// and then performing a "first-pick", the distribution of the "first-picks" is +// equal to the weights. +TEST_F(EdfSchedulerTest, SchedulerWithRandomPicksFirstPickDistribution) { + firstPickTest({25.0, 75.0}); + firstPickTest({1.0, 99.0}); + firstPickTest({50.0, 50.0}); + firstPickTest({1.0, 20.0, 79.0}); +} + +constexpr uint64_t BATCH_SIZE = 50; +static std::vector picksStarts() { + std::vector start_idxs; + // Add the first range, as it starts at 1 (and not 0). + start_idxs.emplace_back(1); + // The weight delta between iterations is 0.001, so to cover the range + // from 0 to 100, the largest start_idx will be 100 / 0.001. + for (uint64_t i = 50; i < 100 * 1000; i += BATCH_SIZE) { + start_idxs.emplace_back(i); + } + return start_idxs; +} + +class EdfSchedulerSpecialTest : public testing::TestWithParam {}; +// Validates that after creating schedulers using the createWithPicks (with random picks) +// and then performing a "first-pick", the distribution of the "first-picks" is +// equal to the weights. Trying the case of 2 weights between 0 to 100, in steps +// of 0.001. This test takes too long, and therefore it is disabled by default. +// If the EDF scheduler is enable, it can be manually executed. +TEST_P(EdfSchedulerSpecialTest, DISABLED_ExhustiveValidator) { + const uint64_t start_idx = GetParam(); + for (uint64_t i = start_idx; i < start_idx + BATCH_SIZE; ++i) { + const double w1 = 0.001 * i; + ENVOY_LOG_MISC(trace, "Testing weights: w1={}, w2={}", w1, 100.0 - w1); + firstPickTest({w1, 100.0 - w1}); + } +} + +INSTANTIATE_TEST_SUITE_P(ExhustiveValidator, EdfSchedulerSpecialTest, + testing::ValuesIn(picksStarts())); + } // namespace Upstream } // namespace Envoy