diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 84ae94d08..0bf9fe777 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -777,10 +777,6 @@ void BedrockServer::runCommand(unique_ptr&& _command, bool isBlo (_blacklistedParallelCommands.find(command->request.methodLine) == _blacklistedParallelCommands.end()); } - // More checks for parallel writing. - canWriteParallel = canWriteParallel && (getState() == SQLiteNodeState::LEADING); - canWriteParallel = canWriteParallel && (command->writeConsistency == SQLiteNode::ASYNC); - int64_t lastConflictPage = 0; while (true) { @@ -797,6 +793,10 @@ void BedrockServer::runCommand(unique_ptr&& _command, bool isBlo SINFO("Waited for " << waitCount << " loops for node to be ready."); } + // More checks for parallel writing. + canWriteParallel = canWriteParallel && (getState() == SQLiteNodeState::LEADING); + canWriteParallel = canWriteParallel && (command->writeConsistency == SQLiteNode::ASYNC); + // If there are outstanding HTTPS requests on this command (from a previous call to `peek`) we process them here. size_t networkLoopCount = 0; uint64_t postPollCumulativeTime = 0; diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 6cc77cf94..2070b657a 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -606,7 +606,7 @@ bool SQLiteNode::update() { // Find the freshest non-broken peer (including permafollowers). if (peer->loggedIn) { if (_forkedFrom.count(peer->name)) { - SWARN("Hash mismatch. Forked from peer " << peer->name << " so not considering it."); + SWARN("Hash mismatch. Forked from peer " << peer->name << " so not considering it." << _getLostQuorumLogMessage()); continue; } @@ -829,6 +829,7 @@ bool SQLiteNode::update() { size_t numFullPeers = 0; size_t numLoggedInFullPeers = 0; size_t approveCount = 0; + size_t abstainCount = 0; if (_isShuttingDown) { SINFO("Shutting down while standing up, setting state to SEARCHING"); _changeState(SQLiteNodeState::SEARCHING); @@ -850,6 +851,7 @@ bool SQLiteNode::update() { break; } else if (peer->standupResponse == SQLitePeer::Response::ABSTAIN) { PHMMM("Peer abstained from participation in quorum"); + abstainCount++; } else if (peer->standupResponse == SQLitePeer::Response::DENY) { // It responeded, but didn't approve -- abort PHMMM("Refused our STANDUP, cancel and RE-SEARCH"); @@ -862,6 +864,16 @@ bool SQLiteNode::update() { } } + // If the majority of full peers responds with abstain, then re-search. + const bool majorityAbstained = abstainCount * 2 > numFullPeers; + if (majorityAbstained) { + // Majority abstained, meaning we're probably forked, + // so we go back to searching so we can go back to synchronizing and see if we're forked. + SHMMM("Majority of full peers abstained; re-SEARCHING."); + _changeState(SQLiteNodeState::SEARCHING); + return true; // Re-update + } + // If everyone's responded with approval and we form a majority, then finish standup. bool majorityConnected = numLoggedInFullPeers * 2 >= numFullPeers; bool quorumApproved = approveCount * 2 >= numFullPeers; @@ -1591,12 +1603,14 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { uint64_t commitNum = SToUInt64(message["hashMismatchNumber"]); _db.getCommits(commitNum, commitNum, result); _forkedFrom.insert(peer->name); + SALERT("Hash mismatch. Peer " << peer->name << " and I have forked at commit " << message["hashMismatchNumber"] << ". I have forked from " << _forkedFrom.size() << " other nodes. I am " << stateName(_state) - << " and have hash " << result[0][0] << " for that commit. Peer has hash " << message["hashMismatchValue"] << "."); + << " and have hash " << result[0][0] << " for that commit. Peer has hash " << message["hashMismatchValue"] << "." + << _getLostQuorumLogMessage()); if (_forkedFrom.size() > ((_peerList.size() + 1) / 2)) { - SERROR("Hash mismatch. I have forked from over half the cluster. This is unrecoverable."); + SERROR("Hash mismatch. I have forked from over half the cluster. This is unrecoverable." << _getLostQuorumLogMessage()); } STHROW("Hash mismatch"); @@ -1891,6 +1905,9 @@ void SQLiteNode::_onDisconnect(SQLitePeer* peer) { // It works for the sync thread as well, as there's handling in _changeState to rollback a commit when // dropping out of leading or standing down (and there can't be commits in progress in other states). SWARN("[clustersync] We were " << stateName(_state) << " but lost quorum (Disconnected from " << peer->name << "). Going to SEARCHING."); + + // Store the time at which this happened for diagnostic purposes. + _lastLostQuorum = STimeNow(); for (const auto* p : _peerList) { SWARN("[clustersync] Peer " << p->name << " logged in? " << (p->loggedIn ? "TRUE" : "FALSE") << (p->permaFollower ? " (permaFollower)" : "")); } @@ -2025,7 +2042,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance // loop. It's entirely possible that we do this for valid reasons - it may be the peer that has the bad database and not us, and there are plenty of other reasons we could switch to // SEARCHING, but in those cases, we just wait an extra second before trying again. if (newState == SQLiteNodeState::SEARCHING && _forkedFrom.size()) { - SWARN("Going searching while forked peers present, sleeping 1 second."); + SWARN("Going searching while forked peers present, sleeping 1 second." << _getLostQuorumLogMessage()); sleep(1); } @@ -2140,7 +2157,7 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee uint64_t fromIndex = peerCommitCount + 1; uint64_t toIndex = targetCommit; if (sendAll) { - SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex); + SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex); } else { toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time } @@ -2785,3 +2802,13 @@ void SQLiteNode::kill() { peer->reset(); } } + +string SQLiteNode::_getLostQuorumLogMessage() const { + string lostQuorumMessage; + if (_lastLostQuorum) { + lostQuorumMessage = " Lost Quorum at: " + STIMESTAMP_MS(_lastLostQuorum) + " (" + + to_string((double)(STimeNow() - _lastLostQuorum) / 1000000.0) + " seconds ago)."; + } + + return lostQuorumMessage; +} diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index e87090d68..5f9e6e3e0 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -228,6 +228,8 @@ class SQLiteNode : public STCPManager { void _changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter = 0); + string _getLostQuorumLogMessage() const; + // Handlers for transaction messages. void _handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message, bool wasConflict); void _handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const SData& message, uint64_t dequeueTime, uint64_t threadStartTime); @@ -320,6 +322,10 @@ class SQLiteNode : public STCPManager { // Set to true to indicate we're attempting to shut down. atomic _isShuttingDown; + // When we spontaneously lose quorum (due to an unexpected node disconnection) we log the time. Later, if we detect we've forked, + // We show this time in a log line as a diagnostic message. + atomic _lastLostQuorum = 0; + // Store the ID of the last transaction that we replicated to peers. Whenever we do an update, we will try and send // any new committed transactions to peers, and update this value. uint64_t _lastSentTransactionID;