diff --git a/include/dpp/socketengine.h b/include/dpp/socketengine.h index ce21d1cc75..051aa2f801 100644 --- a/include/dpp/socketengine.h +++ b/include/dpp/socketengine.h @@ -160,13 +160,6 @@ struct DPP_EXPORT socket_engine_base { */ socket_container fds; - /** - * @brief Thread pool. - * Event calls go into the thread pool and are called as - * and when threads in the pool are available. - */ - std::unique_ptr pool; - /** * @brief Number of file descriptors we are waiting to delete */ @@ -244,6 +237,7 @@ struct DPP_EXPORT socket_engine_base { * remove_socket() on each entry to be removed. */ void prune(); + protected: /** @@ -252,6 +246,13 @@ struct DPP_EXPORT socket_engine_base { * @param fd File descriptor to remove */ virtual bool remove_socket(dpp::socket fd); + + /** + * @brief Find a file descriptors socket events + * @param fd file descriptor + * @return file descriptor or nullptr if doesn't exist + */ + socket_events* get_fd(dpp::socket fd); }; /** diff --git a/src/dpp/cluster.cpp b/src/dpp/cluster.cpp index b9c5faf838..3ad3553d6c 100644 --- a/src/dpp/cluster.cpp +++ b/src/dpp/cluster.cpp @@ -199,8 +199,8 @@ dpp::utility::uptime cluster::uptime() } void cluster::add_reconnect(uint32_t shard_id) { - reconnections.emplace(shard_id, time(nullptr) + 5); - log(ll_trace, "Reconnecting in 5 seconds..."); + reconnections[shard_id] =time(nullptr) + 5; + log(ll_trace, "Reconnecting shard " + std::to_string(shard_id) + " in 5 seconds..."); } void cluster::start(start_type return_after) { diff --git a/src/dpp/socketengine.cpp b/src/dpp/socketengine.cpp index 3a39a31eb3..4b8730ee0a 100644 --- a/src/dpp/socketengine.cpp +++ b/src/dpp/socketengine.cpp @@ -72,6 +72,15 @@ socket_engine_base::socket_engine_base(cluster* creator) : owner(creator) { time_t last_time = time(nullptr); +socket_events* socket_engine_base::get_fd(dpp::socket fd) { + std::unique_lock lock(fds_mutex); + auto iter = fds.find(fd); + if (iter == fds.end() || ((iter->second->flags & WANT_DELETION) != 0L)) { + return nullptr; + } + return iter->second.get(); +} + void socket_engine_base::prune() { if (to_delete_count > 0) { std::unique_lock lock(fds_mutex); diff --git a/src/dpp/socketengines/epoll.cpp b/src/dpp/socketengines/epoll.cpp index 2ccc28b2fe..648e28ed0d 100644 --- a/src/dpp/socketengines/epoll.cpp +++ b/src/dpp/socketengines/epoll.cpp @@ -44,7 +44,7 @@ int modify_event(int epoll_handle, socket_events* eh, int new_events) { if ((new_events & WANT_ERROR) != 0) { new_ev.events |= EPOLLERR; } - new_ev.data.ptr = static_cast(eh); + new_ev.data.fd = eh->fd; epoll_ctl(epoll_handle, EPOLL_CTL_MOD, eh->fd, &new_ev); } return new_events; @@ -80,13 +80,9 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { for (int j = 0; j < i; j++) { epoll_event ev = events[j]; - auto* const eh = static_cast(ev.data.ptr); - if (!eh) { - continue; - } - - const int fd = eh->fd; - if (fd == INVALID_SOCKET || eh->flags & WANT_DELETION) { + const int fd = ev.data.fd; + auto eh = get_fd(fd); + if (!eh || fd == INVALID_SOCKET) { continue; } @@ -147,10 +143,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { if ((e.flags & WANT_ERROR) != 0) { ev.events |= EPOLLERR; } - { - std::shared_lock lock(fds_mutex); - ev.data.ptr = fds.find(e.fd)->second.get(); - } + ev.data.fd = e.fd; return epoll_ctl(epoll_handle, EPOLL_CTL_ADD, e.fd, &ev) >= 0; } return r; @@ -170,10 +163,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { if ((e.flags & WANT_ERROR) != 0) { ev.events |= EPOLLERR; } - { - std::shared_lock lock(fds_mutex); - ev.data.ptr = fds.find(e.fd)->second.get(); - } + ev.data.fd = e.fd; return epoll_ctl(epoll_handle, EPOLL_CTL_MOD, e.fd, &ev) >= 0; } return r; diff --git a/src/dpp/socketengines/kqueue.cpp b/src/dpp/socketengines/kqueue.cpp index 310c4573ab..3c3acf16c5 100644 --- a/src/dpp/socketengines/kqueue.cpp +++ b/src/dpp/socketengines/kqueue.cpp @@ -67,8 +67,8 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { for (int j = 0; j < i; j++) { const struct kevent& kev = ke_list[j]; - auto* eh = reinterpret_cast(kev.udata); - if (eh == nullptr || eh->flags & WANT_DELETION) { + auto eh = get_fd(kev.ident); + if (eh == nullptr) { continue; } @@ -102,46 +102,30 @@ struct DPP_EXPORT socket_engine_kqueue : public socket_engine_base { prune(); } + bool set_events(const socket_events& e) { + struct kevent ke{}; + if ((e.flags & WANT_READ) != 0) { + EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); + } + if ((e.flags & WANT_WRITE) != 0) { + EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, nullptr); + kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); + } + } + bool register_socket(const socket_events& e) final { - bool r = socket_engine_base::register_socket(e); - if (r) { - struct kevent ke{}; - socket_events* se{}; - { - std::unique_lock lock(fds_mutex); - se = fds.find(e.fd)->second.get(); - } - if ((se->flags & WANT_READ) != 0) { - EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, static_cast(se)); - kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); - } - if ((se->flags & WANT_WRITE) != 0) { - EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, static_cast(se)); - kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); - } + if (socket_engine_base::register_socket(e)) { + return set_events(e); } - return r; + return false; } bool update_socket(const socket_events& e) final { - bool r = socket_engine_base::update_socket(e); - if (r) { - struct kevent ke{}; - socket_events* se{}; - { - std::unique_lock lock(fds_mutex); - se = fds.find(e.fd)->second.get(); - } - if ((e.flags & WANT_READ) != 0) { - EV_SET(&ke, e.fd, EVFILT_READ, EV_ADD, 0, 0, static_cast(se)); - kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); - } - if ((e.flags & WANT_WRITE) != 0) { - EV_SET(&ke, e.fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, static_cast(se)); - kevent(kqueue_handle, &ke, 1, nullptr, 0, nullptr); - } + if (socket_engine_base::update_socket(e)) { + return set_events(e); } - return r; + return false; } protected: diff --git a/src/dpp/sslclient.cpp b/src/dpp/sslclient.cpp index 5cf0616a21..fcc31c05ab 100644 --- a/src/dpp/sslclient.cpp +++ b/src/dpp/sslclient.cpp @@ -505,7 +505,6 @@ void ssl_client::read_loop() if (this->sfd == INVALID_SOCKET) { close_socket(fd); owner->socketengine->delete_socket(fd); - on_error(fd, e, 0); return; } on_read(fd, e); @@ -514,7 +513,6 @@ void ssl_client::read_loop() if (this->sfd == INVALID_SOCKET) { close_socket(fd); owner->socketengine->delete_socket(fd); - on_error(fd, e, 0); return; } on_write(fd, e);