Skip to content

Commit

Permalink
feat: socket engine stats functions (#1353)
Browse files Browse the repository at this point in the history
  • Loading branch information
braindigitalis authored Dec 19, 2024
1 parent dd567e6 commit 531c451
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 37 deletions.
97 changes: 78 additions & 19 deletions include/dpp/socketengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cstdint>
#include <unordered_map>
#include <memory>
#include <string_view>
#include <functional>
#include <shared_mutex>
#include <dpp/thread_pool.h>
Expand Down Expand Up @@ -74,6 +75,51 @@ using socket_write_event = std::function<void(dpp::socket fd, const struct socke
*/
using socket_error_event = std::function<void(dpp::socket fd, const struct socket_events&, int error_code)>;

/**
* @brief Contains statistics about the IO loop
*/
struct DPP_EXPORT socket_stats {
/**
* @brief Number of reads since startup
*/
uint64_t reads{0};

/**
* @brief Number of writes since startup
*/
uint64_t writes{0};

/**
* @brief Number of errors since startup
*/
uint64_t errors{0};

/**
* @brief Number of updates to file descriptors
*/
uint64_t updates{0};

/**
* @brief Number of deletions of file descriptors
*/
uint64_t deletions{0};

/**
* @brief Number of loop iterations since startup
*/
uint64_t iterations{0};

/**
* @brief Number of currently active file descriptors
*/
uint64_t active_fds{0};

/**
* @brief Socket engine type
*/
std::string_view engine_type;
};

/**
* @brief Represents an active socket event set in the socket engine.
*
Expand Down Expand Up @@ -150,21 +196,6 @@ using socket_container = std::unordered_map<dpp::socket, std::unique_ptr<socket_
*/
struct DPP_EXPORT socket_engine_base {

/**
* @brief Mutex for fds
*/
std::shared_mutex fds_mutex;

/**
* @brief File descriptors, and their states
*/
socket_container fds;

/**
* @brief Number of file descriptors we are waiting to delete
*/
size_t to_delete_count{0};

/**
* @brief Owning cluster
*/
Expand Down Expand Up @@ -238,21 +269,49 @@ struct DPP_EXPORT socket_engine_base {
*/
void prune();

/**
* @brief Merge new flags in with the given file descriptor
* @param fd file descriptor
* @param extra_flags extra flags to add
*/
void inplace_modify_fd(dpp::socket fd, uint8_t extra_flags);

/**
* @brief Get statistics for socket engine
* @return socket stats
*/
const socket_stats& get_stats() const;

protected:

/**
* @brief Called by the prune() function to remove sockets when safe to do so.
* This is normally at the end or before an iteration of the event loop.
* @param fd File descriptor to remove
* @brief Mutex for fds
*/
virtual bool remove_socket(dpp::socket fd);
std::shared_mutex fds_mutex;

/**
* @brief File descriptors, and their states
*/
socket_container fds;

/**
* @brief Socket engine statistics
*/
socket_stats stats{};

/**
* @brief Find a file descriptors socket events
* @param fd file descriptor
* @return file descriptor or nullptr if doesn't exist
*/
socket_events* get_fd(dpp::socket fd);

/**
* @brief Called by the prune() function to remove sockets when safe to do so.
* This is normally at the end or before an iteration of the event loop.
* @param fd File descriptor to remove
*/
virtual bool remove_socket(dpp::socket fd);
};

/**
Expand Down
27 changes: 27 additions & 0 deletions src/dpp/socketengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ bool socket_engine_base::register_socket(const socket_events &e) {
auto i = fds.find(e.fd);
if (e.fd != INVALID_SOCKET && i == fds.end()) {
fds.emplace(e.fd, std::make_unique<socket_events>(e));
stats.active_fds++;
return true;
}
if (e.fd != INVALID_SOCKET && i != fds.end()) {
remove_socket(e.fd);
fds.erase(i);
fds.emplace(e.fd, std::make_unique<socket_events>(e));
stats.updates++;
return true;
}
return false;
Expand All @@ -51,6 +53,7 @@ bool socket_engine_base::update_socket(const socket_events &e) {
if (e.fd != INVALID_SOCKET && fds.find(e.fd) != fds.end()) {
auto iter = fds.find(e.fd);
*(iter->second) = e;
stats.updates++;
return true;
}
return false;
Expand Down Expand Up @@ -83,6 +86,23 @@ socket_events* socket_engine_base::get_fd(dpp::socket fd) {
return iter->second.get();
}

void socket_engine_base::inplace_modify_fd(dpp::socket fd, uint8_t extra_flags) {
bool should_modify;
socket_events s{};
{
std::lock_guard lk(fds_mutex);
auto i = fds.find(fd);
should_modify = i != fds.end() && (i->second->flags & extra_flags) != extra_flags;
if (should_modify) {
i->second->flags |= extra_flags;
s = *(i->second);
}
}
if (should_modify) {
update_socket(s);
}
}

void socket_engine_base::prune() {
if (time(nullptr) != last_time) {
try {
Expand All @@ -101,6 +121,7 @@ void socket_engine_base::prune() {

last_time = time(nullptr);
}
stats.iterations++;
}

bool socket_engine_base::delete_socket(dpp::socket fd) {
Expand All @@ -110,11 +131,17 @@ bool socket_engine_base::delete_socket(dpp::socket fd) {
return false;
}
iter->second->flags |= WANT_DELETION;
stats.deletions++;
stats.active_fds--;
return true;
}

bool socket_engine_base::remove_socket(dpp::socket fd) {
return true;
}

const socket_stats& socket_engine_base::get_stats() const {
return stats;
}

}
8 changes: 7 additions & 1 deletion src/dpp/socketengines/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {

int epoll_handle{INVALID_SOCKET};
static constexpr size_t MAX_EVENTS = 65536;
std::array<struct epoll_event, MAX_EVENTS> events;
std::array<struct epoll_event, MAX_EVENTS> events{};

socket_engine_epoll(const socket_engine_epoll&) = delete;
socket_engine_epoll(socket_engine_epoll&&) = delete;
Expand All @@ -65,6 +65,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
if (epoll_handle == -1) {
throw dpp::connection_exception("Failed to initialise epoll()");
}
stats.engine_type = "epoll";
}

~socket_engine_epoll() override {
Expand All @@ -89,13 +90,15 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
if ((eh->flags & WANT_DELETION) == 0L) try {

if ((ev.events & EPOLLHUP) != 0U) {
stats.errors++;
if (eh->on_error) {
eh->on_error(fd, *eh, EPIPE);
}
continue;
}

if ((ev.events & EPOLLERR) != 0U) {
stats.errors++;
socklen_t codesize = sizeof(int);
int errcode{};
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) {
Expand All @@ -111,18 +114,21 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base {
/* Should we have a flag to allow keeping WANT_WRITE? Maybe like WANT_WRITE_ONCE or GREEDY_WANT_WRITE, eh */
eh->flags = modify_event(epoll_handle, eh, eh->flags & ~WANT_WRITE);
if (eh->on_write) {
stats.writes++;
eh->on_write(fd, *eh);
}
}

if ((ev.events & EPOLLIN) != 0U) {
if (eh->on_read) {
stats.reads++;
eh->on_read(fd, *eh);
}
}

} catch (const std::exception& e) {
owner->log(ll_trace, "Socket loop exception: " + std::string(e.what()));
stats.errors++;
eh->on_error(fd, *eh, 0);
}

Expand Down
5 changes: 5 additions & 0 deletions src/dpp/socketengines/kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
if (kqueue_handle == -1) {
throw dpp::connection_exception("Failed to initialise kqueue()");
}
stats.engine_type = "kqueue";
}

~socket_engine_kqueue() override {
Expand Down Expand Up @@ -78,6 +79,7 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
if (kev.flags & EV_EOF || kev.flags & EV_ERROR) {
if (eh->on_error) {
eh->on_error(kev.ident, *eh, kev.fflags);
stats.errors++;
}
continue;
}
Expand All @@ -86,15 +88,18 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base {
eh->flags &= ~bits_to_clr;
if (eh->on_write) {
eh->on_write(kev.ident, *eh);
stats.writes++;
}
}
else if (filter == EVFILT_READ) {
if (eh->on_read) {
eh->on_read(kev.ident, *eh);
stats.reads++;
}
}

} catch (const std::exception& e) {
stats.errors++;
owner->log(ll_trace, "Socket loop exception: " + std::string(e.what()));
eh->on_error(kev.ident, *eh, 0);
}
Expand Down
9 changes: 8 additions & 1 deletion src/dpp/socketengines/poll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {

if ((revents & POLLHUP) != 0) {
eh->on_error(fd, *eh, 0);
stats.errors++;
continue;
}

Expand All @@ -109,21 +110,25 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *) &errcode, &codesize) < 0) {
errcode = errno;
}
stats.errors++;
eh->on_error(fd, *eh, errcode);
continue;
}

if ((revents & POLLIN) != 0) {
stats.reads++;
eh->on_read(fd, *eh);
}

if ((revents & POLLOUT) != 0) {
stats.writes++;
eh->flags &= ~WANT_WRITE;
update_socket(*eh);
eh->on_write(fd, *eh);
}

} catch (const std::exception &e) {
stats.errors++;
eh->on_error(fd, *eh, 0);
}

Expand Down Expand Up @@ -181,7 +186,9 @@ struct DPP_EXPORT socket_engine_poll : public socket_engine_base {
return r;
}

explicit socket_engine_poll(cluster* creator) : socket_engine_base(creator) { };
explicit socket_engine_poll(cluster* creator) : socket_engine_base(creator) {
stats.engine_type = "poll";
};

protected:

Expand Down
17 changes: 1 addition & 16 deletions src/dpp/wsclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,7 @@ void websocket_client::write(const std::string_view data, ws_opcode _opcode)
ssl_client::socket_write(data);
}

bool should_append_want_write = false;
socket_events *new_se = nullptr;
{
std::lock_guard lk(owner->socketengine->fds_mutex);
auto i = owner->socketengine->fds.find(sfd);

should_append_want_write = i != owner->socketengine->fds.end() && (i->second->flags & WANT_WRITE) != WANT_WRITE;
if (should_append_want_write) {
new_se = i->second.get();
new_se->flags |= WANT_WRITE;
}
}

if (should_append_want_write) {
owner->socketengine->update_socket(*new_se);
}
owner->socketengine->inplace_modify_fd(sfd, WANT_WRITE);
}

bool websocket_client::handle_buffer(std::string& buffer)
Expand Down

0 comments on commit 531c451

Please sign in to comment.