Skip to content

Commit

Permalink
event + fixes (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
petiaccja authored Nov 28, 2023
1 parent ae2db04 commit 9710430
Show file tree
Hide file tree
Showing 16 changed files with 278 additions and 69 deletions.
6 changes: 6 additions & 0 deletions include/async++/awaitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ namespace impl {

std::optional<std::variant<value_type, std::exception_ptr>> m_result;

task_result() = default;

task_result(value_type value) : m_result(std::move(value)) {}

task_result(std::exception_ptr value) : m_result(std::move(value)) {}

task_result& operator=(value_type value) {
m_result = std::move(value);
return *this;
Expand Down
120 changes: 120 additions & 0 deletions include/async++/event.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#pragma once

#include "container/atomic_collection.hpp"
#include "interleaving/sequence_point.hpp"
#include "promise.hpp"

#include <cassert>
#include <iostream>
#include <memory>


namespace asyncpp {

namespace impl_event {
template <class T>
class promise {
public:
struct awaitable : impl::basic_awaitable<T> {
awaitable* m_next = nullptr;

awaitable(promise* owner) noexcept : m_owner(owner) {}

bool await_ready() const noexcept {
return m_owner->ready();
}

template <std::convertible_to<const impl::resumable_promise&> Promise>
bool await_suspend(std::coroutine_handle<Promise> enclosing) noexcept {
m_enclosing = &enclosing.promise();
const bool ready = m_owner->await(this);
return !ready;
}

auto await_resume() -> typename impl::task_result<T>::reference {
return m_owner->get_result().get_or_throw();
}

void on_ready() noexcept final {
m_enclosing->resume();
}

private:
impl::resumable_promise* m_enclosing = nullptr;
promise* m_owner = nullptr;
};

public:
promise() = default;
promise(promise&&) = delete;
promise(const promise&) = delete;
promise& operator=(promise&&) = delete;
promise& operator=(const promise&) = delete;

void set(impl::task_result<T> result) noexcept {
m_result = std::move(result);
finalize();
}

awaitable await() noexcept {
return { this };
}

bool ready() const noexcept {
return INTERLEAVED(m_awaiters.closed());
}

private:
bool await(awaitable* awaiter) noexcept {
const auto previous = INTERLEAVED(m_awaiters.push(awaiter));
return m_awaiters.closed(previous);
}

void finalize() noexcept {
auto awaiter = INTERLEAVED(m_awaiters.close());
assert(!m_awaiters.closed(awaiter) && "cannot set event twice");
while (awaiter != nullptr) {
const auto next = awaiter->m_next;
awaiter->on_ready();
awaiter = next;
}
}

auto& get_result() noexcept {
return this->m_result;
}

private:
impl::task_result<T> m_result;
atomic_collection<awaitable, &awaitable::m_next> m_awaiters;
};

} // namespace impl_event


template <class T>
class event {
public:
event() = default;
event(const event&) = delete;
event(event&&) = delete;
event& operator=(const event&) = delete;
event& operator=(event&&) = delete;

auto operator co_await() noexcept {
return m_promise.await();
}

void set_value(T value) {
m_promise.set(std::move(value));
}

void set_exception(std::exception_ptr ex) {
m_promise.set(std::move(ex));
}

private:
impl_event::promise<T> m_promise;
};

} // namespace asyncpp
7 changes: 7 additions & 0 deletions include/async++/mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ class mutex {
bool lock_enqueue(awaitable* waiting);

public:
mutex() = default;
mutex(const mutex&) = delete;
mutex(mutex&&) = delete;
mutex& operator=(const mutex&) = delete;
mutex& operator=(mutex&&) = delete;
~mutex();

bool try_lock() noexcept;
awaitable unique() noexcept;
awaitable operator co_await() noexcept;
Expand Down
7 changes: 7 additions & 0 deletions include/async++/shared_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ class shared_mutex {
bool lock_enqueue_shared(shared_awaitable* waiting);

public:
shared_mutex() = default;
shared_mutex(const shared_mutex&) = delete;
shared_mutex(shared_mutex&&) = delete;
shared_mutex& operator=(const shared_mutex&) = delete;
shared_mutex& operator=(shared_mutex&&) = delete;
~shared_mutex();

bool try_lock() noexcept;
bool try_lock_shared() noexcept;
awaitable unique() noexcept;
Expand Down
44 changes: 9 additions & 35 deletions include/async++/shared_task.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#pragma once

#include "awaitable.hpp"
#include "container/atomic_collection.hpp"
#include "interleaving/sequence_point.hpp"
#include "promise.hpp"
#include "scheduler.hpp"
#include "container/atomic_collection.hpp"

#include <cassert>
#include <future>
#include <utility>


Expand Down Expand Up @@ -35,11 +34,8 @@ namespace impl_shared_task {
auto& owner = handle.promise();

auto awaiting = INTERLEAVED(owner.m_awaiting.close());
const auto state = INTERLEAVED(owner.m_state.exchange(owner.READY));
while (awaiting != nullptr) {
if (!(state == owner.EXCLUDE_FIRST && awaiting->m_next == nullptr)) {
awaiting->on_ready();
}
awaiting->on_ready();
awaiting = awaiting->m_next;
}

Expand All @@ -53,32 +49,20 @@ namespace impl_shared_task {
}

auto initial_suspend() noexcept {
acquire();
return std::suspend_always{};
}

void start() noexcept {
auto created = CREATED;
const bool success = INTERLEAVED(m_state.compare_exchange_strong(created, RUNNING));
if (success) {
const bool has_started = INTERLEAVED(m_started.test_and_set());
if (!has_started) {
acquire();
resume();
}
}

bool await(chained_awaitable<T>* awaiter) {
start();
const auto previous = INTERLEAVED(m_awaiting.push(awaiter));
if (previous == nullptr) {
auto created = CREATED;
const bool was_created = INTERLEAVED(m_state.compare_exchange_strong(created, EXCLUDE_FIRST));
if (was_created == CREATED) {
resume();
auto exclude_first = EXCLUDE_FIRST;
INTERLEAVED(m_state.compare_exchange_strong(exclude_first, RUNNING));
if (exclude_first == READY) {
return true;
}
}
}
return m_awaiting.closed(previous);
}

Expand Down Expand Up @@ -114,31 +98,21 @@ namespace impl_shared_task {
}

private:
static constexpr int CREATED = 1;
static constexpr int RUNNING = 2;
static constexpr int EXCLUDE_FIRST = 3;
static constexpr int READY = 4;
std::atomic_size_t m_references = 0;
std::atomic_int m_state = CREATED;
std::atomic_flag m_started;
atomic_collection<chained_awaitable<T>, &chained_awaitable<T>::m_next> m_awaiting;
};


template <class T>
struct awaitable : chained_awaitable<T> {
task_result<T> m_result;
promise<T>* m_awaited = nullptr;
resumable_promise* m_enclosing = nullptr;

awaitable(promise<T>* awaited) : m_awaited(awaited) {
m_awaited->acquire();
}
~awaitable() override {
m_awaited->release();
}
awaitable(promise<T>* awaited) : m_awaited(awaited) {}

constexpr bool await_ready() const noexcept {
return false;
return m_awaited->ready();
}

template <std::convertible_to<const resumable_promise&> Promise>
Expand Down
Loading

0 comments on commit 9710430

Please sign in to comment.