diff --git a/.travis.yml b/.travis.yml index 4cc705a3..bf1f6c3e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,8 +19,8 @@ addons: - llvm-toolchain-trusty packages: - python2.7 - - clang-6.0 - - lld-6.0 + - clang-7 + - lld-7 - ninja-build cache: @@ -37,12 +37,12 @@ env: - CMAKE_VERSION="3.9.1" matrix: include: - - env: RELEASE=debug ARCH=x64 CLANG_VERSION=6.0 - - env: RELEASE=optimised ARCH=x64 CLANG_VERSION=6.0 + - env: RELEASE=debug ARCH=x64 CLANG_VERSION=7 + - env: RELEASE=optimised ARCH=x64 CLANG_VERSION=7 allow_failures: # Clang 6.0~svn320382 has a bug that causes optimised builds to fail. # See https://bugs.llvm.org/show_bug.cgi?id=34897 - - env: RELEASE=optimised ARCH=x64 CLANG_VERSION=6.0 + - env: RELEASE=optimised ARCH=x64 CLANG_VERSION=7 before_install: - export CC="$CC-$CLANG_VERSION" diff --git a/README.md b/README.md index 435f764d..b4fa93b1 100644 --- a/README.md +++ b/README.md @@ -1854,7 +1854,7 @@ task<> example() ``` API Summary: -``` +```c++ // namespace cppcoro { diff --git a/appveyor.yml b/appveyor.yml index 4e2a5001..4ad544d7 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,9 +1,8 @@ - version: 1.0.{build} image: - Visual Studio 2017 -#- Visual Studio 2017 Preview +- Visual Studio 2017 Preview platform: - x64 @@ -13,6 +12,13 @@ configuration: - debug - optimised +matrix: + # Allow failures under MSVC x86 optimised since there are some known compiler + # bugs causing failures here. + allow_failures: + - platform: x86 + configuration: optimised + clone_script: - ps: git clone -q $("--branch=" + $env:APPVEYOR_REPO_BRANCH) $("https://github.com/" + $env:APPVEYOR_REPO_NAME + ".git") $env:APPVEYOR_BUILD_FOLDER - ps: if (!$env:APPVEYOR_PULL_REQUEST_NUMBER) {$("git checkout -qf " + $env:APPVEYOR_REPO_COMMIT)} diff --git a/config.cake b/config.cake index 2332813f..da31b676 100644 --- a/config.cake +++ b/config.cake @@ -117,6 +117,8 @@ if cake.system.isWindows() or cake.system.isCygwin(): # Enable C++17 features like std::optional<> compiler.addCppFlag('/std:c++latest') + compiler.addDefine('_SILENCE_CXX17_RESULT_OF_DEPRECATION_WARNING') + compiler.addProgramFlag('/nodefaultlib') compiler.addModuleFlag('/nodefaultlib') @@ -137,7 +139,7 @@ if cake.system.isWindows() or cake.system.isCygwin(): compiler.runtimeLibraries = 'debug-dll' compiler.addLibrary('msvcrtd') compiler.addLibrary('msvcprtd') - compiler.addLibrary('msvcurtd') + compiler.addLibrary('vcruntimed') compiler.addLibrary('ucrtd') compiler.addLibrary('oldnames') @@ -159,7 +161,7 @@ if cake.system.isWindows() or cake.system.isCygwin(): compiler.runtimeLibraries = 'release-dll' compiler.addLibrary('msvcrt') compiler.addLibrary('msvcprt') - compiler.addLibrary('msvcurt') + compiler.addLibrary('vcruntime') compiler.addLibrary('ucrt') compiler.addLibrary('oldnames') @@ -293,6 +295,8 @@ elif cake.system.isLinux() or cake.system.isDarwin(): compiler.addLibrary('c++') compiler.addLibrary('c') compiler.addLibrary('pthread') + compiler.addLibrary('rt') + compiler.addLibrary('uuid') #compiler.addProgramFlag('-Wl,--trace') #compiler.addProgramFlag('-Wl,-v') diff --git a/include/cppcoro/async_generator.hpp b/include/cppcoro/async_generator.hpp index 49cd05ce..f188d195 100644 --- a/include/cppcoro/async_generator.hpp +++ b/include/cppcoro/async_generator.hpp @@ -117,8 +117,8 @@ namespace cppcoro // State transition diagram // VNRCA - value_not_ready_consumer_active // VNRCS - value_not_ready_consumer_suspended - // VRPA - value_ready_consumer_active - // VRPS - value_ready_consumer_suspended + // VRPA - value_ready_producer_active + // VRPS - value_ready_producer_suspended // // A +--- VNRCA --[C]--> VNRCS yield_value() // | | | A | A | . diff --git a/include/cppcoro/detail/linux.hpp b/include/cppcoro/detail/linux.hpp new file mode 100644 index 00000000..136ff4ff --- /dev/null +++ b/include/cppcoro/detail/linux.hpp @@ -0,0 +1,133 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) Microsoft +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cppcoro +{ + namespace detail + { + namespace linux + { + using fd_t = int; + + enum message_type + { + CALLBACK_TYPE, + RESUME_TYPE + }; + + class safe_fd + { + public: + + safe_fd() + : m_fd(-1) + {} + + explicit safe_fd(fd_t fd) + : m_fd(fd) + {} + + safe_fd(const safe_fd& other) = delete; + + safe_fd(safe_fd&& other) noexcept + : m_fd(other.m_fd) + { + other.m_fd = -1; + } + + ~safe_fd() + { + close(); + } + + safe_fd& operator=(safe_fd fd) noexcept + { + swap(fd); + return *this; + } + + constexpr fd_t fd() const { return m_fd; } + + /// Calls close() and sets the fd to -1. + void close() noexcept; + + void swap(safe_fd& other) noexcept + { + std::swap(m_fd, other.m_fd); + } + + bool operator==(const safe_fd& other) const + { + return m_fd == other.m_fd; + } + + bool operator!=(const safe_fd& other) const + { + return m_fd != other.m_fd; + } + + bool operator==(fd_t fd) const + { + return m_fd == fd; + } + + bool operator!=(fd_t fd) const + { + return m_fd != fd; + } + + private: + + fd_t m_fd; + + }; + + struct message + { + enum message_type m_type; + void* m_ptr; + }; + + struct io_state : linux::message + { + using callback_type = void(io_state* state); + callback_type* m_callback; + }; + + class message_queue + { + private: + mqd_t m_mqdt; + char m_qname[NAME_MAX]; + safe_fd m_epollfd; + struct epoll_event m_ev; + message_queue(); + public: + message_queue(size_t queue_length); + ~message_queue(); + bool enqueue_message(void* message, message_type type); + bool dequeue_message(void*& message, message_type& type, bool wait); + }; + + safe_fd create_event_fd(); + safe_fd create_timer_fd(); + safe_fd create_epoll_fd(); + } + } +} diff --git a/include/cppcoro/file_write_operation.hpp b/include/cppcoro/file_write_operation.hpp index 2b6a5a0a..49b23883 100644 --- a/include/cppcoro/file_write_operation.hpp +++ b/include/cppcoro/file_write_operation.hpp @@ -66,7 +66,7 @@ namespace cppcoro private: - friend class cppcoro::detail::win32_overlapped_operation; + friend class cppcoro::detail::win32_overlapped_operation_cancellable; bool try_start() noexcept; void cancel() noexcept; diff --git a/include/cppcoro/io_service.hpp b/include/cppcoro/io_service.hpp index 27867c58..9f100a80 100644 --- a/include/cppcoro/io_service.hpp +++ b/include/cppcoro/io_service.hpp @@ -13,12 +13,17 @@ # include #endif +#if CPPCORO_OS_LINUX +#include +#endif + #include #include #include #include #include #include +#include namespace cppcoro { @@ -42,8 +47,11 @@ namespace cppcoro /// actively processing events. /// Note that the number of active threads may temporarily go /// above this number. +#if CPPCORO_OS_WINNT io_service(std::uint32_t concurrencyHint); - +#elif CPPCORO_OS_LINUX + io_service(size_t queue_length); +#endif ~io_service(); io_service(io_service&& other) = delete; @@ -147,6 +155,10 @@ namespace cppcoro void try_reschedule_overflow_operations() noexcept; + void queue_overflow_operation_to_head(schedule_operation* operation) noexcept; + + void queue_overflow_operation_to_tail(schedule_operation* operation) noexcept; + bool try_enter_event_loop() noexcept; void exit_event_loop() noexcept; @@ -169,6 +181,9 @@ namespace cppcoro detail::win32::safe_handle m_iocpHandle; #endif +#if CPPCORO_OS_LINUX + detail::linux::message_queue* m_mq; +#endif // Head of a linked-list of schedule operations that are // ready to run but that failed to be queued to the I/O // completion port (eg. due to low memory). diff --git a/lib/build.cake b/lib/build.cake index b3ed9c03..8faea0db 100644 --- a/lib/build.cake +++ b/lib/build.cake @@ -82,6 +82,7 @@ sources = script.cwd([ 'ipv4_endpoint.cpp', 'ipv6_address.cpp', 'ipv6_endpoint.cpp', + 'io_service.cpp', ]) extras = script.cwd([ @@ -89,6 +90,14 @@ extras = script.cwd([ 'use.cake', ]) +if variant.platform == "linux": + detailIncludes.extend(cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', 'detail', [ + 'linux.hpp', + ])) + sources.extend(script.cwd([ + 'linux.cpp', + ])) + if variant.platform == "windows": detailIncludes.extend(cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', 'detail', [ 'win32.hpp', @@ -96,7 +105,6 @@ if variant.platform == "windows": ])) sources.extend(script.cwd([ 'win32.cpp', - 'io_service.cpp', 'file.cpp', 'readable_file.cpp', 'writable_file.cpp', diff --git a/lib/file.cpp b/lib/file.cpp index c53a89f6..3583217f 100644 --- a/lib/file.cpp +++ b/lib/file.cpp @@ -10,7 +10,9 @@ #include #if CPPCORO_OS_WINNT -# define WIN32_LEAN_AND_MEAN +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif # include #endif diff --git a/lib/file_read_operation.cpp b/lib/file_read_operation.cpp index 399d914e..fb9e32cc 100644 --- a/lib/file_read_operation.cpp +++ b/lib/file_read_operation.cpp @@ -6,7 +6,9 @@ #include #if CPPCORO_OS_WINNT -# define WIN32_LEAN_AND_MEAN +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif # include bool cppcoro::file_read_operation::try_start() noexcept diff --git a/lib/file_write_operation.cpp b/lib/file_write_operation.cpp index 92e93660..fd9b465e 100644 --- a/lib/file_write_operation.cpp +++ b/lib/file_write_operation.cpp @@ -6,7 +6,9 @@ #include #if CPPCORO_OS_WINNT -# define WIN32_LEAN_AND_MEAN +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif # include bool cppcoro::file_write_operation::try_start() noexcept diff --git a/lib/io_service.cpp b/lib/io_service.cpp index 8031775e..d08f74d2 100644 --- a/lib/io_service.cpp +++ b/lib/io_service.cpp @@ -12,11 +12,21 @@ #include #if CPPCORO_OS_WINNT -# define WIN32_LEAN_AND_MEAN -# define NOMINMAX +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif +# ifndef NOMINMAX +# define NOMINMAX +# endif # include #endif +#if CPPCORO_OS_LINUX +typedef int DWORD; +#define INFINITE (DWORD)-1 //needed for timeout values in io_service::timer_thread_state::run() +typedef long long int LONGLONG; +#endif + namespace { #if CPPCORO_OS_WINNT @@ -298,7 +308,7 @@ class cppcoro::io_service::timer_thread_state void request_timer_cancellation() noexcept; - void run() noexcept; + void run(); void wake_up_timer_thread() noexcept; @@ -307,6 +317,12 @@ class cppcoro::io_service::timer_thread_state detail::win32::safe_handle m_waitableTimerEvent; #endif +#if CPPCORO_OS_LINUX + detail::linux::safe_fd m_wakeupfd; + detail::linux::safe_fd m_timerfd; + detail::linux::safe_fd m_epollfd; +#endif + std::atomic m_newlyQueuedTimers; std::atomic m_timerCancellationRequested; std::atomic m_shutDownRequested; @@ -317,15 +333,27 @@ class cppcoro::io_service::timer_thread_state cppcoro::io_service::io_service() +#if CPPCORO_OS_WINNT : io_service(0) +#elif CPPCORO_OS_LINUX + : io_service(10) +#endif { } -cppcoro::io_service::io_service(std::uint32_t concurrencyHint) +cppcoro::io_service::io_service( +#if CPPCORO_OS_WINNT + std::uint32_t concurrencyHint +#elif CPPCORO_OS_LINUX + size_t queue_length +#endif +) : m_threadState(0) , m_workCount(0) #if CPPCORO_OS_WINNT , m_iocpHandle(create_io_completion_port(concurrencyHint)) +#elif CPPCORO_OS_LINUX + , m_mq(new detail::linux::message_queue(queue_length)) #endif , m_scheduleOperations(nullptr) , m_timerState(nullptr) @@ -338,6 +366,10 @@ cppcoro::io_service::~io_service() assert(m_threadState.load(std::memory_order_relaxed) < active_thread_count_increment); delete m_timerState.load(std::memory_order_relaxed); + +#if CPPCORO_OS_LINUX + delete m_mq; +#endif } cppcoro::io_service::schedule_operation cppcoro::io_service::schedule() noexcept @@ -453,10 +485,12 @@ void cppcoro::io_service::notify_work_finished() noexcept } } +#if CPPCORO_OS_WINNT cppcoro::detail::win32::handle_t cppcoro::io_service::native_iocp_handle() noexcept { return m_iocpHandle.handle(); } +#endif void cppcoro::io_service::schedule_impl(schedule_operation* operation) noexcept { @@ -466,6 +500,10 @@ void cppcoro::io_service::schedule_impl(schedule_operation* operation) noexcept 0, reinterpret_cast(operation->m_awaiter.address()), nullptr); +#elif CPPCORO_OS_LINUX + const bool ok = m_mq->enqueue_message(reinterpret_cast(operation->m_awaiter.address()), + detail::linux::RESUME_TYPE); +#endif if (!ok) { // Failed to post to the I/O completion port. @@ -485,21 +523,24 @@ void cppcoro::io_service::schedule_impl(schedule_operation* operation) noexcept std::memory_order_release, std::memory_order_acquire)); } -#endif } void cppcoro::io_service::try_reschedule_overflow_operations() noexcept { -#if CPPCORO_OS_WINNT auto* operation = m_scheduleOperations.exchange(nullptr, std::memory_order_acquire); while (operation != nullptr) { auto* next = operation->m_next; +#if CPPCORO_OS_WINNT BOOL ok = ::PostQueuedCompletionStatus( m_iocpHandle.handle(), 0, reinterpret_cast(operation->m_awaiter.address()), nullptr); +#elif CPPCORO_OS_LINUX + bool ok = m_mq->enqueue_message(reinterpret_cast(operation->m_awaiter.address()), + detail::linux::RESUME_TYPE); +#endif if (!ok) { // Still unable to queue these operations. @@ -525,7 +566,6 @@ void cppcoro::io_service::try_reschedule_overflow_operations() noexcept operation = next; } -#endif } bool cppcoro::io_service::try_enter_event_loop() noexcept @@ -552,12 +592,12 @@ void cppcoro::io_service::exit_event_loop() noexcept bool cppcoro::io_service::try_process_one_event(bool waitForEvent) { -#if CPPCORO_OS_WINNT if (is_stop_requested()) { return false; } +#if CPPCORO_OS_WINNT const DWORD timeout = waitForEvent ? INFINITE : 0; while (true) @@ -627,6 +667,45 @@ bool cppcoro::io_service::try_process_one_event(bool waitForEvent) }; } } +#elif CPPCORO_OS_LINUX + while (true) + { + try_reschedule_overflow_operations(); + void* message = NULL; + detail::linux::message_type type = detail::linux::RESUME_TYPE; + + bool status = m_mq->dequeue_message(message, type, waitForEvent); + + if (!status) + { + return false; + } + + if (type == detail::linux::CALLBACK_TYPE) + { + auto* state = + static_cast(reinterpret_cast(message)); + + state->m_callback(state); + + return true; + } + else + { + if ((unsigned long long)message != 0) + { + std::experimental + ::coroutine_handle<>::from_address(reinterpret_cast(message)).resume(); + return true; + } + + if (is_stop_requested()) + { + return false; + } + } + } #endif } @@ -639,6 +718,8 @@ void cppcoro::io_service::post_wake_up_event() noexcept // and the system is out of memory. In this case threads should find other events // in the queue next time they check anyway and thus wake-up. (void)::PostQueuedCompletionStatus(m_iocpHandle.handle(), 0, 0, nullptr); +#elif CPPCORO_OS_LINUX + (void)m_mq->enqueue_message(NULL, detail::linux::RESUME_TYPE); #endif } @@ -669,12 +750,45 @@ cppcoro::io_service::timer_thread_state::timer_thread_state() #if CPPCORO_OS_WINNT : m_wakeUpEvent(create_auto_reset_event()) , m_waitableTimerEvent(create_waitable_timer_event()) +#elif CPPCORO_OS_LINUX + :m_wakeupfd(detail::linux::create_event_fd()) + , m_timerfd(detail::linux::create_timer_fd()) + , m_epollfd(detail::linux::create_epoll_fd()) #endif , m_newlyQueuedTimers(nullptr) , m_timerCancellationRequested(false) , m_shutDownRequested(false) , m_thread([this] { this->run(); }) { +#if CPPCORO_OS_LINUX + epoll_event wake_ev = { 0 }; + wake_ev.events = EPOLLIN; + wake_ev.data.fd = m_wakeupfd.fd(); + + if (epoll_ctl(m_epollfd.fd(), EPOLL_CTL_ADD, m_wakeupfd.fd(), &wake_ev) == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: epoll ctl wake ev" + }; + } + + epoll_event timer_ev = { 0 }; + timer_ev.events = EPOLLIN; + timer_ev.data.fd = m_timerfd.fd(); + + if (epoll_ctl(m_epollfd.fd(), EPOLL_CTL_ADD, m_timerfd.fd(), &timer_ev) == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: epoll ctl timer ev" + }; + } +#endif } cppcoro::io_service::timer_thread_state::~timer_thread_state() @@ -694,21 +808,21 @@ void cppcoro::io_service::timer_thread_state::request_timer_cancellation() noexc } } -void cppcoro::io_service::timer_thread_state::run() noexcept +void cppcoro::io_service::timer_thread_state::run() { -#if CPPCORO_OS_WINNT using clock = std::chrono::high_resolution_clock; using time_point = clock::time_point; timer_queue timerQueue; +#if CPPCORO_OS_WINNT const DWORD waitHandleCount = 2; const HANDLE waitHandles[waitHandleCount] = { m_wakeUpEvent.handle(), m_waitableTimerEvent.handle() }; - +#endif time_point lastSetWaitEventTime = time_point::max(); timed_schedule_operation* timersReadyToResume = nullptr; @@ -716,13 +830,60 @@ void cppcoro::io_service::timer_thread_state::run() noexcept DWORD timeout = INFINITE; while (!m_shutDownRequested.load(std::memory_order_relaxed)) { + bool waitEvent = false; + bool timerEvent = false; +#if CPPCORO_OS_WINNT const DWORD waitResult = ::WaitForMultipleObjectsEx( waitHandleCount, waitHandles, FALSE, // waitAll timeout, FALSE); // alertable + if (waitResult == WAIT_OBJECT_0 || waitResult == WAIT_FAILED) + { + waitEvent = true; + } + else if (waitResult == WAIT_OBJECT_0 + 1) + { + timerEvent = true; + } +#elif CPPCORO_OS_LINUX + epoll_event ev; + const int status = epoll_wait(m_epollfd.fd(), &ev, 1, timeout); + + if (status == 0 || status == -1 || (status == 1 && ev.data.fd == m_wakeupfd.fd())) + { + uint64_t count; + if (read(m_wakeupfd.fd(), &count, sizeof(uint64_t)) != sizeof(uint64_t)) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error in timer thread: eventfd read" + }; + } + + waitEvent = true; + } + else if (status == 1 && ev.data.fd == m_timerfd.fd()) + { + uint64_t count; + if (read(m_timerfd.fd(), &count, sizeof(uint64_t)) != sizeof(uint64_t)) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error in timer thread: timerfd read" + }; + } + + timerEvent = true; + } +#endif + if (waitEvent) { // Wake-up event (WAIT_OBJECT_0) // @@ -759,7 +920,7 @@ void cppcoro::io_service::timer_thread_state::run() noexcept } } } - else if (waitResult == (WAIT_OBJECT_0 + 1)) + else if (timerEvent) { lastSetWaitEventTime = time_point::max(); } @@ -785,6 +946,7 @@ void cppcoro::io_service::timer_thread_state::run() noexcept auto timeUntilNextDueTime = earliestDueTime - currentTime; // Negative value indicates relative time. +#if CPPCORO_OS_WINNT LARGE_INTEGER dueTime; dueTime.QuadPart = -std::chrono::duration_cast(timeUntilNextDueTime).count(); @@ -802,6 +964,33 @@ void cppcoro::io_service::timer_thread_state::run() noexcept nullptr, nullptr, resumeFromSuspend); +#elif CPPCORO_OS_LINUX + bool ok = false; + itimerspec alarm_time = { 0 }; + alarm_time.it_value.tv_sec = + std::chrono:: + duration_cast(timeUntilNextDueTime).count(); + alarm_time.it_value.tv_nsec = + (std::chrono:: + duration_cast(timeUntilNextDueTime).count() % 10000000); + if (alarm_time.it_value.tv_sec == 0 && alarm_time.it_value.tv_nsec == 0) + { + //linux timer of 0 time will not generate events + //so let's set it to 1 nsec + alarm_time.it_value.tv_nsec = 1; + } + + if (timerfd_settime(m_timerfd.fd(), 0, &alarm_time, NULL) == -1) + { + ok = false; + } + else + { + ok = true; + } +#endif if (ok) { lastSetWaitEventTime = earliestDueTime; @@ -854,13 +1043,15 @@ void cppcoro::io_service::timer_thread_state::run() noexcept timersReadyToResume = nextTimer; } } -#endif } void cppcoro::io_service::timer_thread_state::wake_up_timer_thread() noexcept { #if CPPCORO_OS_WINNT (void)::SetEvent(m_wakeUpEvent.handle()); +#elif CPPCORO_OS_LINUX + uint64_t count = 1; + (void)write(m_wakeupfd.fd(), &count, sizeof(uint64_t)); #endif } diff --git a/lib/lightweight_manual_reset_event.cpp b/lib/lightweight_manual_reset_event.cpp index 0d356005..1b6d0d9c 100644 --- a/lib/lightweight_manual_reset_event.cpp +++ b/lib/lightweight_manual_reset_event.cpp @@ -8,7 +8,9 @@ #include #if CPPCORO_OS_WINNT -# define WIN32_LEAN_AND_MEAN +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif # include # if CPPCORO_OS_WINNT >= 0x0602 diff --git a/lib/linux.cpp b/lib/linux.cpp new file mode 100644 index 00000000..ab60d8b4 --- /dev/null +++ b/lib/linux.cpp @@ -0,0 +1,205 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) Microsoft +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#include +#include +#include + +#define UUID_STRING_SIZE 36 + +namespace cppcoro +{ + namespace detail + { + namespace linux + { + message_queue::message_queue(size_t queue_length) + { + m_mqdt = -1; + uuid_t unique_name; + const char* cppcoro_qname_prefix = "/cppcoro-"; + + if(NAME_MAX < UUID_STRING_SIZE + strlen(cppcoro_qname_prefix) + 1) + { + throw std::system_error + { + static_cast(EINVAL), + std::system_category(), + "Error creating message queue: system name max length too small" + }; + } + + strncpy(m_qname, cppcoro_qname_prefix, NAME_MAX); + + for(;;) + { + uuid_generate(unique_name); + uuid_unparse(unique_name, m_qname + sizeof(cppcoro_qname_prefix)); + + struct mq_attr attr; + attr.mq_flags = O_NONBLOCK; + attr.mq_maxmsg = queue_length; + attr.mq_msgsize = sizeof(cppcoro::detail::linux::message); + attr.mq_curmsgs = 0; + + m_mqdt = mq_open(m_qname, O_RDWR | O_CREAT | O_EXCL | O_CLOEXEC, S_IRWXU, &attr); + + if( m_mqdt == -1 && errno == EEXIST) + { + continue; + } + + if( m_mqdt == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: message queue open" + }; + } + + break; + } + + m_epollfd = safe_fd{create_epoll_fd()}; + m_ev.data.fd = m_mqdt; + m_ev.events = EPOLLIN; + + if(epoll_ctl(m_epollfd.fd(), EPOLL_CTL_ADD, m_mqdt, &m_ev) == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: epoll ctl mqdt" + }; + } + } + + message_queue::~message_queue() + { + assert(mq_close(m_mqdt) == 0); + assert(mq_unlink(m_qname) == 0); + } + + bool message_queue::enqueue_message(void* msg, message_type type) + { + message qmsg; + qmsg.m_type = type; + qmsg.m_ptr = msg; + int status = mq_send(m_mqdt, (const char*)&qmsg, sizeof(message), 0); + return status==-1?false:true; + } + + bool message_queue::dequeue_message(void*& msg, message_type& type, bool wait) + { + struct epoll_event ev = {0}; + int nfds = epoll_wait(m_epollfd.fd(), &ev, 1, wait?-1:0); + + if(nfds == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error in epoll_wait run loop" + }; + } + + if(nfds == 0 && !wait) + { + return false; + } + + if(nfds == 0 && wait) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error in epoll_wait run loop" + }; + } + + message qmsg; + ssize_t status = mq_receive(m_mqdt, (char*)&qmsg, sizeof(message), NULL); + + if(status == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error retrieving message from message queue: mq_receive" + }; + } + + msg = qmsg.m_ptr; + type = qmsg.m_type; + return true; + } + + safe_fd create_event_fd() + { + int fd = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK | EFD_CLOEXEC); + + if(fd == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: event fd create" + }; + } + + return safe_fd{fd}; + } + + safe_fd create_timer_fd() + { + int fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + + if(fd == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating io_service: timer fd create" + }; + } + + return safe_fd{fd}; + } + + safe_fd create_epoll_fd() + { + int fd = epoll_create1(EPOLL_CLOEXEC); + + if(fd == -1) + { + throw std::system_error + { + static_cast(errno), + std::system_category(), + "Error creating timer thread: epoll create" + }; + } + + return safe_fd{fd}; + } + + void safe_fd::close() noexcept + { + if(m_fd != -1) + { + ::close(m_fd); + m_fd = -1; + } + } + } + } +} diff --git a/lib/read_only_file.cpp b/lib/read_only_file.cpp index 058e024c..614aa48a 100644 --- a/lib/read_only_file.cpp +++ b/lib/read_only_file.cpp @@ -6,7 +6,9 @@ #include #if CPPCORO_OS_WINNT -# define WIN32_LEAN_AND_MEAN +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif # include cppcoro::read_only_file cppcoro::read_only_file::open( diff --git a/lib/read_write_file.cpp b/lib/read_write_file.cpp index 4c3815e1..231e6790 100644 --- a/lib/read_write_file.cpp +++ b/lib/read_write_file.cpp @@ -6,7 +6,9 @@ #include #if CPPCORO_OS_WINNT -# define WIN32_LEAN_AND_MEAN +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif # include cppcoro::read_write_file cppcoro::read_write_file::open( diff --git a/lib/win32.cpp b/lib/win32.cpp index 4a3d183d..b7b497bb 100644 --- a/lib/win32.cpp +++ b/lib/win32.cpp @@ -5,7 +5,9 @@ #include -#define WIN32_LEAN_AND_MEAN +#ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +#endif #include void cppcoro::detail::win32::safe_handle::close() noexcept diff --git a/lib/writable_file.cpp b/lib/writable_file.cpp index b1bd2b8e..3ba63e7c 100644 --- a/lib/writable_file.cpp +++ b/lib/writable_file.cpp @@ -8,7 +8,9 @@ #include #if CPPCORO_OS_WINNT -# define WIN32_LEAN_AND_MEAN +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif # include void cppcoro::writable_file::set_size( diff --git a/lib/write_only_file.cpp b/lib/write_only_file.cpp index c30f3920..4c91bddf 100644 --- a/lib/write_only_file.cpp +++ b/lib/write_only_file.cpp @@ -6,7 +6,9 @@ #include #if CPPCORO_OS_WINNT -# define WIN32_LEAN_AND_MEAN +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif # include cppcoro::write_only_file cppcoro::write_only_file::open( diff --git a/test/build.cake b/test/build.cake index 69d0c953..02346b0f 100644 --- a/test/build.cake +++ b/test/build.cake @@ -39,12 +39,12 @@ sources = script.cwd([ 'ipv4_endpoint_tests.cpp', 'ipv6_address_tests.cpp', 'ipv6_endpoint_tests.cpp', + 'scheduling_operator_tests.cpp', + 'io_service_tests.cpp', ]) if variant.platform == 'windows': sources += script.cwd([ - 'scheduling_operator_tests.cpp', - 'io_service_tests.cpp', 'file_tests.cpp', ]) diff --git a/test/doctest/doctest.h b/test/doctest/doctest.h index 7ab30067..e70d3f10 100644 --- a/test/doctest/doctest.h +++ b/test/doctest/doctest.h @@ -418,7 +418,8 @@ extern "C" __declspec(dllimport) void __stdcall DebugBreak(); #ifdef _LIBCPP_VERSION // not forward declaring ostream for libc++ because I had some problems (inline namespaces vs c++98) // so the header is used - also it is very light and doesn't drag a ton of stuff -#include +//#include +#include #else // _LIBCPP_VERSION #ifndef DOCTEST_CONFIG_USE_IOSFWD namespace std diff --git a/test/io_service_fixture.hpp b/test/io_service_fixture.hpp index 05f7fb0b..9552ebeb 100644 --- a/test/io_service_fixture.hpp +++ b/test/io_service_fixture.hpp @@ -25,7 +25,10 @@ struct io_service_fixture m_ioThreads.reserve(threadCount); try { - m_ioThreads.emplace_back([this] { m_ioService.process_events(); }); + for (std::uint32_t i = 0; i < threadCount; ++i) + { + m_ioThreads.emplace_back([this] { m_ioService.process_events(); }); + } } catch (...) { diff --git a/test/scheduling_operator_tests.cpp b/test/scheduling_operator_tests.cpp index a44efef3..6d7b368c 100644 --- a/test/scheduling_operator_tests.cpp +++ b/test/scheduling_operator_tests.cpp @@ -17,102 +17,123 @@ TEST_SUITE_BEGIN("schedule/resume_on"); +#if CPPCORO_OS_WINNT +#define thread_id std::thread::id +#define get_thread_id std::this_thread::get_id +#endif + +#if CPPCORO_OS_LINUX +#define thread_id unsigned long long +#define get_thread_id() get_thread_id() + +#include + +static unsigned long long get_thread_id() +{ + unsigned long long id; + std::stringstream ss; + ss << std::this_thread::get_id(); + id = std::stoull(ss.str()); + return id; +} +#endif + TEST_CASE_FIXTURE(io_service_fixture, "schedule_on task<> function") { - auto mainThreadId = std::this_thread::get_id(); + auto mainThreadId = get_thread_id(); - std::thread::id ioThreadId; + thread_id ioThreadId; auto start = [&]() -> cppcoro::task<> - { - ioThreadId = std::this_thread::get_id(); - CHECK(ioThreadId != mainThreadId); - co_return; - }; + { + ioThreadId = get_thread_id(); + CHECK(ioThreadId != mainThreadId); + co_return; + }; cppcoro::sync_wait([&]() -> cppcoro::task<> - { - CHECK(std::this_thread::get_id() == mainThreadId); + { + CHECK(get_thread_id() == mainThreadId); - co_await schedule_on(io_service(), start()); + co_await schedule_on(io_service(), start()); - CHECK(std::this_thread::get_id() == ioThreadId); - }()); + CHECK(get_thread_id() == ioThreadId); + }()); } TEST_CASE_FIXTURE(io_service_fixture, "schedule_on async_generator<> function") { - auto mainThreadId = std::this_thread::get_id(); + auto mainThreadId = get_thread_id(); - std::thread::id ioThreadId; + thread_id ioThreadId; auto makeSequence = [&]() -> cppcoro::async_generator - { - ioThreadId = std::this_thread::get_id(); - CHECK(ioThreadId != mainThreadId); + { + ioThreadId = get_thread_id(); + CHECK(ioThreadId != mainThreadId); - co_yield 1; + co_yield 1; - CHECK(std::this_thread::get_id() == ioThreadId); + CHECK(get_thread_id() == ioThreadId); - co_yield 2; + co_yield 2; - CHECK(std::this_thread::get_id() == ioThreadId); + CHECK(get_thread_id() == ioThreadId); - co_yield 3; + co_yield 3; - CHECK(std::this_thread::get_id() == ioThreadId); + CHECK(get_thread_id() == ioThreadId); - co_return; - }; + co_return; + }; cppcoro::io_service otherIoService; cppcoro::sync_wait(cppcoro::when_all_ready( - [&]() -> cppcoro::task<> - { - CHECK(std::this_thread::get_id() == mainThreadId); - - auto seq = schedule_on(io_service(), makeSequence()); - - int expected = 1; - for co_await(int value : seq) - { - CHECK(value == expected++); - - // Transfer exection back to main thread before - // awaiting next item in the loop to chck that - // the generator is resumed on io_service() thread. - co_await otherIoService.schedule(); - } - - otherIoService.stop(); - }(), - [&]() -> cppcoro::task<> - { - otherIoService.process_events(); - co_return; - }())); + [&]() -> cppcoro::task<> + { + CHECK(get_thread_id() == mainThreadId); + + auto seq = schedule_on(io_service(), makeSequence()); + + int expected = 1; + for co_await(int value : seq) + { + CHECK(value == expected++); + + // Transfer exection back to main thread before + // awaiting next item in the loop to chck that + // the generator is resumed on io_service() thread. + co_await otherIoService.schedule(); + } + + otherIoService.stop(); + }(), + [&]() -> cppcoro::task<> + { + otherIoService.process_events(); + co_return; + }())); } TEST_CASE_FIXTURE(io_service_fixture, "resume_on task<> function") { - auto mainThreadId = std::this_thread::get_id(); + auto mainThreadId = get_thread_id(); auto start = [&]() -> cppcoro::task<> - { - CHECK(std::this_thread::get_id() == mainThreadId); - co_return; - }; + { + CHECK(get_thread_id() == mainThreadId); + co_return; + }; cppcoro::sync_wait([&]() -> cppcoro::task<> - { - CHECK(std::this_thread::get_id() == mainThreadId); + { + CHECK(get_thread_id() == mainThreadId); - co_await resume_on(io_service(), start()); + co_await resume_on(io_service(), start()); - CHECK(std::this_thread::get_id() != mainThreadId); - }()); + CHECK(get_thread_id() != mainThreadId); + }()); } constexpr bool isMsvc15_4X86Optimised = @@ -127,82 +148,82 @@ constexpr bool isMsvc15_4X86Optimised = TEST_CASE_FIXTURE(io_service_fixture, "resume_on async_generator<> function" * doctest::skip{ isMsvc15_4X86Optimised }) { - auto mainThreadId = std::this_thread::get_id(); + auto mainThreadId = get_thread_id(); - std::thread::id ioThreadId; + thread_id ioThreadId; auto makeSequence = [&]() -> cppcoro::async_generator - { - co_await io_service().schedule(); + { + co_await io_service().schedule(); - ioThreadId = std::this_thread::get_id(); + ioThreadId = get_thread_id(); - CHECK(ioThreadId != mainThreadId); + CHECK(ioThreadId != mainThreadId); - co_yield 1; + co_yield 1; - co_yield 2; + co_yield 2; - co_await io_service().schedule(); + co_await io_service().schedule(); - co_yield 3; + co_yield 3; - co_await io_service().schedule(); + co_await io_service().schedule(); - co_return; - }; + co_return; + }; cppcoro::io_service otherIoService; cppcoro::sync_wait(cppcoro::when_all_ready( - [&]() -> cppcoro::task<> - { - auto stopOnExit = cppcoro::on_scope_exit([&] { otherIoService.stop(); }); - - CHECK(std::this_thread::get_id() == mainThreadId); - - auto seq = resume_on(otherIoService, makeSequence()); - - int expected = 1; - for co_await(int value : seq) - { - // Every time we receive a value it should be on our requested - // scheduler (ie. main thread) - CHECK(std::this_thread::get_id() == mainThreadId); - CHECK(value == expected++); - - // Occasionally transfer execution to a different thread before - // awaiting next element. - if (value == 2) - { - co_await io_service().schedule(); - } - } - - otherIoService.stop(); - }(), - [&]() -> cppcoro::task<> - { - otherIoService.process_events(); - co_return; - }())); + [&]() -> cppcoro::task<> + { + auto stopOnExit = cppcoro::on_scope_exit([&] { otherIoService.stop(); }); + + CHECK(get_thread_id() == mainThreadId); + + auto seq = resume_on(otherIoService, makeSequence()); + + int expected = 1; + for co_await(int value : seq) + { + // Every time we receive a value it should be on our requested + // scheduler (ie. main thread) + CHECK(get_thread_id() == mainThreadId); + CHECK(value == expected++); + + // Occasionally transfer execution to a different thread before + // awaiting next element. + if (value == 2) + { + co_await io_service().schedule(); + } + } + + otherIoService.stop(); + }(), + [&]() -> cppcoro::task<> + { + otherIoService.process_events(); + co_return; + }())); } TEST_CASE_FIXTURE(io_service_fixture, "schedule_on task<> pipe syntax") { - auto mainThreadId = std::this_thread::get_id(); + auto mainThreadId = get_thread_id(); auto makeTask = [&]() -> cppcoro::task - { - CHECK(std::this_thread::get_id() != mainThreadId); - co_return 123; - }; + { + CHECK(get_thread_id() != mainThreadId); + co_return 123; + }; auto triple = [&](int x) - { - CHECK(std::this_thread::get_id() != mainThreadId); - return x * 3; - }; + { + CHECK(get_thread_id() != mainThreadId); + return x * 3; + }; CHECK(cppcoro::sync_wait(makeTask() | schedule_on(io_service())) == 123); @@ -214,62 +235,62 @@ TEST_CASE_FIXTURE(io_service_fixture, "schedule_on task<> pipe syntax") TEST_CASE_FIXTURE(io_service_fixture, "resume_on task<> pipe syntax") { - auto mainThreadId = std::this_thread::get_id(); + auto mainThreadId = get_thread_id(); auto makeTask = [&]() -> cppcoro::task - { - CHECK(std::this_thread::get_id() == mainThreadId); - co_return 123; - }; + { + CHECK(get_thread_id() == mainThreadId); + co_return 123; + }; cppcoro::sync_wait([&]() -> cppcoro::task<> - { - cppcoro::task t = makeTask() | cppcoro::resume_on(io_service()); - CHECK(co_await t == 123); - CHECK(std::this_thread::get_id() != mainThreadId); - }()); + { + cppcoro::task t = makeTask() | cppcoro::resume_on(io_service()); + CHECK(co_await t == 123); + CHECK(get_thread_id() != mainThreadId); + }()); } TEST_CASE_FIXTURE(io_service_fixture, "resume_on task<> pipe syntax multiple uses") { - auto mainThreadId = std::this_thread::get_id(); + auto mainThreadId = get_thread_id(); auto makeTask = [&]() -> cppcoro::task - { - CHECK(std::this_thread::get_id() == mainThreadId); - co_return 123; - }; + { + CHECK(get_thread_id() == mainThreadId); + co_return 123; + }; auto triple = [&](int x) - { - CHECK(std::this_thread::get_id() != mainThreadId); - return x * 3; - }; + { + CHECK(get_thread_id() != mainThreadId); + return x * 3; + }; cppcoro::io_service otherIoService; cppcoro::sync_wait(cppcoro::when_all_ready( - [&]() -> cppcoro::task<> - { - auto stopOnExit = cppcoro::on_scope_exit([&] { otherIoService.stop(); }); - - CHECK(std::this_thread::get_id() == mainThreadId); - - cppcoro::task t = - makeTask() - | cppcoro::resume_on(io_service()) - | cppcoro::fmap(triple) - | cppcoro::resume_on(otherIoService); - - CHECK(co_await t == 369); - - CHECK(std::this_thread::get_id() == mainThreadId); - }(), - [&]() -> cppcoro::task<> - { - otherIoService.process_events(); - co_return; - }())); + [&]() -> cppcoro::task<> + { + auto stopOnExit = cppcoro::on_scope_exit([&] { otherIoService.stop(); }); + + CHECK(get_thread_id() == mainThreadId); + + cppcoro::task t = + makeTask() + | cppcoro::resume_on(io_service()) + | cppcoro::fmap(triple) + | cppcoro::resume_on(otherIoService); + + CHECK(co_await t == 369); + + CHECK(get_thread_id() == mainThreadId); + }(), + [&]() -> cppcoro::task<> + { + otherIoService.process_events(); + co_return; + }())); } TEST_SUITE_END();