From 6d1d40bfd7eebf7e5fcc504a60adcdb34ab8e8c2 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 21 Jun 2024 10:08:12 -0700 Subject: [PATCH 1/7] First adjustment. Compiles --- sqlitecluster/SQLite.cpp | 8 +++++++- sqlitecluster/SQLiteNode.cpp | 11 ++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 738e1a846..04864b685 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -881,7 +881,13 @@ bool SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result) (toIndex ? " AND id <= " + SQ(toIndex) : "")}); SDEBUG("Getting commits #" << fromIndex << "-" << toIndex); query = "SELECT hash, query FROM (" + query + ") ORDER BY id"; - return !SQuery(_db, "getting commits", query, result); + + // Set timeout to 10 seconds. + _timeoutLimit = STimeNow() + 10'000'000; + int queryResult = SQuery(_db, "getting commits", query, result); + _timeoutLimit = 0; + + return !queryResult; } int64_t SQLite::getLastInsertRowID() { diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 1d20e46c8..d3c6ca9fd 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2132,12 +2132,17 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // Figure out how much to send it uint64_t fromIndex = peerCommitCount + 1; uint64_t toIndex = targetCommit; - if (!sendAll) + if (sendAll) { + SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex); + } else { toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time - if (!db.getCommits(fromIndex, toIndex, result)) + } + if (!db.getCommits(fromIndex, toIndex, result)) { STHROW("error getting commits"); - if ((uint64_t)result.size() != toIndex - fromIndex + 1) + } + if ((uint64_t)result.size() != toIndex - fromIndex + 1) { STHROW("mismatched commit count"); + } // Wrap everything into one huge message 
PINFO("Synchronizing commits from " << peerCommitCount + 1 << "-" << targetCommit); From ff798a0fd44edeb0021762ae5137d0576b333529 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 21 Jun 2024 10:55:00 -0700 Subject: [PATCH 2/7] Turn limits way down to trigger --- sqlitecluster/SQLite.cpp | 13 ++++++------- sqlitecluster/SQLite.h | 2 +- sqlitecluster/SQLiteNode.cpp | 15 +++++++++++++-- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 04864b685..0335bd501 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -211,7 +211,7 @@ void SQLite::commonConstructorInitialization(bool hctree) { // I tested and found that we could set about 10,000,000 and the number of steps to run and get a callback once a // second. This is set to be a bit more granular than that, which is probably adequate. - sqlite3_progress_handler(_db, 1'000'000, _progressHandlerCallback, this); + sqlite3_progress_handler(_db, 100, _progressHandlerCallback, this); // Setting a wal hook prevents auto-checkpointing. sqlite3_wal_hook(_db, _walHookCallback, this); @@ -874,20 +874,19 @@ string SQLite::getCommittedHash() { return _sharedData.lastCommittedHash.load(); } -bool SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result) { +int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS) { // Look up all the queries within that range SASSERTWARN(SWITHIN(1, fromIndex, toIndex)); string query = _getJournalQuery({"SELECT id, hash, query FROM", "WHERE id >= " + SQ(fromIndex) + (toIndex ? " AND id <= " + SQ(toIndex) : "")}); SDEBUG("Getting commits #" << fromIndex << "-" << toIndex); query = "SELECT hash, query FROM (" + query + ") ORDER BY id"; - - // Set timeout to 10 seconds.
- _timeoutLimit = STimeNow() + 10'000'000; + if (timeoutLimitUS) { + _timeoutLimit = STimeNow() + timeoutLimitUS; + } int queryResult = SQuery(_db, "getting commits", query, result); _timeoutLimit = 0; - - return !queryResult; + return queryResult; } int64_t SQLite::getLastInsertRowID() { diff --git a/sqlitecluster/SQLite.h b/sqlitecluster/SQLite.h index ae9e7b711..1dc6d1ad6 100644 --- a/sqlitecluster/SQLite.h +++ b/sqlitecluster/SQLite.h @@ -221,7 +221,7 @@ class SQLite { static bool getCommit(sqlite3* db, const vector journalNames, uint64_t index, string& query, string& hash); // Looks up a range of commits. - bool getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result); + int getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS = 0); // Set a time limit for this transaction, in US from the current time. void setTimeout(uint64_t timeLimitUS); diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index d3c6ca9fd..32f86efda 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2132,13 +2132,24 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // Figure out how much to send it uint64_t fromIndex = peerCommitCount + 1; uint64_t toIndex = targetCommit; + uint64_t timeoutLimitUS = 0; if (sendAll) { SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex); + + // We set this for all commits because this only gets all commits in response to SUBSCRIBE, which is done synchronously, and blocks the commit thread. + // For asynchronous queries, there's nothing being blocked, so it doesn't much matter how long these take. + // This is really not the correct encapsulation for this, but we can improve that later. 
+ timeoutLimitUS = 100; } else { toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time } - if (!db.getCommits(fromIndex, toIndex, result)) { - STHROW("error getting commits"); + int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutLimitUS); + if (resultCode) { + if (resultCode == SQLITE_INTERRUPT) { + SWARN("Timed out while running synchronization query."); + } else { + STHROW("error getting commits"); + } } if ((uint64_t)result.size() != toIndex - fromIndex + 1) { STHROW("mismatched commit count"); From 4236694eb6bdf194a917b0beb23fd20e563f70e7 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 21 Jun 2024 11:03:59 -0700 Subject: [PATCH 3/7] Hack to make it fail once then succeed --- sqlitecluster/SQLiteNode.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 32f86efda..8228d29ee 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2083,6 +2083,8 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance } } +static int __ATTEMPTS = 0; + void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { // We need this to check the state of the node, and we also need `name` to make the logging macros work in a static // function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that, @@ -2139,7 +2141,12 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // We set this for all commits because this only gets all commits in response to SUBSCRIBE, which is done synchronously, and blocks the commit thread. // For asynchronous queries, there's nothing being blocked, so it doesn't much matter how long these take. // This is really not the correct encapsulation for this, but we can improve that later. 
- timeoutLimitUS = 100; + if (__ATTEMPTS) { + timeoutLimitUS = 10'000; + } else { + timeoutLimitUS = 100; + } + __ATTEMPTS++; } else { toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time } From 8663412f659b8ed7211018970f7d97eb762607eb Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 21 Jun 2024 11:18:54 -0700 Subject: [PATCH 4/7] Allow for failed subscribe, switch back to synchronizing --- sqlitecluster/SQLiteNode.cpp | 45 ++++++++++++++++++++++-------------- sqlitecluster/SQLiteNode.h | 3 ++- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 8228d29ee..75da036f0 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1660,7 +1660,11 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } PINFO("Received SUBSCRIBE, accepting new follower"); SData response("SUBSCRIPTION_APPROVED"); - _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing + bool syncSucess = _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing + if (!syncSucess) { + SWARN("Failed generating sync response to SUBSCRIBE"); + response.methodLine = "SUBSCRIPTION_PENDING"; + } _sendToPeer(peer, response); SASSERTWARN(!peer->subscribed); peer->subscribed = true; @@ -1683,7 +1687,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { transaction.content = _db.getUncommittedQuery(); _sendToPeer(peer, transaction); } - } else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED")) { + } else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED") || SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) { // SUBSCRIPTION_APPROVED: Sent by a follower's new leader to complete the subscription process. Includes zero or // more COMMITS that should be immediately applied to the database. 
if (_state != SQLiteNodeState::SUBSCRIBING) { @@ -1692,19 +1696,24 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { if (_leadPeer != peer) { STHROW("not subscribing to you"); } - SINFO("Received SUBSCRIPTION_APPROVED, final synchronization."); - try { - // Done synchronizing - _recvSynchronize(peer, message); - SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash() - << "), FOLLOWING"); - _changeState(SQLiteNodeState::FOLLOWING); - } catch (const SException& e) { - // Transaction failed - SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING."); - _reconnectPeer(_leadPeer); - _changeState(SQLiteNodeState::SEARCHING); - throw e; + if (SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) { + // This subscription did not actually succeed, but not because we did anything wrong. The most useful course of action here is to switch to synchronizing and try again. + _changeState(SQLiteNodeState::SYNCHRONIZING); + } else { + SINFO("Received SUBSCRIPTION_APPROVED, final synchronization."); + try { + // Done synchronizing + _recvSynchronize(peer, message); + SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash() + << "), FOLLOWING"); + _changeState(SQLiteNodeState::FOLLOWING); + } catch (const SException& e) { + // Transaction failed + SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING."); + _reconnectPeer(_leadPeer); + _changeState(SQLiteNodeState::SEARCHING); + throw e; + } } } else if (SIEquals(message.methodLine, "BEGIN_TRANSACTION") || SIEquals(message.methodLine, "COMMIT_TRANSACTION") || SIEquals(message.methodLine, "ROLLBACK_TRANSACTION")) { if (_replicationThreadsShouldExit) { @@ -2085,7 +2094,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance static int __ATTEMPTS = 0; -void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, 
SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { +bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { // We need this to check the state of the node, and we also need `name` to make the logging macros work in a static // function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that, // so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that. @@ -2111,7 +2120,7 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee response["hashMismatchValue"] = myHash; response["hashMismatchNumber"] = to_string(peerCommitCount); - return; + return false; } PINFO("Latest commit hash matches our records, beginning synchronization."); } else { @@ -2175,6 +2184,8 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee response.content += commit.serialize(); } } + + return true; } void SQLiteNode::_recvSynchronize(SQLitePeer* peer, const SData& message) { diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 8212ae18a..b319d8dc5 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -216,7 +216,8 @@ class SQLiteNode : public STCPManager { // Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the // *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker // thread with a different DB object) - which is why this is static. - static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); + // Returns true on success, false on failure. 
+ static bool _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); bool _isNothingBlockingShutdown() const; bool _majoritySubscribed() const; From e14609373ed6311c21baab46dd6fb51b49cec82f Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 21 Jun 2024 12:47:26 -0700 Subject: [PATCH 5/7] Simplify solution --- sqlitecluster/SQLiteNode.cpp | 48 ++++++++++++++---------------------- sqlitecluster/SQLiteNode.h | 3 +-- 2 files changed, 19 insertions(+), 32 deletions(-) diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 75da036f0..19b27283b 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1660,11 +1660,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } PINFO("Received SUBSCRIBE, accepting new follower"); SData response("SUBSCRIPTION_APPROVED"); - bool syncSucess = _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing - if (!syncSucess) { - SWARN("Failed generating sync response to SUBSCRIBE"); - response.methodLine = "SUBSCRIPTION_PENDING"; - } + _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing _sendToPeer(peer, response); SASSERTWARN(!peer->subscribed); peer->subscribed = true; @@ -1687,7 +1683,7 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { transaction.content = _db.getUncommittedQuery(); _sendToPeer(peer, transaction); } - } else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED") || SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) { + } else if (SIEquals(message.methodLine, "SUBSCRIPTION_APPROVED")) { // SUBSCRIPTION_APPROVED: Sent by a follower's new leader to complete the subscription process. Includes zero or // more COMMITS that should be immediately applied to the database. 
if (_state != SQLiteNodeState::SUBSCRIBING) { @@ -1696,24 +1692,19 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { if (_leadPeer != peer) { STHROW("not subscribing to you"); } - if (SIEquals(message.methodLine, "SUBSCRIPTION_PENDING")) { - // This subscription did not actually succeed, but not because we did anything wrong. The most useful course of action here is to switch to synchronizing and try again. - _changeState(SQLiteNodeState::SYNCHRONIZING); - } else { - SINFO("Received SUBSCRIPTION_APPROVED, final synchronization."); - try { - // Done synchronizing - _recvSynchronize(peer, message); - SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash() - << "), FOLLOWING"); - _changeState(SQLiteNodeState::FOLLOWING); - } catch (const SException& e) { - // Transaction failed - SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING."); - _reconnectPeer(_leadPeer); - _changeState(SQLiteNodeState::SEARCHING); - throw e; - } + SINFO("Received SUBSCRIPTION_APPROVED, final synchronization."); + try { + // Done synchronizing + _recvSynchronize(peer, message); + SINFO("Subscription complete, at commitCount #" << _db.getCommitCount() << " (" << _db.getCommittedHash() + << "), FOLLOWING"); + _changeState(SQLiteNodeState::FOLLOWING); + } catch (const SException& e) { + // Transaction failed + SWARN("Subscription failed '" << e.what() << "', reconnecting to leader and re-SEARCHING."); + _reconnectPeer(_leadPeer); + _changeState(SQLiteNodeState::SEARCHING); + throw e; } } else if (SIEquals(message.methodLine, "BEGIN_TRANSACTION") || SIEquals(message.methodLine, "COMMIT_TRANSACTION") || SIEquals(message.methodLine, "ROLLBACK_TRANSACTION")) { if (_replicationThreadsShouldExit) { @@ -2094,7 +2085,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance static int __ATTEMPTS = 0; -bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, 
SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { +void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { // We need this to check the state of the node, and we also need `name` to make the logging macros work in a static // function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that, // so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that. @@ -2119,8 +2110,6 @@ bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // Instead of reconnecting, we tell the peer that we don't match. It's up to the peer to reconnect. response["hashMismatchValue"] = myHash; response["hashMismatchNumber"] = to_string(peerCommitCount); - - return false; } PINFO("Latest commit hash matches our records, beginning synchronization."); } else { @@ -2162,11 +2151,12 @@ bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutLimitUS); if (resultCode) { if (resultCode == SQLITE_INTERRUPT) { - SWARN("Timed out while running synchronization query."); + STHROW("synchronization query timeout"); } else { STHROW("error getting commits"); } } + if ((uint64_t)result.size() != toIndex - fromIndex + 1) { STHROW("mismatched commit count"); } @@ -2184,8 +2174,6 @@ bool SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee response.content += commit.serialize(); } } - - return true; } void SQLiteNode::_recvSynchronize(SQLitePeer* peer, const SData& message) { diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index b319d8dc5..8212ae18a 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -216,8 +216,7 @@ class SQLiteNode : public STCPManager { // Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the // 
*correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker // thread with a different DB object) - which is why this is static. - // Returns true on success, false on failure. - static bool _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); + static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); bool _isNothingBlockingShutdown() const; bool _majoritySubscribed() const; From ed5871520f529e072eaf3f5b9441165766dbdfc8 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Fri, 21 Jun 2024 13:01:54 -0700 Subject: [PATCH 6/7] Revert test hacks --- sqlitecluster/SQLite.cpp | 2 +- sqlitecluster/SQLiteNode.cpp | 11 +++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 0335bd501..72191b5d8 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -211,7 +211,7 @@ void SQLite::commonConstructorInitialization(bool hctree) { // I tested and found that we could set about 10,000,000 and the number of steps to run and get a callback once a // second. This is set to be a bit more granular than that, which is probably adequate. - sqlite3_progress_handler(_db, 100, _progressHandlerCallback, this); + sqlite3_progress_handler(_db, 1'000'000, _progressHandlerCallback, this); // Setting a wal hook prevents auto-checkpointing. 
sqlite3_wal_hook(_db, _walHookCallback, this); diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 19b27283b..33c8a9e49 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2083,8 +2083,6 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance } } -static int __ATTEMPTS = 0; - void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { // We need this to check the state of the node, and we also need `name` to make the logging macros work in a static // function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that, @@ -2110,6 +2108,8 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // Instead of reconnecting, we tell the peer that we don't match. It's up to the peer to reconnect. response["hashMismatchValue"] = myHash; response["hashMismatchNumber"] = to_string(peerCommitCount); + + return; } PINFO("Latest commit hash matches our records, beginning synchronization."); } else { @@ -2139,12 +2139,7 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // We set this for all commits because this only gets all commits in response to SUBSCRIBE, which is done synchronously, and blocks the commit thread. // For asynchronous queries, there's nothing being blocked, so it doesn't much matter how long these take. // This is really not the correct encapsulation for this, but we can improve that later. 
- if (__ATTEMPTS) { - timeoutLimitUS = 10'000; - } else { - timeoutLimitUS = 100; - } - __ATTEMPTS++; + timeoutLimitUS = 10'000'000; } else { toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time } From e3a137089d028cf745c72fc70af7989a20133c69 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 24 Jun 2024 09:49:20 -0700 Subject: [PATCH 7/7] Review feedback --- sqlitecluster/SQLite.cpp | 4 ++-- sqlitecluster/SQLiteNode.cpp | 15 ++++++--------- sqlitecluster/SQLiteNode.h | 2 +- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 72191b5d8..a4650c7cc 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -882,10 +882,10 @@ int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, u SDEBUG("Getting commits #" << fromIndex << "-" << toIndex); query = "SELECT hash, query FROM (" + query + ") ORDER BY id"; if (timeoutLimitUS) { - _timeoutLimit = STimeNow() + timeoutLimitUS; + setTimeout(timeoutLimitUS); } int queryResult = SQuery(_db, "getting commits", query, result); - _timeoutLimit = 0; + clearTimeout(); return queryResult; } diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 33c8a9e49..c49499d61 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1660,7 +1660,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } PINFO("Received SUBSCRIBE, accepting new follower"); SData response("SUBSCRIPTION_APPROVED"); - _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing + + // We send every remaining commit that the node doesn't have, but we set a timeout on the query that gathers these to half the + // maximum time limit that will cause this node to be disconnected from the cluster. 
+ _queueSynchronize(this, peer, _db, response, true, RECV_TIMEOUT / 2); _sendToPeer(peer, response); SASSERTWARN(!peer->subscribed); peer->subscribed = true; @@ -2083,7 +2086,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance } } -void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { +void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS) { // We need this to check the state of the node, and we also need `name` to make the logging macros work in a static // function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that, // so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that. @@ -2132,18 +2135,12 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // Figure out how much to send it uint64_t fromIndex = peerCommitCount + 1; uint64_t toIndex = targetCommit; - uint64_t timeoutLimitUS = 0; if (sendAll) { SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex); - - // We set this for all commits because this only gets all commits in response to SUBSCRIBE, which is done synchronously, and blocks the commit thread. - // For asynchronous queries, there's nothing being blocked, so it doesn't much matter how long these take. - // This is really not the correct encapsulation for this, but we can improve that later. 
- timeoutLimitUS = 10'000'000; } else { toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time } - int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutLimitUS); + int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutAfterUS); if (resultCode) { if (resultCode == SQLITE_INTERRUPT) { STHROW("synchronization query timeout"); diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 8212ae18a..83e049884 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -216,7 +216,7 @@ class SQLiteNode : public STCPManager { // Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the // *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker // thread with a different DB object) - which is why this is static. - static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); + static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS = 0); bool _isNothingBlockingShutdown() const; bool _majoritySubscribed() const;