Skip to content

Commit

Permalink
新增支持批量回调
Browse files Browse the repository at this point in the history
  • Loading branch information
xia-chu committed Jun 23, 2024
1 parent 7eb6c3f commit b45dac7
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
37 changes: 24 additions & 13 deletions src/Network/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ void Socket::setOnRead(onReadCB cb) {
}
}

void Socket::setOnMultiRead(onMultiReadCB cb) {
LOCK_GUARD(_mtx_event);
_on_multi_read = std::move(cb);
}

void Socket::setOnErr(onErrCB cb) {
LOCK_GUARD(_mtx_event);
if (cb) {
Expand Down Expand Up @@ -282,10 +287,10 @@ class MMsgBuffer {
, _buffers(count)
, _address(count) {
for (auto i = 0u; i < count; ++i) {
auto &buf = _buffers[i];
buf = BufferRaw::create();
auto buf = BufferRaw::create();
buf->setCapacity(size);

_buffers[i] = buf;
auto &mmsg = _mmsgs[i];
auto &addr = _address[i];
mmsg.msg_len = 0;
Expand Down Expand Up @@ -318,14 +323,14 @@ class MMsgBuffer {
auto &mmsg = _mmsgs[i];
nread += mmsg.msg_len;

auto &buf = _buffers[i];
auto buf = static_pointer_cast<BufferRaw>(_buffers[i]);
buf->setSize(mmsg.msg_len);
buf->data()[mmsg.msg_len] = '\0';
}
return nread;
}

const BufferRaw::Ptr &getBuffer(size_t index) const { return _buffers[index]; }
const Buffer::Ptr &getBuffer(size_t index) const { return _buffers[index]; }

const struct sockaddr_storage &getAddress(size_t index) const { return _address[index]; }

Expand All @@ -344,7 +349,7 @@ class MMsgBuffer {
private:
std::vector<struct iovec> _iovec;
std::vector<struct mmsghdr> _mmsgs;
std::vector<BufferRaw::Ptr> _buffers;
std::vector<Buffer::Ptr> _buffers;
std::vector<struct sockaddr_storage> _address;
};

Expand Down Expand Up @@ -386,14 +391,20 @@ ssize_t Socket::onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &) noexcep
}

LOCK_GUARD(_mtx_event);
for (auto i = 0u; i < count; ++i) {
auto &buf = buffer.getBuffer(i);
auto &addr = buffer.getAddress(i);
try {
// 此处捕获异常,目的是防止数据未读尽,epoll边沿触发失效的问题
_on_read(buf, (struct sockaddr *)&addr, sizeof addr);
} catch (std::exception &ex) {
ErrorL << "Exception occurred when emit on_read: " << ex.what();
if (_on_multi_read) {
auto &buf = buffer.getBuffer(0);
auto &addr = buffer.getAddress(0);
_on_multi_read(&buf, &addr, count);
} else {
for (auto i = 0u; i < count; ++i) {
auto &buf = buffer.getBuffer(i);
auto &addr = buffer.getAddress(i);
try {
// 此处捕获异常,目的是防止数据未读尽,epoll边沿触发失效的问题
_on_read(buf, (struct sockaddr *)&addr, sizeof addr);
} catch (std::exception &ex) {
ErrorL << "Exception occurred when emit on_read: " << ex.what();
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/Network/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
using Ptr = std::shared_ptr<Socket>;
//接收数据回调
using onReadCB = std::function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
using onMultiReadCB = std::function<void(const Buffer::Ptr *buf, const struct sockaddr_storage *addr, size_t count)>;

//发生错误回调
using onErrCB = std::function<void(const SockException &err)>;
//tcp监听接收到连接请求
Expand Down Expand Up @@ -352,6 +354,7 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
* @param cb 回调对象
*/
void setOnRead(onReadCB cb);
void setOnMultiRead(onMultiReadCB cb);

/**
* 设置异常事件(包括eof等)回调
Expand Down Expand Up @@ -566,6 +569,7 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
onErrCB _on_err;
//收到数据事件
onReadCB _on_read;
onMultiReadCB _on_multi_read;
//socket缓存清空事件(可用于发送流速控制)
onFlush _on_flush;
//tcp监听收到accept请求事件
Expand Down

0 comments on commit b45dac7

Please sign in to comment.