Skip to content

Commit

Permalink
properly mutex everything
Browse files Browse the repository at this point in the history
  • Loading branch information
braindigitalis committed Nov 21, 2024
1 parent d119a3b commit f80a975
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 85 deletions.
7 changes: 7 additions & 0 deletions include/dpp/socketengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <unordered_map>
#include <memory>
#include <functional>
#include <shared_mutex>
#include <dpp/thread_pool.h>

namespace dpp {
Expand Down Expand Up @@ -144,6 +145,12 @@ using socket_container = std::unordered_map<dpp::socket, std::unique_ptr<socket_
* event mechanisms etc).
*/
struct DPP_EXPORT socket_engine_base {

/**
* @brief Mutex for fds
*/
std::shared_mutex fds_mutex;

/**
* @brief File descriptors, and their states
*/
Expand Down
4 changes: 4 additions & 0 deletions src/dpp/socketengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
namespace dpp {

bool socket_engine_base::register_socket(const socket_events &e) {
std::unique_lock lock(fds_mutex);
if (e.fd != INVALID_SOCKET && fds.find(e.fd) == fds.end()) {
fds.emplace(e.fd, std::make_unique<socket_events>(e));
return true;
Expand All @@ -39,6 +40,7 @@ bool socket_engine_base::register_socket(const socket_events &e) {
}

bool socket_engine_base::update_socket(const socket_events &e) {
std::unique_lock lock(fds_mutex);
if (e.fd != INVALID_SOCKET && fds.find(e.fd) != fds.end()) {
auto iter = fds.find(e.fd);
*(iter->second) = e;
Expand Down Expand Up @@ -68,6 +70,7 @@ time_t last_time = time(nullptr);

void socket_engine_base::prune() {
if (to_delete_count > 0) {
std::unique_lock lock(fds_mutex);
for (auto it = fds.cbegin(); it != fds.cend();) {
if ((it->second->flags & WANT_DELETION) != 0L) {
remove_socket(it->second->fd);
Expand Down Expand Up @@ -98,6 +101,7 @@ void socket_engine_base::prune() {
}

bool socket_engine_base::delete_socket(dpp::socket fd) {
std::unique_lock lock(fds_mutex);
auto iter = fds.find(fd);
if (iter == fds.end() || ((iter->second->flags & WANT_DELETION) != 0L)) {
return false;
Expand Down
102 changes: 57 additions & 45 deletions src/dpp/socketengines/poll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <dpp/socketengine.h>
#include <vector>
#include <shared_mutex>
#ifdef _WIN32
/* Windows-specific sockets includes */
#include <WinSock2.h>
Expand Down Expand Up @@ -54,70 +55,78 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
*/
std::vector<pollfd> poll_set;
pollfd out_set[FD_SETSIZE]{0};
std::shared_mutex poll_set_mutex;

void process_events() final {
const int poll_delay = 1000;

if (poll_set.empty()) {
/* On many platforms, it is not possible to wait on an empty set */
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} else {
if (poll_set.size() > FD_SETSIZE) {
throw dpp::connection_exception("poll() does not support more than FD_SETSIZE active sockets at once!");
prune();
{
std::shared_lock lock(poll_set_mutex);
if (poll_set.empty()) {
/* On many platforms, it is not possible to wait on an empty set */
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return;
} else {
if (poll_set.size() > FD_SETSIZE) {
throw dpp::connection_exception("poll() does not support more than FD_SETSIZE active sockets at once!");
}
/**
* We must make a copy of the poll_set, because it would cause thread locking/contention
* issues if we had it locked for read during poll/iteration of the returned set.
*/
std::copy(poll_set.begin(), poll_set.end(), out_set);
}
}

std::copy(poll_set.begin(), poll_set.end(), out_set);
int i = poll(out_set, static_cast<unsigned int>(poll_set.size()), poll_delay);
int processed = 0;

int i = poll(out_set, static_cast<unsigned int>(poll_set.size()), poll_delay);
int processed = 0;
for (size_t index = 0; index < poll_set.size() && processed < i; index++) {
const int fd = out_set[index].fd;
const short revents = out_set[index].revents;

for (size_t index = 0; index < poll_set.size() && processed < i; index++) {
const int fd = out_set[index].fd;
const short revents = out_set[index].revents;
if (revents > 0) {
processed++;
}

if (revents > 0) {
processed++;
}
auto iter = fds.find(fd);
if (iter == fds.end()) {
continue;
}
socket_events *eh = iter->second.get();

try {

auto iter = fds.find(fd);
if (iter == fds.end()) {
if ((revents & POLLHUP) != 0) {
eh->on_error(fd, *eh, 0);
continue;
}
socket_events *eh = iter->second.get();

try {

if ((revents & POLLHUP) != 0) {
eh->on_error(fd, *eh, 0);
continue;
}

if ((revents & POLLERR) != 0) {
socklen_t codesize = sizeof(int);
int errcode{};
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *) &errcode, &codesize) < 0) {
errcode = errno;
}
eh->on_error(fd, *eh, errcode);
continue;
}

if ((revents & POLLIN) != 0) {
eh->on_read(fd, *eh);
if ((revents & POLLERR) != 0) {
socklen_t codesize = sizeof(int);
int errcode{};
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *) &errcode, &codesize) < 0) {
errcode = errno;
}
eh->on_error(fd, *eh, errcode);
continue;
}

if ((revents & POLLOUT) != 0) {
eh->flags &= ~WANT_WRITE;
update_socket(*eh);
eh->on_write(fd, *eh);
}
if ((revents & POLLIN) != 0) {
eh->on_read(fd, *eh);
}

} catch (const std::exception &e) {
eh->on_error(fd, *eh, 0);
if ((revents & POLLOUT) != 0) {
eh->flags &= ~WANT_WRITE;
update_socket(*eh);
eh->on_write(fd, *eh);
}

} catch (const std::exception &e) {
eh->on_error(fd, *eh, 0);
}
}
prune();
}

#if _WIN32
Expand All @@ -129,6 +138,7 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
bool register_socket(const socket_events& e) final {
bool r = socket_engine_base::register_socket(e);
if (r) {
std::unique_lock lock(poll_set_mutex);
pollfd fd_info{};
fd_info.fd = e.fd;
fd_info.events = 0;
Expand All @@ -146,6 +156,7 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
bool update_socket(const socket_events& e) final {
bool r = socket_engine_base::update_socket(e);
if (r) {
std::unique_lock lock(poll_set_mutex);
/* We know this will succeed */
for (pollfd& fd_info : poll_set) {
if (fd_info.fd != e.fd) {
Expand All @@ -171,6 +182,7 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
bool remove_socket(dpp::socket fd) final {
bool r = socket_engine_base::remove_socket(fd);
if (r) {
std::unique_lock lock(poll_set_mutex);
for (auto i = poll_set.begin(); i != poll_set.end(); ++i) {
if (i->fd == fd) {
poll_set.erase(i);
Expand Down
Loading

0 comments on commit f80a975

Please sign in to comment.