Skip to content

Commit

Permalink
Merge pull request #1767 from Expensify/dsilva_useCorrectCancelAfterW…
Browse files Browse the repository at this point in the history
…henResourcesAreExhausted

Use correct cancelAfter broadcast when resources are exhausted
  • Loading branch information
tylerkaraszewski authored Jun 4, 2024
2 parents 1b961da + d4fbfc9 commit 69a1ba7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
10 changes: 8 additions & 2 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,12 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
uint64_t threadAttemptStartTimestamp = STimeNow();
thread(&SQLiteNode::_replicate, this, peer, message, _dbPool->getIndex(false), threadAttemptStartTimestamp).detach();
} catch (const system_error& e) {
// If the server is struggling and falling behind on replication, we might have too many threads
// causing a resource exhaustion. If that happens, all the transactions that are already threaded
// and waiting for the transaction that failed will be stuck in an infinite loop. To prevent that
// we're changing the state to SEARCHING and sending the cancelAfter property to drop all threads
// that depend on the transaction that failed to be threaded.
_changeState(SQLiteNodeState::SEARCHING, message.calcU64("NewCount") - 1);
SWARN("Caught system_error starting _replicate thread with " << _replicationThreadCount.load() << " threads. e.what()=" << e.what());
STHROW("Error starting replicate thread so giving up and reconnecting.");
}
Expand Down Expand Up @@ -1921,7 +1927,7 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) {
}
}

void SQLiteNode::_changeState(SQLiteNodeState newState) {
void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter) {
SINFO("[NOTIFY] setting commit count to: " << _db.getCommitCount());
_localCommitNotifier.notifyThrough(_db.getCommitCount());

Expand All @@ -1932,7 +1938,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState) {
// If we were following, and now we're not, we give up on any replications.
if (_state == SQLiteNodeState::FOLLOWING) {
_replicationThreadsShouldExit = true;
uint64_t cancelAfter = _leaderCommitNotifier.getValue();
uint64_t cancelAfter = commitIDToCancelAfter ? commitIDToCancelAfter : _leaderCommitNotifier.getValue();
SINFO("Replication threads should exit, canceling commits after current leader commit " << cancelAfter);
_localCommitNotifier.cancel(cancelAfter);
_leaderCommitNotifier.cancel(cancelAfter);
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class SQLiteNode : public STCPManager {
// Add required headers for messages being sent to peers.
SData _addPeerHeaders(SData message);

void _changeState(SQLiteNodeState newState);
void _changeState(SQLiteNodeState newState, uint64_t commitIDToCancelAfter = 0);

// Handlers for transaction messages.
void _handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SData& message, bool wasConflict);
Expand Down

0 comments on commit 69a1ba7

Please sign in to comment.