From 79c10fe4773819d99efe905bb2fecf3446f3b36c Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Fri, 21 Jun 2024 20:59:54 +0800 Subject: [PATCH] =?UTF-8?q?Tcp/Udp=E6=9C=8D=E5=8A=A1=E5=99=A8=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=8D=95=E7=BA=BF=E7=A8=8B=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Network/TcpServer.cpp | 31 ++++++++++-------- src/Network/TcpServer.h | 1 + src/Network/UdpServer.cpp | 69 +++++++++++++++++++++------------------ src/Network/UdpServer.h | 1 + 4 files changed, 57 insertions(+), 45 deletions(-) diff --git a/src/Network/TcpServer.cpp b/src/Network/TcpServer.cpp index 9055117e..3d914743 100644 --- a/src/Network/TcpServer.cpp +++ b/src/Network/TcpServer.cpp @@ -20,6 +20,7 @@ INSTANCE_IMP(SessionMap) StatisticImp(TcpServer) TcpServer::TcpServer(const EventPoller::Ptr &poller) : Server(poller) { + _multi_poller = !poller; setOnCreateSocket(nullptr); } @@ -82,7 +83,7 @@ TcpServer::Ptr TcpServer::onCreatServer(const EventPoller::Ptr &poller) { Socket::Ptr TcpServer::onBeforeAcceptConnection(const EventPoller::Ptr &poller) { assert(_poller->isCurrentThread()); //此处改成自定义获取poller对象,防止负载不均衡 - return createSocket(EventPollerPool::Instance().getPoller(false)); + return createSocket(_multi_poller ? EventPollerPool::Instance().getPoller(false) : _poller); } void TcpServer::cloneFrom(const TcpServer &that) { @@ -191,19 +192,21 @@ void TcpServer::start_l(uint16_t port, const std::string &host, uint32_t backlog return true; }, _poller); - EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) { - EventPoller::Ptr poller = static_pointer_cast(executor); - if (poller == _poller) { - return; - } - auto &serverRef = _cloned_server[poller.get()]; - if (!serverRef) { - serverRef = onCreatServer(poller); - } - if (serverRef) { - serverRef->cloneFrom(*this); - } - }); + if (_multi_poller) { + EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) { + EventPoller::Ptr poller = static_pointer_cast(executor); + if (poller == _poller) { + return; + } + auto &serverRef = _cloned_server[poller.get()]; + if (!serverRef) { + serverRef = onCreatServer(poller); + } + if (serverRef) { + serverRef->cloneFrom(*this); + } + }); + } if (!_socket->listen(port, host.c_str(), backlog)) { // 创建tcp监听失败,可能是由于端口占用或权限问题 diff --git a/src/Network/TcpServer.h b/src/Network/TcpServer.h index d6f70a88..11d0d894 100644 --- a/src/Network/TcpServer.h +++ b/src/Network/TcpServer.h @@ -92,6 +92,7 @@ class TcpServer : public Server { void setupEvent(); private: + bool _multi_poller; bool _is_on_manager = false; bool _main_server = true; std::weak_ptr _parent; diff --git a/src/Network/UdpServer.cpp b/src/Network/UdpServer.cpp index 9b120ce1..f2805c50 100644 --- a/src/Network/UdpServer.cpp +++ b/src/Network/UdpServer.cpp @@ -45,6 +45,7 @@ static UdpServer::PeerIdType makeSockId(sockaddr *addr, int) { } UdpServer::UdpServer(const EventPoller::Ptr &poller) : Server(poller) { + _multi_poller = !poller; setOnCreateSocket(nullptr); } @@ -87,20 +88,22 @@ void UdpServer::start_l(uint16_t port, const std::string &host) { return false; }, _poller); - //clone server至不同线程,让udp server支持多线程 - EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) { - auto poller = std::static_pointer_cast(executor); - if (poller == _poller) { - return; - } - auto &serverRef = _cloned_server[poller.get()]; - if (!serverRef) { - serverRef = onCreatServer(poller); - } - if (serverRef) { - serverRef->cloneFrom(*this); - } - }); + if (_multi_poller) { + // clone server至不同线程,让udp server支持多线程 + EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) { + auto poller = std::static_pointer_cast(executor); + if (poller == _poller) { + return; + } + auto &serverRef = _cloned_server[poller.get()]; + if (!serverRef) { + serverRef = onCreatServer(poller); + } + if (serverRef) { + serverRef->cloneFrom(*this); + } + }); + } if (!_socket->bindUdpSock(port, host.c_str())) { // udp 绑定端口失败, 可能是由于端口占用或权限问题 @@ -193,24 +196,28 @@ void UdpServer::onManagerSession() { //拷贝map,防止遍历时移除对象 copy_map = std::make_shared >(*_session_map); } - EventPollerPool::Instance().for_each([copy_map](const TaskExecutor::Ptr &executor) { - auto poller = std::static_pointer_cast(executor); - poller->async([copy_map]() { - for (auto &pr : *copy_map) { - auto &session = pr.second->session(); - if (!session->getPoller()->isCurrentThread()) { - //该session不归属该poller管理 - continue; - } - try { - // UDP 会话需要处理超时 - session->onManager(); - } catch (exception &ex) { - WarnL << "Exception occurred when emit onManager: " << ex.what(); - } + auto lam = [copy_map]() { + for (auto &pr : *copy_map) { + auto &session = pr.second->session(); + if (!session->getPoller()->isCurrentThread()) { + // 该session不归属该poller管理 + continue; } + try { + // UDP 会话需要处理超时 + session->onManager(); + } catch (exception &ex) { + WarnL << "Exception occurred when emit onManager: " << ex.what(); + } + } + }; + if (_multi_poller){ + EventPollerPool::Instance().for_each([lam](const TaskExecutor::Ptr &executor) { + std::static_pointer_cast(executor)->async(lam); }); - }); + } else { + lam(); + } } SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) { @@ -228,7 +235,7 @@ SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { // 此处改成自定义获取poller对象,防止负载不均衡 - auto socket = createSocket(EventPollerPool::Instance().getPoller(false), buf, addr, addr_len); + auto socket = createSocket(_multi_poller ? EventPollerPool::Instance().getPoller(false) : _poller, buf, addr, addr_len); if (!socket) { //创建socket失败,本次onRead事件收到的数据直接丢弃 return nullptr; diff --git a/src/Network/UdpServer.h b/src/Network/UdpServer.h index 82d7d6c4..43f3286d 100644 --- a/src/Network/UdpServer.h +++ b/src/Network/UdpServer.h @@ -108,6 +108,7 @@ class UdpServer : public Server { private: bool _cloned = false; + bool _multi_poller; Socket::Ptr _socket; std::shared_ptr _timer; onCreateSocket _on_create_socket;