diff --git a/.clang-format b/.clang-format index 03e45a4..698b533 100644 --- a/.clang-format +++ b/.clang-format @@ -54,7 +54,7 @@ DeriveLineEnding: 'true' FixNamespaceComments: true IncludeBlocks: Regroup IncludeCategories: - - Regex: '(<|")((C|c)atch.*)/' + - Regex: '(<|")(((C|c)atch.*)|((C|c)elero.*))/' Priority: 5 - Regex: '(<|")((async\+\+).*)/' Priority: 2 diff --git a/CMakeLists.txt b/CMakeLists.txt index 370d12f..a2f0667 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,7 @@ project(asyncpp) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) option(ASYNCPP_BUILD_TESTS "Build tests." ON) +option(ASYNCPP_BUILD_BENCHMARKS "Build benchmarks." ON) option(ENABLE_LLVM_COV "Enable LLVM source-based code coverage." OFF) option(ENABLE_LLVM_ADDRESS_SANITIZER "Enable LLVM address sanitizer." OFF) option(ENABLE_LLVM_MEMORY_SANITIZER "Enable LLVM memory sanitizer." OFF) @@ -49,6 +50,9 @@ add_subdirectory(src) if (${ASYNCPP_BUILD_TESTS}) add_subdirectory(test) endif() +if (${ASYNCPP_BUILD_BENCHMARKS}) + add_subdirectory(benchmark) +endif() install(TARGETS asyncpp DESTINATION "lib") install(DIRECTORY "include/asyncpp" DESTINATION "include") \ No newline at end of file diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt new file mode 100644 index 0000000..0c533a7 --- /dev/null +++ b/benchmark/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(benchmark) + +target_sources(benchmark + PRIVATE + main.cpp + benchmark_task_spawn.cpp + benchmark_thread_pool.cpp + benchmark_atomic.cpp +) + + +find_package(Celero REQUIRED) +target_link_libraries(benchmark celero) +target_link_libraries(benchmark asyncpp) \ No newline at end of file diff --git a/benchmark/benchmark_atomic.cpp b/benchmark/benchmark_atomic.cpp new file mode 100644 index 0000000..0248e55 --- /dev/null +++ b/benchmark/benchmark_atomic.cpp @@ -0,0 +1,138 @@ +#include + +#include +#include +#include + +#include + + +// This file benchmarks atomic operations themselves, not the library. +// The measurements can be used as a baseline or target as to what is +// achievable and reasonable on the hardware. 
+
+
+using namespace asyncpp;
+
+
+static constexpr size_t base_reps = 4'000'000;
+
+
+BASELINE(atomic_rmw, x1_thread, 30, 1) {
+    alignas(avoid_false_sharing) std::atomic_size_t counter = 0;
+    static constexpr size_t reps = base_reps;
+
+    static const auto func = [&counter] {
+        for (size_t rep = 0; rep < reps; ++rep) {
+            counter.fetch_add(1, std::memory_order_relaxed);
+        }
+    };
+
+    std::array<std::jthread, 1> threads;
+    std::ranges::generate(threads, [&] { return std::jthread(func); });
+}
+
+
+BENCHMARK(atomic_rmw, x2_thread, 30, 1) {
+    alignas(avoid_false_sharing) std::atomic_size_t counter = 0;
+    static constexpr size_t reps = base_reps / 2;
+
+    static const auto func = [&counter] {
+        for (size_t rep = 0; rep < reps; ++rep) {
+            counter.fetch_add(1, std::memory_order_relaxed);
+        }
+    };
+
+    std::array<std::jthread, 2> threads;
+    std::ranges::generate(threads, [&] { return std::jthread(func); });
+}
+
+
+BENCHMARK(atomic_rmw, x4_thread, 30, 1) {
+    alignas(avoid_false_sharing) std::atomic_size_t counter = 0;
+    static constexpr size_t reps = base_reps / 4;
+
+    static const auto func = [&counter] {
+        for (size_t rep = 0; rep < reps; ++rep) {
+            counter.fetch_add(1, std::memory_order_relaxed);
+        }
+    };
+
+    std::array<std::jthread, 4> threads;
+    std::ranges::generate(threads, [&] { return std::jthread(func); });
+}
+
+
+BENCHMARK(atomic_rmw, x8_thread, 30, 1) {
+    alignas(avoid_false_sharing) std::atomic_size_t counter = 0;
+    static constexpr size_t reps = base_reps / 8;
+
+    static const auto func = [&counter] {
+        for (size_t rep = 0; rep < reps; ++rep) {
+            counter.fetch_add(1, std::memory_order_relaxed);
+        }
+    };
+
+    std::array<std::jthread, 8> threads;
+    std::ranges::generate(threads, [&] { return std::jthread(func); });
+}
+
+
+BASELINE(atomic_read, x1_thread, 30, 1) {
+    alignas(avoid_false_sharing) std::atomic_size_t counter = 0;
+    static constexpr size_t reps = base_reps;
+
+    static const auto func = [&counter] {
+        for (size_t rep = 0; rep < reps; ++rep) {
+            static_cast<void>(counter.load(std::memory_order_relaxed));
+        }
+    };
+
+    std::array<std::jthread, 1> threads;
+    std::ranges::generate(threads, [&] { return std::jthread(func); });
+}
+
+
+BENCHMARK(atomic_read, x2_thread, 30, 1) {
+    alignas(avoid_false_sharing) std::atomic_size_t counter = 0;
+    static constexpr size_t reps = base_reps / 2;
+
+    static const auto func = [&counter] {
+        for (size_t rep = 0; rep < reps; ++rep) {
+            static_cast<void>(counter.load(std::memory_order_relaxed));
+        }
+    };
+
+    std::array<std::jthread, 2> threads;
+    std::ranges::generate(threads, [&] { return std::jthread(func); });
+}
+
+
+BENCHMARK(atomic_read, x4_thread, 30, 1) {
+    alignas(avoid_false_sharing) std::atomic_size_t counter = 0;
+    static constexpr size_t reps = base_reps / 4;
+
+    static const auto func = [&counter] {
+        for (size_t rep = 0; rep < reps; ++rep) {
+            static_cast<void>(counter.load(std::memory_order_relaxed));
+        }
+    };
+
+    std::array<std::jthread, 4> threads;
+    std::ranges::generate(threads, [&] { return std::jthread(func); });
+}
+
+
+BENCHMARK(atomic_read, x8_thread, 30, 1) {
+    alignas(avoid_false_sharing) std::atomic_size_t counter = 0;
+    static constexpr size_t reps = base_reps / 8;
+
+    static const auto func = [&counter] {
+        for (size_t rep = 0; rep < reps; ++rep) {
+            static_cast<void>(counter.load(std::memory_order_relaxed));
+        }
+    };
+
+    std::array<std::jthread, 8> threads;
+    std::ranges::generate(threads, [&] { return std::jthread(func); });
+}
\ No newline at end of file
diff --git a/benchmark/benchmark_task_spawn.cpp b/benchmark/benchmark_task_spawn.cpp
new file mode 100644
index 0000000..444f550
--- /dev/null
+++ b/benchmark/benchmark_task_spawn.cpp
@@ -0,0 +1,136 @@
+#include
+#include
+#include + +#include +#include + +#include + + +using namespace asyncpp; + + +task plain() { + co_return 1; +} + + +task allocator_backed(std::allocator_arg_t, std::pmr::polymorphic_allocator<> alloc) { + co_return 1; +} + + +struct FixtureNewDelete : celero::TestFixture { + inline std::pmr::polymorphic_allocator<>& getAlloc() { + return alloc; + } + +private: + std::pmr::polymorphic_allocator<> alloc = { std::pmr::new_delete_resource() }; +}; + + +struct FixturePool : celero::TestFixture { + inline std::pmr::polymorphic_allocator<>& getAlloc() { + return alloc; + } + +private: + std::pmr::unsynchronized_pool_resource resource; + std::pmr::polymorphic_allocator<> alloc = { &resource }; +}; + + +struct FixtureStack : celero::TestFixture { + void setUp(const ExperimentValue* x) override { + alloc.~polymorphic_allocator(); + resource.~monotonic_buffer_resource(); + new (&resource) std::pmr::monotonic_buffer_resource(buffer.get(), size, std::pmr::new_delete_resource()); + new (&alloc) std::pmr::polymorphic_allocator<>(&resource); + } + + inline std::pmr::polymorphic_allocator<>& getAlloc() { + return alloc; + } + +private: + static constexpr inline size_t size = 10485760; + struct block { + alignas(avoid_false_sharing) std::byte content[avoid_false_sharing]; + }; + std::unique_ptr buffer = std::make_unique_for_overwrite(size / sizeof(block)); + std::pmr::monotonic_buffer_resource resource; + std::pmr::polymorphic_allocator<> alloc = { &resource }; +}; + + +constexpr int numSamples = 1000; +constexpr int numIterations = 5000; + + +BASELINE(task_spawn, unoptimized, numSamples, numIterations) { + bool ready = false; + { + auto task = plain(); + volatile auto ptr = &task; + ptr->launch(); + ready = ptr->ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} + + +BENCHMARK(task_spawn, HALO, numSamples, numIterations) { + bool ready = false; + { + auto task = plain(); + task.launch(); + ready = task.ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} + + +BENCHMARK_F(task_spawn, PMR_new_delete, FixtureNewDelete, numSamples, numIterations) { + bool ready = false; + auto& alloc = getAlloc(); + { + auto task = allocator_backed(std::allocator_arg, alloc); + volatile auto ptr = &task; + ptr->launch(); + ready = ptr->ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} + + +BENCHMARK_F(task_spawn, PMR_unsync_pool, FixturePool, numSamples, numIterations) { + bool ready = false; + auto& alloc = getAlloc(); + { + auto task = allocator_backed(std::allocator_arg, alloc); + volatile auto ptr = &task; + ptr->launch(); + ready = ptr->ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} + + +BENCHMARK_F(task_spawn, PMR_stack, FixtureStack, numSamples, numIterations) { + bool ready = false; + auto& alloc = getAlloc(); + { + auto task = allocator_backed(std::allocator_arg, alloc); + volatile auto ptr = &task; + ptr->launch(); + ready = ptr->ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} \ No newline at end of file diff --git a/benchmark/benchmark_thread_pool.cpp b/benchmark/benchmark_thread_pool.cpp new file mode 100644 index 0000000..14b586f --- /dev/null +++ b/benchmark/benchmark_thread_pool.cpp @@ -0,0 +1,146 @@ +#include +#include +#include + +#include +#include + +#include + + +using namespace asyncpp; + + +struct noop_promise : schedulable_promise { + noop_promise(std::atomic_size_t* counter = nullptr) : counter(counter) {} + void resume_now() override { + if (counter) { + counter->fetch_sub(1, std::memory_order_relaxed); + } 
+    }
+    std::atomic_size_t* counter = nullptr;
+};
+
+
+template <size_t NumThreads>
+struct FixturePool : celero::TestFixture {
+    FixturePool() : pool(NumThreads), num_running(0), promises(400000, noop_promise(&num_running)) {}
+
+    void setUp(const ExperimentValue* x) override {
+        num_running.store(promises.size());
+    }
+
+
+    void sync() {
+        while (num_running.load(std::memory_order_relaxed) != 0) {
+        }
+    }
+
+    void tearDown() override {
+        sync();
+    }
+
+    thread_pool pool;
+    std::atomic_size_t num_running;
+    std::vector<noop_promise> promises;
+};
+
+
+BASELINE_F(tp_schedule_outside, x1_thread, FixturePool<1>, 30, 1) {
+    for (auto& promise : promises) {
+        pool.schedule(promise);
+    }
+}
+
+
+BENCHMARK_F(tp_schedule_outside, x2_thread, FixturePool<2>, 30, 1) {
+    for (auto& promise : promises) {
+        pool.schedule(promise);
+    }
+}
+
+
+BENCHMARK_F(tp_schedule_outside, x4_thread, FixturePool<4>, 30, 1) {
+    for (auto& promise : promises) {
+        pool.schedule(promise);
+    }
+}
+
+
+BASELINE_F(tp_schedule_inside, x1_thread, FixturePool<1>, 30, 1) {
+    static constexpr auto coro = [](std::span<noop_promise> promises, thread_pool& pool) -> task<void> {
+        for (auto& promise : promises) {
+            pool.schedule(promise);
+        }
+        co_return;
+    };
+    auto t1 = launch(coro(promises, pool), pool);
+    join(t1);
+}
+
+
+BENCHMARK_F(tp_schedule_inside, x2_thread, FixturePool<2>, 30, 1) {
+    static constexpr auto coro = [](std::span<noop_promise> promises, thread_pool& pool) -> task<void> {
+        for (auto& promise : promises) {
+            pool.schedule(promise);
+        }
+        co_return;
+    };
+    const auto count = std::ssize(promises) / 2;
+    auto t1 = launch(coro(std::span(promises.begin(), promises.begin() + count), pool), pool);
+    auto t2 = launch(coro(std::span(promises.begin() + count, promises.end()), pool), pool);
+    join(t1);
+    join(t2);
+}
+
+
+BENCHMARK_F(tp_schedule_inside, x4_thread, FixturePool<4>, 30, 1) {
+    static constexpr auto coro = [](std::span<noop_promise> promises, thread_pool& pool) -> task<void> {
+        for (auto& promise : promises) {
+            pool.schedule(promise);
+        }
+        co_return;
+    };
+    const auto count = std::ssize(promises) / 4;
+    auto t1 = launch(coro(std::span(promises.begin(), promises.begin() + 1 * count), pool), pool);
+    auto t2 = launch(coro(std::span(promises.begin() + 1 * count, promises.begin() + 2 * count), pool), pool);
+    auto t3 = launch(coro(std::span(promises.begin() + 2 * count, promises.begin() + 3 * count), pool), pool);
+    auto t4 = launch(coro(std::span(promises.begin() + 3 * count, promises.end()), pool), pool);
+    join(t1);
+    join(t2);
+    join(t3);
+    join(t4);
+}
+
+
+BASELINE_F(tp_stealing, x1_thread, FixturePool<1 + 1>, 30, 1) {
+    static constexpr auto coro = [](std::span<noop_promise> promises, thread_pool& pool) -> task<void> {
+        for (auto& promise : promises) {
+            pool.schedule(promise);
+        }
+        co_return;
+    };
+    join(launch(coro(promises, pool), pool));
+}
+
+
+BENCHMARK_F(tp_stealing, x2_thread, FixturePool<2 + 1>, 30, 1) {
+    static constexpr auto coro = [](std::span<noop_promise> promises, thread_pool& pool) -> task<void> {
+        for (auto& promise : promises) {
+            pool.schedule(promise);
+        }
+        co_return;
+    };
+    join(launch(coro(promises, pool), pool));
+}
+
+
+BENCHMARK_F(tp_stealing, x4_thread, FixturePool<4 + 1>, 30, 1) {
+    static constexpr auto coro = [](std::span<noop_promise> promises, thread_pool& pool) -> task<void> {
+        for (auto& promise : promises) {
+            pool.schedule(promise);
+        }
+        co_return;
+    };
+    join(launch(coro(promises, pool), pool));
+}
\ No newline at end of file
diff --git a/benchmark/main.cpp b/benchmark/main.cpp
new file mode 100644
index 0000000..74f2575
--- /dev/null
+++ b/benchmark/main.cpp
@@ -0,0 +1,15 @@
+#include
+
+#include
+
+
+int
main(int argc, char* argv[]) { + try { + celero::Run(argc, argv); + } + catch (std::exception& ex) { + std::cerr << "benchmarks exited with error: " << ex.what() << std::endl; + return 1; + } + return 0; +} \ No newline at end of file diff --git a/conanfile.txt b/conanfile.txt index c7fc0a6..8badf34 100644 --- a/conanfile.txt +++ b/conanfile.txt @@ -1,5 +1,6 @@ [requires] catch2/3.4.0 +celero/2.9.0 [generators] CMakeDeps diff --git a/include/asyncpp/promise.hpp b/include/asyncpp/promise.hpp index c37a938..85db1fb 100644 --- a/include/asyncpp/promise.hpp +++ b/include/asyncpp/promise.hpp @@ -72,8 +72,9 @@ struct resumable_promise { struct schedulable_promise { virtual ~schedulable_promise() = default; - virtual std::coroutine_handle<> handle() = 0; + virtual void resume_now() = 0; schedulable_promise* m_scheduler_next = nullptr; + schedulable_promise* m_scheduler_prev = nullptr; scheduler* m_scheduler = nullptr; }; @@ -139,11 +140,11 @@ struct allocator_aware_promise { }; static constexpr dealloc_t dealloc = [](void* ptr, size_t size) { - auto& alloc = *reinterpret_cast(static_cast(ptr) + alloc_offset(size)); - auto moved = std::move(alloc); - alloc.~alloc_t(); + const auto alloc_ptr = reinterpret_cast(static_cast(ptr) + alloc_offset(size)); + auto alloc = std::move(*alloc_ptr); + alloc_ptr->~alloc_t(); const auto num_blocks = (total_size(size) + sizeof(block_t) - 1) / sizeof(block_t); - std::allocator_traits::deallocate(moved, static_cast(ptr), num_blocks); + std::allocator_traits::deallocate(alloc, static_cast(ptr), num_blocks); }; auto rebound_alloc = alloc_t(alloc); diff --git a/include/asyncpp/stream.hpp b/include/asyncpp/stream.hpp index eebfde9..7f6daa1 100644 --- a/include/asyncpp/stream.hpp +++ b/include/asyncpp/stream.hpp @@ -100,16 +100,18 @@ namespace impl_stream { m_result = std::nullopt; } - auto handle() -> std::coroutine_handle<> override { - return std::coroutine_handle::from_promise(*this); + void resume() final { + return m_scheduler ? m_scheduler->schedule(*this) : resume_now(); } - void resume() override { - return m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); + void resume_now() final { + const auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); } void destroy() noexcept { - handle().destroy(); + const auto handle = std::coroutine_handle::from_promise(*this); + handle.destroy(); } auto await() noexcept; diff --git a/include/asyncpp/task.hpp b/include/asyncpp/task.hpp index 3da938e..77ff137 100644 --- a/include/asyncpp/task.hpp +++ b/include/asyncpp/task.hpp @@ -40,12 +40,13 @@ namespace impl_task { return final_awaitable{}; } - auto handle() -> std::coroutine_handle<> final { - return std::coroutine_handle::from_promise(*this); + void resume_now() final { + const auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); } void resume() final { - return m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); + return m_scheduler ? 
m_scheduler->schedule(*this) : resume_now(); } void start() { @@ -62,7 +63,8 @@ namespace impl_task { } void destroy() { - handle().destroy(); + const auto handle = std::coroutine_handle::from_promise(*this); + handle.destroy(); } private: diff --git a/include/asyncpp/testing/interleaver.hpp b/include/asyncpp/testing/interleaver.hpp index 32d20d8..d068e2e 100644 --- a/include/asyncpp/testing/interleaver.hpp +++ b/include/asyncpp/testing/interleaver.hpp @@ -190,7 +190,7 @@ class interleaver { do { auto [swarm, scenario] = launch_threads(m_thread_funcs); const auto path_ = run_next_interleaving(tree, swarm); - if constexpr (std::convertible_to) { + if constexpr (std::convertible_to&, validated_scenario&>) { scenario->validate(path_); } } while (!is_transitively_complete(tree, tree.root())); diff --git a/include/asyncpp/thread_pool.hpp b/include/asyncpp/thread_pool.hpp index f116cbc..e4cd187 100644 --- a/include/asyncpp/thread_pool.hpp +++ b/include/asyncpp/thread_pool.hpp @@ -1,63 +1,69 @@ #pragma once +#include "container/atomic_deque.hpp" #include "container/atomic_stack.hpp" #include "scheduler.hpp" +#include "threading/cache.hpp" +#include "threading/spinlock.hpp" #include #include -#include +#include #include #include namespace asyncpp { - class thread_pool : public scheduler { public: - struct worker { - worker* m_next = nullptr; + struct pack; - std::jthread thread; - atomic_stack worklist; - }; + class worker { + public: + using queue = deque; - thread_pool(size_t num_threads = 1); - thread_pool(thread_pool&&) = delete; - thread_pool operator=(thread_pool&&) = delete; - ~thread_pool(); + worker(); + ~worker(); - void schedule(schedulable_promise& promise) override; + void insert(schedulable_promise& promise); + schedulable_promise* steal_from_this(); + schedulable_promise* try_get_promise(pack& pack, size_t& stealing_attempt, bool& exit_loop); + schedulable_promise* steal_from_other(pack& pack, size_t& stealing_attempt) const; + void start(pack& pack); + void cancel(); + + private: + void run(pack& pack); + + public: + worker* m_next = nullptr; + private: + alignas(avoid_false_sharing) spinlock m_spinlock; + alignas(avoid_false_sharing) queue m_promises; + alignas(avoid_false_sharing) std::atomic_flag m_blocked; + alignas(avoid_false_sharing) std::binary_semaphore m_sema; + alignas(avoid_false_sharing) std::jthread m_thread; + alignas(avoid_false_sharing) std::atomic_flag m_cancelled; + }; - static void schedule(schedulable_promise& item, - atomic_stack& global_worklist, - std::condition_variable& global_notification, - std::mutex& global_mutex, - std::atomic_size_t& num_waiting, - worker* local = nullptr); + struct pack { + alignas(avoid_false_sharing) std::vector workers; + alignas(avoid_false_sharing) atomic_stack blocked; + alignas(avoid_false_sharing) std::atomic_size_t num_blocked = 0; + }; - static schedulable_promise* steal(std::span workers); - static void execute(worker& local, - atomic_stack& global_worklist, - std::condition_variable& global_notification, - std::mutex& global_mutex, - std::atomic_flag& terminate, - std::atomic_size_t& num_waiting, - std::span workers); +public: + thread_pool(size_t num_threads = 1); + void schedule(schedulable_promise& promise) override; private: - std::condition_variable m_global_notification; - std::mutex m_global_mutex; - atomic_stack m_global_worklist; - std::vector m_workers; - std::atomic_flag m_terminate; - std::atomic_size_t m_num_waiting = 0; - - inline static thread_local worker* local = nullptr; + alignas(avoid_false_sharing) 
pack m_pack; + alignas(avoid_false_sharing) std::atomic_ptrdiff_t m_next_in_schedule; + inline static thread_local worker* m_local = nullptr; }; - } // namespace asyncpp \ No newline at end of file diff --git a/include/asyncpp/threading/cache.hpp b/include/asyncpp/threading/cache.hpp new file mode 100644 index 0000000..b4aa0e9 --- /dev/null +++ b/include/asyncpp/threading/cache.hpp @@ -0,0 +1,18 @@ +#pragma once + + +#include +#include + + +namespace asyncpp { + +#ifdef __cpp_lib_hardware_interference_size +inline constexpr size_t avoid_false_sharing = std::hardware_destructive_interference_size; +inline constexpr size_t promote_true_sharing = std::hardware_constructive_interference_size; +#else +inline constexpr size_t avoid_false_sharing = 64; +inline constexpr size_t promote_true_sharing = 64; +#endif + +} // namespace asyncpp \ No newline at end of file diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 59d07a4..b786ce4 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -5,92 +5,130 @@ namespace asyncpp { -thread_pool::thread_pool(size_t num_threads) - : m_workers(num_threads) { - for (auto& w : m_workers) { - w.thread = std::jthread([this, &w] { - local = &w; - execute(w, m_global_worklist, m_global_notification, m_global_mutex, m_terminate, m_num_waiting, m_workers); - }); - } +thread_pool::worker::worker() + : m_sema(0) {} + + +thread_pool::worker::~worker() { + cancel(); } -thread_pool::~thread_pool() { - std::lock_guard lk(m_global_mutex); - m_terminate.test_and_set(); - m_global_notification.notify_all(); +void thread_pool::worker::insert(schedulable_promise& promise) { + std::unique_lock lk(m_spinlock, std::defer_lock); + INTERLEAVED_ACQUIRE(lk.lock()); + const auto previous = m_promises.push_back(&promise); + const auto blocked = m_blocked.test(std::memory_order_relaxed); + INTERLEAVED(lk.unlock()); + + if (!previous && blocked) { + INTERLEAVED(m_sema.release()); + } } -void thread_pool::schedule(schedulable_promise& promise) { - schedule(promise, m_global_worklist, m_global_notification, m_global_mutex, m_num_waiting, local); +schedulable_promise* thread_pool::worker::steal_from_this() { + std::unique_lock lk(m_spinlock, std::defer_lock); + INTERLEAVED(lk.lock()); + return m_promises.pop_front(); } -void thread_pool::schedule(schedulable_promise& item, - atomic_stack& global_worklist, - std::condition_variable& global_notification, - std::mutex& global_mutex, - std::atomic_size_t& num_waiting, - worker* local) { - if (local) { - const auto prev_item = INTERLEAVED(local->worklist.push(&item)); - if (prev_item != nullptr) { - if (num_waiting.load(std::memory_order_relaxed) > 0) { - global_notification.notify_one(); - } - } +schedulable_promise* thread_pool::worker::try_get_promise(pack& pack, size_t& stealing_attempt, bool& exit_loop) { + std::unique_lock lk(m_spinlock, std::defer_lock); + INTERLEAVED_ACQUIRE(lk.lock()); + const auto promise = m_promises.front(); + if (promise) { + m_promises.pop_front(); + return promise; + } + + if (stealing_attempt > 0) { + INTERLEAVED(lk.unlock()); + const auto stolen = steal_from_other(pack, stealing_attempt); + stealing_attempt = stolen ? 
pack.workers.size() : stealing_attempt - 1; + return stolen; + } + + + if (INTERLEAVED(m_cancelled.test(std::memory_order_relaxed))) { + exit_loop = true; } else { - std::unique_lock lk(global_mutex, std::defer_lock); - INTERLEAVED_ACQUIRE(lk.lock()); - INTERLEAVED(global_worklist.push(&item)); - INTERLEAVED(global_notification.notify_one()); + INTERLEAVED(m_blocked.test_and_set(std::memory_order_relaxed)); + pack.blocked.push(this); + pack.num_blocked.fetch_add(1, std::memory_order_relaxed); + INTERLEAVED(lk.unlock()); + INTERLEAVED_ACQUIRE(m_sema.acquire()); + INTERLEAVED(m_blocked.clear()); + stealing_attempt = pack.workers.size(); } + return nullptr; } -schedulable_promise* thread_pool::steal(std::span workers) { - for (auto& w : workers) { - if (const auto item = INTERLEAVED(w.worklist.pop())) { - return item; - } +schedulable_promise* thread_pool::worker::steal_from_other(pack& pack, size_t& stealing_attempt) const { + const size_t pack_size = pack.workers.size(); + const size_t my_index = this - pack.workers.data(); + const size_t victim_index = (my_index + stealing_attempt) % pack_size; + return pack.workers[victim_index].steal_from_this(); +} + + +void thread_pool::worker::start(pack& pack) { + m_thread = std::jthread([this, &pack] { + run(pack); + }); +} + + +void thread_pool::worker::cancel() { + std::unique_lock lk(m_spinlock, std::defer_lock); + INTERLEAVED_ACQUIRE(lk.lock()); + INTERLEAVED(m_cancelled.test_and_set(std::memory_order_relaxed)); + const auto blocked = INTERLEAVED(m_blocked.test(std::memory_order_relaxed)); + lk.unlock(); + if (blocked) { + INTERLEAVED(m_sema.release()); } - return nullptr; } -void thread_pool::execute(worker& local, - atomic_stack& global_worklist, - std::condition_variable& global_notification, - std::mutex& global_mutex, - std::atomic_flag& terminate, - std::atomic_size_t& num_waiting, - std::span workers) { - do { - if (const auto item = INTERLEAVED(local.worklist.pop())) { - item->handle().resume(); - continue; - } - else if (const auto item = INTERLEAVED(global_worklist.pop())) { - local.worklist.push(item); - continue; - } - else if (const auto item = steal(workers)) { - local.worklist.push(item); - continue; - } - else { - std::unique_lock lk(global_mutex, std::defer_lock); - INTERLEAVED_ACQUIRE(lk.lock()); - if (!INTERLEAVED(terminate.test()) && INTERLEAVED(global_worklist.empty())) { - num_waiting.fetch_add(1, std::memory_order_relaxed); - INTERLEAVED_ACQUIRE(global_notification.wait(lk)); - num_waiting.fetch_sub(1, std::memory_order_relaxed); - } +void thread_pool::worker::run(pack& pack) { + m_local = this; + size_t stealing_attempt = pack.workers.size(); + bool exit_loop = false; + while (!exit_loop) { + const auto promise = try_get_promise(pack, stealing_attempt, exit_loop); + if (promise) { + promise->resume_now(); } - } while (!INTERLEAVED(terminate.test())); + } +} + + +thread_pool::thread_pool(size_t num_threads) + : m_pack(std::vector(num_threads)), m_next_in_schedule(0) { + for (auto& worker : m_pack.workers) { + worker.start(m_pack); + } +} + + +void thread_pool::schedule(schedulable_promise& promise) { + size_t num_blocked = m_pack.num_blocked.load(std::memory_order_relaxed); + const auto blocked = num_blocked > 0 ? 
m_pack.blocked.pop() : nullptr; + if (blocked) { + blocked->insert(promise); + m_pack.num_blocked.fetch_sub(1, std::memory_order_relaxed); + } + else if (m_local) { + m_local->insert(promise); + } + else { + const auto selected = m_next_in_schedule.fetch_add(1, std::memory_order_relaxed) % m_pack.workers.size(); + m_pack.workers[selected].insert(promise); + } } diff --git a/test/helper_schedulers.hpp b/test/helper_schedulers.hpp index 58a2238..b419edf 100644 --- a/test/helper_schedulers.hpp +++ b/test/helper_schedulers.hpp @@ -22,7 +22,7 @@ class thread_locked_scheduler : public asyncpp::scheduler { void resume() { const auto promise = m_promise.exchange(nullptr); assert(promise != nullptr); - promise->handle().resume(); + promise->resume_now(); } private: diff --git a/test/monitor_task.hpp b/test/monitor_task.hpp index b134aec..d0580a2 100644 --- a/test/monitor_task.hpp +++ b/test/monitor_task.hpp @@ -37,17 +37,19 @@ class [[nodiscard]] monitor_task { m_counters->exception = std::current_exception(); } - void resume() override { + void resume() final { m_counters->suspensions.fetch_add(1); - m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); + m_scheduler ? m_scheduler->schedule(*this) : resume_now(); } - std::coroutine_handle<> handle() override { - return std::coroutine_handle::from_promise(*this); + void resume_now() final { + const auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); } void destroy() noexcept { - handle().destroy(); + const auto handle = std::coroutine_handle::from_promise(*this); + handle.destroy(); } const counters& get_counters() const { diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp index 9c84699..6f55c58 100644 --- a/test/test_thread_pool.cpp +++ b/test/test_thread_pool.cpp @@ -9,102 +9,119 @@ #include #include #include -#include #include using namespace asyncpp; +constexpr size_t num_threads = 4; +constexpr int64_t depth = 5; +constexpr size_t branching = 10; + struct test_promise : schedulable_promise { - std::coroutine_handle<> handle() override { + void resume_now() final { ++num_queried; - return std::noop_coroutine(); } std::atomic_size_t num_queried = 0; }; -TEST_CASE("Thread pool: schedule worklist selection", "[Thread pool]") { - std::condition_variable global_notification; - std::mutex global_mutex; - atomic_stack global_worklist; - std::atomic_size_t num_waiting; - std::vector workers(1); +TEST_CASE("Thread pool 3: insert - try_get_promise interleave", "[Thread pool 3]") { + struct scenario : testing::validated_scenario { + thread_pool::pack pack{ .workers = std::vector(1) }; + test_promise promise; + bool exit_loop = false; + size_t stealing_attempt = 0; + schedulable_promise* result = nullptr; - test_promise promise; + void insert() { + pack.workers[0].insert(promise); + } - SECTION("has local worker") { - thread_pool::schedule(promise, global_worklist, global_notification, global_mutex, num_waiting, &workers[0]); - REQUIRE(workers[0].worklist.pop() == &promise); - REQUIRE(global_worklist.empty()); - } - SECTION("no local worker") { - thread_pool::schedule(promise, global_worklist, global_notification, global_mutex, num_waiting, &workers[0]); - REQUIRE(workers[0].worklist.pop() == &promise); - } + void try_get_promise() { + result = pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); + } + + void validate(const testing::path& p) override { + INFO(p.dump()); + if (!result) { + stealing_attempt = 1; + result = pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); + } 
+ REQUIRE(result == &promise); + REQUIRE(exit_loop == false); + } + }; + + INTERLEAVED_RUN(scenario, THREAD("insert", &scenario::insert), THREAD("get", &scenario::try_get_promise)); } -TEST_CASE("Thread pool: steal from workers", "[Thread pool]") { - std::vector workers(4); +TEST_CASE("Thread pool 3: cancel - try_get_promise interleave", "[Thread pool 3]") { + struct scenario : testing::validated_scenario { + thread_pool::pack pack{ .workers = std::vector(1) }; + bool exit_loop = false; + size_t stealing_attempt = 0; + schedulable_promise* result = nullptr; - test_promise promise; + void insert() { + pack.workers[0].cancel(); + } - SECTION("no work items") { - REQUIRE(nullptr == thread_pool::steal(workers)); - } - SECTION("1 work item") { - workers[2].worklist.push(&promise); - REQUIRE(&promise == thread_pool::steal(workers)); - } -} + void try_get_promise() { + pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); + } + void validate(const testing::path& p) override { + INFO(p.dump()); + REQUIRE(result == nullptr); + if (!exit_loop) { + stealing_attempt = 0; + pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); + } + REQUIRE(exit_loop == true); + } + }; -TEST_CASE("Thread pool: ensure execution", "[Thread pool]") { - // This test makes sure that no matter the interleaving, a scheduled promise - // will be picked up and executed by a worker thread. + INTERLEAVED_RUN(scenario, THREAD("cancel", &scenario::insert), THREAD("get", &scenario::try_get_promise)); +} + +TEST_CASE("Thread pool 3: steal - try_get_promise interleave", "[Thread pool 3]") { struct scenario : testing::validated_scenario { - std::condition_variable global_notification; - std::mutex global_mutex; - atomic_stack global_worklist; - std::atomic_size_t num_waiting; - std::vector workers; - std::atomic_flag terminate; + thread_pool::pack pack{ .workers = std::vector(1) }; test_promise promise; + bool exit_loop = false; + size_t stealing_attempt = 0; + schedulable_promise* popped = nullptr; + schedulable_promise* stolen = nullptr; - scenario() : workers(1) {} + scenario() { + pack.workers[0].insert(promise); + } - void schedule() { - thread_pool::schedule(promise, global_worklist, global_notification, global_mutex, num_waiting); - std::unique_lock lk(global_mutex, std::defer_lock); - INTERLEAVED_ACQUIRE(lk.lock()); - INTERLEAVED(terminate.test_and_set()); - INTERLEAVED(global_notification.notify_all()); + void steal() { + stolen = pack.workers[0].steal_from_this(); + pack.workers[0].cancel(); } - void execute() { - thread_pool::execute(workers[0], global_worklist, global_notification, global_mutex, terminate, num_waiting, std::span(workers)); + void try_get_promise() { + popped = pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); } void validate(const testing::path& p) override { INFO(p.dump()); - REQUIRE(promise.num_queried.load() > 0); + REQUIRE((!!popped ^ !!stolen)); } }; - INTERLEAVED_RUN(scenario, THREAD("schedule", &scenario::schedule), THREAD("execute", &scenario::execute)); + INTERLEAVED_RUN(scenario, THREAD("cancel", &scenario::steal), THREAD("get", &scenario::try_get_promise)); } -constexpr size_t num_threads = 4; -constexpr int64_t depth = 5; -constexpr size_t branching = 10; - - -TEST_CASE("Thread pool: smoke test - schedule tasks", "[Scheduler]") { +TEST_CASE("Thread pool 3: smoke test - schedule tasks", "[Scheduler]") { thread_pool sched(num_threads); static const auto coro = [&sched](auto self, int depth) -> task { @@ -123,4 +140,4 @@ TEST_CASE("Thread pool: smoke 
test - schedule tasks", "[Scheduler]") { const auto count = int64_t(std::pow(branching, depth)); const auto result = join(bind(coro(coro, depth), sched)); REQUIRE(result == count); -} \ No newline at end of file +} diff --git a/test/testing/test_interleaver.cpp b/test/testing/test_interleaver.cpp index cea2f6a..be474cc 100644 --- a/test/testing/test_interleaver.cpp +++ b/test/testing/test_interleaver.cpp @@ -193,8 +193,8 @@ TEST_CASE("Interleaver - select resumed", "[Interleaver]") { } -struct CollectorScenario { - CollectorScenario() { +struct collector_scenario { + collector_scenario() { interleavings.push_back({}); } @@ -211,7 +211,7 @@ struct CollectorScenario { TEST_CASE("Interleaver - single thread combinatorics", "[Interleaver]") { - struct Scenario : CollectorScenario { + struct scenario : collector_scenario { void thread_0() { INTERLEAVED(hit(0)); INTERLEAVED(hit(1)); @@ -219,19 +219,19 @@ TEST_CASE("Interleaver - single thread combinatorics", "[Interleaver]") { } }; - Scenario::reset(); + scenario::reset(); INTERLEAVED_RUN( - Scenario, - THREAD("thread_0", &Scenario::thread_0)); + scenario, + THREAD("thread_0", &scenario::thread_0)); const std::vector> expected = { {0, 1, 2} }; - REQUIRE(Scenario::interleavings == expected); + REQUIRE(scenario::interleavings == expected); } TEST_CASE("Interleaver - two thread combinatorics", "[Interleaver]") { - struct Scenario : CollectorScenario { + struct scenario : collector_scenario { void thread_0() { hit(10); INTERLEAVED("A0"); @@ -248,14 +248,14 @@ TEST_CASE("Interleaver - two thread combinatorics", "[Interleaver]") { } }; - Scenario::reset(); + scenario::reset(); INTERLEAVED_RUN( - Scenario, - THREAD("thread_0", &Scenario::thread_0), - THREAD("thread_1", &Scenario::thread_1)); + scenario, + THREAD("thread_0", &scenario::thread_0), + THREAD("thread_1", &scenario::thread_1)); // Get and sort(!) all executed interleavings. - auto interleaveings = std::move(Scenario::interleavings); + auto interleaveings = std::move(scenario::interleavings); std::ranges::sort(interleaveings); // Check no interleaving was run twice. 
@@ -267,7 +267,7 @@ TEST_CASE("Interleaver - two thread combinatorics", "[Interleaver]") { TEST_CASE("Interleaver - three thread combinatorics", "[Interleaver]") { - struct Scenario : CollectorScenario { + struct scenario : collector_scenario { void thread_0() { hit(10); INTERLEAVED("A0"); @@ -285,14 +285,14 @@ TEST_CASE("Interleaver - three thread combinatorics", "[Interleaver]") { } }; - Scenario::reset(); + scenario::reset(); INTERLEAVED_RUN( - Scenario, - THREAD("thread_0", &Scenario::thread_0), - THREAD("thread_1", &Scenario::thread_1), - THREAD("thread_2", &Scenario::thread_2)); + scenario, + THREAD("thread_0", &scenario::thread_0), + THREAD("thread_1", &scenario::thread_1), + THREAD("thread_2", &scenario::thread_2)); - auto interleaveings = std::move(Scenario::interleavings); + auto interleaveings = std::move(scenario::interleavings); std::ranges::sort(interleaveings); REQUIRE(std::ranges::unique(interleaveings).end() == interleaveings.end()); @@ -301,7 +301,7 @@ TEST_CASE("Interleaver - three thread combinatorics", "[Interleaver]") { TEST_CASE("Interleaver - acquire", "[Interleaver]") { - struct Scenario : CollectorScenario { + struct scenario : collector_scenario { std::atomic_flag f; void thread_0() { @@ -318,14 +318,38 @@ TEST_CASE("Interleaver - acquire", "[Interleaver]") { } }; - Scenario::reset(); + scenario::reset(); INTERLEAVED_RUN( - Scenario, - THREAD("thread_0", &Scenario::thread_0), - THREAD("thread_1", &Scenario::thread_1)); + scenario, + THREAD("thread_0", &scenario::thread_0), + THREAD("thread_1", &scenario::thread_1)); - auto interleaveings = std::move(Scenario::interleavings); + auto interleaveings = std::move(scenario::interleavings); std::ranges::sort(interleaveings); REQUIRE(interleaveings.size() >= 1); REQUIRE(interleaveings[0] == std::vector{ 20, 10 }); +} + + +TEST_CASE("Interleaver - validate", "[Interleaver]") { + static size_t validations = 0; + + struct scenario : validated_scenario { + std::atomic_flag flag; + + void thread_0() { + flag.test_and_set(); + } + + void validate(const path& p) override { + INFO(p.dump()); + validations++; + } + }; + + INTERLEAVED_RUN( + scenario, + THREAD("thread_0", &scenario::thread_0)); + + REQUIRE(validations == 1); } \ No newline at end of file
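
The central interface change in this patch is that schedulable_promise::handle() is replaced by resume_now(), which the thread pool workers invoke directly. For code that implements schedulable_promise by hand, as noop_promise and test_promise above do, the new contract looks roughly like the following minimal sketch; counting_promise and the spin-wait are illustrative only, not part of the library, and the include paths assume the installed asyncpp headers:

#include <asyncpp/promise.hpp>
#include <asyncpp/thread_pool.hpp>

#include <atomic>
#include <thread>

// Hypothetical work item modeled on noop_promise: the pool no longer asks for a
// coroutine handle(); a worker simply calls resume_now() when it dequeues the item.
struct counting_promise : asyncpp::schedulable_promise {
    void resume_now() override { num_resumed.fetch_add(1, std::memory_order_relaxed); }
    std::atomic_size_t num_resumed = 0;
};

int main() {
    asyncpp::thread_pool pool(2);
    counting_promise promise;
    pool.schedule(promise); // a worker picks up the promise and runs resume_now()
    while (promise.num_resumed.load(std::memory_order_relaxed) == 0) {
        std::this_thread::yield(); // wait until the pool has executed the work item
    }
}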