Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve KQUEUE edge-triggered #239

Closed
wants to merge 9 commits into from
68 changes: 67 additions & 1 deletion src/Network/BufferSock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@
ref.iov_len = remain;
#else
ref.buf = (CHAR *)ref.buf + ref.len - remain;
ref.len = remain;

Check warning on line 244 in src/Network/BufferSock.cpp

View workflow job for this annotation

GitHub Actions / build

'=': conversion from 'size_t' to 'ULONG', possible loss of data
#endif
break;
}
Expand All @@ -258,7 +258,7 @@
_remain_size += it->iov_len;
#else
it->buf = pr.first->data();
it->len = pr.first->size();

Check warning on line 261 in src/Network/BufferSock.cpp

View workflow job for this annotation

GitHub Actions / build

'=': conversion from 'size_t' to 'ULONG', possible loss of data
_remain_size += it->len;
#endif
++it;
Expand Down Expand Up @@ -307,9 +307,9 @@
auto &buffer = front.first;
if (_is_udp) {
auto ptr = getBufferSockPtr(front);
n = ::sendto(fd, buffer->data() + _offset, buffer->size() - _offset, flags, ptr ? ptr->sockaddr() : nullptr, ptr ? ptr->socklen() : 0);

Check warning on line 310 in src/Network/BufferSock.cpp

View workflow job for this annotation

GitHub Actions / build

'argument': conversion from 'size_t' to 'int', possible loss of data
} else {
n = ::send(fd, buffer->data() + _offset, buffer->size() - _offset, flags);

Check warning on line 312 in src/Network/BufferSock.cpp

View workflow job for this annotation

GitHub Actions / build

'argument': conversion from 'size_t' to 'int', possible loss of data
}

if (n >= 0) {
Expand Down Expand Up @@ -544,7 +544,7 @@

class SocketRecvFromBuffer : public SocketRecvBuffer {
public:
SocketRecvFromBuffer(size_t size): _size(size) {}
explicit SocketRecvFromBuffer(size_t size): _size(size) {}

ssize_t recvFromSocket(int fd, ssize_t &count) override {
ssize_t nread;
Expand All @@ -554,7 +554,7 @@
}

do {
nread = recvfrom(fd, _buffer->data(), _buffer->getCapacity() - 1, 0, (struct sockaddr *)&_address, &len);

Check warning on line 557 in src/Network/BufferSock.cpp

View workflow job for this annotation

GitHub Actions / build

'argument': conversion from 'size_t' to 'int', possible loss of data
} while (-1 == nread && UV_EINTR == get_uv_error(true));

if (nread > 0) {
Expand Down Expand Up @@ -582,6 +582,65 @@
struct sockaddr_storage _address;
};

#if (_WIN32)
class SocketWSARecvFromBuffers final : public SocketRecvBuffer {
public:
explicit SocketWSARecvFromBuffers(size_t size) : _size(size) {
_wsabuf = wsa_buf_init(NULL, 0);
}

ssize_t recvFromSocket(int fd, ssize_t &count) override {
DWORD bytes = 0, flags = 0;
ssize_t nread;
socklen_t len = sizeof(_address);
if (!_buffer) {
allocBuffer();
}

do {
nread = WSARecvFrom(fd, (WSABUF *)&_wsabuf, 1, &bytes, &flags, (struct sockaddr *)&_address, &len, NULL, NULL);
nread = bytes;
} while (-1 == nread && UV_EINTR == get_uv_error(true));

if (nread > 0) {
count = 1;
_buffer->data()[nread] = '\0';
std::static_pointer_cast<BufferRaw>(_buffer)->setSize(nread);
}
return nread;
}

Buffer::Ptr &getBuffer(size_t index) override { return _buffer; }

struct sockaddr_storage &getAddress(size_t index) override {
return _address;
}

private:
WSABUF wsa_buf_init(char *base, ssize_t len) {
WSABUF buftmp;
buftmp.buf = base;
buftmp.len = len;

Check warning on line 623 in src/Network/BufferSock.cpp

View workflow job for this annotation

GitHub Actions / build

'=': conversion from 'int64_t' to 'ULONG', possible loss of data
return buftmp;
}

void allocBuffer() {
auto buf = BufferRaw::create();
buf->setCapacity(_size);
_buffer = std::move(buf);
_wsabuf = wsa_buf_init(_buffer->data(), _size);
}

private:
size_t _size;
Buffer::Ptr _buffer;
struct sockaddr_storage _address;
WSABUF _wsabuf;
//bool _isudp;
};

#endif

static constexpr auto kPacketCount = 32;
static constexpr auto kBufferCapacity = 4 * 1024u;

Expand All @@ -591,6 +650,13 @@
return std::make_shared<SocketRecvmmsgBuffer>(kPacketCount, kBufferCapacity);
}
#endif

#if (_WIN32)
if (is_udp) {
return std::make_shared<SocketWSARecvFromBuffers>(kPacketCount * kBufferCapacity);
}
#endif

return std::make_shared<SocketRecvFromBuffer>(kPacketCount * kBufferCapacity);
}

Expand Down
12 changes: 8 additions & 4 deletions src/Network/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,13 @@ ssize_t Socket::onRead(const SockNum::Ptr &sock, const SocketRecvBuffer::Ptr &bu
if (nread == 0) {
if (sock->type() == SockNum::Sock_TCP) {
emitErr(SockException(Err_eof, "end of file"));
} else {
WarnL << "Recv eof on udp socket[" << sock->rawFd() << "]";
}
#ifndef _WIN32
else {
// 在windows udp, 如果未发生任何错误,并且接收操作已立即完成, 则 WSARecvFrom 将返回零
WarnL << "Recv eof on udp socket[" << sock->rawFd() << "]";
}
#endif
return ret;
}

Expand Down Expand Up @@ -543,9 +547,9 @@ int Socket::onAccept(const SockNum::Ptr &sock, int event) noexcept {
// emitErr(ex); https://github.com/ZLMediaKit/ZLMediaKit/issues/2946
ErrorL << "Accept socket failed: " << ex.what();
// 可能打开的文件描述符太多了:UV_EMFILE/UV_ENFILE
#if defined(HAS_EPOLL) && !defined(_WIN32)
#if (defined(HAS_EPOLL) && !defined(_WIN32)) || defined(HAS_KQUEUE)
// 边缘触发,还需要手动再触发accept事件,
//wepoll, Edge-triggered (`EPOLLET`) mode isn't supported.
// wepoll, Edge-triggered (`EPOLLET`) mode isn't supported.
std::weak_ptr<Socket> weak_self = shared_from_this();
_poller->doDelayTask(100, [weak_self, sock]() {
if (auto strong_self = weak_self.lock()) {
Expand Down
2 changes: 1 addition & 1 deletion src/Network/TcpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void TcpClient::onSockConnect(const SockException &ex) {
return true;
});

sock_ptr->setOnRead([weak_self, sock_ptr](const Buffer::Ptr &pBuf, struct sockaddr *, int) {
sock_ptr->setOnRead([weak_self, sock_ptr](Buffer::Ptr &pBuf, struct sockaddr *, int) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
Expand Down
2 changes: 1 addition & 1 deletion src/Network/TcpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ Session::Ptr TcpServer::onAcceptConnection(const Socket::Ptr &sock) {

weak_ptr<Session> weak_session = session;
//会话接收数据事件
sock->setOnRead([weak_session](const Buffer::Ptr &buf, struct sockaddr *, int) {
sock->setOnRead([weak_session](Buffer::Ptr &buf, struct sockaddr *, int) {
//获取会话强应用
auto strong_session = weak_session.lock();
if (!strong_session) {
Expand Down
14 changes: 6 additions & 8 deletions src/Poller/EventPoller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
struct kevent kev[2];
int index = 0;
if (event & Event_Read) {
EV_SET(&kev[index++], fd, EVFILT_READ, EV_ADD, 0, 0, nullptr);
EV_SET(&kev[index++], fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, nullptr);
}
if (event & Event_Write) {
EV_SET(&kev[index++], fd, EVFILT_WRITE, EV_ADD, 0, 0, nullptr);
EV_SET(&kev[index++], fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);
}
int ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr);
if (ret != -1) {
Expand Down Expand Up @@ -223,8 +223,8 @@ int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) {
#elif defined(HAS_KQUEUE)
struct kevent kev[2];
int index = 0;
EV_SET(&kev[index++], fd, EVFILT_READ, event & Event_Read ? EV_ADD : EV_DELETE, 0, 0, nullptr);
EV_SET(&kev[index++], fd, EVFILT_WRITE, event & Event_Write ? EV_ADD : EV_DELETE, 0, 0, nullptr);
EV_SET(&kev[index++], fd, EVFILT_READ, event & Event_Read ? EV_ADD | EV_CLEAR : EV_DELETE, 0, 0, nullptr);
EV_SET(&kev[index++], fd, EVFILT_WRITE, event & Event_Write ? EV_ADD | EV_CLEAR : EV_DELETE, 0, 0, nullptr);
int ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr);
cb(ret != -1);
return ret;
Expand Down Expand Up @@ -312,10 +312,8 @@ inline void EventPoller::onPipeEvent() {
}

SocketRecvBuffer::Ptr EventPoller::getSharedBuffer(bool is_udp) {
#if !defined(__linux) && !defined(__linux__)
// 非Linux平台下,tcp和udp共享recvfrom方案,使用同一个buffer
is_udp = 0;
#endif
// Linux/windows跨平台里,tcp 用recvfrom方案,使用同一个buffer
// Linux UDP 用 recvmmsg,而 windows 用 WSARecvFrom
auto ret = _shared_buffer[is_udp].lock();
if (!ret) {
ret = SocketRecvBuffer::create(is_udp);
Expand Down
2 changes: 1 addition & 1 deletion tests/test_pingpong.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ int main(int argc,char *argv[]){
socket->setOnErr([](const SockException &err){
WarnL << err.what();
});
socket->setOnRead([interval,socket](const Buffer::Ptr &buffer, struct sockaddr *addr , int addr_len){
socket->setOnRead([interval,socket](Buffer::Ptr &buffer, struct sockaddr *addr , int addr_len){
if(!interval){
socket->send(buffer);
}
Expand Down
8 changes: 6 additions & 2 deletions tests/test_udpSock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ string getIP(struct sockaddr *addr){
return SockUtil::inet_ntoa(addr);
}

uint16_t getPort(struct sockaddr* addr) {
return SockUtil::inet_port(addr);
}

int main() {
//设置程序退出信号处理函数
signal(SIGINT, [](int){exitProgram = true;});
Expand All @@ -42,9 +46,9 @@ int main() {
sockRecv->bindUdpSock(9001);//接收UDP绑定9001端口
sockSend->bindUdpSock(0, "0.0.0.0");//发送UDP随机端口

sockRecv->setOnRead([](const Buffer::Ptr &buf, struct sockaddr *addr , int){
sockRecv->setOnRead([](Buffer::Ptr &buf, struct sockaddr *addr , int){
//接收到数据回调
DebugL << "recv data form " << getIP(addr) << ":" << buf->data();
DebugL << "recv data form(" << getIP(addr) << ":" << getPort(addr)<< ") " << buf->data();
});

struct sockaddr_storage addrDst;
Expand Down
Loading