From 5a49e7bd0107b4298c5802a3d34df91fcd01efd4 Mon Sep 17 00:00:00 2001
From: Tyler Karaszewski
Date: Mon, 17 Aug 2020 16:49:00 -0700
Subject: [PATCH 1/5] Allow some commands to skip the future commit queue

---
 BedrockCommand.cpp                          |  3 +-
 BedrockCommand.h                            |  6 +-
 BedrockServer.cpp                           | 51 +++++++-----
 Makefile                                    |  1 +
 plugins/Jobs.cpp                            | 77 ++++++++++++++++++-
 plugins/Jobs.h                              |  2 +
 test/clustertest/tests/ConflictSpamTest.cpp |  4 +-
 .../tests}/FinishJobTest.cpp                | 44 ++++++-----
 test/lib/BedrockTester.cpp                  |  1 +
 9 files changed, 144 insertions(+), 45 deletions(-)
 rename test/{tests/jobs => clustertest/tests}/FinishJobTest.cpp (90%)

diff --git a/BedrockCommand.cpp b/BedrockCommand.cpp
index bfc12696a..3d8c1be37 100644
--- a/BedrockCommand.cpp
+++ b/BedrockCommand.cpp
@@ -6,13 +6,14 @@ atomic BedrockCommand::_commandCount(0);
 
 const string BedrockCommand::defaultPluginName("NO_PLUGIN");
 
-BedrockCommand::BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin) :
+BedrockCommand::BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin, bool escalateImmediately_) :
     SQLiteCommand(move(baseCommand)),
     priority(PRIORITY_NORMAL),
     peekCount(0),
     processCount(0),
     repeek(false),
     crashIdentifyingValues(*this),
+    escalateImmediately(escalateImmediately_),
     _plugin(plugin),
     _inProgressTiming(INVALID, 0, 0),
     _timeout(_getTimeout(request))
diff --git a/BedrockCommand.h b/BedrockCommand.h
index 94cfab426..d1fe0074c 100644
--- a/BedrockCommand.h
+++ b/BedrockCommand.h
@@ -34,7 +34,7 @@ class BedrockCommand : public SQLiteCommand {
     static const uint64_t DEFAULT_PROCESS_TIMEOUT = 30'000; // 30 seconds.
 
     // Constructor to initialize via a request object (by move).
-    BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin);
+    BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugin, bool escalateImmediately_ = false);
 
     // Destructor.
     virtual ~BedrockCommand();
@@ -146,6 +146,10 @@ class BedrockCommand : public SQLiteCommand {
     // Return the number of commands in existence.
     static size_t getCommandCount() { return _commandCount.load(); }
 
+    // True if this command should be escalated immediately. This can be true for any command that does all of its work
+    // in `process` instead of `peek`, as it will always be escalated to leader.
+    const bool escalateImmediately;
+
 protected:
     // The plugin that owns this command.
    BedrockPlugin* _plugin;
diff --git a/BedrockServer.cpp b/BedrockServer.cpp
index 5c1862dfb..027cfba23 100644
--- a/BedrockServer.cpp
+++ b/BedrockServer.cpp
@@ -873,27 +873,6 @@ void BedrockServer::worker(SQLitePool& dbPool,
             usleep(10000);
         }
 
-        // If this command is dependent on a commitCount newer than what we have (maybe it's a follow-up to a
-        // command that was escalated to leader), we'll set it aside for later processing. When the sync node
-        // finishes its update loop, it will re-queue any of these commands that are no longer blocked on our
-        // updated commit count.
-        uint64_t commitCount = db.getCommitCount();
-        uint64_t commandCommitCount = command->request.calcU64("commitCount");
-        if (commandCommitCount > commitCount) {
-            SAUTOLOCK(server._futureCommitCommandMutex);
-            auto newQueueSize = server._futureCommitCommands.size() + 1;
-            SINFO("Command (" << command->request.methodLine << ") depends on future commit (" << commandCommitCount
-                  << "), Currently at: " << commitCount << ", storing for later. Queue size: " << newQueueSize);
-            server._futureCommitCommandTimeouts.insert(make_pair(command->timeout(), commandCommitCount));
-            server._futureCommitCommands.insert(make_pair(commandCommitCount, move(command)));
-
-            // Don't count this as `in progress`, it's just sitting there.
-            if (newQueueSize > 100) {
-                SHMMM("server._futureCommitCommands.size() == " << newQueueSize);
-            }
-            continue;
-        }
-
         // OK, so this is the state right now, which isn't necessarily anything in particular, because the sync
         // node can change it at any time, and we're not synchronizing on it. We're going to go ahead and assume
         // it's something reasonable, because in most cases, that's pretty safe. If we think we're anything but
@@ -902,6 +881,15 @@ void BedrockServer::worker(SQLitePool& dbPool,
         // our state right before we commit.
         SQLiteNode::State state = replicationState.load();
 
+        // If we're following, any incomplete commands can be immediately escalated to leader. This saves the work
+        // of a `peek` operation, but more importantly, it skips any delays that might be introduced by waiting in
+        // the `_futureCommitCommands` queue.
+        if (state == SQLiteNode::FOLLOWING && command->escalateImmediately && !command->complete) {
+            SINFO("Immediately escalating " << command->request.methodLine << " to leader. Sync thread has " << syncNodeQueuedCommands.size() << " queued commands.");
+            syncNodeQueuedCommands.push(move(command));
+            continue;
+        }
+
         // If we find that we've gotten a command with an initiatingPeerID, but we're not in a leading or
         // standing down state, we'll have no way of returning this command to the caller, so we discard it. The
         // original caller will need to re-send the request. This can happen if we're leading, and receive a
@@ -917,6 +905,27 @@ void BedrockServer::worker(SQLitePool& dbPool,
             continue;
         }
 
+        // If this command is dependent on a commitCount newer than what we have (maybe it's a follow-up to a
+        // command that was escalated to leader), we'll set it aside for later processing. When the sync node
+        // finishes its update loop, it will re-queue any of these commands that are no longer blocked on our
+        // updated commit count.
+        uint64_t commitCount = db.getCommitCount();
+        uint64_t commandCommitCount = command->request.calcU64("commitCount");
+        if (commandCommitCount > commitCount) {
+            SAUTOLOCK(server._futureCommitCommandMutex);
+            auto newQueueSize = server._futureCommitCommands.size() + 1;
+            SINFO("Command (" << command->request.methodLine << ") depends on future commit (" << commandCommitCount
+                  << "), Currently at: " << commitCount << ", storing for later. Queue size: " << newQueueSize);
+            server._futureCommitCommandTimeouts.insert(make_pair(command->timeout(), commandCommitCount));
+            server._futureCommitCommands.insert(make_pair(commandCommitCount, move(command)));
+
+            // Don't count this as `in progress`, it's just sitting there.
+            if (newQueueSize > 100) {
+                SHMMM("server._futureCommitCommands.size() == " << newQueueSize);
+            }
+            continue;
+        }
+
         // If this command is already complete, then we should be a follower, and the sync node got a response back
         // from a command that had been escalated to leader, and queued it for a worker to respond to. We'll send
         // that response now.
diff --git a/Makefile b/Makefile
index 703fd7888..69678ff98 100644
--- a/Makefile
+++ b/Makefile
@@ -80,6 +80,7 @@ TESTOBJ = $(TESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.o)
 TESTDEP = $(TESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.d)
 
 CLUSTERTESTCPP = $(shell find test -name '*.cpp' -not -path 'test/tests*' -not -path "test/main.cpp")
+CLUSTERTESTCPP += test/tests/jobs/JobTestHelper.cpp
 CLUSTERTESTOBJ = $(CLUSTERTESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.o)
 CLUSTERTESTDEP = $(CLUSTERTESTCPP:%.cpp=$(INTERMEDIATEDIR)/%.d)
 
diff --git a/plugins/Jobs.cpp b/plugins/Jobs.cpp
index f9cbfcc79..5db516fb4 100644
--- a/plugins/Jobs.cpp
+++ b/plugins/Jobs.cpp
@@ -25,6 +25,13 @@ const set BedrockPlugin_Jobs::supportedRequestVerbs = {
     "RequeueJobs",
 };
 
+bool BedrockJobsCommand::canEscalateImmediately(SQLiteCommand& baseCommand) {
+    // This is the set of commands that we will escalate to leader without waiting. It's not intended to be complete,
+    // but to address the biggest issues we have with slow escalation times (in practice, usually `FinishJob`).
+    static const set commands = {"CreateJob", "CreateJobs", "FinishJob"};
+    return commands.count(baseCommand.request.methodLine);
+}
+
 // Disable noop mode for the lifetime of this object.
 class scopedDisableNoopMode {
   public:
@@ -45,7 +52,7 @@ class scopedDisableNoopMode {
 };
 
 BedrockJobsCommand::BedrockJobsCommand(SQLiteCommand&& baseCommand, BedrockPlugin_Jobs* plugin) :
-    BedrockCommand(move(baseCommand), plugin)
+    BedrockCommand(move(baseCommand), plugin, canEscalateImmediately(baseCommand))
 {
 }
 
@@ -103,6 +110,74 @@ void BedrockPlugin_Jobs::upgradeDatabase(SQLite& db) {
     SASSERT(db.verifyIndex("jobsName", "jobs", "( name )", false, !BedrockPlugin_Jobs::isLive));
     SASSERT(db.verifyIndex("jobsParentJobIDState", "jobs", "( parentJobID, state ) WHERE parentJobID != 0", false, !BedrockPlugin_Jobs::isLive));
     SASSERT(db.verifyIndex("jobsStatePriorityNextRunName", "jobs", "( state, priority, nextRun, name )", false, !BedrockPlugin_Jobs::isLive));
+
+    // The above is out-of-date.
Currently we have: + // jobsName + // jobsStatePriorityNextRunName + // jobsParentJobIDState + // jobsPriorityNextRunManualSmartScanMerchantAndCategory + // jobsPriorityNextRunManualSmartScanAmountAndCurrency + // jobsPriorityNextRunManualSmartScanCreated + // jobsPriorityNextRunManualSmartScanIsCash + // jobsPriorityNextRunManualSmartScan + // jobsManualSmartscanReceiptID + // jobsPriorityNextRunWWWProd + // + // The most problematic two, from a conflict perspective, seem to be: + // 44527 (read/write page; part of db index jobs.jobsPriorityNextRunWWWProd; + // 17672 (read/write page; part of db index jobs.jobsStatePriorityNextRunName; + // 6070 (read-only page; part of db index jobs.jobsPriorityNextRunWWWProd; + // 4932 (read-only page; part of db index jobs.jobsStatePriorityNextRunName; + // 4117 (read/write page; part of db table jobs; + // 440 (read/write page; part of db index jobs.jobsName; + // 402 (read-only page; part of db index jobs.jobsName; + // 288 (read-only page; part of db index jobs.jobsParentJobIDState; + // 259 (read/write page; part of db index jobs.jobsParentJobIDState; + + // Potential sharded indexes: + //CREATE INDEX jobsPriorityNextRunWWWProd0 ON jobs (priority, nextRun) WHERE jobID%4=0 AND state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'; + //CREATE INDEX jobsPriorityNextRunWWWProd1 ON jobs (priority, nextRun) WHERE jobID%4=1 AND state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'; + //CREATE INDEX jobsPriorityNextRunWWWProd2 ON jobs (priority, nextRun) WHERE jobID%4=2 AND state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'; + //CREATE INDEX jobsPriorityNextRunWWWProd3 ON jobs (priority, nextRun) WHERE jobID%4=3 AND state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'; + // + // Production indexes + // name,sql + // sqlite_autoindex_cache_1, + // sqlite_autoindex_smartScanAgents_1, + // jobsName,"CREATE INDEX jobsName ON jobs ( name )" + // chatsAccountID,"CREATE INDEX chatsAccountID ON chats (accountID)" + // chatsStateID,"CREATE INDEX chatsStateID ON chats (stateID)" + // chatEventsStateID,"CREATE INDEX chatEventsStateID ON chatEvents (stateID)" + // chatInputChatID,"CREATE INDEX chatInputChatID ON chatInputs (chatID)" + // chatInputStateID,"CREATE INDEX chatInputStateID ON chatInputs (stateID)" + // chatInputToStateID,"CREATE INDEX chatInputToStateID ON chatInputs (toStateID)" + // jobsStatePriorityNextRunName,"CREATE INDEX jobsStatePriorityNextRunName ON jobs ( state, priority, nextRun, name )" + // chatsChannel,"CREATE INDEX chatsChannel ON chats (channel)" + // merchantsToMccMerchantOccurrences,"CREATE INDEX merchantsToMccMerchantOccurrences ON merchantsToMcc( merchant, occurrences )" + // merchantsToMccMerchantMcc,"CREATE INDEX merchantsToMccMerchantMcc ON merchantsToMcc( merchant, mcc )" + // chatFragmentInputID,"CREATE INDEX chatFragmentInputID ON chatFragments ( inputID )" + // chatFragmentEventID,"CREATE INDEX chatFragmentEventID ON chatFragments ( eventID )" + // chatFragmentFragment,"CREATE INDEX chatFragmentFragment ON chatFragments ( fragment )" + // chatInputHistoryCreated,"CREATE INDEX chatInputHistoryCreated ON chatInputs(JSON_type(message, '$.history'), created, JSON_EXTRACT(message, '$.history'))" + // chatInputEventIDCreated,"CREATE INDEX chatInputEventIDCreated ON chatInputs (eventID, created)" + // jobsParentJobIDState,"CREATE INDEX jobsParentJobIDState ON jobs ( parentJobID, state ) WHERE parentJobID != 0" + // chatFragmentID,"CREATE INDEX chatFragmentID ON chatFragments2 ( fragmentID )" + // 
chatFragmentString,"CREATE INDEX chatFragmentString ON chatFragments2 ( fragmentString )" + // chatFragmentMapFragmentID,"CREATE INDEX chatFragmentMapFragmentID ON chatFragmentMap (fragmentID)" + // jobsPriorityNextRunManualSmartScanMerchantAndCategory,"CREATE INDEX jobsPriorityNextRunManualSmartScanMerchantAndCategory ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScanMerchantAndCategory*'" + // jobsPriorityNextRunManualSmartScanAmountAndCurrency,"CREATE INDEX jobsPriorityNextRunManualSmartScanAmountAndCurrency ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScanAmountAndCurrency*'" + // jobsPriorityNextRunManualSmartScanCreated,"CREATE INDEX jobsPriorityNextRunManualSmartScanCreated ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScanCreated*'" + // jobsPriorityNextRunManualSmartScanIsCash,"CREATE INDEX jobsPriorityNextRunManualSmartScanIsCash ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScanIsCash*'" + // jobsPriorityNextRunManualSmartScan,"CREATE INDEX jobsPriorityNextRunManualSmartScan ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScan*'" + // jobsManualSmartscanReceiptID,"CREATE INDEX jobsManualSmartscanReceiptID ON jobs ( JSON_EXTRACT(data, '$.receiptID') ) WHERE JSON_VALID(data) AND name GLOB 'manual/SmartScan*'" + // jobsPriorityNextRunWWWProd,"CREATE INDEX jobsPriorityNextRunWWWProd ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'" + // chatInputEventID,"CREATE INDEX chatInputEventID ON chatInputs (eventID)" + // agentActivityTimestamp,"CREATE INDEX agentActivityTimestamp ON agentActivity ( timestamp )" + // chatInputParticipantCreated,"CREATE INDEX chatInputParticipantCreated ON chatInputs (JSON_TYPE(message, '$.participants'), created, JSON_EXTRACT(message, '$.participants'))" + // + // Notes: + // It seems almost certain that it's the "priority" portion of the index that's conflicting, as it's re-used + // everywhere with only a few valid values, so almost everything will hit it. But let me look at nextRun. } // ========================================================================== diff --git a/plugins/Jobs.h b/plugins/Jobs.h index 38af1e386..83221d773 100644 --- a/plugins/Jobs.h +++ b/plugins/Jobs.h @@ -39,4 +39,6 @@ class BedrockJobsCommand : public BedrockCommand { void _validatePriority(const int64_t priority); bool mockRequest; + + bool canEscalateImmediately(SQLiteCommand& baseCommand); }; diff --git a/test/clustertest/tests/ConflictSpamTest.cpp b/test/clustertest/tests/ConflictSpamTest.cpp index da4c1e3ee..51b1965ef 100644 --- a/test/clustertest/tests/ConflictSpamTest.cpp +++ b/test/clustertest/tests/ConflictSpamTest.cpp @@ -48,7 +48,7 @@ struct ConflictSpamTest : tpunit::TestFixture { for (int h = 0; h <= 4; h++) { for (int i : {0, 1, 2}) { BedrockTester& brtester = tester->getTester(i); - SData query("idcollision b"); + SData query("idcollision"); // What if we throw in a few sync commands? 
query["writeConsistency"] = "ASYNC"; int cmdNum = cmdID.fetch_add(1); @@ -98,7 +98,7 @@ struct ConflictSpamTest : tpunit::TestFixture { vector requests; int numCommands = 200; for (int j = 0; j < numCommands; j++) { - SData query("idcollision b2"); + SData query("idcollision"); query["writeConsistency"] = "ASYNC"; int cmdNum = cmdID.fetch_add(1); query["value"] = "sent-" + to_string(cmdNum); diff --git a/test/tests/jobs/FinishJobTest.cpp b/test/clustertest/tests/FinishJobTest.cpp similarity index 90% rename from test/tests/jobs/FinishJobTest.cpp rename to test/clustertest/tests/FinishJobTest.cpp index 185b4f0a9..accd445f3 100644 --- a/test/tests/jobs/FinishJobTest.cpp +++ b/test/clustertest/tests/FinishJobTest.cpp @@ -1,4 +1,4 @@ -#include +#include "../BedrockClusterTester.h" #include struct FinishJobTest : tpunit::TestFixture { @@ -23,9 +23,13 @@ struct FinishJobTest : tpunit::TestFixture { AFTER(FinishJobTest::tearDown), AFTER_CLASS(FinishJobTest::tearDownClass)) { } + BedrockClusterTester* clusterTester; BedrockTester* tester; - void setupClass() { tester = new BedrockTester(_threadID, {{"-plugins", "Jobs,DB"}}, {});} + void setupClass() { + clusterTester = new BedrockClusterTester(ClusterSize::THREE_NODE_CLUSTER, {}, _threadID); + tester = &(clusterTester->getTester(1)); + } // Reset the jobs table void tearDown() { @@ -34,7 +38,9 @@ struct FinishJobTest : tpunit::TestFixture { tester->executeWaitVerifyContent(command); } - void tearDownClass() { delete tester; } + void tearDownClass() { + delete clusterTester; + } // Throw an error if the job doesn't exist void nonExistentJob() { @@ -158,7 +164,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm the parent is set to QUEUED SQResult result; - tester->readDB("SELECT state FROM jobs WHERE jobID = " + parentID + ";", result); + clusterTester->getTester(0).readDB("SELECT state FROM jobs WHERE jobID = " + parentID + ";", result); ASSERT_EQUAL(result[0][0], "QUEUED"); // Finish the parent @@ -172,7 +178,7 @@ struct FinishJobTest : tpunit::TestFixture { tester->executeWaitVerifyContent(command); // Confirm that the FINISHED and CANCELLED children are deleted - tester->readDB("SELECT count(*) FROM jobs WHERE jobID != " + parentID + " AND JSON_EXTRACT(data, '$.mockRequest') IS NULL;", result); + clusterTester->getTester(0).readDB("SELECT count(*) FROM jobs WHERE jobID != " + parentID + " AND JSON_EXTRACT(data, '$.mockRequest') IS NULL;", result); ASSERT_EQUAL(SToInt(result[0][0]), 0); } @@ -203,7 +209,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm the data updated SQResult result; - tester->readDB("SELECT data FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT data FROM jobs WHERE jobID = " + jobID + ";", result); ASSERT_EQUAL(result[0][0], SComposeJSONObject(data)); } @@ -250,7 +256,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm that the parent is in the PAUSED state and the children are in the QUEUED state SQResult result; list ids = {parentID, finishedChildID, cancelledChildID}; - tester->readDB("SELECT jobID, state FROM jobs WHERE jobID IN(" + SComposeList(ids) + ");", result); + clusterTester->getTester(0).readDB("SELECT jobID, state FROM jobs WHERE jobID IN(" + SComposeList(ids) + ");", result); ASSERT_EQUAL(result.rows.size(), 3); for (auto& row : result.rows) { if (row[0] == parentID) { @@ -286,7 +292,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm the job was deleted SQResult result; - tester->readDB("SELECT * FROM jobs WHERE jobID = " + 
jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); ASSERT_TRUE(result.empty()); } @@ -322,7 +328,7 @@ struct FinishJobTest : tpunit::TestFixture { // Get the nextRun value SQResult result; - tester->readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result); string originalNextRun = result[0][0]; // Get the job @@ -339,7 +345,7 @@ struct FinishJobTest : tpunit::TestFixture { tester->executeWaitVerifyContent(command); // Assert the new nextRun time is 5 seconds after the original nextRun time - tester->readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT nextRun FROM jobs WHERE jobID = " + jobID + ";", result); time_t currentNextRunTime = JobTestHelper::getTimestampForDateTimeString(result[0][0]); time_t originalNextRunTime = JobTestHelper::getTimestampForDateTimeString(originalNextRun); ASSERT_EQUAL(difftime(currentNextRunTime, originalNextRunTime), 5); @@ -368,7 +374,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm nextRun is in 1 hour from the created time SQResult result; - tester->readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result); time_t createdTime = JobTestHelper::getTimestampForDateTimeString(result[0][0]); time_t nextRunTime = JobTestHelper::getTimestampForDateTimeString(result[0][1]); ASSERT_EQUAL(difftime(nextRunTime, createdTime), 3600); @@ -391,7 +397,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm the job is in RUNQUEUED SQResult result; - tester->readDB("SELECT state FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT state FROM jobs WHERE jobID = " + jobID + ";", result); ASSERT_EQUAL(result[0][0], "RUNQUEUED"); // Finish it @@ -401,7 +407,7 @@ struct FinishJobTest : tpunit::TestFixture { tester->executeWaitVerifyContent(command); // Finishing the job should remove it from the table - tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); ASSERT_TRUE(result.empty()); } @@ -429,7 +435,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm nextRun is in 1 hour, not in the 5 second delay SQResult result; - tester->readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result); struct tm tm1; struct tm tm2; strptime(result[0][0].c_str(), "%Y-%m-%d %H:%M:%S", &tm1); @@ -462,7 +468,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm the job was deleted instead of being rescheduled SQResult result; - tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); ASSERT_TRUE(result.empty()); } @@ -490,7 +496,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm nextRun is in 1 hour, not in the given nextRun time SQResult result; - tester->readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT lastRun, nextRun FROM jobs WHERE jobID = " + jobID + ";", result); struct tm tm1; struct tm tm2; 
strptime(result[0][0].c_str(), "%Y-%m-%d %H:%M:%S", &tm1); @@ -526,7 +532,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm the job was deleted instead of being rescheduled SQResult result; - tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); ASSERT_TRUE(result.empty()); } @@ -554,7 +560,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm the job was deleted instead of being rescheduled SQResult result; - tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); ASSERT_TRUE(result.empty()); } @@ -579,7 +585,7 @@ struct FinishJobTest : tpunit::TestFixture { // Confirm the job was deleted SQResult result; - tester->readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); + clusterTester->getTester(0).readDB("SELECT * FROM jobs WHERE jobID = " + jobID + ";", result); ASSERT_TRUE(result.empty()); } } __FinishJobTest; diff --git a/test/lib/BedrockTester.cpp b/test/lib/BedrockTester.cpp index 50432a77a..ad5aeefbf 100644 --- a/test/lib/BedrockTester.cpp +++ b/test/lib/BedrockTester.cpp @@ -103,6 +103,7 @@ BedrockTester::BedrockTester(int threadID, const map& args, {"-enableMultiWrite", "true"}, {"-cacheSize", "1000"}, {"-parallelReplication", "true"}, + {"-logConflictingQueriesInCommands", "idcollision"}, }; // Set defaults. From 29a962642a2f9eb06696559088fddfc0a43c69d1 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 17 Aug 2020 16:52:21 -0700 Subject: [PATCH 2/5] remove giant comment for testing --- plugins/Jobs.cpp | 68 ------------------------------------------------ 1 file changed, 68 deletions(-) diff --git a/plugins/Jobs.cpp b/plugins/Jobs.cpp index 5db516fb4..94fb038a9 100644 --- a/plugins/Jobs.cpp +++ b/plugins/Jobs.cpp @@ -110,74 +110,6 @@ void BedrockPlugin_Jobs::upgradeDatabase(SQLite& db) { SASSERT(db.verifyIndex("jobsName", "jobs", "( name )", false, !BedrockPlugin_Jobs::isLive)); SASSERT(db.verifyIndex("jobsParentJobIDState", "jobs", "( parentJobID, state ) WHERE parentJobID != 0", false, !BedrockPlugin_Jobs::isLive)); SASSERT(db.verifyIndex("jobsStatePriorityNextRunName", "jobs", "( state, priority, nextRun, name )", false, !BedrockPlugin_Jobs::isLive)); - - // The above is out-of-date. 
Currently we have: - // jobsName - // jobsStatePriorityNextRunName - // jobsParentJobIDState - // jobsPriorityNextRunManualSmartScanMerchantAndCategory - // jobsPriorityNextRunManualSmartScanAmountAndCurrency - // jobsPriorityNextRunManualSmartScanCreated - // jobsPriorityNextRunManualSmartScanIsCash - // jobsPriorityNextRunManualSmartScan - // jobsManualSmartscanReceiptID - // jobsPriorityNextRunWWWProd - // - // The most problematic two, from a conflict perspective, seem to be: - // 44527 (read/write page; part of db index jobs.jobsPriorityNextRunWWWProd; - // 17672 (read/write page; part of db index jobs.jobsStatePriorityNextRunName; - // 6070 (read-only page; part of db index jobs.jobsPriorityNextRunWWWProd; - // 4932 (read-only page; part of db index jobs.jobsStatePriorityNextRunName; - // 4117 (read/write page; part of db table jobs; - // 440 (read/write page; part of db index jobs.jobsName; - // 402 (read-only page; part of db index jobs.jobsName; - // 288 (read-only page; part of db index jobs.jobsParentJobIDState; - // 259 (read/write page; part of db index jobs.jobsParentJobIDState; - - // Potential sharded indexes: - //CREATE INDEX jobsPriorityNextRunWWWProd0 ON jobs (priority, nextRun) WHERE jobID%4=0 AND state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'; - //CREATE INDEX jobsPriorityNextRunWWWProd1 ON jobs (priority, nextRun) WHERE jobID%4=1 AND state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'; - //CREATE INDEX jobsPriorityNextRunWWWProd2 ON jobs (priority, nextRun) WHERE jobID%4=2 AND state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'; - //CREATE INDEX jobsPriorityNextRunWWWProd3 ON jobs (priority, nextRun) WHERE jobID%4=3 AND state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'; - // - // Production indexes - // name,sql - // sqlite_autoindex_cache_1, - // sqlite_autoindex_smartScanAgents_1, - // jobsName,"CREATE INDEX jobsName ON jobs ( name )" - // chatsAccountID,"CREATE INDEX chatsAccountID ON chats (accountID)" - // chatsStateID,"CREATE INDEX chatsStateID ON chats (stateID)" - // chatEventsStateID,"CREATE INDEX chatEventsStateID ON chatEvents (stateID)" - // chatInputChatID,"CREATE INDEX chatInputChatID ON chatInputs (chatID)" - // chatInputStateID,"CREATE INDEX chatInputStateID ON chatInputs (stateID)" - // chatInputToStateID,"CREATE INDEX chatInputToStateID ON chatInputs (toStateID)" - // jobsStatePriorityNextRunName,"CREATE INDEX jobsStatePriorityNextRunName ON jobs ( state, priority, nextRun, name )" - // chatsChannel,"CREATE INDEX chatsChannel ON chats (channel)" - // merchantsToMccMerchantOccurrences,"CREATE INDEX merchantsToMccMerchantOccurrences ON merchantsToMcc( merchant, occurrences )" - // merchantsToMccMerchantMcc,"CREATE INDEX merchantsToMccMerchantMcc ON merchantsToMcc( merchant, mcc )" - // chatFragmentInputID,"CREATE INDEX chatFragmentInputID ON chatFragments ( inputID )" - // chatFragmentEventID,"CREATE INDEX chatFragmentEventID ON chatFragments ( eventID )" - // chatFragmentFragment,"CREATE INDEX chatFragmentFragment ON chatFragments ( fragment )" - // chatInputHistoryCreated,"CREATE INDEX chatInputHistoryCreated ON chatInputs(JSON_type(message, '$.history'), created, JSON_EXTRACT(message, '$.history'))" - // chatInputEventIDCreated,"CREATE INDEX chatInputEventIDCreated ON chatInputs (eventID, created)" - // jobsParentJobIDState,"CREATE INDEX jobsParentJobIDState ON jobs ( parentJobID, state ) WHERE parentJobID != 0" - // chatFragmentID,"CREATE INDEX chatFragmentID ON chatFragments2 ( fragmentID )" - // 
chatFragmentString,"CREATE INDEX chatFragmentString ON chatFragments2 ( fragmentString )" - // chatFragmentMapFragmentID,"CREATE INDEX chatFragmentMapFragmentID ON chatFragmentMap (fragmentID)" - // jobsPriorityNextRunManualSmartScanMerchantAndCategory,"CREATE INDEX jobsPriorityNextRunManualSmartScanMerchantAndCategory ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScanMerchantAndCategory*'" - // jobsPriorityNextRunManualSmartScanAmountAndCurrency,"CREATE INDEX jobsPriorityNextRunManualSmartScanAmountAndCurrency ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScanAmountAndCurrency*'" - // jobsPriorityNextRunManualSmartScanCreated,"CREATE INDEX jobsPriorityNextRunManualSmartScanCreated ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScanCreated*'" - // jobsPriorityNextRunManualSmartScanIsCash,"CREATE INDEX jobsPriorityNextRunManualSmartScanIsCash ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScanIsCash*'" - // jobsPriorityNextRunManualSmartScan,"CREATE INDEX jobsPriorityNextRunManualSmartScan ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'manual/SmartScan*'" - // jobsManualSmartscanReceiptID,"CREATE INDEX jobsManualSmartscanReceiptID ON jobs ( JSON_EXTRACT(data, '$.receiptID') ) WHERE JSON_VALID(data) AND name GLOB 'manual/SmartScan*'" - // jobsPriorityNextRunWWWProd,"CREATE INDEX jobsPriorityNextRunWWWProd ON jobs (priority, nextRun) WHERE state IN ('QUEUED', 'RUNQUEUED') AND name GLOB 'www-prod/*'" - // chatInputEventID,"CREATE INDEX chatInputEventID ON chatInputs (eventID)" - // agentActivityTimestamp,"CREATE INDEX agentActivityTimestamp ON agentActivity ( timestamp )" - // chatInputParticipantCreated,"CREATE INDEX chatInputParticipantCreated ON chatInputs (JSON_TYPE(message, '$.participants'), created, JSON_EXTRACT(message, '$.participants'))" - // - // Notes: - // It seems almost certain that it's the "priority" portion of the index that's conflicting, as it's re-used - // everywhere with only a few valid values, so almost everything will hit it. But let me look at nextRun. } // ========================================================================== From 5c000243a992fccdaa8e88c5c278315544dad6db Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 17 Aug 2020 16:55:12 -0700 Subject: [PATCH 3/5] Revert other test changes --- plugins/Jobs.h | 1 + test/clustertest/tests/ConflictSpamTest.cpp | 4 ++-- test/lib/BedrockTester.cpp | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/Jobs.h b/plugins/Jobs.h index 83221d773..f1e809a2d 100644 --- a/plugins/Jobs.h +++ b/plugins/Jobs.h @@ -40,5 +40,6 @@ class BedrockJobsCommand : public BedrockCommand { bool mockRequest; + // Returns true if this command can skip straight to leader for process. bool canEscalateImmediately(SQLiteCommand& baseCommand); }; diff --git a/test/clustertest/tests/ConflictSpamTest.cpp b/test/clustertest/tests/ConflictSpamTest.cpp index 51b1965ef..da4c1e3ee 100644 --- a/test/clustertest/tests/ConflictSpamTest.cpp +++ b/test/clustertest/tests/ConflictSpamTest.cpp @@ -48,7 +48,7 @@ struct ConflictSpamTest : tpunit::TestFixture { for (int h = 0; h <= 4; h++) { for (int i : {0, 1, 2}) { BedrockTester& brtester = tester->getTester(i); - SData query("idcollision"); + SData query("idcollision b"); // What if we throw in a few sync commands? 
query["writeConsistency"] = "ASYNC"; int cmdNum = cmdID.fetch_add(1); @@ -98,7 +98,7 @@ struct ConflictSpamTest : tpunit::TestFixture { vector requests; int numCommands = 200; for (int j = 0; j < numCommands; j++) { - SData query("idcollision"); + SData query("idcollision b2"); query["writeConsistency"] = "ASYNC"; int cmdNum = cmdID.fetch_add(1); query["value"] = "sent-" + to_string(cmdNum); diff --git a/test/lib/BedrockTester.cpp b/test/lib/BedrockTester.cpp index ad5aeefbf..50432a77a 100644 --- a/test/lib/BedrockTester.cpp +++ b/test/lib/BedrockTester.cpp @@ -103,7 +103,6 @@ BedrockTester::BedrockTester(int threadID, const map& args, {"-enableMultiWrite", "true"}, {"-cacheSize", "1000"}, {"-parallelReplication", "true"}, - {"-logConflictingQueriesInCommands", "idcollision"}, }; // Set defaults. From 10b3049686ccd79e788c1f51b39a4b4d57212bf7 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 17 Aug 2020 17:28:14 -0700 Subject: [PATCH 4/5] Attempt to make Travis tests more reliable --- test/clustertest/tests/FinishJobTest.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/clustertest/tests/FinishJobTest.cpp b/test/clustertest/tests/FinishJobTest.cpp index accd445f3..173bc1a63 100644 --- a/test/clustertest/tests/FinishJobTest.cpp +++ b/test/clustertest/tests/FinishJobTest.cpp @@ -35,7 +35,7 @@ struct FinishJobTest : tpunit::TestFixture { void tearDown() { SData command("Query"); command["query"] = "DELETE FROM jobs WHERE jobID > 0;"; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); } void tearDownClass() { @@ -93,7 +93,7 @@ struct FinishJobTest : tpunit::TestFixture { command.clear(); command.methodLine = "Query"; command["query"] = "UPDATE jobs SET state = 'RUNNING' WHERE jobID = " + childID + ";"; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); // Finish the child command.clear(); @@ -154,7 +154,7 @@ struct FinishJobTest : tpunit::TestFixture { command.clear(); command.methodLine = "Query"; command["Query"] = "DELETE FROM jobs WHERE parentJobID = " + parentID + " AND JSON_EXTRACT(data, '$.mockRequest') IS NOT NULL;"; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); // Finish a child command.clear(); @@ -245,7 +245,7 @@ struct FinishJobTest : tpunit::TestFixture { command.clear(); command.methodLine = "Query"; command["Query"] = "DELETE FROM jobs WHERE parentJobID = " + parentID + " AND JSON_EXTRACT(data, '$.mockRequest') IS NOT NULL;"; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); // Finish the parent command.clear(); From fb9caf2b38a68cd0842bfb61fc65c93a001de407 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Mon, 17 Aug 2020 18:15:49 -0700 Subject: [PATCH 5/5] Use leader for removeFinishedAndCancelledChildren to make travis pass --- test/clustertest/tests/FinishJobTest.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/test/clustertest/tests/FinishJobTest.cpp b/test/clustertest/tests/FinishJobTest.cpp index 173bc1a63..49fcc39b1 100644 --- a/test/clustertest/tests/FinishJobTest.cpp +++ b/test/clustertest/tests/FinishJobTest.cpp @@ -107,27 +107,27 @@ struct FinishJobTest : tpunit::TestFixture { // Create the parent SData command("CreateJob"); command["name"] = "parent"; - STable response = tester->executeWaitVerifyContentTable(command); + STable response = 
clusterTester->getTester(0).executeWaitVerifyContentTable(command); string parentID = response["jobID"]; // Get the parent command.clear(); command.methodLine = "GetJob"; command["name"] = "parent"; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); // Create the children command.clear(); command.methodLine = "CreateJob"; command["name"] = "child_finished"; command["parentJobID"] = parentID; - response = tester->executeWaitVerifyContentTable(command); + response = clusterTester->getTester(0).executeWaitVerifyContentTable(command); string finishedChildID = response["jobID"]; command.clear(); command.methodLine = "CreateJob"; command["name"] = "child_cancelled"; command["parentJobID"] = parentID; - response = tester->executeWaitVerifyContentTable(command); + response = clusterTester->getTester(0).executeWaitVerifyContentTable(command); string cancelledChildID = response["jobID"]; command.clear(); @@ -135,20 +135,20 @@ struct FinishJobTest : tpunit::TestFixture { command.clear(); command.methodLine = "FinishJob"; command["jobID"] = parentID; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); // Get the child job command.clear(); command.methodLine = "GetJob"; command["name"] = "child_finished"; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); // Cancel a child // if this goes 2nd this doesn't requeue the parent job command.clear(); command.methodLine = "CancelJob"; command["jobID"] = cancelledChildID; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); // The parent may have other children from mock requests, delete them. command.clear(); @@ -160,7 +160,7 @@ struct FinishJobTest : tpunit::TestFixture { command.clear(); command.methodLine = "FinishJob"; command["jobID"] = finishedChildID; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); // Confirm the parent is set to QUEUED SQResult result; @@ -171,11 +171,11 @@ struct FinishJobTest : tpunit::TestFixture { command.clear(); command.methodLine = "GetJob"; command["name"] = "parent"; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); command.clear(); command.methodLine = "FinishJob"; command["jobID"] = parentID; - tester->executeWaitVerifyContent(command); + clusterTester->getTester(0).executeWaitVerifyContent(command); // Confirm that the FINISHED and CANCELLED children are deleted clusterTester->getTester(0).readDB("SELECT count(*) FROM jobs WHERE jobID != " + parentID + " AND JSON_EXTRACT(data, '$.mockRequest') IS NULL;", result);
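
Reviewer's note: for readers without the Bedrock sources handy, the sketch below condenses the idea behind this
series: a command is tagged `escalateImmediately` at construction (the Jobs plugin tags `CreateJob`, `CreateJobs`,
and `FinishJob`), and on a follower the worker thread forwards such commands straight onto the queue that the sync
thread escalates to leader, instead of peeking them locally or parking them behind the future-commit queue. The
types here (`FakeCommand`, `NodeState`, a plain `std::queue`) are invented stand-ins for illustration, not Bedrock's
real `BedrockCommand`/`SQLiteNode`/`BedrockServer` classes; only the flag and the worker's decision mirror the
diffs above.

    // Stand-alone illustration only -- FakeCommand, NodeState, and the queue below are
    // invented stand-ins, not Bedrock's real types.
    #include <iostream>
    #include <queue>
    #include <set>
    #include <string>
    #include <utility>

    enum class NodeState { LEADING, FOLLOWING };

    struct FakeCommand {
        std::string methodLine;
        bool complete = false;
        // Mirrors the const flag added to BedrockCommand: decided once, at construction.
        bool escalateImmediately = false;
    };

    // Mirrors BedrockJobsCommand::canEscalateImmediately(): a fixed allow-list of verbs whose
    // real work happens in `process` on leader anyway, so peeking locally buys nothing.
    bool canEscalateImmediately(const FakeCommand& cmd) {
        static const std::set<std::string> verbs = {"CreateJob", "CreateJobs", "FinishJob"};
        return verbs.count(cmd.methodLine) > 0;
    }

    // Simplified version of the new worker branch: when following, an incomplete
    // escalate-immediately command skips peek (and the future-commit wait) and goes
    // straight onto the queue the sync thread forwards to leader.
    void handle(FakeCommand cmd, NodeState state, std::queue<FakeCommand>& toLeader) {
        if (state == NodeState::FOLLOWING && cmd.escalateImmediately && !cmd.complete) {
            std::cout << "Escalating " << cmd.methodLine << " to leader immediately\n";
            toLeader.push(std::move(cmd));
            return;
        }
        std::cout << "Handling " << cmd.methodLine << " locally (peek first)\n";
    }

    int main() {
        std::queue<FakeCommand> toLeader;

        FakeCommand finish{"FinishJob"};
        finish.escalateImmediately = canEscalateImmediately(finish);
        FakeCommand get{"GetJob"};
        get.escalateImmediately = canEscalateImmediately(get);

        handle(std::move(finish), NodeState::FOLLOWING, toLeader); // forwarded to leader
        handle(std::move(get), NodeState::FOLLOWING, toLeader);    // peeked locally
        return 0;
    }

Compiled with any C++14 compiler, this prints that `FinishJob` is forwarded while `GetJob` is handled locally,
which is the follower behavior the new branch in BedrockServer::worker() introduces.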