Skip to content

Commit

Permalink
add semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
petiaccja committed Mar 8, 2024
1 parent 4d9ab85 commit c22084b
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 0 deletions.
54 changes: 54 additions & 0 deletions include/asyncpp/semaphore.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include "container/atomic_deque.hpp"
#include "promise.hpp"
#include "threading/spinlock.hpp"

#include <cassert>
#include <cstdint>


namespace asyncpp {

class counting_semaphore {
struct awaitable {
counting_semaphore* m_owner = nullptr;
resumable_promise* m_enclosing = nullptr;
awaitable* m_prev = nullptr;
awaitable* m_next = nullptr;

bool await_ready() const noexcept;

template <std::convertible_to<const resumable_promise&> Promise>
bool await_suspend(std::coroutine_handle<Promise> promise) noexcept {
assert(m_owner);
m_enclosing = &promise.promise();
return !m_owner->acquire(this);
}

constexpr void await_resume() const noexcept {}
};

public:
counting_semaphore(ptrdiff_t current_counter = 0, ptrdiff_t max_counter = std::numeric_limits<ptrdiff_t>::max()) noexcept;

bool try_acquire() noexcept;
awaitable operator co_await() noexcept;
void release() noexcept;
ptrdiff_t max() const noexcept;

ptrdiff_t _debug_get_counter() const noexcept;
deque<awaitable, &awaitable::m_prev, &awaitable::m_next>& _debug_get_awaiters();
void _debug_clear();

private:
bool acquire(awaitable* waiting) noexcept;

private:
spinlock m_spinlock;
deque<awaitable, &awaitable::m_prev, &awaitable::m_next> m_awaiters;
ptrdiff_t m_counter = 0;
const ptrdiff_t m_max;
};

} // namespace asyncpp
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ target_sources(asyncpp
shared_mutex.cpp
sleep.cpp
testing/interleaver.cpp
semaphore.cpp
)

target_link_libraries(asyncpp asyncpp-headers)
84 changes: 84 additions & 0 deletions src/semaphore.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#include <asyncpp/semaphore.hpp>

#include <mutex>


namespace asyncpp {


bool counting_semaphore::awaitable::await_ready() const noexcept {
assert(m_owner);
return m_owner->try_acquire();
}


counting_semaphore::counting_semaphore(ptrdiff_t current_counter, ptrdiff_t max_counter) noexcept : m_counter(current_counter), m_max(max_counter) {}
bool counting_semaphore::try_acquire() noexcept {
std::lock_guard lk(m_spinlock);
if (m_counter > 0) {
--m_counter;
return true;
}
return false;
}


counting_semaphore::awaitable counting_semaphore::operator co_await() noexcept {
return { this };
}


void counting_semaphore::release() noexcept {
std::unique_lock lk(m_spinlock);
++m_counter;
const auto resumed = m_awaiters.pop_front();
if (resumed) {
--m_counter;
lk.unlock();
assert(resumed->m_enclosing);
resumed->m_enclosing->resume();
}
else {
if (m_counter > m_max) {
std::terminate(); // You released the semaphore too many times.
}
}
}


ptrdiff_t counting_semaphore::max() const noexcept {
return m_max;
}


ptrdiff_t counting_semaphore::_debug_get_counter() const noexcept {
return m_counter;
}


deque<counting_semaphore::awaitable, &counting_semaphore::awaitable::m_prev, &counting_semaphore::awaitable::m_next>& counting_semaphore::_debug_get_awaiters() {
return m_awaiters;
}


void counting_semaphore::_debug_clear() {
m_awaiters.~deque();
new (&m_awaiters) decltype(m_awaiters);
m_counter = m_max;
}


bool counting_semaphore::acquire(awaitable* waiting) noexcept {
std::lock_guard lk(m_spinlock);
if (m_counter > 0) {
--m_counter;
return true;
}
else {
m_awaiters.push_back(waiting);
return false;
}
}


} // namespace asyncpp
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ target_sources(test
test_thread_pool.cpp
test_event.cpp
test_sleep.cpp
test_semaphore.cpp
testing/test_interleaver.cpp
helper_schedulers.hpp
monitor_task.hpp
Expand Down
103 changes: 103 additions & 0 deletions test/test_semaphore.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#include "monitor_task.hpp"

#include <asyncpp/semaphore.hpp>

#include <catch2/catch_test_macros.hpp>

using namespace asyncpp;


struct [[nodiscard]] scope_clear {
~scope_clear() {
sema._debug_clear();
}
counting_semaphore& sema;
};


static monitor_task acquire(counting_semaphore& sema) {
co_await sema;
}


TEST_CASE("Semaphore - try_acquire", "[Semaphore]") {
SECTION("success") {
counting_semaphore sema(1);
REQUIRE(sema.try_acquire());
REQUIRE(sema._debug_get_awaiters().empty());
REQUIRE(sema._debug_get_counter() == 0);
}
SECTION("failure") {
counting_semaphore sema(0);
REQUIRE(!sema.try_acquire());
REQUIRE(sema._debug_get_awaiters().empty());
REQUIRE(sema._debug_get_counter() == 0);
}
}


TEST_CASE("Semaphore - acquire direct immediate", "[Semaphore]") {
SECTION("success") {
counting_semaphore sema(1);
scope_clear guard(sema);
auto monitor = acquire(sema);
REQUIRE(monitor.get_counters().done);
REQUIRE(sema._debug_get_awaiters().empty());
REQUIRE(sema._debug_get_counter() == 0);
}
SECTION("failure") {
counting_semaphore sema(0);
scope_clear guard(sema);
auto monitor = acquire(sema);
REQUIRE(!monitor.get_counters().done);
REQUIRE(!sema._debug_get_awaiters().empty());
REQUIRE(sema._debug_get_counter() == 0);
}
}


TEST_CASE("Semaphore - acquire spurious immediate", "[Semaphore]") {
SECTION("success") {
counting_semaphore sema(1);
scope_clear guard(sema);

auto monitor = []() -> monitor_task { co_return; }();
auto awaiter = sema.operator co_await();

REQUIRE(false == awaiter.await_suspend(monitor.handle()));
REQUIRE(sema._debug_get_awaiters().empty());
REQUIRE(sema._debug_get_counter() == 0);
}
SECTION("failure") {
counting_semaphore sema(0);
scope_clear guard(sema);

auto monitor = []() -> monitor_task { co_return; }();
auto awaiter = sema.operator co_await();

REQUIRE(true == awaiter.await_suspend(monitor.handle()));
REQUIRE(!sema._debug_get_awaiters().empty());
REQUIRE(sema._debug_get_counter() == 0);
}
}


TEST_CASE("Semaphore - release", "[Semaphore]") {
counting_semaphore sema(0);

auto monitor1 = acquire(sema);
auto monitor2 = acquire(sema);

REQUIRE(!monitor1.get_counters().done);
REQUIRE(!monitor2.get_counters().done);

sema.release();

REQUIRE(monitor1.get_counters().done);
REQUIRE(!monitor2.get_counters().done);

sema.release();

REQUIRE(monitor1.get_counters().done);
REQUIRE(monitor2.get_counters().done);
}

0 comments on commit c22084b

Please sign in to comment.