From 6830a831aebcd9c36e4099aeb1a6f37b39d43ef5 Mon Sep 17 00:00:00 2001 From: Neko Life Date: Wed, 27 Nov 2024 16:01:09 +0700 Subject: [PATCH] Fix/voice socketengine (#1337) --- include/dpp/cluster.h | 2 +- src/dpp/cluster.cpp | 10 +++++++--- src/dpp/socketengines/epoll.cpp | 1 + src/dpp/sslclient.cpp | 4 ++++ src/dpp/voice/enabled/read_write.cpp | 4 ---- src/dpp/voice/enabled/write_ready.cpp | 14 +++++++++----- 6 files changed, 22 insertions(+), 13 deletions(-) diff --git a/include/dpp/cluster.h b/include/dpp/cluster.h index c30c378735..fcae83b18e 100644 --- a/include/dpp/cluster.h +++ b/include/dpp/cluster.h @@ -183,7 +183,7 @@ class DPP_EXPORT cluster { * @brief Used to spawn the socket engine into its own thread if * the cluster is started with dpp::st_return. It is unused otherwise. */ - std::unique_ptr engine_thread{nullptr}; + std::thread engine_thread; /** * @brief Protection mutex for timers diff --git a/src/dpp/cluster.cpp b/src/dpp/cluster.cpp index 604deff496..3d41f311a9 100644 --- a/src/dpp/cluster.cpp +++ b/src/dpp/cluster.cpp @@ -298,7 +298,7 @@ void cluster::start(bool return_after) { }); if (return_after) { - engine_thread = std::make_unique([event_loop]() { + engine_thread = std::thread([event_loop]() { dpp::utility::set_thread_name("event_loop"); event_loop(); }); @@ -310,9 +310,12 @@ void cluster::start(bool return_after) { void cluster::shutdown() { /* Signal termination */ terminating = true; - if (engine_thread) { - engine_thread->join(); + + if (engine_thread.joinable()) { + /* Join engine_thread if it ever started */ + engine_thread.join(); } + { std::lock_guard l(timer_guard); /* Free memory for active timers */ @@ -322,6 +325,7 @@ void cluster::shutdown() { timer_list.clear(); next_timer.clear(); } + /* Terminate shards */ for (const auto& sh : shards) { delete sh.second; diff --git a/src/dpp/socketengines/epoll.cpp b/src/dpp/socketengines/epoll.cpp index cfde495d94..c330cd4255 100644 --- a/src/dpp/socketengines/epoll.cpp +++ b/src/dpp/socketengines/epoll.cpp @@ -111,6 +111,7 @@ struct DPP_EXPORT socket_engine_epoll : public socket_engine_base { } if ((ev.events & EPOLLOUT) != 0U) { + /* Should we have a flag to allow keeping WANT_WRITE? Maybe like WANT_WRITE_ONCE or GREEDY_WANT_WRITE, eh */ eh->flags = modify_event(epoll_handle, eh, eh->flags & ~WANT_WRITE); if (eh->on_write) { eh->on_write(fd, *eh); diff --git a/src/dpp/sslclient.cpp b/src/dpp/sslclient.cpp index 534ecc4705..7fb71696cf 100644 --- a/src/dpp/sslclient.cpp +++ b/src/dpp/sslclient.cpp @@ -373,6 +373,10 @@ void ssl_client::on_read(socket fd, const struct socket_events& ev) { } void ssl_client::on_write(socket fd, const struct socket_events& e) { + /* We wanted write before so keep it */ + socket_events se{e}; + se.flags |= WANT_WRITE; + owner->socketengine->update_socket(se); if (!tcp_connect_done) { tcp_connect_done = true; diff --git a/src/dpp/voice/enabled/read_write.cpp b/src/dpp/voice/enabled/read_write.cpp index 5512f08c1c..fd41423796 100644 --- a/src/dpp/voice/enabled/read_write.cpp +++ b/src/dpp/voice/enabled/read_write.cpp @@ -39,10 +39,6 @@ void discord_voice_client::send(const char* packet, size_t len, uint64_t duratio } else [[unlikely]] { this->udp_send(packet, len); } - if (!this->sent_stop_frames) { - udp_events.flags = WANT_READ | WANT_WRITE | WANT_ERROR; - owner->socketengine->update_socket(udp_events); - } } int discord_voice_client::udp_send(const char* data, size_t length) { diff --git a/src/dpp/voice/enabled/write_ready.cpp b/src/dpp/voice/enabled/write_ready.cpp index 385388c356..6877a97c26 100644 --- a/src/dpp/voice/enabled/write_ready.cpp +++ b/src/dpp/voice/enabled/write_ready.cpp @@ -31,6 +31,13 @@ namespace dpp { void discord_voice_client::write_ready() { + /* + * WANT_WRITE has been reset everytime this method is being called, + * ALWAYS set it again no matter what we're gonna do. + */ + udp_events.flags = WANT_READ | WANT_WRITE | WANT_ERROR; + owner->socketengine->update_socket(udp_events); + uint64_t duration = 0; bool track_marker_found = false; uint64_t bufsize = 0; @@ -54,15 +61,12 @@ void discord_voice_client::write_ready() { } } if (!outbuf.empty()) { - if (this->udp_send(outbuf[0].packet.data(), outbuf[0].packet.length()) == (int)outbuf[0].packet.length()) { + int sent_siz = this->udp_send(outbuf[0].packet.data(), outbuf[0].packet.length()); + if (sent_siz == (int)outbuf[0].packet.length()) { duration = outbuf[0].duration * timescale; bufsize = outbuf[0].packet.length(); outbuf.erase(outbuf.begin()); } - if (!outbuf.empty()) { - udp_events.flags = WANT_READ | WANT_WRITE | WANT_ERROR; - owner->socketengine->update_socket(udp_events); - } } } }