Skip to content

Commit

Permalink
Merge pull request #1876 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
deetergp authored Sep 23, 2024
2 parents 28cbde8 + cedf037 commit 633999e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
8 changes: 4 additions & 4 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,10 +777,6 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
(_blacklistedParallelCommands.find(command->request.methodLine) == _blacklistedParallelCommands.end());
}

// More checks for parallel writing.
canWriteParallel = canWriteParallel && (getState() == SQLiteNodeState::LEADING);
canWriteParallel = canWriteParallel && (command->writeConsistency == SQLiteNode::ASYNC);

int64_t lastConflictPage = 0;
while (true) {

Expand All @@ -797,6 +793,10 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
SINFO("Waited for " << waitCount << " loops for node to be ready.");
}

// More checks for parallel writing.
canWriteParallel = canWriteParallel && (getState() == SQLiteNodeState::LEADING);
canWriteParallel = canWriteParallel && (command->writeConsistency == SQLiteNode::ASYNC);

// If there are outstanding HTTPS requests on this command (from a previous call to `peek`) we process them here.
size_t networkLoopCount = 0;
uint64_t postPollCumulativeTime = 0;
Expand Down
37 changes: 32 additions & 5 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ bool SQLiteNode::update() {
// Find the freshest non-broken peer (including permafollowers).
if (peer->loggedIn) {
if (_forkedFrom.count(peer->name)) {
SWARN("Hash mismatch. Forked from peer " << peer->name << " so not considering it.");
SWARN("Hash mismatch. Forked from peer " << peer->name << " so not considering it." << _getLostQuorumLogMessage());
continue;
}

Expand Down Expand Up @@ -829,6 +829,7 @@ bool SQLiteNode::update() {
size_t numFullPeers = 0;
size_t numLoggedInFullPeers = 0;
size_t approveCount = 0;
size_t abstainCount = 0;
if (_isShuttingDown) {
SINFO("Shutting down while standing up, setting state to SEARCHING");
_changeState(SQLiteNodeState::SEARCHING);
Expand All @@ -850,6 +851,7 @@ bool SQLiteNode::update() {
break;
} else if (peer->standupResponse == SQLitePeer::Response::ABSTAIN) {
PHMMM("Peer abstained from participation in quorum");
abstainCount++;
} else if (peer->standupResponse == SQLitePeer::Response::DENY) {
// It responded, but didn't approve -- abort
PHMMM("Refused our STANDUP, cancel and RE-SEARCH");
Expand All @@ -862,6 +864,16 @@ bool SQLiteNode::update() {
}
}

// If the majority of full peers responds with abstain, then re-search.
const bool majorityAbstained = abstainCount * 2 > numFullPeers;
if (majorityAbstained) {
// Majority abstained, meaning we're probably forked,
// so we go back to searching so we can go back to synchronizing and see if we're forked.
SHMMM("Majority of full peers abstained; re-SEARCHING.");
_changeState(SQLiteNodeState::SEARCHING);
return true; // Re-update
}

// If everyone's responded with approval and we form a majority, then finish standup.
bool majorityConnected = numLoggedInFullPeers * 2 >= numFullPeers;
bool quorumApproved = approveCount * 2 >= numFullPeers;
Expand Down Expand Up @@ -1591,12 +1603,14 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
uint64_t commitNum = SToUInt64(message["hashMismatchNumber"]);
_db.getCommits(commitNum, commitNum, result);
_forkedFrom.insert(peer->name);

SALERT("Hash mismatch. Peer " << peer->name << " and I have forked at commit " << message["hashMismatchNumber"]
<< ". I have forked from " << _forkedFrom.size() << " other nodes. I am " << stateName(_state)
<< " and have hash " << result[0][0] << " for that commit. Peer has hash " << message["hashMismatchValue"] << ".");
<< " and have hash " << result[0][0] << " for that commit. Peer has hash " << message["hashMismatchValue"] << "."
<< _getLostQuorumLogMessage());

if (_forkedFrom.size() > ((_peerList.size() + 1) / 2)) {
SERROR("Hash mismatch. I have forked from over half the cluster. This is unrecoverable.");
SERROR("Hash mismatch. I have forked from over half the cluster. This is unrecoverable." << _getLostQuorumLogMessage());
}

STHROW("Hash mismatch");
Expand Down Expand Up @@ -1891,6 +1905,9 @@ void SQLiteNode::_onDisconnect(SQLitePeer* peer) {
// It works for the sync thread as well, as there's handling in _changeState to rollback a commit when
// dropping out of leading or standing down (and there can't be commits in progress in other states).
SWARN("[clustersync] We were " << stateName(_state) << " but lost quorum (Disconnected from " << peer->name << "). Going to SEARCHING.");

// Store the time at which this happened for diagnostic purposes.
_lastLostQuorum = STimeNow();
for (const auto* p : _peerList) {
SWARN("[clustersync] Peer " << p->name << " logged in? " << (p->loggedIn ? "TRUE" : "FALSE") << (p->permaFollower ? " (permaFollower)" : ""));
}
Expand Down Expand Up @@ -2025,7 +2042,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance
// loop. It's entirely possible that we do this for valid reasons - it may be the peer that has the bad database and not us, and there are plenty of other reasons we could switch to
// SEARCHING, but in those cases, we just wait an extra second before trying again.
if (newState == SQLiteNodeState::SEARCHING && _forkedFrom.size()) {
SWARN("Going searching while forked peers present, sleeping 1 second.");
SWARN("Going searching while forked peers present, sleeping 1 second." << _getLostQuorumLogMessage());
sleep(1);
}

Expand Down Expand Up @@ -2140,7 +2157,7 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
uint64_t fromIndex = peerCommitCount + 1;
uint64_t toIndex = targetCommit;
if (sendAll) {
SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex);
SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex);
} else {
toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time
}
Expand Down Expand Up @@ -2785,3 +2802,13 @@ void SQLiteNode::kill() {
peer->reset();
}
}

// Builds the diagnostic suffix that is appended to fork/hash-mismatch log lines, describing the last time
// `_lastLostQuorum` was set.
//
// Returns an empty string when `_lastLostQuorum` is zero (nothing has been recorded). Otherwise returns a string of
// the form " Lost Quorum at: <timestamp> (<elapsed> seconds ago)." with a leading space, so it can be streamed
// directly after an existing message.
string SQLiteNode::_getLostQuorumLogMessage() const {
    // `_lastLostQuorum` is atomic and may be stored to while we run. Read it exactly once so that the zero check, the
    // printed timestamp, and the elapsed time all describe the same value.
    const uint64_t lostQuorumAt = _lastLostQuorum.load();
    if (!lostQuorumAt) {
        return "";
    }

    // NOTE(review): dividing by 1,000,000 to get seconds presumes STimeNow() reports microseconds -- confirm.
    const double secondsAgo = static_cast<double>(STimeNow() - lostQuorumAt) / 1000000.0;
    return " Lost Quorum at: " + STIMESTAMP_MS(lostQuorumAt) + " (" + to_string(secondsAgo) + " seconds ago).";
}
6 changes: 6 additions & 0 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ class SQLiteNode : public STCPManager {

void _changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter = 0);

string _getLostQuorumLogMessage() const;

// Handlers for transaction messages.
void _handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message, bool wasConflict);
void _handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const SData& message, uint64_t dequeueTime, uint64_t threadStartTime);
Expand Down Expand Up @@ -320,6 +322,10 @@ class SQLiteNode : public STCPManager {
// Set to true to indicate we're attempting to shut down.
atomic<bool> _isShuttingDown;

// When we spontaneously lose quorum (due to an unexpected node disconnection) we record the time. Later, if we detect we've forked,
// we show this time in a log line as a diagnostic message.
atomic<uint64_t> _lastLostQuorum = 0;

// Store the ID of the last transaction that we replicated to peers. Whenever we do an update, we will try and send
// any new committed transactions to peers, and update this value.
uint64_t _lastSentTransactionID;
Expand Down

0 comments on commit 633999e

Please sign in to comment.