From b7df0af1c8d6a6edaa0dea1324b6a6669884b45a Mon Sep 17 00:00:00 2001 From: GIG Date: Thu, 27 Aug 2020 11:20:08 +0300 Subject: [PATCH 1/4] Add partial support of io_service on Linux --- include/cppcoro/detail/io_uring_context.hpp | 70 ++++++++ include/cppcoro/detail/linux.hpp | 104 ++++++++++++ include/cppcoro/io_service.hpp | 6 + lib/build.cake | 14 +- lib/io_service.cpp | 135 ++++++++++++++- lib/io_uring.hpp | 37 +++++ lib/io_uring_context.cpp | 172 ++++++++++++++++++++ lib/linux.cpp | 17 ++ test/build.cake | 2 +- 9 files changed, 547 insertions(+), 10 deletions(-) create mode 100644 include/cppcoro/detail/io_uring_context.hpp create mode 100644 include/cppcoro/detail/linux.hpp create mode 100644 lib/io_uring.hpp create mode 100644 lib/io_uring_context.cpp create mode 100644 lib/linux.cpp diff --git a/include/cppcoro/detail/io_uring_context.hpp b/include/cppcoro/detail/io_uring_context.hpp new file mode 100644 index 00000000..459f51e0 --- /dev/null +++ b/include/cppcoro/detail/io_uring_context.hpp @@ -0,0 +1,70 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) GIG +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#ifndef IO_URING_CONTEXT_HPP_INCLUDED +#define IO_URING_CONTEXT_HPP_INCLUDED + +#include +#include + +#include + +struct io_uring_sqe; +struct io_uring_cqe; + +namespace cppcoro +{ + namespace detail + { + namespace linux + { + class io_uring_context + { + public: + + io_uring_context(std::uint32_t concurrencyHint); + ~io_uring_context(); + + bool submit_one(const io_uring_sqe& sqe); + //bool cancel(std::uint64_t userData); + + bool get_single_event(io_uring_cqe& cqe, bool waitForEvent); + + private: + + safe_file_descriptor m_ringFd; + + std::mutex m_sqMutex; + std::mutex m_cqMutex; + + struct io_sq_ring + { + void* ringPtr; + std::size_t ringSize; + unsigned* head; + unsigned* tail; + unsigned* ringMask; + unsigned* ringEntries; + unsigned* flags; + unsigned* array; + io_uring_sqe* sqes; + } m_sqRing; + + struct io_cq_ring + { + void* ringPtr; + std::size_t ringSize; + unsigned* head; + unsigned* tail; + unsigned* ringMask; + unsigned* ringEntries; + io_uring_cqe* cqes; + } m_cqRing; + + }; + } + } +} + +#endif diff --git a/include/cppcoro/detail/linux.hpp b/include/cppcoro/detail/linux.hpp new file mode 100644 index 00000000..87559051 --- /dev/null +++ b/include/cppcoro/detail/linux.hpp @@ -0,0 +1,104 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) Lewis Baker +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#ifndef CPPCORO_DETAIL_LINUX_HPP_INCLUDED +#define CPPCORO_DETAIL_LINUX_HPP_INCLUDED + +#include + +#if !CPPCORO_OS_LINUX +# error is only supported on the Linux platform. +#endif + +#include +#include + +namespace cppcoro +{ + namespace detail + { + namespace linux + { + struct io_state + { + using callback_type = void( + io_state* state, + std::int32_t res); + + io_state(callback_type* callback = nullptr) noexcept + : m_callback(callback) + {} + + callback_type* m_callback; + }; + + class safe_file_descriptor + { + public: + safe_file_descriptor() + : m_fd(-1) + {} + + explicit safe_file_descriptor(int fd) + : m_fd(fd) + {} + + safe_file_descriptor(const safe_file_descriptor& other) = delete; + + safe_file_descriptor(safe_file_descriptor&& other) noexcept + : m_fd(other.m_fd) + { + other.m_fd = -1; + } + + ~safe_file_descriptor() + { + close(); + } + + safe_file_descriptor& operator=(safe_file_descriptor fd) noexcept + { + swap(fd); + return *this; + } + + constexpr int get() const { return m_fd; } + + void close() noexcept; + + void swap(safe_file_descriptor& other) noexcept + { + std::swap(m_fd, other.m_fd); + } + + bool operator==(const safe_file_descriptor& other) const + { + return m_fd == other.m_fd; + } + + bool operator!=(const safe_file_descriptor& other) const + { + return m_fd != other.m_fd; + } + + bool operator==(int fd) const + { + return m_fd == fd; + } + + bool operator!=(int fd) const + { + return m_fd != fd; + } + + private: + + int m_fd; + + }; + } + } +} + +#endif diff --git a/include/cppcoro/io_service.hpp b/include/cppcoro/io_service.hpp index 6d2a3464..676fb317 100644 --- a/include/cppcoro/io_service.hpp +++ b/include/cppcoro/io_service.hpp @@ -11,6 +11,8 @@ #if CPPCORO_OS_WINNT # include +#elif CPPCORO_OS_LINUX +# include #endif #include @@ -135,6 +137,8 @@ namespace cppcoro #if CPPCORO_OS_WINNT detail::win32::handle_t native_iocp_handle() noexcept; void ensure_winsock_initialised(); +#elif CPPCORO_OS_LINUX + detail::linux::io_uring_context& io_uring_context() noexcept; #endif private: @@ -172,6 +176,8 @@ namespace cppcoro std::atomic m_winsockInitialised; std::mutex m_winsockInitialisationMutex; +#elif CPPCORO_OS_LINUX + detail::linux::io_uring_context m_aioContext; #endif // Head of a linked-list of schedule operations that are diff --git a/lib/build.cake b/lib/build.cake index a32e1c4f..9be48716 100644 --- a/lib/build.cake +++ b/lib/build.cake @@ -104,6 +104,7 @@ sources = script.cwd([ 'auto_reset_event.cpp', 'spin_wait.cpp', 'spin_mutex.cpp', + 'io_service.cpp', ]) extras = script.cwd([ @@ -128,7 +129,6 @@ if variant.platform == "windows": ])) sources.extend(script.cwd([ 'win32.cpp', - 'io_service.cpp', 'file.cpp', 'readable_file.cpp', 'writable_file.cpp', @@ -147,6 +147,18 @@ if variant.platform == "windows": 'socket_recv_operation.cpp', 'socket_recv_from_operation.cpp', ])) +elif variant.platform == "linux": + detailIncludes.extend(cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', 'detail', [ + 'linux.hpp', + 'io_uring_context.hpp', + ])) + privateHeaders = script.cwd([ + 'io_uring.hpp', + ]) + sources.extend(script.cwd([ + 'linux.cpp', + 'io_uring_context.cpp', + ])) buildDir = env.expand('${CPPCORO_BUILD}') diff --git a/lib/io_service.cpp b/lib/io_service.cpp index 393c3ad9..4b49195a 100644 --- a/lib/io_service.cpp +++ b/lib/io_service.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #if CPPCORO_OS_WINNT # ifndef WIN32_LEAN_AND_MEAN @@ -22,6 +23,9 @@ # include # include # include +#elif CPPCORO_OS_LINUX +# include "io_uring.hpp" +# include #endif namespace @@ -77,6 +81,24 @@ namespace return cppcoro::detail::win32::safe_handle{ handle }; } +#elif CPPCORO_OS_LINUX + io_uring_sqe io_uring_timeout_sqe(const __kernel_timespec& ts) + { + io_uring_sqe sqe{}; + sqe.opcode = IORING_OP_TIMEOUT; + sqe.fd = -1; + sqe.addr = reinterpret_cast(&ts); + sqe.len = 1; + return sqe; + } + + io_uring_sqe io_uring_message_sqe(std::uint64_t userData) + { + const static __kernel_timespec ZERO_TIMEOUT{}; + io_uring_sqe sqe = io_uring_timeout_sqe(ZERO_TIMEOUT); + sqe.user_data = userData; + return sqe; + } #endif } @@ -335,6 +357,8 @@ cppcoro::io_service::io_service(std::uint32_t concurrencyHint) , m_iocpHandle(create_io_completion_port(concurrencyHint)) , m_winsockInitialised(false) , m_winsockInitialisationMutex() +#elif CPPCORO_OS_LINUX + , m_aioContext(concurrencyHint) #endif , m_scheduleOperations(nullptr) , m_timerState(nullptr) @@ -471,13 +495,13 @@ void cppcoro::io_service::notify_work_finished() noexcept } } +#if CPPCORO_OS_WINNT + cppcoro::detail::win32::handle_t cppcoro::io_service::native_iocp_handle() noexcept { return m_iocpHandle.handle(); } -#if CPPCORO_OS_WINNT - void cppcoro::io_service::ensure_winsock_initialised() { if (!m_winsockInitialised.load(std::memory_order_acquire)) @@ -504,6 +528,16 @@ void cppcoro::io_service::ensure_winsock_initialised() #endif // CPPCORO_OS_WINNT +#if CPPCORO_OS_LINUX + +cppcoro::detail::linux::io_uring_context& +cppcoro::io_service::io_uring_context() noexcept +{ + return m_aioContext; +} + +#endif // CPPCORO_OS_LINUX + void cppcoro::io_service::schedule_impl(schedule_operation* operation) noexcept { #if CPPCORO_OS_WINNT @@ -512,6 +546,22 @@ void cppcoro::io_service::schedule_impl(schedule_operation* operation) noexcept 0, reinterpret_cast(operation->m_awaiter.address()), nullptr); +#elif CPPCORO_OS_LINUX + io_uring_sqe sqe = io_uring_message_sqe( + reinterpret_cast(operation->m_awaiter.address())); + + bool ok; + + try + { + ok = m_aioContext.submit_one(sqe); + } + catch (...) + { + ok = true; + } +#endif + if (!ok) { // Failed to post to the I/O completion port. @@ -531,21 +581,37 @@ void cppcoro::io_service::schedule_impl(schedule_operation* operation) noexcept std::memory_order_release, std::memory_order_acquire)); } -#endif } void cppcoro::io_service::try_reschedule_overflow_operations() noexcept { -#if CPPCORO_OS_WINNT auto* operation = m_scheduleOperations.exchange(nullptr, std::memory_order_acquire); while (operation != nullptr) { auto* next = operation->m_next; + +#if CPPCORO_OS_WINNT BOOL ok = ::PostQueuedCompletionStatus( m_iocpHandle.handle(), 0, reinterpret_cast(operation->m_awaiter.address()), nullptr); +#elif CPPCORO_OS_LINUX + io_uring_sqe sqe = io_uring_message_sqe( + reinterpret_cast(operation->m_awaiter.address())); + + bool ok; + + try + { + ok = m_aioContext.submit_one(sqe); + } + catch (...) + { + ok = true; + } +#endif + if (!ok) { // Still unable to queue these operations. @@ -571,7 +637,6 @@ void cppcoro::io_service::try_reschedule_overflow_operations() noexcept operation = next; } -#endif } bool cppcoro::io_service::try_enter_event_loop() noexcept @@ -598,12 +663,12 @@ void cppcoro::io_service::exit_event_loop() noexcept bool cppcoro::io_service::try_process_one_event(bool waitForEvent) { -#if CPPCORO_OS_WINNT if (is_stop_requested()) { return false; } +#if CPPCORO_OS_WINNT const DWORD timeout = waitForEvent ? INFINITE : 0; while (true) @@ -673,6 +738,46 @@ bool cppcoro::io_service::try_process_one_event(bool waitForEvent) }; } } +#elif CPPCORO_OS_LINUX + while (true) + { + // Check for any schedule_operation objects that were unable to be + // queued to the I/O completion port and try to requeue them now. + try_reschedule_overflow_operations(); + + io_uring_cqe cqe; + if (m_aioContext.get_single_event(cqe, waitForEvent)) + { + if (cqe.user_data == 0) + { + if (is_stop_requested()) + { + return false; + } + continue; + } + else if (cqe.res == -ETIME) + { + // This was a coroutine scheduled via a call to + // io_service::schedule(). + std::experimental::coroutine_handle<>::from_address( + reinterpret_cast(cqe.user_data)).resume(); + return true; + } + + // auto* state = reinterpret_cast(cqe.user_data); + + // state->m_callback( + // state, + // cqe.res); + + return true; + } + else + { + return true; + } + } #endif } @@ -685,6 +790,18 @@ void cppcoro::io_service::post_wake_up_event() noexcept // and the system is out of memory. In this case threads should find other events // in the queue next time they check anyway and thus wake-up. (void)::PostQueuedCompletionStatus(m_iocpHandle.handle(), 0, 0, nullptr); +#elif CPPCORO_OS_LINUX + __kernel_timespec ts{}; + io_uring_sqe sqe = io_uring_timeout_sqe(ts); + + try + { + // See comment for Windows implementation above. + m_aioContext.submit_one(sqe); + } + catch (...) + { + } #endif } @@ -712,11 +829,13 @@ cppcoro::io_service::ensure_timer_thread_started() } cppcoro::io_service::timer_thread_state::timer_thread_state() + : #if CPPCORO_OS_WINNT - : m_wakeUpEvent(create_auto_reset_event()) + m_wakeUpEvent(create_auto_reset_event()) , m_waitableTimerEvent(create_waitable_timer_event()) + , #endif - , m_newlyQueuedTimers(nullptr) + m_newlyQueuedTimers(nullptr) , m_timerCancellationRequested(false) , m_shutDownRequested(false) , m_thread([this] { this->run(); }) diff --git a/lib/io_uring.hpp b/lib/io_uring.hpp new file mode 100644 index 00000000..1cb27113 --- /dev/null +++ b/lib/io_uring.hpp @@ -0,0 +1,37 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) GIG +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#ifndef IO_URING_HPP_INCLUDED +#define IO_URING_HPP_INCLUDED + +#include + +#if !CPPCORO_OS_LINUX +# error +#endif + +#include +#include +#include +#include + +static inline int io_uring_setup(unsigned entries, struct io_uring_params* p) +{ + return syscall(__NR_io_uring_setup, entries, p); +} + +static inline int io_uring_enter(int fd, unsigned to_submit, + unsigned min_complete, unsigned flags, sigset_t* sig) +{ + return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, + flags, sig, _NSIG / 8); +} + +static inline int io_uring_register(int fd, unsigned opcode, const void* arg, + unsigned nr_args) +{ + return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args); +} + +#endif diff --git a/lib/io_uring_context.cpp b/lib/io_uring_context.cpp new file mode 100644 index 00000000..4e73db62 --- /dev/null +++ b/lib/io_uring_context.cpp @@ -0,0 +1,172 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) GIG +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#include + +#include "io_uring.hpp" +#include +#include + +template +static inline void atomic_store_release(T* obj, const T& value) +{ + std::atomic_store_explicit(reinterpret_cast*>(obj), + value, std::memory_order_release); +} +template +static inline T atomic_load_acquire(const T* obj) +{ + return std::atomic_load_explicit(reinterpret_cast*>(obj), + std::memory_order_acquire); +} + +template +static inline T* get_var_ptr(void* ptr, unsigned offset) +{ + return reinterpret_cast(static_cast(ptr) + offset); +} + +cppcoro::detail::linux::io_uring_context::io_uring_context(std::uint32_t concurrencyHint) +{ + (void)concurrencyHint; + + m_sqRing.ringPtr = nullptr; + m_sqRing.sqes = nullptr; + + io_uring_params params{}; + m_ringFd = safe_file_descriptor{ io_uring_setup(1, ¶ms) }; + if (m_ringFd.get() < 0) + { + throw std::system_error + { + errno, + std::system_category(), + "Error creating io_uring context: io_uring_setup" + }; + } + + std::size_t sqSize = params.sq_off.array + params.sq_entries * sizeof(unsigned); + std::size_t cqSize = params.cq_off.cqes + params.cq_entries * sizeof(io_uring_cqe); + + std::size_t ringSize = std::max(sqSize, cqSize); + + void* ringPtr = mmap(0, ringSize, PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, m_ringFd.get(), IORING_OFF_SQ_RING); + if (ringPtr == MAP_FAILED) + { + throw std::system_error + { + errno, + std::system_category(), + "Error creating io_uring context: mmap" + }; + } + + // No need to check IORING_FEAT_SINGLE_MMAP because we are anyway going to + // use features that require kernel >= 5.4. + m_sqRing.ringPtr = ringPtr; + m_sqRing.ringSize = ringSize; + m_cqRing.ringPtr = ringPtr; + m_cqRing.ringSize = ringSize; + + m_sqRing.head = get_var_ptr(ringPtr, params.sq_off.head); + m_sqRing.tail = get_var_ptr(ringPtr, params.sq_off.tail); + m_sqRing.ringMask = get_var_ptr(ringPtr, params.sq_off.ring_mask); + m_sqRing.ringEntries = get_var_ptr(ringPtr, params.sq_off.ring_entries); + m_sqRing.flags = get_var_ptr(ringPtr, params.sq_off.flags); + m_sqRing.array = get_var_ptr(ringPtr, params.sq_off.array); + + m_cqRing.head = get_var_ptr(ringPtr, params.cq_off.head); + m_cqRing.tail = get_var_ptr(ringPtr, params.cq_off.tail); + m_cqRing.ringMask = get_var_ptr(ringPtr, params.cq_off.ring_mask); + m_cqRing.ringEntries = get_var_ptr(ringPtr, params.cq_off.ring_entries); + m_cqRing.cqes = get_var_ptr(ringPtr, params.cq_off.cqes); + + void* sqes = mmap(0, params.sq_entries * sizeof(io_uring_sqe), + PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, + m_ringFd.get(), IORING_OFF_SQES); + if (sqes == MAP_FAILED) + { + throw std::system_error + { + errno, + std::system_category(), + "Error creating io_uring context: mmap" + }; + } + + m_sqRing.sqes = static_cast(sqes); +} + +cppcoro::detail::linux::io_uring_context::~io_uring_context() +{ + if (m_sqRing.sqes != nullptr) + { + munmap(m_sqRing.sqes, *m_sqRing.ringEntries * sizeof(io_uring_sqe)); + } + if (m_sqRing.ringPtr != nullptr) + { + munmap(m_sqRing.ringPtr, m_sqRing.ringSize); + } +} + +bool cppcoro::detail::linux::io_uring_context::submit_one(const io_uring_sqe& inSqe) +{ + std::unique_lock lock(m_sqMutex); + + unsigned tail = *m_sqRing.tail; + unsigned index = tail & *m_sqRing.ringMask; + m_sqRing.sqes[index] = inSqe; + m_sqRing.array[index] = index; + + atomic_store_release(m_sqRing.tail, ++tail); + + if (io_uring_enter(m_ringFd.get(), 1, 0, IORING_ENTER_SQ_WAKEUP, nullptr) < 0) + { + if (errno == EAGAIN || errno == EBUSY) + { + *m_sqRing.tail = --tail; + return false; + } + + throw std::system_error + { + errno, + std::system_category(), + "Error submitting operation: io_uring_enter" + }; + } + + return true; +} + +bool cppcoro::detail::linux::io_uring_context::get_single_event( + io_uring_cqe& cqe, bool waitForEvent) +{ + std::unique_lock lock(m_cqMutex); + + unsigned head = *m_cqRing.head; + if (head == atomic_load_acquire(m_cqRing.tail)) + { + if (!waitForEvent) + return false; + + if (io_uring_enter(m_ringFd.get(), 0, 1, IORING_ENTER_GETEVENTS, nullptr) < 0) + { + throw std::system_error + { + errno, + std::system_category(), + "Error waiting for event: io_uring_enter" + }; + } + } + + unsigned index = head & *m_cqRing.ringMask; + cqe = m_cqRing.cqes[index]; + + atomic_store_release(m_cqRing.head, ++head); + + return true; +} diff --git a/lib/linux.cpp b/lib/linux.cpp new file mode 100644 index 00000000..a4e4809b --- /dev/null +++ b/lib/linux.cpp @@ -0,0 +1,17 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) GIG +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// + +#include + +#include + +void cppcoro::detail::linux::safe_file_descriptor::close() noexcept +{ + if (m_fd >= 0) + { + ::close(m_fd); + m_fd = -1; + } +} diff --git a/test/build.cake b/test/build.cake index a9d0f575..b5ed543e 100644 --- a/test/build.cake +++ b/test/build.cake @@ -43,11 +43,11 @@ sources = script.cwd([ 'ipv6_address_tests.cpp', 'ipv6_endpoint_tests.cpp', 'static_thread_pool_tests.cpp', + 'scheduling_operator_tests.cpp', ]) if variant.platform == 'windows': sources += script.cwd([ - 'scheduling_operator_tests.cpp', 'io_service_tests.cpp', 'file_tests.cpp', 'socket_tests.cpp', From b406dedfd0a5c4e713525fecb3bd62a4fbe39be4 Mon Sep 17 00:00:00 2001 From: GIG Date: Thu, 27 Aug 2020 11:40:20 +0300 Subject: [PATCH 2/4] Add async file I/O support for Linux --- include/cppcoro/detail/io_uring_context.hpp | 8 +- .../cppcoro/detail/linux_async_operation.hpp | 321 ++++++++++++++++++ include/cppcoro/file.hpp | 13 + include/cppcoro/file_read_operation.hpp | 98 +++++- include/cppcoro/file_write_operation.hpp | 97 +++++- include/cppcoro/io_service.hpp | 2 +- include/cppcoro/read_only_file.hpp | 2 + include/cppcoro/read_write_file.hpp | 2 + include/cppcoro/write_only_file.hpp | 2 + lib/build.cake | 17 +- lib/file.cpp | 98 ++++++ lib/file_read_operation.cpp | 48 +++ lib/file_write_operation.cpp | 47 +++ lib/io_service.cpp | 12 +- lib/read_only_file.cpp | 29 +- lib/read_write_file.cpp | 29 +- lib/readable_file.cpp | 34 +- lib/writable_file.cpp | 59 +++- lib/write_only_file.cpp | 30 +- test/build.cake | 2 +- test/file_tests.cpp | 5 +- 21 files changed, 915 insertions(+), 40 deletions(-) create mode 100644 include/cppcoro/detail/linux_async_operation.hpp diff --git a/include/cppcoro/detail/io_uring_context.hpp b/include/cppcoro/detail/io_uring_context.hpp index 459f51e0..c55148c8 100644 --- a/include/cppcoro/detail/io_uring_context.hpp +++ b/include/cppcoro/detail/io_uring_context.hpp @@ -27,8 +27,6 @@ namespace cppcoro ~io_uring_context(); bool submit_one(const io_uring_sqe& sqe); - //bool cancel(std::uint64_t userData); - bool get_single_event(io_uring_cqe& cqe, bool waitForEvent); private: @@ -63,6 +61,12 @@ namespace cppcoro } m_cqRing; }; + + struct safe_file_data + { + safe_file_descriptor fd; + io_uring_context* aioContext; + }; } } } diff --git a/include/cppcoro/detail/linux_async_operation.hpp b/include/cppcoro/detail/linux_async_operation.hpp new file mode 100644 index 00000000..daf7e9d8 --- /dev/null +++ b/include/cppcoro/detail/linux_async_operation.hpp @@ -0,0 +1,321 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright (c) Lewis Baker +// Licenced under MIT license. See LICENSE.txt for details. +/////////////////////////////////////////////////////////////////////////////// +#ifndef CPPCORO_DETAIL_LINUX_ASYNC_OPERATION_HPP_INCLUDED +#define CPPCORO_DETAIL_LINUX_ASYNC_OPERATION_HPP_INCLUDED + +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace cppcoro +{ + namespace detail + { + class linux_async_operation_base + : protected detail::linux::io_state + { + public: + + linux_async_operation_base( + detail::linux::io_state::callback_type* callback, + detail::linux::io_uring_context* ctx) noexcept + : detail::linux::io_state(callback) + , m_aioContext(ctx) + {} + + std::size_t get_result() + { + if (m_res < 0) + { + throw std::system_error{ + -m_res, + std::system_category() + }; + } + + return m_res; + } + + std::int32_t m_res; + + detail::linux::io_uring_context* m_aioContext; + + }; + + template + class linux_async_operation + : protected linux_async_operation_base + { + protected: + + linux_async_operation( + detail::linux::io_uring_context* ctx) noexcept + : linux_async_operation_base( + &linux_async_operation::on_operation_completed, + ctx) + {} + + public: + + bool await_ready() const noexcept { return false; } + + CPPCORO_NOINLINE + bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) + { + static_assert(std::is_base_of_v); + + m_awaitingCoroutine = awaitingCoroutine; + return static_cast(this)->try_start(); + } + + decltype(auto) await_resume() + { + return static_cast(this)->get_result(); + } + + private: + + static void on_operation_completed( + detail::linux::io_state* ioState, + std::int32_t res) noexcept + { + auto* operation = static_cast(ioState); + operation->m_res = res; + operation->m_awaitingCoroutine.resume(); + } + + std::experimental::coroutine_handle<> m_awaitingCoroutine; + + }; + + template + class linux_async_operation_cancellable + : protected linux_async_operation_base + { + static constexpr int error_operation_cancelled = ECANCELED; + + protected: + + linux_async_operation_cancellable( + detail::linux::io_uring_context* ctx, + cancellation_token&& ct) noexcept + : linux_async_operation_base( + &linux_async_operation_cancellable::on_operation_completed, + ctx) + , m_state(ct.is_cancellation_requested() ? state::completed : state::not_started) + , m_cancellationToken(std::move(ct)) + { + m_res = -error_operation_cancelled; + } + + linux_async_operation_cancellable( + linux_async_operation_cancellable&& other) noexcept + : linux_async_operation_base(std::move(other)) + , m_state(other.m_state.load(std::memory_order_relaxed)) + , m_cancellationToken(std::move(other.m_cancellationToken)) + { + assert(m_res == other.m_res); + } + + public: + + bool await_ready() const noexcept + { + return m_state.load(std::memory_order_relaxed) == state::completed; + } + + CPPCORO_NOINLINE + bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) + { + static_assert(std::is_base_of_v); + + m_awaitingCoroutine = awaitingCoroutine; + + // TRICKY: Register cancellation callback before starting the operation + // in case the callback registration throws due to insufficient + // memory. We need to make sure that the logic that occurs after + // starting the operation is noexcept, otherwise we run into the + // problem of not being able to cancel the started operation and + // the dilemma of what to do with the exception. + // + // However, doing this means that the cancellation callback may run + // prior to returning below so in the case that cancellation may + // occur we defer setting the state to 'started' until after + // the operation has finished starting. The cancellation callback + // will only attempt to request cancellation of the operation with + // CancelIoEx() once the state has been set to 'started'. + const bool canBeCancelled = m_cancellationToken.can_be_cancelled(); + if (canBeCancelled) + { + m_cancellationCallback.emplace( + std::move(m_cancellationToken), + [this] { this->on_cancellation_requested(); }); + } + else + { + m_state.store(state::started, std::memory_order_relaxed); + } + + // Now start the operation. + const bool willCompleteAsynchronously = static_cast(this)->try_start(); + if (!willCompleteAsynchronously) + { + // Operation completed synchronously, resume awaiting coroutine immediately. + return false; + } + + if (canBeCancelled) + { + // Need to flag that the operation has finished starting now. + + // However, the operation may have completed concurrently on + // another thread, transitioning directly from not_started -> complete. + // Or it may have had the cancellation callback execute and transition + // from not_started -> cancellation_requested. We use a compare-exchange + // to determine a winner between these potential racing cases. + state oldState = state::not_started; + if (!m_state.compare_exchange_strong( + oldState, + state::started, + std::memory_order_release, + std::memory_order_acquire)) + { + if (oldState == state::cancellation_requested) + { + // Request the operation be cancelled. + // Note that it may have already completed on a background + // thread by now so this request for cancellation may end up + // being ignored. + static_cast(this)->cancel(); + + if (!m_state.compare_exchange_strong( + oldState, + state::started, + std::memory_order_release, + std::memory_order_acquire)) + { + assert(oldState == state::completed); + return false; + } + } + else + { + assert(oldState == state::completed); + return false; + } + } + } + + return true; + } + + decltype(auto) await_resume() + { + // Free memory used by the cancellation callback now that the operation + // has completed rather than waiting until the operation object destructs. + // eg. If the operation is passed to when_all() then the operation object + // may not be destructed until all of the operations complete. + m_cancellationCallback.reset(); + + if (m_res == -error_operation_cancelled) + { + throw operation_cancelled{}; + } + + return static_cast(this)->get_result(); + } + + private: + + enum class state + { + not_started, + started, + cancellation_requested, + completed + }; + + void on_cancellation_requested() noexcept + { + auto oldState = m_state.load(std::memory_order_acquire); + if (oldState == state::not_started) + { + // This callback is running concurrently with await_suspend(). + // The call to start the operation may not have returned yet so + // we can't safely request cancellation of it. Instead we try to + // notify the await_suspend() thread by transitioning the state + // to state::cancellation_requested so that the await_suspend() + // thread can request cancellation after it has finished starting + // the operation. + const bool transferredCancelResponsibility = + m_state.compare_exchange_strong( + oldState, + state::cancellation_requested, + std::memory_order_release, + std::memory_order_acquire); + if (transferredCancelResponsibility) + { + return; + } + } + + // No point requesting cancellation if the operation has already completed. + if (oldState != state::completed) + { + static_cast(this)->cancel(); + } + } + + static void on_operation_completed( + detail::linux::io_state* ioState, + std::int32_t res) noexcept + { + auto* operation = static_cast(ioState); + + operation->m_res = res; + + auto state = operation->m_state.load(std::memory_order_acquire); + if (state == state::started) + { + operation->m_state.store(state::completed, std::memory_order_relaxed); + operation->m_awaitingCoroutine.resume(); + } + else + { + // We are racing with await_suspend() call suspending. + // Try to mark it as completed using an atomic exchange and look + // at the previous value to determine whether the coroutine suspended + // first (in which case we resume it now) or we marked it as completed + // first (in which case await_suspend() will return false and immediately + // resume the coroutine). + state = operation->m_state.exchange( + state::completed, + std::memory_order_acq_rel); + if (state == state::started) + { + // The await_suspend() method returned (or will return) 'true' and so + // we need to resume the coroutine. + operation->m_awaitingCoroutine.resume(); + } + } + } + + std::atomic m_state; + cppcoro::cancellation_token m_cancellationToken; + std::optional m_cancellationCallback; + std::experimental::coroutine_handle<> m_awaitingCoroutine; + + }; + } +} + +#endif diff --git a/include/cppcoro/file.hpp b/include/cppcoro/file.hpp index aa507537..bed7c133 100644 --- a/include/cppcoro/file.hpp +++ b/include/cppcoro/file.hpp @@ -13,6 +13,8 @@ #if CPPCORO_OS_WINNT # include +#elif CPPCORO_OS_LINUX +# include #endif #include @@ -46,6 +48,17 @@ namespace cppcoro file_buffering_mode bufferingMode); detail::win32::safe_handle m_fileHandle; +#elif CPPCORO_OS_LINUX + file(detail::linux::safe_file_data&& fileData) noexcept; + + static detail::linux::safe_file_data open( + io_service& ioService, + const std::filesystem::path& path, + file_open_mode openMode, + file_share_mode shareMode, + file_buffering_mode bufferingMode); + + detail::linux::safe_file_data m_fileData; #endif }; diff --git a/include/cppcoro/file_read_operation.hpp b/include/cppcoro/file_read_operation.hpp index 509d4547..7e92033b 100644 --- a/include/cppcoro/file_read_operation.hpp +++ b/include/cppcoro/file_read_operation.hpp @@ -16,9 +16,16 @@ #if CPPCORO_OS_WINNT # include # include +#elif CPPCORO_OS_LINUX +# include +# include +#endif namespace cppcoro { + +#if CPPCORO_OS_WINNT + class file_read_operation_impl { public: @@ -77,9 +84,9 @@ namespace cppcoro std::uint64_t fileOffset, void* buffer, std::size_t byteCount, - cancellation_token&& cancellationToken) noexcept + cancellation_token&& ct) noexcept : cppcoro::detail::win32_overlapped_operation_cancellable( - fileOffset, std::move(cancellationToken)) + fileOffset, std::move(ct)) , m_impl(fileHandle, buffer, byteCount) {} @@ -94,7 +101,92 @@ namespace cppcoro }; -#endif +#endif // CPPCORO_OS_WINNT + +#if CPPCORO_OS_LINUX + + class file_read_operation_impl + { + public: + + file_read_operation_impl( + int fd, + std::uint64_t fileOffset, + void* buffer, + std::size_t byteCount) noexcept + : m_fd(fd) + , m_offset(fileOffset) + , m_buffer(buffer) + , m_byteCount(byteCount) + {} + + bool try_start(cppcoro::detail::linux_async_operation_base& operation) noexcept; + void cancel(cppcoro::detail::linux_async_operation_base& operation) noexcept; + + private: + + int m_fd; + std::uint64_t m_offset; + void* m_buffer; + std::size_t m_byteCount; + + }; + + class file_read_operation + : public cppcoro::detail::linux_async_operation + { + public: + + file_read_operation( + int fd, + detail::linux::io_uring_context* ctx, + std::uint64_t fileOffset, + void* buffer, + std::size_t byteCount) noexcept + : cppcoro::detail::linux_async_operation(ctx) + , m_impl(fd, fileOffset, buffer, byteCount) + {} + + private: + + friend class cppcoro::detail::linux_async_operation; + + bool try_start() noexcept { return m_impl.try_start(*this); } + + file_read_operation_impl m_impl; + + }; + + class file_read_operation_cancellable + : public cppcoro::detail::linux_async_operation_cancellable + { + public: + + file_read_operation_cancellable( + int fd, + detail::linux::io_uring_context* ctx, + std::uint64_t fileOffset, + void* buffer, + std::size_t byteCount, + cancellation_token&& ct) noexcept + : cppcoro::detail::linux_async_operation_cancellable( + ctx, std::move(ct)) + , m_impl(fd, fileOffset, buffer, byteCount) + {} + + private: + + friend class cppcoro::detail::linux_async_operation_cancellable; + + bool try_start() noexcept { return m_impl.try_start(*this); } + void cancel() noexcept { m_impl.cancel(*this); } + + file_read_operation_impl m_impl; + + }; + +#endif // CPPCORO_OS_LINUX + } #endif diff --git a/include/cppcoro/file_write_operation.hpp b/include/cppcoro/file_write_operation.hpp index afc30d55..0628a827 100644 --- a/include/cppcoro/file_write_operation.hpp +++ b/include/cppcoro/file_write_operation.hpp @@ -16,9 +16,16 @@ #if CPPCORO_OS_WINNT # include # include +#elif CPPCORO_OS_LINUX +# include +# include +#endif namespace cppcoro { + +#if CPPCORO_OS_WINNT + class file_write_operation_impl { public: @@ -78,7 +85,8 @@ namespace cppcoro const void* buffer, std::size_t byteCount, cancellation_token&& ct) noexcept - : cppcoro::detail::win32_overlapped_operation_cancellable(fileOffset, std::move(ct)) + : cppcoro::detail::win32_overlapped_operation_cancellable( + fileOffset, std::move(ct)) , m_impl(fileHandle, buffer, byteCount) {} @@ -92,8 +100,93 @@ namespace cppcoro file_write_operation_impl m_impl; }; -} #endif // CPPCORO_OS_WINNT +#if CPPCORO_OS_LINUX + + class file_write_operation_impl + { + public: + + file_write_operation_impl( + int fd, + std::uint64_t fileOffset, + const void* buffer, + std::size_t byteCount) noexcept + : m_fd(fd) + , m_offset(fileOffset) + , m_buffer(buffer) + , m_byteCount(byteCount) + {} + + bool try_start(cppcoro::detail::linux_async_operation_base& operation) noexcept; + void cancel(cppcoro::detail::linux_async_operation_base& operation) noexcept; + + private: + + int m_fd; + std::uint64_t m_offset; + const void* m_buffer; + std::size_t m_byteCount; + + }; + + class file_write_operation + : public cppcoro::detail::linux_async_operation + { + public: + + file_write_operation( + int fd, + detail::linux::io_uring_context* ctx, + std::uint64_t fileOffset, + const void* buffer, + std::size_t byteCount) noexcept + : cppcoro::detail::linux_async_operation(ctx) + , m_impl(fd, fileOffset, buffer, byteCount) + {} + + private: + + friend class cppcoro::detail::linux_async_operation; + + bool try_start() noexcept { return m_impl.try_start(*this); } + + file_write_operation_impl m_impl; + + }; + + class file_write_operation_cancellable + : public cppcoro::detail::linux_async_operation_cancellable + { + public: + + file_write_operation_cancellable( + int fd, + detail::linux::io_uring_context* ctx, + std::uint64_t fileOffset, + const void* buffer, + std::size_t byteCount, + cancellation_token&& ct) noexcept + : cppcoro::detail::linux_async_operation_cancellable( + ctx, std::move(ct)) + , m_impl(fd, fileOffset, buffer, byteCount) + {} + + private: + + friend class cppcoro::detail::linux_async_operation_cancellable; + + bool try_start() noexcept { return m_impl.try_start(*this); } + void cancel() noexcept { m_impl.cancel(*this); } + + file_write_operation_impl m_impl; + + }; + +#endif // CPPCORO_OS_LINUX + +} + #endif diff --git a/include/cppcoro/io_service.hpp b/include/cppcoro/io_service.hpp index 676fb317..e4f4ab45 100644 --- a/include/cppcoro/io_service.hpp +++ b/include/cppcoro/io_service.hpp @@ -138,7 +138,7 @@ namespace cppcoro detail::win32::handle_t native_iocp_handle() noexcept; void ensure_winsock_initialised(); #elif CPPCORO_OS_LINUX - detail::linux::io_uring_context& io_uring_context() noexcept; + detail::linux::io_uring_context* io_uring_context() noexcept; #endif private: diff --git a/include/cppcoro/read_only_file.hpp b/include/cppcoro/read_only_file.hpp index a836fbc3..1c7bc63c 100644 --- a/include/cppcoro/read_only_file.hpp +++ b/include/cppcoro/read_only_file.hpp @@ -51,6 +51,8 @@ namespace cppcoro #if CPPCORO_OS_WINNT read_only_file(detail::win32::safe_handle&& fileHandle) noexcept; +#elif CPPCORO_OS_LINUX + read_only_file(detail::linux::safe_file_data&& fileData) noexcept; #endif }; diff --git a/include/cppcoro/read_write_file.hpp b/include/cppcoro/read_write_file.hpp index f58bea5c..3d40e3b5 100644 --- a/include/cppcoro/read_write_file.hpp +++ b/include/cppcoro/read_write_file.hpp @@ -58,6 +58,8 @@ namespace cppcoro #if CPPCORO_OS_WINNT read_write_file(detail::win32::safe_handle&& fileHandle) noexcept; +#elif CPPCORO_OS_LINUX + read_write_file(detail::linux::safe_file_data&& fileData) noexcept; #endif }; diff --git a/include/cppcoro/write_only_file.hpp b/include/cppcoro/write_only_file.hpp index d3bf627a..1c3f6c34 100644 --- a/include/cppcoro/write_only_file.hpp +++ b/include/cppcoro/write_only_file.hpp @@ -57,6 +57,8 @@ namespace cppcoro #if CPPCORO_OS_WINNT write_only_file(detail::win32::safe_handle&& fileHandle) noexcept; +#elif CPPCORO_OS_LINUX + write_only_file(detail::linux::safe_file_data&& fileData) noexcept; #endif }; diff --git a/lib/build.cake b/lib/build.cake index 9be48716..57b03f36 100644 --- a/lib/build.cake +++ b/lib/build.cake @@ -105,6 +105,14 @@ sources = script.cwd([ 'spin_wait.cpp', 'spin_mutex.cpp', 'io_service.cpp', + 'file.cpp', + 'readable_file.cpp', + 'writable_file.cpp', + 'read_only_file.cpp', + 'write_only_file.cpp', + 'read_write_file.cpp', + 'file_read_operation.cpp', + 'file_write_operation.cpp', ]) extras = script.cwd([ @@ -129,14 +137,6 @@ if variant.platform == "windows": ])) sources.extend(script.cwd([ 'win32.cpp', - 'file.cpp', - 'readable_file.cpp', - 'writable_file.cpp', - 'read_only_file.cpp', - 'write_only_file.cpp', - 'read_write_file.cpp', - 'file_read_operation.cpp', - 'file_write_operation.cpp', 'socket_helpers.cpp', 'socket.cpp', 'socket_accept_operation.cpp', @@ -150,6 +150,7 @@ if variant.platform == "windows": elif variant.platform == "linux": detailIncludes.extend(cake.path.join(env.expand('${CPPCORO}'), 'include', 'cppcoro', 'detail', [ 'linux.hpp', + 'linux_async_operation.hpp', 'io_uring_context.hpp', ])) privateHeaders = script.cwd([ diff --git a/lib/file.cpp b/lib/file.cpp index 3583217f..d210f060 100644 --- a/lib/file.cpp +++ b/lib/file.cpp @@ -14,6 +14,9 @@ # define WIN32_LEAN_AND_MEAN # endif # include +#elif CPPCORO_OS_LINUX +# include +# include #endif cppcoro::file::~file() @@ -36,8 +39,22 @@ std::uint64_t cppcoro::file::size() const } return size.QuadPart; +#elif CPPCORO_OS_LINUX + struct stat sb; + if (fstat(m_fileData.fd.get(), &sb) < 0) + { + throw std::system_error + { + errno, + std::system_category(), + "error getting file size: fstat" + }; + } + + return sb.st_size; #endif } +#if CPPCORO_OS_WINNT cppcoro::file::file(detail::win32::safe_handle&& fileHandle) noexcept : m_fileHandle(std::move(fileHandle)) @@ -166,3 +183,84 @@ cppcoro::detail::win32::safe_handle cppcoro::file::open( return std::move(fileHandle); } + +#endif // CPPCORO_OS_WINNT + +#if CPPCORO_OS_LINUX + +cppcoro::file::file(detail::linux::safe_file_data &&fileData) noexcept + : m_fileData(std::move(fileData)) +{ +} + +cppcoro::detail::linux::safe_file_data cppcoro::file::open( + io_service &ioService, + const std::filesystem::path &path, + cppcoro::file_open_mode openMode, + cppcoro::file_share_mode shareMode, + cppcoro::file_buffering_mode bufferingMode) +{ + int flags = 0; + + if ((bufferingMode & file_buffering_mode::temporary) == file_buffering_mode::temporary) + { + // TODO + } + if ((bufferingMode & file_buffering_mode::unbuffered) == file_buffering_mode::unbuffered) + { + // TODO + } + + if ((shareMode & file_share_mode::read_write) == file_share_mode::read_write) + { + flags |= O_RDWR; + } + else if ((shareMode & file_share_mode::read) == file_share_mode::read) + { + flags |= O_RDONLY; + } + else if ((shareMode & file_share_mode::write) == file_share_mode::write) + { + flags |= O_WRONLY; + } + if ((shareMode & file_share_mode::delete_) == file_share_mode::delete_) + { + // TODO + } + + switch (openMode) + { + case file_open_mode::create_or_open: + flags |= O_CREAT; + break; + case file_open_mode::create_always: + flags |= O_CREAT | O_TRUNC; + break; + case file_open_mode::create_new: + flags |= O_EXCL; + break; + case file_open_mode::open_existing: + // This is default. + break; + case file_open_mode::truncate_existing: + flags |= O_TRUNC; + break; + } + + cppcoro::detail::linux::safe_file_descriptor fd(::open(path.c_str(), flags)); + if (fd.get() < 0) + { + throw std::system_error + { + errno, + std::system_category(), + "error opening file: open" + }; + } + + //posix_fadvise(fd.get(), 0, 0, advice); + + return { std::move(fd), ioService.io_uring_context() }; +} + +#endif // CPPCORO_OS_LINUX diff --git a/lib/file_read_operation.cpp b/lib/file_read_operation.cpp index ca274898..3f085f0e 100644 --- a/lib/file_read_operation.cpp +++ b/lib/file_read_operation.cpp @@ -51,3 +51,51 @@ void cppcoro::file_read_operation_impl::cancel( } #endif // CPPCORO_OS_WINNT + +#if CPPCORO_OS_LINUX +#include "io_uring.hpp" + +bool cppcoro::file_read_operation_impl::try_start( + cppcoro::detail::linux_async_operation_base& operation) noexcept +{ + io_uring_sqe sqe{}; + sqe.opcode = IORING_OP_READ; + sqe.fd = m_fd; + sqe.off = m_offset; + sqe.addr = reinterpret_cast(m_buffer); + sqe.len = m_byteCount; + sqe.user_data = reinterpret_cast(&operation); + + bool ok; + + try + { + ok = operation.m_aioContext->submit_one(sqe); + } + catch (std::system_error& ex) + { + operation.m_res = -ex.code().value(); + + return false; + } + + return ok; +} + +void cppcoro::file_read_operation_impl::cancel( + cppcoro::detail::linux_async_operation_base& operation) noexcept +{ + io_uring_sqe sqe{}; + sqe.opcode = IORING_OP_ASYNC_CANCEL; + + try + { + operation.m_aioContext->submit_one(sqe); + } + catch (...) + { + } +} + +#endif // CPPCORO_OS_LINUX + diff --git a/lib/file_write_operation.cpp b/lib/file_write_operation.cpp index 68a3ac41..65a90040 100644 --- a/lib/file_write_operation.cpp +++ b/lib/file_write_operation.cpp @@ -51,3 +51,50 @@ void cppcoro::file_write_operation_impl::cancel( } #endif // CPPCORO_OS_WINNT + +#if CPPCORO_OS_LINUX +#include "io_uring.hpp" + +bool cppcoro::file_write_operation_impl::try_start( + cppcoro::detail::linux_async_operation_base& operation) noexcept +{ + io_uring_sqe sqe{}; + sqe.opcode = IORING_OP_WRITE; + sqe.fd = m_fd; + sqe.off = m_offset; + sqe.addr = reinterpret_cast(m_buffer); + sqe.len = m_byteCount; + sqe.user_data = reinterpret_cast(&operation); + + bool ok; + + try + { + ok = operation.m_aioContext->submit_one(sqe); + } + catch (std::system_error& ex) + { + operation.m_res = -ex.code().value(); + + return false; + } + + return ok; +} + +void cppcoro::file_write_operation_impl::cancel( + cppcoro::detail::linux_async_operation_base& operation) noexcept +{ + io_uring_sqe sqe{}; + sqe.opcode = IORING_OP_ASYNC_CANCEL; + + try + { + operation.m_aioContext->submit_one(sqe); + } + catch (...) + { + } +} + +#endif // CPPCORO_OS_LINUX diff --git a/lib/io_service.cpp b/lib/io_service.cpp index 4b49195a..13a70262 100644 --- a/lib/io_service.cpp +++ b/lib/io_service.cpp @@ -530,10 +530,9 @@ void cppcoro::io_service::ensure_winsock_initialised() #if CPPCORO_OS_LINUX -cppcoro::detail::linux::io_uring_context& -cppcoro::io_service::io_uring_context() noexcept +cppcoro::detail::linux::io_uring_context *cppcoro::io_service::io_uring_context() noexcept { - return m_aioContext; + return &m_aioContext; } #endif // CPPCORO_OS_LINUX @@ -765,11 +764,8 @@ bool cppcoro::io_service::try_process_one_event(bool waitForEvent) return true; } - // auto* state = reinterpret_cast(cqe.user_data); - - // state->m_callback( - // state, - // cqe.res); + auto* state = reinterpret_cast(cqe.user_data); + state->m_callback(state, cqe.res); return true; } diff --git a/lib/read_only_file.cpp b/lib/read_only_file.cpp index 614aa48a..fc0ce9db 100644 --- a/lib/read_only_file.cpp +++ b/lib/read_only_file.cpp @@ -3,7 +3,7 @@ // Licenced under MIT license. See LICENSE.txt for details. /////////////////////////////////////////////////////////////////////////////// -#include +#include #if CPPCORO_OS_WINNT # ifndef WIN32_LEAN_AND_MEAN @@ -33,4 +33,29 @@ cppcoro::read_only_file::read_only_file( { } -#endif +#endif // CPPCORO_OS_WINNT + +#if CPPCORO_OS_LINUX + +cppcoro::read_only_file cppcoro::read_only_file::open( + io_service& ioService, + const std::filesystem::path& path, + file_share_mode shareMode, + file_buffering_mode bufferingMode) +{ + return read_only_file(file::open( + ioService, + path, + file_open_mode::open_existing, + shareMode, + bufferingMode)); +} + +cppcoro::read_only_file::read_only_file( + detail::linux::safe_file_data&& fileData) noexcept + : file(std::move(fileData)) + , readable_file(detail::linux::safe_file_data{}) +{ +} + +#endif // CPPCORO_OS_LINUX diff --git a/lib/read_write_file.cpp b/lib/read_write_file.cpp index 231e6790..74f7cb52 100644 --- a/lib/read_write_file.cpp +++ b/lib/read_write_file.cpp @@ -3,7 +3,7 @@ // Licenced under MIT license. See LICENSE.txt for details. /////////////////////////////////////////////////////////////////////////////// -#include +#include #if CPPCORO_OS_WINNT # ifndef WIN32_LEAN_AND_MEAN @@ -36,3 +36,30 @@ cppcoro::read_write_file::read_write_file( } #endif + +#if CPPCORO_OS_LINUX + +cppcoro::read_write_file cppcoro::read_write_file::open( + io_service& ioService, + const std::filesystem::path& path, + file_open_mode openMode, + file_share_mode shareMode, + file_buffering_mode bufferingMode) +{ + return read_write_file(file::open( + ioService, + path, + openMode, + shareMode, + bufferingMode)); +} + +cppcoro::read_write_file::read_write_file( + detail::linux::safe_file_data&& fileData) noexcept + : file(std::move(fileData)) + , readable_file(detail::linux::safe_file_data{}) + , writable_file(detail::linux::safe_file_data{}) +{ +} + +#endif // CPPCORO_OS_LINUX diff --git a/lib/readable_file.cpp b/lib/readable_file.cpp index 12b90f2b..0831a19e 100644 --- a/lib/readable_file.cpp +++ b/lib/readable_file.cpp @@ -33,4 +33,36 @@ cppcoro::file_read_operation_cancellable cppcoro::readable_file::read( std::move(ct)); } -#endif +#endif // CPPCORO_OS_WINNT + +#if CPPCORO_OS_LINUX + +cppcoro::file_read_operation cppcoro::readable_file::read( + std::uint64_t offset, + void* buffer, + std::size_t byteCount) const noexcept +{ + return file_read_operation( + m_fileData.fd.get(), + m_fileData.aioContext, + offset, + buffer, + byteCount); +} + +cppcoro::file_read_operation_cancellable cppcoro::readable_file::read( + std::uint64_t offset, + void* buffer, + std::size_t byteCount, + cancellation_token ct) const noexcept +{ + return file_read_operation_cancellable( + m_fileData.fd.get(), + m_fileData.aioContext, + offset, + buffer, + byteCount, + std::move(ct)); +} + +#endif // CPPCORO_OS_LINUX diff --git a/lib/writable_file.cpp b/lib/writable_file.cpp index 3ba63e7c..43b04758 100644 --- a/lib/writable_file.cpp +++ b/lib/writable_file.cpp @@ -49,12 +49,11 @@ cppcoro::file_write_operation cppcoro::writable_file::write( const void* buffer, std::size_t byteCount) noexcept { - return file_write_operation{ + return file_write_operation( m_fileHandle.handle(), offset, buffer, - byteCount - }; + byteCount); } cppcoro::file_write_operation_cancellable cppcoro::writable_file::write( @@ -63,13 +62,59 @@ cppcoro::file_write_operation_cancellable cppcoro::writable_file::write( std::size_t byteCount, cancellation_token ct) noexcept { - return file_write_operation_cancellable{ + return file_write_operation_cancellable( m_fileHandle.handle(), offset, buffer, byteCount, - std::move(ct) - }; + std::move(ct)); } -#endif +#endif // CPPCORO_OS_WINNT + +#if CPPCORO_OS_LINUX +#include + +void cppcoro::writable_file::set_size( + std::uint64_t fileSize) +{ + if (ftruncate64(m_fileData.fd.get(), fileSize) < 0) + { + throw std::system_error + { + errno, + std::system_category(), + "error setting file size: ftruncate64" + }; + } +} + +cppcoro::file_write_operation cppcoro::writable_file::write( + std::uint64_t offset, + const void* buffer, + std::size_t byteCount) noexcept +{ + return file_write_operation( + m_fileData.fd.get(), + m_fileData.aioContext, + offset, + buffer, + byteCount); +} + +cppcoro::file_write_operation_cancellable cppcoro::writable_file::write( + std::uint64_t offset, + const void* buffer, + std::size_t byteCount, + cancellation_token ct) noexcept +{ + return file_write_operation_cancellable( + m_fileData.fd.get(), + m_fileData.aioContext, + offset, + buffer, + byteCount, + std::move(ct)); +} + +#endif // CPPCORO_OS_LINUX diff --git a/lib/write_only_file.cpp b/lib/write_only_file.cpp index 4c91bddf..bfcae353 100644 --- a/lib/write_only_file.cpp +++ b/lib/write_only_file.cpp @@ -3,7 +3,7 @@ // Licenced under MIT license. See LICENSE.txt for details. /////////////////////////////////////////////////////////////////////////////// -#include +#include #if CPPCORO_OS_WINNT # ifndef WIN32_LEAN_AND_MEAN @@ -34,4 +34,30 @@ cppcoro::write_only_file::write_only_file( { } -#endif +#endif // CPPCORO_OS_WINNT + +#if CPPCORO_OS_LINUX + +cppcoro::write_only_file cppcoro::write_only_file::open( + io_service& ioService, + const std::filesystem::path& path, + file_open_mode openMode, + file_share_mode shareMode, + file_buffering_mode bufferingMode) +{ + return write_only_file(file::open( + ioService, + path, + openMode, + shareMode, + bufferingMode)); +} + +cppcoro::write_only_file::write_only_file( + detail::linux::safe_file_data&& fileData) noexcept + : file(std::move(fileData)) + , writable_file(detail::linux::safe_file_data{}) +{ +} + +#endif // CPPCORO_OS_LINUX diff --git a/test/build.cake b/test/build.cake index b5ed543e..103c4669 100644 --- a/test/build.cake +++ b/test/build.cake @@ -44,12 +44,12 @@ sources = script.cwd([ 'ipv6_endpoint_tests.cpp', 'static_thread_pool_tests.cpp', 'scheduling_operator_tests.cpp', + 'file_tests.cpp', ]) if variant.platform == 'windows': sources += script.cwd([ 'io_service_tests.cpp', - 'file_tests.cpp', 'socket_tests.cpp', ]) diff --git a/test/file_tests.cpp b/test/file_tests.cpp index 89742b8f..f64e8513 100644 --- a/test/file_tests.cpp +++ b/test/file_tests.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include "io_service_fixture.hpp" @@ -60,14 +61,14 @@ namespace fs::remove_all(m_path); } - const std::experimental::filesystem::path& temp_dir() + const fs::path& temp_dir() { return m_path; } private: - std::experimental::filesystem::path m_path; + fs::path m_path; }; From 481d37aa9d47cc72025fea94d8b4a1dd1334a5e8 Mon Sep 17 00:00:00 2001 From: GIG Date: Thu, 27 Aug 2020 18:43:50 +0300 Subject: [PATCH 3/4] Fix io_service::try_process_one_event() on Linux --- lib/io_service.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/io_service.cpp b/lib/io_service.cpp index 13a70262..ddce5669 100644 --- a/lib/io_service.cpp +++ b/lib/io_service.cpp @@ -94,8 +94,8 @@ namespace io_uring_sqe io_uring_message_sqe(std::uint64_t userData) { - const static __kernel_timespec ZERO_TIMEOUT{}; - io_uring_sqe sqe = io_uring_timeout_sqe(ZERO_TIMEOUT); + const static __kernel_timespec zero_timeout{}; + io_uring_sqe sqe = io_uring_timeout_sqe(zero_timeout); sqe.user_data = userData; return sqe; } @@ -771,7 +771,7 @@ bool cppcoro::io_service::try_process_one_event(bool waitForEvent) } else { - return true; + return false; } } #endif From fdfd947c40ce7f8f0c891d6cb6129b95831da06f Mon Sep 17 00:00:00 2001 From: GIG Date: Thu, 27 Aug 2020 21:38:17 +0300 Subject: [PATCH 4/4] Fix file writing on Linux --- include/cppcoro/file.hpp | 1 + lib/file.cpp | 20 +++++++++----------- lib/io_uring_context.cpp | 9 ++++++++- lib/read_only_file.cpp | 2 ++ lib/read_write_file.cpp | 2 ++ lib/write_only_file.cpp | 2 ++ test/build.cake | 2 +- 7 files changed, 25 insertions(+), 13 deletions(-) diff --git a/include/cppcoro/file.hpp b/include/cppcoro/file.hpp index bed7c133..16b05487 100644 --- a/include/cppcoro/file.hpp +++ b/include/cppcoro/file.hpp @@ -52,6 +52,7 @@ namespace cppcoro file(detail::linux::safe_file_data&& fileData) noexcept; static detail::linux::safe_file_data open( + int fileAccess, io_service& ioService, const std::filesystem::path& path, file_open_mode openMode, diff --git a/lib/file.cpp b/lib/file.cpp index d210f060..a24d10f9 100644 --- a/lib/file.cpp +++ b/lib/file.cpp @@ -194,13 +194,14 @@ cppcoro::file::file(detail::linux::safe_file_data &&fileData) noexcept } cppcoro::detail::linux::safe_file_data cppcoro::file::open( + int fileAccess, io_service &ioService, const std::filesystem::path &path, cppcoro::file_open_mode openMode, cppcoro::file_share_mode shareMode, cppcoro::file_buffering_mode bufferingMode) { - int flags = 0; + int flags = fileAccess; if ((bufferingMode & file_buffering_mode::temporary) == file_buffering_mode::temporary) { @@ -211,17 +212,13 @@ cppcoro::detail::linux::safe_file_data cppcoro::file::open( // TODO } - if ((shareMode & file_share_mode::read_write) == file_share_mode::read_write) - { - flags |= O_RDWR; - } - else if ((shareMode & file_share_mode::read) == file_share_mode::read) + if ((shareMode & file_share_mode::read) == file_share_mode::read) { - flags |= O_RDONLY; + // TODO } - else if ((shareMode & file_share_mode::write) == file_share_mode::write) + if ((shareMode & file_share_mode::write) == file_share_mode::write) { - flags |= O_WRONLY; + // TODO } if ((shareMode & file_share_mode::delete_) == file_share_mode::delete_) { @@ -240,14 +237,15 @@ cppcoro::detail::linux::safe_file_data cppcoro::file::open( flags |= O_EXCL; break; case file_open_mode::open_existing: - // This is default. + // Default. break; case file_open_mode::truncate_existing: flags |= O_TRUNC; break; } - cppcoro::detail::linux::safe_file_descriptor fd(::open(path.c_str(), flags)); + cppcoro::detail::linux::safe_file_descriptor fd( + ::open(path.c_str(), flags, S_IRWXU | S_IRWXG)); if (fd.get() < 0) { throw std::system_error diff --git a/lib/io_uring_context.cpp b/lib/io_uring_context.cpp index 4e73db62..c7ad8011 100644 --- a/lib/io_uring_context.cpp +++ b/lib/io_uring_context.cpp @@ -150,10 +150,17 @@ bool cppcoro::detail::linux::io_uring_context::get_single_event( if (head == atomic_load_acquire(m_cqRing.tail)) { if (!waitForEvent) + { return false; + } - if (io_uring_enter(m_ringFd.get(), 0, 1, IORING_ENTER_GETEVENTS, nullptr) < 0) + while (io_uring_enter(m_ringFd.get(), 0, 1, IORING_ENTER_GETEVENTS, nullptr) < 0) { + if (errno == EINTR) + { + continue; + } + throw std::system_error { errno, diff --git a/lib/read_only_file.cpp b/lib/read_only_file.cpp index fc0ce9db..cb549ebd 100644 --- a/lib/read_only_file.cpp +++ b/lib/read_only_file.cpp @@ -36,6 +36,7 @@ cppcoro::read_only_file::read_only_file( #endif // CPPCORO_OS_WINNT #if CPPCORO_OS_LINUX +#include cppcoro::read_only_file cppcoro::read_only_file::open( io_service& ioService, @@ -44,6 +45,7 @@ cppcoro::read_only_file cppcoro::read_only_file::open( file_buffering_mode bufferingMode) { return read_only_file(file::open( + O_RDONLY, ioService, path, file_open_mode::open_existing, diff --git a/lib/read_write_file.cpp b/lib/read_write_file.cpp index 74f7cb52..f1c92fdc 100644 --- a/lib/read_write_file.cpp +++ b/lib/read_write_file.cpp @@ -38,6 +38,7 @@ cppcoro::read_write_file::read_write_file( #endif #if CPPCORO_OS_LINUX +#include cppcoro::read_write_file cppcoro::read_write_file::open( io_service& ioService, @@ -47,6 +48,7 @@ cppcoro::read_write_file cppcoro::read_write_file::open( file_buffering_mode bufferingMode) { return read_write_file(file::open( + O_RDWR, ioService, path, openMode, diff --git a/lib/write_only_file.cpp b/lib/write_only_file.cpp index bfcae353..f331f8a7 100644 --- a/lib/write_only_file.cpp +++ b/lib/write_only_file.cpp @@ -37,6 +37,7 @@ cppcoro::write_only_file::write_only_file( #endif // CPPCORO_OS_WINNT #if CPPCORO_OS_LINUX +#include cppcoro::write_only_file cppcoro::write_only_file::open( io_service& ioService, @@ -46,6 +47,7 @@ cppcoro::write_only_file cppcoro::write_only_file::open( file_buffering_mode bufferingMode) { return write_only_file(file::open( + O_WRONLY, ioService, path, openMode, diff --git a/test/build.cake b/test/build.cake index 103c4669..b5ed543e 100644 --- a/test/build.cake +++ b/test/build.cake @@ -44,12 +44,12 @@ sources = script.cwd([ 'ipv6_endpoint_tests.cpp', 'static_thread_pool_tests.cpp', 'scheduling_operator_tests.cpp', - 'file_tests.cpp', ]) if variant.platform == 'windows': sources += script.cwd([ 'io_service_tests.cpp', + 'file_tests.cpp', 'socket_tests.cpp', ])