Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time out SUBSCRIBE query instead of forking. #1783

Merged
merged 7 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -874,14 +874,19 @@ string SQLite::getCommittedHash() {
return _sharedData.lastCommittedHash.load();
}

bool SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result) {
int SQLite::getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS) {
// Look up all the queries within that range
SASSERTWARN(SWITHIN(1, fromIndex, toIndex));
string query = _getJournalQuery({"SELECT id, hash, query FROM", "WHERE id >= " + SQ(fromIndex) +
(toIndex ? " AND id <= " + SQ(toIndex) : "")});
SDEBUG("Getting commits #" << fromIndex << "-" << toIndex);
query = "SELECT hash, query FROM (" + query + ") ORDER BY id";
return !SQuery(_db, "getting commits", query, result);
if (timeoutLimitUS) {
_timeoutLimit = STimeNow() + timeoutLimitUS;
}
int queryResult = SQuery(_db, "getting commits", query, result);
_timeoutLimit = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be calling clearTimeout here?

return queryResult;
}

int64_t SQLite::getLastInsertRowID() {
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class SQLite {
static bool getCommit(sqlite3* db, const vector<string> journalNames, uint64_t index, string& query, string& hash);

// Looks up a range of commits.
bool getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result);
int getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result, uint64_t timeoutLimitUS = 0);

// Set a time limit for this transaction, in US from the current time.
void setTimeout(uint64_t timeLimitUS);
Expand Down
25 changes: 21 additions & 4 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2132,12 +2132,29 @@ void SQLiteNode::_queueSynchronize(const SQLiteNode* const node, SQLitePeer* pee
// Figure out how much to send it
uint64_t fromIndex = peerCommitCount + 1;
uint64_t toIndex = targetCommit;
if (!sendAll)
uint64_t timeoutLimitUS = 0;
if (sendAll) {
SINFO("Sending all commits with synchronize message, from " << fromIndex << " to " << toIndex);

// We set this for all commits because this only gets all commits in response to SUBSCRIBE, which is done synchronously, and blocks the commit thread.
// For asynchronous queries, there's nothing being blocked, so it doesn't much matter how long these take.
// This is really not the correct encapsulation for this, but we can improve that later.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pet peeve, but this is a TODO in a comment. Can we make a GH issue for it instead?

Suggested change
// This is really not the correct encapsulation for this, but we can improve that later.

timeoutLimitUS = 10'000'000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the cluster gives up on the leader node after 30s, should we make this higher and closer to 30s, say 25?

If there's a reason for 10 and it's not an arbitrary number, let's add a comment

} else {
toIndex = min(toIndex, fromIndex + 100); // 100 transactions at a time
if (!db.getCommits(fromIndex, toIndex, result))
STHROW("error getting commits");
if ((uint64_t)result.size() != toIndex - fromIndex + 1)
}
int resultCode = db.getCommits(fromIndex, toIndex, result, timeoutLimitUS);
if (resultCode) {
if (resultCode == SQLITE_INTERRUPT) {
STHROW("synchronization query timeout");
} else {
STHROW("error getting commits");
}
}

if ((uint64_t)result.size() != toIndex - fromIndex + 1) {
STHROW("mismatched commit count");
}

// Wrap everything into one huge message
PINFO("Synchronizing commits from " << peerCommitCount + 1 << "-" << targetCommit);
Expand Down