Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp延时3秒关闭,防止频繁快速重建Session对象 #192

Merged
merged 1 commit into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Network/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class Server;

class SessionHelper {
public:
bool enable = true;

using Ptr = std::shared_ptr<SessionHelper>;

SessionHelper(const std::weak_ptr<Server> &server, Session::Ptr session, std::string cls);
Expand Down
91 changes: 50 additions & 41 deletions src/Network/UdpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
static const uint8_t s_in6_addr_maped[]
= { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00 };

static constexpr auto kUdpDelayCloseMS = 3 * 1000;

static UdpServer::PeerIdType makeSockId(sockaddr *addr, int) {
UdpServer::PeerIdType ret;
switch (addr->sa_family) {
Expand Down Expand Up @@ -142,32 +144,36 @@
onRead_l(true, id, buf, addr, addr_len);
}

static void emitSessionRecv(const Session::Ptr &session, const Buffer::Ptr &buf) {
static void emitSessionRecv(const SessionHelper::Ptr &helper, const Buffer::Ptr &buf) {
if (!helper->enable) {
// 延时销毁中
return;
}
try {
session->onRecv(buf);
helper->session()->onRecv(buf);
} catch (SockException &ex) {
session->shutdown(ex);
helper->session()->shutdown(ex);
} catch (exception &ex) {
session->shutdown(SockException(Err_shutdown, ex.what()));
helper->session()->shutdown(SockException(Err_shutdown, ex.what()));
}
}

void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) {
// udp server fd收到数据时触发此函数;大部分情况下数据应该在peer fd触发,此函数应该不是热点函数
bool is_new = false;
if (auto session = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
if (session->getPoller()->isCurrentThread()) {
if (auto helper = getOrCreateSession(id, buf, addr, addr_len, is_new)) {
if (helper->session()->getPoller()->isCurrentThread()) {
//当前线程收到数据,直接处理数据
emitSessionRecv(session, buf);
emitSessionRecv(helper, buf);
} else {
//数据漂移到其他线程,需要先切换线程
WarnL << "UDP packet incoming from other thread";
std::weak_ptr<Session> weak_session = session;
std::weak_ptr<SessionHelper> weak_helper = helper;
//由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先拷贝一下
auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
session->async([weak_session, cacheable_buf]() {
if (auto strong_session = weak_session.lock()) {
emitSessionRecv(strong_session, cacheable_buf);
helper->session()->async([weak_helper, cacheable_buf]() {
if (auto strong_helper = weak_helper.lock()) {
emitSessionRecv(strong_helper, cacheable_buf);
}
});
}
Expand Down Expand Up @@ -207,73 +213,69 @@
});
}

Session::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) {
{
//减小临界区
std::lock_guard<std::recursive_mutex> lock(*_session_mutex);
auto it = _session_map->find(id);
if (it != _session_map->end()) {
return it->second->session();
return it->second;
}
}
is_new = true;
return createSession(id, buf, addr, addr_len);
}

static Session::Ptr s_null_session;

Session::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
// 此处改成自定义获取poller对象,防止负载不均衡
auto socket = createSocket(EventPollerPool::Instance().getPoller(false), buf, addr, addr_len);
if (!socket) {
//创建socket失败,本次onRead事件收到的数据直接丢弃
return s_null_session;
return nullptr;
}

auto addr_str = string((char *) addr, addr_len);
std::weak_ptr<UdpServer> weak_self = std::static_pointer_cast<UdpServer>(shared_from_this());
auto session_creator = [this, weak_self, socket, addr_str, id]() -> Session::Ptr {
auto helper_creator = [this, weak_self, socket, addr_str, id]() -> SessionHelper::Ptr {
auto server = weak_self.lock();
if (!server) {
return s_null_session;
return nullptr;
}

//如果已经创建该客户端对应的UdpSession类,那么直接返回
lock_guard<std::recursive_mutex> lck(*_session_mutex);
auto it = _session_map->find(id);
if (it != _session_map->end()) {
return it->second->session();
return it->second;
}

assert(_socket);
socket->bindUdpSock(_socket->get_local_port(), _socket->get_local_ip());
socket->bindPeerAddr((struct sockaddr *) addr_str.data(), addr_str.size());

Check warning on line 254 in src/Network/UdpServer.cpp

View workflow job for this annotation

GitHub Actions / build

'argument': conversion from 'size_t' to 'int', possible loss of data

auto helper = _session_alloc(server, socket);
auto session = helper->session();
// 把本服务器的配置传递给 Session
session->attachServer(*this);
helper->session()->attachServer(*this);

std::weak_ptr<Session> weak_session = session;
auto cls = helper->className();
socket->setOnRead([weak_self, weak_session, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
std::weak_ptr<SessionHelper> weak_helper = helper;
socket->setOnRead([weak_self, weak_helper, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}

//快速判断是否为本会话的的数据, 通常应该成立
if (id == makeSockId(addr, addr_len)) {
if (auto strong_session = weak_session.lock()) {
emitSessionRecv(strong_session, buf);
if (auto strong_helper = weak_helper.lock()) {
emitSessionRecv(strong_helper, buf);
}
return;
}

//收到非本peer fd的数据,让server去派发此数据到合适的session对象
strong_self->onRead_l(false, id, buf, addr, addr_len);
});
socket->setOnErr([weak_self, weak_session, id, cls](const SockException &err) {
socket->setOnErr([weak_self, weak_helper, id](const SockException &err) {
// 在本函数作用域结束时移除会话对象
// 目的是确保移除会话前执行其 onError 函数
// 同时避免其 onError 函数抛异常时没有移除会话对象
Expand All @@ -283,40 +285,47 @@
if (!strong_self) {
return;
}
//从共享map中移除本session对象
lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
strong_self->_session_map->erase(id);
// 延时移除udp session, 防止频繁快速重建对象
strong_self->_poller->doDelayTask(kUdpDelayCloseMS, [weak_self, id]() {
if (auto strong_self = weak_self.lock()) {
// 从共享map中移除本session对象
lock_guard<std::recursive_mutex> lck(*strong_self->_session_mutex);
strong_self->_session_map->erase(id);
}
return 0;
});
});

// 获取会话强应用
if (auto strong_session = weak_session.lock()) {
if (auto strong_helper = weak_helper.lock()) {
// 触发 onError 事件回调
TraceP(strong_session) << cls << " on err: " << err;
strong_session->onError(err);
TraceP(strong_helper->session()) << strong_helper->className() << " on err: " << err;
strong_helper->enable = false;
strong_helper->session()->onError(err);
}
});

auto pr = _session_map->emplace(id, std::move(helper));
assert(pr.second);
return pr.first->second->session();
return pr.first->second;
};

if (socket->getPoller()->isCurrentThread()) {
//该socket分配在本线程,直接创建session对象,并处理数据
return session_creator();
return helper_creator();
}

//该socket分配在其他线程,需要先拷贝buffer,然后在其所在线程创建session对象并处理数据
auto cacheable_buf = std::make_shared<BufferString>(buf->toString());
socket->getPoller()->async([session_creator, cacheable_buf]() {
socket->getPoller()->async([helper_creator, cacheable_buf]() {
//在该socket所在线程创建session对象
auto session = session_creator();
if (session) {
auto helper = helper_creator();
if (helper) {
//该数据不能丢弃,给session对象消费
emitSessionRecv(session, cacheable_buf);
emitSessionRecv(helper, cacheable_buf);
}
});
return s_null_session;
return nullptr;
}

void UdpServer::setOnCreateSocket(onCreateSocket cb) {
Expand Down
4 changes: 2 additions & 2 deletions src/Network/UdpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ class UdpServer : public Server {
/**
* @brief 根据对端信息获取或创建一个会话
*/
Session::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new);
SessionHelper::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new);

/**
* @brief 创建一个会话, 同时进行必要的设置
*/
Session::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len);
SessionHelper::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len);

/**
* @brief 创建socket
Expand Down
Loading