diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 738e1a846..a4650c7cc 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -874,14 +874,19 @@ string SQLite::getCommittedHash() { return _sharedData.lastCommittedHash.load(); } -bool SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result) { +int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS) { // Look up all the queries within that range SASSERTWARN(SWITHIN(1, fromIndex, toIndex)); string query = _getJournalQuery({"SELECT id, hash, query FROM", "WHERE id >= " + SQ(fromIndex) + (toIndex ? " AND id <= " + SQ(toIndex) : "")}); SDEBUG("Getting commits #" << fromIndex << "-" << toIndex); query = "SELECT hash, query FROM (" + query + ") ORDER BY id"; - return !SQuery(_db, "getting commits", query, result); + if (timeoutLimitUS) { + setTimeout(timeoutLimitUS); + } + int queryResult = SQuery(_db, "getting commits", query, result); + clearTimeout(); + return queryResult; } int64_t SQLite::getLastInsertRowID() { diff --git a/sqlitecluster/SQLite.h b/sqlitecluster/SQLite.h index ae9e7b711..1dc6d1ad6 100644 --- a/sqlitecluster/SQLite.h +++ b/sqlitecluster/SQLite.h @@ -221,7 +221,7 @@ class SQLite { static bool getCommit(sqlite3* db, const vector journalNames, uint64_t index, string& query, string& hash); // Looks up a range of commits. - bool getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result); + int getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS = 0); // Set a time limit for this transaction, in US from the current time. void setTimeout(uint64_t timeLimitUS); diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 1d20e46c8..c49499d61 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -1660,7 +1660,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) { } PINFO("Received SUBSCRIBE, accepting new follower"); SData response("SUBSCRIPTION_APPROVED"); - _queueSynchronize(this, peer, _db, response, true); // Send everything it's missing + + // We send every remaining commit that the node doesn't have, but we set a timeout on the query that gathers these to half the + // maximum time limit that will cause this node to be disconnected from the cluster. + _queueSynchronize(this, peer, _db, response, true, RECV_TIMEOUT / 2); _sendToPeer(peer, response); SASSERTWARN(!peer->subscribed); peer->subscribed = true; @@ -2083,7 +2086,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance } } -void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) { +void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS) { // We need this to check the state of the node, and we also need `name` to make the logging macros work in a static // function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that, // so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that. @@ -2132,12 +2135,23 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // Figure out how much to send it uint64_t fromIndex = peerCommitCount + 1; uint64_t toIndex = targetCommit; - if (!sendAll) + if (sendAll) { + SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex); + } else { toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time - if (!db.getCommits(fromIndex, toIndex, result)) - STHROW("error getting commits"); - if ((uint64_t)result.size() != toIndex - fromIndex + 1) + } + int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutAfterUS); + if (resultCode) { + if (resultCode == SQLITE_INTERRUPT) { + STHROW("synchronization query timeout"); + } else { + STHROW("error getting commits"); + } + } + + if ((uint64_t)result.size() != toIndex - fromIndex + 1) { STHROW("mismatched commit count"); + } // Wrap everything into one huge message PINFO("Synchronizing commits from " << peerCommitCount + 1 << "-" << targetCommit); diff --git a/sqlitecluster/SQLiteNode.h b/sqlitecluster/SQLiteNode.h index 8212ae18a..83e049884 100644 --- a/sqlitecluster/SQLiteNode.h +++ b/sqlitecluster/SQLiteNode.h @@ -216,7 +216,7 @@ class SQLiteNode : public STCPManager { // Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the // *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker // thread with a different DB object) - which is why this is static. - static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll); + static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS = 0); bool _isNothingBlockingShutdown() const; bool _majoritySubscribed() const;