From af0128d6e8338af1c32f8b0624c5bcf70ce095b8 Mon Sep 17 00:00:00 2001 From: liushuai <770722922@qq.com> Date: Thu, 20 Jun 2024 20:25:22 +0800 Subject: [PATCH 1/2] Support combine with other event loops --- .../asio/detail/impl/kqueue_reactor.ipp | 18 +++++- asio/include/asio/detail/impl/scheduler.ipp | 6 ++ asio/include/asio/detail/kqueue_reactor.hpp | 10 ++++ asio/include/asio/detail/scheduler.hpp | 3 + asio/include/asio/detail/scheduler_task.hpp | 5 ++ .../cpp11/other_eventloop/with_libuv.cpp | 56 +++++++++++++++++++ 6 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 asio/src/examples/cpp11/other_eventloop/with_libuv.cpp diff --git a/asio/include/asio/detail/impl/kqueue_reactor.ipp b/asio/include/asio/detail/impl/kqueue_reactor.ipp index 210a89dd49..c96d8c3f9f 100644 --- a/asio/include/asio/detail/impl/kqueue_reactor.ipp +++ b/asio/include/asio/detail/impl/kqueue_reactor.ipp @@ -444,8 +444,18 @@ void kqueue_reactor::run(long usec, op_queue& ops) lock.unlock(); // Block on the kqueue descriptor. - struct kevent events[128]; - int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout); + struct kevent *events = event_loop_field_.events; + int &num_events = event_loop_field_.num_events; + + if (!event_loop_field_.has_event) { + num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout); + if (event_loop_field_.is_wait) { + event_loop_field_.has_event = true; + return; + } + } else { + event_loop_field_.has_event = false; + } #if defined(ASIO_ENABLE_HANDLER_TRACKING) // Trace the waiting events. @@ -542,6 +552,10 @@ void kqueue_reactor::run(long usec, op_queue& ops) timer_queues_.get_ready_timers(ops); } +void kqueue_reactor::mask_wait_only() { + event_loop_field_.is_wait = true; +} + void kqueue_reactor::interrupt() { interrupter_.interrupt(); diff --git a/asio/include/asio/detail/impl/scheduler.ipp b/asio/include/asio/detail/impl/scheduler.ipp index eccc80322d..94c66cab25 100644 --- a/asio/include/asio/detail/impl/scheduler.ipp +++ b/asio/include/asio/detail/impl/scheduler.ipp @@ -230,6 +230,12 @@ std::size_t scheduler::run_one(asio::error_code& ec) return do_run_one(lock, this_thread, ec); } +std::size_t scheduler::wait_event(long usec, asio::error_code &ec) +{ + task_->mask_wait_only(); + return wait_one(usec, ec); +} + std::size_t scheduler::wait_one(long usec, asio::error_code& ec) { ec = asio::error_code(); diff --git a/asio/include/asio/detail/kqueue_reactor.hpp b/asio/include/asio/detail/kqueue_reactor.hpp index f034410d6d..f054a1c693 100644 --- a/asio/include/asio/detail/kqueue_reactor.hpp +++ b/asio/include/asio/detail/kqueue_reactor.hpp @@ -208,6 +208,8 @@ class kqueue_reactor // Run the kqueue loop. ASIO_DECL void run(long usec, op_queue& ops); + ASIO_DECL void mask_wait_only(); + // Interrupt the kqueue loop. ASIO_DECL void interrupt(); @@ -254,6 +256,14 @@ class kqueue_reactor // Keep track of all registered descriptors. object_pool registered_descriptors_; + + // Save fired events + struct { + struct kevent events[128]; + int num_events; + std::atomic_bool has_event; + std::atomic_bool is_wait; + } event_loop_field_ {}; }; } // namespace detail diff --git a/asio/include/asio/detail/scheduler.hpp b/asio/include/asio/detail/scheduler.hpp index a73ec141eb..dd682d12e1 100644 --- a/asio/include/asio/detail/scheduler.hpp +++ b/asio/include/asio/detail/scheduler.hpp @@ -77,6 +77,9 @@ class scheduler // Poll for one operation without blocking. ASIO_DECL std::size_t poll_one(asio::error_code& ec); + // Wait until timeout, interrupted, or one operation event. + ASIO_DECL std::size_t wait_event(long usec, asio::error_code& ec); + // Interrupt the event processing loop. ASIO_DECL void stop(); diff --git a/asio/include/asio/detail/scheduler_task.hpp b/asio/include/asio/detail/scheduler_task.hpp index a1a4ec1c85..e5c1073c15 100644 --- a/asio/include/asio/detail/scheduler_task.hpp +++ b/asio/include/asio/detail/scheduler_task.hpp @@ -31,6 +31,11 @@ class scheduler_task // Run the task once until interrupted or events are ready to be dispatched. virtual void run(long usec, op_queue& ops) = 0; + // Mask just wait event for this time call run + virtual void mask_wait_only() { + throw_error(std::make_error_code(std::errc::operation_not_supported)); + }; + // Interrupt the task. virtual void interrupt() = 0; diff --git a/asio/src/examples/cpp11/other_eventloop/with_libuv.cpp b/asio/src/examples/cpp11/other_eventloop/with_libuv.cpp new file mode 100644 index 0000000000..ba9cf9a1ce --- /dev/null +++ b/asio/src/examples/cpp11/other_eventloop/with_libuv.cpp @@ -0,0 +1,56 @@ +#include +#include +#include + +//#define USE_LOG +#ifdef USE_LOG +#include "log.h" +#else +#define LOG printf +#endif + +int main() { + uv_loop_t *loop = uv_default_loop(); + static asio::io_context io_context; + asio::io_context::work work(io_context); + + uv_timer_t timer_req; + uv_timer_init(loop, &timer_req); + uv_timer_start( + &timer_req, [](uv_timer_t *handle) { LOG("libuv Timer fired!\n"); }, + 10000, 10000); + + asio::steady_timer asio_timer(io_context); + std::function start_asio_timer; + start_asio_timer = [&] { + asio_timer.expires_after(asio::chrono::milliseconds(1000)); + asio_timer.async_wait([&](const asio::error_code &ec) { + LOG("asio Timer fired!\n"); + start_asio_timer(); + }); + }; + start_asio_timer(); + + static std::condition_variable async_done; + std::mutex async_done_lock; + uv_async_t async; + uv_async_init(loop, &async, [](uv_async_t *handle) { + io_context.poll_one(); + async_done.notify_one(); + }); + + std::thread([&] { + for (;;) { + LOG("wait_event..."); + asio::error_code ec; + auto &service = asio::use_service(io_context); + service.wait_event(INTMAX_MAX, ec); + uv_async_send(&async); + std::unique_lock lock(async_done_lock); + async_done.wait(lock); + } + }).detach(); + + uv_run(loop, UV_RUN_DEFAULT); + return 0; +} From 372886c4ed82d5d94b52f4bb956e1c4473fb67b5 Mon Sep 17 00:00:00 2001 From: liushuai <770722922@qq.com> Date: Fri, 21 Jun 2024 15:54:14 +0800 Subject: [PATCH 2/2] Add wait_one_for and wait_one_until api --- asio/include/asio/detail/impl/scheduler.ipp | 4 +- asio/include/asio/detail/scheduler.hpp | 2 +- asio/include/asio/impl/io_context.hpp | 25 +++++++++++ asio/include/asio/io_context.hpp | 24 +++++++++++ .../cpp11/other_eventloop/with_asio.cpp | 43 +++++++++++++++++++ .../cpp11/other_eventloop/with_libuv.cpp | 12 +++--- 6 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 asio/src/examples/cpp11/other_eventloop/with_asio.cpp diff --git a/asio/include/asio/detail/impl/scheduler.ipp b/asio/include/asio/detail/impl/scheduler.ipp index 94c66cab25..8ed241ef3a 100644 --- a/asio/include/asio/detail/impl/scheduler.ipp +++ b/asio/include/asio/detail/impl/scheduler.ipp @@ -230,10 +230,10 @@ std::size_t scheduler::run_one(asio::error_code& ec) return do_run_one(lock, this_thread, ec); } -std::size_t scheduler::wait_event(long usec, asio::error_code &ec) +void scheduler::wait_event(long usec, asio::error_code &ec) { task_->mask_wait_only(); - return wait_one(usec, ec); + wait_one(usec, ec); } std::size_t scheduler::wait_one(long usec, asio::error_code& ec) diff --git a/asio/include/asio/detail/scheduler.hpp b/asio/include/asio/detail/scheduler.hpp index dd682d12e1..c107ac957e 100644 --- a/asio/include/asio/detail/scheduler.hpp +++ b/asio/include/asio/detail/scheduler.hpp @@ -78,7 +78,7 @@ class scheduler ASIO_DECL std::size_t poll_one(asio::error_code& ec); // Wait until timeout, interrupted, or one operation event. - ASIO_DECL std::size_t wait_event(long usec, asio::error_code& ec); + ASIO_DECL void wait_event(long usec, asio::error_code& ec); // Interrupt the event processing loop. ASIO_DECL void stop(); diff --git a/asio/include/asio/impl/io_context.hpp b/asio/include/asio/impl/io_context.hpp index 002b585502..3b460e7c07 100644 --- a/asio/include/asio/impl/io_context.hpp +++ b/asio/include/asio/impl/io_context.hpp @@ -106,6 +106,31 @@ std::size_t io_context::run_one_until( return 0; } +template +void io_context::wait_one_for(const chrono::duration& rel_time) +{ + this->wait_one_until(chrono::steady_clock::now() + rel_time); +} + +template +void io_context::wait_one_until( + const chrono::time_point &abs_time) +{ + typename Clock::time_point now = Clock::now(); + if (now < abs_time) { + typename Clock::duration rel_time = abs_time - now; + if (rel_time > chrono::seconds(1)) + rel_time = chrono::seconds(1); + + asio::error_code ec; + impl_.wait_event( + static_cast( + chrono::duration_cast(rel_time).count()), + ec); + asio::detail::throw_error(ec); + } +} + #if !defined(ASIO_NO_DEPRECATED) inline void io_context::reset() diff --git a/asio/include/asio/io_context.hpp b/asio/include/asio/io_context.hpp index 5de125685f..9aa5a8dce3 100644 --- a/asio/include/asio/io_context.hpp +++ b/asio/include/asio/io_context.hpp @@ -482,6 +482,30 @@ class io_context ASIO_DECL count_type poll_one(asio::error_code& ec); #endif // !defined(ASIO_NO_DEPRECATED) + /// Run the io_context object's event processing loop for a specified duration + /// do not execute any handler, just wait. + /** + * The wait_one_for() function blocks until one handler has been dispatched, + * until the io_context has been stopped, or until the specified duration has + * elapsed. + * + * @param rel_time The duration for which the call may block. + */ + template + void wait_one_for(const chrono::duration& rel_time); + + /// Run the io_context object's event processing loop until a specified time + /// do not execute any handler, just wait. + /** + * The wait_one_until() function blocks until one handler has been dispatched, + * until the io_context has been stopped, or until the specified time has + * been reached. + * + * @param abs_time The time point until which the call may block. + */ + template + void wait_one_until(const chrono::time_point& abs_time); + /// Stop the io_context object's event processing loop. /** * This function does not block, but instead simply signals the io_context to diff --git a/asio/src/examples/cpp11/other_eventloop/with_asio.cpp b/asio/src/examples/cpp11/other_eventloop/with_asio.cpp new file mode 100644 index 0000000000..4dd0288fa1 --- /dev/null +++ b/asio/src/examples/cpp11/other_eventloop/with_asio.cpp @@ -0,0 +1,43 @@ +#include "asio.hpp" + +#if __has_include("log.h") +#include "log.h" +#else +#define LOG printf +#endif + +int main() { + asio::io_context io_context_main; + io_context_main.dispatch([] { LOG("main thread"); }); + + std::thread([&] { + LOG("other thread"); + asio::io_context io_context_other; + + asio::steady_timer asio_timer(io_context_other); + std::function start_asio_timer; + start_asio_timer = [&] { + asio_timer.expires_after(asio::chrono::milliseconds(1000)); + asio_timer.async_wait([&](const asio::error_code &ec) { + LOG("asio Timer fired!"); + start_asio_timer(); + }); + }; + start_asio_timer(); + + for (;;) { + LOG("wait_event..."); + io_context_other.wait_one_for(std::chrono::minutes(1)); + std::promise promise; + io_context_main.dispatch([&] { + io_context_other.poll_one(); + promise.set_value(); + }); + promise.get_future().get(); + } + }).detach(); + + asio::io_context::work work(io_context_main); + io_context_main.run(); + return 0; +} diff --git a/asio/src/examples/cpp11/other_eventloop/with_libuv.cpp b/asio/src/examples/cpp11/other_eventloop/with_libuv.cpp index ba9cf9a1ce..7a58612a9e 100644 --- a/asio/src/examples/cpp11/other_eventloop/with_libuv.cpp +++ b/asio/src/examples/cpp11/other_eventloop/with_libuv.cpp @@ -2,8 +2,7 @@ #include #include -//#define USE_LOG -#ifdef USE_LOG +#if __has_include("log.h") #include "log.h" #else #define LOG printf @@ -13,6 +12,7 @@ int main() { uv_loop_t *loop = uv_default_loop(); static asio::io_context io_context; asio::io_context::work work(io_context); + LOG("main thread\n"); uv_timer_t timer_req; uv_timer_init(loop, &timer_req); @@ -33,6 +33,7 @@ int main() { static std::condition_variable async_done; std::mutex async_done_lock; + uv_async_t async; uv_async_init(loop, &async, [](uv_async_t *handle) { io_context.poll_one(); @@ -41,10 +42,9 @@ int main() { std::thread([&] { for (;;) { - LOG("wait_event..."); - asio::error_code ec; - auto &service = asio::use_service(io_context); - service.wait_event(INTMAX_MAX, ec); + LOG("wait_event...\n"); + io_context.wait_one_for(std::chrono::minutes(1)); + uv_async_send(&async); std::unique_lock lock(async_done_lock); async_done.wait(lock);