Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time out SUBSCRIBE query instead of forking. #1783

Merged
merged 7 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -874,14 +874,19 @@ string SQLite::getCommittedHash() {
return _sharedData.lastCommittedHash.load();
}

bool SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result) {
int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS) {
// Look up all the queries within that range
SASSERTWARN(SWITHIN(1, fromIndex, toIndex));
string query = _getJournalQuery({"SELECT id, hash, query FROM", "WHERE id >= " + SQ(fromIndex) +
(toIndex ? " AND id <= " + SQ(toIndex) : "")});
SDEBUG("Getting commits #" << fromIndex << "-" << toIndex);
query = "SELECT hash, query FROM (" + query + ") ORDER BY id";
return !SQuery(_db, "getting commits", query, result);
if (timeoutLimitUS) {
setTimeout(timeoutLimitUS);
}
int queryResult = SQuery(_db, "getting commits", query, result);
clearTimeout();
return queryResult;
}

int64_t SQLite::getLastInsertRowID() {
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class SQLite {
static bool getCommit(sqlite3* db, const vector<string> journalNames, uint64_t index, string& query, string& hash);

// Looks up a range of commits.
bool getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result);
int getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS = 0);

// Set a time limit for this transaction, in US from the current time.
void setTimeout(uint64_t timeLimitUS);
Expand Down
26 changes: 20 additions & 6 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1660,7 +1660,10 @@ void SQLiteNode::_onMESSAGE(SQLitePeer* peer, const SData& message) {
}
PINFO("Received SUBSCRIBE, accepting new follower");
SData response("SUBSCRIPTION_APPROVED");
_queueSynchronize(this, peer, _db, response, true); // Send everything it's missing

// We send every remaining commit that the node doesn't have, but we set a timeout on the query that gathers these to half the
// maximum time limit that will cause this node to be disconnected from the cluster.
_queueSynchronize(this, peer, _db, response, true, RECV_TIMEOUT / 2);
_sendToPeer(peer, response);
SASSERTWARN(!peer->subscribed);
peer->subscribed = true;
Expand Down Expand Up @@ -2083,7 +2086,7 @@ void SQLiteNode::_changeState(SQLiteNodeState newState, uint64_t commitIDToCance
}
}

void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll) {
void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS) {
// We need this to check the state of the node, and we also need `name` to make the logging macros work in a static
// function. However, if you pass a null pointer here, we can't set these, so we'll fail. We also can't log that,
// so we are just going to rely on the signal handling for sigsegv to log that for you. Don't do that.
Expand Down Expand Up @@ -2132,12 +2135,23 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
// Figure out how much to send it
uint64_t fromIndex = peerCommitCount + 1;
uint64_t toIndex = targetCommit;
if (!sendAll)
if (sendAll) {
SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex);
} else {
toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time
if (!db.getCommits(fromIndex, toIndex, result))
STHROW("error getting commits");
if ((uint64_t)result.size() != toIndex - fromIndex + 1)
}
int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutAfterUS);
if (resultCode) {
if (resultCode == SQLITE_INTERRUPT) {
STHROW("synchronization query timeout");
} else {
STHROW("error getting commits");
}
}

if ((uint64_t)result.size() != toIndex - fromIndex + 1) {
STHROW("mismatched commit count");
}

// Wrap everything into one huge message
PINFO("Synchronizing commits from " << peerCommitCount + 1 << "-" << targetCommit);
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class SQLiteNode : public STCPManager {
// Queue a SYNCHRONIZE message based on the current state of the node, thread-safe, but you need to pass the
// *correct* DB for the thread that's making the call (i.e., you can't use the node's internal DB from a worker
// thread with a different DB object) - which is why this is static.
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll);
static void _queueSynchronize(const SQLiteNode* const node, SQLitePeer* peer, SQLite& db, SData& response, bool sendAll, uint64_t timeoutAfterUS = 0);

bool _isNothingBlockingShutdown() const;
bool _majoritySubscribed() const;
Expand Down