Skip to content

Commit

Permalink
Merge pull request #915 from Expensify/master
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
rafecolton authored Oct 19, 2020
2 parents df0bab1 + e0a57fe commit 2aad48c
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 13 deletions.
20 changes: 17 additions & 3 deletions plugins/Jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ void BedrockJobsCommand::process(SQLite& db) {

// Verify there is a job like this and it's running
SQResult result;
if (!db.read("SELECT state, nextRun, lastRun, repeat, parentJobID, json_extract(data, '$.mockRequest') "
if (!db.read("SELECT state, nextRun, lastRun, repeat, parentJobID, json_extract(data, '$.mockRequest'), retryAfter "
"FROM jobs "
"WHERE jobID=" + SQ(jobID) + ";",
result)) {
Expand All @@ -948,6 +948,7 @@ void BedrockJobsCommand::process(SQLite& db) {
string repeat = result[0][3];
int64_t parentJobID = SToInt64(result[0][4]);
mockRequest = result[0][5] == "1";
const string retryAfter = result[0][6];

// Make sure we're finishing a job that's actually running
if (state != "RUNNING" && state != "RUNQUEUED" && !mockRequest) {
Expand Down Expand Up @@ -1038,10 +1039,23 @@ void BedrockJobsCommand::process(SQLite& db) {
}
}

string safeNewNextRun = "";
// If this is set to repeat, get the nextRun value
string safeNewNextRun = "";
if (!repeat.empty()) {
safeNewNextRun = _constructNextRunDATETIME(nextRun, lastRun, repeat);
// For all jobs, the last time at which they were scheduled is the currently stored 'nextRun' time
string lastScheduled = nextRun;

// Except for jobs with 'retryAfter' + 'repeat' based on `SCHEDULED`. With 'retryAfter', in GetJob we updated 'nextRun'
// to a failure check interval, eg 5 minutes. To account for this here when finishing the job, we subtract
// 'retryAfter' from 'nextRun' to get back the originally scheduled time which was 'nextRun' when the job ran.
if (!retryAfter.empty() && SToUpper(repeat).find("SCHEDULED") != string::npos) {
SQResult scheduledJobResult;
if (!db.read("SELECT DATETIME(" + SQ(nextRun) + ", REPLACE(" + SQ(retryAfter) + ", '+', '-'));", scheduledJobResult)) {
STHROW("502 Select failed");
}
lastScheduled = scheduledJobResult[0][0];
}
safeNewNextRun = _constructNextRunDATETIME(lastScheduled, lastRun, repeat);
} else if (SIEquals(requestVerb, "RetryJob")) {
const string& newNextRun = request["nextRun"];

Expand Down
15 changes: 15 additions & 0 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,11 @@ bool SQLite::prepare() {
}

int SQLite::commit() {
// If commits have been disabled, return an error without attempting the commit.
if (!_sharedData._commitEnabled) {
return COMMIT_DISABLED;
}

SASSERT(_insideTransaction);
SASSERT(!_uncommittedHash.empty()); // Must prepare first
int result = 0;
Expand Down Expand Up @@ -1082,17 +1087,27 @@ void SQLite::removeCheckpointListener(SQLite::CheckpointRequiredListener& listen
_sharedData.removeCheckpointListener(listener);
}

void SQLite::setCommitEnabled(bool enable) {
_sharedData.setCommitEnabled(enable);
}

SQLite::SharedData::SharedData() :
nextJournalCount(0),
currentTransactionCount(0),
_currentPageCount(0),
_checkpointThreadBusy(0),
_commitEnabled(true),
_commitLockTimer("commit lock timer", {
{"EXCLUSIVE", chrono::steady_clock::duration::zero()},
{"SHARED", chrono::steady_clock::duration::zero()},
})
{ }

void SQLite::SharedData::setCommitEnabled(bool enable) {
lock_guard<decltype(commitLock)> lock(commitLock);
_commitEnabled = enable;
}

void SQLite::SharedData::addCheckpointListener(SQLite::CheckpointRequiredListener& listener) {
lock_guard<decltype(_internalStateMutex)> lock(_internalStateMutex);
_checkpointListeners.insert(&listener);
Expand Down
17 changes: 17 additions & 0 deletions sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ class SQLite {
const char* what() const noexcept { return "checkpoint_required"; }
};

// Constant to use like a sqlite result code when commits are disabled (see: https://www.sqlite.org/rescode.html)
// Because the existing codes all use values in the first and second bytes of the int (they're or'ed with values
// left shifted by 8 bits, see SQLITE_ERROR_MISSING_COLLSEQ in sqlite.h for an example), we left shift by 16 for
// this to avoid any overlap.
static const int COMMIT_DISABLED = (1 << 16) | 1;

// Abstract base class for objects that need to be notified when we set `checkpointRequired` and then when that
// checkpoint is complete. Why do we need to notify anyone that we're going to do a checkpoint? Because restart
// checkpoints can't run simultaneously with any other transactions, and thus will block new transactions and wait
Expand Down Expand Up @@ -228,6 +234,10 @@ class SQLite {
// This is the callback function we use to log SQLite's internal errors.
static void _sqliteLogCallback(void* pArg, int iErrCode, const char* zMsg);

// If commits are disabled, calling commit() will return an error without committing. This can be used to guarantee
// no commits can happen "late" from slow threads that could otherwise write to a DB being shutdown.
void setCommitEnabled(bool enable);

private:
// This structure contains all of the data that's shared between a set of SQLite objects that share the same
// underlying database file.
Expand All @@ -242,6 +252,9 @@ class SQLite {
void checkpointRequired(SQLite& db);
void checkpointComplete(SQLite& db);

// Enable or disable commits for the DB.
void setCommitEnabled(bool enable);

// Update the shared state of the DB to include the newest commit with the newest hash. This needs to be done
// after completing a commit and before releasing the commit lock.
void incrementCommit(const string& commitHash);
Expand Down Expand Up @@ -291,7 +304,11 @@ class SQLite {
// Used as a flag to prevent starting multiple checkpoint threads simultaneously.
atomic<int> _checkpointThreadBusy;

// If set to false, this prevents any thread from being able to commit to the DB.
atomic<bool> _commitEnabled;

SPerformanceTimer _commitLockTimer;

private:
// The data required to replicate transactions, in two lists, depending on whether this has only been prepared
// or if it's been committed.
Expand Down
4 changes: 4 additions & 0 deletions sqlitecluster/SQLiteCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ bool SQLiteCore::commit() {
SINFO("Commit conflict, rolling back.");
_db.rollback();
return false;
} else if (errorCode == SQLite::COMMIT_DISABLED) {
SINFO("Commits currently disabled, rolling back.");
_db.rollback();
return false;
}

return true;
Expand Down
9 changes: 9 additions & 0 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2060,6 +2060,10 @@ void SQLiteNode::_changeState(SQLiteNode::State newState) {
_db.rollback();
}

// Turn off commits. This prevents late commits coming in right after we call `_sendOutstandingTransactions`
// below, which otherwise could get committed on leader and not replicated to followers.
_db.setCommitEnabled(false);

// We send any unsent transactions here before we finish switching states, we need to make sure these are
// all sent to the new leader before we complete the transition.
_sendOutstandingTransactions();
Expand All @@ -2072,6 +2076,11 @@ void SQLiteNode::_changeState(SQLiteNode::State newState) {
_leadPeer = nullptr;
}

// Re-enable commits if they were disabled during a previous stand-down.
if (newState != SEARCHING) {
_db.setCommitEnabled(true);
}

// Additional logic for some new states
if (newState == LEADING) {
// Seed our last sent transaction.
Expand Down
4 changes: 2 additions & 2 deletions test/tests/jobs/CreateJobTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ struct CreateJobTest : tpunit::TestFixture {
ASSERT_EQUAL(response["jobID"], jobID);
ASSERT_EQUAL(response["name"], jobName);

// Query the db and confirm that state, nextRun and lastRun are 1 second apart because of retryAfter
// Query the db and confirm the state, and that nextRun and lastRun are 5 seconds apart because of retryAfter
SQResult jobData;
tester->readDB("SELECT state, nextRun, lastRun FROM jobs WHERE jobID = " + jobID + ";", jobData);
ASSERT_EQUAL(jobData[0][0], "RUNQUEUED");
Expand Down Expand Up @@ -360,7 +360,7 @@ struct CreateJobTest : tpunit::TestFixture {
nextRunTime = JobTestHelper::getTimestampForDateTimeString(jobData[0][1]);
lastRunTime = JobTestHelper::getTimestampForDateTimeString(jobData[0][2]);
ASSERT_EQUAL(jobData[0][0], "QUEUED");
ASSERT_EQUAL(difftime(nextRunTime, lastRunTime), 15);
ASSERT_EQUAL(difftime(nextRunTime, lastRunTime), 10);
}

void retryWithMalformedValue() {
Expand Down
10 changes: 2 additions & 8 deletions test/tests/jobs/GetJobsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,8 @@ struct GetJobsTest : tpunit::TestFixture {

// Let's see if it's scheduled at the right time.
if (stoull(row[0]) == jobIDs[0]) {
// This uses a `SCHEDULED` modified, but it also uses `retryAfter`, which is just a broken combination.
// What happens with `scheduled`, is that when we finish a job we take `nextRun` and add our time
// interval to it. This assumes that `nextRun` is whatever it was set to last time we got the job. But
// with `retryAfter`, we updated that to some failure check interval, like 5 minutes, rather than
// running this from when it was last scheduled, it runs it from when it was last scheduled to be
// retried.
//
// We assert nothing here because this case is broken.
// Assert that the difference between "lastRun + 1hour" and "nextRun" is less than 3 seconds.
ASSERT_LESS_THAN(absoluteDiff(stringToUnixTimestamp(row[2]) + 1 * 60 * 60, stringToUnixTimestamp(row[3])), 3);
} else if (stoull(row[0]) == jobIDs[1]) {
// Assert that the difference between "lastRun + 1day" and "nextRun" is less than 3 seconds.
ASSERT_LESS_THAN(absoluteDiff(stringToUnixTimestamp(row[2]) + 1 * 60 * 60 * 24, stringToUnixTimestamp(row[3])), 3);
Expand Down

0 comments on commit 2aad48c

Please sign in to comment.