diff --git a/src/Network/Server.h b/src/Network/Server.h index 88d9e952c..12e49c697 100644 --- a/src/Network/Server.h +++ b/src/Network/Server.h @@ -49,6 +49,8 @@ class Server; class SessionHelper { public: + bool enable = true; + using Ptr = std::shared_ptr; SessionHelper(const std::weak_ptr &server, Session::Ptr session, std::string cls); diff --git a/src/Network/UdpServer.cpp b/src/Network/UdpServer.cpp index 2a027d1e4..cd0ac49ad 100644 --- a/src/Network/UdpServer.cpp +++ b/src/Network/UdpServer.cpp @@ -19,6 +19,8 @@ namespace toolkit { static const uint8_t s_in6_addr_maped[] = { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00 }; +static constexpr auto kUdpDelayCloseMS = 3 * 1000; + static UdpServer::PeerIdType makeSockId(sockaddr *addr, int) { UdpServer::PeerIdType ret; switch (addr->sa_family) { @@ -142,32 +144,36 @@ void UdpServer::onRead(const Buffer::Ptr &buf, sockaddr *addr, int addr_len) { onRead_l(true, id, buf, addr, addr_len); } -static void emitSessionRecv(const Session::Ptr &session, const Buffer::Ptr &buf) { +static void emitSessionRecv(const SessionHelper::Ptr &helper, const Buffer::Ptr &buf) { + if (!helper->enable) { + // 延时销毁中 + return; + } try { - session->onRecv(buf); + helper->session()->onRecv(buf); } catch (SockException &ex) { - session->shutdown(ex); + helper->session()->shutdown(ex); } catch (exception &ex) { - session->shutdown(SockException(Err_shutdown, ex.what())); + helper->session()->shutdown(SockException(Err_shutdown, ex.what())); } } void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) { // udp server fd收到数据时触发此函数;大部分情况下数据应该在peer fd触发,此函数应该不是热点函数 bool is_new = false; - if (auto session = getOrCreateSession(id, buf, addr, addr_len, is_new)) { - if (session->getPoller()->isCurrentThread()) { + if (auto helper = getOrCreateSession(id, buf, addr, addr_len, is_new)) { + if (helper->session()->getPoller()->isCurrentThread()) { //当前线程收到数据,直接处理数据 - emitSessionRecv(session, buf); + emitSessionRecv(helper, buf); } else { //数据漂移到其他线程,需要先切换线程 WarnL << "UDP packet incoming from other thread"; - std::weak_ptr weak_session = session; + std::weak_ptr weak_helper = helper; //由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先拷贝一下 auto cacheable_buf = std::make_shared(buf->toString()); - session->async([weak_session, cacheable_buf]() { - if (auto strong_session = weak_session.lock()) { - emitSessionRecv(strong_session, cacheable_buf); + helper->session()->async([weak_helper, cacheable_buf]() { + if (auto strong_helper = weak_helper.lock()) { + emitSessionRecv(strong_helper, cacheable_buf); } }); } @@ -207,42 +213,40 @@ void UdpServer::onManagerSession() { }); } -Session::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) { +SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) { { //减小临界区 std::lock_guard lock(*_session_mutex); auto it = _session_map->find(id); if (it != _session_map->end()) { - return it->second->session(); + return it->second; } } is_new = true; return createSession(id, buf, addr, addr_len); } -static Session::Ptr s_null_session; - -Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { +SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { // 此处改成自定义获取poller对象,防止负载不均衡 auto socket = createSocket(EventPollerPool::Instance().getPoller(false), buf, addr, addr_len); if (!socket) { //创建socket失败,本次onRead事件收到的数据直接丢弃 - return s_null_session; + return nullptr; } auto addr_str = string((char *) addr, addr_len); std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); - auto session_creator = [this, weak_self, socket, addr_str, id]() -> Session::Ptr { + auto helper_creator = [this, weak_self, socket, addr_str, id]() -> SessionHelper::Ptr { auto server = weak_self.lock(); if (!server) { - return s_null_session; + return nullptr; } //如果已经创建该客户端对应的UdpSession类,那么直接返回 lock_guard lck(*_session_mutex); auto it = _session_map->find(id); if (it != _session_map->end()) { - return it->second->session(); + return it->second; } assert(_socket); @@ -250,13 +254,11 @@ Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &b socket->bindPeerAddr((struct sockaddr *) addr_str.data(), addr_str.size()); auto helper = _session_alloc(server, socket); - auto session = helper->session(); // 把本服务器的配置传递给 Session - session->attachServer(*this); + helper->session()->attachServer(*this); - std::weak_ptr weak_session = session; - auto cls = helper->className(); - socket->setOnRead([weak_self, weak_session, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + std::weak_ptr weak_helper = helper; + socket->setOnRead([weak_self, weak_helper, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { auto strong_self = weak_self.lock(); if (!strong_self) { return; @@ -264,8 +266,8 @@ Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &b //快速判断是否为本会话的的数据, 通常应该成立 if (id == makeSockId(addr, addr_len)) { - if (auto strong_session = weak_session.lock()) { - emitSessionRecv(strong_session, buf); + if (auto strong_helper = weak_helper.lock()) { + emitSessionRecv(strong_helper, buf); } return; } @@ -273,7 +275,7 @@ Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &b //收到非本peer fd的数据,让server去派发此数据到合适的session对象 strong_self->onRead_l(false, id, buf, addr, addr_len); }); - socket->setOnErr([weak_self, weak_session, id, cls](const SockException &err) { + socket->setOnErr([weak_self, weak_helper, id](const SockException &err) { // 在本函数作用域结束时移除会话对象 // 目的是确保移除会话前执行其 onError 函数 // 同时避免其 onError 函数抛异常时没有移除会话对象 @@ -283,40 +285,47 @@ Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &b if (!strong_self) { return; } - //从共享map中移除本session对象 - lock_guard lck(*strong_self->_session_mutex); - strong_self->_session_map->erase(id); + // 延时移除udp session, 防止频繁快速重建对象 + strong_self->_poller->doDelayTask(kUdpDelayCloseMS, [weak_self, id]() { + if (auto strong_self = weak_self.lock()) { + // 从共享map中移除本session对象 + lock_guard lck(*strong_self->_session_mutex); + strong_self->_session_map->erase(id); + } + return 0; + }); }); // 获取会话强应用 - if (auto strong_session = weak_session.lock()) { + if (auto strong_helper = weak_helper.lock()) { // 触发 onError 事件回调 - TraceP(strong_session) << cls << " on err: " << err; - strong_session->onError(err); + TraceP(strong_helper->session()) << strong_helper->className() << " on err: " << err; + strong_helper->enable = false; + strong_helper->session()->onError(err); } }); auto pr = _session_map->emplace(id, std::move(helper)); assert(pr.second); - return pr.first->second->session(); + return pr.first->second; }; if (socket->getPoller()->isCurrentThread()) { //该socket分配在本线程,直接创建session对象,并处理数据 - return session_creator(); + return helper_creator(); } //该socket分配在其他线程,需要先拷贝buffer,然后在其所在线程创建session对象并处理数据 auto cacheable_buf = std::make_shared(buf->toString()); - socket->getPoller()->async([session_creator, cacheable_buf]() { + socket->getPoller()->async([helper_creator, cacheable_buf]() { //在该socket所在线程创建session对象 - auto session = session_creator(); - if (session) { + auto helper = helper_creator(); + if (helper) { //该数据不能丢弃,给session对象消费 - emitSessionRecv(session, cacheable_buf); + emitSessionRecv(helper, cacheable_buf); } }); - return s_null_session; + return nullptr; } void UdpServer::setOnCreateSocket(onCreateSocket cb) { diff --git a/src/Network/UdpServer.h b/src/Network/UdpServer.h index e416ad60e..2fe33bdd8 100644 --- a/src/Network/UdpServer.h +++ b/src/Network/UdpServer.h @@ -89,12 +89,12 @@ class UdpServer : public Server { /** * @brief 根据对端信息获取或创建一个会话 */ - Session::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new); + SessionHelper::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new); /** * @brief 创建一个会话, 同时进行必要的设置 */ - Session::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len); + SessionHelper::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len); /** * @brief 创建socket