Skip to content

Commit

Permalink
fix: reconnect of shards
Browse files Browse the repository at this point in the history
  • Loading branch information
braindigitalis committed Dec 1, 2024
1 parent 4d9fbbf commit e1d0ab8
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 35 deletions.
2 changes: 1 addition & 1 deletion include/dpp/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ class DPP_EXPORT cluster {
*
* @param return_after If true the bot will return to your program after starting shards, if false this function will never return.
*/
void start(bool return_after = true);
void start(start_type return_after = st_wait);

/**
* @brief Set the presence for all shards on the cluster
Expand Down
2 changes: 1 addition & 1 deletion src/davetest/dave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@ int main() {
s->connect_voice(TEST_GUILD_ID, TEST_VC_ID, muted, deaf, enable_dave);
}
});
dave_test.start(false);
dave_test.start(dpp::st_wait);
}
2 changes: 1 addition & 1 deletion src/dpp/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ dpp::utility::uptime cluster::uptime()
return dpp::utility::uptime(time(nullptr) - start_time);
}

void cluster::start(bool return_after) {
void cluster::start(start_type return_after) {

auto event_loop = [this]() -> void {
while (!this->terminating && socketengine.get()) {
Expand Down
72 changes: 42 additions & 30 deletions src/dpp/discordclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,19 @@ void discord_client::on_disconnect()
{
set_resume_hostname();
log(dpp::ll_debug, "Lost connection to websocket on shard " + std::to_string(shard_id) + ", reconnecting in 5 seconds...");
ssl_client::close();
end_zlib();
owner->start_timer([this](auto handle) {
log(dpp::ll_debug, "Reconnecting shard " + std::to_string(shard_id) + " to wss://" + hostname + "...");
owner->stop_timer(handle);
cleanup();
terminating = false;
if (timer_handle) {
owner->stop_timer(timer_handle);
timer_handle = 0;
}
start = time(nullptr);
ssl_client::connect();
start_connecting();
run();
}, 5);
Expand Down Expand Up @@ -305,34 +314,40 @@ bool discord_client::handle_frame(const std::string &buffer, ws_opcode opcode)
resumes++;
} else {
/* Full connect */
while (time(nullptr) < creator->last_identify + 5) {
time_t wait = (creator->last_identify + 5) - time(nullptr);
std::this_thread::sleep_for(std::chrono::seconds(wait));
}
log(dpp::ll_debug, "Connecting new session...");
json obj = {
{ "op", 2 },
{
"d",
auto connect_now = [this]() {
log(dpp::ll_debug, "Connecting new session...");
json obj = {
{ "op", 2 },
{
{ "token", this->token },
{ "properties",
{
{ "os", STRINGIFY(DPP_OS) },
{ "browser", "D++" },
{ "device", "D++" }
}
},
{ "shard", json::array({ shard_id, max_shards }) },
{ "compress", false },
{ "large_threshold", 250 },
{ "intents", this->intents }
"d",
{
{ "token", this->token },
{ "properties",
{
{ "os", STRINGIFY(DPP_OS) },
{ "browser", "D++" },
{ "device", "D++" }
}
},
{ "shard", json::array({ shard_id, max_shards }) },
{ "compress", false },
{ "large_threshold", 250 },
{ "intents", this->intents }
}
}
}
};
this->write(jsonobj_to_string(obj), protocol == ws_etf ? OP_BINARY : OP_TEXT);
this->connect_time = creator->last_identify = time(nullptr);
reconnects++;
};
this->write(jsonobj_to_string(obj), protocol == ws_etf ? OP_BINARY : OP_TEXT);
this->connect_time = creator->last_identify = time(nullptr);
reconnects++;
if (time(nullptr) < creator->last_identify + 5) {
owner->start_timer([this, connect_now](timer h) {
owner->stop_timer(h);
connect_now();
}, (creator->last_identify + 5) - time(nullptr));
} else {
connect_now();
}
}
this->last_heartbeat_ack = time(nullptr);
websocket_ping = 0;
Expand All @@ -345,7 +360,7 @@ bool discord_client::handle_frame(const std::string &buffer, ws_opcode opcode)
case 7:
log(dpp::ll_debug, "Reconnection requested, closing socket " + sessionid);
message_queue.clear();
throw dpp::connection_exception(err_reconnection, "Remote site requested reconnection");
this->close();
break;
/* Heartbeat ack */
case 11:
Expand Down Expand Up @@ -411,6 +426,7 @@ void discord_client::error(uint32_t errorcode)
error = i->second;
}
log(dpp::ll_warning, "OOF! Error from underlying websocket: " + std::to_string(errorcode) + ": " + error);
this->close();
}

void discord_client::log(dpp::loglevel severity, const std::string &msg) const
Expand Down Expand Up @@ -454,10 +470,6 @@ size_t discord_client::get_queue_size()

void discord_client::one_second_timer()
{
if (terminating) {
throw dpp::exception("Shard terminating due to cluster shutdown");
}

websocket_client::one_second_timer();

/* This all only triggers if we are connected (have completed websocket, and received READY or RESUMED) */
Expand Down
9 changes: 7 additions & 2 deletions src/dpp/sslclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ void ssl_client::on_write(socket fd, const struct socket_events& e) {

void ssl_client::on_error(socket fd, const struct socket_events&, int error_code) {
if (sfd != INVALID_SOCKET) {
ssl_client::close();
this->close();
}
}

Expand Down Expand Up @@ -542,6 +542,10 @@ void ssl_client::close()
SSL_free(ssl->ssl);
ssl->ssl = nullptr;
}
connected = tcp_connect_done = false;
client_to_server_length = client_to_server_offset = 0;
last_tick = time(nullptr);
bytes_in = bytes_out = 0;
owner->socketengine->delete_socket(sfd);
close_socket(sfd);
sfd = INVALID_SOCKET;
Expand All @@ -552,7 +556,6 @@ void ssl_client::close()
void ssl_client::cleanup()
{
this->close();
delete ssl;
}

ssl_client::~ssl_client()
Expand All @@ -562,6 +565,8 @@ ssl_client::~ssl_client()
owner->stop_timer(timer_handle);
timer_handle = 0;
}
delete ssl;
ssl = nullptr;
}

}
15 changes: 15 additions & 0 deletions src/soaktest/soak.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,33 @@
#include <dpp/dpp.h>
#include <iostream>
#include <thread>
#ifndef _WIN32
#include <csignal>
#endif

dpp::cluster* s{nullptr};

int main() {
using namespace std::chrono_literals;
char* t = getenv("DPP_UNIT_TEST_TOKEN");
if (t) {
dpp::cluster soak_test(t, dpp::i_default_intents | dpp::i_guild_members, 1, 0, 1);
s = &soak_test;
//soak_test.set_websocket_protocol(dpp::ws_etf);
soak_test.on_log([&](const dpp::log_t& log) {
std::cout << "[" << dpp::utility::current_date_time() << "] " << dpp::utility::loglevel(log.severity) << ": " << log.message << std::endl;
});
soak_test.start(dpp::st_return);

#ifndef _WIN32
signal(SIGINT, [](int sig) {
dpp::discord_client* dc = s->get_shard(0);
if (dc != nullptr) {
dc->close();
}
});
#endif

while (true) {
std::this_thread::sleep_for(60s);
dpp::discord_client* dc = soak_test.get_shard(0);
Expand Down

0 comments on commit e1d0ab8

Please sign in to comment.