Skip to content

Commit

Permalink
sleep (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
petiaccja authored Dec 1, 2023
1 parent 42fd9d9 commit 44918ac
Show file tree
Hide file tree
Showing 21 changed files with 434 additions and 218 deletions.
17 changes: 12 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ cmake_minimum_required(VERSION 3.25)
project(async++)

set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
option(ENABLE_LLVM_COV OFF)
option(ENABLE_LLVM_ADDRESS_SANITIZER OFF)
option(ENABLE_LLVM_MEMORY_SANITIZER OFF)
option(ENABLE_LLVM_THREAD_SANITIZER OFF)
option(ASYNCPP_BUILD_TESTS "Build tests." ON)
option(ENABLE_LLVM_COV "Enable LLVM source-based code coverage." OFF)
option(ENABLE_LLVM_ADDRESS_SANITIZER "Enable LLVM address sanitizer." OFF)
option(ENABLE_LLVM_MEMORY_SANITIZER "Enable LLVM memory sanitizer." OFF)
option(ENABLE_LLVM_THREAD_SANITIZER "Enable LLVM thread sanitizer." OFF)

set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib")
Expand Down Expand Up @@ -35,6 +36,12 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
endif()
endif()

if (${ASYNCPP_BUILD_TESTS})
add_compile_definitions(ASYNCPP_BUILD_TESTS=1)
endif()

add_subdirectory(include/async++)
add_subdirectory(src)
add_subdirectory(test)
if (${ASYNCPP_BUILD_TESTS})
add_subdirectory(test)
endif()
37 changes: 37 additions & 0 deletions include/async++/promise.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

#include "awaitable.hpp"

#include <atomic>
#include <coroutine>
#include <utility>


namespace asyncpp {
Expand Down Expand Up @@ -91,4 +93,39 @@ struct result_promise<void> {
}
};


namespace impl {

class leak_checked_promise {
using snapshot_type = std::pair<intptr_t, intptr_t>;

public:
#ifdef ASYNCPP_BUILD_TESTS
leak_checked_promise() noexcept { num_alive.fetch_add(1, std::memory_order_relaxed); }
leak_checked_promise(const leak_checked_promise&) noexcept { num_alive.fetch_add(1, std::memory_order_relaxed); }
leak_checked_promise(leak_checked_promise&&) noexcept = delete;
leak_checked_promise& operator=(const leak_checked_promise&) noexcept { return *this; }
leak_checked_promise& operator=(leak_checked_promise&&) noexcept = delete;
~leak_checked_promise() {
num_alive.fetch_sub(1, std::memory_order_relaxed);
version.fetch_add(1, std::memory_order_relaxed);
}
#endif

static snapshot_type snapshot() noexcept {
return { num_alive.load(std::memory_order_relaxed), version.load(std::memory_order_relaxed) };
}

static bool check(snapshot_type s) noexcept {
const auto current = snapshot();
return current.first == s.first && current.second > s.second;
}

private:
inline static std::atomic_intptr_t num_alive = 0;
inline static std::atomic_intptr_t version = 0;
};

} // namespace impl

} // namespace asyncpp
6 changes: 3 additions & 3 deletions include/async++/shared_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace impl_shared_task {
};

template <class T>
struct promise : result_promise<T>, resumable_promise, schedulable_promise {
struct promise : result_promise<T>, resumable_promise, schedulable_promise, impl::leak_checked_promise {
struct final_awaitable {
constexpr bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<promise> handle) noexcept {
Expand Down Expand Up @@ -92,7 +92,7 @@ namespace impl_shared_task {
}

bool ready() const {
return m_awaiting.closed();
return INTERLEAVED(m_awaiting.closed());
}

private:
Expand All @@ -114,7 +114,7 @@ namespace impl_shared_task {
}

template <std::convertible_to<const resumable_promise&> Promise>
bool await_suspend(std::coroutine_handle<Promise> enclosing) {
bool await_suspend(std::coroutine_handle<Promise> enclosing) noexcept {
m_enclosing = &enclosing.promise();
const bool ready = m_awaited->await(this);
return !ready;
Expand Down
56 changes: 56 additions & 0 deletions include/async++/sleep.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#pragma once

#include "awaitable.hpp"
#include "promise.hpp"

#include <chrono>
#include <concepts>
#include <coroutine>


namespace asyncpp {

namespace impl_sleep {

using clock_type = std::chrono::steady_clock;

struct awaitable : basic_awaitable<void> {
explicit awaitable(clock_type::time_point time) noexcept;

bool await_ready() const noexcept;
template <std::convertible_to<const resumable_promise&> Promise>
void await_suspend(std::coroutine_handle<Promise> enclosing) noexcept;
void await_resume() const noexcept;
void on_ready() noexcept override;
auto get_time() const noexcept -> clock_type::time_point;

private:
void enqueue() noexcept;

clock_type::time_point m_time;
resumable_promise* m_enclosing = nullptr;
};

template <std::convertible_to<const resumable_promise&> Promise>
void awaitable::await_suspend(std::coroutine_handle<Promise> enclosing) noexcept {
m_enclosing = &enclosing.promise();
enqueue();
}

} // namespace impl_sleep


template <class Rep, class Period>
auto sleep_for(std::chrono::duration<Rep, Period> duration) {
using impl_sleep::clock_type;
return impl_sleep::awaitable{ clock_type::now() + duration };
}


template <class Clock, class Dur>
auto sleep_until(std::chrono::time_point<Clock, Dur> time_point) {
using impl_sleep::clock_type;
return impl_sleep::awaitable{ std::chrono::clock_cast<clock_type>(time_point) };
}

} // namespace asyncpp
2 changes: 1 addition & 1 deletion include/async++/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class stream;
namespace impl_stream {

template <class T>
struct promise : resumable_promise, schedulable_promise {
struct promise : resumable_promise, schedulable_promise, impl::leak_checked_promise {
struct yield_awaitable {
constexpr bool await_ready() const noexcept {
return false;
Expand Down
10 changes: 5 additions & 5 deletions include/async++/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class task;
namespace impl_task {

template <class T>
struct promise : result_promise<T>, resumable_promise, schedulable_promise {
struct promise : result_promise<T>, resumable_promise, schedulable_promise, impl::leak_checked_promise {
struct final_awaitable {
constexpr bool await_ready() const noexcept { return false; }
void await_suspend(std::coroutine_handle<promise> handle) const noexcept {
Expand All @@ -42,7 +42,7 @@ namespace impl_task {
}

constexpr auto initial_suspend() noexcept {
m_released.test_and_set();
INTERLEAVED(m_released.test_and_set());
return std::suspend_always{};
}

Expand All @@ -66,7 +66,7 @@ namespace impl_task {
}

void release() noexcept {
const auto released = m_released.test_and_set();
const auto released = INTERLEAVED(m_released.test_and_set());
if (released) {
handle().destroy();
}
Expand All @@ -85,7 +85,7 @@ namespace impl_task {
}

void resume() final {
[[maybe_unused]] const auto state = m_state.load();
[[maybe_unused]] const auto state = INTERLEAVED(m_state.load());
assert(state != READY);
return m_scheduler ? m_scheduler->schedule(*this) : handle().resume();
}
Expand Down Expand Up @@ -115,7 +115,7 @@ namespace impl_task {
}

template <std::convertible_to<const resumable_promise&> Promise>
bool await_suspend(std::coroutine_handle<Promise> enclosing) {
bool await_suspend(std::coroutine_handle<Promise> enclosing) noexcept {
m_enclosing = &enclosing.promise();
const bool ready = m_awaited->await(this);
return !ready;
Expand Down
4 changes: 2 additions & 2 deletions include/async++/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class thread_pool : public scheduler {
void add_work_item(schedulable_promise& promise);
schedulable_promise* get_work_item();
bool has_work() const;
bool is_terminated() const;
void terminate();
bool is_stopped() const;
void stop();
void wake() const;
void wait() const;

Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ target_sources(async++
thread_pool.cpp
mutex.cpp
shared_mutex.cpp
sleep.cpp
interleaving/runner.cpp
interleaving/sequencer.cpp
interleaving/state_tree.cpp
Expand Down
100 changes: 100 additions & 0 deletions src/sleep.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include <async++/sleep.hpp>

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>


namespace asyncpp {

namespace impl_sleep {

struct awaiter_priority {
bool operator()(const awaitable* lhs, const awaitable* rhs) const noexcept {
return lhs->get_time() > rhs->get_time();
}
};


class sleep_scheduler {
public:
sleep_scheduler(sleep_scheduler&&) = delete;
sleep_scheduler& operator=(sleep_scheduler&&) = delete;
sleep_scheduler(const sleep_scheduler&) = delete;
sleep_scheduler& operator=(const sleep_scheduler&) = delete;
~sleep_scheduler() {
m_thread.request_stop();
m_cvar.notify_all();
}

static sleep_scheduler& get() {
static sleep_scheduler instance;
return instance;
}

void enqueue(awaitable* awaiter) noexcept {
{
std::lock_guard lk(m_mtx);
m_queue.push(awaiter);
}
m_cvar.notify_one();
}

private:
void awake(std::stop_token token) {
const auto stop_condition = [&] { return token.stop_requested() || !m_queue.empty(); };
while (!token.stop_requested()) {
std::unique_lock lk(m_mtx);
if (m_queue.empty()) {
m_cvar.wait(lk, stop_condition);
}
else {
const auto next = m_queue.top();
m_cvar.wait_until(lk, next->get_time(), stop_condition);
if (next->get_time() <= clock_type::now()) {
m_queue.pop();
lk.unlock();
next->on_ready();
}
}
}
}

sleep_scheduler() {
m_thread = std::jthread([this](std::stop_token token) { awake(token); });
}

private:
std::priority_queue<awaitable*, std::vector<awaitable*>, awaiter_priority> m_queue;
std::mutex m_mtx;
std::condition_variable m_cvar;
std::jthread m_thread;
};

awaitable::awaitable(clock_type::time_point time) noexcept
: m_time(time) {}

bool awaitable::await_ready() const noexcept {
return m_time < clock_type::now();
}

void awaitable::await_resume() const noexcept {
return;
}

void awaitable::on_ready() noexcept {
m_enclosing->resume();
}

void awaitable::enqueue() noexcept {
sleep_scheduler::get().enqueue(this);
}

auto awaitable::get_time() const noexcept -> clock_type::time_point {
return m_time;
}

} // namespace impl_sleep

} // namespace asyncpp
10 changes: 5 additions & 5 deletions src/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ thread_pool::thread_pool(size_t num_threads) {

thread_pool::~thread_pool() noexcept {
std::ranges::for_each(m_workers, [](auto& w) {
w->terminate();
w->stop();
w->wake();
});
}
Expand Down Expand Up @@ -67,7 +67,7 @@ void thread_pool::worker_function(std::shared_ptr<worker> w) noexcept {
m_free_workers.push(w.get());
w->wait();
}
} while (promise || !w->is_terminated());
} while (promise || !w->is_stopped());
}


Expand All @@ -86,12 +86,12 @@ bool thread_pool::worker::has_work() const {
}


bool thread_pool::worker::is_terminated() const {
bool thread_pool::worker::is_stopped() const {
return m_terminated.test();
}


void thread_pool::worker::terminate() {
void thread_pool::worker::stop() {
m_terminated.test_and_set();
}

Expand All @@ -104,7 +104,7 @@ void thread_pool::worker::wake() const {

void thread_pool::worker::wait() const {
std::unique_lock lk(m_wake_mutex);
m_wake_cv.wait(lk, [this] { return has_work() || is_terminated(); });
m_wake_cv.wait(lk, [this] { return has_work() || is_stopped(); });
}


Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ target_sources(test
test_task.cpp
test_thread_pool.cpp
test_event.cpp
test_sleep.cpp
)


Expand Down
Loading

0 comments on commit 44918ac

Please sign in to comment.