Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event + fixes #5

Merged
merged 5 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/async++/awaitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ namespace impl {

std::optional<std::variant<value_type, std::exception_ptr>> m_result;

task_result() = default;

task_result(value_type value) : m_result(std::move(value)) {}

task_result(std::exception_ptr value) : m_result(std::move(value)) {}

task_result& operator=(value_type value) {
m_result = std::move(value);
return *this;
Expand Down
120 changes: 120 additions & 0 deletions include/async++/event.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#pragma once

#include "container/atomic_collection.hpp"
#include "interleaving/sequence_point.hpp"
#include "promise.hpp"

#include <cassert>
#include <iostream>
#include <memory>


namespace asyncpp {

namespace impl_event {
template <class T>
class promise {
public:
struct awaitable : impl::basic_awaitable<T> {
awaitable* m_next = nullptr;

awaitable(promise* owner) noexcept : m_owner(owner) {}

bool await_ready() const noexcept {
return m_owner->ready();
}

template <std::convertible_to<const impl::resumable_promise&> Promise>
bool await_suspend(std::coroutine_handle<Promise> enclosing) noexcept {
m_enclosing = &enclosing.promise();
const bool ready = m_owner->await(this);
return !ready;
}

auto await_resume() -> typename impl::task_result<T>::reference {
return m_owner->get_result().get_or_throw();
}

void on_ready() noexcept final {
m_enclosing->resume();
}

private:
impl::resumable_promise* m_enclosing = nullptr;
promise* m_owner = nullptr;
};

public:
promise() = default;
promise(promise&&) = delete;
promise(const promise&) = delete;
promise& operator=(promise&&) = delete;
promise& operator=(const promise&) = delete;

void set(impl::task_result<T> result) noexcept {
m_result = std::move(result);
finalize();
}

awaitable await() noexcept {
return { this };
}

bool ready() const noexcept {
return INTERLEAVED(m_awaiters.closed());
}

private:
bool await(awaitable* awaiter) noexcept {
const auto previous = INTERLEAVED(m_awaiters.push(awaiter));
return m_awaiters.closed(previous);
}

void finalize() noexcept {
auto awaiter = INTERLEAVED(m_awaiters.close());
assert(!m_awaiters.closed(awaiter) && "cannot set event twice");
while (awaiter != nullptr) {
const auto next = awaiter->m_next;
awaiter->on_ready();
awaiter = next;
}
}

auto& get_result() noexcept {
return this->m_result;
}

private:
impl::task_result<T> m_result;
atomic_collection<awaitable, &awaitable::m_next> m_awaiters;
};

} // namespace impl_event


template <class T>
class event {
public:
event() = default;
event(const event&) = delete;
event(event&&) = delete;
event& operator=(const event&) = delete;
event& operator=(event&&) = delete;

auto operator co_await() noexcept {
return m_promise.await();
}

void set_value(T value) {
m_promise.set(std::move(value));
}

void set_exception(std::exception_ptr ex) {
m_promise.set(std::move(ex));
}

private:
impl_event::promise<T> m_promise;
};

} // namespace asyncpp
7 changes: 7 additions & 0 deletions include/async++/mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ class mutex {
bool lock_enqueue(awaitable* waiting);

public:
mutex() = default;
mutex(const mutex&) = delete;
mutex(mutex&&) = delete;
mutex& operator=(const mutex&) = delete;
mutex& operator=(mutex&&) = delete;
~mutex();

bool try_lock() noexcept;
awaitable unique() noexcept;
awaitable operator co_await() noexcept;
Expand Down
7 changes: 7 additions & 0 deletions include/async++/shared_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ class shared_mutex {
bool lock_enqueue_shared(shared_awaitable* waiting);

public:
shared_mutex() = default;
shared_mutex(const shared_mutex&) = delete;
shared_mutex(shared_mutex&&) = delete;
shared_mutex& operator=(const shared_mutex&) = delete;
shared_mutex& operator=(shared_mutex&&) = delete;
~shared_mutex();

bool try_lock() noexcept;
bool try_lock_shared() noexcept;
awaitable unique() noexcept;
Expand Down
44 changes: 9 additions & 35 deletions include/async++/shared_task.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#pragma once

#include "awaitable.hpp"
#include "container/atomic_collection.hpp"
#include "interleaving/sequence_point.hpp"
#include "promise.hpp"
#include "scheduler.hpp"
#include "container/atomic_collection.hpp"

#include <cassert>
#include <future>
#include <utility>


Expand Down Expand Up @@ -35,11 +34,8 @@ namespace impl_shared_task {
auto& owner = handle.promise();

auto awaiting = INTERLEAVED(owner.m_awaiting.close());
const auto state = INTERLEAVED(owner.m_state.exchange(owner.READY));
while (awaiting != nullptr) {
if (!(state == owner.EXCLUDE_FIRST && awaiting->m_next == nullptr)) {
awaiting->on_ready();
}
awaiting->on_ready();
awaiting = awaiting->m_next;
}

Expand All @@ -53,32 +49,20 @@ namespace impl_shared_task {
}

auto initial_suspend() noexcept {
acquire();
return std::suspend_always{};
}

void start() noexcept {
auto created = CREATED;
const bool success = INTERLEAVED(m_state.compare_exchange_strong(created, RUNNING));
if (success) {
const bool has_started = INTERLEAVED(m_started.test_and_set());
if (!has_started) {
acquire();
resume();
}
}

bool await(chained_awaitable<T>* awaiter) {
start();
const auto previous = INTERLEAVED(m_awaiting.push(awaiter));
if (previous == nullptr) {
auto created = CREATED;
const bool was_created = INTERLEAVED(m_state.compare_exchange_strong(created, EXCLUDE_FIRST));
if (was_created == CREATED) {
resume();
auto exclude_first = EXCLUDE_FIRST;
INTERLEAVED(m_state.compare_exchange_strong(exclude_first, RUNNING));
if (exclude_first == READY) {
return true;
}
}
}
return m_awaiting.closed(previous);
}

Expand Down Expand Up @@ -114,31 +98,21 @@ namespace impl_shared_task {
}

private:
static constexpr int CREATED = 1;
static constexpr int RUNNING = 2;
static constexpr int EXCLUDE_FIRST = 3;
static constexpr int READY = 4;
std::atomic_size_t m_references = 0;
std::atomic_int m_state = CREATED;
std::atomic_flag m_started;
atomic_collection<chained_awaitable<T>, &chained_awaitable<T>::m_next> m_awaiting;
};


template <class T>
struct awaitable : chained_awaitable<T> {
task_result<T> m_result;
promise<T>* m_awaited = nullptr;
resumable_promise* m_enclosing = nullptr;

awaitable(promise<T>* awaited) : m_awaited(awaited) {
m_awaited->acquire();
}
~awaitable() override {
m_awaited->release();
}
awaitable(promise<T>* awaited) : m_awaited(awaited) {}

constexpr bool await_ready() const noexcept {
return false;
return m_awaited->ready();
}

template <std::convertible_to<const resumable_promise&> Promise>
Expand Down
Loading
Loading