From e1f446df39a5b526a504d343d2dd362dd6953d5b Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Tue, 19 Sep 2023 10:41:57 -0700 Subject: [PATCH 1/3] Make DB reads logically const --- sqlitecluster/SQLite.cpp | 8 ++++---- sqlitecluster/SQLite.h | 28 ++++++++++++++-------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 9ed1455c4..978fc66a4 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -445,7 +445,7 @@ bool SQLite::addColumn(const string& tableName, const string& column, const stri return false; } -string SQLite::read(const string& query) { +string SQLite::read(const string& query) const { // Execute the read-only query SQResult result; if (!read(query, result)) { @@ -457,7 +457,7 @@ string SQLite::read(const string& query) { return result[0][0]; } -bool SQLite::read(const string& query, SQResult& result) { +bool SQLite::read(const string& query, SQResult& result) const { uint64_t before = STimeNow(); bool queryResult = false; _queryCount++; @@ -478,7 +478,7 @@ bool SQLite::read(const string& query, SQResult& result) { return queryResult; } -void SQLite::_checkInterruptErrors(const string& error) { +void SQLite::_checkInterruptErrors(const string& error) const { // Local error code. int errorCode = 0; @@ -1011,7 +1011,7 @@ void SQLite::startTiming(uint64_t timeLimitUS) { _timeoutError = 0; } -void SQLite::resetTiming() { +void SQLite::resetTiming() const { _timeoutLimit = 0; _timeoutStart = 0; _timeoutError = 0; diff --git a/sqlitecluster/SQLite.h b/sqlitecluster/SQLite.h index a2dc2ec13..b95ced868 100644 --- a/sqlitecluster/SQLite.h +++ b/sqlitecluster/SQLite.h @@ -75,10 +75,10 @@ class SQLite { // Performs a read-only query (eg, SELECT). This can be done inside or outside a transaction. Returns true on // success, and fills the 'result' with the result of the query. - bool read(const string& query, SQResult& result); + bool read(const string& query, SQResult& result) const; // Performs a read-only query (eg, SELECT) that returns a single value. - string read(const string& query); + string read(const string& query) const; // Types of transactions that we can begin. enum class TRANSACTION_TYPE { @@ -227,7 +227,7 @@ class SQLite { void startTiming(uint64_t timeLimitUS); // Reset timing after finishing a timed operation. - void resetTiming(); + void resetTiming() const; // This atomically removes and returns committed transactions from our internal list. SQLiteNode can call this, and // it will return a map of transaction IDs to tuples of (query, hash, dbCountAtTransactionStart), so that those @@ -385,8 +385,8 @@ class SQLite { uint64_t _dbCountAtStart = 0; // Timing information. - uint64_t _beginElapsed = 0; - uint64_t _readElapsed = 0; + mutable uint64_t _beginElapsed = 0; + mutable uint64_t _readElapsed = 0; uint64_t _writeElapsed = 0; uint64_t _prepareElapsed = 0; uint64_t _commitElapsed = 0; @@ -459,13 +459,13 @@ class SQLite { // Registering this has the important side effect of preventing the DB from auto-checkpointing. static int _walHookCallback(void* sqliteObject, sqlite3* db, const char* name, int walFileSize); - uint64_t _timeoutLimit = 0; - uint64_t _timeoutStart; - uint64_t _timeoutError; + mutable uint64_t _timeoutLimit = 0; + mutable uint64_t _timeoutStart; + mutable uint64_t _timeoutError; // Check out various error cases that can interrupt a query. // We check them all together because we need to make sure we atomically pick a single one to handle. - void _checkInterruptErrors(const string& error); + void _checkInterruptErrors(const string& error) const; // Called internally by _sqliteAuthorizerCallback to authorize columns for a query. int _authorize(int actionCode, const char* detail1, const char* detail2, const char* detail3, const char* detail4); @@ -476,28 +476,28 @@ class SQLite { // `rollback` is called, we don't double-rollback, generating an error. This allows the externally visible SQLite // API to be consistent and not have to handle this special case. Consumers can just always call `rollback` after a // failed query, regardless of whether or not it was already rolled back internally. - bool _autoRolledBack = false; + mutable bool _autoRolledBack = false; bool _noopUpdateMode = false; // A map of queries to their cached results. This is populated only with deterministic queries, and is reset on any // write, rollback, or commit. - map _queryCache; + mutable map _queryCache; // List of table names used during this transaction. set _tablesUsed; // Number of queries that have been attempted in this transaction (for metrics only). - int64_t _queryCount = 0; + mutable int64_t _queryCount = 0; // Number of queries found in cache in this transaction (for metrics only). - int64_t _cacheHits = 0; + mutable int64_t _cacheHits = 0; // A string indicating the name of the transaction (typically a command name) for metric purposes. string _transactionName; // Will be set to false while running a non-deterministic query to prevent it's result being cached. - bool _isDeterministicQuery = false; + mutable bool _isDeterministicQuery = false; // Copies of parameters used to initialize the DB that we store if we make child objects based on this one. int _cacheSize; From 3b86f69f5e9437a6d6c9c8d6ef3226379f3cd56e Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Tue, 19 Sep 2023 11:15:08 -0700 Subject: [PATCH 2/3] Clean up setting and clearing timeouts --- BedrockCore.cpp | 28 +++++++++------------------- sqlitecluster/SQLite.cpp | 5 ++--- sqlitecluster/SQLite.h | 8 ++++---- 3 files changed, 15 insertions(+), 26 deletions(-) diff --git a/BedrockCore.cpp b/BedrockCore.cpp index c7c1015f6..8c0b65e7f 100644 --- a/BedrockCore.cpp +++ b/BedrockCore.cpp @@ -75,11 +75,8 @@ void BedrockCore::prePeekCommand(unique_ptr& command) { try { try { SDEBUG("prePeeking at '" << request.methodLine << "' with priority: " << command->priority); - - uint64_t timeout = _getRemainingTime(command, false); command->prePeekCount++; - - _db.startTiming(timeout); + _db.setTimeout(_getRemainingTime(command, false)); if (!_db.beginTransaction(SQLite::TRANSACTION_TYPE::SHARED)) { STHROW("501 Failed to begin shared prePeek transaction"); @@ -121,7 +118,7 @@ void BedrockCore::prePeekCommand(unique_ptr& command) { // Back out of the current transaction, it doesn't need to do anything. _db.rollback(); - _db.resetTiming(); + _db.clearTimeout(); // Reset, we can write now. _db.setQueryOnly(false); @@ -141,10 +138,8 @@ BedrockCore::RESULT BedrockCore::peekCommand(unique_ptr& command RESULT returnValue = RESULT::COMPLETE; try { SDEBUG("Peeking at '" << request.methodLine << "' with priority: " << command->priority); - uint64_t timeout = _getRemainingTime(command, false); command->peekCount++; - - _db.startTiming(timeout); + _db.setTimeout(_getRemainingTime(command, false)); try { if (!_db.beginTransaction(exclusive ? SQLite::TRANSACTION_TYPE::EXCLUSIVE : SQLite::TRANSACTION_TYPE::SHARED)) { @@ -161,7 +156,7 @@ BedrockCore::RESULT BedrockCore::peekCommand(unique_ptr& command if (!completed) { SDEBUG("Command '" << request.methodLine << "' not finished in peek, re-queuing."); - _db.resetTiming(); + _db.clearTimeout(); _db.setQueryOnly(false); return RESULT::SHOULD_PROCESS; } @@ -212,7 +207,7 @@ BedrockCore::RESULT BedrockCore::peekCommand(unique_ptr& command // Back out of the current transaction, it doesn't need to do anything. _db.rollback(); - _db.resetTiming(); + _db.clearTimeout(); // Reset, we can write now. _db.setQueryOnly(false); @@ -243,11 +238,8 @@ BedrockCore::RESULT BedrockCore::processCommand(unique_ptr& comm bool needsCommit = false; try { SDEBUG("Processing '" << request.methodLine << "'"); - uint64_t timeout = _getRemainingTime(command, true); command->processCount++; - - // Time in US. - _db.startTiming(timeout); + _db.setTimeout(_getRemainingTime(command, true)); if (!_db.insideTransaction()) { // If a transaction was already begun in `peek`, then this won't run. We call it here to support the case where // peek created a httpsRequest and closed it's first transaction until the httpsRequest was complete, in which @@ -323,7 +315,7 @@ BedrockCore::RESULT BedrockCore::processCommand(unique_ptr& comm _db.setUpdateNoopMode(false); // We can reset the timing info for the next command. - _db.resetTiming(); + _db.clearTimeout(); // Done, return whether or not we need the parent to commit our transaction. command->complete = !needsCommit; @@ -342,10 +334,8 @@ void BedrockCore::postProcessCommand(unique_ptr& command) { // We catch any exception and handle in `_handleCommandException`. try { SDEBUG("postProcessing at '" << request.methodLine << "' with priority: " << command->priority); - uint64_t timeout = _getRemainingTime(command, false); command->postProcessCount++; - - _db.startTiming(timeout); + _db.setTimeout(_getRemainingTime(command, false)); if (!_db.beginTransaction(SQLite::TRANSACTION_TYPE::SHARED)) { STHROW("501 Failed to begin shared postProcess transaction"); @@ -388,7 +378,7 @@ void BedrockCore::postProcessCommand(unique_ptr& command) { // Back out of the current transaction, it doesn't need to do anything. _db.rollback(); - _db.resetTiming(); + _db.clearTimeout(); // Reset, we can write now. _db.setQueryOnly(false); diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 978fc66a4..cb1d3cbf6 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -492,7 +492,6 @@ void SQLite::_checkInterruptErrors(const string& error) const { } if (_timeoutError) { time = _timeoutError; - resetTiming(); errorCode = 1; } } @@ -1005,13 +1004,13 @@ int SQLite::_authorize(int actionCode, const char* detail1, const char* detail2, return SQLITE_DENY; } -void SQLite::startTiming(uint64_t timeLimitUS) { +void SQLite::setTimeout(uint64_t timeLimitUS) { _timeoutStart = STimeNow(); _timeoutLimit = _timeoutStart + timeLimitUS; _timeoutError = 0; } -void SQLite::resetTiming() const { +void SQLite::clearTimeout() { _timeoutLimit = 0; _timeoutStart = 0; _timeoutError = 0; diff --git a/sqlitecluster/SQLite.h b/sqlitecluster/SQLite.h index b95ced868..6321ac6da 100644 --- a/sqlitecluster/SQLite.h +++ b/sqlitecluster/SQLite.h @@ -223,11 +223,11 @@ class SQLite { // Looks up a range of commits. bool getCommits(uint64_t fromIndex, uint64_t toIndex, SQResult& result); - // Start a timing operation, that will time out after the given number of microseconds. - void startTiming(uint64_t timeLimitUS); + // Set a time limit for this transaction, in US from the current time. + void setTimeout(uint64_t timeLimitUS); - // Reset timing after finishing a timed operation. - void resetTiming() const; + // Reset all timeout information to 0, to be ready for the next operation. + void clearTimeout(); // This atomically removes and returns committed transactions from our internal list. SQLiteNode can call this, and // it will return a map of transaction IDs to tuples of (query, hash, dbCountAtTransactionStart), so that those From 835a8cd52b963c4d451128b908deb9540334f808 Mon Sep 17 00:00:00 2001 From: Tyler Karaszewski Date: Tue, 19 Sep 2023 11:35:29 -0700 Subject: [PATCH 3/3] Fix crash and add test --- BedrockCore.cpp | 54 +++++++++++----------- test/clustertest/testplugin/TestPlugin.cpp | 17 +++++-- test/clustertest/tests/TimeoutTest.cpp | 8 ++++ 3 files changed, 50 insertions(+), 29 deletions(-) diff --git a/BedrockCore.cpp b/BedrockCore.cpp index 8c0b65e7f..c37715d2b 100644 --- a/BedrockCore.cpp +++ b/BedrockCore.cpp @@ -333,39 +333,41 @@ void BedrockCore::postProcessCommand(unique_ptr& command) { // We catch any exception and handle in `_handleCommandException`. try { - SDEBUG("postProcessing at '" << request.methodLine << "' with priority: " << command->priority); - command->postProcessCount++; - _db.setTimeout(_getRemainingTime(command, false)); + try { + SDEBUG("postProcessing at '" << request.methodLine << "' with priority: " << command->priority); + command->postProcessCount++; + _db.setTimeout(_getRemainingTime(command, false)); - if (!_db.beginTransaction(SQLite::TRANSACTION_TYPE::SHARED)) { - STHROW("501 Failed to begin shared postProcess transaction"); - } + if (!_db.beginTransaction(SQLite::TRANSACTION_TYPE::SHARED)) { + STHROW("501 Failed to begin shared postProcess transaction"); + } - // Make sure no writes happen while in postProcess command - _db.setQueryOnly(true); + // Make sure no writes happen while in postProcess command + _db.setQueryOnly(true); - // postProcess. - command->postProcess(_db); - SDEBUG("Plugin '" << command->getName() << "' postProcess command '" << request.methodLine << "'"); + // postProcess. + command->postProcess(_db); + SDEBUG("Plugin '" << command->getName() << "' postProcess command '" << request.methodLine << "'"); - // Success. If a command has set "content", encode it in the response. - SINFO("Responding '" << response.methodLine << "' to read-only '" << request.methodLine << "'."); - if (!content.empty()) { - // Make sure we're not overwriting anything different. - string newContent = SComposeJSONObject(content); - if (response.content != newContent) { - if (!response.content.empty()) { - SWARN("Replacing existing response content in " << request.methodLine); + // Success. If a command has set "content", encode it in the response. + SINFO("Responding '" << response.methodLine << "' to read-only '" << request.methodLine << "'."); + if (!content.empty()) { + // Make sure we're not overwriting anything different. + string newContent = SComposeJSONObject(content); + if (response.content != newContent) { + if (!response.content.empty()) { + SWARN("Replacing existing response content in " << request.methodLine); + } + response.content = newContent; } - response.content = newContent; } + } catch (const SQLite::timeout_error& e) { + // Some plugins want to alert timeout errors themselves, and make them silent on bedrock. + if (!command->shouldSuppressTimeoutWarnings()) { + SALERT("Command " << command->request.methodLine << " timed out after " << e.time()/1000 << "ms."); + } + STHROW("555 Timeout postProcessing command"); } - } catch (const SQLite::timeout_error& e) { - // Some plugins want to alert timeout errors themselves, and make them silent on bedrock. - if (!command->shouldSuppressTimeoutWarnings()) { - SALERT("Command " << command->request.methodLine << " timed out after " << e.time()/1000 << "ms."); - } - STHROW("555 Timeout postProcessing command"); } catch (const SException& e) { _handleCommandException(command, e); } catch (...) { diff --git a/test/clustertest/testplugin/TestPlugin.cpp b/test/clustertest/testplugin/TestPlugin.cpp index 9978f21ed..3a8144596 100644 --- a/test/clustertest/testplugin/TestPlugin.cpp +++ b/test/clustertest/testplugin/TestPlugin.cpp @@ -99,7 +99,8 @@ unique_ptr BedrockPlugin_TestPlugin::getCommand(SQLiteCommand&& "postprocesscommand", "prepeekpostprocesscommand", "preparehandler", - "testquery" + "testquery", + "testPostProcessTimeout" }; for (auto& cmdName : supportedCommands) { if (SStartsWith(baseCommand.request.methodLine, cmdName)) { @@ -158,8 +159,12 @@ bool TestPluginCommand::shouldPrePeek() { } bool TestPluginCommand::shouldPostProcess() { - return request.methodLine == "postprocesscommand" || request.methodLine == "prepeekpostprocesscommand" || - request.methodLine == "preparehandler"; + return set{ + "postprocesscommand", + "prepeekpostprocesscommand", + "testPostProcessTimeout", + "preparehandler", + }.count(request.methodLine); } bool BedrockPlugin_TestPlugin::preventAttach() { @@ -520,6 +525,12 @@ void TestPluginCommand::postProcess(SQLite& db) { SQResult result; db.read("SELECT id FROM test WHERE value = 'this is written in onPrepareHandler'", result); jsonContent["tableID"] = result[0][0]; + } else if (request.methodLine == "testPostProcessTimeout") { + // It'll timeout eventually. + while (true) { + SQResult result; + db.read("SELECT COUNT(*) FROM test WHERE id = 999999999", result); + } } else { STHROW("500 no prePeek defined, shouldPrePeek should be false"); } diff --git a/test/clustertest/tests/TimeoutTest.cpp b/test/clustertest/tests/TimeoutTest.cpp index a092d8c6a..cba1d65a9 100644 --- a/test/clustertest/tests/TimeoutTest.cpp +++ b/test/clustertest/tests/TimeoutTest.cpp @@ -10,6 +10,7 @@ struct TimeoutTest : tpunit::TestFixture { TEST(TimeoutTest::test), TEST(TimeoutTest::longerThanDefaultProcess), TEST(TimeoutTest::testprocess), + TEST(TimeoutTest::testPostProcess), TEST(TimeoutTest::totalTimeout), TEST(TimeoutTest::quorumHTTPS), TEST(TimeoutTest::futureCommitTimeout)) { } @@ -80,6 +81,13 @@ struct TimeoutTest : tpunit::TestFixture { brtester.executeWaitVerifyContent(slow, "555 Timeout processing command"); } + void testPostProcess() { + BedrockTester& brtester = tester->getTester(0); + SData slow("testPostProcessTimeout"); + slow["timeout"] = "500"; // 0.5s + brtester.executeWaitVerifyContent(slow, "555 Timeout postProcessing command"); + } + void totalTimeout() { // Test total timeout, not process timeout. BedrockTester& brtester = tester->getTester(0);