Skip to content

Commit

Permalink
fix threapool local, introduce resume_now() instead of handle().resume()
Browse files Browse the repository at this point in the history
  • Loading branch information
petiaccja committed Feb 28, 2024
1 parent 455a93c commit 4dcf76b
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 24 deletions.
3 changes: 1 addition & 2 deletions benchmark/benchmark_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ using namespace asyncpp;

struct noop_promise : schedulable_promise {
noop_promise(std::atomic_size_t* counter = nullptr) : counter(counter) {}
std::coroutine_handle<> handle() override {
void resume_now() override {
if (counter) {
counter->fetch_sub(1, std::memory_order_relaxed);
}
return std::noop_coroutine();
}
std::atomic_size_t* counter = nullptr;
};
Expand Down
2 changes: 1 addition & 1 deletion include/asyncpp/promise.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct resumable_promise {

struct schedulable_promise {
virtual ~schedulable_promise() = default;
virtual std::coroutine_handle<> handle() = 0;
virtual void resume_now() = 0;
schedulable_promise* m_scheduler_next = nullptr;
schedulable_promise* m_scheduler_prev = nullptr;
scheduler* m_scheduler = nullptr;
Expand Down
12 changes: 7 additions & 5 deletions include/asyncpp/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,18 @@ namespace impl_stream {
m_result = std::nullopt;
}

auto handle() -> std::coroutine_handle<> override {
return std::coroutine_handle<promise>::from_promise(*this);
void resume() final {
return m_scheduler ? m_scheduler->schedule(*this) : resume_now();
}

void resume() override {
return m_scheduler ? m_scheduler->schedule(*this) : handle().resume();
void resume_now() final {
const auto handle = std::coroutine_handle<promise>::from_promise(*this);
handle.resume();
}

void destroy() noexcept {
handle().destroy();
const auto handle = std::coroutine_handle<promise>::from_promise(*this);
handle.destroy();
}

auto await() noexcept;
Expand Down
10 changes: 6 additions & 4 deletions include/asyncpp/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ namespace impl_task {
return final_awaitable{};
}

auto handle() -> std::coroutine_handle<> final {
return std::coroutine_handle<promise>::from_promise(*this);
void resume_now() final {
const auto handle = std::coroutine_handle<promise>::from_promise(*this);
handle.resume();
}

void resume() final {
return m_scheduler ? m_scheduler->schedule(*this) : handle().resume();
return m_scheduler ? m_scheduler->schedule(*this) : resume_now();
}

void start() {
Expand All @@ -62,7 +63,8 @@ namespace impl_task {
}

void destroy() {
handle().destroy();
const auto handle = std::coroutine_handle<promise>::from_promise(*this);
handle.destroy();
}

private:
Expand Down
7 changes: 4 additions & 3 deletions include/asyncpp/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ class thread_pool_3 : public scheduler {

private:
void run(pack& pack) {
m_local = this;
size_t stealing_attempt = pack.workers.size();
bool exit_loop = false;
while (!exit_loop) {
const auto promise = try_get_promise(pack, stealing_attempt, exit_loop);
if (promise) {
promise->handle().resume();
promise->resume_now();
}
}
}
Expand Down Expand Up @@ -161,8 +162,8 @@ class thread_pool_3 : public scheduler {
}

private:
pack m_pack;
std::atomic_ptrdiff_t m_next_in_schedule;
alignas(64) pack m_pack;
alignas(64) std::atomic_ptrdiff_t m_next_in_schedule;
inline static thread_local worker* m_local = nullptr;
};

Expand Down
2 changes: 1 addition & 1 deletion src/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void thread_pool::execute(worker& local,
std::span<worker> workers) {
do {
if (const auto item = INTERLEAVED(local.worklist.pop())) {
item->handle().resume();
item->resume_now();
continue;
}
else if (const auto item = INTERLEAVED(global_worklist.pop())) {
Expand Down
2 changes: 1 addition & 1 deletion test/helper_schedulers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class thread_locked_scheduler : public asyncpp::scheduler {
void resume() {
const auto promise = m_promise.exchange(nullptr);
assert(promise != nullptr);
promise->handle().resume();
promise->resume_now();
}

private:
Expand Down
12 changes: 7 additions & 5 deletions test/monitor_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,19 @@ class [[nodiscard]] monitor_task {
m_counters->exception = std::current_exception();
}

void resume() override {
void resume() final {
m_counters->suspensions.fetch_add(1);
m_scheduler ? m_scheduler->schedule(*this) : handle().resume();
m_scheduler ? m_scheduler->schedule(*this) : resume_now();
}

std::coroutine_handle<> handle() override {
return std::coroutine_handle<promise>::from_promise(*this);
void resume_now() final {
const auto handle = std::coroutine_handle<promise>::from_promise(*this);
handle.resume();
}

void destroy() noexcept {
handle().destroy();
const auto handle = std::coroutine_handle<promise>::from_promise(*this);
handle.destroy();
}

const counters& get_counters() const {
Expand Down
3 changes: 1 addition & 2 deletions test/test_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ constexpr size_t branching = 10;


struct test_promise : schedulable_promise {
std::coroutine_handle<> handle() override {
void resume_now() final {
++num_queried;
return std::noop_coroutine();
}
std::atomic_size_t num_queried = 0;
};
Expand Down

0 comments on commit 4dcf76b

Please sign in to comment.