Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support combine with other event loops #1497

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions asio/include/asio/detail/impl/kqueue_reactor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,18 @@ void kqueue_reactor::run(long usec, op_queue<operation>& ops)
lock.unlock();

// Block on the kqueue descriptor.
struct kevent events[128];
int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
struct kevent *events = event_loop_field_.events;
int &num_events = event_loop_field_.num_events;

if (!event_loop_field_.has_event) {
num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
if (event_loop_field_.is_wait) {
event_loop_field_.has_event = true;
return;
}
} else {
event_loop_field_.has_event = false;
}

#if defined(ASIO_ENABLE_HANDLER_TRACKING)
// Trace the waiting events.
Expand Down Expand Up @@ -542,6 +552,10 @@ void kqueue_reactor::run(long usec, op_queue<operation>& ops)
timer_queues_.get_ready_timers(ops);
}

void kqueue_reactor::mask_wait_only() {
event_loop_field_.is_wait = true;
}

void kqueue_reactor::interrupt()
{
interrupter_.interrupt();
Expand Down
6 changes: 6 additions & 0 deletions asio/include/asio/detail/impl/scheduler.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ std::size_t scheduler::run_one(asio::error_code& ec)
return do_run_one(lock, this_thread, ec);
}

void scheduler::wait_event(long usec, asio::error_code &ec)
{
task_->mask_wait_only();
wait_one(usec, ec);
}

std::size_t scheduler::wait_one(long usec, asio::error_code& ec)
{
ec = asio::error_code();
Expand Down
10 changes: 10 additions & 0 deletions asio/include/asio/detail/kqueue_reactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ class kqueue_reactor
// Run the kqueue loop.
ASIO_DECL void run(long usec, op_queue<operation>& ops);

ASIO_DECL void mask_wait_only();

// Interrupt the kqueue loop.
ASIO_DECL void interrupt();

Expand Down Expand Up @@ -254,6 +256,14 @@ class kqueue_reactor

// Keep track of all registered descriptors.
object_pool<descriptor_state> registered_descriptors_;

// Save fired events
struct {
struct kevent events[128];
int num_events;
std::atomic_bool has_event;
std::atomic_bool is_wait;
} event_loop_field_ {};
};

} // namespace detail
Expand Down
3 changes: 3 additions & 0 deletions asio/include/asio/detail/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ class scheduler
// Poll for one operation without blocking.
ASIO_DECL std::size_t poll_one(asio::error_code& ec);

// Wait until timeout, interrupted, or one operation event.
ASIO_DECL void wait_event(long usec, asio::error_code& ec);

// Interrupt the event processing loop.
ASIO_DECL void stop();

Expand Down
5 changes: 5 additions & 0 deletions asio/include/asio/detail/scheduler_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ class scheduler_task
// Run the task once until interrupted or events are ready to be dispatched.
virtual void run(long usec, op_queue<scheduler_operation>& ops) = 0;

// Mask just wait event for this time call run
virtual void mask_wait_only() {
throw_error(std::make_error_code(std::errc::operation_not_supported));
};

// Interrupt the task.
virtual void interrupt() = 0;

Expand Down
25 changes: 25 additions & 0 deletions asio/include/asio/impl/io_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,31 @@ std::size_t io_context::run_one_until(
return 0;
}

template <typename Rep, typename Period>
void io_context::wait_one_for(const chrono::duration<Rep, Period>& rel_time)
{
this->wait_one_until(chrono::steady_clock::now() + rel_time);
}

template <typename Clock, typename Duration>
void io_context::wait_one_until(
const chrono::time_point<Clock, Duration> &abs_time)
{
typename Clock::time_point now = Clock::now();
if (now < abs_time) {
typename Clock::duration rel_time = abs_time - now;
if (rel_time > chrono::seconds(1))
rel_time = chrono::seconds(1);

asio::error_code ec;
impl_.wait_event(
static_cast<long>(
chrono::duration_cast<chrono::microseconds>(rel_time).count()),
ec);
asio::detail::throw_error(ec);
}
}

#if !defined(ASIO_NO_DEPRECATED)

inline void io_context::reset()
Expand Down
24 changes: 24 additions & 0 deletions asio/include/asio/io_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,30 @@ class io_context
ASIO_DECL count_type poll_one(asio::error_code& ec);
#endif // !defined(ASIO_NO_DEPRECATED)

/// Run the io_context object's event processing loop for a specified duration
/// do not execute any handler, just wait.
/**
* The wait_one_for() function blocks until one handler has been dispatched,
* until the io_context has been stopped, or until the specified duration has
* elapsed.
*
* @param rel_time The duration for which the call may block.
*/
template <typename Rep, typename Period>
void wait_one_for(const chrono::duration<Rep, Period>& rel_time);

/// Run the io_context object's event processing loop until a specified time
/// do not execute any handler, just wait.
/**
* The wait_one_until() function blocks until one handler has been dispatched,
* until the io_context has been stopped, or until the specified time has
* been reached.
*
* @param abs_time The time point until which the call may block.
*/
template <typename Clock, typename Duration>
void wait_one_until(const chrono::time_point<Clock, Duration>& abs_time);

/// Stop the io_context object's event processing loop.
/**
* This function does not block, but instead simply signals the io_context to
Expand Down
43 changes: 43 additions & 0 deletions asio/src/examples/cpp11/other_eventloop/with_asio.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#include "asio.hpp"

#if __has_include("log.h")
#include "log.h"
#else
#define LOG printf
#endif

int main() {
asio::io_context io_context_main;
io_context_main.dispatch([] { LOG("main thread"); });

std::thread([&] {
LOG("other thread");
asio::io_context io_context_other;

asio::steady_timer asio_timer(io_context_other);
std::function<void()> start_asio_timer;
start_asio_timer = [&] {
asio_timer.expires_after(asio::chrono::milliseconds(1000));
asio_timer.async_wait([&](const asio::error_code &ec) {
LOG("asio Timer fired!");
start_asio_timer();
});
};
start_asio_timer();

for (;;) {
LOG("wait_event...");
io_context_other.wait_one_for(std::chrono::minutes(1));
std::promise<void> promise;
io_context_main.dispatch([&] {
io_context_other.poll_one();
promise.set_value();
});
promise.get_future().get();
}
}).detach();

asio::io_context::work work(io_context_main);
io_context_main.run();
return 0;
}
56 changes: 56 additions & 0 deletions asio/src/examples/cpp11/other_eventloop/with_libuv.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include <asio.hpp>
#include <iostream>
#include <uv.h>

#if __has_include("log.h")
#include "log.h"
#else
#define LOG printf
#endif

int main() {
uv_loop_t *loop = uv_default_loop();
static asio::io_context io_context;
asio::io_context::work work(io_context);
LOG("main thread\n");

uv_timer_t timer_req;
uv_timer_init(loop, &timer_req);
uv_timer_start(
&timer_req, [](uv_timer_t *handle) { LOG("libuv Timer fired!\n"); },
10000, 10000);

asio::steady_timer asio_timer(io_context);
std::function<void()> start_asio_timer;
start_asio_timer = [&] {
asio_timer.expires_after(asio::chrono::milliseconds(1000));
asio_timer.async_wait([&](const asio::error_code &ec) {
LOG("asio Timer fired!\n");
start_asio_timer();
});
};
start_asio_timer();

static std::condition_variable async_done;
std::mutex async_done_lock;

uv_async_t async;
uv_async_init(loop, &async, [](uv_async_t *handle) {
io_context.poll_one();
async_done.notify_one();
});

std::thread([&] {
for (;;) {
LOG("wait_event...\n");
io_context.wait_one_for(std::chrono::minutes(1));

uv_async_send(&async);
std::unique_lock<std::mutex> lock(async_done_lock);
async_done.wait(lock);
}
}).detach();

uv_run(loop, UV_RUN_DEFAULT);
return 0;
}