Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复Udp数据已关闭流,由于最后几帧数据传入又重新创建session对象 #191

Closed
wants to merge 4 commits into from

Conversation

mc373906408
Copy link
Contributor

目前是通过0.5s自动删除的map,来判断这几帧数据是不是当前关闭的流。

@xia-chu
Copy link
Member

xia-chu commented Nov 17, 2023

这个针对什么协议?国标吗?

@mc373906408
Copy link
Contributor Author

我是SIP协议,应该是UDP传入的都会发生这个问题

@xia-chu
Copy link
Member

xia-chu commented Nov 21, 2023

其实 可不可以换种思路呢 就是udp socket触发onEr后 延时移除对象呢?我觉得这样可能更好。
因为onError是emitErr时触发的 而emitErr时会delEvent,这样udp socket就不会再触发网络接收事件了。
但是由于延时移除对象,所以这段时间内 这个socket收到的数据都会被直接丢弃

@xia-chu
Copy link
Member

xia-chu commented Nov 21, 2023

不过这个可能对某些老内核不适用 因为有些老内核 触发数据接收的udp fd可能并不是这个对应的fd

@xia-chu
Copy link
Member

xia-chu commented Nov 21, 2023

不过这个可能对某些老内核不适用 因为有些老内核 触发数据接收的udp fd可能并不是这个对应的fd

这个补丁能修复这个问题:

Index: src/Network/Server.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/Server.h b/src/Network/Server.h
--- a/src/Network/Server.h	(revision ad44a16c99834540b397774ad6c7f3f8ed619d56)
+++ b/src/Network/Server.h	(date 1700549107505)
@@ -49,6 +49,8 @@
 
 class SessionHelper {
 public:
+    bool enable = true;
+
     using Ptr = std::shared_ptr<SessionHelper>;
 
     SessionHelper(const std::weak_ptr<Server> &server, Session::Ptr session, std::string cls);
Index: src/Network/UdpServer.h
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/UdpServer.h b/src/Network/UdpServer.h
--- a/src/Network/UdpServer.h	(revision ad44a16c99834540b397774ad6c7f3f8ed619d56)
+++ b/src/Network/UdpServer.h	(date 1700548667692)
@@ -89,12 +89,12 @@
     /**
      * @brief 根据对端信息获取或创建一个会话
      */
-    Session::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new);
+    SessionHelper::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new);
 
     /**
      * @brief 创建一个会话, 同时进行必要的设置
      */
-    Session::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len);
+    SessionHelper::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len);
 
     /**
      * @brief 创建socket
Index: src/Network/UdpServer.cpp
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/src/Network/UdpServer.cpp b/src/Network/UdpServer.cpp
--- a/src/Network/UdpServer.cpp	(revision ad44a16c99834540b397774ad6c7f3f8ed619d56)
+++ b/src/Network/UdpServer.cpp	(date 1700549173025)
@@ -19,6 +19,8 @@
 static const uint8_t s_in6_addr_maped[]
     = { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00 };
 
+static constexpr auto kUdpDelayCloseMS = 3 * 1000;
+
 static UdpServer::PeerIdType makeSockId(sockaddr *addr, int) {
     UdpServer::PeerIdType ret;
     switch (addr->sa_family) {
@@ -142,32 +144,36 @@
     onRead_l(true, id, buf, addr, addr_len);
 }
 
-static void emitSessionRecv(const Session::Ptr &session, const Buffer::Ptr &buf) {
+static void emitSessionRecv(const SessionHelper::Ptr &helper, const Buffer::Ptr &buf) {
+    if (!helper->enable) {
+        // 延时销毁中
+        return;
+    }
     try {
-        session->onRecv(buf);
+        helper->session()->onRecv(buf);
     } catch (SockException &ex) {
-        session->shutdown(ex);
+        helper->session()->shutdown(ex);
     } catch (exception &ex) {
-        session->shutdown(SockException(Err_shutdown, ex.what()));
+        helper->session()->shutdown(SockException(Err_shutdown, ex.what()));
     }
 }
 
 void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {
     // udp server fd收到数据时触发此函数;大部分情况下数据应该在peer fd触发,此函数应该不是热点函数
     bool is_new = false;
-    if (auto session = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
-        if (session->getPoller()->isCurrentThread()) {
+    if (auto helper = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
+        if (helper->session()->getPoller()->isCurrentThread()) {
             //当前线程收到数据,直接处理数据
-            emitSessionRecv(session, buf);
+            emitSessionRecv(helper, buf);
         } else {
             //数据漂移到其他线程,需要先切换线程
             WarnL << "UDP packet incoming from other thread";
-            std::weak_ptr<Session> weak_session = session;
+            std::weak_ptr<SessionHelper> weak_helper = helper;
             //由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先拷贝一下
             auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
-            session->async([weak_session, cacheable_buf]() {
-                if (auto strong_session = weak_session.lock()) {
-                    emitSessionRecv(strong_session, cacheable_buf);
+            helper->session()->async([weak_helper, cacheable_buf]() {
+                if (auto strong_helper = weak_helper.lock()) {
+                    emitSessionRecv(strong_helper, cacheable_buf);
                 }
             });
         }
@@ -207,42 +213,40 @@
     });
 }
 
-Session::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
+SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
     {
         //减小临界区
         std::lock_guard<std::recursive_mutex> lock(*_session_mutex);
         auto it = _session_map->find(id);
         if (it != _session_map->end()) {
-            return it->second->session();
+            return it->second;
         }
     }
     is_new = true;
     return createSession(id, buf, addr, addr_len);
 }
 
-static Session::Ptr s_null_session;
-
-Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
+SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
     // 此处改成自定义获取poller对象,防止负载不均衡
     auto socket = createSocket(EventPollerPool::Instance().getPoller(false), buf, addr, addr_len);
     if (!socket) {
         //创建socket失败,本次onRead事件收到的数据直接丢弃
-        return s_null_session;
+        return nullptr;
     }
 
     auto addr_str = string((char *) addr, addr_len);
     std::weak_ptr<UdpServer> weak_self = std::static_pointer_cast<UdpServer>(shared_from_this());
-    auto session_creator = [this, weak_self, socket, addr_str, id]() -> Session::Ptr {
+    auto helper_creator = [this, weak_self, socket, addr_str, id]() -> SessionHelper::Ptr {
         auto server = weak_self.lock();
         if (!server) {
-            return s_null_session;
+            return nullptr;
         }
 
         //如果已经创建该客户端对应的UdpSession类,那么直接返回
         lock_guard<std::recursive_mutex> lck(*_session_mutex);
         auto it = _session_map->find(id);
         if (it != _session_map->end()) {
-            return it->second->session();
+            return it->second;
         }
 
         assert(_socket);
@@ -250,13 +254,11 @@
         socket->bindPeerAddr((struct sockaddr *) addr_str.data(), addr_str.size());
 
         auto helper = _session_alloc(server, socket);
-        auto session = helper->session();
         // 把本服务器的配置传递给 Session
-        session->attachServer(*this);
+        helper->session()->attachServer(*this);
 
-        std::weak_ptr<Session> weak_session = session;
-        auto cls = helper->className();
-        socket->setOnRead([weak_self, weak_session, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
+        std::weak_ptr<SessionHelper> weak_helper = helper;
+        socket->setOnRead([weak_self, weak_helper, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
             auto strong_self = weak_self.lock();
             if (!strong_self) {
                 return;
@@ -264,8 +266,8 @@
 
             //快速判断是否为本会话的的数据, 通常应该成立
             if (id == makeSockId(addr, addr_len)) {
-                if (auto strong_session = weak_session.lock()) {
-                    emitSessionRecv(strong_session, buf);
+                if (auto strong_helper = weak_helper.lock()) {
+                    emitSessionRecv(strong_helper, buf);
                 }
                 return;
             }
@@ -273,7 +275,7 @@
             //收到非本peer fd的数据,让server去派发此数据到合适的session对象
             strong_self->onRead_l(false, id, buf, addr, addr_len);
         });
-        socket->setOnErr([weak_self, weak_session, id, cls](const SockException &err) {
+        socket->setOnErr([weak_self, weak_helper, id](const SockException &err) {
             // 在本函数作用域结束时移除会话对象
             // 目的是确保移除会话前执行其 onError 函数
             // 同时避免其 onError 函数抛异常时没有移除会话对象
@@ -283,40 +285,47 @@
                 if (!strong_self) {
                     return;
                 }
-                //从共享map中移除本session对象
-                lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
-                strong_self->_session_map->erase(id);
+                // 延时移除udp session, 防止频繁快速重复对象
+                strong_self->_poller->doDelayTask(kUdpDelayCloseMS, [weak_self, id]() {
+                    if (auto strong_self = weak_self.lock()) {
+                        // 从共享map中移除本session对象
+                        lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
+                        strong_self->_session_map->erase(id);
+                    }
+                    return 0;
+                });
             });
 
             // 获取会话强应用
-            if (auto strong_session = weak_session.lock()) {
+            if (auto strong_helper = weak_helper.lock()) {
                 // 触发 onError 事件回调
-                TraceP(strong_session) << cls << " on err: " << err;
-                strong_session->onError(err);
+                TraceP(strong_helper->session()) << strong_helper->className() << " on err: " << err;
+                strong_helper->enable = false;
+                strong_helper->session()->onError(err);
             }
         });
 
         auto pr = _session_map->emplace(id, std::move(helper));
         assert(pr.second);
-        return pr.first->second->session();
+        return pr.first->second;
     };
 
     if (socket->getPoller()->isCurrentThread()) {
         //该socket分配在本线程,直接创建session对象,并处理数据
-        return session_creator();
+        return helper_creator();
     }
 
     //该socket分配在其他线程,需要先拷贝buffer,然后在其所在线程创建session对象并处理数据
     auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
-    socket->getPoller()->async([session_creator, cacheable_buf]() {
+    socket->getPoller()->async([helper_creator, cacheable_buf]() {
         //在该socket所在线程创建session对象
-        auto session = session_creator();
-        if (session) {
+        auto helper = helper_creator();
+        if (helper) {
             //该数据不能丢弃,给session对象消费
-            emitSessionRecv(session, cacheable_buf);
+            emitSessionRecv(helper, cacheable_buf);
         }
     });
-    return s_null_session;
+    return nullptr;
 }
 
 void UdpServer::setOnCreateSocket(onCreateSocket cb) {

@xia-chu
Copy link
Member

xia-chu commented Nov 21, 2023

你可以测试下这个分支:https://github.com/ZLMediaKit/ZLToolKit/tree/feature/udp_delay_close
看看是否满足你的需求

0.5f,
[weak_self,id]() -> bool {
auto strong_self = weak_self.lock();
strong_self->_session_erase_map->erase(id);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

而且这里也没判断strong_self是否有效

@mc373906408
Copy link
Contributor Author

你是对的,这个问题修复了。可以关闭这个pr。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants