Skip to content

Commit

Permalink
fix: immediately delete fds that want deletion after we are done firi…
Browse files Browse the repository at this point in the history
…ng their events rather than prune() loop

fix: replace fds that already exist in the set. emplace would fail.
  • Loading branch information
braindigitalis committed Dec 12, 2024
1 parent 587a51b commit 5a06ebc
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 54 deletions.
3 changes: 2 additions & 1 deletion src/dpp/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ dpp::utility::uptime cluster::uptime()
}

void cluster::add_reconnect(uint32_t shard_id) {
reconnections[shard_id] =time(nullptr) + 5;
reconnections[shard_id] = time(nullptr) + 5;
log(ll_trace, "Reconnecting shard " + std::to_string(shard_id) + " in 5 seconds...");
}

Expand All @@ -208,6 +208,7 @@ void cluster::start(start_type return_after) {
auto event_loop = [this]() -> void {
auto reconnect_monitor = start_timer([this](auto t) {
time_t now = time(nullptr);
log(ll_trace, "Ticking reconnect monitor with " + std::to_string(reconnections.size()) + " queued reconnections");
for (auto reconnect = reconnections.begin(); reconnect != reconnections.end(); ++reconnect) {
auto shard_id = reconnect->first;
auto shard_reconnect_time = reconnect->second;
Expand Down
1 change: 1 addition & 0 deletions src/dpp/discordclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ discord_client::~discord_client()

void discord_client::on_disconnect()
{
log(ll_trace, "discord_client::on_disconnect()");
set_resume_hostname();
if (sfd != INVALID_SOCKET) {
log(dpp::ll_debug, "Lost connection to websocket on shard " + std::to_string(shard_id) + ", reconnecting...");
Expand Down
29 changes: 9 additions & 20 deletions src/dpp/socketengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ bool socket_engine_base::register_socket(const socket_events &e) {
if (e.fd != INVALID_SOCKET && i == fds.end()) {
fds.emplace(e.fd, std::make_unique<socket_events>(e));
return true;
} else if (e.fd != INVALID_SOCKET && i != fds.end()) {
this->remove_socket(e.fd);
}
if (e.fd != INVALID_SOCKET && i != fds.end()) {
remove_socket(e.fd);
fds.erase(i);
fds.emplace(e.fd, std::make_unique<socket_events>(e));
return true;
}
Expand Down Expand Up @@ -75,37 +77,25 @@ time_t last_time = time(nullptr);
socket_events* socket_engine_base::get_fd(dpp::socket fd) {
std::unique_lock lock(fds_mutex);
auto iter = fds.find(fd);
if (iter == fds.end() || ((iter->second->flags & WANT_DELETION) != 0L)) {
if (iter == fds.end()) {
return nullptr;
}
return iter->second.get();
}

void socket_engine_base::prune() {
if (to_delete_count > 0) {
std::unique_lock lock(fds_mutex);
for (auto it = fds.cbegin(); it != fds.cend();) {
if ((it->second->flags & WANT_DELETION) != 0L) {
remove_socket(it->second->fd);
it = fds.erase(it);
} else {
++it;
}
}
to_delete_count = 0;
}
if (time(nullptr) != last_time) {
try {
/* Every minute, rehash all cache containers.
* We do this from the socket engine now, not from
* shard 0, so no need to run shards to have timers!
*/
owner->tick_timers();
} catch (const std::exception& e) {
owner->log(dpp::ll_error, "Uncaught exception in tick_timers: " + std::string(e.what()));
}

if ((time(nullptr) % 60) == 0) {
/* Every minute, rehash all cache containers.
* We do this from the socket engine now, not from
* shard 0, so no need to run shards to have timers!
*/
dpp::garbage_collection();
}

Expand All @@ -120,7 +110,6 @@ bool socket_engine_base::delete_socket(dpp::socket fd) {
return false;
}
iter->second->flags |= WANT_DELETION;
to_delete_count++;
return true;
}

Expand Down
18 changes: 10 additions & 8 deletions src/dpp/socketengines/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {

const int fd = ev.data.fd;
auto eh = get_fd(fd);
if (!eh || fd == INVALID_SOCKET) {
if (eh == nullptr || fd == INVALID_SOCKET) {
continue;
}

try {
if ((eh->flags & WANT_DELETION) == 0L) try {

if ((ev.events & EPOLLHUP) != 0U) {
if (eh->on_error) {
Expand Down Expand Up @@ -125,6 +125,11 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
owner->log(ll_trace, "Socket loop exception: " + std::string(e.what()));
eh->on_error(fd, *eh, 0);
}

if ((eh->flags & WANT_DELETION) != 0L) {
remove_socket(fd);
fds.erase(fd);
}
}
prune();
}
Expand Down Expand Up @@ -172,12 +177,9 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
protected:

bool remove_socket(dpp::socket fd) final {
bool r = socket_engine_base::remove_socket(fd);
if (r) {
struct epoll_event ev{};
epoll_ctl(epoll_handle, EPOLL_CTL_DEL, fd, &ev);
}
return r;
struct epoll_event ev{};
epoll_ctl(epoll_handle, EPOLL_CTL_DEL, fd, &ev);
return true;
}
};

Expand Down
23 changes: 13 additions & 10 deletions src/dpp/socketengines/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
continue;
}

try {
if ((eh->flags & WANT_DELETION) == 0L) try {

const short filter = kev.filter;
if (kev.flags & EV_EOF || kev.flags & EV_ERROR) {
Expand All @@ -98,6 +98,12 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
owner->log(ll_trace, "Socket loop exception: " + std::string(e.what()));
eh->on_error(kev.ident, *eh, 0);
}


if ((eh->flags & WANT_DELETION) != 0L) {
remove_socket(kev.ident);
fds.erase(kev.ident);
}
}
prune();
}
Expand Down Expand Up @@ -131,15 +137,12 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
protected:

bool remove_socket(dpp::socket fd) final {
bool r = socket_engine_base::remove_socket(fd);
if (r) {
struct kevent ke{};
EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
}
return r;
struct kevent ke{};
EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr);
return true;
}
};

Expand Down
28 changes: 13 additions & 15 deletions src/dpp/socketengines/poll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,12 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
processed++;
}

auto iter = fds.find(fd);
if (iter == fds.end()) {
continue;
}
socket_events *eh = iter->second.get();

socket_events *eh = get_fd(fd);
if (eh == nullptr || eh->flags & WANT_DELETION) {
continue;
}

try {
if ((eh->flags & WANT_DELETION) == 0L) try {

if ((revents & POLLHUP) != 0) {
eh->on_error(fd, *eh, 0);
Expand Down Expand Up @@ -130,6 +125,12 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
} catch (const std::exception &e) {
eh->on_error(fd, *eh, 0);
}

if ((eh->flags & WANT_DELETION) != 0L) {
remove_socket(fd);
std::unique_lock lock(fds_mutex);
fds.erase(fd);
}
}
}

Expand Down Expand Up @@ -184,14 +185,11 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
protected:

bool remove_socket(dpp::socket fd) final {
bool r = socket_engine_base::remove_socket(fd);
if (r) {
std::unique_lock lock(poll_set_mutex);
for (auto i = poll_set.begin(); i != poll_set.end(); ++i) {
if (i->fd == fd) {
poll_set.erase(i);
return true;
}
std::unique_lock lock(poll_set_mutex);
for (auto i = poll_set.begin(); i != poll_set.end(); ++i) {
if (i->fd == fd) {
poll_set.erase(i);
return true;
}
}
return false;
Expand Down
1 change: 1 addition & 0 deletions src/dpp/sslclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ void ssl_client::close()
last_tick = time(nullptr);
bytes_in = bytes_out = 0;
if (sfd != INVALID_SOCKET) {
log(ll_trace, "ssl_client::close() with sfd");
owner->socketengine->delete_socket(sfd);
close_socket(sfd);
sfd = INVALID_SOCKET;
Expand Down
1 change: 1 addition & 0 deletions src/dpp/wsclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ void websocket_client::on_disconnect()

void websocket_client::close()
{
log(ll_trace, "websocket_client::close()");
this->on_disconnect();
this->state = HTTP_HEADERS;
ssl_client::close();
Expand Down

0 comments on commit 5a06ebc

Please sign in to comment.