diff --git a/src/Network/BufferSock.cpp b/src/Network/BufferSock.cpp index 603c26b9..779d248d 100644 --- a/src/Network/BufferSock.cpp +++ b/src/Network/BufferSock.cpp @@ -466,4 +466,132 @@ BufferList::Ptr BufferList::create(List > list, Sen #endif } +#if defined(__linux) || defined(__linux__) +class SocketRecvmmsgBuffer : public SocketRecvBuffer { +public: + SocketRecvmmsgBuffer(size_t count, size_t size) + : _size(size) + , _iovec(count) + , _mmsgs(count) + , _buffers(count) + , _address(count) { + for (auto i = 0u; i < count; ++i) { + auto buf = BufferRaw::create(); + buf->setCapacity(size); + + _buffers[i] = buf; + auto &mmsg = _mmsgs[i]; + auto &addr = _address[i]; + mmsg.msg_len = 0; + mmsg.msg_hdr.msg_name = &addr; + mmsg.msg_hdr.msg_namelen = sizeof(addr); + mmsg.msg_hdr.msg_iov = &_iovec[i]; + mmsg.msg_hdr.msg_iov->iov_base = buf->data(); + mmsg.msg_hdr.msg_iov->iov_len = buf->getCapacity() - 1; + mmsg.msg_hdr.msg_iovlen = 1; + mmsg.msg_hdr.msg_control = nullptr; + mmsg.msg_hdr.msg_controllen = 0; + mmsg.msg_hdr.msg_flags = 0; + } + } + + ssize_t recvFromSocket(int fd, ssize_t &count) override { + for (auto i = 0; i < _last_count; ++i) { + auto &mmsg = _mmsgs[i]; + mmsg.msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + auto &buf = _buffers[i]; + if (!buf) { + auto raw = BufferRaw::create(); + raw->setCapacity(_size); + buf = raw; + mmsg.msg_hdr.msg_iov->iov_base = buf->data(); + } + } + do { + count = recvmmsg(fd, &_mmsgs[0], _mmsgs.size(), 0, nullptr); + } while (-1 == count && UV_EINTR == get_uv_error(true)); + + _last_count = count; + if (count <= 0) { + return count; + } + + ssize_t nread = 0; + for (auto i = 0; i < count; ++i) { + auto &mmsg = _mmsgs[i]; + nread += mmsg.msg_len; + + auto buf = static_pointer_cast(_buffers[i]); + buf->setSize(mmsg.msg_len); + buf->data()[mmsg.msg_len] = '\0'; + } + return nread; + } + + Buffer::Ptr &getBuffer(size_t index) override { return _buffers[index]; } + + struct sockaddr_storage &getAddress(size_t index) override { return _address[index]; } + +private: + size_t _size; + ssize_t _last_count { 0 }; + std::vector _iovec; + std::vector _mmsgs; + std::vector _buffers; + std::vector _address; +}; +#endif + +class SocketRecvFromBuffer : public SocketRecvBuffer { +public: + SocketRecvFromBuffer(size_t size): _size(size) {} + + ssize_t recvFromSocket(int fd, ssize_t &count) override { + ssize_t nread; + socklen_t len = sizeof(_address); + if (!_buffer) { + allocBuffer(); + } + + do { + nread = recvfrom(fd, _buffer->data(), _buffer->getCapacity() - 1, 0, (struct sockaddr *)&_address, &len); + } while (-1 == nread && UV_EINTR == get_uv_error(true)); + + if (nread > 0) { + count = 1; + _buffer->data()[nread] = '\0'; + std::static_pointer_cast(_buffer)->setSize(nread); + } + return nread; + } + + Buffer::Ptr &getBuffer(size_t index) override { return _buffer; } + + struct sockaddr_storage &getAddress(size_t index) override { return _address; } + +private: + void allocBuffer() { + auto buf = BufferRaw::create(); + buf->setCapacity(_size); + _buffer = std::move(buf); + } + +private: + size_t _size; + Buffer::Ptr _buffer; + struct sockaddr_storage _address; +}; + +static constexpr auto kPacketCount = 32; +static constexpr auto kBufferCapacity = 4 * 1024u; + +SocketRecvBuffer::Ptr SocketRecvBuffer::create(bool is_udp) { +#if defined(__linux) || defined(__linux__) + if (is_udp) { + return std::make_shared(kPacketCount, kBufferCapacity); + } +#endif + return std::make_shared(kPacketCount * kBufferCapacity); +} + } //toolkit diff --git a/src/Network/BufferSock.h b/src/Network/BufferSock.h index 0ee264d1..d6da4446 100644 --- a/src/Network/BufferSock.h +++ b/src/Network/BufferSock.h @@ -69,5 +69,18 @@ class BufferList : public noncopyable { ObjectStatistic _statistic; }; +class SocketRecvBuffer { +public: + using Ptr = std::shared_ptr; + + virtual ~SocketRecvBuffer() = default; + + virtual ssize_t recvFromSocket(int fd, ssize_t &count) = 0; + virtual Buffer::Ptr &getBuffer(size_t index) = 0; + virtual struct sockaddr_storage &getAddress(size_t index) = 0; + + static Ptr create(bool is_udp); +}; + } #endif //ZLTOOLKIT_BUFFERSOCK_H diff --git a/src/Network/Socket.cpp b/src/Network/Socket.cpp index 261af94b..ab8f13c9 100644 --- a/src/Network/Socket.cpp +++ b/src/Network/Socket.cpp @@ -80,11 +80,27 @@ Socket::~Socket() { } void Socket::setOnRead(onReadCB cb) { + onMultiReadCB cb2; + if (cb) { + cb2 = [cb](Buffer::Ptr *buf, struct sockaddr_storage *addr, size_t count) { + for (auto i = 0u; i < count; ++i) { + cb(buf[i], (struct sockaddr *)(addr + i), sizeof(struct sockaddr_storage)); + } + }; + } + setOnMultiRead(std::move(cb2)); +} + +void Socket::setOnMultiRead(onMultiReadCB cb) { LOCK_GUARD(_mtx_event); if (cb) { - _on_read = std::move(cb); + _on_multi_read = std::move(cb); } else { - _on_read = [](const Buffer::Ptr &buf, struct sockaddr *, int) { WarnL << "Socket not set read callback, data ignored: " << buf->size(); }; + _on_multi_read = [](Buffer::Ptr *buf, struct sockaddr_storage *addr, size_t count) { + for (auto i = 0u; i < count; ++i) { + WarnL << "Socket not set read callback, data ignored: " << buf[i]->size(); + } + }; } } @@ -246,7 +262,7 @@ bool Socket::attachEvent(const SockNum::Ptr &sock) { } // tcp客户端或udp - auto read_buffer = _poller->getSharedBuffer(); + auto read_buffer = _poller->getSharedBuffer(sock->type() == SockNum::Sock_UDP); auto result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self, sock, read_buffer](int event) { auto strong_self = weak_self.lock(); if (!strong_self) { @@ -267,20 +283,11 @@ bool Socket::attachEvent(const SockNum::Ptr &sock) { return -1 != result; } -ssize_t Socket::onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) noexcept { - ssize_t ret = 0, nread = 0; - auto data = buffer->data(); - // 最后一个字节设置为'\0' - auto capacity = buffer->getCapacity() - 1; - - struct sockaddr_storage addr; - socklen_t len = sizeof(addr); +ssize_t Socket::onRead(const SockNum::Ptr &sock, const SocketRecvBuffer::Ptr &buffer) noexcept { + ssize_t ret = 0, nread = 0, count = 0; while (_enable_recv) { - do { - nread = recvfrom(sock->rawFd(), data, capacity, 0, (struct sockaddr *)&addr, &len); - } while (-1 == nread && UV_EINTR == get_uv_error(true)); - + nread = buffer->recvFromSocket(sock->rawFd(), count); if (nread == 0) { if (sock->type() == SockNum::Sock_TCP) { emitErr(SockException(Err_eof, "end of file")); @@ -302,21 +309,18 @@ ssize_t Socket::onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) n return ret; } + ret += nread; if (_enable_speed) { // 更新接收速率 _recv_speed += nread; } - ret += nread; - data[nread] = '\0'; - // 设置buffer有效数据大小 - buffer->setSize(nread); - - // 触发回调 - LOCK_GUARD(_mtx_event); + auto &buf = buffer->getBuffer(0); + auto &addr = buffer->getAddress(0); try { // 此处捕获异常,目的是防止数据未读尽,epoll边沿触发失效的问题 - _on_read(buffer, (struct sockaddr *)&addr, len); + LOCK_GUARD(_mtx_event); + _on_multi_read(&buf, &addr, count); } catch (std::exception &ex) { ErrorL << "Exception occurred when emit on_read: " << ex.what(); } diff --git a/src/Network/Socket.h b/src/Network/Socket.h index d43549aa..2155eeb2 100644 --- a/src/Network/Socket.h +++ b/src/Network/Socket.h @@ -282,7 +282,9 @@ class Socket : public std::enable_shared_from_this, public noncopyable, public: using Ptr = std::shared_ptr; //接收数据回调 - using onReadCB = std::function; + using onReadCB = std::function; + using onMultiReadCB = std::function; + //发生错误回调 using onErrCB = std::function; //tcp监听接收到连接请求 @@ -352,6 +354,7 @@ class Socket : public std::enable_shared_from_this, public noncopyable, * @param cb 回调对象 */ void setOnRead(onReadCB cb); + void setOnMultiRead(onMultiReadCB cb); /** * 设置异常事件(包括eof等)回调 @@ -515,7 +518,7 @@ class Socket : public std::enable_shared_from_this, public noncopyable, void setSock(SockNum::Ptr sock); int onAccept(const SockNum::Ptr &sock, int event) noexcept; - ssize_t onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) noexcept; + ssize_t onRead(const SockNum::Ptr &sock, const SocketRecvBuffer::Ptr &buffer) noexcept; void onWriteAble(const SockNum::Ptr &sock); void onConnected(const SockNum::Ptr &sock, const onErrCB &cb); void onFlushed(); @@ -528,67 +531,67 @@ class Socket : public std::enable_shared_from_this, public noncopyable, bool fromSock_l(SockNum::Ptr sock); private: - //send socket时的flag + // send socket时的flag int _sock_flags = SOCKET_DEFAULE_FLAGS; - //最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数 + // 最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数 uint32_t _max_send_buffer_ms = SEND_TIME_OUT_SEC * 1000; - //控制是否接收监听socket可读事件,关闭后可用于流量控制 - std::atomic _enable_recv {true}; - //标记该socket是否可写,socket写缓存满了就不可写 - std::atomic _sendable {true}; - //是否已经触发err回调了 + // 控制是否接收监听socket可读事件,关闭后可用于流量控制 + std::atomic _enable_recv { true }; + // 标记该socket是否可写,socket写缓存满了就不可写 + std::atomic _sendable { true }; + // 是否已经触发err回调了 bool _err_emit = false; - //是否启用网速统计 + // 是否启用网速统计 bool _enable_speed = false; // udp发送目标地址 std::shared_ptr _udp_send_dst; - //接收速率统计 + // 接收速率统计 BytesSpeed _recv_speed; - //发送速率统计 + // 发送速率统计 BytesSpeed _send_speed; - //tcp连接超时定时器 + // tcp连接超时定时器 Timer::Ptr _con_timer; - //tcp连接结果回调对象 + // tcp连接结果回调对象 std::shared_ptr _async_con_cb; - //记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器 + // 记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器 Ticker _send_flush_ticker; - //socket fd的抽象类 + // socket fd的抽象类 SockFD::Ptr _sock_fd; - //本socket绑定的poller线程,事件触发于此线程 + // 本socket绑定的poller线程,事件触发于此线程 EventPoller::Ptr _poller; - //跨线程访问_sock_fd时需要上锁 + // 跨线程访问_sock_fd时需要上锁 mutable MutexWrapper _mtx_sock_fd; - //socket异常事件(比如说断开) + // socket异常事件(比如说断开) onErrCB _on_err; - //收到数据事件 - onReadCB _on_read; - //socket缓存清空事件(可用于发送流速控制) + // 收到数据事件 + onMultiReadCB _on_multi_read; + // socket缓存清空事件(可用于发送流速控制) onFlush _on_flush; - //tcp监听收到accept请求事件 + // tcp监听收到accept请求事件 onAcceptCB _on_accept; - //tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程) + // tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程) onCreateSocket _on_before_accept; - //设置上述回调函数的锁 + // 设置上述回调函数的锁 MutexWrapper _mtx_event; - //一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存 - List > _send_buf_waiting; - //一级发送缓存锁 + // 一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存 + List> _send_buf_waiting; + // 一级发送缓存锁 MutexWrapper _mtx_send_buf_waiting; - //二级发送缓存, socket可写时,会把二级缓存批量写入到socket + // 二级发送缓存, socket可写时,会把二级缓存批量写入到socket List _send_buf_sending; - //二级发送缓存锁 + // 二级发送缓存锁 MutexWrapper _mtx_send_buf_sending; - //发送buffer结果回调 + // 发送buffer结果回调 BufferList::SendResult _send_result; - //对象个数统计 + // 对象个数统计 ObjectStatistic _statistic; - //链接缓存地址,防止tcp reset 导致无法获取对端的地址 + // 链接缓存地址,防止tcp reset 导致无法获取对端的地址 struct sockaddr_storage _local_addr; struct sockaddr_storage _peer_addr; }; diff --git a/src/Network/UdpServer.cpp b/src/Network/UdpServer.cpp index f2805c50..a8fbd468 100644 --- a/src/Network/UdpServer.cpp +++ b/src/Network/UdpServer.cpp @@ -52,7 +52,7 @@ UdpServer::UdpServer(const EventPoller::Ptr &poller) : Server(poller) { void UdpServer::setupEvent() { _socket = createSocket(_poller); std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); - _socket->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + _socket->setOnRead([weak_self](Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { if (auto strong_self = weak_self.lock()) { strong_self->onRead(buf, addr, addr_len); } @@ -142,7 +142,7 @@ void UdpServer::cloneFrom(const UdpServer &that) { this->mINI::operator=(that); } -void UdpServer::onRead(const Buffer::Ptr &buf, sockaddr *addr, int addr_len) { +void UdpServer::onRead(Buffer::Ptr &buf, sockaddr *addr, int addr_len) { const auto id = makeSockId(addr, addr_len); onRead_l(true, id, buf, addr, addr_len); } @@ -161,7 +161,7 @@ static void emitSessionRecv(const SessionHelper::Ptr &helper, const Buffer::Ptr } } -void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len) { +void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, Buffer::Ptr &buf, sockaddr *addr, int addr_len) { // udp server fd收到数据时触发此函数;大部分情况下数据应该在peer fd触发,此函数应该不是热点函数 bool is_new = false; if (auto helper = getOrCreateSession(id, buf, addr, addr_len, is_new)) { @@ -172,8 +172,8 @@ void UdpServer::onRead_l(bool is_server_fd, const UdpServer::PeerIdType &id, con //数据漂移到其他线程,需要先切换线程 WarnL << "UDP packet incoming from other thread"; std::weak_ptr weak_helper = helper; - //由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先拷贝一下 - auto cacheable_buf = std::make_shared(buf->toString()); + //由于socket读buffer是该线程上所有socket共享复用的,所以不能跨线程使用,必须先转移走 + auto cacheable_buf = std::move(buf); helper->session()->async([weak_helper, cacheable_buf]() { if (auto strong_helper = weak_helper.lock()) { emitSessionRecv(strong_helper, cacheable_buf); @@ -220,7 +220,7 @@ void UdpServer::onManagerSession() { } } -SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, const Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) { +SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id, Buffer::Ptr &buf, sockaddr *addr, int addr_len, bool &is_new) { { //减小临界区 std::lock_guard lock(*_session_mutex); @@ -233,7 +233,7 @@ SessionHelper::Ptr UdpServer::getOrCreateSession(const UdpServer::PeerIdType &id return createSession(id, buf, addr, addr_len); } -SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { +SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { // 此处改成自定义获取poller对象,防止负载不均衡 auto socket = createSocket(_multi_poller ? EventPollerPool::Instance().getPoller(false) : _poller, buf, addr, addr_len); if (!socket) { @@ -265,7 +265,7 @@ SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer:: helper->session()->attachServer(*this); std::weak_ptr weak_helper = helper; - socket->setOnRead([weak_self, weak_helper, id](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + socket->setOnRead([weak_self, weak_helper, id](Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { auto strong_self = weak_self.lock(); if (!strong_self) { return; @@ -322,8 +322,8 @@ SessionHelper::Ptr UdpServer::createSession(const PeerIdType &id, const Buffer:: return helper_creator(); } - // 该socket分配在其他线程,需要先拷贝buffer,然后在其所在线程创建helper对象并处理数据 - auto cacheable_buf = std::make_shared(buf->toString()); + // 该socket分配在其他线程,需要先转移走buffer,然后在其所在线程创建helper对象并处理数据 + auto cacheable_buf = std::move(buf); socket->getPoller()->async([helper_creator, cacheable_buf]() { // 在该socket所在线程创建helper对象 auto helper = helper_creator(); diff --git a/src/Network/UdpServer.h b/src/Network/UdpServer.h index 43f3286d..41d761ae 100644 --- a/src/Network/UdpServer.h +++ b/src/Network/UdpServer.h @@ -77,7 +77,7 @@ class UdpServer : public Server { */ void onManagerSession(); - void onRead(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len); + void onRead(Buffer::Ptr &buf, struct sockaddr *addr, int addr_len); /** * @brief 接收到数据,可能来自server fd,也可能来自peer fd @@ -87,17 +87,17 @@ class UdpServer : public Server { * @param addr 客户端地址 * @param addr_len 客户端地址长度 */ - void onRead_l(bool is_server_fd, const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len); + void onRead_l(bool is_server_fd, const PeerIdType &id, Buffer::Ptr &buf, struct sockaddr *addr, int addr_len); /** * @brief 根据对端信息获取或创建一个会话 */ - SessionHelper::Ptr getOrCreateSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new); + SessionHelper::Ptr getOrCreateSession(const PeerIdType &id, Buffer::Ptr &buf, struct sockaddr *addr, int addr_len, bool &is_new); /** * @brief 创建一个会话, 同时进行必要的设置 */ - SessionHelper::Ptr createSession(const PeerIdType &id, const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len); + SessionHelper::Ptr createSession(const PeerIdType &id, Buffer::Ptr &buf, struct sockaddr *addr, int addr_len); /** * @brief 创建socket diff --git a/src/Poller/EventPoller.cpp b/src/Poller/EventPoller.cpp index a6fed917..ca924a86 100644 --- a/src/Poller/EventPoller.cpp +++ b/src/Poller/EventPoller.cpp @@ -311,13 +311,15 @@ inline void EventPoller::onPipeEvent() { }); } -BufferRaw::Ptr EventPoller::getSharedBuffer() { - auto ret = _shared_buffer.lock(); +SocketRecvBuffer::Ptr EventPoller::getSharedBuffer(bool is_udp) { +#if !defined(__linux) && !defined(__linux__) + // 非Linux平台下,tcp和udp共享recvfrom方案,使用同一个buffer + is_udp = 0; +#endif + auto ret = _shared_buffer[is_udp].lock(); if (!ret) { - //预留一个字节存放\0结尾符 - ret = BufferRaw::create(); - ret->setCapacity(1 + SOCKET_DEFAULT_BUF_SIZE); - _shared_buffer = ret; + ret = SocketRecvBuffer::create(is_udp); + _shared_buffer[is_udp] = ret; } return ret; } diff --git a/src/Poller/EventPoller.h b/src/Poller/EventPoller.h index f816d832..3676bed7 100644 --- a/src/Poller/EventPoller.h +++ b/src/Poller/EventPoller.h @@ -24,6 +24,7 @@ #include "Thread/TaskExecutor.h" #include "Thread/ThreadPool.h" #include "Network/Buffer.h" +#include "Network/BufferSock.h" #if defined(__linux__) || defined(__linux) #define HAS_EPOLL @@ -123,7 +124,7 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s /** * 获取当前线程下所有socket共享的读缓存 */ - BufferRaw::Ptr getSharedBuffer(); + SocketRecvBuffer::Ptr getSharedBuffer(bool is_udp); /** * 获取poller线程id @@ -192,7 +193,7 @@ class EventPoller : public TaskExecutor, public AnyStorage, public std::enable_s //线程名 std::string _name; //当前线程下,所有socket共享的读缓存 - std::weak_ptr _shared_buffer; + std::weak_ptr _shared_buffer[2]; //执行事件循环的线程 std::thread *_loop_thread = nullptr; //通知事件循环的线程已启动