From e1d0ab8c3d6c9e5a951efa07c25457c8b179e205 Mon Sep 17 00:00:00 2001 From: Craig Edwards Date: Sun, 1 Dec 2024 23:03:09 +0000 Subject: [PATCH] fix: reconnect of shards --- include/dpp/cluster.h | 2 +- src/davetest/dave.cpp | 2 +- src/dpp/cluster.cpp | 2 +- src/dpp/discordclient.cpp | 72 +++++++++++++++++++++++---------------- src/dpp/sslclient.cpp | 9 +++-- src/soaktest/soak.cpp | 15 ++++++++ 6 files changed, 67 insertions(+), 35 deletions(-) diff --git a/include/dpp/cluster.h b/include/dpp/cluster.h index 9b193f8098..8b40f8f53b 100644 --- a/include/dpp/cluster.h +++ b/include/dpp/cluster.h @@ -473,7 +473,7 @@ class DPP_EXPORT cluster { * * @param return_after If true the bot will return to your program after starting shards, if false this function will never return. */ - void start(bool return_after = true); + void start(start_type return_after = st_wait); /** * @brief Set the presence for all shards on the cluster diff --git a/src/davetest/dave.cpp b/src/davetest/dave.cpp index 94812e0d89..83ebc5e78f 100644 --- a/src/davetest/dave.cpp +++ b/src/davetest/dave.cpp @@ -84,5 +84,5 @@ int main() { s->connect_voice(TEST_GUILD_ID, TEST_VC_ID, muted, deaf, enable_dave); } }); - dave_test.start(false); + dave_test.start(dpp::st_wait); } diff --git a/src/dpp/cluster.cpp b/src/dpp/cluster.cpp index 99926b3f37..21947d0679 100644 --- a/src/dpp/cluster.cpp +++ b/src/dpp/cluster.cpp @@ -198,7 +198,7 @@ dpp::utility::uptime cluster::uptime() return dpp::utility::uptime(time(nullptr) - start_time); } -void cluster::start(bool return_after) { +void cluster::start(start_type return_after) { auto event_loop = [this]() -> void { while (!this->terminating && socketengine.get()) { diff --git a/src/dpp/discordclient.cpp b/src/dpp/discordclient.cpp index 50ba878ced..c698c8cb7f 100644 --- a/src/dpp/discordclient.cpp +++ b/src/dpp/discordclient.cpp @@ -128,10 +128,19 @@ void discord_client::on_disconnect() { set_resume_hostname(); log(dpp::ll_debug, "Lost connection to websocket on shard " + std::to_string(shard_id) + ", reconnecting in 5 seconds..."); + ssl_client::close(); + end_zlib(); owner->start_timer([this](auto handle) { + log(dpp::ll_debug, "Reconnecting shard " + std::to_string(shard_id) + " to wss://" + hostname + "..."); owner->stop_timer(handle); cleanup(); terminating = false; + if (timer_handle) { + owner->stop_timer(timer_handle); + timer_handle = 0; + } + start = time(nullptr); + ssl_client::connect(); start_connecting(); run(); }, 5); @@ -305,34 +314,40 @@ bool discord_client::handle_frame(const std::string &buffer, ws_opcode opcode) resumes++; } else { /* Full connect */ - while (time(nullptr) < creator->last_identify + 5) { - time_t wait = (creator->last_identify + 5) - time(nullptr); - std::this_thread::sleep_for(std::chrono::seconds(wait)); - } - log(dpp::ll_debug, "Connecting new session..."); - json obj = { - { "op", 2 }, - { - "d", + auto connect_now = [this]() { + log(dpp::ll_debug, "Connecting new session..."); + json obj = { + { "op", 2 }, { - { "token", this->token }, - { "properties", - { - { "os", STRINGIFY(DPP_OS) }, - { "browser", "D++" }, - { "device", "D++" } - } - }, - { "shard", json::array({ shard_id, max_shards }) }, - { "compress", false }, - { "large_threshold", 250 }, - { "intents", this->intents } + "d", + { + { "token", this->token }, + { "properties", + { + { "os", STRINGIFY(DPP_OS) }, + { "browser", "D++" }, + { "device", "D++" } + } + }, + { "shard", json::array({ shard_id, max_shards }) }, + { "compress", false }, + { "large_threshold", 250 }, + { "intents", this->intents } + } } - } + }; + this->write(jsonobj_to_string(obj), protocol == ws_etf ? OP_BINARY : OP_TEXT); + this->connect_time = creator->last_identify = time(nullptr); + reconnects++; }; - this->write(jsonobj_to_string(obj), protocol == ws_etf ? OP_BINARY : OP_TEXT); - this->connect_time = creator->last_identify = time(nullptr); - reconnects++; + if (time(nullptr) < creator->last_identify + 5) { + owner->start_timer([this, connect_now](timer h) { + owner->stop_timer(h); + connect_now(); + }, (creator->last_identify + 5) - time(nullptr)); + } else { + connect_now(); + } } this->last_heartbeat_ack = time(nullptr); websocket_ping = 0; @@ -345,7 +360,7 @@ bool discord_client::handle_frame(const std::string &buffer, ws_opcode opcode) case 7: log(dpp::ll_debug, "Reconnection requested, closing socket " + sessionid); message_queue.clear(); - throw dpp::connection_exception(err_reconnection, "Remote site requested reconnection"); + this->close(); break; /* Heartbeat ack */ case 11: @@ -411,6 +426,7 @@ void discord_client::error(uint32_t errorcode) error = i->second; } log(dpp::ll_warning, "OOF! Error from underlying websocket: " + std::to_string(errorcode) + ": " + error); + this->close(); } void discord_client::log(dpp::loglevel severity, const std::string &msg) const @@ -454,10 +470,6 @@ size_t discord_client::get_queue_size() void discord_client::one_second_timer() { - if (terminating) { - throw dpp::exception("Shard terminating due to cluster shutdown"); - } - websocket_client::one_second_timer(); /* This all only triggers if we are connected (have completed websocket, and received READY or RESUMED) */ diff --git a/src/dpp/sslclient.cpp b/src/dpp/sslclient.cpp index c40820df0a..dbe85c1060 100644 --- a/src/dpp/sslclient.cpp +++ b/src/dpp/sslclient.cpp @@ -482,7 +482,7 @@ void ssl_client::on_write(socket fd, const struct socket_events& e) { void ssl_client::on_error(socket fd, const struct socket_events&, int error_code) { if (sfd != INVALID_SOCKET) { - ssl_client::close(); + this->close(); } } @@ -542,6 +542,10 @@ void ssl_client::close() SSL_free(ssl->ssl); ssl->ssl = nullptr; } + connected = tcp_connect_done = false; + client_to_server_length = client_to_server_offset = 0; + last_tick = time(nullptr); + bytes_in = bytes_out = 0; owner->socketengine->delete_socket(sfd); close_socket(sfd); sfd = INVALID_SOCKET; @@ -552,7 +556,6 @@ void ssl_client::close() void ssl_client::cleanup() { this->close(); - delete ssl; } ssl_client::~ssl_client() @@ -562,6 +565,8 @@ ssl_client::~ssl_client() owner->stop_timer(timer_handle); timer_handle = 0; } + delete ssl; + ssl = nullptr; } } diff --git a/src/soaktest/soak.cpp b/src/soaktest/soak.cpp index cbc462c194..679da011ae 100644 --- a/src/soaktest/soak.cpp +++ b/src/soaktest/soak.cpp @@ -23,18 +23,33 @@ #include #include #include +#ifndef _WIN32 + #include +#endif + +dpp::cluster* s{nullptr}; int main() { using namespace std::chrono_literals; char* t = getenv("DPP_UNIT_TEST_TOKEN"); if (t) { dpp::cluster soak_test(t, dpp::i_default_intents | dpp::i_guild_members, 1, 0, 1); + s = &soak_test; //soak_test.set_websocket_protocol(dpp::ws_etf); soak_test.on_log([&](const dpp::log_t& log) { std::cout << "[" << dpp::utility::current_date_time() << "] " << dpp::utility::loglevel(log.severity) << ": " << log.message << std::endl; }); soak_test.start(dpp::st_return); +#ifndef _WIN32 + signal(SIGINT, [](int sig) { + dpp::discord_client* dc = s->get_shard(0); + if (dc != nullptr) { + dc->close(); + } + }); +#endif + while (true) { std::this_thread::sleep_for(60s); dpp::discord_client* dc = soak_test.get_shard(0);