From 4865ac5b3672dba7b460ad75492947f726729989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Kardos?= Date: Sun, 25 Feb 2024 22:01:43 +0100 Subject: [PATCH 1/9] add benchmark target & a sample benchmark --- .clang-format | 2 +- CMakeLists.txt | 4 + benchmark/CMakeLists.txt | 12 +++ benchmark/benchmark_task_spawn.cpp | 116 +++++++++++++++++++++++++++++ benchmark/main.cpp | 15 ++++ conanfile.txt | 1 + 6 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 benchmark/CMakeLists.txt create mode 100644 benchmark/benchmark_task_spawn.cpp create mode 100644 benchmark/main.cpp diff --git a/.clang-format b/.clang-format index 03e45a4..698b533 100644 --- a/.clang-format +++ b/.clang-format @@ -54,7 +54,7 @@ DeriveLineEnding: 'true' FixNamespaceComments: true IncludeBlocks: Regroup IncludeCategories: - - Regex: '(<|")((C|c)atch.*)/' + - Regex: '(<|")(((C|c)atch.*)|((C|c)elero.*))/' Priority: 5 - Regex: '(<|")((async\+\+).*)/' Priority: 2 diff --git a/CMakeLists.txt b/CMakeLists.txt index 370d12f..a2f0667 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,6 +4,7 @@ project(asyncpp) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) option(ASYNCPP_BUILD_TESTS "Build tests." ON) +option(ASYNCPP_BUILD_BENCHMARKS "Build benchmarks." ON) option(ENABLE_LLVM_COV "Enable LLVM source-based code coverage." OFF) option(ENABLE_LLVM_ADDRESS_SANITIZER "Enable LLVM address sanitizer." OFF) option(ENABLE_LLVM_MEMORY_SANITIZER "Enable LLVM memory sanitizer." OFF) @@ -49,6 +50,9 @@ add_subdirectory(src) if (${ASYNCPP_BUILD_TESTS}) add_subdirectory(test) endif() +if (${ASYNCPP_BUILD_BENCHMARKS}) + add_subdirectory(benchmark) +endif() install(TARGETS asyncpp DESTINATION "lib") install(DIRECTORY "include/asyncpp" DESTINATION "include") \ No newline at end of file diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt new file mode 100644 index 0000000..40024b8 --- /dev/null +++ b/benchmark/CMakeLists.txt @@ -0,0 +1,12 @@ +add_executable(benchmark) + +target_sources(benchmark + PRIVATE + main.cpp + benchmark_task_spawn.cpp +) + + +find_package(Celero REQUIRED) +target_link_libraries(benchmark celero) +target_link_libraries(benchmark asyncpp) \ No newline at end of file diff --git a/benchmark/benchmark_task_spawn.cpp b/benchmark/benchmark_task_spawn.cpp new file mode 100644 index 0000000..4e3b5db --- /dev/null +++ b/benchmark/benchmark_task_spawn.cpp @@ -0,0 +1,116 @@ +#include +#include + +#include +#include + +#include + + +using namespace asyncpp; + + +task plain() { + co_return 1; +} + + +task allocator_backed(std::allocator_arg_t, std::pmr::polymorphic_allocator<> alloc) { + co_return 1; +} + + +std::pmr::polymorphic_allocator<>& get_new_delete_alloc() { + static auto alloc = std::pmr::polymorphic_allocator(std::pmr::new_delete_resource()); + return alloc; +} + + +std::pmr::polymorphic_allocator<>& get_unsynchronized_pool_alloc() { + static auto resource = std::pmr::unsynchronized_pool_resource(std::pmr::new_delete_resource()); + static auto alloc = std::pmr::polymorphic_allocator(&resource); + return alloc; +} + + +std::pmr::polymorphic_allocator<>& get_stack_alloc(bool renew) { + static std::vector initial_buffer(1048576); + static auto resource = std::pmr::monotonic_buffer_resource(initial_buffer.data(), initial_buffer.size(), std::pmr::new_delete_resource()); + static auto alloc = std::pmr::polymorphic_allocator(&resource); + if (renew) { + alloc.~polymorphic_allocator(); + resource.~monotonic_buffer_resource(); + new (&resource) std::pmr::monotonic_buffer_resource(initial_buffer.data(), 
initial_buffer.size(), std::pmr::new_delete_resource()); + new (&alloc) std::pmr::polymorphic_allocator(&resource); + } + return alloc; +} + + +BASELINE(task_spawn, unoptimized, 60, 50000) { + bool ready = false; + { + auto task = plain(); + volatile auto ptr = &task; + ptr->launch(); + ready = ptr->ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} + + +BENCHMARK(task_spawn, HALO, 60, 50000) { + bool ready = false; + { + auto task = plain(); + task.launch(); + ready = task.ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} + + +BENCHMARK(task_spawn, PMR_new_delete, 60, 50000) { + bool ready = false; + auto& alloc = get_new_delete_alloc(); + { + auto task = allocator_backed(std::allocator_arg, alloc); + volatile auto ptr = &task; + ptr->launch(); + ready = ptr->ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} + + +BENCHMARK(task_spawn, PMR_unsync_pool, 60, 50000) { + bool ready = false; + auto& alloc = get_unsynchronized_pool_alloc(); + { + auto task = allocator_backed(std::allocator_arg, alloc); + volatile auto ptr = &task; + ptr->launch(); + ready = ptr->ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} + + +BENCHMARK(task_spawn, PMR_stack, 60, 50000) { + bool ready = false; + static int counter = 0; + counter = (counter + 1) % 512; + auto& alloc = get_stack_alloc(counter == 0); + { + auto task = allocator_backed(std::allocator_arg, alloc); + volatile auto ptr = &task; + ptr->launch(); + ready = ready && ptr->ready(); + } + assert(ready); + celero::DoNotOptimizeAway(ready); +} \ No newline at end of file diff --git a/benchmark/main.cpp b/benchmark/main.cpp new file mode 100644 index 0000000..74f2575 --- /dev/null +++ b/benchmark/main.cpp @@ -0,0 +1,15 @@ +#include + +#include + + +int main(int argc, char* argv[]) { + try { + celero::Run(argc, argv); + } + catch (std::exception& ex) { + std::cerr << "benchmarks exited with error: " << ex.what() << std::endl; + return 1; + } + return 0; +} \ No newline at end of file diff --git a/conanfile.txt b/conanfile.txt index c7fc0a6..8badf34 100644 --- a/conanfile.txt +++ b/conanfile.txt @@ -1,5 +1,6 @@ [requires] catch2/3.4.0 +celero/2.9.0 [generators] CMakeDeps From 1666ead2304576159221bd99933a211106007ac9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Kardos?= Date: Sun, 25 Feb 2024 23:30:21 +0100 Subject: [PATCH 2/9] fix clang compile --- benchmark/benchmark_task_spawn.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/benchmark/benchmark_task_spawn.cpp b/benchmark/benchmark_task_spawn.cpp index 4e3b5db..f982e22 100644 --- a/benchmark/benchmark_task_spawn.cpp +++ b/benchmark/benchmark_task_spawn.cpp @@ -21,14 +21,14 @@ task allocator_backed(std::allocator_arg_t, std::pmr::polymorphic_allocator std::pmr::polymorphic_allocator<>& get_new_delete_alloc() { - static auto alloc = std::pmr::polymorphic_allocator(std::pmr::new_delete_resource()); + static auto alloc = std::pmr::polymorphic_allocator<>(std::pmr::new_delete_resource()); return alloc; } std::pmr::polymorphic_allocator<>& get_unsynchronized_pool_alloc() { static auto resource = std::pmr::unsynchronized_pool_resource(std::pmr::new_delete_resource()); - static auto alloc = std::pmr::polymorphic_allocator(&resource); + static auto alloc = std::pmr::polymorphic_allocator<>(&resource); return alloc; } @@ -36,12 +36,12 @@ std::pmr::polymorphic_allocator<>& get_unsynchronized_pool_alloc() { std::pmr::polymorphic_allocator<>& get_stack_alloc(bool renew) { static std::vector 
initial_buffer(1048576); static auto resource = std::pmr::monotonic_buffer_resource(initial_buffer.data(), initial_buffer.size(), std::pmr::new_delete_resource()); - static auto alloc = std::pmr::polymorphic_allocator(&resource); + static auto alloc = std::pmr::polymorphic_allocator<>(&resource); if (renew) { alloc.~polymorphic_allocator(); resource.~monotonic_buffer_resource(); new (&resource) std::pmr::monotonic_buffer_resource(initial_buffer.data(), initial_buffer.size(), std::pmr::new_delete_resource()); - new (&alloc) std::pmr::polymorphic_allocator(&resource); + new (&alloc) std::pmr::polymorphic_allocator<>(&resource); } return alloc; } From 7a3e7d72a612cc9f3c3a9297b66d0beeac8854ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Kardos?= Date: Mon, 26 Feb 2024 15:45:13 +0100 Subject: [PATCH 3/9] replace with test fixtures --- benchmark/benchmark_task_spawn.cpp | 75 +++++++++++++++++++----------- include/asyncpp/promise.hpp | 8 ++-- 2 files changed, 51 insertions(+), 32 deletions(-) diff --git a/benchmark/benchmark_task_spawn.cpp b/benchmark/benchmark_task_spawn.cpp index f982e22..cdf315e 100644 --- a/benchmark/benchmark_task_spawn.cpp +++ b/benchmark/benchmark_task_spawn.cpp @@ -20,34 +20,55 @@ task allocator_backed(std::allocator_arg_t, std::pmr::polymorphic_allocator } -std::pmr::polymorphic_allocator<>& get_new_delete_alloc() { - static auto alloc = std::pmr::polymorphic_allocator<>(std::pmr::new_delete_resource()); - return alloc; -} +struct FixtureNewDelete : celero::TestFixture { + inline std::pmr::polymorphic_allocator<>& getAlloc() { + return alloc; + } +private: + std::pmr::polymorphic_allocator<> alloc = { std::pmr::new_delete_resource() }; +}; -std::pmr::polymorphic_allocator<>& get_unsynchronized_pool_alloc() { - static auto resource = std::pmr::unsynchronized_pool_resource(std::pmr::new_delete_resource()); - static auto alloc = std::pmr::polymorphic_allocator<>(&resource); - return alloc; -} +struct FixturePool : celero::TestFixture { + inline std::pmr::polymorphic_allocator<>& getAlloc() { + return alloc; + } -std::pmr::polymorphic_allocator<>& get_stack_alloc(bool renew) { - static std::vector initial_buffer(1048576); - static auto resource = std::pmr::monotonic_buffer_resource(initial_buffer.data(), initial_buffer.size(), std::pmr::new_delete_resource()); - static auto alloc = std::pmr::polymorphic_allocator<>(&resource); - if (renew) { +private: + std::pmr::unsynchronized_pool_resource resource; + std::pmr::polymorphic_allocator<> alloc = { &resource }; +}; + + +struct FixtureStack : celero::TestFixture { + void setUp(const ExperimentValue* x) override { alloc.~polymorphic_allocator(); resource.~monotonic_buffer_resource(); - new (&resource) std::pmr::monotonic_buffer_resource(initial_buffer.data(), initial_buffer.size(), std::pmr::new_delete_resource()); + new (&resource) std::pmr::monotonic_buffer_resource(buffer.get(), size, std::pmr::new_delete_resource()); new (&alloc) std::pmr::polymorphic_allocator<>(&resource); } - return alloc; -} + + inline std::pmr::polymorphic_allocator<>& getAlloc() { + return alloc; + } + +private: + static constexpr inline size_t size = 10485760; + struct block { + alignas(64) std::byte content[64]; + }; + std::unique_ptr buffer = std::make_unique_for_overwrite(size / sizeof(block)); + std::pmr::monotonic_buffer_resource resource; + std::pmr::polymorphic_allocator<> alloc = { &resource }; +}; -BASELINE(task_spawn, unoptimized, 60, 50000) { +constexpr int numSamples = 1000; +constexpr int numIterations = 5000; + + 
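+// In Celero, BASELINE defines the reference measurement of the task_spawn group
+// and each BENCHMARK/BENCHMARK_F is reported relative to it; the last two
+// arguments are the sample count and the iterations per sample (numSamples and
+// numIterations above).
+// The "unoptimized" baseline below launches the task through a volatile pointer,
+// presumably to defeat HALO (the coroutine heap-allocation elision optimization,
+// P0981R0), so that every spawn pays for a real frame allocation; the "HALO"
+// variant drops the indirection and lets the compiler elide the allocation.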
+BASELINE(task_spawn, unoptimized, numSamples, numIterations) { bool ready = false; { auto task = plain(); @@ -60,7 +81,7 @@ BASELINE(task_spawn, unoptimized, 60, 50000) { } -BENCHMARK(task_spawn, HALO, 60, 50000) { +BENCHMARK(task_spawn, HALO, numSamples, numIterations) { bool ready = false; { auto task = plain(); @@ -72,9 +93,9 @@ BENCHMARK(task_spawn, HALO, 60, 50000) { } -BENCHMARK(task_spawn, PMR_new_delete, 60, 50000) { +BENCHMARK_F(task_spawn, PMR_new_delete, FixtureNewDelete, numSamples, numIterations) { bool ready = false; - auto& alloc = get_new_delete_alloc(); + auto& alloc = getAlloc(); { auto task = allocator_backed(std::allocator_arg, alloc); volatile auto ptr = &task; @@ -86,9 +107,9 @@ BENCHMARK(task_spawn, PMR_new_delete, 60, 50000) { } -BENCHMARK(task_spawn, PMR_unsync_pool, 60, 50000) { +BENCHMARK_F(task_spawn, PMR_unsync_pool, FixturePool, numSamples, numIterations) { bool ready = false; - auto& alloc = get_unsynchronized_pool_alloc(); + auto& alloc = getAlloc(); { auto task = allocator_backed(std::allocator_arg, alloc); volatile auto ptr = &task; @@ -100,16 +121,14 @@ BENCHMARK(task_spawn, PMR_unsync_pool, 60, 50000) { } -BENCHMARK(task_spawn, PMR_stack, 60, 50000) { +BENCHMARK_F(task_spawn, PMR_stack, FixtureStack, numSamples, numIterations) { bool ready = false; - static int counter = 0; - counter = (counter + 1) % 512; - auto& alloc = get_stack_alloc(counter == 0); + auto& alloc = getAlloc(); { auto task = allocator_backed(std::allocator_arg, alloc); volatile auto ptr = &task; ptr->launch(); - ready = ready && ptr->ready(); + ready = ptr->ready(); } assert(ready); celero::DoNotOptimizeAway(ready); diff --git a/include/asyncpp/promise.hpp b/include/asyncpp/promise.hpp index c37a938..5e758d6 100644 --- a/include/asyncpp/promise.hpp +++ b/include/asyncpp/promise.hpp @@ -139,11 +139,11 @@ struct allocator_aware_promise { }; static constexpr dealloc_t dealloc = [](void* ptr, size_t size) { - auto& alloc = *reinterpret_cast<alloc_t*>(static_cast<std::byte*>(ptr) + alloc_offset(size)); - auto moved = std::move(alloc); - alloc.~alloc_t(); + const auto alloc_ptr = reinterpret_cast<alloc_t*>(static_cast<std::byte*>(ptr) + alloc_offset(size)); + auto alloc = std::move(*alloc_ptr); + alloc_ptr->~alloc_t(); const auto num_blocks = (total_size(size) + sizeof(block_t) - 1) / sizeof(block_t); - std::allocator_traits<alloc_t>::deallocate(moved, static_cast<block_t*>(ptr), num_blocks); + std::allocator_traits<alloc_t>::deallocate(alloc, static_cast<block_t*>(ptr), num_blocks); }; auto rebound_alloc = alloc_t(alloc); From de58bfaeb7a7484f1520bf3aa390635b5a44e182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Kardos?= Date: Mon, 26 Feb 2024 19:00:38 +0100 Subject: [PATCH 4/9] benchmark some aspects of thread_pool --- benchmark/CMakeLists.txt | 1 + benchmark/benchmark_thread_pool.cpp | 116 ++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 benchmark/benchmark_thread_pool.cpp diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 40024b8..8ea0985 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -4,6 +4,7 @@ target_sources(benchmark PRIVATE main.cpp benchmark_task_spawn.cpp + benchmark_thread_pool.cpp ) diff --git a/benchmark/benchmark_thread_pool.cpp b/benchmark/benchmark_thread_pool.cpp new file mode 100644 index 0000000..cf328a2 --- /dev/null +++ b/benchmark/benchmark_thread_pool.cpp @@ -0,0 +1,116 @@ +#include +#include +#include + +#include +#include + +#include + + +using namespace asyncpp; + + +struct noop_promise : schedulable_promise { + noop_promise(std::atomic_size_t* counter = 
nullptr) : counter(counter) {} + std::coroutine_handle<> handle() override { + if (counter) { + counter->fetch_sub(1, std::memory_order_relaxed); + } + return std::noop_coroutine(); + } + std::atomic_size_t* counter = nullptr; +}; + + +struct FixtureScheduleOffthread : celero::TestFixture { + FixtureScheduleOffthread() : pool(1), num_running(0), promises(400000, noop_promise(&num_running)) {} + + void setUp(const ExperimentValue* x) override { + num_running.store(promises.size()); + } + + void tearDown() override { + while (num_running.load(std::memory_order_relaxed) != 0) { + } + } + + std::atomic_size_t num_running; + thread_pool pool; + std::vector promises; +}; + + +BASELINE_F(thread_pool_schedule, off_thread, FixtureScheduleOffthread, 30, 1) { + for (auto& promise : promises) { + pool.schedule(promise); + } +} + + +BENCHMARK_F(thread_pool_schedule, pool_thread, FixtureScheduleOffthread, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { + for (auto& promise : promises) { + pool.schedule(promise); + } + co_return; + }; + join(launch(coro(promises, pool), pool)); +} + + +template +struct FixtureStealing : celero::TestFixture { + FixtureStealing() : pool(NumThreads + 1), num_running(0), promises(400000, noop_promise(&num_running)) {} + + void setUp(const ExperimentValue* x) override { + num_running.store(promises.size()); + } + + + void sync() { + while (num_running.load(std::memory_order_relaxed) != 0) { + } + } + + void tearDown() override { + sync(); + } + + thread_pool pool; + std::atomic_size_t num_running; + std::vector promises; +}; + + +BASELINE_F(thread_pool_stealing, x1_thread, FixtureStealing<1>, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { + for (auto& promise : promises) { + pool.schedule(promise); + } + co_return; + }; + join(launch(coro(promises, pool), pool)); +} + + +BENCHMARK_F(thread_pool_stealing, x2_thread, FixtureStealing<2>, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { + for (auto& promise : promises) { + pool.schedule(promise); + } + co_return; + }; + join(launch(coro(promises, pool), pool)); +} + + +BENCHMARK_F(thread_pool_stealing, x4_thread, FixtureStealing<4>, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { + for (auto& promise : promises) { + pool.schedule(promise); + } + co_return; + }; + join(launch(coro(promises, pool), pool)); +} From 455a93c5c4a58b28762872709ff6a31a4498afb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Kardos?= Date: Wed, 28 Feb 2024 21:20:08 +0100 Subject: [PATCH 5/9] thread pool v3, fixes, benchmarks --- benchmark/CMakeLists.txt | 1 + benchmark/benchmark_atomic.cpp | 133 ++++++++++++++++++++ benchmark/benchmark_thread_pool.cpp | 99 +++++++++------ include/asyncpp/promise.hpp | 1 + include/asyncpp/testing/interleaver.hpp | 2 +- include/asyncpp/thread_pool.hpp | 153 ++++++++++++++++++++++++ test/test_thread_pool.cpp | 125 ++++++++++++++++++- test/testing/test_interleaver.cpp | 76 ++++++++---- 8 files changed, 523 insertions(+), 67 deletions(-) create mode 100644 benchmark/benchmark_atomic.cpp diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 8ea0985..0c533a7 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -5,6 +5,7 @@ target_sources(benchmark main.cpp benchmark_task_spawn.cpp benchmark_thread_pool.cpp + benchmark_atomic.cpp ) diff --git a/benchmark/benchmark_atomic.cpp b/benchmark/benchmark_atomic.cpp new file 
mode 100644 index 0000000..d21f3b8 --- /dev/null +++ b/benchmark/benchmark_atomic.cpp @@ -0,0 +1,133 @@ +#include +#include +#include + +#include + + +// This file benchmarks atomic operations themselves, not the library. +// The measurements can be used as a baseline or target as to what is +// achievable and reasonable on the hardware. + + +static constexpr size_t base_reps = 4'000'000; + + +BASELINE(atomic_rmw, x1_thread, 30, 1) { + alignas(64) std::atomic_size_t counter = 0; + static constexpr size_t reps = base_reps; + + static const auto func = [&counter] { + for (size_t rep = 0; rep < reps; ++rep) { + counter.fetch_add(1, std::memory_order_relaxed); + } + }; + + std::array threads; + std::ranges::generate(threads, [&] { return std::jthread(func); }); +} + + +BENCHMARK(atomic_rmw, x2_thread, 30, 1) { + alignas(64) std::atomic_size_t counter = 0; + static constexpr size_t reps = base_reps / 2; + + static const auto func = [&counter] { + for (size_t rep = 0; rep < reps; ++rep) { + counter.fetch_add(1, std::memory_order_relaxed); + } + }; + + std::array threads; + std::ranges::generate(threads, [&] { return std::jthread(func); }); +} + + +BENCHMARK(atomic_rmw, x4_thread, 30, 1) { + alignas(64) std::atomic_size_t counter = 0; + static constexpr size_t reps = base_reps / 4; + + static const auto func = [&counter] { + for (size_t rep = 0; rep < reps; ++rep) { + counter.fetch_add(1, std::memory_order_relaxed); + } + }; + + std::array threads; + std::ranges::generate(threads, [&] { return std::jthread(func); }); +} + + +BENCHMARK(atomic_rmw, x8_thread, 30, 1) { + alignas(64) std::atomic_size_t counter = 0; + static constexpr size_t reps = base_reps / 8; + + static const auto func = [&counter] { + for (size_t rep = 0; rep < reps; ++rep) { + counter.fetch_add(1, std::memory_order_relaxed); + } + }; + + std::array threads; + std::ranges::generate(threads, [&] { return std::jthread(func); }); +} + + +BASELINE(atomic_read, x1_thread, 30, 1) { + alignas(64) std::atomic_size_t counter = 0; + static constexpr size_t reps = base_reps; + + static const auto func = [&counter] { + for (size_t rep = 0; rep < reps; ++rep) { + static_cast(counter.load(std::memory_order_relaxed)); + } + }; + + std::array threads; + std::ranges::generate(threads, [&] { return std::jthread(func); }); +} + + +BENCHMARK(atomic_read, x2_thread, 30, 1) { + alignas(64) std::atomic_size_t counter = 0; + static constexpr size_t reps = base_reps / 2; + + static const auto func = [&counter] { + for (size_t rep = 0; rep < reps; ++rep) { + static_cast(counter.load(std::memory_order_relaxed)); + } + }; + + std::array threads; + std::ranges::generate(threads, [&] { return std::jthread(func); }); +} + + +BENCHMARK(atomic_read, x4_thread, 30, 1) { + alignas(64) std::atomic_size_t counter = 0; + static constexpr size_t reps = base_reps / 4; + + static const auto func = [&counter] { + for (size_t rep = 0; rep < reps; ++rep) { + static_cast(counter.load(std::memory_order_relaxed)); + } + }; + + std::array threads; + std::ranges::generate(threads, [&] { return std::jthread(func); }); +} + + +BENCHMARK(atomic_read, x8_thread, 30, 1) { + alignas(64) std::atomic_size_t counter = 0; + static constexpr size_t reps = base_reps / 8; + + static const auto func = [&counter] { + for (size_t rep = 0; rep < reps; ++rep) { + static_cast(counter.load(std::memory_order_relaxed)); + } + }; + + std::array threads; + std::ranges::generate(threads, [&] { return std::jthread(func); }); +} \ No newline at end of file diff --git a/benchmark/benchmark_thread_pool.cpp 
b/benchmark/benchmark_thread_pool.cpp index cf328a2..200cc16 100644 --- a/benchmark/benchmark_thread_pool.cpp +++ b/benchmark/benchmark_thread_pool.cpp @@ -23,68 +23,99 @@ }; -struct FixtureScheduleOffthread : celero::TestFixture { - FixtureScheduleOffthread() : pool(1), num_running(0), promises(400000, noop_promise(&num_running)) {} +template +struct FixturePool : celero::TestFixture { + FixturePool() : pool(NumThreads), num_running(0), promises(400000, noop_promise(&num_running)) {} void setUp(const ExperimentValue* x) override { num_running.store(promises.size()); } - void tearDown() override { + + void sync() { while (num_running.load(std::memory_order_relaxed) != 0) { } } + void tearDown() override { + sync(); + } + + thread_pool_3 pool; std::atomic_size_t num_running; - thread_pool pool; std::vector promises; }; -BASELINE_F(thread_pool_schedule, off_thread, FixtureScheduleOffthread, 30, 1) { +BASELINE_F(tp_schedule_outside, x1_thread, FixturePool<1>, 30, 1) { for (auto& promise : promises) { pool.schedule(promise); } } -BENCHMARK_F(thread_pool_schedule, pool_thread, FixtureScheduleOffthread, 30, 1) { - static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { +BENCHMARK_F(tp_schedule_outside, x2_thread, FixturePool<2>, 30, 1) { + for (auto& promise : promises) { + pool.schedule(promise); + } +} + + +BENCHMARK_F(tp_schedule_outside, x4_thread, FixturePool<4>, 30, 1) { + for (auto& promise : promises) { + pool.schedule(promise); + } +} + + +BASELINE_F(tp_schedule_inside, x1_thread, FixturePool<1>, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } co_return; }; - join(launch(coro(promises, pool), pool)); + auto t1 = launch(coro(promises, pool), pool); + join(t1); } -template -struct FixtureStealing : celero::TestFixture { - FixtureStealing() : pool(NumThreads + 1), num_running(0), promises(400000, noop_promise(&num_running)) {} - - void setUp(const ExperimentValue* x) override { - num_running.store(promises.size()); - } - - - void sync() { - while (num_running.load(std::memory_order_relaxed) != 0) { +BENCHMARK_F(tp_schedule_inside, x2_thread, FixturePool<2>, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { + for (auto& promise : promises) { + pool.schedule(promise); } - } + co_return; + }; + const auto count = std::ssize(promises) / 2; + auto t1 = launch(coro(std::span(promises.begin(), promises.begin() + count), pool), pool); + auto t2 = launch(coro(std::span(promises.begin() + count, promises.end()), pool), pool); + join(t1); + join(t2); +} - void tearDown() override { - sync(); - } - thread_pool pool; - std::atomic_size_t num_running; - std::vector promises; -}; +BENCHMARK_F(tp_schedule_inside, x4_thread, FixturePool<4>, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { + for (auto& promise : promises) { + pool.schedule(promise); + } + co_return; + }; + const auto count = std::ssize(promises) / 4; + auto t1 = launch(coro(std::span(promises.begin(), promises.begin() + 1 * count), pool), pool); + auto t2 = launch(coro(std::span(promises.begin() + 1 * count, promises.begin() + 2 * count), pool), pool); + auto t3 = launch(coro(std::span(promises.begin() + 2 * count, promises.begin() + 3 * count), pool), pool); + auto t4 = launch(coro(std::span(promises.begin() + 3 * count, promises.end()), pool), pool); + join(t1); + join(t2); + 
join(t3); + join(t4); +} -BASELINE_F(thread_pool_stealing, x1_thread, FixtureStealing<1>, 30, 1) { - static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { +BASELINE_F(tp_stealing, x1_thread, FixturePool<1 + 1>, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } @@ -94,8 +125,8 @@ BASELINE_F(thread_pool_stealing, x1_thread, FixtureStealing<1>, 30, 1) { } -BENCHMARK_F(thread_pool_stealing, x2_thread, FixtureStealing<2>, 30, 1) { - static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { +BENCHMARK_F(tp_stealing, x2_thread, FixturePool<2 + 1>, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } @@ -105,12 +136,12 @@ BENCHMARK_F(thread_pool_stealing, x2_thread, FixtureStealing<2>, 30, 1) { } -BENCHMARK_F(thread_pool_stealing, x4_thread, FixtureStealing<4>, 30, 1) { - static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { +BENCHMARK_F(tp_stealing, x4_thread, FixturePool<4 + 1>, 30, 1) { + static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } co_return; }; join(launch(coro(promises, pool), pool)); -} +} \ No newline at end of file diff --git a/include/asyncpp/promise.hpp b/include/asyncpp/promise.hpp index 5e758d6..eae324d 100644 --- a/include/asyncpp/promise.hpp +++ b/include/asyncpp/promise.hpp @@ -74,6 +74,7 @@ struct schedulable_promise { virtual ~schedulable_promise() = default; virtual std::coroutine_handle<> handle() = 0; schedulable_promise* m_scheduler_next = nullptr; + schedulable_promise* m_scheduler_prev = nullptr; scheduler* m_scheduler = nullptr; }; diff --git a/include/asyncpp/testing/interleaver.hpp b/include/asyncpp/testing/interleaver.hpp index 32d20d8..d068e2e 100644 --- a/include/asyncpp/testing/interleaver.hpp +++ b/include/asyncpp/testing/interleaver.hpp @@ -190,7 +190,7 @@ class interleaver { do { auto [swarm, scenario] = launch_threads(m_thread_funcs); const auto path_ = run_next_interleaving(tree, swarm); - if constexpr (std::convertible_to) { + if constexpr (std::convertible_to&, validated_scenario&>) { scenario->validate(path_); } } while (!is_transitively_complete(tree, tree.root())); diff --git a/include/asyncpp/thread_pool.hpp b/include/asyncpp/thread_pool.hpp index f116cbc..ac9e60c 100644 --- a/include/asyncpp/thread_pool.hpp +++ b/include/asyncpp/thread_pool.hpp @@ -1,11 +1,13 @@ #pragma once +#include "container/atomic_deque.hpp" #include "container/atomic_stack.hpp" #include "scheduler.hpp" #include #include +#include #include #include #include @@ -14,6 +16,157 @@ namespace asyncpp { +class thread_pool_3 : public scheduler { +public: + struct pack; + + class worker { + public: + using queue = deque; + + worker() + : m_sema(0) {} + + ~worker() { + cancel(); + } + + void insert(schedulable_promise& promise) { + std::unique_lock lk(m_spinlock, std::defer_lock); + INTERLEAVED_ACQUIRE(lk.lock()); + const auto previous = m_promises.push_back(&promise); + const auto blocked = m_blocked.test(std::memory_order_relaxed); + INTERLEAVED(lk.unlock()); + + if (!previous && blocked) { + INTERLEAVED(m_sema.release()); + } + } + + schedulable_promise* steal_from_this() { + std::unique_lock lk(m_spinlock, std::defer_lock); + INTERLEAVED(lk.lock()); + return m_promises.pop_front(); + } + + schedulable_promise* 
try_get_promise(pack& pack, size_t& stealing_attempt, bool& exit_loop) { + std::unique_lock lk(m_spinlock, std::defer_lock); + INTERLEAVED_ACQUIRE(lk.lock()); + const auto promise = m_promises.front(); + if (promise) { + m_promises.pop_front(); + return promise; + } + + if (stealing_attempt > 0) { + INTERLEAVED(lk.unlock()); + const auto stolen = steal_from_other(pack, stealing_attempt); + stealing_attempt = stolen ? pack.workers.size() : stealing_attempt - 1; + return stolen; + } + + + if (INTERLEAVED(m_cancelled.test(std::memory_order_relaxed))) { + exit_loop = true; + } + else { + INTERLEAVED(m_blocked.test_and_set(std::memory_order_relaxed)); + pack.blocked.push(this); + pack.num_blocked.fetch_add(1, std::memory_order_relaxed); + INTERLEAVED(lk.unlock()); + INTERLEAVED_ACQUIRE(m_sema.acquire()); + INTERLEAVED(m_blocked.clear()); + stealing_attempt = pack.workers.size(); + } + return nullptr; + } + + schedulable_promise* steal_from_other(pack& pack, size_t& stealing_attempt) const { + const size_t pack_size = pack.workers.size(); + const size_t my_index = this - pack.workers.data(); + const size_t victim_index = (my_index + stealing_attempt) % pack_size; + return pack.workers[victim_index].steal_from_this(); + } + + void start(pack& pack) { + m_thread = std::jthread([this, &pack] { + run(pack); + }); + } + + void cancel() { + std::unique_lock lk(m_spinlock, std::defer_lock); + INTERLEAVED_ACQUIRE(lk.lock()); + INTERLEAVED(m_cancelled.test_and_set(std::memory_order_relaxed)); + const auto blocked = INTERLEAVED(m_blocked.test(std::memory_order_relaxed)); + lk.unlock(); + if (blocked) { + INTERLEAVED(m_sema.release()); + } + } + + private: + void run(pack& pack) { + size_t stealing_attempt = pack.workers.size(); + bool exit_loop = false; + while (!exit_loop) { + const auto promise = try_get_promise(pack, stealing_attempt, exit_loop); + if (promise) { + promise->handle().resume(); + } + } + } + + public: + worker* m_next = nullptr; + + private: + alignas(64) spinlock m_spinlock; + alignas(64) queue m_promises; + alignas(64) std::atomic_flag m_blocked; + alignas(64) std::binary_semaphore m_sema; + alignas(64) std::jthread m_thread; + alignas(64) std::atomic_flag m_cancelled; + }; + + struct pack { + alignas(64) std::vector workers; + alignas(64) atomic_stack blocked; + alignas(64) std::atomic_size_t num_blocked = 0; + }; + + +public: + thread_pool_3(size_t num_threads = 1) + : m_pack(std::vector(num_threads)), m_next_in_schedule(0) { + for (auto& worker : m_pack.workers) { + worker.start(m_pack); + } + } + + void schedule(schedulable_promise& promise) override { + size_t num_blocked = m_pack.num_blocked.load(std::memory_order_relaxed); + const auto blocked = num_blocked > 0 ? 
m_pack.blocked.pop() : nullptr; + if (blocked) { + blocked->insert(promise); + m_pack.num_blocked.fetch_sub(1, std::memory_order_relaxed); + } + else if (m_local) { + m_local->insert(promise); + } + else { + const auto selected = m_next_in_schedule.fetch_add(1, std::memory_order_relaxed) % m_pack.workers.size(); + m_pack.workers[selected].insert(promise); + } + } + +private: + pack m_pack; + std::atomic_ptrdiff_t m_next_in_schedule; + inline static thread_local worker* m_local = nullptr; +}; + + class thread_pool : public scheduler { public: struct worker { diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp index 9c84699..5c700d0 100644 --- a/test/test_thread_pool.cpp +++ b/test/test_thread_pool.cpp @@ -9,13 +9,16 @@ #include #include #include -#include #include using namespace asyncpp; +constexpr size_t num_threads = 4; +constexpr int64_t depth = 5; +constexpr size_t branching = 10; + struct test_promise : schedulable_promise { std::coroutine_handle<> handle() override { @@ -26,6 +29,121 @@ struct test_promise : schedulable_promise { }; +TEST_CASE("Thread pool 3: insert - try_get_promise interleave", "[Thread pool 3]") { + struct scenario : testing::validated_scenario { + thread_pool_3::pack pack{ .workers = std::vector(1) }; + test_promise promise; + bool exit_loop = false; + size_t stealing_attempt = 0; + schedulable_promise* result = nullptr; + + void insert() { + pack.workers[0].insert(promise); + } + + void try_get_promise() { + result = pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); + } + + void validate(const testing::path& p) override { + INFO(p.dump()); + if (!result) { + stealing_attempt = 1; + result = pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); + } + REQUIRE(result == &promise); + REQUIRE(exit_loop == false); + } + }; + + INTERLEAVED_RUN(scenario, THREAD("insert", &scenario::insert), THREAD("get", &scenario::try_get_promise)); +} + + +TEST_CASE("Thread pool 3: cancel - try_get_promise interleave", "[Thread pool 3]") { + struct scenario : testing::validated_scenario { + thread_pool_3::pack pack{ .workers = std::vector(1) }; + bool exit_loop = false; + size_t stealing_attempt = 0; + schedulable_promise* result = nullptr; + + void insert() { + pack.workers[0].cancel(); + } + + void try_get_promise() { + pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); + } + + void validate(const testing::path& p) override { + INFO(p.dump()); + REQUIRE(result == nullptr); + if (!exit_loop) { + stealing_attempt = 0; + pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); + } + REQUIRE(exit_loop == true); + } + }; + + INTERLEAVED_RUN(scenario, THREAD("cancel", &scenario::insert), THREAD("get", &scenario::try_get_promise)); +} + + +TEST_CASE("Thread pool 3: steal - try_get_promise interleave", "[Thread pool 3]") { + struct scenario : testing::validated_scenario { + thread_pool_3::pack pack{ .workers = std::vector(1) }; + test_promise promise; + bool exit_loop = false; + size_t stealing_attempt = 0; + schedulable_promise* popped = nullptr; + schedulable_promise* stolen = nullptr; + + scenario() { + pack.workers[0].insert(promise); + } + + void steal() { + stolen = pack.workers[0].steal_from_this(); + pack.workers[0].cancel(); + } + + void try_get_promise() { + popped = pack.workers[0].try_get_promise(pack, stealing_attempt, exit_loop); + } + + void validate(const testing::path& p) override { + INFO(p.dump()); + REQUIRE((!!popped ^ !!stolen)); + } + }; + + INTERLEAVED_RUN(scenario, THREAD("cancel", 
&scenario::steal), THREAD("get", &scenario::try_get_promise)); +} + + +TEST_CASE("Thread pool 3: smoke test - schedule tasks", "[Scheduler]") { + thread_pool_3 sched(num_threads); + + static const auto coro = [&sched](auto self, int depth) -> task { + if (depth <= 0) { + co_return 1; + } + std::array, branching> children; + std::ranges::generate(children, [&] { return launch(self(self, depth - 1), sched); }); + int64_t sum = 0; + for (auto& tk : children) { + sum += co_await tk; + } + co_return sum; + }; + + const auto count = int64_t(std::pow(branching, depth)); + const auto result = join(bind(coro(coro, depth), sched)); + REQUIRE(result == count); +} + + TEST_CASE("Thread pool: schedule worklist selection", "[Thread pool]") { std::condition_variable global_notification; std::mutex global_mutex; @@ -99,11 +217,6 @@ TEST_CASE("Thread pool: ensure execution", "[Thread pool]") { } -constexpr size_t num_threads = 4; -constexpr int64_t depth = 5; -constexpr size_t branching = 10; - - TEST_CASE("Thread pool: smoke test - schedule tasks", "[Scheduler]") { thread_pool sched(num_threads); diff --git a/test/testing/test_interleaver.cpp b/test/testing/test_interleaver.cpp index cea2f6a..be474cc 100644 --- a/test/testing/test_interleaver.cpp +++ b/test/testing/test_interleaver.cpp @@ -193,8 +193,8 @@ TEST_CASE("Interleaver - select resumed", "[Interleaver]") { } -struct CollectorScenario { - CollectorScenario() { +struct collector_scenario { + collector_scenario() { interleavings.push_back({}); } @@ -211,7 +211,7 @@ struct CollectorScenario { TEST_CASE("Interleaver - single thread combinatorics", "[Interleaver]") { - struct Scenario : CollectorScenario { + struct scenario : collector_scenario { void thread_0() { INTERLEAVED(hit(0)); INTERLEAVED(hit(1)); @@ -219,19 +219,19 @@ TEST_CASE("Interleaver - single thread combinatorics", "[Interleaver]") { } }; - Scenario::reset(); + scenario::reset(); INTERLEAVED_RUN( - Scenario, - THREAD("thread_0", &Scenario::thread_0)); + scenario, + THREAD("thread_0", &scenario::thread_0)); const std::vector> expected = { {0, 1, 2} }; - REQUIRE(Scenario::interleavings == expected); + REQUIRE(scenario::interleavings == expected); } TEST_CASE("Interleaver - two thread combinatorics", "[Interleaver]") { - struct Scenario : CollectorScenario { + struct scenario : collector_scenario { void thread_0() { hit(10); INTERLEAVED("A0"); @@ -248,14 +248,14 @@ TEST_CASE("Interleaver - two thread combinatorics", "[Interleaver]") { } }; - Scenario::reset(); + scenario::reset(); INTERLEAVED_RUN( - Scenario, - THREAD("thread_0", &Scenario::thread_0), - THREAD("thread_1", &Scenario::thread_1)); + scenario, + THREAD("thread_0", &scenario::thread_0), + THREAD("thread_1", &scenario::thread_1)); // Get and sort(!) all executed interleavings. - auto interleaveings = std::move(Scenario::interleavings); + auto interleaveings = std::move(scenario::interleavings); std::ranges::sort(interleaveings); // Check no interleaving was run twice. 
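// A note on the combinatorics exercised by these tests: two threads with m and
// n marked interleaving points can execute in C(m + n, n) distinct orders
// (e.g. 3 and 2 points give C(5, 2) = 10), and k threads generalize to the
// multinomial (n1 + ... + nk)! / (n1! * ... * nk!). Sorting the recorded
// interleavings and requiring uniqueness verifies that no ordering was run
// twice; checking the total against the expected count additionally shows
// that every ordering was reached.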
@@ -267,7 +267,7 @@ TEST_CASE("Interleaver - two thread combinatorics", "[Interleaver]") { TEST_CASE("Interleaver - three thread combinatorics", "[Interleaver]") { - struct Scenario : CollectorScenario { + struct scenario : collector_scenario { void thread_0() { hit(10); INTERLEAVED("A0"); @@ -285,14 +285,14 @@ } }; - Scenario::reset(); + scenario::reset(); INTERLEAVED_RUN( - Scenario, - THREAD("thread_0", &Scenario::thread_0), - THREAD("thread_1", &Scenario::thread_1), - THREAD("thread_2", &Scenario::thread_2)); + scenario, + THREAD("thread_0", &scenario::thread_0), + THREAD("thread_1", &scenario::thread_1), + THREAD("thread_2", &scenario::thread_2)); - auto interleaveings = std::move(Scenario::interleavings); + auto interleaveings = std::move(scenario::interleavings); std::ranges::sort(interleaveings); REQUIRE(std::ranges::unique(interleaveings).end() == interleaveings.end()); @@ -301,7 +301,7 @@ TEST_CASE("Interleaver - acquire", "[Interleaver]") { - struct Scenario : CollectorScenario { + struct scenario : collector_scenario { std::atomic_flag f; void thread_0() { @@ -318,14 +318,38 @@ } }; - Scenario::reset(); + scenario::reset(); INTERLEAVED_RUN( - Scenario, - THREAD("thread_0", &Scenario::thread_0), - THREAD("thread_1", &Scenario::thread_1)); + scenario, + THREAD("thread_0", &scenario::thread_0), + THREAD("thread_1", &scenario::thread_1)); - auto interleaveings = std::move(Scenario::interleavings); + auto interleaveings = std::move(scenario::interleavings); std::ranges::sort(interleaveings); REQUIRE(interleaveings.size() >= 1); REQUIRE(interleaveings[0] == std::vector{ 20, 10 }); +} + + +TEST_CASE("Interleaver - validate", "[Interleaver]") { + static size_t validations = 0; + + struct scenario : validated_scenario { + std::atomic_flag flag; + + void thread_0() { + flag.test_and_set(); + } + + void validate(const path& p) override { + INFO(p.dump()); + validations++; + } + }; + + INTERLEAVED_RUN( + scenario, + THREAD("thread_0", &scenario::thread_0)); + + REQUIRE(validations == 1); } \ No newline at end of file From 4dcf76b05ce772315fc95a95940aebec0f379cc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Kardos?= Date: Wed, 28 Feb 2024 22:07:35 +0100 Subject: [PATCH 6/9] fix thread_pool local, introduce resume_now() instead of handle().resume() --- benchmark/benchmark_thread_pool.cpp | 3 +-- include/asyncpp/promise.hpp | 2 +- include/asyncpp/stream.hpp | 12 +++++++----- include/asyncpp/task.hpp | 10 ++++++---- include/asyncpp/thread_pool.hpp | 7 ++++--- src/thread_pool.cpp | 2 +- test/helper_schedulers.hpp | 2 +- test/monitor_task.hpp | 12 +++++++----- test/test_thread_pool.cpp | 3 +-- 9 files changed, 29 insertions(+), 24 deletions(-) diff --git a/benchmark/benchmark_thread_pool.cpp b/benchmark/benchmark_thread_pool.cpp index 200cc16..5d09b0a 100644 --- a/benchmark/benchmark_thread_pool.cpp +++ b/benchmark/benchmark_thread_pool.cpp @@ -13,11 +13,10 @@ using namespace asyncpp; struct noop_promise : schedulable_promise { noop_promise(std::atomic_size_t* counter = nullptr) : counter(counter) {} - std::coroutine_handle<> handle() override { + void resume_now() override { if (counter) { counter->fetch_sub(1, std::memory_order_relaxed); } - return std::noop_coroutine(); } std::atomic_size_t* counter = nullptr; }; diff --git a/include/asyncpp/promise.hpp b/include/asyncpp/promise.hpp index 
eae324d..85db1fb 100644 --- a/include/asyncpp/promise.hpp +++ b/include/asyncpp/promise.hpp @@ -72,7 +72,7 @@ struct resumable_promise { struct schedulable_promise { virtual ~schedulable_promise() = default; - virtual std::coroutine_handle<> handle() = 0; + virtual void resume_now() = 0; schedulable_promise* m_scheduler_next = nullptr; schedulable_promise* m_scheduler_prev = nullptr; scheduler* m_scheduler = nullptr; diff --git a/include/asyncpp/stream.hpp b/include/asyncpp/stream.hpp index eebfde9..7f6daa1 100644 --- a/include/asyncpp/stream.hpp +++ b/include/asyncpp/stream.hpp @@ -100,16 +100,18 @@ namespace impl_stream { m_result = std::nullopt; } - auto handle() -> std::coroutine_handle<> override { - return std::coroutine_handle::from_promise(*this); + void resume() final { + return m_scheduler ? m_scheduler->schedule(*this) : resume_now(); } - void resume() override { - return m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); + void resume_now() final { + const auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); } void destroy() noexcept { - handle().destroy(); + const auto handle = std::coroutine_handle::from_promise(*this); + handle.destroy(); } auto await() noexcept; diff --git a/include/asyncpp/task.hpp b/include/asyncpp/task.hpp index 3da938e..77ff137 100644 --- a/include/asyncpp/task.hpp +++ b/include/asyncpp/task.hpp @@ -40,12 +40,13 @@ namespace impl_task { return final_awaitable{}; } - auto handle() -> std::coroutine_handle<> final { - return std::coroutine_handle::from_promise(*this); + void resume_now() final { + const auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); } void resume() final { - return m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); + return m_scheduler ? 
m_scheduler->schedule(*this) : resume_now(); } void start() { @@ -62,7 +63,8 @@ namespace impl_task { } void destroy() { - handle().destroy(); + const auto handle = std::coroutine_handle::from_promise(*this); + handle.destroy(); } private: diff --git a/include/asyncpp/thread_pool.hpp b/include/asyncpp/thread_pool.hpp index ac9e60c..72ac74d 100644 --- a/include/asyncpp/thread_pool.hpp +++ b/include/asyncpp/thread_pool.hpp @@ -107,12 +107,13 @@ class thread_pool_3 : public scheduler { private: void run(pack& pack) { + m_local = this; size_t stealing_attempt = pack.workers.size(); bool exit_loop = false; while (!exit_loop) { const auto promise = try_get_promise(pack, stealing_attempt, exit_loop); if (promise) { - promise->handle().resume(); + promise->resume_now(); } } } @@ -161,8 +162,8 @@ class thread_pool_3 : public scheduler { } private: - pack m_pack; - std::atomic_ptrdiff_t m_next_in_schedule; + alignas(64) pack m_pack; + alignas(64) std::atomic_ptrdiff_t m_next_in_schedule; inline static thread_local worker* m_local = nullptr; }; diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 59d07a4..26f6f95 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -70,7 +70,7 @@ void thread_pool::execute(worker& local, std::span workers) { do { if (const auto item = INTERLEAVED(local.worklist.pop())) { - item->handle().resume(); + item->resume_now(); continue; } else if (const auto item = INTERLEAVED(global_worklist.pop())) { diff --git a/test/helper_schedulers.hpp b/test/helper_schedulers.hpp index 58a2238..b419edf 100644 --- a/test/helper_schedulers.hpp +++ b/test/helper_schedulers.hpp @@ -22,7 +22,7 @@ class thread_locked_scheduler : public asyncpp::scheduler { void resume() { const auto promise = m_promise.exchange(nullptr); assert(promise != nullptr); - promise->handle().resume(); + promise->resume_now(); } private: diff --git a/test/monitor_task.hpp b/test/monitor_task.hpp index b134aec..d0580a2 100644 --- a/test/monitor_task.hpp +++ b/test/monitor_task.hpp @@ -37,17 +37,19 @@ class [[nodiscard]] monitor_task { m_counters->exception = std::current_exception(); } - void resume() override { + void resume() final { m_counters->suspensions.fetch_add(1); - m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); + m_scheduler ? 
m_scheduler->schedule(*this) : resume_now(); } - std::coroutine_handle<> handle() override { - return std::coroutine_handle::from_promise(*this); + void resume_now() final { + const auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); } void destroy() noexcept { - handle().destroy(); + const auto handle = std::coroutine_handle::from_promise(*this); + handle.destroy(); } const counters& get_counters() const { diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp index 5c700d0..144cb8d 100644 --- a/test/test_thread_pool.cpp +++ b/test/test_thread_pool.cpp @@ -21,9 +21,8 @@ constexpr size_t branching = 10; struct test_promise : schedulable_promise { - std::coroutine_handle<> handle() override { + void resume_now() final { ++num_queried; - return std::noop_coroutine(); } std::atomic_size_t num_queried = 0; }; From 6f7d76bfb740115282c8c2b3fa5c97dbf1cd7f20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Kardos?= Date: Thu, 29 Feb 2024 21:59:41 +0100 Subject: [PATCH 7/9] clean up thread_pool_3 thing --- benchmark/benchmark_atomic.cpp | 21 +-- benchmark/benchmark_task_spawn.cpp | 3 +- benchmark/benchmark_thread_pool.cpp | 14 +- include/asyncpp/thread_pool.hpp | 198 ++++------------------------ include/asyncpp/threading/cache.hpp | 14 ++ src/thread_pool.cpp | 170 ++++++++++++++---------- test/test_thread_pool.cpp | 103 +-------------- 7 files changed, 169 insertions(+), 354 deletions(-) create mode 100644 include/asyncpp/threading/cache.hpp diff --git a/benchmark/benchmark_atomic.cpp b/benchmark/benchmark_atomic.cpp index d21f3b8..0248e55 100644 --- a/benchmark/benchmark_atomic.cpp +++ b/benchmark/benchmark_atomic.cpp @@ -1,3 +1,5 @@ +#include + #include #include #include @@ -10,11 +12,14 @@ // achievable and reasonable on the hardware. 
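+// The alignas(avoid_false_sharing) annotations introduced below give each
+// counter its own cache line, so the benchmark measures contention on the
+// counter itself rather than accidental false sharing with neighboring data.
+// The constant comes from the new asyncpp/threading/cache.hpp and falls back
+// to 64 bytes where std::hardware_destructive_interference_size is unavailable.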
+using namespace asyncpp; + + static constexpr size_t base_reps = 4'000'000; BASELINE(atomic_rmw, x1_thread, 30, 1) { - alignas(64) std::atomic_size_t counter = 0; + alignas(avoid_false_sharing) std::atomic_size_t counter = 0; static constexpr size_t reps = base_reps; static const auto func = [&counter] { @@ -29,7 +34,7 @@ BASELINE(atomic_rmw, x1_thread, 30, 1) { BENCHMARK(atomic_rmw, x2_thread, 30, 1) { - alignas(64) std::atomic_size_t counter = 0; + alignas(avoid_false_sharing) std::atomic_size_t counter = 0; static constexpr size_t reps = base_reps / 2; static const auto func = [&counter] { @@ -44,7 +49,7 @@ BENCHMARK(atomic_rmw, x2_thread, 30, 1) { BENCHMARK(atomic_rmw, x4_thread, 30, 1) { - alignas(64) std::atomic_size_t counter = 0; + alignas(avoid_false_sharing) std::atomic_size_t counter = 0; static constexpr size_t reps = base_reps / 4; static const auto func = [&counter] { @@ -59,7 +64,7 @@ BENCHMARK(atomic_rmw, x4_thread, 30, 1) { BENCHMARK(atomic_rmw, x8_thread, 30, 1) { - alignas(64) std::atomic_size_t counter = 0; + alignas(avoid_false_sharing) std::atomic_size_t counter = 0; static constexpr size_t reps = base_reps / 8; static const auto func = [&counter] { @@ -74,7 +79,7 @@ BENCHMARK(atomic_rmw, x8_thread, 30, 1) { BASELINE(atomic_read, x1_thread, 30, 1) { - alignas(64) std::atomic_size_t counter = 0; + alignas(avoid_false_sharing) std::atomic_size_t counter = 0; static constexpr size_t reps = base_reps; static const auto func = [&counter] { @@ -89,7 +94,7 @@ BASELINE(atomic_read, x1_thread, 30, 1) { BENCHMARK(atomic_read, x2_thread, 30, 1) { - alignas(64) std::atomic_size_t counter = 0; + alignas(avoid_false_sharing) std::atomic_size_t counter = 0; static constexpr size_t reps = base_reps / 2; static const auto func = [&counter] { @@ -104,7 +109,7 @@ BENCHMARK(atomic_read, x2_thread, 30, 1) { BENCHMARK(atomic_read, x4_thread, 30, 1) { - alignas(64) std::atomic_size_t counter = 0; + alignas(avoid_false_sharing) std::atomic_size_t counter = 0; static constexpr size_t reps = base_reps / 4; static const auto func = [&counter] { @@ -119,7 +124,7 @@ BENCHMARK(atomic_read, x4_thread, 30, 1) { BENCHMARK(atomic_read, x8_thread, 30, 1) { - alignas(64) std::atomic_size_t counter = 0; + alignas(avoid_false_sharing) std::atomic_size_t counter = 0; static constexpr size_t reps = base_reps / 8; static const auto func = [&counter] { diff --git a/benchmark/benchmark_task_spawn.cpp b/benchmark/benchmark_task_spawn.cpp index cdf315e..444f550 100644 --- a/benchmark/benchmark_task_spawn.cpp +++ b/benchmark/benchmark_task_spawn.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -56,7 +57,7 @@ struct FixtureStack : celero::TestFixture { private: static constexpr inline size_t size = 10485760; struct block { - alignas(64) std::byte content[64]; + alignas(avoid_false_sharing) std::byte content[avoid_false_sharing]; }; std::unique_ptr buffer = std::make_unique_for_overwrite(size / sizeof(block)); std::pmr::monotonic_buffer_resource resource; diff --git a/benchmark/benchmark_thread_pool.cpp b/benchmark/benchmark_thread_pool.cpp index 5d09b0a..14b586f 100644 --- a/benchmark/benchmark_thread_pool.cpp +++ b/benchmark/benchmark_thread_pool.cpp @@ -40,7 +40,7 @@ struct FixturePool : celero::TestFixture { sync(); } - thread_pool_3 pool; + thread_pool pool; std::atomic_size_t num_running; std::vector promises; }; @@ -68,7 +68,7 @@ BENCHMARK_F(tp_schedule_outside, x4_thread, FixturePool<4>, 30, 1) { BASELINE_F(tp_schedule_inside, x1_thread, FixturePool<1>, 30, 1) { - static constexpr auto 
coro = [](std::span promises, thread_pool_3& pool) -> task { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } @@ -80,7 +80,7 @@ BASELINE_F(tp_schedule_inside, x1_thread, FixturePool<1>, 30, 1) { BENCHMARK_F(tp_schedule_inside, x2_thread, FixturePool<2>, 30, 1) { - static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } @@ -95,7 +95,7 @@ BENCHMARK_F(tp_schedule_inside, x2_thread, FixturePool<2>, 30, 1) { BENCHMARK_F(tp_schedule_inside, x4_thread, FixturePool<4>, 30, 1) { - static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } @@ -114,7 +114,7 @@ BENCHMARK_F(tp_schedule_inside, x4_thread, FixturePool<4>, 30, 1) { BASELINE_F(tp_stealing, x1_thread, FixturePool<1 + 1>, 30, 1) { - static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } @@ -125,7 +125,7 @@ BASELINE_F(tp_stealing, x1_thread, FixturePool<1 + 1>, 30, 1) { BENCHMARK_F(tp_stealing, x2_thread, FixturePool<2 + 1>, 30, 1) { - static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } @@ -136,7 +136,7 @@ BENCHMARK_F(tp_stealing, x2_thread, FixturePool<2 + 1>, 30, 1) { BENCHMARK_F(tp_stealing, x4_thread, FixturePool<4 + 1>, 30, 1) { - static constexpr auto coro = [](std::span promises, thread_pool_3& pool) -> task { + static constexpr auto coro = [](std::span promises, thread_pool& pool) -> task { for (auto& promise : promises) { pool.schedule(promise); } diff --git a/include/asyncpp/thread_pool.hpp b/include/asyncpp/thread_pool.hpp index 72ac74d..e4cd187 100644 --- a/include/asyncpp/thread_pool.hpp +++ b/include/asyncpp/thread_pool.hpp @@ -4,19 +4,19 @@ #include "container/atomic_deque.hpp" #include "container/atomic_stack.hpp" #include "scheduler.hpp" +#include "threading/cache.hpp" +#include "threading/spinlock.hpp" #include #include #include -#include #include #include namespace asyncpp { - -class thread_pool_3 : public scheduler { +class thread_pool : public scheduler { public: struct pack; @@ -24,194 +24,46 @@ class thread_pool_3 : public scheduler { public: using queue = deque; - worker() - : m_sema(0) {} - - ~worker() { - cancel(); - } - - void insert(schedulable_promise& promise) { - std::unique_lock lk(m_spinlock, std::defer_lock); - INTERLEAVED_ACQUIRE(lk.lock()); - const auto previous = m_promises.push_back(&promise); - const auto blocked = m_blocked.test(std::memory_order_relaxed); - INTERLEAVED(lk.unlock()); - - if (!previous && blocked) { - INTERLEAVED(m_sema.release()); - } - } - - schedulable_promise* steal_from_this() { - std::unique_lock lk(m_spinlock, std::defer_lock); - INTERLEAVED(lk.lock()); - return m_promises.pop_front(); - } - - schedulable_promise* try_get_promise(pack& pack, size_t& stealing_attempt, bool& exit_loop) { - std::unique_lock lk(m_spinlock, std::defer_lock); - INTERLEAVED_ACQUIRE(lk.lock()); - const auto promise = m_promises.front(); - if 
(promise) { - m_promises.pop_front(); - return promise; - } - - if (stealing_attempt > 0) { - INTERLEAVED(lk.unlock()); - const auto stolen = steal_from_other(pack, stealing_attempt); - stealing_attempt = stolen ? pack.workers.size() : stealing_attempt - 1; - return stolen; - } - - - if (INTERLEAVED(m_cancelled.test(std::memory_order_relaxed))) { - exit_loop = true; - } - else { - INTERLEAVED(m_blocked.test_and_set(std::memory_order_relaxed)); - pack.blocked.push(this); - pack.num_blocked.fetch_add(1, std::memory_order_relaxed); - INTERLEAVED(lk.unlock()); - INTERLEAVED_ACQUIRE(m_sema.acquire()); - INTERLEAVED(m_blocked.clear()); - stealing_attempt = pack.workers.size(); - } - return nullptr; - } - - schedulable_promise* steal_from_other(pack& pack, size_t& stealing_attempt) const { - const size_t pack_size = pack.workers.size(); - const size_t my_index = this - pack.workers.data(); - const size_t victim_index = (my_index + stealing_attempt) % pack_size; - return pack.workers[victim_index].steal_from_this(); - } - - void start(pack& pack) { - m_thread = std::jthread([this, &pack] { - run(pack); - }); - } - - void cancel() { - std::unique_lock lk(m_spinlock, std::defer_lock); - INTERLEAVED_ACQUIRE(lk.lock()); - INTERLEAVED(m_cancelled.test_and_set(std::memory_order_relaxed)); - const auto blocked = INTERLEAVED(m_blocked.test(std::memory_order_relaxed)); - lk.unlock(); - if (blocked) { - INTERLEAVED(m_sema.release()); - } - } + worker(); + ~worker(); + + void insert(schedulable_promise& promise); + schedulable_promise* steal_from_this(); + schedulable_promise* try_get_promise(pack& pack, size_t& stealing_attempt, bool& exit_loop); + schedulable_promise* steal_from_other(pack& pack, size_t& stealing_attempt) const; + void start(pack& pack); + void cancel(); private: - void run(pack& pack) { - m_local = this; - size_t stealing_attempt = pack.workers.size(); - bool exit_loop = false; - while (!exit_loop) { - const auto promise = try_get_promise(pack, stealing_attempt, exit_loop); - if (promise) { - promise->resume_now(); - } - } - } + void run(pack& pack); public: worker* m_next = nullptr; private: - alignas(64) spinlock m_spinlock; - alignas(64) queue m_promises; - alignas(64) std::atomic_flag m_blocked; - alignas(64) std::binary_semaphore m_sema; - alignas(64) std::jthread m_thread; - alignas(64) std::atomic_flag m_cancelled; + alignas(avoid_false_sharing) spinlock m_spinlock; + alignas(avoid_false_sharing) queue m_promises; + alignas(avoid_false_sharing) std::atomic_flag m_blocked; + alignas(avoid_false_sharing) std::binary_semaphore m_sema; + alignas(avoid_false_sharing) std::jthread m_thread; + alignas(avoid_false_sharing) std::atomic_flag m_cancelled; }; struct pack { - alignas(64) std::vector workers; - alignas(64) atomic_stack blocked; - alignas(64) std::atomic_size_t num_blocked = 0; + alignas(avoid_false_sharing) std::vector workers; + alignas(avoid_false_sharing) atomic_stack blocked; + alignas(avoid_false_sharing) std::atomic_size_t num_blocked = 0; }; public: - thread_pool_3(size_t num_threads = 1) - : m_pack(std::vector(num_threads)), m_next_in_schedule(0) { - for (auto& worker : m_pack.workers) { - worker.start(m_pack); - } - } - - void schedule(schedulable_promise& promise) override { - size_t num_blocked = m_pack.num_blocked.load(std::memory_order_relaxed); - const auto blocked = num_blocked > 0 ? 
diff --git a/include/asyncpp/threading/cache.hpp b/include/asyncpp/threading/cache.hpp
new file mode 100644
index 0000000..8adbc2f
--- /dev/null
+++ b/include/asyncpp/threading/cache.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+
+namespace asyncpp {
+
+#ifdef __cpp_lib_hardware_interference_size
+inline constexpr size_t avoid_false_sharing = std::hardware_destructive_interference_size;
+inline constexpr size_t promote_true_sharing = std::hardware_constructive_interference_size;
+#else
+inline constexpr size_t avoid_false_sharing = 64;
+inline constexpr size_t promote_true_sharing = 64;
+#endif
+
+} // namespace asyncpp
\ No newline at end of file
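The two constants above are the whole point of the new header: pad with avoid_false_sharing when neighboring data is written by different threads, with 64 bytes as the fallback where the standard interference-size constants are unavailable. A small illustration of the intended use (my example, not part of the patch; it matches the alignas members in thread_pool.hpp):

    #include <atomic>

    #include <asyncpp/threading/cache.hpp>

    // Each counter gets its own cache line. Without the alignas padding, a write
    // to 'produced' on one core would invalidate the line holding 'consumed' on
    // another core, even though the two values are logically unrelated.
    struct counters {
        alignas(asyncpp::avoid_false_sharing) std::atomic<long> produced{0};
        alignas(asyncpp::avoid_false_sharing) std::atomic<long> consumed{0};
    };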
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 26f6f95..b786ce4 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -5,92 +5,130 @@
 namespace asyncpp {
 
-thread_pool::thread_pool(size_t num_threads)
-    : m_workers(num_threads) {
-    for (auto& w : m_workers) {
-        w.thread = std::jthread([this, &w] {
-            local = &w;
-            execute(w, m_global_worklist, m_global_notification, m_global_mutex, m_terminate, m_num_waiting, m_workers);
-        });
-    }
+thread_pool::worker::worker()
+    : m_sema(0) {}
+
+
+thread_pool::worker::~worker() {
+    cancel();
 }
 
 
-thread_pool::~thread_pool() {
-    std::lock_guard lk(m_global_mutex);
-    m_terminate.test_and_set();
-    m_global_notification.notify_all();
+void thread_pool::worker::insert(schedulable_promise& promise) {
+    std::unique_lock lk(m_spinlock, std::defer_lock);
+    INTERLEAVED_ACQUIRE(lk.lock());
+    const auto previous = m_promises.push_back(&promise);
+    const auto blocked = m_blocked.test(std::memory_order_relaxed);
+    INTERLEAVED(lk.unlock());
+
+    if (!previous && blocked) {
+        INTERLEAVED(m_sema.release());
+    }
 }
 
 
-void thread_pool::schedule(schedulable_promise& promise) {
-    schedule(promise, m_global_worklist, m_global_notification, m_global_mutex, m_num_waiting, local);
+schedulable_promise* thread_pool::worker::steal_from_this() {
+    std::unique_lock lk(m_spinlock, std::defer_lock);
+    INTERLEAVED(lk.lock());
+    return m_promises.pop_front();
 }
 
 
-void thread_pool::schedule(schedulable_promise& item,
-                           atomic_stack& global_worklist,
-                           std::condition_variable& global_notification,
-                           std::mutex& global_mutex,
-                           std::atomic_size_t& num_waiting,
-                           worker* local) {
-    if (local) {
-        const auto prev_item = INTERLEAVED(local->worklist.push(&item));
-        if (prev_item != nullptr) {
-            if (num_waiting.load(std::memory_order_relaxed) > 0) {
-                global_notification.notify_one();
-            }
-        }
+schedulable_promise* thread_pool::worker::try_get_promise(pack& pack, size_t& stealing_attempt, bool& exit_loop) {
+    std::unique_lock lk(m_spinlock, std::defer_lock);
+    INTERLEAVED_ACQUIRE(lk.lock());
+    const auto promise = m_promises.front();
+    if (promise) {
+        m_promises.pop_front();
+        return promise;
+    }
+
+    if (stealing_attempt > 0) {
+        INTERLEAVED(lk.unlock());
+        const auto stolen = steal_from_other(pack, stealing_attempt);
+        stealing_attempt = stolen ? pack.workers.size() : stealing_attempt - 1;
+        return stolen;
+    }
+
+
+    if (INTERLEAVED(m_cancelled.test(std::memory_order_relaxed))) {
+        exit_loop = true;
    }
     else {
-        std::unique_lock lk(global_mutex, std::defer_lock);
-        INTERLEAVED_ACQUIRE(lk.lock());
-        INTERLEAVED(global_worklist.push(&item));
-        INTERLEAVED(global_notification.notify_one());
+        INTERLEAVED(m_blocked.test_and_set(std::memory_order_relaxed));
+        pack.blocked.push(this);
+        pack.num_blocked.fetch_add(1, std::memory_order_relaxed);
+        INTERLEAVED(lk.unlock());
+        INTERLEAVED_ACQUIRE(m_sema.acquire());
+        INTERLEAVED(m_blocked.clear());
+        stealing_attempt = pack.workers.size();
     }
+    return nullptr;
 }
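insert and try_get_promise are the two halves of the sleep/wake handshake: the idle worker raises m_blocked and parks on its binary semaphore, and a producer that observed the raised flag releases the semaphore after queueing work. Stripped of the queue, the shape of the protocol is roughly this (sketch only; the real code additionally reads and writes the flag under m_spinlock, which is what guards against a lost wakeup):

    #include <atomic>
    #include <semaphore>

    std::atomic_flag blocked;      // the consumer advertises that it went to sleep
    std::binary_semaphore sema{0}; // the consumer parks here

    void consumer_block() {
        blocked.test_and_set(std::memory_order_relaxed);
        sema.acquire(); // sleep until a producer signals new work
        blocked.clear();
    }

    void producer_wake() {
        // Called after enqueueing an item: only pay for the semaphore release
        // if the consumer actually parked itself.
        if (blocked.test(std::memory_order_relaxed)) {
            sema.release();
        }
    }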
 
 
-schedulable_promise* thread_pool::steal(std::span workers) {
-    for (auto& w : workers) {
-        if (const auto item = INTERLEAVED(w.worklist.pop())) {
-            return item;
-        }
+schedulable_promise* thread_pool::worker::steal_from_other(pack& pack, size_t& stealing_attempt) const {
+    const size_t pack_size = pack.workers.size();
+    const size_t my_index = this - pack.workers.data();
+    const size_t victim_index = (my_index + stealing_attempt) % pack_size;
+    return pack.workers[victim_index].steal_from_this();
+}
+
+
+void thread_pool::worker::start(pack& pack) {
+    m_thread = std::jthread([this, &pack] {
+        run(pack);
+    });
+}
+
+
+void thread_pool::worker::cancel() {
+    std::unique_lock lk(m_spinlock, std::defer_lock);
+    INTERLEAVED_ACQUIRE(lk.lock());
+    INTERLEAVED(m_cancelled.test_and_set(std::memory_order_relaxed));
+    const auto blocked = INTERLEAVED(m_blocked.test(std::memory_order_relaxed));
+    lk.unlock();
+    if (blocked) {
+        INTERLEAVED(m_sema.release());
     }
-    return nullptr;
 }
 
 
-void thread_pool::execute(worker& local,
-                          atomic_stack& global_worklist,
-                          std::condition_variable& global_notification,
-                          std::mutex& global_mutex,
-                          std::atomic_flag& terminate,
-                          std::atomic_size_t& num_waiting,
-                          std::span workers) {
-    do {
-        if (const auto item = INTERLEAVED(local.worklist.pop())) {
-            item->resume_now();
-            continue;
-        }
-        else if (const auto item = INTERLEAVED(global_worklist.pop())) {
-            local.worklist.push(item);
-            continue;
-        }
-        else if (const auto item = steal(workers)) {
-            local.worklist.push(item);
-            continue;
-        }
-        else {
-            std::unique_lock lk(global_mutex, std::defer_lock);
-            INTERLEAVED_ACQUIRE(lk.lock());
-            if (!INTERLEAVED(terminate.test()) && INTERLEAVED(global_worklist.empty())) {
-                num_waiting.fetch_add(1, std::memory_order_relaxed);
-                INTERLEAVED_ACQUIRE(global_notification.wait(lk));
-                num_waiting.fetch_sub(1, std::memory_order_relaxed);
-            }
+void thread_pool::worker::run(pack& pack) {
+    m_local = this;
+    size_t stealing_attempt = pack.workers.size();
+    bool exit_loop = false;
+    while (!exit_loop) {
+        const auto promise = try_get_promise(pack, stealing_attempt, exit_loop);
+        if (promise) {
+            promise->resume_now();
         }
-    } while (!INTERLEAVED(terminate.test()));
+    }
+}
+
+
+thread_pool::thread_pool(size_t num_threads)
+    : m_pack(std::vector(num_threads)), m_next_in_schedule(0) {
+    for (auto& worker : m_pack.workers) {
+        worker.start(m_pack);
+    }
+}
+
+
+void thread_pool::schedule(schedulable_promise& promise) {
+    size_t num_blocked = m_pack.num_blocked.load(std::memory_order_relaxed);
+    const auto blocked = num_blocked > 0 ? m_pack.blocked.pop() : nullptr;
+    if (blocked) {
+        blocked->insert(promise);
+        m_pack.num_blocked.fetch_sub(1, std::memory_order_relaxed);
+    }
+    else if (m_local) {
+        m_local->insert(promise);
+    }
+    else {
+        const auto selected = m_next_in_schedule.fetch_add(1, std::memory_order_relaxed) % m_pack.workers.size();
+        m_pack.workers[selected].insert(promise);
+    }
 }
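The rewritten schedule reads as a three-step policy: hand the promise straight to a worker that is known to be sleeping, otherwise take the free fast path into the calling worker's own queue when scheduling from inside the pool, and only as a last resort distribute round-robin through m_next_in_schedule. Waking blocked workers first is presumably what keeps the pool from idling while items accumulate on one busy worker's queue.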
diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp
index 144cb8d..6f55c58 100644
--- a/test/test_thread_pool.cpp
+++ b/test/test_thread_pool.cpp
@@ -30,7 +30,7 @@ struct test_promise : schedulable_promise {
 
 TEST_CASE("Thread pool 3: insert - try_get_promise interleave", "[Thread pool 3]") {
     struct scenario : testing::validated_scenario {
-        thread_pool_3::pack pack{ .workers = std::vector(1) };
+        thread_pool::pack pack{ .workers = std::vector(1) };
         test_promise promise;
         bool exit_loop = false;
         size_t stealing_attempt = 0;
@@ -61,7 +61,7 @@ TEST_CASE("Thread pool 3: insert - try_get_promise interleave", "[Thread pool 3]
 
 TEST_CASE("Thread pool 3: cancel - try_get_promise interleave", "[Thread pool 3]") {
     struct scenario : testing::validated_scenario {
-        thread_pool_3::pack pack{ .workers = std::vector(1) };
+        thread_pool::pack pack{ .workers = std::vector(1) };
         bool exit_loop = false;
         size_t stealing_attempt = 0;
         schedulable_promise* result = nullptr;
@@ -91,7 +91,7 @@ TEST_CASE("Thread pool 3: cancel - try_get_promise interleave", "[Thread pool 3]
 
 TEST_CASE("Thread pool 3: steal - try_get_promise interleave", "[Thread pool 3]") {
     struct scenario : testing::validated_scenario {
-        thread_pool_3::pack pack{ .workers = std::vector(1) };
+        thread_pool::pack pack{ .workers = std::vector(1) };
         test_promise promise;
         bool exit_loop = false;
         size_t stealing_attempt = 0;
@@ -122,101 +122,6 @@ TEST_CASE("Thread pool 3: steal - try_get_promise interleave", "[Thread pool 3]"
 
 
 TEST_CASE("Thread pool 3: smoke test - schedule tasks", "[Scheduler]") {
-    thread_pool_3 sched(num_threads);
-
-    static const auto coro = [&sched](auto self, int depth) -> task {
-        if (depth <= 0) {
-            co_return 1;
-        }
-        std::array, branching> children;
-        std::ranges::generate(children, [&] { return launch(self(self, depth - 1), sched); });
-        int64_t sum = 0;
-        for (auto& tk : children) {
-            sum += co_await tk;
-        }
-        co_return sum;
-    };
-
-    const auto count = int64_t(std::pow(branching, depth));
-    const auto result = join(bind(coro(coro, depth), sched));
-    REQUIRE(result == count);
-}
-
-
-TEST_CASE("Thread pool: schedule worklist selection", "[Thread pool]") {
-    std::condition_variable global_notification;
-    std::mutex global_mutex;
-    atomic_stack global_worklist;
-    std::atomic_size_t num_waiting;
-    std::vector workers(1);
-
-    test_promise promise;
-
-    SECTION("has local worker") {
-        thread_pool::schedule(promise, global_worklist, global_notification, global_mutex, num_waiting, &workers[0]);
-        REQUIRE(workers[0].worklist.pop() == &promise);
-        REQUIRE(global_worklist.empty());
-    }
-    SECTION("no local worker") {
-        thread_pool::schedule(promise, global_worklist, global_notification, global_mutex, num_waiting, &workers[0]);
-        REQUIRE(workers[0].worklist.pop() == &promise);
-    }
-}
-
-
-TEST_CASE("Thread pool: steal from workers", "[Thread pool]") {
-    std::vector workers(4);
-
-    test_promise promise;
-
-    SECTION("no work items") {
-        REQUIRE(nullptr == thread_pool::steal(workers));
-    }
-    SECTION("1 work item") {
-        workers[2].worklist.push(&promise);
-        REQUIRE(&promise == thread_pool::steal(workers));
-    }
-}
-
-
-TEST_CASE("Thread pool: ensure execution", "[Thread pool]") {
-    // This test makes sure that no matter the interleaving, a scheduled promise
-    // will be picked up and executed by a worker thread.
-
-    struct scenario : testing::validated_scenario {
-        std::condition_variable global_notification;
-        std::mutex global_mutex;
-        atomic_stack global_worklist;
-        std::atomic_size_t num_waiting;
-        std::vector workers;
-        std::atomic_flag terminate;
-        test_promise promise;
-
-        scenario() : workers(1) {}
-
-        void schedule() {
-            thread_pool::schedule(promise, global_worklist, global_notification, global_mutex, num_waiting);
-            std::unique_lock lk(global_mutex, std::defer_lock);
-            INTERLEAVED_ACQUIRE(lk.lock());
-            INTERLEAVED(terminate.test_and_set());
-            INTERLEAVED(global_notification.notify_all());
-        }
-
-        void execute() {
-            thread_pool::execute(workers[0], global_worklist, global_notification, global_mutex, terminate, num_waiting, std::span(workers));
-        }
-
-        void validate(const testing::path& p) override {
-            INFO(p.dump());
-            REQUIRE(promise.num_queried.load() > 0);
-        }
-    };
-
-    INTERLEAVED_RUN(scenario, THREAD("schedule", &scenario::schedule), THREAD("execute", &scenario::execute));
-}
-
-
-TEST_CASE("Thread pool: smoke test - schedule tasks", "[Scheduler]") {
     thread_pool sched(num_threads);
 
     static const auto coro = [&sched](auto self, int depth) -> task {
@@ -235,4 +140,4 @@ TEST_CASE("Thread pool: smoke test - schedule tasks", "[Scheduler]") {
     const auto count = int64_t(std::pow(branching, depth));
     const auto result = join(bind(coro(coro, depth), sched));
     REQUIRE(result == count);
-}
\ No newline at end of file
+}

From b84b6c9b09384d07e3b81d4604d9c8031c411ff4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Kardos?=
Date: Thu, 29 Feb 2024 22:25:04 +0100
Subject: [PATCH 8/9] include cstddef

---
 include/asyncpp/threading/cache.hpp | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/include/asyncpp/threading/cache.hpp b/include/asyncpp/threading/cache.hpp
index 8adbc2f..2829515 100644
--- a/include/asyncpp/threading/cache.hpp
+++ b/include/asyncpp/threading/cache.hpp
@@ -1,6 +1,9 @@
 #pragma once
 
 
+#include <cstddef>
+
+
 namespace asyncpp {

From ff401dfe82183bce5c6e3af44194db4e95e9f019 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Kardos?=
Date: Thu, 29 Feb 2024 22:34:51 +0100
Subject: [PATCH 9/9] include <new>

---
 include/asyncpp/threading/cache.hpp | 1 +
 1 file changed, 1 insertion(+)

diff --git a/include/asyncpp/threading/cache.hpp b/include/asyncpp/threading/cache.hpp
index 2829515..b4aa0e9 100644
--- a/include/asyncpp/threading/cache.hpp
+++ b/include/asyncpp/threading/cache.hpp
@@ -2,6 +2,7 @@
 
 
 #include <cstddef>
+#include <new>
 
 
 namespace asyncpp {
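The final two patches make cache.hpp self-contained: <cstddef> provides size_t, and <new> is where std::hardware_destructive_interference_size and std::hardware_constructive_interference_size are declared. For completeness, this is roughly what the pool looks like from user code after the series; the join and bind helpers are the ones exercised by the smoke test above, while the exact include paths are my assumption:

    #include <asyncpp/task.hpp>
    #include <asyncpp/thread_pool.hpp>

    using namespace asyncpp;

    task<int> answer() {
        co_return 42;
    }

    int main() {
        thread_pool pool(4);                            // starts four worker threads
        const int result = join(bind(answer(), pool));  // run on the pool, block for the result
        return result == 42 ? 0 : 1;
    }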