From 6dd94f82e4e074267b058345f03fe44b8bc980ce Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 19 Sep 2024 22:45:10 +0800 Subject: [PATCH 1/2] Initial implementation of event loop supporting io_uring, asynchronous reading and writing not yet implemented --- CMakeLists.txt | 14 ++ cmake/FindLibUring.cmake | 16 ++ src/Poller/EventPoller.cpp | 358 ++++++++++++++++++++++++++----------- src/Poller/EventPoller.h | 65 ++++--- 4 files changed, 330 insertions(+), 123 deletions(-) create mode 100644 cmake/FindLibUring.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index f4527f4b3..fd3e5189b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,6 +19,7 @@ if(HAVE_RECVMMSG_API) add_definitions(-DHAVE_RECVMMSG_API) endif() + # check the socket buffer size set by the upper cmake project, if it is set, use the setting of the upper cmake project, otherwise set it to 256K # if the socket buffer size is set to 0, it means that the socket buffer size is not set, and the kernel default value is used(just for linux) if(DEFINED SOCKET_DEFAULT_BUF_SIZE) @@ -89,6 +90,19 @@ set(ENABLE_OPENSSL ON CACHE BOOL "enable openssl") set(ENABLE_MYSQL ON CACHE BOOL "enable mysql") set(ASAN_USE_DELETE OFF CACHE BOOL "use delele[] or free when asan enabled") +# 添加io_uring支持 +# Add io_uring support +option(ENABLE_IO_URING "enable io_uring" ON) +if(ENABLE_IO_URING) + find_package(LibUring QUIET) + if(LIBURING_FOUND) + message(STATUS "Found liburing") + add_definitions(-DHAS_IO_URING) + include_directories(${LIBURING_INCLUDE_DIRS}) + list(APPEND LINK_LIB_LIST ${LIBURING_LIBRARIES}) + endif() +endif() + #查找openssl是否安装 #Find out if openssl is installed find_package(OpenSSL QUIET) diff --git a/cmake/FindLibUring.cmake b/cmake/FindLibUring.cmake new file mode 100644 index 000000000..75323e0a8 --- /dev/null +++ b/cmake/FindLibUring.cmake @@ -0,0 +1,16 @@ + +find_path(LIBURING_INCLUDE_DIR NAMES liburing.h) +mark_as_advanced(LIBURING_INCLUDE_DIR) + +find_library(LIBURING_LIBRARY NAMES uring) +mark_as_advanced(LIBURING_LIBRARY) + +include(FindPackageHandleStandardArgs) +FIND_PACKAGE_HANDLE_STANDARD_ARGS( + LIBURING + REQUIRED_VARS LIBURING_LIBRARY LIBURING_INCLUDE_DIR) + +if(LIBURING_FOUND) + set(LIBURING_LIBRARIES ${LIBURING_LIBRARY}) + set(LIBURING_INCLUDE_DIRS ${LIBURING_INCLUDE_DIR}) +endif() \ No newline at end of file diff --git a/src/Poller/EventPoller.cpp b/src/Poller/EventPoller.cpp index 700e1babb..13e60ea25 100644 --- a/src/Poller/EventPoller.cpp +++ b/src/Poller/EventPoller.cpp @@ -8,13 +8,13 @@ * may be found in the AUTHORS file in the root of the source tree. */ -#include "SelectWrap.h" #include "EventPoller.h" +#include "Network/sockutil.h" +#include "SelectWrap.h" +#include "Util/NoticeCenter.h" +#include "Util/TimeTicker.h" #include "Util/util.h" #include "Util/uv_errno.h" -#include "Util/TimeTicker.h" -#include "Util/NoticeCenter.h" -#include "Network/sockutil.h" #if defined(HAS_EPOLL) #include @@ -25,23 +25,21 @@ #define EPOLL_SIZE 1024 -//防止epoll惊群 [AUTO-TRANSLATED:ad53c775] -//Prevent epoll thundering +// 防止epoll惊群 [AUTO-TRANSLATED:ad53c775] +// Prevent epoll thundering #ifndef EPOLLEXCLUSIVE #define EPOLLEXCLUSIVE 0 #endif -#define toEpoll(event) (((event) & Event_Read) ? EPOLLIN : 0) \ - | (((event) & Event_Write) ? EPOLLOUT : 0) \ - | (((event) & Event_Error) ? (EPOLLHUP | EPOLLERR) : 0) \ - | (((event) & Event_LT) ? 0 : EPOLLET) +#define toEpoll(event) \ + (((event)&Event_Read) ? EPOLLIN : 0) | (((event)&Event_Write) ? EPOLLOUT : 0) | (((event)&Event_Error) ? 
(EPOLLHUP | EPOLLERR) : 0) \ + | (((event)&Event_LT) ? 0 : EPOLLET) -#define toPoller(epoll_event) (((epoll_event) & (EPOLLIN | EPOLLRDNORM | EPOLLHUP)) ? Event_Read : 0) \ - | (((epoll_event) & (EPOLLOUT | EPOLLWRNORM)) ? Event_Write : 0) \ - | (((epoll_event) & EPOLLHUP) ? Event_Error : 0) \ - | (((epoll_event) & EPOLLERR) ? Event_Error : 0) +#define toPoller(epoll_event) \ + (((epoll_event) & (EPOLLIN | EPOLLRDNORM | EPOLLHUP)) ? Event_Read : 0) | (((epoll_event) & (EPOLLOUT | EPOLLWRNORM)) ? Event_Write : 0) \ + | (((epoll_event)&EPOLLHUP) ? Event_Error : 0) | (((epoll_event)&EPOLLERR) ? Event_Error : 0) #define create_event() epoll_create(EPOLL_SIZE) -#endif //HAS_EPOLL +#endif // HAS_EPOLL #if defined(HAS_KQUEUE) #include @@ -62,35 +60,51 @@ void EventPoller::addEventPipe() { SockUtil::setNoBlocked(_pipe.writeFD()); // 添加内部管道事件 [AUTO-TRANSLATED:6a72e39a] - //Add internal pipe event + // Add internal pipe event if (addEvent(_pipe.readFD(), EventPoller::Event_Read, [this](int event) { onPipeEvent(); }) == -1) { throw std::runtime_error("Add pipe fd to poller failed"); } } EventPoller::EventPoller(std::string name) { +#if defined(HAS_IO_URING) + struct io_uring_params params; + memset(¶ms, 0, sizeof(params)); + params.flags = IORING_SETUP_SQPOLL; // 使用内核轮询模式 + params.sq_thread_idle = 2000; // 空闲2秒后休眠 + + if (io_uring_queue_init_params(1024, &_ring, ¶ms) < 0) { + throw runtime_error(StrPrinter << "Create io_uring failed: " << strerror(errno)); + } + + if (!(params.features & IORING_FEAT_FAST_POLL)) { + WarnL << "IORING_FEAT_FAST_POLL not available. Performance might be suboptimal."; + } +#else #if defined(HAS_EPOLL) || defined(HAS_KQUEUE) _event_fd = create_event(); if (_event_fd == -1) { throw runtime_error(StrPrinter << "Create event fd failed: " << get_uv_errmsg()); } SockUtil::setCloExec(_event_fd); -#endif //HAS_EPOLL - +#endif // HAS_EPOLL +#endif // HAS_IO_URING _name = std::move(name); _logger = Logger::Instance().shared_from_this(); addEventPipe(); } void EventPoller::shutdown() { - async_l([]() { - throw ExitException(); - }, false, true); + async_l([]() { throw ExitException(); }, false, true); if (_loop_thread) { - //防止作为子进程时崩溃 [AUTO-TRANSLATED:68727e34] - //Prevent crash when running as a child process - try { _loop_thread->join(); } catch (...) { _loop_thread->detach(); } + // 防止作为子进程时崩溃 [AUTO-TRANSLATED:68727e34] + // Prevent crash when running as a child process + try { + _loop_thread->join(); + } catch (...) 
{ + _loop_thread->detach(); + } delete _loop_thread; _loop_thread = nullptr; } @@ -98,16 +112,18 @@ void EventPoller::shutdown() { EventPoller::~EventPoller() { shutdown(); - + #if defined(HAS_EPOLL) || defined(HAS_KQUEUE) if (_event_fd != -1) { close(_event_fd); _event_fd = -1; } +#elif defined(HAS_IO_URING) + io_uring_queue_exit(&_ring); #endif - //退出前清理管道中的数据 [AUTO-TRANSLATED:60e26f9a] - //Clean up pipe data before exiting + // 退出前清理管道中的数据 [AUTO-TRANSLATED:60e26f9a] + // Clean up pipe data before exiting onPipeEvent(true); InfoL << getThreadName(); } @@ -120,9 +136,33 @@ int EventPoller::addEvent(int fd, int event, PollEventCB cb) { } if (isCurrentThread()) { -#if defined(HAS_EPOLL) - struct epoll_event ev = {0}; - ev.events = toEpoll(event) ; +#if defined(HAS_IO_URING) + struct io_uring_sqe *sqe = io_uring_get_sqe(&_ring); + if (!sqe) { + WarnL << "Failed to get SQE for fd: " << fd; + return -1; + } + + uint32_t poll_mask = convert_to_poll_mask(event); + io_uring_prep_poll_add(sqe, fd, poll_mask); + + std::unique_ptr buffer(new EventPoller::IOBuffer); + buffer->fd = fd; + + buffer->callback = std::make_shared(std::move(cb)); +// DebugL << "callback address: " << std::addressof(buffer->callback); + io_uring_sqe_set_data(sqe, buffer.get()); + _io_buffers.emplace(fd, std::move(buffer)); + int ret = io_uring_submit(&_ring); + if (ret < 0) { + ErrorL << "Failed to submit io_uring request: " << strerror(-ret); + _io_buffers.erase(fd); + return ret; + } + return 0; +#elif defined(HAS_EPOLL) + struct epoll_event ev = { 0 }; + ev.events = toEpoll(event); ev.data.fd = fd; int ret = epoll_ctl(_event_fd, EPOLL_CTL_ADD, fd, &ev); if (ret != -1) { @@ -146,7 +186,7 @@ int EventPoller::addEvent(int fd, int event, PollEventCB cb) { #else #ifndef _WIN32 // win32平台,socket套接字不等于文件描述符,所以可能不适用这个限制 [AUTO-TRANSLATED:6adfc664] - //On the win32 platform, the socket does not equal the file descriptor, so this restriction may not apply + // On the win32 platform, the socket does not equal the file descriptor, so this restriction may not apply if (fd >= FD_SETSIZE) { WarnL << "select() can not watch fd bigger than " << FD_SETSIZE; return -1; @@ -161,9 +201,7 @@ int EventPoller::addEvent(int fd, int event, PollEventCB cb) { #endif } - async([this, fd, event, cb]() mutable { - addEvent(fd, event, std::move(cb)); - }); + async([this, fd, event, cb]() mutable { addEvent(fd, event, std::move(cb)); }); return 0; } @@ -174,7 +212,30 @@ int EventPoller::delEvent(int fd, PollCompleteCB cb) { } if (isCurrentThread()) { -#if defined(HAS_EPOLL) +#if defined(HAS_IO_URING) + auto it = _io_buffers.find(fd); + if (it != _io_buffers.end()) { + struct io_uring_sqe *sqe = io_uring_get_sqe(&_ring); + if (!sqe) { + WarnL << "Failed to get SQE for cancelling fd: " << fd; + cb(false); + return -1; + } + io_uring_prep_poll_remove(sqe, (unsigned long long)(uintptr_t)it->second.get()); + int ret = io_uring_submit(&_ring); + if (ret < 0) { + ErrorL << "Failed to submit cancel request: " << strerror(-ret); + cb(false); + return ret; + } + _event_cache_expired.emplace(fd); + _io_buffers.erase(it); + cb(true); + return 0; + } + cb(false); + return -1; +#elif defined(HAS_EPOLL) int ret = -1; if (_event_map.erase(fd)) { _event_cache_expired.emplace(fd); @@ -202,14 +263,12 @@ int EventPoller::delEvent(int fd, PollCompleteCB cb) { } cb(ret != -1); return ret; -#endif //HAS_EPOLL +#endif // HAS_EPOLL } - //跨线程操作 [AUTO-TRANSLATED:4e116519] - //Cross-thread operation - async([this, fd, cb]() mutable { - delEvent(fd, std::move(cb)); - }); + // 跨线程操作 
[AUTO-TRANSLATED:4e116519] + // Cross-thread operation + async([this, fd, cb]() mutable { delEvent(fd, std::move(cb)); }); return 0; } @@ -219,7 +278,17 @@ int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) { cb = [](bool success) {}; } if (isCurrentThread()) { -#if defined(HAS_EPOLL) +#if defined(HAS_IO_URING) + auto it = _io_buffers.find(fd); + if (it != _io_buffers.end()) { + uint32_t poll_mask = convert_to_poll_mask(event); + rearm_io_uring(fd, poll_mask); + cb(true); + return 0; + } + cb(false); + return -1; +#elif defined(HAS_EPOLL) struct epoll_event ev = { 0 }; ev.events = toEpoll(event); ev.data.fd = fd; @@ -243,9 +312,7 @@ int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) { return it != _event_map.end() ? 0 : -1; #endif // HAS_EPOLL } - async([this, fd, event, cb]() mutable { - modifyEvent(fd, event, std::move(cb)); - }); + async([this, fd, event, cb]() mutable { modifyEvent(fd, event, std::move(cb)); }); return 0; } @@ -273,8 +340,8 @@ Task::Ptr EventPoller::async_l(TaskIn task, bool may_sync, bool first) { _list_task.emplace_back(ret); } } - //写数据到管道,唤醒主线程 [AUTO-TRANSLATED:2ead8182] - //Write data to the pipe and wake up the main thread + // 写数据到管道,唤醒主线程 [AUTO-TRANSLATED:2ead8182] + // Write data to the pipe and wake up the main thread _pipe.write("", 1); return ret; } @@ -287,22 +354,22 @@ inline void EventPoller::onPipeEvent(bool flush) { char buf[1024]; int err = 0; if (!flush) { - for (;;) { - if ((err = _pipe.read(buf, sizeof(buf))) > 0) { - // 读到管道数据,继续读,直到读空为止 [AUTO-TRANSLATED:47bd325c] - //Read data from the pipe, continue reading until it's empty - continue; - } - if (err == 0 || get_uv_error(true) != UV_EAGAIN) { - // 收到eof或非EAGAIN(无更多数据)错误,说明管道无效了,重新打开管道 [AUTO-TRANSLATED:5f7a013d] - //Received eof or non-EAGAIN (no more data) error, indicating that the pipe is invalid, reopen the pipe - ErrorL << "Invalid pipe fd of event poller, reopen it"; - delEvent(_pipe.readFD()); - _pipe.reOpen(); - addEventPipe(); - } - break; - } + for (;;) { + if ((err = _pipe.read(buf, sizeof(buf))) > 0) { + // 读到管道数据,继续读,直到读空为止 [AUTO-TRANSLATED:47bd325c] + // Read data from the pipe, continue reading until it's empty + continue; + } + if (err == 0 || get_uv_error(true) != UV_EAGAIN) { + // 收到eof或非EAGAIN(无更多数据)错误,说明管道无效了,重新打开管道 [AUTO-TRANSLATED:5f7a013d] + // Received eof or non-EAGAIN (no more data) error, indicating that the pipe is invalid, reopen the pipe + ErrorL << "Invalid pipe fd of event poller, reopen it"; + delEvent(_pipe.readFD()); + _pipe.reOpen(); + addEventPipe(); + } + break; + } } decltype(_list_task) _list_swap; @@ -325,7 +392,7 @@ inline void EventPoller::onPipeEvent(bool flush) { SocketRecvBuffer::Ptr EventPoller::getSharedBuffer(bool is_udp) { #if !defined(__linux) && !defined(__linux__) // 非Linux平台下,tcp和udp共享recvfrom方案,使用同一个buffer [AUTO-TRANSLATED:2d2ee7bf] - //On non-Linux platforms, tcp and udp share the recvfrom scheme, using the same buffer + // On non-Linux platforms, tcp and udp share the recvfrom scheme, using the same buffer is_udp = 0; #endif auto ret = _shared_buffer[is_udp].lock(); @@ -340,7 +407,7 @@ thread::id EventPoller::getThreadId() const { return _loop_thread ? 
_loop_thread->get_id() : thread::id(); } -const std::string& EventPoller::getThreadName() const { +const std::string &EventPoller::getThreadName() const { return _name; } @@ -359,16 +426,83 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { _sem_run_started.post(); _exit_flag = false; uint64_t minDelay; -#if defined(HAS_EPOLL) +#if defined(HAS_IO_URING) + struct io_uring_cqe *cqe; + while (!_exit_flag) { + minDelay = getMinDelay(); + int ret; + if (minDelay) { + struct __kernel_timespec ts = { .tv_sec = static_cast(minDelay / 1000), .tv_nsec = static_cast((minDelay % 1000) * 1000000) }; + ret = io_uring_wait_cqe_timeout(&_ring, &cqe, &ts); + } else { + ret = io_uring_wait_cqe(&_ring, &cqe); + } + + if (ret < 0 && ret != -ETIME) { + ErrorL << "io_uring_wait_cqe_timeout failed: " << strerror(-ret); + continue; + } + + _event_cache_expired.clear(); + + unsigned head; + unsigned count = 0; + io_uring_for_each_cqe(&_ring, head, cqe) { + ++count; + IOBuffer *buffer = static_cast(io_uring_cqe_get_data(cqe)); + + if (!buffer || _event_cache_expired.count(buffer->fd)) { + continue; + } + + int fd = buffer->fd; + int event = convert_from_poll_mask(cqe->res); + + std::shared_ptr callback; + { + auto it = _io_buffers.find(fd); + if (it != _io_buffers.end()) { + callback = it->second->callback; + } + } + if (callback) { + try { + (*callback)(event); + } catch (std::exception &ex) { + ErrorL << "Exception occurred when do event task: " << ex.what(); + } + + // 重新添加监听 + struct io_uring_sqe *sqe = io_uring_get_sqe(&_ring); + if (sqe) { + // io_uring_prep_poll_add(sqe, buffer->fd, POLLIN | POLLOUT); + io_uring_prep_poll_add(sqe, fd, convert_to_poll_mask(event)); + io_uring_sqe_set_data(sqe, buffer); + } else { + WarnL << "Failed to get SQE for re-adding fd: " << fd; + } + } + } + + io_uring_cq_advance(&_ring, count); + + if (count > 0) { + ret = io_uring_submit(&_ring); + if (ret < 0) { + ErrorL << "Failed to submit io_uring requests: " << strerror(-ret); + } + } + } +#elif defined(HAS_EPOLL) struct epoll_event events[EPOLL_SIZE]; while (!_exit_flag) { minDelay = getMinDelay(); - startSleep();//用于统计当前线程负载情况 + startSleep(); // 用于统计当前线程负载情况 int ret = epoll_wait(_event_fd, events, EPOLL_SIZE, minDelay ? 
minDelay : -1); - sleepWakeUp();//用于统计当前线程负载情况 + sleepWakeUp(); // 用于统计当前线程负载情况 if (ret <= 0) { - //超时或被打断 [AUTO-TRANSLATED:7005fded] - //Timed out or interrupted + // 超时或被打断 [AUTO-TRANSLATED:7005fded] + // Timed out or interrupted continue; } @@ -378,7 +512,7 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { struct epoll_event &ev = events[i]; int fd = ev.data.fd; if (_event_cache_expired.count(fd)) { - //event cache refresh + // event cache refresh continue; } @@ -445,10 +579,10 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { List callback_list; struct timeval tv; while (!_exit_flag) { - //定时器事件中可能操作_event_map [AUTO-TRANSLATED:f2a50ee2] - //Possible operations on _event_map in timer events + // 定时器事件中可能操作_event_map [AUTO-TRANSLATED:f2a50ee2] + // Possible operations on _event_map in timer events minDelay = getMinDelay(); - tv.tv_sec = (decltype(tv.tv_sec)) (minDelay / 1000); + tv.tv_sec = (decltype(tv.tv_sec))(minDelay / 1000); tv.tv_usec = 1000 * (minDelay % 1000); set_read.fdZero(); @@ -460,30 +594,30 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { max_fd = pr.first; } if (pr.second->event & Event_Read) { - set_read.fdSet(pr.first);//监听管道可读事件 + set_read.fdSet(pr.first); // 监听管道可读事件 } if (pr.second->event & Event_Write) { - set_write.fdSet(pr.first);//监听管道可写事件 + set_write.fdSet(pr.first); // 监听管道可写事件 } if (pr.second->event & Event_Error) { - set_err.fdSet(pr.first);//监听管道错误事件 + set_err.fdSet(pr.first); // 监听管道错误事件 } } - startSleep();//用于统计当前线程负载情况 + startSleep(); // 用于统计当前线程负载情况 ret = zl_select(max_fd + 1, &set_read, &set_write, &set_err, minDelay ? &tv : nullptr); - sleepWakeUp();//用于统计当前线程负载情况 + sleepWakeUp(); // 用于统计当前线程负载情况 if (ret <= 0) { - //超时或被打断 [AUTO-TRANSLATED:7005fded] - //Timed out or interrupted + // 超时或被打断 [AUTO-TRANSLATED:7005fded] + // Timed out or interrupted continue; } _event_cache_expired.clear(); - //收集select事件类型 [AUTO-TRANSLATED:9a5c41d3] - //Collect select event types + // 收集select事件类型 [AUTO-TRANSLATED:9a5c41d3] + // Collect select event types for (auto &pr : _event_map) { int event = 0; if (set_read.isSet(pr.first)) { @@ -503,7 +637,7 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { callback_list.for_each([&](Poll_Record::Ptr &record) { if (_event_cache_expired.count(record->fd)) { - //event cache refresh + // event cache refresh return; } @@ -515,7 +649,7 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { }); callback_list.clear(); } -#endif //HAS_EPOLL +#endif // HAS_EPOLL } else { _loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self); _sem_run_started.wait(); @@ -527,13 +661,13 @@ uint64_t EventPoller::flushDelayTask(uint64_t now_time) { task_copy.swap(_delay_task_map); for (auto it = task_copy.begin(); it != task_copy.end() && it->first <= now_time; it = task_copy.erase(it)) { - //已到期的任务 [AUTO-TRANSLATED:849cdc29] - //Expired tasks + // 已到期的任务 [AUTO-TRANSLATED:849cdc29] + // Expired tasks try { auto next_delay = (*(it->second))(); if (next_delay) { - //可重复任务,更新时间截止线 [AUTO-TRANSLATED:c7746a21] - //Repeatable tasks, update deadline + // 可重复任务,更新时间截止线 [AUTO-TRANSLATED:c7746a21] + // Repeatable tasks, update deadline _delay_task_map.emplace(next_delay + now_time, std::move(it->second)); } } catch (std::exception &ex) { @@ -546,30 +680,30 @@ uint64_t EventPoller::flushDelayTask(uint64_t now_time) { auto it = _delay_task_map.begin(); if (it == _delay_task_map.end()) { - //没有剩余的定时器了 [AUTO-TRANSLATED:23b1119e] - //No remaining timers + // 没有剩余的定时器了 [AUTO-TRANSLATED:23b1119e] + // No remaining 
timers return 0; } - //最近一个定时器的执行延时 [AUTO-TRANSLATED:2535621b] - //Delay in execution of the last timer + // 最近一个定时器的执行延时 [AUTO-TRANSLATED:2535621b] + // Delay in execution of the last timer return it->first - now_time; } uint64_t EventPoller::getMinDelay() { auto it = _delay_task_map.begin(); if (it == _delay_task_map.end()) { - //没有剩余的定时器了 [AUTO-TRANSLATED:23b1119e] - //No remaining timers + // 没有剩余的定时器了 [AUTO-TRANSLATED:23b1119e] + // No remaining timers return 0; } auto now = getCurrentMillisecond(); if (it->first > now) { - //所有任务尚未到期 [AUTO-TRANSLATED:8d80eabf] - //All tasks have not expired + // 所有任务尚未到期 [AUTO-TRANSLATED:8d80eabf] + // All tasks have not expired return it->first - now; } - //执行已到期的任务并刷新休眠延时 [AUTO-TRANSLATED:cd6348b7] - //Execute expired tasks and refresh sleep delay + // 执行已到期的任务并刷新休眠延时 [AUTO-TRANSLATED:cd6348b7] + // Execute expired tasks and refresh sleep delay return flushDelayTask(now); } @@ -577,14 +711,33 @@ EventPoller::DelayTask::Ptr EventPoller::doDelayTask(uint64_t delay_ms, function DelayTask::Ptr ret = std::make_shared(std::move(task)); auto time_line = getCurrentMillisecond() + delay_ms; async_first([time_line, ret, this]() { - //异步执行的目的是刷新select或epoll的休眠时间 [AUTO-TRANSLATED:a6b5c8d7] - //The purpose of asynchronous execution is to refresh the sleep time of select or epoll + // 异步执行的目的是刷新select或epoll的休眠时间 [AUTO-TRANSLATED:a6b5c8d7] + // The purpose of asynchronous execution is to refresh the sleep time of select or epoll _delay_task_map.emplace(time_line, ret); }); return ret; } +#if defined(HAS_IO_URING) +void EventPoller::rearm_io_uring(int fd, uint32_t events) { + struct io_uring_sqe *sqe = io_uring_get_sqe(&_ring); + if (!sqe) { + WarnL << "Failed to get SQE for re-arming fd: " << fd; + return; + } + + io_uring_prep_poll_add(sqe, fd, events); + auto it = _io_buffers.find(fd); + if (it != _io_buffers.end()) { + io_uring_sqe_set_data(sqe, it->second.get()); + } + int ret = io_uring_submit(&_ring); + if (ret < 0) { + ErrorL << "Failed to submit io_uring re-arm request: " << strerror(-ret); + } +} +#endif /////////////////////////////////////////////// static size_t s_pool_size = 0; @@ -624,5 +777,4 @@ void EventPollerPool::enableCpuAffinity(bool enable) { s_enable_cpu_affinity = enable; } -} // namespace toolkit - +} // namespace toolkit diff --git a/src/Poller/EventPoller.h b/src/Poller/EventPoller.h index e055788a9..33ced1411 100644 --- a/src/Poller/EventPoller.h +++ b/src/Poller/EventPoller.h @@ -34,6 +34,13 @@ #define HAS_KQUEUE #endif // __APPLE__ +#if defined(HAS_IO_URING) +#include +#include +#undef HAS_EPOLL +#undef HAS_KQUEUE +#endif //HAS_IO_URING + namespace toolkit { class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_shared_from_this { @@ -52,6 +59,12 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s Event_LT = 1 << 3,//水平触发 } Poll_Event; + struct IOBuffer { + char data[4096]; + int fd; + + std::shared_ptr callback; + }; ~EventPoller(); /** @@ -278,56 +291,68 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s */ void addEventPipe(); +#if defined(HAS_IO_URING) + void rearm_io_uring(int fd, uint32_t events); + uint32_t convert_to_poll_mask(int event) { + uint32_t mask = 0; + if (event & Event_Read) mask |= POLLIN; + if (event & Event_Write) mask |= POLLOUT; + if (event & Event_Error) mask |= POLLERR; + return mask; + } + + int convert_from_poll_mask(uint32_t mask) { + int event = 0; + if (mask & POLLIN) event |= Event_Read; + if (mask & POLLOUT) event 
|= Event_Write; + if (mask & POLLERR) event |= Event_Error; + return event; + } +#endif + private: class ExitException : public std::exception {}; private: //标记loop线程是否退出 [AUTO-TRANSLATED:98250f84] - //标记loop线程是否退出 -// Mark the loop thread as exited + // Mark the loop thread as exited bool _exit_flag; //线程名 [AUTO-TRANSLATED:f1d62d9f] - //线程名 -// Thread name + // Thread name std::string _name; //当前线程下,所有socket共享的读缓存 [AUTO-TRANSLATED:6ce70017] - //当前线程下,所有socket共享的读缓存 -// Shared read buffer for all sockets under the current thread + // Shared read buffer for all sockets under the current thread std::weak_ptr _shared_buffer[2]; //执行事件循环的线程 [AUTO-TRANSLATED:2465cc75] - //执行事件循环的线程 -// Thread that executes the event loop + // Thread that executes the event loop std::thread *_loop_thread = nullptr; //通知事件循环的线程已启动 [AUTO-TRANSLATED:61f478cf] - //通知事件循环的线程已启动 -// Notify the event loop thread that it has started + // Notify the event loop thread that it has started semaphore _sem_run_started; //内部事件管道 [AUTO-TRANSLATED:dc1d3a93] - //内部事件管道 -// Internal event pipe + // Internal event pipe PipeWrap _pipe; //从其他线程切换过来的任务 [AUTO-TRANSLATED:d16917d6] - //从其他线程切换过来的任务 -// Tasks switched from other threads + // Tasks switched from other threads std::mutex _mtx_task; List _list_task; //保持日志可用 [AUTO-TRANSLATED:4a6c2438] - //保持日志可用 -// Keep the log available + // Keep the log available Logger::Ptr _logger; #if defined(HAS_EPOLL) || defined(HAS_KQUEUE) // epoll和kqueue相关 [AUTO-TRANSLATED:84d2785e] - //epoll和kqueue相关 -// epoll and kqueue related + // epoll and kqueue related int _event_fd = -1; std::unordered_map > _event_map; +#elif defined(HAS_IO_URING) + struct io_uring _ring; + std::unordered_map> _io_buffers; #else // select相关 [AUTO-TRANSLATED:bf3e2edd] - //select相关 -// select related + // select related struct Poll_Record { using Ptr = std::shared_ptr; int fd; From ca83162ca0617d835b6f2f37b9c298e5f0fabcfe Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 19 Sep 2024 22:51:56 +0800 Subject: [PATCH 2/2] Initial implementation of event loop supporting io_uring, asynchronous reading and writing not yet implemented --- CMakeLists.txt | 4 ++-- cmake/{FindLibUring.cmake => FindLIBURING.cmake} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename cmake/{FindLibUring.cmake => FindLIBURING.cmake} (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index fd3e5189b..4bd933a1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -89,12 +89,12 @@ endif() set(ENABLE_OPENSSL ON CACHE BOOL "enable openssl") set(ENABLE_MYSQL ON CACHE BOOL "enable mysql") set(ASAN_USE_DELETE OFF CACHE BOOL "use delele[] or free when asan enabled") +set(ENABLE_IO_URING OFF CACHE BOOL "enable io_uring") # 添加io_uring支持 # Add io_uring support -option(ENABLE_IO_URING "enable io_uring" ON) if(ENABLE_IO_URING) - find_package(LibUring QUIET) + find_package(LIBURING QUIET) if(LIBURING_FOUND) message(STATUS "Found liburing") add_definitions(-DHAS_IO_URING) diff --git a/cmake/FindLibUring.cmake b/cmake/FindLIBURING.cmake similarity index 100% rename from cmake/FindLibUring.cmake rename to cmake/FindLIBURING.cmake
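
A note on the polling model that patch 1 builds on: unlike epoll, where a descriptor registered with EPOLL_CTL_ADD stays armed until it is explicitly removed, an IORING_OP_POLL_ADD request is one-shot: it produces a single completion and then has to be resubmitted. That is why runLoop() queues a fresh poll SQE after invoking each callback and why modifyEvent() goes through rearm_io_uring(). (Kernels from roughly 5.13 also offer multishot polling via IORING_POLL_ADD_MULTI, which would remove the re-arm step, but the patch sticks to the portable one-shot form.) The standalone sketch below is not part of the patches; the fd argument, the 64-entry queue, the three iterations and the POLLIN-only mask are illustrative assumptions. It shows the arm / wait / re-arm cycle in isolation:

    #include <liburing.h>
    #include <poll.h>
    #include <cstdint>
    #include <cstdio>

    // Illustrative only: arm a one-shot poll on a single fd, wait for the
    // completion, handle it, then re-arm, i.e. the same cycle runLoop() performs.
    int watch_fd(int fd) {
        struct io_uring ring;
        if (io_uring_queue_init(64, &ring, 0) < 0) {   // 64 entries is an arbitrary example depth
            return -1;
        }
        for (int i = 0; i < 3; ++i) {                  // re-arm a few times to show the cycle
            struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
            if (!sqe) {
                break;
            }
            io_uring_prep_poll_add(sqe, fd, POLLIN);            // completes once, then must be re-queued
            io_uring_sqe_set_data(sqe, (void *)(intptr_t)fd);   // user data travels to the matching CQE
            io_uring_submit(&ring);

            struct io_uring_cqe *cqe = nullptr;
            if (io_uring_wait_cqe(&ring, &cqe) < 0) {
                break;
            }
            int watched = (int)(intptr_t)io_uring_cqe_get_data(cqe);
            int revents = cqe->res;                             // poll mask on success, -errno on failure
            std::printf("fd %d ready, revents=0x%x\n", watched, revents);
            io_uring_cqe_seen(&ring, cqe);                      // mark this completion as consumed
        }
        io_uring_queue_exit(&ring);
        return 0;
    }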
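
Related to that cycle, runLoop() re-arms with convert_to_poll_mask(event), where event was derived from cqe->res, so a descriptor registered for both read and write interest keeps only whichever bits actually fired (and a negative cqe->res would be misread as a mask). A possible refinement, sketched below against the patch's own types (the poll_mask field is hypothetical, everything else comes from the patch), is to record the requested mask in IOBuffer and re-arm with it:

    // Hypothetical extra field on the IOBuffer from EventPoller.h:
    struct IOBuffer {
        char data[4096];
        int fd;
        uint32_t poll_mask = 0;                    // mask requested in addEvent(), not what last fired
        std::shared_ptr<PollEventCB> callback;
    };

    // ...and in runLoop(), after the callback has run (fragment, assumes _ring and buffer are in scope):
    struct io_uring_sqe *sqe = io_uring_get_sqe(&_ring);
    if (sqe) {
        io_uring_prep_poll_add(sqe, fd, buffer->poll_mask);    // original interest set, not cqe->res
        io_uring_sqe_set_data(sqe, buffer);
    } else {
        WarnL << "Failed to get SQE for re-adding fd: " << fd;
    }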
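
Two smaller points on the setup and build path. The constructor requests IORING_SETUP_SQPOLL with sq_thread_idle = 2000, meaning a kernel thread polls the submission queue (and goes to sleep after two idle seconds) so that io_uring_submit() can usually skip the syscall; on kernels older than roughly 5.11 that mode needs elevated privileges, so an unprivileged process can fail the init even though io_uring itself works. Note also that io_uring_queue_init_params() reports failure through its negative return value and does not promise to set errno, so strerror(errno) in the constructor may print a stale message. The snippet below is a suggestion rather than part of the patch; it slots into EventPoller::EventPoller() and assumes the _ring member, StrPrinter and runtime_error already visible there:

    // Fallback init sketch for EventPoller::EventPoller() (fragment, not in the patch).
    struct io_uring_params params;
    memset(&params, 0, sizeof(params));
    params.flags = IORING_SETUP_SQPOLL;   // kernel-side submission-queue polling
    params.sq_thread_idle = 2000;         // SQ thread sleeps after 2s of inactivity
    int ret = io_uring_queue_init_params(1024, &_ring, &params);
    if (ret < 0) {
        // SQPOLL may be refused (old kernel or missing privileges); retry in plain interrupt-driven mode.
        memset(&params, 0, sizeof(params));
        ret = io_uring_queue_init_params(1024, &_ring, &params);
    }
    if (ret < 0) {
        // liburing returns -errno; format the message from the return code.
        throw runtime_error(StrPrinter << "Create io_uring failed: " << strerror(-ret));
    }

On the build side, patch 2 turns ENABLE_IO_URING into an OFF-by-default cache variable and renames the module to FindLIBURING.cmake, so the name passed to find_package() matches the LIBURING prefix that find_package_handle_standard_args() actually populates; consumers opt in with -DENABLE_IO_URING=ON, and once liburing is found the HAS_IO_URING define takes precedence over epoll and kqueue in EventPoller.h.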