Skip to content

Commit

Permalink
udp延时3秒关闭,防止频繁快速重建Session对象 (#192 #191)
Browse files Browse the repository at this point in the history
  • Loading branch information
xia-chu authored Nov 23, 2023
1 parent ba24e09 commit ba2763b
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 43 deletions.
2 changes: 2 additions & 0 deletions src/Network/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class Server;

class SessionHelper {
public:
bool enable = true;

using Ptr = std::shared_ptr<SessionHelper>;

SessionHelper(const std::weak_ptr<Server> &server, Session::Ptr session, std::string cls);
Expand Down
91 changes: 50 additions & 41 deletions src/Network/UdpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ namespace toolkit {
static const uint8_t s_in6_addr_maped[]
= { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00 };

static constexpr auto kUdpDelayCloseMS = 3 * 1000;

static UdpServer::PeerIdType makeSockId(sockaddr *addr, int) {
UdpServer::PeerIdType ret;
switch (addr->sa_family) {
Expand Down Expand Up @@ -142,32 +144,36 @@ void UdpServer::onRead(const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {
onRead_l(true, id, buf, addr, addr_len);
}

static void emitSessionRecv(const Session::Ptr &session, const Buffer::Ptr &buf) {
static void emitSessionRecv(const SessionHelper::Ptr &helper, const Buffer::Ptr &buf) {
if (!helper->enable) {
// 延时销毁中
return;
}
try {
session->onRecv(buf);
helper->session()->onRecv(buf);
} catch (SockException &ex) {
session->shutdown(ex);
helper->session()->shutdown(ex);
} catch (exception &ex) {
session->shutdown(SockException(Err_shutdown, ex.what()));
helper->session()->shutdown(SockException(Err_shutdown, ex.what()));
}
}

void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {
// udp server fd收到数据时触发此函数;大部分情况下数据应该在peer fd触发,此函数应该不是热点函数
bool is_new = false;
if (auto session = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
if (session->getPoller()->isCurrentThread()) {
if (auto helper = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
if (helper->session()->getPoller()->isCurrentThread()) {
//当前线程收到数据,直接处理数据
emitSessionRecv(session, buf);
emitSessionRecv(helper, buf);
} else {
//数据漂移到其他线程,需要先切换线程
WarnL << "UDP packet incoming from other thread";
std::weak_ptr<Session> weak_session = session;
std::weak_ptr<SessionHelper> weak_helper = helper;
//由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先拷贝一下
auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
session->async([weak_session, cacheable_buf]() {
if (auto strong_session = weak_session.lock()) {
emitSessionRecv(strong_session, cacheable_buf);
helper->session()->async([weak_helper, cacheable_buf]() {
if (auto strong_helper = weak_helper.lock()) {
emitSessionRecv(strong_helper, cacheable_buf);
}
});
}
Expand Down Expand Up @@ -207,73 +213,69 @@ void UdpServer::onManagerSession() {
});
}

Session::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
{
//减小临界区
std::lock_guard<std::recursive_mutex> lock(*_session_mutex);
auto it = _session_map->find(id);
if (it != _session_map->end()) {
return it->second->session();
return it->second;
}
}
is_new = true;
return createSession(id, buf, addr, addr_len);
}

static Session::Ptr s_null_session;

Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
// 此处改成自定义获取poller对象,防止负载不均衡
auto socket = createSocket(EventPollerPool::Instance().getPoller(false), buf, addr, addr_len);
if (!socket) {
//创建socket失败,本次onRead事件收到的数据直接丢弃
return s_null_session;
return nullptr;
}

auto addr_str = string((char *) addr, addr_len);
std::weak_ptr<UdpServer> weak_self = std::static_pointer_cast<UdpServer>(shared_from_this());
auto session_creator = [this, weak_self, socket, addr_str, id]() -> Session::Ptr {
auto helper_creator = [this, weak_self, socket, addr_str, id]() -> SessionHelper::Ptr {
auto server = weak_self.lock();
if (!server) {
return s_null_session;
return nullptr;
}

//如果已经创建该客户端对应的UdpSession类,那么直接返回
lock_guard<std::recursive_mutex> lck(*_session_mutex);
auto it = _session_map->find(id);
if (it != _session_map->end()) {
return it->second->session();
return it->second;
}

assert(_socket);
socket->bindUdpSock(_socket->get_local_port(), _socket->get_local_ip());
socket->bindPeerAddr((struct sockaddr *) addr_str.data(), addr_str.size());

auto helper = _session_alloc(server, socket);
auto session = helper->session();
// 把本服务器的配置传递给 Session
session->attachServer(*this);
helper->session()->attachServer(*this);

std::weak_ptr<Session> weak_session = session;
auto cls = helper->className();
socket->setOnRead([weak_self, weak_session, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
std::weak_ptr<SessionHelper> weak_helper = helper;
socket->setOnRead([weak_self, weak_helper, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}

//快速判断是否为本会话的的数据, 通常应该成立
if (id == makeSockId(addr, addr_len)) {
if (auto strong_session = weak_session.lock()) {
emitSessionRecv(strong_session, buf);
if (auto strong_helper = weak_helper.lock()) {
emitSessionRecv(strong_helper, buf);
}
return;
}

//收到非本peer fd的数据,让server去派发此数据到合适的session对象
strong_self->onRead_l(false, id, buf, addr, addr_len);
});
socket->setOnErr([weak_self, weak_session, id, cls](const SockException &err) {
socket->setOnErr([weak_self, weak_helper, id](const SockException &err) {
// 在本函数作用域结束时移除会话对象
// 目的是确保移除会话前执行其 onError 函数
// 同时避免其 onError 函数抛异常时没有移除会话对象
Expand All @@ -283,40 +285,47 @@ Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &b
if (!strong_self) {
return;
}
//从共享map中移除本session对象
lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
strong_self->_session_map->erase(id);
// 延时移除udp session, 防止频繁快速重建对象
strong_self->_poller->doDelayTask(kUdpDelayCloseMS, [weak_self, id]() {
if (auto strong_self = weak_self.lock()) {
// 从共享map中移除本session对象
lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
strong_self->_session_map->erase(id);
}
return 0;
});
});

// 获取会话强应用
if (auto strong_session = weak_session.lock()) {
if (auto strong_helper = weak_helper.lock()) {
// 触发 onError 事件回调
TraceP(strong_session) << cls << " on err: " << err;
strong_session->onError(err);
TraceP(strong_helper->session()) << strong_helper->className() << " on err: " << err;
strong_helper->enable = false;
strong_helper->session()->onError(err);
}
});

auto pr = _session_map->emplace(id, std::move(helper));
assert(pr.second);
return pr.first->second->session();
return pr.first->second;
};

if (socket->getPoller()->isCurrentThread()) {
//该socket分配在本线程,直接创建session对象,并处理数据
return session_creator();
return helper_creator();
}

//该socket分配在其他线程,需要先拷贝buffer,然后在其所在线程创建session对象并处理数据
auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
socket->getPoller()->async([session_creator, cacheable_buf]() {
socket->getPoller()->async([helper_creator, cacheable_buf]() {
//在该socket所在线程创建session对象
auto session = session_creator();
if (session) {
auto helper = helper_creator();
if (helper) {
//该数据不能丢弃,给session对象消费
emitSessionRecv(session, cacheable_buf);
emitSessionRecv(helper, cacheable_buf);
}
});
return s_null_session;
return nullptr;
}

void UdpServer::setOnCreateSocket(onCreateSocket cb) {
Expand Down
4 changes: 2 additions & 2 deletions src/Network/UdpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ class UdpServer : public Server {
/**
* @brief 根据对端信息获取或创建一个会话
*/
Session::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new);
SessionHelper::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new);

/**
* @brief 创建一个会话, 同时进行必要的设置
*/
Session::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len);
SessionHelper::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len);

/**
* @brief 创建socket
Expand Down

0 comments on commit ba2763b

Please sign in to comment.