diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 04864b685..0335bd501 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -211,7 +211,7 @@ void SQLite::commonConstructorInitialization(bool hctree) { // I tested and found that we could set about 10,000,000 and the number of steps to run and get a callback once a // second. This is set to be a bit more granular than that, which is probably adequate. - sqlite3_progress_handler(_db, 1'000'000, _progressHandlerCallback, this); + sqlite3_progress_handler(_db, 100, _progressHandlerCallback, this); // Setting a wal hook prevents auto-checkpointing. sqlite3_wal_hook(_db, _walHookCallback, this); @@ -874,20 +874,19 @@ string SQLite::getCommittedHash() { return _sharedData.lastCommittedHash.load(); } -bool SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result) { +int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS) { // Look up all the queries within that range SASSERTWARN(SWITHIN(1, fromIndex, toIndex)); string query = _getJournalQuery({"SELECT id, hash, query FROM", "WHERE id >= " + SQ(fromIndex) + (toIndex ? " AND id <= " + SQ(toIndex) : "")}); SDEBUG("Getting commits #" << fromIndex << "-" << toIndex); query = "SELECT hash, query FROM (" + query + ") ORDER BY id"; - - // Set timeout to 10 seconds. - _timeoutLimit = STimeNow() + 10'000'000; + if (timeoutLimitUS) { + _timeoutLimit = STimeNow() + timeoutLimitUS; + } int queryResult = SQuery(_db, "getting commits", query, result); _timeoutLimit = 0; - - return !queryResult; + return queryResult; } int64_t SQLite::getLastInsertRowID() { diff --git a/sqlitecluster/SQLite.h b/sqlitecluster/SQLite.h index ae9e7b711..1dc6d1ad6 100644 --- a/sqlitecluster/SQLite.h +++ b/sqlitecluster/SQLite.h @@ -221,7 +221,7 @@ class SQLite { static bool getCommit(sqlite3* db, const vector journalNames, uint64_t index, string& query, string& hash); // Looks up a range of commits. - bool getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result); + int getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS = 0); // Set a time limit for this transaction, in US from the current time. void setTimeout(uint64_t timeLimitUS); diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index d3c6ca9fd..32f86efda 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -2132,13 +2132,24 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee // Figure out how much to send it uint64_t fromIndex = peerCommitCount + 1; uint64_t toIndex = targetCommit; + uint64_t timeoutLimitUS = 0; if (sendAll) { SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex); + + // We set this for all commits because this only gets all commits in response to SUBSCRIBE, which is done synchronously, and blocks the commit thread. + // For asynchronous queries, there's nothing being blocked, so it doesn't much matter how long these take. + // This is really not the correct encapsulation for this, but we can improve that later. + timeoutLimitUS = 100; } else { toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time } - if (!db.getCommits(fromIndex, toIndex, result)) { - STHROW("error getting commits"); + int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutLimitUS); + if (resultCode) { + if (resultCode == SQLITE_INTERRUPT) { + SWARN("Timed out while running synchronization query."); + } else { + STHROW("error getting commits"); + } } if ((uint64_t)result.size() != toIndex - fromIndex + 1) { STHROW("mismatched commit count");