From b6bfa0dfac633e07ff53ddf5cf95a90ff9d5ecdf Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski <tyler@expensify.com> Date: Fri, 8 Mar 2024 15:06:20 -0800 Subject: [PATCH 1/5] WIP --- BedrockServer.cpp | 18 ++++++++++++++-- BedrockServer.h | 4 ++++ libstuff/SSignal.cpp | 6 +++--- main.cpp | 2 +- test/clustertest/BedrockClusterTester.h | 28 ++++++++++++++++++++++--- 5 files changed, 49 insertions(+), 9 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 8105dd9a7..4bbbd9484 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -88,6 +88,10 @@ void BedrockServer::syncWrapper() break; } } + + // Break out of `poll` in main.cpp. + _notifyDone.push(true); + SINFO("Exiting syncWrapper"); } void BedrockServer::sync() @@ -250,7 +254,7 @@ void BedrockServer::sync() } // And set our next timeout for 1 second from now. - nextActivity = STimeNow() + STIME_US_PER_S; + nextActivity = STimeNow() + (STIME_US_PER_MS * 100); // Process any network traffic that happened. Scope this so that we can change the log prefix and have it // auto-revert when we're finished. @@ -644,10 +648,14 @@ void BedrockServer::sync() // Note: This is not an atomic operation but should not matter. Nothing should use this that can happen with no // sync thread. // If there are socket threads in existance, they can be looking at this through a syncThread copy. + SINFO("Deleting DB pool"); _dbPool = nullptr; + SINFO("Deleted DB pool"); // We're really done, store our flag so main() can be aware. + SINFO("Marking sync thread complete"); _syncThreadComplete.store(true); + SINFO("Marked sync thread complete"); } void BedrockServer::worker(int threadId) @@ -671,7 +679,7 @@ void BedrockServer::worker(int threadId) }); // Get the next one. - command = commandQueue.get(1000000); + command = commandQueue.get(100000); SAUTOPREFIX(command->request); SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, " @@ -1346,7 +1354,10 @@ BedrockServer::~BedrockServer() { // Delete our plugins. for (auto& p : plugins) { + string name = p.second->getName(); + SINFO("Deleting " << name << "plugin."); delete p.second; + SINFO("Done deleting " << name << "plugin."); } } @@ -1362,6 +1373,9 @@ bool BedrockServer::shutdownComplete() { void BedrockServer::prePoll(fd_map& fdm) { lock_guard<mutex> lock(_portMutex); + // This will interrupt poll when we shut down. + _notifyDone.prePoll(fdm); + // Add all our ports. There are no sockets directly managed here. if (_commandPortPublic) { SFDset(fdm, _commandPortPublic->s, SREADEVTS); diff --git a/BedrockServer.h b/BedrockServer.h index d08720a41..501327322 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -489,4 +489,8 @@ class BedrockServer : public SQLiteServer { // We call this method whenever a node changes state void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) override; + + // This is just here to allow `poll` in main.cpp to get interrupted when the server shuts down. + // to wait up to a full second for them. + SSynchronizedQueue<bool> _notifyDone; }; diff --git a/libstuff/SSignal.cpp b/libstuff/SSignal.cpp index e4f2b86af..28339a3c3 100644 --- a/libstuff/SSignal.cpp +++ b/libstuff/SSignal.cpp @@ -130,8 +130,8 @@ void _SSignal_signalHandlerThreadFunc() { // Wait for a signal to appear. siginfo_t siginfo = {0}; struct timespec timeout; - timeout.tv_sec = 1; - timeout.tv_nsec = 0; + timeout.tv_sec = 0; + timeout.tv_nsec = 100'000; int result = -1; while (result == -1) { result = sigtimedwait(&signals, &siginfo, &timeout); @@ -159,7 +159,7 @@ void _SSignal_signalHandlerThreadFunc() { void SStopSignalThread() { _SSignal_threadStopFlag = true; if (_SSignal_threadInitialized.test_and_set()) { - // Send ourselves a singnal to interrupt our thread. + // Send ourselves a signal to interrupt our thread. SINFO("Joining signal thread."); _SSignal_signalThread.join(); _SSignal_threadInitialized.clear(); diff --git a/main.cpp b/main.cpp index b8eacd917..1683b4454 100644 --- a/main.cpp +++ b/main.cpp @@ -365,7 +365,7 @@ int main(int argc, char* argv[]) { const uint64_t now = STimeNow(); auto timeBeforePoll = chrono::steady_clock::now(); S_poll(fdm, max(nextActivity, now) - now); - nextActivity = STimeNow() + STIME_US_PER_S; // 1s max period + nextActivity = STimeNow() + STIME_US_PER_S; // 0.1s max period auto timeAfterPoll = chrono::steady_clock::now(); server.postPoll(fdm, nextActivity); auto timeAfterPostPoll = chrono::steady_clock::now(); diff --git a/test/clustertest/BedrockClusterTester.h b/test/clustertest/BedrockClusterTester.h index 78a60684b..84380bd71 100644 --- a/test/clustertest/BedrockClusterTester.h +++ b/test/clustertest/BedrockClusterTester.h @@ -145,6 +145,8 @@ ClusterTester<T>::ClusterTester(ClusterSize size, _cluster.emplace_back(args, queries, serverPort, nodePort, controlPort, false, processPath, &groupCommitCount); } + auto start = STimeNow(); + // Now start them all. list<thread> threads; for (auto it = _cluster.begin(); it != _cluster.end(); it++) { @@ -176,16 +178,36 @@ ClusterTester<T>::ClusterTester(ClusterSize size, usleep(100000); // 0.1 seconds. } } + auto end = STimeNow(); + + cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl; } template <typename T> ClusterTester<T>::~ClusterTester() { - // Shut them down in reverse order so they don't try and stand up as leader in the middle of everything. - for (int i = _size - 1; i >= 0; i--) { - stopNode(i); + auto start = STimeNow(); + + // Shut down everything but the leader first. + list<thread> threads; + cout << "Starting shutdown at " << SCURRENT_TIMESTAMP() << endl; + for (int i = _size - 1; i > 0; i--) { + threads.emplace_back([&, i](){ + cout << "Stopping node " << i << " at " << SCURRENT_TIMESTAMP() << endl; + stopNode(i); + }); + } + for (auto& t: threads) { + t.join(); } + // Then do leader last. This is to avoid getting in a state where nodes try to stand up as leader shuts down. + cout << "Stopping node " << 0 << " at " << SCURRENT_TIMESTAMP() << endl; + stopNode(0); + + auto end = STimeNow(); + + cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl; _cluster.clear(); } From 2a6798d96489c76d147707adfb869c551d3d09d4 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski <tyler@expensify.com> Date: Fri, 8 Mar 2024 15:10:03 -0800 Subject: [PATCH 2/5] Remove some test code --- BedrockServer.cpp | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 4bbbd9484..2159ddf67 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -254,7 +254,7 @@ void BedrockServer::sync() } // And set our next timeout for 1 second from now. - nextActivity = STimeNow() + (STIME_US_PER_MS * 100); + nextActivity = STimeNow() + (STIME_US_PER_MS * 100); // Process any network traffic that happened. Scope this so that we can change the log prefix and have it // auto-revert when we're finished. @@ -648,14 +648,10 @@ void BedrockServer::sync() // Note: This is not an atomic operation but should not matter. Nothing should use this that can happen with no // sync thread. // If there are socket threads in existance, they can be looking at this through a syncThread copy. - SINFO("Deleting DB pool"); _dbPool = nullptr; - SINFO("Deleted DB pool"); // We're really done, store our flag so main() can be aware. - SINFO("Marking sync thread complete"); _syncThreadComplete.store(true); - SINFO("Marked sync thread complete"); } void BedrockServer::worker(int threadId) @@ -1354,10 +1350,7 @@ BedrockServer::~BedrockServer() { // Delete our plugins. for (auto& p : plugins) { - string name = p.second->getName(); - SINFO("Deleting " << name << "plugin."); delete p.second; - SINFO("Done deleting " << name << "plugin."); } } From f304516f4c10d9e10f61cd9bda41dd582721b3fa Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski <tyler@expensify.com> Date: Fri, 8 Mar 2024 15:35:46 -0800 Subject: [PATCH 3/5] More speedups --- BedrockServer.cpp | 9 ++++++++- main.cpp | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 2159ddf67..07753d6ef 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -234,6 +234,9 @@ void BedrockServer::sync() // commands, and we'll shortly run through the existing queue. if (_shutdownState.load() == CLIENTS_RESPONDED) { _syncNode->beginShutdown(); + + // This will cause us to skip the next `poll` iteration which avoids a 1 second wait. + _notifyDone.push(true); } // The fd_map contains a list of all file descriptors (eg, sockets, Unix pipes) that poll will wait on for @@ -241,6 +244,7 @@ void BedrockServer::sync() fd_map fdm; // Pre-process any sockets the sync node is managing (i.e., communication with peer nodes). + _notifyDone.prePoll(fdm); _syncNode->prePoll(fdm); // Add our command queues to our fd_map. @@ -254,7 +258,7 @@ void BedrockServer::sync() } // And set our next timeout for 1 second from now. - nextActivity = STimeNow() + (STIME_US_PER_MS * 100); + nextActivity = STimeNow() + STIME_US_PER_S; // Process any network traffic that happened. Scope this so that we can change the log prefix and have it // auto-revert when we're finished. @@ -266,6 +270,7 @@ void BedrockServer::sync() AutoTimerTime postPollTime(postPollTimer); _syncNode->postPoll(fdm, nextActivity); _syncNodeQueuedCommands.postPoll(fdm); + _notifyDone.postPoll(fdm); } // Ok, let the sync node to it's updating for as many iterations as it requires. We'll update the replication @@ -1385,6 +1390,8 @@ void BedrockServer::prePoll(fd_map& fdm) { } void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) { + _notifyDone.postPoll(fdm); + // NOTE: There are no sockets managed here, just ports. // Open the port the first time we enter a command-processing state SQLiteNodeState state = _replicationState.load(); diff --git a/main.cpp b/main.cpp index 1683b4454..d61b4a1f3 100644 --- a/main.cpp +++ b/main.cpp @@ -365,7 +365,7 @@ int main(int argc, char* argv[]) { const uint64_t now = STimeNow(); auto timeBeforePoll = chrono::steady_clock::now(); S_poll(fdm, max(nextActivity, now) - now); - nextActivity = STimeNow() + STIME_US_PER_S; // 0.1s max period + nextActivity = STimeNow() + STIME_US_PER_MS * 100; // 0.1s max period auto timeAfterPoll = chrono::steady_clock::now(); server.postPoll(fdm, nextActivity); auto timeAfterPostPoll = chrono::steady_clock::now(); From 267895ee89867d35dd1db7a4b08e48bbd337266c Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski <tyler@expensify.com> Date: Fri, 8 Mar 2024 15:42:53 -0800 Subject: [PATCH 4/5] Cleanup some stuff --- main.cpp | 2 +- test/clustertest/BedrockClusterTester.h | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/main.cpp b/main.cpp index d61b4a1f3..b8eacd917 100644 --- a/main.cpp +++ b/main.cpp @@ -365,7 +365,7 @@ int main(int argc, char* argv[]) { const uint64_t now = STimeNow(); auto timeBeforePoll = chrono::steady_clock::now(); S_poll(fdm, max(nextActivity, now) - now); - nextActivity = STimeNow() + STIME_US_PER_MS * 100; // 0.1s max period + nextActivity = STimeNow() + STIME_US_PER_S; // 1s max period auto timeAfterPoll = chrono::steady_clock::now(); server.postPoll(fdm, nextActivity); auto timeAfterPostPoll = chrono::steady_clock::now(); diff --git a/test/clustertest/BedrockClusterTester.h b/test/clustertest/BedrockClusterTester.h index 84380bd71..dc1a40b38 100644 --- a/test/clustertest/BedrockClusterTester.h +++ b/test/clustertest/BedrockClusterTester.h @@ -180,7 +180,9 @@ ClusterTester<T>::ClusterTester(ClusterSize size, } auto end = STimeNow(); - cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl; + if ((end - start) > 5000000) { + cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl; + } } template <typename T> @@ -190,10 +192,8 @@ ClusterTester<T>::~ClusterTester() // Shut down everything but the leader first. list<thread> threads; - cout << "Starting shutdown at " << SCURRENT_TIMESTAMP() << endl; for (int i = _size - 1; i > 0; i--) { threads.emplace_back([&, i](){ - cout << "Stopping node " << i << " at " << SCURRENT_TIMESTAMP() << endl; stopNode(i); }); } @@ -202,12 +202,13 @@ ClusterTester<T>::~ClusterTester() } // Then do leader last. This is to avoid getting in a state where nodes try to stand up as leader shuts down. - cout << "Stopping node " << 0 << " at " << SCURRENT_TIMESTAMP() << endl; stopNode(0); auto end = STimeNow(); - cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl; + if ((end - start) > 5000000) { + cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl; + } _cluster.clear(); } From fdfe60b36a2819f9f35139b8a603d86ab164c408 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski <tyler@expensify.com> Date: Fri, 8 Mar 2024 16:14:38 -0800 Subject: [PATCH 5/5] Also shorten the retry for startup --- libstuff/SSignal.cpp | 2 +- sqlitecluster/SQLitePeer.cpp | 2 +- test/clustertest/BedrockClusterTester.h | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/libstuff/SSignal.cpp b/libstuff/SSignal.cpp index 28339a3c3..57e82759b 100644 --- a/libstuff/SSignal.cpp +++ b/libstuff/SSignal.cpp @@ -131,7 +131,7 @@ void _SSignal_signalHandlerThreadFunc() { siginfo_t siginfo = {0}; struct timespec timeout; timeout.tv_sec = 0; - timeout.tv_nsec = 100'000; + timeout.tv_nsec = 100'000'000; // 100ms in ns. int result = -1; while (result == -1) { result = sigtimedwait(&signals, &siginfo, &timeout); diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index 4b8a12bc1..f2e4199e8 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -83,7 +83,7 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA } case STCPManager::Socket::CLOSED: { // Done; clean up and try to reconnect - uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 5); + uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 1); if (socket->connectFailure) { SINFO("SQLitePeer connection failed after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms"); } else { diff --git a/test/clustertest/BedrockClusterTester.h b/test/clustertest/BedrockClusterTester.h index dc1a40b38..000752287 100644 --- a/test/clustertest/BedrockClusterTester.h +++ b/test/clustertest/BedrockClusterTester.h @@ -153,6 +153,7 @@ ClusterTester<T>::ClusterTester(ClusterSize size, threads.emplace_back([it](){ it->startServer(); }); + usleep(100'000); } for (auto& i : threads) { i.join();