Skip to content

Commit

Permalink
mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
petiaccja committed Nov 17, 2023
1 parent 7ec4f4a commit 4a08066
Show file tree
Hide file tree
Showing 13 changed files with 450 additions and 18 deletions.
68 changes: 68 additions & 0 deletions include/async++/container/atomic_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#pragma once

#include "../sync/spinlock.hpp"

#include <atomic>


namespace asyncpp {

template <class Element, Element* Element::*next, Element* Element::*prev>
class atomic_queue {
public:
Element* push(Element* element) noexcept {
std::lock_guard lk(m_mtx);
const auto prev_front = m_front.load(std::memory_order_relaxed);
element->*prev = prev_front;
m_front.store(element, std::memory_order_relaxed);
if (prev_front == nullptr) {
m_back.store(element, std::memory_order_relaxed);
}
else {
prev_front->*next = element;
}
return prev_front;
}

bool compare_push(Element*& expected, Element* element) {
std::lock_guard lk(m_mtx);
const auto prev_front = m_front.load(std::memory_order_relaxed);
if (prev_front == expected) {
element->*prev = prev_front;
m_front.store(element, std::memory_order_relaxed);
if (prev_front == nullptr) {
m_back.store(element, std::memory_order_relaxed);
}
else {
prev_front->*next = element;
}
return true;
}
expected = prev_front;
return false;
}

Element* pop() noexcept {
std::lock_guard lk(m_mtx);
const auto prev_back = m_back.load(std::memory_order_relaxed);
if (prev_back != nullptr) {
const auto new_back = prev_back->*next;
m_back.store(new_back, std::memory_order_relaxed);
if (new_back == nullptr) {
m_front.store(nullptr, std::memory_order_relaxed);
}
}
return prev_back;
}

bool empty() const noexcept {
return m_back.load(std::memory_order_relaxed) == nullptr;
}

private:
std::atomic<Element*> m_front;
std::atomic<Element*> m_back;
mutable spinlock m_mtx;
};

} // namespace asyncpp
153 changes: 153 additions & 0 deletions include/async++/mutex.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#pragma once

#include "container/atomic_queue.hpp"
#include "promise.hpp"
#include "sync/spinlock.hpp"

#include <cassert>
#include <concepts>
#include <mutex>
#include <utility>


namespace asyncpp {

template <class Mutex, void (Mutex::*unlock)()>
class [[nodiscard]] lock {
public:
lock(Mutex* mtx) : m_mtx(mtx) {}
lock(lock&& rhs) : m_mtx(std::exchange(rhs.m_mtx, nullptr)) {}
lock& operator=(lock&& rhs) {
if (m_mtx) {
m_mtx->unlock();
}
m_mtx = std::exchange(rhs.m_mtx, nullptr);
return *this;
}
~lock() {
if (m_mtx) {
(m_mtx->*unlock)();
}
}
Mutex& parent() const noexcept {
return *m_mtx;
}

private:
Mutex* m_mtx = nullptr;
};


class mutex {
void unlock();

public:
using lock = lock<mutex, &mutex::unlock>;
friend lock;

private:
struct awaitable {
awaitable* m_next = nullptr;
awaitable* m_prev = nullptr;

awaitable(mutex* mtx) : m_mtx(mtx) {}
bool await_ready() noexcept;
template <std::convertible_to<const impl::resumable_promise&> Promise>
bool await_suspend(std::coroutine_handle<Promise> enclosing) noexcept;
lock await_resume() noexcept;
void on_ready(lock lk) noexcept;

private:
mutex* m_mtx;
impl::resumable_promise* m_enclosing = nullptr;
std::optional<lock> m_lk;
};

public:
[[nodiscard]] std::optional<lock> try_lock() noexcept;
awaitable unique() noexcept;
awaitable operator co_await() noexcept;

private:
std::optional<lock> wait(awaitable* waiting);

private:
atomic_queue<awaitable, &awaitable::m_next, &awaitable::m_prev> m_queue;
bool m_locked = false;
spinlock m_spinlock;
};


template <std::convertible_to<const impl::resumable_promise&> Promise>
bool mutex::awaitable::await_suspend(std::coroutine_handle<Promise> enclosing) noexcept {
m_enclosing = &enclosing.promise();
m_lk = m_mtx->wait(this);
return !m_lk.has_value();
}


template <class Mutex>
class unique_lock {
using mutex_awaitable = std::invoke_result_t<decltype(&Mutex::operator co_await), Mutex*>;
struct awaitable {
unique_lock* m_lock;
mutex_awaitable m_awaitable;

auto await_ready() noexcept {
return m_awaitable.await_ready();
}

template <class Promise>
auto await_suspend(std::coroutine_handle<Promise> enclosing) noexcept {
return m_awaitable.await_suspend(enclosing);
}

void await_resume() noexcept {
m_lock->m_lk = m_awaitable.await_resume();
}
};

public:
unique_lock(Mutex& mtx) noexcept : m_mtx(mtx) {}

template <void (Mutex::*unlock)()>
unique_lock(lock<Mutex, unlock> lk) noexcept : m_mtx(lk.parent()), m_lk(std::move(lk)) {}

bool try_lock() noexcept {
assert(!owns_lock());
m_lk = m_mtx.try_lock();
return m_lk.has_value();
}

auto operator co_await() noexcept {
assert(!owns_lock());
return awaitable(this, m_mtx.unique());
}

void unlock() noexcept {
assert(owns_lock());
m_lk = std::nullopt;
}

Mutex& mutex() const noexcept {
return m_mtx;
}

bool owns_lock() const noexcept {
return m_lk.has_value();
}

operator bool() const noexcept {
return owns_lock();
}

private:
Mutex& m_mtx;
std::optional<typename Mutex::lock> m_lk;
};


template <class Mutex_, void (Mutex_::*unlock)()>
unique_lock(lock<Mutex_, unlock> lk) -> unique_lock<Mutex_>;

} // namespace asyncpp
10 changes: 5 additions & 5 deletions include/async++/shared_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,21 @@ namespace impl_shared_task {
template <class T>
struct sync_awaitable : chained_awaitable<T> {
promise<T>* m_awaited = nullptr;
std::promise<task_result<T>&> m_promise;
std::future<task_result<T>&> m_future = m_promise.get_future();
std::promise<task_result<T>*> m_promise;
std::future<task_result<T>*> m_future = m_promise.get_future();

sync_awaitable(promise<T>* awaited) noexcept : m_awaited(awaited) {
m_awaited->acquire();
const bool ready = m_awaited->await(this);
if (ready) {
m_promise.set_value(m_awaited->get_result());
m_promise.set_value(&m_awaited->get_result());
}
}
~sync_awaitable() override {
m_awaited->release();
}
void on_ready() noexcept final {
return m_promise.set_value(m_awaited->get_result());
return m_promise.set_value(&m_awaited->get_result());
}
};

Expand Down Expand Up @@ -237,7 +237,7 @@ class shared_task {
auto get() const -> typename impl::task_result<T>::reference {
assert(valid());
impl_shared_task::sync_awaitable<T> awaitable(m_promise);
return INTERLEAVED_ACQUIRE(awaitable.m_future.get()).get_or_throw();
return INTERLEAVED_ACQUIRE(awaitable.m_future.get())->get_or_throw();
}

auto operator co_await() const {
Expand Down
14 changes: 12 additions & 2 deletions include/async++/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,12 @@ namespace impl_task {
m_result = m_awaited->get_result();
m_awaited->release();
}
return std::forward<T>(m_result.get_or_throw());
if constexpr (!std::is_void_v<T>) {
return std::forward<T>(m_result.get_or_throw());
}
else {
m_result.get_or_throw();
}
}

void on_ready() noexcept final {
Expand Down Expand Up @@ -203,7 +208,12 @@ class [[nodiscard]] task {
T get() {
assert(valid());
impl_task::sync_awaitable<T> awaitable(std::exchange(m_promise, nullptr));
return std::forward<T>(INTERLEAVED_ACQUIRE(awaitable.m_future.get()).get_or_throw());
if constexpr (!std::is_void_v<T>) {
return std::forward<T>(INTERLEAVED_ACQUIRE(awaitable.m_future.get()).get_or_throw());
}
else {
INTERLEAVED_ACQUIRE(awaitable.m_future.get()).get_or_throw();
}
}

auto operator co_await() {
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_library(async++)
target_sources(async++
PRIVATE
thread_pool.cpp
mutex.cpp
interleaving/runner.cpp
interleaving/sequencer.cpp
interleaving/state_tree.cpp
Expand Down
66 changes: 66 additions & 0 deletions src/mutex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include <async++/mutex.hpp>

namespace asyncpp {

bool mutex::awaitable::await_ready() noexcept {
m_lk = m_mtx->try_lock();
return m_lk.has_value();
}


mutex::lock mutex::awaitable::await_resume() noexcept {
assert(m_lk);
return std::move(m_lk.value());
}


void mutex::awaitable::on_ready(lock lk) noexcept {
m_lk = std::move(lk);
assert(m_enclosing);
m_enclosing->resume();
}


std::optional<mutex::lock> mutex::try_lock() noexcept {
std::lock_guard lk(m_spinlock);
if (std::exchange(m_locked, true) == false) {
return lock(this);
}
return std::nullopt;
}


mutex::awaitable mutex::unique() noexcept {
return awaitable(this);
}


mutex::awaitable mutex::operator co_await() noexcept {
return unique();
}


std::optional<mutex::lock> mutex::wait(awaitable* waiting) {
std::lock_guard lk(m_spinlock);
const bool acquired = std::exchange(m_locked, true) == false;
if (acquired) {
return lock(this);
}
m_queue.push(waiting);
return std::nullopt;
}


void mutex::unlock() {
std::unique_lock lk(m_spinlock);
assert(m_locked);
m_locked = false;
awaitable* const next = m_queue.pop();
lk.unlock();
if (next) {
m_locked = true;
next->on_ready(lock(this));
}
}

} // namespace asyncpp
6 changes: 4 additions & 2 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ target_sources(test
test_stream.cpp
test_thread_pool.cpp
test_task.cpp
test_atomic_stack.cpp
test_mutex.cpp
interleaving/test_runner.cpp
test_shared_task.cpp
test_atomic_collection.cpp
container/test_atomic_queue.cpp
container/test_atomic_stack.cpp
container/test_atomic_collection.cpp
)


Expand Down
File renamed without changes.
Loading

0 comments on commit 4a08066

Please sign in to comment.