From 4dcf76b05ce772315fc95a95940aebec0f379cc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Kardos?= Date: Wed, 28 Feb 2024 22:07:35 +0100 Subject: [PATCH] fix threapool local, introduce resume_now() instead of handle().resume() --- benchmark/benchmark_thread_pool.cpp | 3 +-- include/asyncpp/promise.hpp | 2 +- include/asyncpp/stream.hpp | 12 +++++++----- include/asyncpp/task.hpp | 10 ++++++---- include/asyncpp/thread_pool.hpp | 7 ++++--- src/thread_pool.cpp | 2 +- test/helper_schedulers.hpp | 2 +- test/monitor_task.hpp | 12 +++++++----- test/test_thread_pool.cpp | 3 +-- 9 files changed, 29 insertions(+), 24 deletions(-) diff --git a/benchmark/benchmark_thread_pool.cpp b/benchmark/benchmark_thread_pool.cpp index 200cc16..5d09b0a 100644 --- a/benchmark/benchmark_thread_pool.cpp +++ b/benchmark/benchmark_thread_pool.cpp @@ -13,11 +13,10 @@ using namespace asyncpp; struct noop_promise : schedulable_promise { noop_promise(std::atomic_size_t* counter = nullptr) : counter(counter) {} - std::coroutine_handle<> handle() override { + void resume_now() override { if (counter) { counter->fetch_sub(1, std::memory_order_relaxed); } - return std::noop_coroutine(); } std::atomic_size_t* counter = nullptr; }; diff --git a/include/asyncpp/promise.hpp b/include/asyncpp/promise.hpp index eae324d..85db1fb 100644 --- a/include/asyncpp/promise.hpp +++ b/include/asyncpp/promise.hpp @@ -72,7 +72,7 @@ struct resumable_promise { struct schedulable_promise { virtual ~schedulable_promise() = default; - virtual std::coroutine_handle<> handle() = 0; + virtual void resume_now() = 0; schedulable_promise* m_scheduler_next = nullptr; schedulable_promise* m_scheduler_prev = nullptr; scheduler* m_scheduler = nullptr; diff --git a/include/asyncpp/stream.hpp b/include/asyncpp/stream.hpp index eebfde9..7f6daa1 100644 --- a/include/asyncpp/stream.hpp +++ b/include/asyncpp/stream.hpp @@ -100,16 +100,18 @@ namespace impl_stream { m_result = std::nullopt; } - auto handle() -> std::coroutine_handle<> override { - return std::coroutine_handle::from_promise(*this); + void resume() final { + return m_scheduler ? m_scheduler->schedule(*this) : resume_now(); } - void resume() override { - return m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); + void resume_now() final { + const auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); } void destroy() noexcept { - handle().destroy(); + const auto handle = std::coroutine_handle::from_promise(*this); + handle.destroy(); } auto await() noexcept; diff --git a/include/asyncpp/task.hpp b/include/asyncpp/task.hpp index 3da938e..77ff137 100644 --- a/include/asyncpp/task.hpp +++ b/include/asyncpp/task.hpp @@ -40,12 +40,13 @@ namespace impl_task { return final_awaitable{}; } - auto handle() -> std::coroutine_handle<> final { - return std::coroutine_handle::from_promise(*this); + void resume_now() final { + const auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); } void resume() final { - return m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); + return m_scheduler ? m_scheduler->schedule(*this) : resume_now(); } void start() { @@ -62,7 +63,8 @@ namespace impl_task { } void destroy() { - handle().destroy(); + const auto handle = std::coroutine_handle::from_promise(*this); + handle.destroy(); } private: diff --git a/include/asyncpp/thread_pool.hpp b/include/asyncpp/thread_pool.hpp index ac9e60c..72ac74d 100644 --- a/include/asyncpp/thread_pool.hpp +++ b/include/asyncpp/thread_pool.hpp @@ -107,12 +107,13 @@ class thread_pool_3 : public scheduler { private: void run(pack& pack) { + m_local = this; size_t stealing_attempt = pack.workers.size(); bool exit_loop = false; while (!exit_loop) { const auto promise = try_get_promise(pack, stealing_attempt, exit_loop); if (promise) { - promise->handle().resume(); + promise->resume_now(); } } } @@ -161,8 +162,8 @@ class thread_pool_3 : public scheduler { } private: - pack m_pack; - std::atomic_ptrdiff_t m_next_in_schedule; + alignas(64) pack m_pack; + alignas(64) std::atomic_ptrdiff_t m_next_in_schedule; inline static thread_local worker* m_local = nullptr; }; diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 59d07a4..26f6f95 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -70,7 +70,7 @@ void thread_pool::execute(worker& local, std::span workers) { do { if (const auto item = INTERLEAVED(local.worklist.pop())) { - item->handle().resume(); + item->resume_now(); continue; } else if (const auto item = INTERLEAVED(global_worklist.pop())) { diff --git a/test/helper_schedulers.hpp b/test/helper_schedulers.hpp index 58a2238..b419edf 100644 --- a/test/helper_schedulers.hpp +++ b/test/helper_schedulers.hpp @@ -22,7 +22,7 @@ class thread_locked_scheduler : public asyncpp::scheduler { void resume() { const auto promise = m_promise.exchange(nullptr); assert(promise != nullptr); - promise->handle().resume(); + promise->resume_now(); } private: diff --git a/test/monitor_task.hpp b/test/monitor_task.hpp index b134aec..d0580a2 100644 --- a/test/monitor_task.hpp +++ b/test/monitor_task.hpp @@ -37,17 +37,19 @@ class [[nodiscard]] monitor_task { m_counters->exception = std::current_exception(); } - void resume() override { + void resume() final { m_counters->suspensions.fetch_add(1); - m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); + m_scheduler ? m_scheduler->schedule(*this) : resume_now(); } - std::coroutine_handle<> handle() override { - return std::coroutine_handle::from_promise(*this); + void resume_now() final { + const auto handle = std::coroutine_handle::from_promise(*this); + handle.resume(); } void destroy() noexcept { - handle().destroy(); + const auto handle = std::coroutine_handle::from_promise(*this); + handle.destroy(); } const counters& get_counters() const { diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp index 5c700d0..144cb8d 100644 --- a/test/test_thread_pool.cpp +++ b/test/test_thread_pool.cpp @@ -21,9 +21,8 @@ constexpr size_t branching = 10; struct test_promise : schedulable_promise { - std::coroutine_handle<> handle() override { + void resume_now() final { ++num_queried; - return std::noop_coroutine(); } std::atomic_size_t num_queried = 0; };