Skip to content

Commit

Permalink
Support kqueue for macOS/BSD (#227)
Browse files Browse the repository at this point in the history
Co-authored-by: xia-chu <[email protected]>
  • Loading branch information
suzp1984 and xia-chu authored Apr 20, 2024
1 parent 04d1c47 commit 43004be
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 45 deletions.
163 changes: 125 additions & 38 deletions src/Poller/EventPoller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@
| (((epoll_event) & EPOLLOUT) ? Event_Write : 0) \
| (((epoll_event) & EPOLLHUP) ? Event_Error : 0) \
| (((epoll_event) & EPOLLERR) ? Event_Error : 0)
#define create_event() epoll_create(EPOLL_SIZE)
#endif //HAS_EPOLL

#if defined(HAS_KQUEUE)
#include <sys/event.h>
#define KEVENT_SIZE 1024
#define create_event() kqueue()
#endif // HAS_KQUEUE

using namespace std;

namespace toolkit {
Expand All @@ -60,14 +67,15 @@ void EventPoller::addEventPipe() {
}

EventPoller::EventPoller(std::string name) {
_name = std::move(name);
#if defined(HAS_EPOLL)
_epoll_fd = epoll_create(EPOLL_SIZE);
if (_epoll_fd == -1) {
throw runtime_error(StrPrinter << "Create epoll fd failed: " << get_uv_errmsg());
#if defined(HAS_EPOLL) || defined(HAS_KQUEUE)
_event_fd = create_event();
if (_event_fd == -1) {
throw runtime_error(StrPrinter << "Create event fd failed: " << get_uv_errmsg());
}
SockUtil::setCloExec(_epoll_fd);
SockUtil::setCloExec(_event_fd);
#endif //HAS_EPOLL

_name = std::move(name);
_logger = Logger::Instance().shared_from_this();
addEventPipe();
}
Expand All @@ -87,12 +95,14 @@ void EventPoller::shutdown() {

EventPoller::~EventPoller() {
shutdown();
#if defined(HAS_EPOLL)
if (_epoll_fd != -1) {
close(_epoll_fd);
_epoll_fd = -1;

#if defined(HAS_EPOLL) || defined(HAS_KQUEUE)
if (_event_fd != -1) {
close(_event_fd);
_event_fd = -1;
}
#endif //defined(HAS_EPOLL)
#endif

//退出前清理管道中的数据
onPipeEvent();
InfoL << getThreadName();
Expand All @@ -110,26 +120,37 @@ int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
struct epoll_event ev = {0};
ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE;
ev.data.fd = fd;
int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
if (ret == 0) {
int ret = epoll_ctl(_event_fd, EPOLL_CTL_ADD, fd, &ev);
if (ret != -1) {
_event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
}
return ret;
#elif defined(HAS_KQUEUE)
struct kevent kev[2];
int index = 0;
if (event & Event_Read) {
EV_SET(&kev[index++], fd, EVFILT_READ, EV_ADD, 0, 0, nullptr);
}
if (event & Event_Write) {
EV_SET(&kev[index++], fd, EVFILT_WRITE, EV_ADD, 0, 0, nullptr);
}
int ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr);
if (ret != -1) {
_event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
}
return ret;
#else
#ifndef _WIN32
//win32平台,socket套接字不等于文件描述符,所以可能不适用这个限制
if (fd >= FD_SETSIZE || _event_map.size() >= FD_SETSIZE) {
if (fd >= FD_SETSIZE) {
WarnL << "select() can not watch fd bigger than " << FD_SETSIZE;
return -1;
}
#endif
auto record = std::make_shared<Poll_Record>();
record->fd = fd;
record->event = event;
record->call_back = std::move(cb);
_event_map.emplace(fd, record);
return 0;
#endif //HAS_EPOLL
#endif
}

async([this, fd, event, cb]() mutable {
Expand All @@ -146,19 +167,33 @@ int EventPoller::delEvent(int fd, PollCompleteCB cb) {

if (isCurrentThread()) {
#if defined(HAS_EPOLL)
bool success = epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == 0 && _event_map.erase(fd) > 0;
if (success) {
_event_cache_expired_map[fd] = true;
int ret = -1;
if (_event_map.erase(fd)) {
_event_cache_expired.emplace(fd);
ret = epoll_ctl(_event_fd, EPOLL_CTL_DEL, fd, nullptr);
}
cb(success);
return success ? 0 : -1;
cb(ret != -1);
return ret;
#elif defined(HAS_KQUEUE)
int ret = -1;
if (_event_map.erase(fd)) {
_event_cache_expired.emplace(fd);
struct kevent kev[2];
int index = 0;
EV_SET(&kev[index++], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
EV_SET(&kev[index++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr);
}
cb(ret != -1);
return ret;
#else
bool success = _event_map.erase(fd);
if (success) {
_event_cache_expired_map[fd] = true;
int ret = -1;
if (_event_map.erase(fd)) {
_event_cache_expired.emplace(fd);
ret = 0;
}
cb(success);
return 0;
cb(ret != -1);
return ret;
#endif //HAS_EPOLL
}

Expand All @@ -179,16 +214,24 @@ int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) {
struct epoll_event ev = { 0 };
ev.events = toEpoll(event);
ev.data.fd = fd;
auto ret = epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &ev);
cb(ret == 0);
auto ret = epoll_ctl(_event_fd, EPOLL_CTL_MOD, fd, &ev);
cb(ret != -1);
return ret;
#elif defined(HAS_KQUEUE)
struct kevent kev[2];
int index = 0;
EV_SET(&kev[index++], fd, EVFILT_READ, event & Event_Read ? EV_ADD : EV_DELETE, 0, 0, nullptr);
EV_SET(&kev[index++], fd, EVFILT_WRITE, event & Event_Write ? EV_ADD : EV_DELETE, 0, 0, nullptr);
int ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr);
cb(ret != -1);
return ret;
#else
auto it = _event_map.find(fd);
if (it != _event_map.end()) {
it->second->event = event;
}
cb(it != _event_map.end());
return 0;
return it != _event_map.end() ? 0 : -1;
#endif // HAS_EPOLL
}
async([this, fd, event, cb]() mutable {
Expand Down Expand Up @@ -304,26 +347,26 @@ void EventPoller::runLoop(bool blocked, bool ref_self) {
while (!_exit_flag) {
minDelay = getMinDelay();
startSleep();//用于统计当前线程负载情况
int ret = epoll_wait(_epoll_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1);
int ret = epoll_wait(_event_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1);
sleepWakeUp();//用于统计当前线程负载情况
if (ret <= 0) {
//超时或被打断
continue;
}

_event_cache_expired_map.clear();
_event_cache_expired.clear();

for (int i = 0; i < ret; ++i) {
struct epoll_event &ev = events[i];
int fd = ev.data.fd;
if (_event_cache_expired_map.find(fd) != _event_cache_expired_map.end()) {
if (_event_cache_expired.count(fd)) {
//event cache refresh
continue;
}

auto it = _event_map.find(fd);
if (it == _event_map.end()) {
epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
epoll_ctl(_event_fd, EPOLL_CTL_DEL, fd, nullptr);
continue;
}
auto cb = it->second;
Expand All @@ -334,6 +377,50 @@ void EventPoller::runLoop(bool blocked, bool ref_self) {
}
}
}
#elif defined(HAS_KQUEUE)
struct kevent kevents[KEVENT_SIZE];
while (!_exit_flag) {
minDelay = getMinDelay();
struct timespec timeout = { (long)minDelay / 1000, (long)minDelay % 1000 * 1000000 };

startSleep();
int ret = kevent(_event_fd, nullptr, 0, kevents, KEVENT_SIZE, minDelay ? &timeout : nullptr);
sleepWakeUp();
if (ret <= 0) {
continue;
}

_event_cache_expired.clear();

for (int i = 0; i < ret; ++i) {
auto &kev = kevents[i];
auto fd = kev.ident;
if (_event_cache_expired.count(fd)) {
// event cache refresh
continue;
}

auto it = _event_map.find(fd);
if (it == _event_map.end()) {
EV_SET(&kev, fd, kev.filter, EV_DELETE, 0, 0, nullptr);
kevent(_event_fd, &kev, 1, nullptr, 0, nullptr);
continue;
}
auto cb = it->second;
int event = 0;
switch (kev.filter) {
case EVFILT_READ: event = Event_Read; break;
case EVFILT_WRITE: event = Event_Write; break;
default: WarnL << "unknown kevent filter: " << kev.filter; break;
}

try {
(*cb)(event);
} catch (std::exception &ex) {
ErrorL << "Exception occurred when do event task: " << ex.what();
}
}
}
#else
int ret, max_fd;
FdSet set_read, set_write, set_err;
Expand Down Expand Up @@ -373,7 +460,7 @@ void EventPoller::runLoop(bool blocked, bool ref_self) {
continue;
}

_event_cache_expired_map.clear();
_event_cache_expired.clear();

//收集select事件类型
for (auto &pr : _event_map) {
Expand All @@ -393,8 +480,8 @@ void EventPoller::runLoop(bool blocked, bool ref_self) {
}
}

callback_list.for_each([this](Poll_Record::Ptr &record) {
if (this->_event_cache_expired_map.find(record->fd) != this->_event_cache_expired_map.end()) {
callback_list.for_each([&](Poll_Record::Ptr &record) {
if (_event_cache_expired.count(record->fd)) {
//event cache refresh
return;
}
Expand Down
19 changes: 12 additions & 7 deletions src/Poller/EventPoller.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <functional>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include "PipeWrap.h"
#include "Util/logger.h"
#include "Util/List.h"
Expand All @@ -28,6 +29,10 @@
#define HAS_EPOLL
#endif //__linux__

#if defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__)
#define HAS_KQUEUE
#endif // __APPLE__

namespace toolkit {

class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_shared_from_this<EventPoller> {
Expand Down Expand Up @@ -202,22 +207,22 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s
//保持日志可用
Logger::Ptr _logger;

#if defined(HAS_EPOLL)
//epoll相关
int _epoll_fd = -1;
unordered_map<int, std::shared_ptr<PollEventCB> > _event_map;
#if defined(HAS_EPOLL) || defined(HAS_KQUEUE)
// epoll和kqueue相关
int _event_fd = -1;
std::unordered_map<int, std::shared_ptr<PollEventCB> > _event_map;
#else
//select相关
// select相关
struct Poll_Record {
using Ptr = std::shared_ptr<Poll_Record>;
int fd;
int event;
int attach;
PollEventCB call_back;
};
unordered_map<int, Poll_Record::Ptr> _event_map;
std::unordered_map<int, Poll_Record::Ptr> _event_map;
#endif //HAS_EPOLL
unordered_map<int, bool> _event_cache_expired_map;
std::unordered_set<int> _event_cache_expired;

//定时器相关
std::multimap<uint64_t, DelayTask::Ptr> _delay_task_map;
Expand Down

0 comments on commit 43004be

Please sign in to comment.