From 97871cfa78fcd2fae164243a8c653e323385772d Mon Sep 17 00:00:00 2001 From: xia-chu <771730766@qq.com> Date: Thu, 9 Nov 2023 22:09:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DSocket=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E8=A7=A6=E5=8F=91=E8=B7=91=E9=A3=9E=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://github.com/ZLMediaKit/ZLMediaKit/issues/2946 Socket对象在对fd进行epoll/select操作时,此fd可能已经被关闭并被其他Socket对象复用并持有, 这将导致fd事件紊乱跑飞。现通过SocketNum对象捕获实现fd生命周期与事件监听周期的绑定, 防止socket已经被关闭(并被其他Socket对象复用)但还被添加或修改事件。 --- src/Network/Socket.cpp | 167 ++++++++++++++++--------------------- src/Network/Socket.h | 33 ++++---- src/Poller/EventPoller.cpp | 28 ++++--- src/Poller/EventPoller.h | 6 +- 4 files changed, 111 insertions(+), 123 deletions(-) diff --git a/src/Network/Socket.cpp b/src/Network/Socket.cpp index 6ab51432b..817fe28e0 100644 --- a/src/Network/Socket.cpp +++ b/src/Network/Socket.cpp @@ -124,8 +124,6 @@ void Socket::setOnSendResult(onSendResult cb) { _send_result = std::move(cb); } -#define CLOSE_SOCK(fd) if (fd != -1) { close(fd);} - void Socket::connect(const string &url, uint16_t port, const onErrCB &con_cb_in, float timeout_sec, const string &local_ip, uint16_t local_port) { weak_ptr weak_self = shared_from_this(); // 因为涉及到异步回调,所以在poller线程中执行确保线程安全 @@ -149,37 +147,30 @@ void Socket::connect_l(const string &url, uint16_t port, const onErrCB &con_cb_i strong_self->_async_con_cb = nullptr; strong_self->_con_timer = nullptr; if (err) { - LOCK_GUARD(strong_self->_mtx_sock_fd); - strong_self->_sock_fd = nullptr; + strong_self->setSock(nullptr); } con_cb_in(err); }; - auto async_con_cb = std::make_shared>([weak_self, con_cb](int sock) { + auto async_con_cb = std::make_shared>([weak_self, con_cb](const SockNum::Ptr &sock) { auto strong_self = weak_self.lock(); - if (sock == -1 || !strong_self) { - if (!strong_self) { - CLOSE_SOCK(sock); - } else { - con_cb(SockException(Err_dns, get_uv_errmsg(true))); - } + if (!sock || !strong_self) { + con_cb(SockException(Err_dns, get_uv_errmsg(true))); return; } // 监听该socket是否可写,可写表明已经连接服务器成功 - int result = strong_self->_poller->addEvent(sock, EventPoller::Event_Write | EventPoller::Event_Error, [weak_self, sock, con_cb](int event) { + int result = strong_self->_poller->addEvent(sock->rawFd(), EventPoller::Event_Write | EventPoller::Event_Error, [weak_self, sock, con_cb](int event) { if (auto strong_self = weak_self.lock()) { - // socket可写事件,说明已经连接服务器成功 - strong_self->setSock(strong_self->makeSock(sock, SockNum::Sock_TCP)); strong_self->onConnected(sock, con_cb); - } else { - CLOSE_SOCK(sock); } }); if (result == -1) { - CLOSE_SOCK(sock); con_cb(SockException(Err_other, std::string("add event to poller failed when start connect:") + get_uv_errmsg())); + } else { + // 先创建SockFD对象,防止SockNum由于未执行delEvent无法析构 + strong_self->setSock(sock); } }); @@ -190,18 +181,18 @@ void Socket::connect_l(const string &url, uint16_t port, const onErrCB &con_cb_i }, _poller); if (isIP(url.data())) { - (*async_con_cb)(SockUtil::connect(url.data(), port, true, local_ip.data(), local_port)); + auto fd = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port); + (*async_con_cb)(fd == -1 ? nullptr : std::make_shared(fd, SockNum::Sock_TCP)); } else { auto poller = _poller; - weak_ptr> weak_task = async_con_cb; + weak_ptr> weak_task = async_con_cb; WorkThreadPool::Instance().getExecutor()->async([url, port, local_ip, local_port, weak_task, poller]() { // 阻塞式dns解析放在后台线程执行 - int sock = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port); + int fd = SockUtil::connect(url.data(), port, true, local_ip.data(), local_port); + auto sock = fd == -1 ? nullptr : std::make_shared(fd, SockNum::Sock_TCP); poller->async([sock, weak_task]() { if (auto strong_task = weak_task.lock()) { (*strong_task)(sock); - } else { - CLOSE_SOCK(sock); } }); }); @@ -209,17 +200,19 @@ void Socket::connect_l(const string &url, uint16_t port, const onErrCB &con_cb_i } } -void Socket::onConnected(int sock, const onErrCB &cb) { - auto err = getSockErr(sock, false); +void Socket::onConnected(const SockNum::Ptr &sock, const onErrCB &cb) { + auto err = getSockErr(sock->rawFd(), false); if (err) { // 连接失败 cb(err); return; } + // 更新地址信息 + setSock(sock); // 先删除之前的可写事件监听 - _poller->delEvent(sock); - if (!attachEvent(sock, SockNum::Sock_TCP)) { + _poller->delEvent(sock->rawFd(), [sock](bool) {}); + if (!attachEvent(sock)) { // 连接失败 cb(SockException(Err_other, "add event to poller failed when connected")); return; @@ -235,11 +228,11 @@ void Socket::onConnected(int sock, const onErrCB &cb) { cb(err); } -bool Socket::attachEvent(int sock, SockNum::SockType type) { +bool Socket::attachEvent(const SockNum::Ptr &sock) { weak_ptr weak_self = shared_from_this(); - if (type == SockNum::Sock_TCP_Server) { + if (sock->type() == SockNum::Sock_TCP_Server) { // tcp服务器 - auto result = _poller->addEvent(sock, EventPoller::Event_Read | EventPoller::Event_Error, [weak_self, sock](int event) { + auto result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error, [weak_self, sock](int event) { if (auto strong_self = weak_self.lock()) { strong_self->onAccept(sock, event); } @@ -249,27 +242,27 @@ bool Socket::attachEvent(int sock, SockNum::SockType type) { // tcp客户端或udp auto read_buffer = _poller->getSharedBuffer(); - auto result = _poller->addEvent(sock, EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self, sock, type, read_buffer](int event) { + auto result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self, sock, read_buffer](int event) { auto strong_self = weak_self.lock(); if (!strong_self) { return; } if (event & EventPoller::Event_Read) { - strong_self->onRead(sock, type, read_buffer); + strong_self->onRead(sock, read_buffer); } if (event & EventPoller::Event_Write) { - strong_self->onWriteAble(sock, type); + strong_self->onWriteAble(sock); } if (event & EventPoller::Event_Error) { - strong_self->emitErr(getSockErr(sock)); + strong_self->emitErr(getSockErr(sock->rawFd())); } }); return -1 != result; } -ssize_t Socket::onRead(int sock_fd, SockNum::SockType type, const BufferRaw::Ptr &buffer) noexcept { +ssize_t Socket::onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) noexcept { ssize_t ret = 0, nread = 0; auto data = buffer->data(); // 最后一个字节设置为'\0' @@ -280,14 +273,14 @@ ssize_t Socket::onRead(int sock_fd, SockNum::SockType type, const BufferRaw::Ptr while (_enable_recv) { do { - nread = recvfrom(sock_fd, data, capacity, 0, (struct sockaddr *)&addr, &len); + nread = recvfrom(sock->rawFd(), data, capacity, 0, (struct sockaddr *)&addr, &len); } while (-1 == nread && UV_EINTR == get_uv_error(true)); if (nread == 0) { - if (type == SockNum::Sock_TCP) { + if (sock->type() == SockNum::Sock_TCP) { emitErr(SockException(Err_eof, "end of file")); } else { - WarnL << "Recv eof on udp socket[" << sock_fd << "]"; + WarnL << "Recv eof on udp socket[" << sock->rawFd() << "]"; } return ret; } @@ -295,10 +288,10 @@ ssize_t Socket::onRead(int sock_fd, SockNum::SockType type, const BufferRaw::Ptr if (nread == -1) { auto err = get_uv_error(true); if (err != UV_EAGAIN) { - if (type == SockNum::Sock_TCP) { + if (sock->type() == SockNum::Sock_TCP) { emitErr(toSockException(err)); } else { - WarnL << "Recv err on udp socket[" << sock_fd << "]: " << uv_strerror(err); + WarnL << "Recv err on udp socket[" << sock->rawFd() << "]: " << uv_strerror(err); } } return ret; @@ -406,7 +399,7 @@ int Socket::flushAll() { } if (_sendable) { // 该socket可写 - return flushData(_sock_fd->rawFd(), _sock_fd->type(), false) ? 0 : -1; + return flushData(_sock_fd->sockNum(), false) ? 0 : -1; } // 该socket不可写,判断发送超时 @@ -492,7 +485,7 @@ bool Socket::listen(uint16_t port, const string &local_ip, int backlog) { if (fd == -1) { return false; } - return fromSock_l(makeSock(fd, SockNum::Sock_TCP_Server)); + return fromSock_l(std::make_shared(fd, SockNum::Sock_TCP_Server)); } bool Socket::bindUdpSock(uint16_t port, const string &local_ip, bool enable_reuse) { @@ -501,7 +494,7 @@ bool Socket::bindUdpSock(uint16_t port, const string &local_ip, bool enable_reus if (fd == -1) { return false; } - return fromSock_l(makeSock(fd, SockNum::Sock_UDP)); + return fromSock_l(std::make_shared(fd, SockNum::Sock_UDP)); } bool Socket::fromSock(int fd, SockNum::SockType type) { @@ -509,25 +502,25 @@ bool Socket::fromSock(int fd, SockNum::SockType type) { SockUtil::setNoSigpipe(fd); SockUtil::setNoBlocked(fd); SockUtil::setCloExec(fd); - return fromSock_l(makeSock(fd, type)); + return fromSock_l(std::make_shared(fd, type)); } -bool Socket::fromSock_l(SockFD::Ptr fd) { - if (!attachEvent(fd->rawFd(), fd->type())) { +bool Socket::fromSock_l(SockNum::Ptr sock) { + if (!attachEvent(sock)) { return false; } - setSock(std::move(fd)); + setSock(std::move(sock)); return true; } -int Socket::onAccept(int sock, int event) noexcept { +int Socket::onAccept(const SockNum::Ptr &sock, int event) noexcept { int fd; struct sockaddr_storage peer_addr; socklen_t addr_len = sizeof(peer_addr); while (true) { if (event & EventPoller::Event_Read) { do { - fd = (int)accept(sock, (struct sockaddr *)&peer_addr, &addr_len); + fd = (int)accept(sock->rawFd(), (struct sockaddr *)&peer_addr, &addr_len); } while (-1 == fd && UV_EINTR == get_uv_error(true)); if (fd == -1) { @@ -540,18 +533,14 @@ int Socket::onAccept(int sock, int event) noexcept { auto ex = toSockException(err); // emitErr(ex); https://github.com/ZLMediaKit/ZLMediaKit/issues/2946 ErrorL << "Accept socket failed: " << ex.what(); - if (err != UV_EMFILE && err != UV_ENFILE) { - // 其他错误 - continue; - } - // 打开的文件描述符太多了 + // 可能打开的文件描述符太多了:UV_EMFILE/UV_ENFILE #if defined(HAS_EPOLL) // 边缘触发,还需要手动再触发accept事件 std::weak_ptr weak_self = shared_from_this(); - _poller->doDelayTask(100, [weak_self, sock, event]() { + _poller->doDelayTask(100, [weak_self, sock]() { if (auto strong_self = weak_self.lock()) { // 100ms后再处理accept事件,说不定已经有空闲的fd - strong_self->onAccept(sock, event); + strong_self->onAccept(sock, EventPoller::Event_Read); } return 0; }); @@ -590,15 +579,16 @@ int Socket::onAccept(int sock, int event) noexcept { peer_sock = Socket::createSocket(_poller, false); } + auto sock = std::make_shared(fd, SockNum::Sock_TCP); // 设置好fd,以备在onAccept事件中可以正常访问该fd - peer_sock->setSock(peer_sock->makeSock(fd, SockNum::Sock_TCP)); + peer_sock->setSock(sock); // 赋值peer ip,防止在执行setSock时,fd已经被reset断开 memcpy(&peer_sock->_peer_addr, &peer_addr, addr_len); - shared_ptr completed(nullptr, [peer_sock, fd](void *) { + shared_ptr completed(nullptr, [peer_sock, sock](void *) { try { // 然后把该fd加入poll监听(确保先触发onAccept事件然后再触发onRead等事件) - if (!peer_sock->attachEvent(fd, SockNum::Sock_TCP)) { + if (!peer_sock->attachEvent(sock)) { // 加入poll监听失败,触发onErr事件,通知该Socket无效 peer_sock->emitErr(SockException(Err_eof, "add event to poller failed when accept a socket")); } @@ -619,7 +609,7 @@ int Socket::onAccept(int sock, int event) noexcept { } if (event & EventPoller::Event_Error) { - auto ex = getSockErr(sock); + auto ex = getSockErr(sock->rawFd()); emitErr(ex); ErrorL << "TCP listener occurred a err: " << ex.what(); return -1; @@ -627,12 +617,15 @@ int Socket::onAccept(int sock, int event) noexcept { } } -void Socket::setSock(SockFD::Ptr fd) { +void Socket::setSock(SockNum::Ptr sock) { LOCK_GUARD(_mtx_sock_fd); - _sock_fd = std::move(fd); - assert(_poller == _sock_fd->getPoller()); - SockUtil::get_sock_local_addr(_sock_fd->rawFd(), _local_addr); - SockUtil::get_sock_peer_addr(_sock_fd->rawFd(), _peer_addr); + if (sock) { + _sock_fd = std::make_shared(std::move(sock), _poller); + SockUtil::get_sock_local_addr(_sock_fd->rawFd(), _local_addr); + SockUtil::get_sock_peer_addr(_sock_fd->rawFd(), _peer_addr); + } else { + _sock_fd = nullptr; + } } string Socket::get_local_ip() { @@ -678,7 +671,7 @@ string Socket::getIdentifier() const { return class_name + to_string(reinterpret_cast(this)); } -bool Socket::flushData(int sock, SockNum::SockType type, bool poller_thread) { +bool Socket::flushData(const SockNum::Ptr &sock, bool poller_thread) { decltype(_send_buf_sending) send_buf_sending_tmp; { // 转移出二级缓存 @@ -707,7 +700,7 @@ bool Socket::flushData(int sock, SockNum::SockType type, bool poller_thread) { _send_result(buffer, send_success); } } : _send_result; - send_buf_sending_tmp.emplace_back(BufferList::create(std::move(_send_buf_waiting), std::move(send_result), type == SockNum::Sock_UDP)); + send_buf_sending_tmp.emplace_back(BufferList::create(std::move(_send_buf_waiting), std::move(send_result), sock->type() == SockNum::Sock_UDP)); break; } } @@ -724,7 +717,7 @@ bool Socket::flushData(int sock, SockNum::SockType type, bool poller_thread) { while (!send_buf_sending_tmp.empty()) { auto &packet = send_buf_sending_tmp.front(); - auto n = packet->send(sock, _sock_flags); + auto n = packet->send(sock->rawFd(), _sock_flags); if (n > 0) { // 全部或部分发送成功 if (packet->empty()) { @@ -752,7 +745,7 @@ bool Socket::flushData(int sock, SockNum::SockType type, bool poller_thread) { } // 其他错误代码,发生异常 - if (type == SockNum::Sock_UDP) { + if (sock->type() == SockNum::Sock_UDP) { // udp发送异常,把数据丢弃 send_buf_sending_tmp.pop_front(); WarnL << "Send udp socket[" << sock << "] failed, data ignored: " << uv_strerror(err); @@ -775,10 +768,10 @@ bool Socket::flushData(int sock, SockNum::SockType type, bool poller_thread) { // 二级缓存已经全部发送完毕,说明该socket还可写,我们尝试继续写 // 如果是poller线程,我们尝试再次写一次(因为可能其他线程调用了send函数又有新数据了) - return poller_thread ? flushData(sock, type, poller_thread) : true; + return poller_thread ? flushData(sock, poller_thread) : true; } -void Socket::onWriteAble(int sock, SockNum::SockType type) { +void Socket::onWriteAble(const SockNum::Ptr &sock) { bool empty_waiting; bool empty_sending; { @@ -796,22 +789,22 @@ void Socket::onWriteAble(int sock, SockNum::SockType type) { stopWriteAbleEvent(sock); } else { // socket可写,我们尝试发送剩余的数据 - flushData(sock, type, true); + flushData(sock, true); } } -void Socket::startWriteAbleEvent(int sock) { +void Socket::startWriteAbleEvent(const SockNum::Ptr &sock) { // 开始监听socket可写事件 _sendable = false; int flag = _enable_recv ? EventPoller::Event_Read : 0; - _poller->modifyEvent(sock, flag | EventPoller::Event_Error | EventPoller::Event_Write); + _poller->modifyEvent(sock->rawFd(), flag | EventPoller::Event_Error | EventPoller::Event_Write, [sock](bool) {}); } -void Socket::stopWriteAbleEvent(int sock) { +void Socket::stopWriteAbleEvent(const SockNum::Ptr &sock) { // 停止监听socket可写事件 _sendable = true; int flag = _enable_recv ? EventPoller::Event_Read : 0; - _poller->modifyEvent(sock, flag | EventPoller::Event_Error); + _poller->modifyEvent(sock->rawFd(), flag | EventPoller::Event_Error, [sock](bool) {}); } void Socket::enableRecv(bool enabled) { @@ -825,10 +818,6 @@ void Socket::enableRecv(bool enabled) { _poller->modifyEvent(rawFD(), read_flag | send_flag | EventPoller::Event_Error); } -SockFD::Ptr Socket::makeSock(int sock, SockNum::SockType type) { - return std::make_shared(sock, type, _poller); -} - int Socket::rawFD() const { LOCK_GUARD(_mtx_sock_fd); if (!_sock_fd) { @@ -862,26 +851,18 @@ const EventPoller::Ptr &Socket::getPoller() const { return _poller; } -SockFD::Ptr Socket::cloneSockFD(const Socket &other) { - SockFD::Ptr sock; +bool Socket::cloneSocket(const Socket &other) { + closeSock(); + SockNum::Ptr sock; { LOCK_GUARD(other._mtx_sock_fd); if (!other._sock_fd) { WarnL << "sockfd of src socket is null"; - return nullptr; + return false; } - sock = std::make_shared(*(other._sock_fd), _poller); - } - return sock; -} - -bool Socket::cloneSocket(const Socket &other) { - closeSock(); - auto sock = cloneSockFD(other); - if (!sock) { - return false; + sock = other._sock_fd->sockNum(); } - return fromSock_l(std::move(sock)); + return fromSock_l(sock); } bool Socket::bindPeerAddr(const struct sockaddr *dst_addr, socklen_t addr_len, bool soft_bind) { diff --git a/src/Network/Socket.h b/src/Network/Socket.h index a20696540..c071730b9 100644 --- a/src/Network/Socket.h +++ b/src/Network/Socket.h @@ -173,8 +173,8 @@ class SockFD : public noncopyable { * @param num 文件描述符,int数字 * @param poller 事件监听器 */ - SockFD(int num, SockNum::SockType type, const EventPoller::Ptr &poller) { - _num = std::make_shared(num, type); + SockFD(SockNum::Ptr num, const EventPoller::Ptr &poller) { + _num = std::move(num); _poller = poller; } @@ -210,6 +210,10 @@ class SockFD : public noncopyable { return _num->rawFd(); } + SockNum::Ptr sockNum() const { + return _num; + } + SockNum::SockType type() { return _num->type(); } @@ -507,21 +511,19 @@ class Socket : public std::enable_shared_from_this, public noncopyable, std::string getIdentifier() const override; private: - SockFD::Ptr cloneSockFD(const Socket &other); - SockFD::Ptr makeSock(int sock, SockNum::SockType type); - void setSock(SockFD::Ptr fd); - int onAccept(int sock, int event) noexcept; - ssize_t onRead(int sock, SockNum::SockType type, const BufferRaw::Ptr &buffer) noexcept; - void onWriteAble(int sock, SockNum::SockType type); - void onConnected(int sock, const onErrCB &cb); + void setSock(SockNum::Ptr sock); + int onAccept(const SockNum::Ptr &sock, int event) noexcept; + ssize_t onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) noexcept; + void onWriteAble(const SockNum::Ptr &sock); + void onConnected(const SockNum::Ptr &sock, const onErrCB &cb); void onFlushed(); - void startWriteAbleEvent(int sock); - void stopWriteAbleEvent(int sock); - bool flushData(int sock, SockNum::SockType type, bool poller_thread); - bool attachEvent(int sock, SockNum::SockType type); + void startWriteAbleEvent(const SockNum::Ptr &sock); + void stopWriteAbleEvent(const SockNum::Ptr &sock); + bool flushData(const SockNum::Ptr &sock, bool poller_thread); + bool attachEvent(const SockNum::Ptr &sock); ssize_t send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush = true); void connect_l(const std::string &url, uint16_t port, const onErrCB &con_cb_in, float timeout_sec, const std::string &local_ip, uint16_t local_port); - bool fromSock_l(SockFD::Ptr sock); + bool fromSock_l(SockNum::Ptr sock); private: //send socket时的flag @@ -547,7 +549,7 @@ class Socket : public std::enable_shared_from_this, public noncopyable, //tcp连接超时定时器 Timer::Ptr _con_timer; //tcp连接结果回调对象 - std::shared_ptr > _async_con_cb; + std::shared_ptr _async_con_cb; //记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器 Ticker _send_flush_ticker; @@ -587,7 +589,6 @@ class Socket : public std::enable_shared_from_this, public noncopyable, //链接缓存地址,防止tcp reset 导致无法获取对端的地址 struct sockaddr_storage _local_addr; struct sockaddr_storage _peer_addr; - }; class SockSender { diff --git a/src/Poller/EventPoller.cpp b/src/Poller/EventPoller.cpp index f0da19613..03df10665 100644 --- a/src/Poller/EventPoller.cpp +++ b/src/Poller/EventPoller.cpp @@ -137,7 +137,7 @@ int EventPoller::addEvent(int fd, int event, PollEventCB cb) { return 0; } -int EventPoller::delEvent(int fd, PollDelCB cb) { +int EventPoller::delEvent(int fd, PollCompleteCB cb) { TimeTicker(); if (!cb) { cb = [](bool success) {}; @@ -157,31 +157,37 @@ int EventPoller::delEvent(int fd, PollDelCB cb) { //跨线程操作 async([this, fd, cb]() { - delEvent(fd, std::move(const_cast(cb))); + delEvent(fd, std::move(const_cast(cb))); }); return 0; } -int EventPoller::modifyEvent(int fd, int event) { +int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) { TimeTicker(); + if (!cb) { + cb = [](bool success) {}; + } + if (isCurrentThread()) { #if defined(HAS_EPOLL) - struct epoll_event ev = {0}; - ev.events = toEpoll(event); - ev.data.fd = fd; - return epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &ev); + struct epoll_event ev = { 0 }; + ev.events = toEpoll(event); + ev.data.fd = fd; + auto ret = epoll_ctl(_epoll_fd, EPOLL_CTL_MOD, fd, &ev); + cb(ret == 0); + return ret; #else - if (isCurrentThread()) { auto it = _event_map.find(fd); if (it != _event_map.end()) { it->second->event = event; } + cb(it != _event_map.end()); return 0; +#endif // HAS_EPOLL } - async([this, fd, event]() { - modifyEvent(fd, event); + async([this, fd, event, cb]() { + modifyEvent(fd, event, std::move(const_cast(cb))); }); return 0; -#endif //HAS_EPOLL } Task::Ptr EventPoller::async(TaskIn task, bool may_sync) { diff --git a/src/Poller/EventPoller.h b/src/Poller/EventPoller.h index acbaedcd3..f4a65b01e 100644 --- a/src/Poller/EventPoller.h +++ b/src/Poller/EventPoller.h @@ -36,7 +36,7 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s using Ptr = std::shared_ptr; using PollEventCB = std::function; - using PollDelCB = std::function; + using PollCompleteCB = std::function; using DelayTask = TaskCancelableImp; typedef enum { @@ -70,7 +70,7 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s * @param cb 删除成功回调functional * @return -1:失败,0:成功 */ - int delEvent(int fd, PollDelCB cb = nullptr); + int delEvent(int fd, PollCompleteCB cb = nullptr); /** * 修改监听事件类型 @@ -78,7 +78,7 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s * @param event 事件类型,例如 Event_Read | Event_Write * @return -1:失败,0:成功 */ - int modifyEvent(int fd, int event); + int modifyEvent(int fd, int event, PollCompleteCB cb = nullptr); /** * 异步执行任务