diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 8105dd9a7..07753d6ef 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -88,6 +88,10 @@ void BedrockServer::syncWrapper() break; } } + + // Break out of `poll` in main.cpp. + _notifyDone.push(true); + SINFO("Exiting syncWrapper"); } void BedrockServer::sync() @@ -230,6 +234,9 @@ void BedrockServer::sync() // commands, and we'll shortly run through the existing queue. if (_shutdownState.load() == CLIENTS_RESPONDED) { _syncNode->beginShutdown(); + + // This will cause us to skip the next `poll` iteration which avoids a 1 second wait. + _notifyDone.push(true); } // The fd_map contains a list of all file descriptors (eg, sockets, Unix pipes) that poll will wait on for @@ -237,6 +244,7 @@ void BedrockServer::sync() fd_map fdm; // Pre-process any sockets the sync node is managing (i.e., communication with peer nodes). + _notifyDone.prePoll(fdm); _syncNode->prePoll(fdm); // Add our command queues to our fd_map. @@ -262,6 +270,7 @@ void BedrockServer::sync() AutoTimerTime postPollTime(postPollTimer); _syncNode->postPoll(fdm, nextActivity); _syncNodeQueuedCommands.postPoll(fdm); + _notifyDone.postPoll(fdm); } // Ok, let the sync node to it's updating for as many iterations as it requires. We'll update the replication @@ -671,7 +680,7 @@ void BedrockServer::worker(int threadId) }); // Get the next one. - command = commandQueue.get(1000000); + command = commandQueue.get(100000); SAUTOPREFIX(command->request); SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, " @@ -1362,6 +1371,9 @@ bool BedrockServer::shutdownComplete() { void BedrockServer::prePoll(fd_map& fdm) { lock_guard lock(_portMutex); + // This will interrupt poll when we shut down. + _notifyDone.prePoll(fdm); + // Add all our ports. There are no sockets directly managed here. if (_commandPortPublic) { SFDset(fdm, _commandPortPublic->s, SREADEVTS); @@ -1378,6 +1390,8 @@ void BedrockServer::prePoll(fd_map& fdm) { } void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) { + _notifyDone.postPoll(fdm); + // NOTE: There are no sockets managed here, just ports. // Open the port the first time we enter a command-processing state SQLiteNodeState state = _replicationState.load(); diff --git a/BedrockServer.h b/BedrockServer.h index d08720a41..501327322 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -489,4 +489,8 @@ class BedrockServer : public SQLiteServer { // We call this method whenever a node changes state void notifyStateChangeToPlugins(SQLite& db, SQLiteNodeState newState) override; + + // This is just here to allow `poll` in main.cpp to get interrupted when the server shuts down. + // to wait up to a full second for them. + SSynchronizedQueue _notifyDone; }; diff --git a/libstuff/SSignal.cpp b/libstuff/SSignal.cpp index e4f2b86af..57e82759b 100644 --- a/libstuff/SSignal.cpp +++ b/libstuff/SSignal.cpp @@ -130,8 +130,8 @@ void _SSignal_signalHandlerThreadFunc() { // Wait for a signal to appear. siginfo_t siginfo = {0}; struct timespec timeout; - timeout.tv_sec = 1; - timeout.tv_nsec = 0; + timeout.tv_sec = 0; + timeout.tv_nsec = 100'000'000; // 100ms in ns. int result = -1; while (result == -1) { result = sigtimedwait(&signals, &siginfo, &timeout); @@ -159,7 +159,7 @@ void _SSignal_signalHandlerThreadFunc() { void SStopSignalThread() { _SSignal_threadStopFlag = true; if (_SSignal_threadInitialized.test_and_set()) { - // Send ourselves a singnal to interrupt our thread. + // Send ourselves a signal to interrupt our thread. SINFO("Joining signal thread."); _SSignal_signalThread.join(); _SSignal_threadInitialized.clear(); diff --git a/sqlitecluster/SQLitePeer.cpp b/sqlitecluster/SQLitePeer.cpp index 4b8a12bc1..f2e4199e8 100644 --- a/sqlitecluster/SQLitePeer.cpp +++ b/sqlitecluster/SQLitePeer.cpp @@ -83,7 +83,7 @@ SQLitePeer::PeerPostPollStatus SQLitePeer::postPoll(fd_map& fdm, uint64_t& nextA } case STCPManager::Socket::CLOSED: { // Done; clean up and try to reconnect - uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 5); + uint64_t delay = SRandom::rand64() % (STIME_US_PER_S * 1); if (socket->connectFailure) { SINFO("SQLitePeer connection failed after " << (STimeNow() - socket->openTime) / 1000 << "ms, reconnecting in " << delay / 1000 << "ms"); } else { diff --git a/test/clustertest/BedrockClusterTester.h b/test/clustertest/BedrockClusterTester.h index 78a60684b..000752287 100644 --- a/test/clustertest/BedrockClusterTester.h +++ b/test/clustertest/BedrockClusterTester.h @@ -145,12 +145,15 @@ ClusterTester::ClusterTester(ClusterSize size, _cluster.emplace_back(args, queries, serverPort, nodePort, controlPort, false, processPath, &groupCommitCount); } + auto start = STimeNow(); + // Now start them all. list threads; for (auto it = _cluster.begin(); it != _cluster.end(); it++) { threads.emplace_back([it](){ it->startServer(); }); + usleep(100'000); } for (auto& i : threads) { i.join(); @@ -176,16 +179,37 @@ ClusterTester::ClusterTester(ClusterSize size, usleep(100000); // 0.1 seconds. } } + auto end = STimeNow(); + + if ((end - start) > 5000000) { + cout << "Took " << ((end - start) / 1000) << "ms to start cluster." << endl; + } } template ClusterTester::~ClusterTester() { - // Shut them down in reverse order so they don't try and stand up as leader in the middle of everything. - for (int i = _size - 1; i >= 0; i--) { - stopNode(i); + auto start = STimeNow(); + + // Shut down everything but the leader first. + list threads; + for (int i = _size - 1; i > 0; i--) { + threads.emplace_back([&, i](){ + stopNode(i); + }); + } + for (auto& t: threads) { + t.join(); } + // Then do leader last. This is to avoid getting in a state where nodes try to stand up as leader shuts down. + stopNode(0); + + auto end = STimeNow(); + + if ((end - start) > 5000000) { + cout << "Took " << ((end - start) / 1000) << "ms to stop cluster." << endl; + } _cluster.clear(); }