diff --git a/src/Poller/EventPoller.cpp b/src/Poller/EventPoller.cpp index 8e1e9941..45ef73be 100644 --- a/src/Poller/EventPoller.cpp +++ b/src/Poller/EventPoller.cpp @@ -39,8 +39,15 @@ | (((epoll_event) & EPOLLOUT) ? Event_Write : 0) \ | (((epoll_event) & EPOLLHUP) ? Event_Error : 0) \ | (((epoll_event) & EPOLLERR) ? Event_Error : 0) +#define create_event() epoll_create(EPOLL_SIZE) #endif //HAS_EPOLL +#if defined(HAS_KQUEUE) +#include +#define KEVENT_SIZE 1024 +#define create_event() kqueue() +#endif // HAS_KQUEUE + using namespace std; namespace toolkit { @@ -60,14 +67,15 @@ void EventPoller::addEventPipe() { } EventPoller::EventPoller(std::string name) { - _name = std::move(name); -#if defined(HAS_EPOLL) - _epoll_fd = epoll_create(EPOLL_SIZE); - if (_epoll_fd == -1) { - throw runtime_error(StrPrinter << "Create epoll fd failed: " << get_uv_errmsg()); +#if defined(HAS_EPOLL) || defined(HAS_KQUEUE) + _event_fd = create_event(); + if (_event_fd == -1) { + throw runtime_error(StrPrinter << "Create event fd failed: " << get_uv_errmsg()); } - SockUtil::setCloExec(_epoll_fd); + SockUtil::setCloExec(_event_fd); #endif //HAS_EPOLL + + _name = std::move(name); _logger = Logger::Instance().shared_from_this(); addEventPipe(); } @@ -87,12 +95,14 @@ void EventPoller::shutdown() { EventPoller::~EventPoller() { shutdown(); -#if defined(HAS_EPOLL) - if (_epoll_fd != -1) { - close(_epoll_fd); - _epoll_fd = -1; + +#if defined(HAS_EPOLL) || defined(HAS_KQUEUE) + if (_event_fd != -1) { + close(_event_fd); + _event_fd = -1; } -#endif //defined(HAS_EPOLL) +#endif + //退出前清理管道中的数据 onPipeEvent(); InfoL << getThreadName(); @@ -110,26 +120,37 @@ int EventPoller::addEvent(int fd, int event, PollEventCB cb) { struct epoll_event ev = {0}; ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE; ev.data.fd = fd; - int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev); - if (ret == 0) { + int ret = epoll_ctl(_event_fd, EPOLL_CTL_ADD, fd, &ev); + if (ret != -1) { + _event_map.emplace(fd, std::make_shared(std::move(cb))); + } + return ret; +#elif defined(HAS_KQUEUE) + struct kevent kev[2]; + int index = 0; + if (event & Event_Read) { + EV_SET(&kev[index++], fd, EVFILT_READ, EV_ADD, 0, 0, nullptr); + } + if (event & Event_Write) { + EV_SET(&kev[index++], fd, EVFILT_WRITE, EV_ADD, 0, 0, nullptr); + } + int ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr); + if (ret != -1) { _event_map.emplace(fd, std::make_shared(std::move(cb))); } return ret; #else -#ifndef _WIN32 - //win32平台,socket套接字不等于文件描述符,所以可能不适用这个限制 - if (fd >= FD_SETSIZE || _event_map.size() >= FD_SETSIZE) { + if (fd >= FD_SETSIZE) { WarnL << "select() can not watch fd bigger than " << FD_SETSIZE; return -1; } -#endif auto record = std::make_shared(); record->fd = fd; record->event = event; record->call_back = std::move(cb); _event_map.emplace(fd, record); return 0; -#endif //HAS_EPOLL +#endif } async([this, fd, event, cb]() mutable { @@ -146,19 +167,33 @@ int EventPoller::delEvent(int fd, PollCompleteCB cb) { if (isCurrentThread()) { #if defined(HAS_EPOLL) - bool success = epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == 0 && _event_map.erase(fd) > 0; - if (success) { - _event_cache_expired_map[fd] = true; + int ret = -1; + if (_event_map.erase(fd)) { + _event_cache_expired.emplace(fd); + ret = epoll_ctl(_event_fd, EPOLL_CTL_DEL, fd, nullptr); } - cb(success); - return success ? 0 : -1; + cb(ret != -1); + return ret; +#elif defined(HAS_KQUEUE) + int ret = -1; + if (_event_map.erase(fd)) { + _event_cache_expired.emplace(fd); + struct kevent kev[2]; + int index = 0; + EV_SET(&kev[index++], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + EV_SET(&kev[index++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); + ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr); + } + cb(ret != -1); + return ret; #else - bool success = _event_map.erase(fd); - if (success) { - _event_cache_expired_map[fd] = true; + int ret = -1; + if (_event_map.erase(fd)) { + _event_cache_expired.emplace(fd); + ret = 0; } - cb(success); - return 0; + cb(ret != -1); + return ret; #endif //HAS_EPOLL } @@ -179,8 +214,16 @@ int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) { struct epoll_event ev = { 0 }; ev.events = toEpoll(event); ev.data.fd = fd; - auto ret = epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &ev); - cb(ret == 0); + auto ret = epoll_ctl(_event_fd, EPOLL_CTL_MOD, fd, &ev); + cb(ret != -1); + return ret; +#elif defined(HAS_KQUEUE) + struct kevent kev[2]; + int index = 0; + EV_SET(&kev[index++], fd, EVFILT_READ, event & Event_Read ? EV_ADD : EV_DELETE, 0, 0, nullptr); + EV_SET(&kev[index++], fd, EVFILT_WRITE, event & Event_Write ? EV_ADD : EV_DELETE, 0, 0, nullptr); + int ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr); + cb(ret != -1); return ret; #else auto it = _event_map.find(fd); @@ -188,7 +231,7 @@ int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) { it->second->event = event; } cb(it != _event_map.end()); - return 0; + return it != _event_map.end() ? 0 : -1; #endif // HAS_EPOLL } async([this, fd, event, cb]() mutable { @@ -304,26 +347,26 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { while (!_exit_flag) { minDelay = getMinDelay(); startSleep();//用于统计当前线程负载情况 - int ret = epoll_wait(_epoll_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1); + int ret = epoll_wait(_event_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1); sleepWakeUp();//用于统计当前线程负载情况 if (ret <= 0) { //超时或被打断 continue; } - _event_cache_expired_map.clear(); + _event_cache_expired.clear(); for (int i = 0; i < ret; ++i) { struct epoll_event &ev = events[i]; int fd = ev.data.fd; - if (_event_cache_expired_map.find(fd) != _event_cache_expired_map.end()) { + if (_event_cache_expired.count(fd)) { //event cache refresh continue; } auto it = _event_map.find(fd); if (it == _event_map.end()) { - epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr); + epoll_ctl(_event_fd, EPOLL_CTL_DEL, fd, nullptr); continue; } auto cb = it->second; @@ -334,6 +377,50 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { } } } +#elif defined(HAS_KQUEUE) + struct kevent kevents[KEVENT_SIZE]; + while (!_exit_flag) { + minDelay = getMinDelay(); + struct timespec timeout = { (long)minDelay / 1000, (long)minDelay % 1000 * 1000000 }; + + startSleep(); + int ret = kevent(_event_fd, nullptr, 0, kevents, KEVENT_SIZE, minDelay ? &timeout : nullptr); + sleepWakeUp(); + if (ret <= 0) { + continue; + } + + _event_cache_expired.clear(); + + for (int i = 0; i < ret; ++i) { + auto &kev = kevents[i]; + auto fd = kev.ident; + if (_event_cache_expired.count(fd)) { + // event cache refresh + continue; + } + + auto it = _event_map.find(fd); + if (it == _event_map.end()) { + EV_SET(&kev, fd, kev.filter, EV_DELETE, 0, 0, nullptr); + kevent(_event_fd, &kev, 1, nullptr, 0, nullptr); + continue; + } + auto cb = it->second; + int event = 0; + switch (kev.filter) { + case EVFILT_READ: event = Event_Read; break; + case EVFILT_WRITE: event = Event_Write; break; + default: WarnL << "unknown kevent filter: " << kev.filter; break; + } + + try { + (*cb)(event); + } catch (std::exception &ex) { + ErrorL << "Exception occurred when do event task: " << ex.what(); + } + } + } #else int ret, max_fd; FdSet set_read, set_write, set_err; @@ -373,7 +460,7 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { continue; } - _event_cache_expired_map.clear(); + _event_cache_expired.clear(); //收集select事件类型 for (auto &pr : _event_map) { @@ -393,8 +480,8 @@ void EventPoller::runLoop(bool blocked, bool ref_self) { } } - callback_list.for_each([this](Poll_Record::Ptr &record) { - if (this->_event_cache_expired_map.find(record->fd) != this->_event_cache_expired_map.end()) { + callback_list.for_each([&](Poll_Record::Ptr &record) { + if (_event_cache_expired.count(record->fd)) { //event cache refresh return; } diff --git a/src/Poller/EventPoller.h b/src/Poller/EventPoller.h index b566276e..f816d832 100644 --- a/src/Poller/EventPoller.h +++ b/src/Poller/EventPoller.h @@ -17,6 +17,7 @@ #include #include #include +#include #include "PipeWrap.h" #include "Util/logger.h" #include "Util/List.h" @@ -28,6 +29,10 @@ #define HAS_EPOLL #endif //__linux__ +#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) +#define HAS_KQUEUE +#endif // __APPLE__ + namespace toolkit { class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_shared_from_this { @@ -202,12 +207,12 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s //保持日志可用 Logger::Ptr _logger; -#if defined(HAS_EPOLL) - //epoll相关 - int _epoll_fd = -1; - unordered_map > _event_map; +#if defined(HAS_EPOLL) || defined(HAS_KQUEUE) + // epoll和kqueue相关 + int _event_fd = -1; + std::unordered_map > _event_map; #else - //select相关 + // select相关 struct Poll_Record { using Ptr = std::shared_ptr; int fd; @@ -215,9 +220,9 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s int attach; PollEventCB call_back; }; - unordered_map _event_map; + std::unordered_map _event_map; #endif //HAS_EPOLL - unordered_map _event_cache_expired_map; + std::unordered_set _event_cache_expired; //定时器相关 std::multimap _delay_task_map;