Skip to content

Commit

Permalink
fix: don't store the event ptrs in the kevent/epoll udata, this can e…
Browse files Browse the repository at this point in the history
…nd up with dangling ptrs in events
  • Loading branch information
braindigitalis committed Dec 11, 2024
1 parent edcefd6 commit 587a51b
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 63 deletions.
15 changes: 8 additions & 7 deletions include/dpp/socketengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,6 @@ struct DPP_EXPORT socket_engine_base {
*/
socket_container fds;

/**
* @brief Thread pool.
* Event calls go into the thread pool and are called as
* and when threads in the pool are available.
*/
std::unique_ptr<thread_pool> pool;

/**
* @brief Number of file descriptors we are waiting to delete
*/
Expand Down Expand Up @@ -244,6 +237,7 @@ struct DPP_EXPORT socket_engine_base {
* remove_socket() on each entry to be removed.
*/
void prune();

protected:

/**
Expand All @@ -252,6 +246,13 @@ struct DPP_EXPORT socket_engine_base {
* @param fd File descriptor to remove
*/
virtual bool remove_socket(dpp::socket fd);

/**
* @brief Find a file descriptors socket events
* @param fd file descriptor
* @return file descriptor or nullptr if doesn't exist
*/
socket_events* get_fd(dpp::socket fd);
};

/**
Expand Down
4 changes: 2 additions & 2 deletions src/dpp/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ dpp::utility::uptime cluster::uptime()
}

void cluster::add_reconnect(uint32_t shard_id) {
reconnections.emplace(shard_id, time(nullptr) + 5);
log(ll_trace, "Reconnecting in 5 seconds...");
reconnections[shard_id] =time(nullptr) + 5;
log(ll_trace, "Reconnecting shard " + std::to_string(shard_id) + " in 5 seconds...");
}

void cluster::start(start_type return_after) {
Expand Down
9 changes: 9 additions & 0 deletions src/dpp/socketengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ socket_engine_base::socket_engine_base(cluster* creator) : owner(creator) {

time_t last_time = time(nullptr);

socket_events* socket_engine_base::get_fd(dpp::socket fd) {
std::unique_lock lock(fds_mutex);
auto iter = fds.find(fd);
if (iter == fds.end() || ((iter->second->flags & WANT_DELETION) != 0L)) {
return nullptr;
}
return iter->second.get();
}

void socket_engine_base::prune() {
if (to_delete_count > 0) {
std::unique_lock lock(fds_mutex);
Expand Down
22 changes: 6 additions & 16 deletions src/dpp/socketengines/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int modify_event(int epoll_handle, socket_events* eh, int new_events) {
if ((new_events & WANT_ERROR) != 0) {
new_ev.events |= EPOLLERR;
}
new_ev.data.ptr = static_cast<void *>(eh);
new_ev.data.fd = eh->fd;
epoll_ctl(epoll_handle, EPOLL_CTL_MOD, eh->fd, &new_ev);
}
return new_events;
Expand Down Expand Up @@ -80,13 +80,9 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
for (int j = 0; j < i; j++) {
epoll_event ev = events[j];

auto* const eh = static_cast<socket_events*>(ev.data.ptr);
if (!eh) {
continue;
}

const int fd = eh->fd;
if (fd == INVALID_SOCKET || eh->flags & WANT_DELETION) {
const int fd = ev.data.fd;
auto eh = get_fd(fd);
if (!eh || fd == INVALID_SOCKET) {
continue;
}

Expand Down Expand Up @@ -147,10 +143,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
if ((e.flags & WANT_ERROR) != 0) {
ev.events |= EPOLLERR;
}
{
std::shared_lock lock(fds_mutex);
ev.data.ptr = fds.find(e.fd)->second.get();
}
ev.data.fd = e.fd;
return epoll_ctl(epoll_handle, EPOLL_CTL_ADD, e.fd, &ev) >= 0;
}
return r;
Expand All @@ -170,10 +163,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
if ((e.flags & WANT_ERROR) != 0) {
ev.events |= EPOLLERR;
}
{
std::shared_lock lock(fds_mutex);
ev.data.ptr = fds.find(e.fd)->second.get();
}
ev.data.fd = e.fd;
return epoll_ctl(epoll_handle, EPOLL_CTL_MOD, e.fd, &ev) >= 0;
}
return r;
Expand Down
56 changes: 20 additions & 36 deletions src/dpp/socketengines/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {

for (int j = 0; j < i; j++) {
const struct kevent& kev = ke_list[j];
auto* eh = reinterpret_cast<socket_events*>(kev.udata);
if (eh == nullptr || eh->flags & WANT_DELETION) {
auto eh = get_fd(kev.ident);
if (eh == nullptr) {
continue;
}

Expand Down Expand Up @@ -102,46 +102,30 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
prune();
}

bool set_events(const socket_events& e) {
struct kevent ke{};
if ((e.flags & WANT_READ) != 0) {
EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
if ((e.flags & WANT_WRITE) != 0) {
EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
}

bool register_socket(const socket_events& e) final {
bool r = socket_engine_base::register_socket(e);
if (r) {
struct kevent ke{};
socket_events* se{};
{
std::unique_lock lock(fds_mutex);
se = fds.find(e.fd)->second.get();
}
if ((se->flags & WANT_READ) != 0) {
EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, static_cast<CAST_TYPE>(se));
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
if ((se->flags & WANT_WRITE) != 0) {
EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, static_cast<CAST_TYPE>(se));
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
if (socket_engine_base::register_socket(e)) {
return set_events(e);
}
return r;
return false;
}

bool update_socket(const socket_events& e) final {
bool r = socket_engine_base::update_socket(e);
if (r) {
struct kevent ke{};
socket_events* se{};
{
std::unique_lock lock(fds_mutex);
se = fds.find(e.fd)->second.get();
}
if ((e.flags & WANT_READ) != 0) {
EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, static_cast<CAST_TYPE>(se));
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
if ((e.flags & WANT_WRITE) != 0) {
EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, static_cast<CAST_TYPE>(se));
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
if (socket_engine_base::update_socket(e)) {
return set_events(e);
}
return r;
return false;
}

protected:
Expand Down
2 changes: 0 additions & 2 deletions src/dpp/sslclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,6 @@ void ssl_client::read_loop()
if (this->sfd == INVALID_SOCKET) {
close_socket(fd);
owner->socketengine->delete_socket(fd);
on_error(fd, e, 0);
return;
}
on_read(fd, e);
Expand All @@ -514,7 +513,6 @@ void ssl_client::read_loop()
if (this->sfd == INVALID_SOCKET) {
close_socket(fd);
owner->socketengine->delete_socket(fd);
on_error(fd, e, 0);
return;
}
on_write(fd, e);
Expand Down

0 comments on commit 587a51b

Please sign in to comment.