From 930c141dfcdd187ab1f9c8617ce9f395076fc8ac Mon Sep 17 00:00:00 2001 From: Dan Lapid Date: Tue, 29 Nov 2022 10:26:09 +0000 Subject: [PATCH] Added linux scheduler --- include/cppcoro/detail/linux.hpp | 113 ++++++++++++++++++++ include/cppcoro/io_service.hpp | 5 + include/cppcoro/resume_on.hpp | 2 +- lib/CMakeLists.txt | 11 ++ lib/io_service.cpp | 161 +++++++++++++++++++++++++++-- lib/linux.cpp | 172 +++++++++++++++++++++++++++++++ test/CMakeLists.txt | 6 ++ 7 files changed, 458 insertions(+), 12 deletions(-) create mode 100644 include/cppcoro/detail/linux.hpp create mode 100644 lib/linux.cpp diff --git a/include/cppcoro/detail/linux.hpp b/include/cppcoro/detail/linux.hpp new file mode 100644 index 00000000..562e8f31 --- /dev/null +++ b/include/cppcoro/detail/linux.hpp @@ -0,0 +1,113 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) Microsoft +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#ifndef CPPCORO_DETAIL_LINUX_HPP_INCLUDED +#define CPPCORO_DETAIL_LINUX_HPP_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cppcoro +{ + namespace detail + { + namespace linux + { + using fd_t = int; + + enum message_type + { + CALLBACK_TYPE, + RESUME_TYPE + }; + + class safe_fd + { + public: + safe_fd() + : m_fd(-1) + { + } + + explicit safe_fd(fd_t fd) + : m_fd(fd) + { + } + + safe_fd(const safe_fd& other) = delete; + + safe_fd(safe_fd&& other) noexcept + : m_fd(other.m_fd) + { + other.m_fd = -1; + } + + ~safe_fd() { close(); } + + safe_fd& operator=(safe_fd fd) noexcept + { + swap(fd); + return *this; + } + + constexpr fd_t fd() const { return m_fd; } + + /// Calls close() and sets the fd to -1. + void close() noexcept; + + void swap(safe_fd& other) noexcept { std::swap(m_fd, other.m_fd); } + + bool operator==(const safe_fd& other) const { return m_fd == other.m_fd; } + + bool operator!=(const safe_fd& other) const { return m_fd != other.m_fd; } + + bool operator==(fd_t fd) const { return m_fd == fd; } + + bool operator!=(fd_t fd) const { return m_fd != fd; } + + private: + fd_t m_fd; + }; + + struct message + { + enum message_type m_type; + void* m_ptr; + }; + + struct io_state : linux::message + { + using callback_type = void(io_state* state); + callback_type* m_callback; + }; + + class message_queue + { + private: + int m_pipefd[2]; + safe_fd m_epollfd; + struct epoll_event m_ev; + + public: + message_queue(); + ~message_queue(); + bool enqueue_message(void* message, message_type type); + bool dequeue_message(void*& message, message_type& type, bool wait); + }; + + safe_fd create_event_fd(); + safe_fd create_timer_fd(); + safe_fd create_epoll_fd(); + } // namespace linux + } // namespace detail +} // namespace cppcoro + +#endif diff --git a/include/cppcoro/io_service.hpp b/include/cppcoro/io_service.hpp index b6b4dbdc..4fe60e80 100644 --- a/include/cppcoro/io_service.hpp +++ b/include/cppcoro/io_service.hpp @@ -11,6 +11,8 @@ #if CPPCORO_OS_WINNT # include +#elif CPPCORO_OS_LINUX +# include #endif #include @@ -172,6 +174,9 @@ namespace cppcoro std::atomic m_winsockInitialised; std::mutex m_winsockInitialisationMutex; + +#elif CPPCORO_OS_LINUX + detail::linux::message_queue m_mq; #endif // Head of a linked-list of schedule operations that are diff --git a/include/cppcoro/resume_on.hpp b/include/cppcoro/resume_on.hpp index b26188b5..3fced51a 100644 --- a/include/cppcoro/resume_on.hpp +++ b/include/cppcoro/resume_on.hpp @@ -117,7 +117,7 @@ namespace cppcoro template async_generator resume_on(SCHEDULER& scheduler, async_generator source) { - for (auto iter = co_await source.begin(); iter != source.end(); co_await ++iter) + for (detail::async_generator_iterator iter = co_await source.begin(); iter != source.end(); co_await ++iter) { auto& value = *iter; co_await scheduler.schedule(); diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 6f2a4d5d..09ae0d1d 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -152,6 +152,17 @@ if(WIN32) # TODO remove this when experimental/non-experimental include are fixed list(APPEND compile_definition _SILENCE_EXPERIMENTAL_FILESYSTEM_DEPRECATION_WARNING=1) endif() +elseif(UNIX) + set(linuxDetailIncludes + linux.hpp + ) + list(TRANSFORM linuxDetailIncludes PREPEND "${PROJECT_SOURCE_DIR}/include/cppcoro/detail/") + list(APPEND detailIncludes ${linuxDetailIncludes}) + + list(APPEND sources + linux.cpp + io_service.cpp + ) endif() add_library(cppcoro diff --git a/lib/io_service.cpp b/lib/io_service.cpp index 551c2800..840898f5 100644 --- a/lib/io_service.cpp +++ b/lib/io_service.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #if CPPCORO_OS_WINNT # ifndef WIN32_LEAN_AND_MEAN @@ -22,6 +23,10 @@ # include # include # include +#elif CPPCORO_OS_LINUX + typedef long long int LONGLONG; + typedef int DWORD; +# define INFINITE (DWORD)-1 //needed for timeout values in io_service::timer_thread_state::run() #endif namespace @@ -312,6 +317,10 @@ class cppcoro::io_service::timer_thread_state #if CPPCORO_OS_WINNT detail::win32::safe_handle m_wakeUpEvent; detail::win32::safe_handle m_waitableTimerEvent; +#elif CPPCORO_OS_LINUX + detail::linux::safe_fd m_wakeupfd; + detail::linux::safe_fd m_timerfd; + detail::linux::safe_fd m_epollfd; #endif std::atomic m_newlyQueuedTimers; @@ -335,6 +344,8 @@ cppcoro::io_service::io_service(std::uint32_t concurrencyHint) , m_iocpHandle(create_io_completion_port(concurrencyHint)) , m_winsockInitialised(false) , m_winsockInitialisationMutex() +#elif CPPCORO_OS_LINUX + , m_mq() #endif , m_scheduleOperations(nullptr) , m_timerState(nullptr) @@ -471,13 +482,12 @@ void cppcoro::io_service::notify_work_finished() noexcept } } +#if CPPCORO_OS_WINNT cppcoro::detail::win32::handle_t cppcoro::io_service::native_iocp_handle() noexcept { return m_iocpHandle.handle(); } -#if CPPCORO_OS_WINNT - void cppcoro::io_service::ensure_winsock_initialised() { if (!m_winsockInitialised.load(std::memory_order_acquire)) @@ -512,6 +522,10 @@ void cppcoro::io_service::schedule_impl(schedule_operation* operation) noexcept 0, reinterpret_cast(operation->m_awaiter.address()), nullptr); +#elif CPPCORO_OS_LINUX + const bool ok = m_mq.enqueue_message(reinterpret_cast(operation->m_awaiter.address()), + detail::linux::RESUME_TYPE); +#endif if (!ok) { // Failed to post to the I/O completion port. @@ -531,21 +545,24 @@ void cppcoro::io_service::schedule_impl(schedule_operation* operation) noexcept std::memory_order_release, std::memory_order_acquire)); } -#endif } void cppcoro::io_service::try_reschedule_overflow_operations() noexcept { -#if CPPCORO_OS_WINNT auto* operation = m_scheduleOperations.exchange(nullptr, std::memory_order_acquire); while (operation != nullptr) { auto* next = operation->m_next; +#if CPPCORO_OS_WINNT BOOL ok = ::PostQueuedCompletionStatus( m_iocpHandle.handle(), 0, reinterpret_cast(operation->m_awaiter.address()), nullptr); +#elif CPPCORO_OS_LINUX + bool ok = m_mq.enqueue_message(reinterpret_cast(operation->m_awaiter.address()), + detail::linux::RESUME_TYPE); +#endif if (!ok) { // Still unable to queue these operations. @@ -571,7 +588,6 @@ void cppcoro::io_service::try_reschedule_overflow_operations() noexcept operation = next; } -#endif } bool cppcoro::io_service::try_enter_event_loop() noexcept @@ -598,12 +614,12 @@ void cppcoro::io_service::exit_event_loop() noexcept bool cppcoro::io_service::try_process_one_event(bool waitForEvent) { -#if CPPCORO_OS_WINNT if (is_stop_requested()) { return false; } +#if CPPCORO_OS_WINNT const DWORD timeout = waitForEvent ? INFINITE : 0; while (true) @@ -673,6 +689,44 @@ bool cppcoro::io_service::try_process_one_event(bool waitForEvent) }; } } +#elif CPPCORO_OS_LINUX + while (true) + { + try_reschedule_overflow_operations(); + void* message = NULL; + detail::linux::message_type type = detail::linux::RESUME_TYPE; + + bool ok = m_mq.dequeue_message(message, type, waitForEvent); + + if (!ok) + { + return false; + } + + if (type == detail::linux::CALLBACK_TYPE) + { + auto* state = + static_cast(reinterpret_cast(message)); + + state->m_callback(state); + + return true; + } + else + { + if ((unsigned long long)message != 0) + { + cppcoro::coroutine_handle<>::from_address(reinterpret_cast(message)).resume(); + return true; + } + + if (is_stop_requested()) + { + return false; + } + } + } #endif } @@ -685,6 +739,8 @@ void cppcoro::io_service::post_wake_up_event() noexcept // and the system is out of memory. In this case threads should find other events // in the queue next time they check anyway and thus wake-up. (void)::PostQueuedCompletionStatus(m_iocpHandle.handle(), 0, 0, nullptr); +#elif CPPCORO_OS_LINUX + (void)m_mq.enqueue_message(NULL, detail::linux::RESUME_TYPE); #endif } @@ -715,12 +771,45 @@ cppcoro::io_service::timer_thread_state::timer_thread_state() #if CPPCORO_OS_WINNT : m_wakeUpEvent(create_auto_reset_event()) , m_waitableTimerEvent(create_waitable_timer_event()) +#elif CPPCORO_OS_LINUX + : m_wakeupfd(detail::linux::create_event_fd()) + , m_timerfd(detail::linux::create_timer_fd()) + , m_epollfd(detail::linux::create_epoll_fd()) #endif , m_newlyQueuedTimers(nullptr) , m_timerCancellationRequested(false) , m_shutDownRequested(false) , m_thread([this] { this->run(); }) { +#if CPPCORO_OS_LINUX + epoll_event wake_ev = { 0 }; + wake_ev.events = EPOLLIN; + wake_ev.data.fd = m_wakeupfd.fd(); + + if (epoll_ctl(m_epollfd.fd(), EPOLL_CTL_ADD, m_wakeupfd.fd(), &wake_ev) == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: epoll ctl wake ev" + }; + } + + epoll_event timer_ev = { 0 }; + timer_ev.events = EPOLLIN; + timer_ev.data.fd = m_timerfd.fd(); + + if (epoll_ctl(m_epollfd.fd(), EPOLL_CTL_ADD, m_timerfd.fd(), &timer_ev) == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: epoll ctl timer ev" + }; + } +#endif } cppcoro::io_service::timer_thread_state::~timer_thread_state() @@ -742,33 +831,62 @@ void cppcoro::io_service::timer_thread_state::request_timer_cancellation() noexc void cppcoro::io_service::timer_thread_state::run() noexcept { -#if CPPCORO_OS_WINNT using clock = std::chrono::high_resolution_clock; using time_point = clock::time_point; timer_queue timerQueue; +#if CPPCORO_OS_WINNT const DWORD waitHandleCount = 2; const HANDLE waitHandles[waitHandleCount] = { m_wakeUpEvent.handle(), m_waitableTimerEvent.handle() }; +#endif time_point lastSetWaitEventTime = time_point::max(); timed_schedule_operation* timersReadyToResume = nullptr; - DWORD timeout = INFINITE; while (!m_shutDownRequested.load(std::memory_order_relaxed)) { + bool waitEvent = false; + bool timerEvent = false; + DWORD timeout = INFINITE; +#if CPPCORO_OS_WINNT const DWORD waitResult = ::WaitForMultipleObjectsEx( waitHandleCount, waitHandles, FALSE, // waitAll timeout, FALSE); // alertable - if (waitResult == WAIT_OBJECT_0 || waitResult == WAIT_FAILED) + if (waitResult == WAIT_OBJECT_0 || waitResult == WAIT_FAILED) + { + waitEvent = true; + } + else if (waitResult == WAIT_OBJECT_0 + 1) + { + timerEvent = true; + } +#elif CPPCORO_OS_LINUX + epoll_event ev; + const int status = epoll_wait(m_epollfd.fd(), &ev, 1, timeout); + + if (status == 0 || status == -1 || (status == 1 && ev.data.fd == m_wakeupfd.fd())) + { + uint64_t count; + read(m_wakeupfd.fd(), &count, sizeof(uint64_t)); + waitEvent = true; + } + else if (status == 1 && ev.data.fd == m_timerfd.fd()) + { + uint64_t count; + read(m_timerfd.fd(), &count, sizeof(uint64_t)); + timerEvent = true; + } + #endif + if (waitEvent) { // Wake-up event (WAIT_OBJECT_0) // @@ -805,7 +923,7 @@ void cppcoro::io_service::timer_thread_state::run() noexcept } } } - else if (waitResult == (WAIT_OBJECT_0 + 1)) + else if (timerEvent) { lastSetWaitEventTime = time_point::max(); } @@ -830,6 +948,7 @@ void cppcoro::io_service::timer_thread_state::run() noexcept auto timeUntilNextDueTime = earliestDueTime - currentTime; + #if CPPCORO_OS_WINNT // Negative value indicates relative time. LARGE_INTEGER dueTime; dueTime.QuadPart = -std::chrono::duration_cast(timeUntilNextDueTime).count(); @@ -848,6 +967,24 @@ void cppcoro::io_service::timer_thread_state::run() noexcept nullptr, nullptr, resumeFromSuspend); +#elif CPPCORO_OS_LINUX + itimerspec alarm_time = { 0 }; + alarm_time.it_value.tv_sec = + std::chrono:: + duration_cast(timeUntilNextDueTime).count(); + alarm_time.it_value.tv_nsec = + (std::chrono:: + duration_cast(timeUntilNextDueTime).count() % 10000000); + if (alarm_time.it_value.tv_sec == 0 && alarm_time.it_value.tv_nsec == 0) + { + //linux timer of 0 time will not generate events + //so let's set it to 1 nsec + alarm_time.it_value.tv_nsec = 1; + } + const bool ok = timerfd_settime(m_timerfd.fd(), 0, &alarm_time, NULL) != -1; +#endif if (ok) { lastSetWaitEventTime = earliestDueTime; @@ -900,13 +1037,15 @@ void cppcoro::io_service::timer_thread_state::run() noexcept timersReadyToResume = nextTimer; } } -#endif } void cppcoro::io_service::timer_thread_state::wake_up_timer_thread() noexcept { #if CPPCORO_OS_WINNT (void)::SetEvent(m_wakeUpEvent.handle()); +#elif CPPCORO_OS_LINUX + uint64_t count = 1; + (void)write(m_wakeupfd.fd(), &count, sizeof(uint64_t)); #endif } diff --git a/lib/linux.cpp b/lib/linux.cpp new file mode 100644 index 00000000..13dd0069 --- /dev/null +++ b/lib/linux.cpp @@ -0,0 +1,172 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) Microsoft +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#include +#include +#include +#include +#include + +namespace cppcoro +{ + namespace detail + { + namespace linux + { + message_queue::message_queue() + { + if (pipe2(m_pipefd, O_NONBLOCK) == -1) { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: failed creating pipe" + }; + } + m_epollfd = safe_fd{create_epoll_fd()}; + m_ev.data.fd = m_pipefd[0]; + m_ev.events = EPOLLIN; + + if(epoll_ctl(m_epollfd.fd(), EPOLL_CTL_ADD, m_pipefd[0], &m_ev) == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: epoll ctl pipe" + }; + } + } + + message_queue::~message_queue() + { + assert(close(m_pipefd[0]) == 0); + assert(close(m_pipefd[1]) == 0); + } + + bool message_queue::enqueue_message(void* msg, message_type type) + { + message qmsg; + qmsg.m_type = type; + qmsg.m_ptr = msg; + int status = write(m_pipefd[1], (const char*)&qmsg, sizeof(message)); + return status==-1?false:true; + } + + bool message_queue::dequeue_message(void*& msg, message_type& type, bool wait) + { + struct epoll_event ev = {0}; + int nfds = epoll_wait(m_epollfd.fd(), &ev, 1, wait?-1:0); + + if(nfds == -1) + { + if (errno == EINTR || errno == EAGAIN) { + return false; + } + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error in epoll_wait run loop" + }; + } + + if(nfds == 0 && !wait) + { + return false; + } + + if(nfds == 0 && wait) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error in epoll_wait run loop" + }; + } + + message qmsg; + ssize_t status = read(m_pipefd[0], (char*)&qmsg, sizeof(message)); + + if(status == -1) + { + if (errno == EINTR || errno == EAGAIN) { + return false; + } + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error retrieving message from message queue: mq_receive" + }; + } + + msg = qmsg.m_ptr; + type = qmsg.m_type; + return true; + } + + safe_fd create_event_fd() + { + int fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK | EFD_CLOEXEC); + + if(fd == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: event fd create" + }; + } + + return safe_fd{fd}; + } + + safe_fd create_timer_fd() + { + int fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + + if(fd == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: timer fd create" + }; + } + + return safe_fd{fd}; + } + + safe_fd create_epoll_fd() + { + int fd = epoll_create1(EPOLL_CLOEXEC); + + if(fd == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating timer thread: epoll create" + }; + } + + return safe_fd{fd}; + } + + void safe_fd::close() noexcept + { + if(m_fd != -1) + { + ::close(m_fd); + m_fd = -1; + } + } + } + } +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index f5afecd1..b381bcc6 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -46,6 +46,12 @@ if(WIN32) socket_tests.cpp ) else() + if(UNIX) + list(APPEND tests + scheduling_operator_tests.cpp + io_service_tests.cpp + ) + endif() # let more time for some tests set(async_auto_reset_event_tests_TIMEOUT 60) endif()