From 44918ac068c1885bdb12c400caa9ea3e2253cef4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Kardos?= Date: Fri, 1 Dec 2023 22:23:40 +0100 Subject: [PATCH] sleep (#7) --- CMakeLists.txt | 17 ++- include/async++/promise.hpp | 37 ++++++ include/async++/shared_task.hpp | 6 +- include/async++/sleep.hpp | 56 +++++++++ include/async++/stream.hpp | 2 +- include/async++/task.hpp | 10 +- include/async++/thread_pool.hpp | 4 +- src/CMakeLists.txt | 1 + src/sleep.cpp | 100 ++++++++++++++++ src/thread_pool.cpp | 10 +- test/CMakeLists.txt | 1 + test/helper_interleaving.hpp | 108 ++++++++++++++++++ ...t_schedulers.hpp => helper_schedulers.hpp} | 10 +- test/leak_tester.hpp | 15 --- test/main.cpp | 14 +++ test/test_event.cpp | 13 +-- test/test_shared_task.cpp | 95 +++------------ test/test_sleep.cpp | 44 +++++++ test/test_stream.cpp | 16 ++- test/test_task.cpp | 91 ++------------- test/test_thread_pool.cpp | 2 - 21 files changed, 434 insertions(+), 218 deletions(-) create mode 100644 include/async++/sleep.hpp create mode 100644 src/sleep.cpp create mode 100644 test/helper_interleaving.hpp rename test/{test_schedulers.hpp => helper_schedulers.hpp} (82%) delete mode 100644 test/leak_tester.hpp create mode 100644 test/test_sleep.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f96b23..8bfff66 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,10 +3,11 @@ cmake_minimum_required(VERSION 3.25) project(async++) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) -option(ENABLE_LLVM_COV OFF) -option(ENABLE_LLVM_ADDRESS_SANITIZER OFF) -option(ENABLE_LLVM_MEMORY_SANITIZER OFF) -option(ENABLE_LLVM_THREAD_SANITIZER OFF) +option(ASYNCPP_BUILD_TESTS "Build tests." ON) +option(ENABLE_LLVM_COV "Enable LLVM source-based code coverage." OFF) +option(ENABLE_LLVM_ADDRESS_SANITIZER "Enable LLVM address sanitizer." OFF) +option(ENABLE_LLVM_MEMORY_SANITIZER "Enable LLVM memory sanitizer." OFF) +option(ENABLE_LLVM_THREAD_SANITIZER "Enable LLVM thread sanitizer." OFF) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin") set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib") @@ -35,6 +36,12 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang") endif() endif() +if (${ASYNCPP_BUILD_TESTS}) + add_compile_definitions(ASYNCPP_BUILD_TESTS=1) +endif() + add_subdirectory(include/async++) add_subdirectory(src) -add_subdirectory(test) \ No newline at end of file +if (${ASYNCPP_BUILD_TESTS}) + add_subdirectory(test) +endif() \ No newline at end of file diff --git a/include/async++/promise.hpp b/include/async++/promise.hpp index ce8f443..590b33d 100644 --- a/include/async++/promise.hpp +++ b/include/async++/promise.hpp @@ -2,7 +2,9 @@ #include "awaitable.hpp" +#include #include +#include namespace asyncpp { @@ -91,4 +93,39 @@ struct result_promise { } }; + +namespace impl { + + class leak_checked_promise { + using snapshot_type = std::pair; + + public: +#ifdef ASYNCPP_BUILD_TESTS + leak_checked_promise() noexcept { num_alive.fetch_add(1, std::memory_order_relaxed); } + leak_checked_promise(const leak_checked_promise&) noexcept { num_alive.fetch_add(1, std::memory_order_relaxed); } + leak_checked_promise(leak_checked_promise&&) noexcept = delete; + leak_checked_promise& operator=(const leak_checked_promise&) noexcept { return *this; } + leak_checked_promise& operator=(leak_checked_promise&&) noexcept = delete; + ~leak_checked_promise() { + num_alive.fetch_sub(1, std::memory_order_relaxed); + version.fetch_add(1, std::memory_order_relaxed); + } +#endif + + static snapshot_type snapshot() noexcept { + return { num_alive.load(std::memory_order_relaxed), version.load(std::memory_order_relaxed) }; + } + + static bool check(snapshot_type s) noexcept { + const auto current = snapshot(); + return current.first == s.first && current.second > s.second; + } + + private: + inline static std::atomic_intptr_t num_alive = 0; + inline static std::atomic_intptr_t version = 0; + }; + +} // namespace impl + } // namespace asyncpp \ No newline at end of file diff --git a/include/async++/shared_task.hpp b/include/async++/shared_task.hpp index bc66b25..91ab58f 100644 --- a/include/async++/shared_task.hpp +++ b/include/async++/shared_task.hpp @@ -25,7 +25,7 @@ namespace impl_shared_task { }; template - struct promise : result_promise, resumable_promise, schedulable_promise { + struct promise : result_promise, resumable_promise, schedulable_promise, impl::leak_checked_promise { struct final_awaitable { constexpr bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle handle) noexcept { @@ -92,7 +92,7 @@ namespace impl_shared_task { } bool ready() const { - return m_awaiting.closed(); + return INTERLEAVED(m_awaiting.closed()); } private: @@ -114,7 +114,7 @@ namespace impl_shared_task { } template Promise> - bool await_suspend(std::coroutine_handle enclosing) { + bool await_suspend(std::coroutine_handle enclosing) noexcept { m_enclosing = &enclosing.promise(); const bool ready = m_awaited->await(this); return !ready; diff --git a/include/async++/sleep.hpp b/include/async++/sleep.hpp new file mode 100644 index 0000000..b216e11 --- /dev/null +++ b/include/async++/sleep.hpp @@ -0,0 +1,56 @@ +#pragma once + +#include "awaitable.hpp" +#include "promise.hpp" + +#include +#include +#include + + +namespace asyncpp { + +namespace impl_sleep { + + using clock_type = std::chrono::steady_clock; + + struct awaitable : basic_awaitable { + explicit awaitable(clock_type::time_point time) noexcept; + + bool await_ready() const noexcept; + template Promise> + void await_suspend(std::coroutine_handle enclosing) noexcept; + void await_resume() const noexcept; + void on_ready() noexcept override; + auto get_time() const noexcept -> clock_type::time_point; + + private: + void enqueue() noexcept; + + clock_type::time_point m_time; + resumable_promise* m_enclosing = nullptr; + }; + + template Promise> + void awaitable::await_suspend(std::coroutine_handle enclosing) noexcept { + m_enclosing = &enclosing.promise(); + enqueue(); + } + +} // namespace impl_sleep + + +template +auto sleep_for(std::chrono::duration duration) { + using impl_sleep::clock_type; + return impl_sleep::awaitable{ clock_type::now() + duration }; +} + + +template +auto sleep_until(std::chrono::time_point time_point) { + using impl_sleep::clock_type; + return impl_sleep::awaitable{ std::chrono::clock_cast(time_point) }; +} + +} // namespace asyncpp \ No newline at end of file diff --git a/include/async++/stream.hpp b/include/async++/stream.hpp index b99143d..56dc54e 100644 --- a/include/async++/stream.hpp +++ b/include/async++/stream.hpp @@ -23,7 +23,7 @@ class stream; namespace impl_stream { template - struct promise : resumable_promise, schedulable_promise { + struct promise : resumable_promise, schedulable_promise, impl::leak_checked_promise { struct yield_awaitable { constexpr bool await_ready() const noexcept { return false; diff --git a/include/async++/task.hpp b/include/async++/task.hpp index 460a0b2..c9f6a26 100644 --- a/include/async++/task.hpp +++ b/include/async++/task.hpp @@ -22,7 +22,7 @@ class task; namespace impl_task { template - struct promise : result_promise, resumable_promise, schedulable_promise { + struct promise : result_promise, resumable_promise, schedulable_promise, impl::leak_checked_promise { struct final_awaitable { constexpr bool await_ready() const noexcept { return false; } void await_suspend(std::coroutine_handle handle) const noexcept { @@ -42,7 +42,7 @@ namespace impl_task { } constexpr auto initial_suspend() noexcept { - m_released.test_and_set(); + INTERLEAVED(m_released.test_and_set()); return std::suspend_always{}; } @@ -66,7 +66,7 @@ namespace impl_task { } void release() noexcept { - const auto released = m_released.test_and_set(); + const auto released = INTERLEAVED(m_released.test_and_set()); if (released) { handle().destroy(); } @@ -85,7 +85,7 @@ namespace impl_task { } void resume() final { - [[maybe_unused]] const auto state = m_state.load(); + [[maybe_unused]] const auto state = INTERLEAVED(m_state.load()); assert(state != READY); return m_scheduler ? m_scheduler->schedule(*this) : handle().resume(); } @@ -115,7 +115,7 @@ namespace impl_task { } template Promise> - bool await_suspend(std::coroutine_handle enclosing) { + bool await_suspend(std::coroutine_handle enclosing) noexcept { m_enclosing = &enclosing.promise(); const bool ready = m_awaited->await(this); return !ready; diff --git a/include/async++/thread_pool.hpp b/include/async++/thread_pool.hpp index 0f148e4..2b9d1de 100644 --- a/include/async++/thread_pool.hpp +++ b/include/async++/thread_pool.hpp @@ -21,8 +21,8 @@ class thread_pool : public scheduler { void add_work_item(schedulable_promise& promise); schedulable_promise* get_work_item(); bool has_work() const; - bool is_terminated() const; - void terminate(); + bool is_stopped() const; + void stop(); void wake() const; void wait() const; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 348f928..ca11d35 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,6 +5,7 @@ target_sources(async++ thread_pool.cpp mutex.cpp shared_mutex.cpp + sleep.cpp interleaving/runner.cpp interleaving/sequencer.cpp interleaving/state_tree.cpp diff --git a/src/sleep.cpp b/src/sleep.cpp new file mode 100644 index 0000000..6081edd --- /dev/null +++ b/src/sleep.cpp @@ -0,0 +1,100 @@ +#include + +#include +#include +#include +#include + + +namespace asyncpp { + +namespace impl_sleep { + + struct awaiter_priority { + bool operator()(const awaitable* lhs, const awaitable* rhs) const noexcept { + return lhs->get_time() > rhs->get_time(); + } + }; + + + class sleep_scheduler { + public: + sleep_scheduler(sleep_scheduler&&) = delete; + sleep_scheduler& operator=(sleep_scheduler&&) = delete; + sleep_scheduler(const sleep_scheduler&) = delete; + sleep_scheduler& operator=(const sleep_scheduler&) = delete; + ~sleep_scheduler() { + m_thread.request_stop(); + m_cvar.notify_all(); + } + + static sleep_scheduler& get() { + static sleep_scheduler instance; + return instance; + } + + void enqueue(awaitable* awaiter) noexcept { + { + std::lock_guard lk(m_mtx); + m_queue.push(awaiter); + } + m_cvar.notify_one(); + } + + private: + void awake(std::stop_token token) { + const auto stop_condition = [&] { return token.stop_requested() || !m_queue.empty(); }; + while (!token.stop_requested()) { + std::unique_lock lk(m_mtx); + if (m_queue.empty()) { + m_cvar.wait(lk, stop_condition); + } + else { + const auto next = m_queue.top(); + m_cvar.wait_until(lk, next->get_time(), stop_condition); + if (next->get_time() <= clock_type::now()) { + m_queue.pop(); + lk.unlock(); + next->on_ready(); + } + } + } + } + + sleep_scheduler() { + m_thread = std::jthread([this](std::stop_token token) { awake(token); }); + } + + private: + std::priority_queue, awaiter_priority> m_queue; + std::mutex m_mtx; + std::condition_variable m_cvar; + std::jthread m_thread; + }; + + awaitable::awaitable(clock_type::time_point time) noexcept + : m_time(time) {} + + bool awaitable::await_ready() const noexcept { + return m_time < clock_type::now(); + } + + void awaitable::await_resume() const noexcept { + return; + } + + void awaitable::on_ready() noexcept { + m_enclosing->resume(); + } + + void awaitable::enqueue() noexcept { + sleep_scheduler::get().enqueue(this); + } + + auto awaitable::get_time() const noexcept -> clock_type::time_point { + return m_time; + } + +} // namespace impl_sleep + +} // namespace asyncpp \ No newline at end of file diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 2f7d3ba..2f2749b 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -23,7 +23,7 @@ thread_pool::thread_pool(size_t num_threads) { thread_pool::~thread_pool() noexcept { std::ranges::for_each(m_workers, [](auto& w) { - w->terminate(); + w->stop(); w->wake(); }); } @@ -67,7 +67,7 @@ void thread_pool::worker_function(std::shared_ptr w) noexcept { m_free_workers.push(w.get()); w->wait(); } - } while (promise || !w->is_terminated()); + } while (promise || !w->is_stopped()); } @@ -86,12 +86,12 @@ bool thread_pool::worker::has_work() const { } -bool thread_pool::worker::is_terminated() const { +bool thread_pool::worker::is_stopped() const { return m_terminated.test(); } -void thread_pool::worker::terminate() { +void thread_pool::worker::stop() { m_terminated.test_and_set(); } @@ -104,7 +104,7 @@ void thread_pool::worker::wake() const { void thread_pool::worker::wait() const { std::unique_lock lk(m_wake_mutex); - m_wake_cv.wait(lk, [this] { return has_work() || is_terminated(); }); + m_wake_cv.wait(lk, [this] { return has_work() || is_stopped(); }); } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index addce73..39e647c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -16,6 +16,7 @@ target_sources(test test_task.cpp test_thread_pool.cpp test_event.cpp + test_sleep.cpp ) diff --git a/test/helper_interleaving.hpp b/test/helper_interleaving.hpp new file mode 100644 index 0000000..1088ec2 --- /dev/null +++ b/test/helper_interleaving.hpp @@ -0,0 +1,108 @@ +#pragma once + +#include "helper_schedulers.hpp" + +#include +#include + +#include + +#include + +using namespace asyncpp; + + +template +auto run_dependent_tasks(MainTask main_task, + SubTask sub_task, + std::tuple main_args, + std::tuple sub_args) { + using sub_result_t = std::invoke_result_t; + using main_result_t = std::invoke_result_t; + struct fixture { + thread_locked_scheduler main_sched; + thread_locked_scheduler sub_sched; + main_result_t main_result; + }; + + auto create_fixture = [=, main_args = std::move(main_args), sub_args = std::move(sub_args)] { + auto fixture_ = std::make_shared(); + auto sub_result = launch(std::apply(sub_task, sub_args), fixture_->sub_sched); + auto all_main_args = std::tuple_cat(std::forward_as_tuple(std::move(sub_result)), main_args); + fixture_->main_result = launch(std::apply(main_task, std::move(all_main_args)), fixture_->main_sched); + return fixture_; + }; + auto sub_thread = [](std::shared_ptr fixture_) { + fixture_->sub_sched.resume(); + }; + auto main_thread = [](std::shared_ptr fixture_) { + fixture_->main_sched.resume(); + if (!fixture_->main_result.ready()) { + INTERLEAVED_ACQUIRE(fixture_->main_sched.wait()); + fixture_->main_sched.resume(); + } + join(fixture_->main_result); + }; + + return [create_fixture = std::move(create_fixture), sub_thread = std::move(sub_thread), main_thread = std::move(main_thread)] { + return interleaving::run_all(std::function(create_fixture), + std::vector{ std::function(main_thread), std::function(sub_thread) }, + { "$main", "$sub" }); + }; +} + + +template +auto run_abandoned_task(MainTask main_task, + std::tuple main_args) { + using main_result_t = std::invoke_result_t; + struct fixture { + thread_locked_scheduler main_sched; + main_result_t main_result; + }; + + auto create_fixture = [main_task, main_args = std::move(main_args)] { + auto fixture_ = std::make_shared(); + fixture_->main_result = launch(std::apply(main_task, std::move(main_args)), fixture_->main_sched); + return fixture_; + }; + auto exec_thread = [](std::shared_ptr fixture_) { + fixture_->main_sched.resume(); + }; + auto abandon_thread = [](std::shared_ptr fixture_) { + fixture_->main_result = {}; + }; + + return [create_fixture = std::move(create_fixture), abandon_thread = std::move(abandon_thread), exec_thread = std::move(exec_thread)] { + return interleaving::run_all(std::function(create_fixture), + std::vector{ std::function(abandon_thread), std::function(exec_thread) }, + { "$abandon", "$exec" }); + }; +} + + +template InterleavingGenFunc> +void evaluate_interleavings(InterleavingGenFunc interleaving_gen_func) { + size_t count = 0; + auto before = impl::leak_checked_promise::snapshot(); + for (const auto& interleaving : interleaving_gen_func()) { + INFO(count << "\n" + << (interleaving::interleaving_printer{ interleaving, true })); + REQUIRE(impl::leak_checked_promise::check(before)); + auto before = impl::leak_checked_promise::snapshot(); + ++count; + } + REQUIRE(count >= 3); +} + + +template InterleavingGen> +void evaluate_interleavings(InterleavingGen interleaving_gen) { + size_t count = 0; + for (const auto& interleaving : interleaving_gen) { + INFO(count << "\n" + << (interleaving::interleaving_printer{ interleaving, true })); + ++count; + } + REQUIRE(count >= 3); +} \ No newline at end of file diff --git a/test/test_schedulers.hpp b/test/helper_schedulers.hpp similarity index 82% rename from test/test_schedulers.hpp rename to test/helper_schedulers.hpp index e330f35..2501697 100644 --- a/test/test_schedulers.hpp +++ b/test/helper_schedulers.hpp @@ -14,10 +14,14 @@ class thread_locked_scheduler : public asyncpp::scheduler { m_promise.store(&promise); } - void wait_and_resume() { - asyncpp::schedulable_promise* promise; - while (!(promise = m_promise.exchange(nullptr))) { + void wait() { + while (nullptr == m_promise.load()) { } + } + + void resume() { + const auto promise = m_promise.exchange(nullptr); + assert(promise != nullptr); promise->handle().resume(); } diff --git a/test/leak_tester.hpp b/test/leak_tester.hpp deleted file mode 100644 index 2adf9f1..0000000 --- a/test/leak_tester.hpp +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once - -#include - - -// Should be passed by value to a coroutine. -// Gets copied onto the coroutine stack and should get destructed -// when the coroutine promise is destroyed. -struct leak_tester { - leak_tester() : m_count(std::make_shared(0)) {} - operator bool() { return m_count.use_count() == 1; } - -private: - std::shared_ptr m_count; -}; \ No newline at end of file diff --git a/test/main.cpp b/test/main.cpp index 7a50abf..741543a 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -1,8 +1,22 @@ #define CATCH_CONFIG_RUNNER +#include + #include int main(int argc, char* argv[]) { +#if defined(__has_feature) + #if __has_feature(address_sanitizer) + std::cout << "Address sanitizer: ON" << std::endl; + #endif + #if __has_feature(memory_sanitizer) + std::cout << "Memory sanitizer: ON" << std::endl; + #endif + #if __has_feature(thread_sanitizer) + std::cout << "Thread sanitizer: ON" << std::endl; + #endif +#endif + int result = Catch::Session().run(argc, argv); return result; diff --git a/test/test_event.cpp b/test/test_event.cpp index 13d7413..a9d150c 100644 --- a/test/test_event.cpp +++ b/test/test_event.cpp @@ -1,3 +1,5 @@ +#include "helper_interleaving.hpp" + #include #include #include @@ -9,7 +11,7 @@ using namespace asyncpp; -TEST_CASE("Event: co_await interleaving", "[Event]") { +TEST_CASE("Event: interleave co_await | set", "[Event]") { struct fixture { event evt; }; @@ -32,10 +34,5 @@ TEST_CASE("Event: co_await interleaving", "[Event]") { auto gen = interleaving::run_all(std::function(make_fixture), std::vector{ std::function(wait_thread), std::function(set_thread) }, { "$wait", "$set" }); - size_t count = 0; - for ([[maybe_unused]] const auto& il : gen) { - ++count; - INFO((interleaving::interleaving_printer{ il, true })); - } - REQUIRE(count >= 3); -} \ No newline at end of file + evaluate_interleavings(std::move(gen)); +} diff --git a/test/test_shared_task.cpp b/test/test_shared_task.cpp index 48b3c1f..da84853 100644 --- a/test/test_shared_task.cpp +++ b/test/test_shared_task.cpp @@ -1,5 +1,4 @@ -#include "leak_tester.hpp" -#include "test_schedulers.hpp" +#include "helper_interleaving.hpp" #include #include @@ -12,100 +11,34 @@ using namespace asyncpp; TEST_CASE("Shared task: interleaving co_await", "[Shared task]") { - static const auto do_worker_task = [](leak_tester tester) -> shared_task { + static const auto sub_task = []() -> shared_task { co_return 3; }; - static const auto do_main_task = [](shared_task task) -> shared_task { - task.launch(); - co_return co_await task; + static const auto main_task = [](shared_task tsk) -> shared_task { + tsk.launch(); + co_return co_await tsk; }; - struct fixture { - thread_locked_scheduler worker_sched; - thread_locked_scheduler main_sched; - shared_task main_task; - }; - - leak_tester tester; - - auto make_fixture = [&tester] { - auto f = std::make_shared(); - auto worker_task = do_worker_task(tester); - bind(worker_task, f->worker_sched); - f->main_task = do_main_task(std::move(worker_task)); - f->main_task.bind(f->main_sched); - return f; - }; - - auto worker_thread = [](std::shared_ptr f) { - INTERLEAVED_ACQUIRE(f->worker_sched.wait_and_resume()); - }; - auto main_thread = [](std::shared_ptr f) { - f->main_task.launch(); - INTERLEAVED_ACQUIRE(f->main_sched.wait_and_resume()); - if (!f->main_task.ready()) { - INTERLEAVED_ACQUIRE(f->main_sched.wait_and_resume()); - } - }; - - auto gen = interleaving::run_all(std::function(make_fixture), - std::vector{ std::function(worker_thread), std::function(main_thread) }, - { "$worker", "$main" }); - size_t count = 0; - for ([[maybe_unused]] const auto& il : gen) { - ++count; - INFO((interleaving::interleaving_printer{ il, true })); - REQUIRE(tester); - } - REQUIRE(count >= 3); + auto interleavings = run_dependent_tasks(main_task, sub_task, std::tuple{}, std::tuple{}); + evaluate_interleavings(std::move(interleavings)); } TEST_CASE("Shared task: interleaving abandon", "[Shared task]") { - static const auto do_task = [](leak_tester value) -> shared_task { co_return 3; }; + static const auto abandoned_task = []() -> shared_task { co_return 3; }; - struct fixture { - thread_locked_scheduler sched; - shared_task task; - }; - - leak_tester tester; - - auto make_fixture = [&tester] { - auto f = std::make_shared(); - f->task = do_task(tester); - bind(f->task, f->sched); - return f; - }; - - auto sync_thread = [](std::shared_ptr f) { - f->task.launch(); - f->task = {}; - }; - auto task_thread = [](std::shared_ptr f) { - INTERLEAVED_ACQUIRE(f->sched.wait_and_resume()); - }; - - auto gen = interleaving::run_all(std::function(make_fixture), - std::vector{ std::function(sync_thread), std::function(task_thread) }, - { "$sync", "$task" }); - size_t count = 0; - for ([[maybe_unused]] const auto& il : gen) { - ++count; - INFO((interleaving::interleaving_printer{ il, true })); - REQUIRE(tester); - } - REQUIRE(count >= 3); + auto interleavings = run_abandoned_task(abandoned_task, std::tuple{}); + evaluate_interleavings(std::move(interleavings)); } TEST_CASE("Shared task: abandon (not started)", "[Shared task]") { - static const auto coro = [](leak_tester tester) -> shared_task { + static const auto coro = []() -> shared_task { co_return; }; - leak_tester tester; - static_cast(coro(tester)); - REQUIRE(tester); + const auto before = impl::leak_checked_promise::snapshot(); + static_cast(coro()); + REQUIRE(impl::leak_checked_promise::check(before)); } diff --git a/test/test_sleep.cpp b/test/test_sleep.cpp new file mode 100644 index 0000000..76bae6d --- /dev/null +++ b/test/test_sleep.cpp @@ -0,0 +1,44 @@ +#include +#include +#include + +#include + +using namespace asyncpp; +using namespace std::chrono_literals; + + +TEST_CASE("Sleep: ordering", "[Sleep]") { + std::vector sequence; + const auto delayed = [&sequence](int number, std::chrono::steady_clock::time_point time) -> task { + co_await sleep_until(time); + sequence.push_back(number); + }; + + const auto time = std::chrono::steady_clock::now(); + auto t3 = launch(delayed(3, time + 3ms)); + auto t1 = launch(delayed(1, time + 1ms)); + auto t2 = launch(delayed(2, time + 2ms)); + join(t1); + join(t2); + join(t3); + REQUIRE(sequence == std::vector{ 1, 2, 3 }); +} + + +TEST_CASE("Sleep: sleep_for minimum timing", "[Sleep]") { + const auto duration = 10ms; + const auto start = std::chrono::steady_clock::now(); + join(sleep_for(duration)); + const auto end = std::chrono::steady_clock::now(); + REQUIRE(end - start >= duration); +} + + +TEST_CASE("Sleep: sleep_until minimum timing", "[Sleep]") { + const auto duration = 10ms; + const auto start = std::chrono::steady_clock::now(); + join(sleep_until(start + duration)); + const auto end = std::chrono::steady_clock::now(); + REQUIRE(end - start >= duration); +} diff --git a/test/test_stream.cpp b/test/test_stream.cpp index 2f9d53d..e10eaf2 100644 --- a/test/test_stream.cpp +++ b/test/test_stream.cpp @@ -1,5 +1,3 @@ -#include "leak_tester.hpp" - #include #include @@ -12,22 +10,22 @@ using namespace asyncpp; TEST_CASE("Stream: destroy", "[Task]") { - static const auto coro = [](leak_tester value) -> stream { co_yield 0; }; + static const auto coro = []() -> stream { co_yield 0; }; SECTION("no execution") { - leak_tester tester; + const auto before = impl::leak_checked_promise::snapshot(); { - auto s = coro(tester); + auto s = coro(); } - REQUIRE(tester); + REQUIRE(impl::leak_checked_promise::check(before)); } SECTION("synced") { - leak_tester tester; + const auto before = impl::leak_checked_promise::snapshot(); { - auto s = coro(tester); + auto s = coro(); join(s); } - REQUIRE(tester); + REQUIRE(impl::leak_checked_promise::check(before)); } } diff --git a/test/test_task.cpp b/test/test_task.cpp index b022f30..295d5e0 100644 --- a/test/test_task.cpp +++ b/test/test_task.cpp @@ -1,5 +1,4 @@ -#include "leak_tester.hpp" -#include "test_schedulers.hpp" +#include "helper_interleaving.hpp" #include #include @@ -12,100 +11,34 @@ using namespace asyncpp; TEST_CASE("Task: interleaving co_await", "[Task]") { - static const auto do_worker_task = [](leak_tester tester) -> task { + static const auto sub_task = []() -> task { co_return 3; }; - static const auto do_main_task = [](task tsk) -> task { + static const auto main_task = [](task tsk) -> task { tsk.launch(); co_return co_await tsk; }; - struct fixture { - thread_locked_scheduler worker_sched; - thread_locked_scheduler main_sched; - task main_task; - }; - - leak_tester tester; - - auto make_fixture = [&tester] { - auto f = std::make_shared(); - auto worker_task = do_worker_task(tester); - bind(worker_task, f->worker_sched); - f->main_task = do_main_task(std::move(worker_task)); - f->main_task.bind(f->main_sched); - return f; - }; - - auto worker_thread = [](std::shared_ptr f) { - INTERLEAVED_ACQUIRE(f->worker_sched.wait_and_resume()); - }; - auto main_thread = [](std::shared_ptr f) { - f->main_task.launch(); - INTERLEAVED_ACQUIRE(f->main_sched.wait_and_resume()); - if (!f->main_task.ready()) { - INTERLEAVED_ACQUIRE(f->main_sched.wait_and_resume()); - } - }; - - auto gen = interleaving::run_all(std::function(make_fixture), - std::vector{ std::function(worker_thread), std::function(main_thread) }, - { "$worker", "$main" }); - size_t count = 0; - for ([[maybe_unused]] const auto& il : gen) { - ++count; - INFO((interleaving::interleaving_printer{ il, true })); - REQUIRE(tester); - } - REQUIRE(count >= 3); + auto interleavings = run_dependent_tasks(main_task, sub_task, std::tuple{}, std::tuple{}); + evaluate_interleavings(std::move(interleavings)); } TEST_CASE("Task: interleaving abandon", "[Task]") { - static const auto do_task = [](leak_tester value) -> task { co_return 3; }; + static const auto abandoned_task = []() -> task { co_return 3; }; - struct fixture { - thread_locked_scheduler sched; - task main_task; - }; - - leak_tester tester; - - auto make_fixture = [&tester] { - auto f = std::make_shared(); - f->main_task = do_task(tester); - bind(f->main_task, f->sched); - return f; - }; - - auto sync_thread = [](std::shared_ptr f) { - f->main_task.launch(); - f->main_task = {}; - }; - auto task_thread = [](std::shared_ptr f) { - INTERLEAVED_ACQUIRE(f->sched.wait_and_resume()); - }; - - auto gen = interleaving::run_all(std::function(make_fixture), - std::vector{ std::function(sync_thread), std::function(task_thread) }, - { "$sync", "$task" }); - size_t count = 0; - for ([[maybe_unused]] const auto& il : gen) { - ++count; - INFO((interleaving::interleaving_printer{ il, true })); - REQUIRE(tester); - } - REQUIRE(count >= 3); + auto interleavings = run_abandoned_task(abandoned_task, std::tuple{}); + evaluate_interleavings(std::move(interleavings)); } TEST_CASE("Task: abandon (not started)", "[Shared task]") { - static const auto coro = [](leak_tester tester) -> task { + static const auto coro = []() -> task { co_return; }; - leak_tester tester; - static_cast(coro(tester)); - REQUIRE(tester); + const auto before = impl::leak_checked_promise::snapshot(); + static_cast(coro()); + REQUIRE(impl::leak_checked_promise::check(before)); } diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp index e7fb4b9..1830846 100644 --- a/test/test_thread_pool.cpp +++ b/test/test_thread_pool.cpp @@ -1,5 +1,3 @@ -#include "test_schedulers.hpp" - #include #include #include