diff --git a/BedrockCommand.cpp b/BedrockCommand.cpp
index bfc12696a..3d8c1be37 100644
--- a/BedrockCommand.cpp
+++ b/BedrockCommand.cpp
@@ -6,13 +6,14 @@ atomic BedrockCommand::_commandCount(0);
 
 const string BedrockCommand::defaultPluginName("NO_PLUGIN");
 
-BedrockCommand::BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin) :
+BedrockCommand::BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin, bool escalateImmediately_) :
     SQLiteCommand(move(baseCommand)),
     priority(PRIORITY_NORMAL),
     peekCount(0),
     processCount(0),
     repeek(false),
     crashIdentifyingValues(*this),
+    escalateImmediately(escalateImmediately_),
     _plugin(plugin),
     _inProgressTiming(INVALID, 0, 0),
     _timeout(_getTimeout(request))
diff --git a/BedrockCommand.h b/BedrockCommand.h
index 94cfab426..d1fe0074c 100644
--- a/BedrockCommand.h
+++ b/BedrockCommand.h
@@ -34,7 +34,7 @@ class BedrockCommand : public SQLiteCommand {
     static const uint64_t DEFAULT_PROCESS_TIMEOUT = 30'000; // 30 seconds.
 
     // Constructor to initialize via a request object (by move).
-    BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin);
+    BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin, bool escalateImmediately_ = false);
 
     // Destructor.
     virtual ~BedrockCommand();
@@ -146,6 +146,10 @@ class BedrockCommand : public SQLiteCommand {
     // Return the number of commands in existence.
     static size_t getCommandCount() { return _commandCount.load(); }
 
+    // True if this command should be escalated immediately. This can be true for any command that does all of its work
+    // in `process` instead of peek, as it will always be escalated to leader
+    const bool escalateImmediately;
+
   protected:
     // The plugin that owns this command.
     BedrockPlugin* _plugin;
diff --git a/BedrockServer.cpp b/BedrockServer.cpp
index 5c1862dfb..027cfba23 100644
--- a/BedrockServer.cpp
+++ b/BedrockServer.cpp
@@ -873,27 +873,6 @@ void BedrockServer::worker(SQLitePool& dbPool,
                 usleep(10000);
             }
 
-            // If this command is dependent on a commitCount newer than what we have (maybe it's a follow-up to a
-            // command that was escalated to leader), we'll set it aside for later processing. When the sync node
-            // finishes its update loop, it will re-queue any of these commands that are no longer blocked on our
-            // updated commit count.
-            uint64_t commitCount = db.getCommitCount();
-            uint64_t commandCommitCount = command->request.calcU64("commitCount");
-            if (commandCommitCount > commitCount) {
-                SAUTOLOCK(server._futureCommitCommandMutex);
-                auto newQueueSize = server._futureCommitCommands.size() + 1;
-                SINFO("Command (" << command->request.methodLine << ") depends on future commit (" << commandCommitCount
-                      << "), Currently at: " << commitCount << ", storing for later. Queue size: " << newQueueSize);
-                server._futureCommitCommandTimeouts.insert(make_pair(command->timeout(), commandCommitCount));
-                server._futureCommitCommands.insert(make_pair(commandCommitCount, move(command)));
-
-                // Don't count this as `in progress`, it's just sitting there.
-                if (newQueueSize > 100) {
-                    SHMMM("server._futureCommitCommands.size() == " << newQueueSize);
-                }
-                continue;
-            }
-
             // OK, so this is the state right now, which isn't necessarily anything in particular, because the sync
             // node can change it at any time, and we're not synchronizing on it. We're going to go ahead and assume
             // it's something reasonable, because in most cases, that's pretty safe. If we think we're anything but
@@ -902,6 +881,15 @@ void BedrockServer::worker(SQLitePool& dbPool,
             // our state right before we commit.
             SQLiteNode::State state = replicationState.load();
 
+            // If we're following, any incomplete commands can be immediately escalated to leader. This saves the work
+            // of a `peek` operation, but more importantly, it skips any delays that might be introduced by waiting in
+            // the `_futureCommitCommands` queue.
+            if (state == SQLiteNode::FOLLOWING && command->escalateImmediately && !command->complete) {
+                SINFO("Immediately escalating " << command->request.methodLine << " to leader. Sync thread has " << syncNodeQueuedCommands.size() << " queued commands.");
+                syncNodeQueuedCommands.push(move(command));
+                continue;
+            }
+
             // If we find that we've gotten a command with an initiatingPeerID, but we're not in a leading or
             // standing down state, we'll have no way of returning this command to the caller, so we discard it. The
             // original caller will need to re-send the request. This can happen if we're leading, and receive a
@@ -917,6 +905,27 @@ void BedrockServer::worker(SQLitePool& dbPool,
                 continue;
             }
 
+            // If this command is dependent on a commitCount newer than what we have (maybe it's a follow-up to a
+            // command that was escalated to leader), we'll set it aside for later processing. When the sync node
+            // finishes its update loop, it will re-queue any of these commands that are no longer blocked on our
+            // updated commit count.
+            uint64_t commitCount = db.getCommitCount();
+            uint64_t commandCommitCount = command->request.calcU64("commitCount");
+            if (commandCommitCount > commitCount) {
+                SAUTOLOCK(server._futureCommitCommandMutex);
+                auto newQueueSize = server._futureCommitCommands.size() + 1;
+                SINFO("Command (" << command->request.methodLine << ") depends on future commit (" << commandCommitCount
+                      << "), Currently at: " << commitCount << ", storing for later. Queue size: " << newQueueSize);
+                server._futureCommitCommandTimeouts.insert(make_pair(command->timeout(), commandCommitCount));
+                server._futureCommitCommands.insert(make_pair(commandCommitCount, move(command)));
+
+                // Don't count this as `in progress`, it's just sitting there.
+                if (newQueueSize > 100) {
+                    SHMMM("server._futureCommitCommands.size() == " << newQueueSize);
+                }
+                continue;
+            }
+
             // If this command is already complete, then we should be a follower, and the sync node got a response back
             // from a command that had been escalated to leader, and queued it for a worker to respond to. We'll send
             // that response now.
diff --git a/Makefile b/Makefile
index 703fd7888..69678ff98 100644
--- a/Makefile
+++ b/Makefile
@@ -80,6 +80,7 @@ TESTOBJ = $(TESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.o)
 TESTDEP = $(TESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.d)
 
 CLUSTERTESTCPP = $(shell find test -name '*.cpp' -not -path 'test/tests*' -not -path "test/main.cpp")
+CLUSTERTESTCPP += test/tests/jobs/JobTestHelper.cpp
 CLUSTERTESTOBJ = $(CLUSTERTESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.o)
 CLUSTERTESTDEP = $(CLUSTERTESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.d)
diff --git a/plugins/Jobs.cpp b/plugins/Jobs.cpp
index f9cbfcc79..94fb038a9 100644
--- a/plugins/Jobs.cpp
+++ b/plugins/Jobs.cpp
@@ -25,6 +25,13 @@ const set BedrockPlugin_Jobs::supportedRequestVerbs = {
     "RequeueJobs",
 };
 
+bool BedrockJobsCommand::canEscalateImmediately(SQLiteCommand& baseCommand) {
+    // This is a set of commands that we will escalate to leader without waiting. It's not intended to be complete but
+    // to solve the biggest issues we have with slow escalation times (i.e., this is usually a problem for `FinishJob`).
+    static const set commands = {"CreateJob", "CreateJobs", "FinishJob"};
+    return commands.count(baseCommand.request.methodLine);
+}
+
 // Disable noop mode for the lifetime of this object.
 class scopedDisableNoopMode {
   public:
@@ -45,7 +52,7 @@ class scopedDisableNoopMode {
 };
 
 BedrockJobsCommand::BedrockJobsCommand(SQLiteCommand&& baseCommand, BedrockPlugin_Jobs* plugin) :
-    BedrockCommand(move(baseCommand), plugin)
+    BedrockCommand(move(baseCommand), plugin, canEscalateImmediately(baseCommand))
 {
 }
diff --git a/plugins/Jobs.h b/plugins/Jobs.h
index 38af1e386..f1e809a2d 100644
--- a/plugins/Jobs.h
+++ b/plugins/Jobs.h
@@ -39,4 +39,7 @@ class BedrockJobsCommand : public BedrockCommand {
     void _validatePriority(const int64_t priority);
 
     bool mockRequest;
+
+    // Returns true if this command can skip straight to leader for process.
+    bool canEscalateImmediately(SQLiteCommand& baseCommand);
 };
diff --git a/test/tests/jobs/FinishJobTest.cpp b/test/clustertest/tests/FinishJobTest.cpp
similarity index 85%
rename from test/tests/jobs/FinishJobTest.cpp
rename to test/clustertest/tests/FinishJobTest.cpp
index 185b4f0a9..49fcc39b1 100644
--- a/test/tests/jobs/FinishJobTest.cpp
+++ b/test/clustertest/tests/FinishJobTest.cpp
@@ -1,4 +1,4 @@
-#include
+#include "../BedrockClusterTester.h"
 #include
 
 struct FinishJobTest : tpunit::TestFixture {
@@ -23,18 +23,24 @@
               AFTER(FinishJobTest::tearDown),
               AFTER_CLASS(FinishJobTest::tearDownClass)) { }
 
+    BedrockClusterTester* clusterTester;
     BedrockTester* tester;
 
-    void setupClass() { tester = new BedrockTester(_threadID, {{"-plugins", "Jobs,DB"}}, {});}
+    void setupClass() {
+        clusterTester = new BedrockClusterTester(ClusterSize::THREE_NODE_CLUSTER, {}, _threadID);
+        tester = &(clusterTester->getTester(1));
+    }
 
     // Reset the jobs table
    void tearDown() {
        SData command("Query");
        command["query"] = "DELETE FROM jobs WHERE jobID > 0;";
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
    }
 
-    void tearDownClass() { delete tester; }
+    void tearDownClass() {
+        delete clusterTester;
+    }
 
    // Throw an error if the job doesn't exist
    void nonExistentJob() {
@@ -87,7 +93,7 @@ struct FinishJobTest : tpunit::TestFixture {
        command.clear();
        command.methodLine = "Query";
        command["query"] = "UPDATE jobs SET state = 'RUNNING' WHERE jobID = " + childID + ";";
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        // Finish the child
        command.clear();
@@ -101,27 +107,27 @@ struct FinishJobTest : tpunit::TestFixture {
        // Create the parent
        SData command("CreateJob");
        command["name"] = "parent";
-        STable response = tester->executeWaitVerifyContentTable(command);
+        STable response = clusterTester->getTester(0).executeWaitVerifyContentTable(command);
        string parentID = response["jobID"];
 
        // Get the parent
        command.clear();
        command.methodLine = "GetJob";
        command["name"] = "parent";
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        // Create the children
        command.clear();
        command.methodLine = "CreateJob";
        command["name"] = "child_finished";
        command["parentJobID"] = parentID;
-        response = tester->executeWaitVerifyContentTable(command);
+        response = clusterTester->getTester(0).executeWaitVerifyContentTable(command);
        string finishedChildID = response["jobID"];
        command.clear();
        command.methodLine = "CreateJob";
        command["name"] = "child_cancelled";
        command["parentJobID"] = parentID;
-        response = tester->executeWaitVerifyContentTable(command);
+        response = clusterTester->getTester(0).executeWaitVerifyContentTable(command);
        string cancelledChildID = response["jobID"];
 
        command.clear();
@@ -129,50 +135,50 @@ struct FinishJobTest : tpunit::TestFixture {
        command.clear();
        command.methodLine = "FinishJob";
        command["jobID"] = parentID;
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        // Get the child job
        command.clear();
        command.methodLine = "GetJob";
        command["name"] = "child_finished";
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        // Cancel a child
        // if this goes 2nd this doesn't requeue the parent job
        command.clear();
        command.methodLine = "CancelJob";
        command["jobID"] = cancelledChildID;
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        // The parent may have other children from mock requests, delete them.
        command.clear();
        command.methodLine = "Query";
        command["Query"] = "DELETE FROM jobs WHERE parentJobID = " + parentID + " AND JSON_EXTRACT(data, '$.mockRequest') IS NOT NULL;";
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        // Finish a child
        command.clear();
        command.methodLine = "FinishJob";
        command["jobID"] = finishedChildID;
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        // Confirm the parent is set to QUEUED
        SQResult result;
-        tester->readDB("SELECT state FROM jobs WHERE jobID = " + parentID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT state FROM jobs WHERE jobID = " + parentID + ";", result);
        ASSERT_EQUAL(result[0][0], "QUEUED");
 
        // Finish the parent
        command.clear();
        command.methodLine = "GetJob";
        command["name"] = "parent";
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        command.clear();
        command.methodLine = "FinishJob";
        command["jobID"] = parentID;
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        // Confirm that the FINISHED and CANCELLED children are deleted
-        tester->readDB("SELECT count(*) FROM jobs WHERE jobID != " + parentID + " AND JSON_EXTRACT(data, '$.mockRequest') IS NULL;", result);
+        clusterTester->getTester(0).readDB("SELECT count(*) FROM jobs WHERE jobID != " + parentID + " AND JSON_EXTRACT(data, '$.mockRequest') IS NULL;", result);
        ASSERT_EQUAL(SToInt(result[0][0]), 0);
    }
@@ -203,7 +209,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm the data updated
        SQResult result;
-        tester->readDB("SELECT data FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT data FROM jobs WHERE jobID = " + jobID + ";", result);
        ASSERT_EQUAL(result[0][0], SComposeJSONObject(data));
    }
 
@@ -239,7 +245,7 @@ struct FinishJobTest : tpunit::TestFixture {
        command.clear();
        command.methodLine = "Query";
        command["Query"] = "DELETE FROM jobs WHERE parentJobID = " + parentID + " AND JSON_EXTRACT(data, '$.mockRequest') IS NOT NULL;";
-        tester->executeWaitVerifyContent(command);
+        clusterTester->getTester(0).executeWaitVerifyContent(command);
 
        // Finish the parent
        command.clear();
@@ -250,7 +256,7 @@ struct FinishJobTest : tpunit::TestFixture {
        // Confirm that the parent is in the PAUSED state and the children are in the QUEUED state
        SQResult result;
        list ids = {parentID, finishedChildID, cancelledChildID};
-        tester->readDB("SELECT jobID, state FROM jobs WHERE jobID IN(" + SComposeList(ids) + ");", result);
+        clusterTester->getTester(0).readDB("SELECT jobID, state FROM jobs WHERE jobID IN(" + SComposeList(ids) + ");", result);
        ASSERT_EQUAL(result.rows.size(), 3);
        for (auto& row : result.rows) {
            if (row[0] == parentID) {
@@ -286,7 +292,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm the job was deleted
        SQResult result;
-        tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
        ASSERT_TRUE(result.empty());
    }
 
@@ -322,7 +328,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Get the nextRun value
        SQResult result;
-        tester->readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
        string originalNextRun = result[0][0];
 
        // Get the job
@@ -339,7 +345,7 @@ struct FinishJobTest : tpunit::TestFixture {
        tester->executeWaitVerifyContent(command);
 
        // Assert the new nextRun time is 5 seconds after the original nextRun time
-        tester->readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
        time_t currentNextRunTime = JobTestHelper::getTimestampForDateTimeString(result[0][0]);
        time_t originalNextRunTime = JobTestHelper::getTimestampForDateTimeString(originalNextRun);
        ASSERT_EQUAL(difftime(currentNextRunTime, originalNextRunTime), 5);
@@ -368,7 +374,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm nextRun is in 1 hour from the created time
        SQResult result;
-        tester->readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
        time_t createdTime = JobTestHelper::getTimestampForDateTimeString(result[0][0]);
        time_t nextRunTime = JobTestHelper::getTimestampForDateTimeString(result[0][1]);
        ASSERT_EQUAL(difftime(nextRunTime, createdTime), 3600);
@@ -391,7 +397,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm the job is in RUNQUEUED
        SQResult result;
-        tester->readDB("SELECT state FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT state FROM jobs WHERE jobID = " + jobID + ";", result);
        ASSERT_EQUAL(result[0][0], "RUNQUEUED");
 
        // Finish it
@@ -401,7 +407,7 @@ struct FinishJobTest : tpunit::TestFixture {
        tester->executeWaitVerifyContent(command);
 
        // Finishing the job should remove it from the table
-        tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
        ASSERT_TRUE(result.empty());
    }
 
@@ -429,7 +435,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm nextRun is in 1 hour, not in the 5 second delay
        SQResult result;
-        tester->readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
        struct tm tm1;
        struct tm tm2;
        strptime(result[0][0].c_str(), "%Y-%m-%d %H:%M:%S", &tm1);
@@ -462,7 +468,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm the job was deleted instead of being rescheduled
        SQResult result;
-        tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
        ASSERT_TRUE(result.empty());
    }
 
@@ -490,7 +496,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm nextRun is in 1 hour, not in the given nextRun time
        SQResult result;
-        tester->readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result);
        struct tm tm1;
        struct tm tm2;
        strptime(result[0][0].c_str(), "%Y-%m-%d %H:%M:%S", &tm1);
@@ -526,7 +532,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm the job was deleted instead of being rescheduled
        SQResult result;
-        tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
        ASSERT_TRUE(result.empty());
    }
 
@@ -554,7 +560,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm the job was deleted instead of being rescheduled
        SQResult result;
-        tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
        ASSERT_TRUE(result.empty());
    }
 
@@ -579,7 +585,7 @@ struct FinishJobTest : tpunit::TestFixture {
 
        // Confirm the job was deleted
        SQResult result;
-        tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
+        clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result);
        ASSERT_TRUE(result.empty());
    }
} __FinishJobTest;
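Below is a minimal usage sketch of the new constructor parameter introduced above. It is not part of the patch: the command class name is hypothetical, and the include path is assumed; it only illustrates how a plugin command that does all of its work in `process` could opt into immediate escalation.

#include "BedrockCommand.h" // assumed include path; adjust to where the plugin lives

// Hypothetical example command, not from this patch.
class BedrockExampleCommand : public BedrockCommand {
  public:
    BedrockExampleCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin) :
        // Passing `true` sets the const `escalateImmediately` member, so a follower's worker thread
        // hands this command straight to the sync thread instead of peeking it locally or parking it
        // in `_futureCommitCommands`.
        BedrockCommand(move(baseCommand), plugin, true)
    { }
};

This mirrors what BedrockJobsCommand does in the patch, except the Jobs plugin decides per request via canEscalateImmediately() rather than passing `true` unconditionally.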