From fed18f7f4f1a75c0ff5bde1f08d9e2b4dbe29e83 Mon Sep 17 00:00:00 2001 From: Jigyasu <133757043+jigyasumxkkxr@users.noreply.github.com> Date: Wed, 18 Dec 2024 13:38:50 +0530 Subject: [PATCH 1/7] recent-N-queries-logging --- src/common.hpp | 1 + src/help.cpp | 10 +++++++ src/node.hpp | 1 + src/random_test.cpp | 30 +++++++++++++++++++++ src/random_test.hpp | 5 ++++ src/thread.cpp | 63 +++++++++++++++++++++++++++++++++++++++------ 6 files changed, 102 insertions(+), 8 deletions(-) diff --git a/src/common.hpp b/src/common.hpp index c633da0..cbc4aca 100644 --- a/src/common.hpp +++ b/src/common.hpp @@ -99,6 +99,7 @@ struct Option { DELETE_ROW_USING_PKEY, INVALID_OPTION = 63, LOG_ALL_QUERIES = 'A', + LOG_N_QUERIES, PQUERY = 'k', DATABASE = 'd', ADDRESS = 'a', diff --git a/src/help.cpp b/src/help.cpp index 71cdf37..bd6a638 100644 --- a/src/help.cpp +++ b/src/help.cpp @@ -1024,6 +1024,12 @@ void add_options() { opt->setBool(false); opt->setArgs(no_argument); + /* log recent N queries */ + opt = newOption(Option::INT, Option::LOG_N_QUERIES, "log-N-queries"); + opt->help = "Set the number of queries to log (default is 5)"; + opt->setInt(5); + opt->setArgs(required_argument); + /* execute sql sequentially */ opt = newOption(Option::BOOL, Option::NO_SHUFFLE, "no-shuffle"); opt->help = "execute SQL sequentially | randomly\n"; @@ -1259,6 +1265,8 @@ void show_cli_help(void) { "threads=1 | no\n" << "--log-all-queries | Log all queries (succeeded and " "failed) | no\n" + << "--log-N-queries | Log recent N queries (succeeded and " + "failed) | 5\n" << "--log-succeeded-queries| Log succeeded queries " " | no\n" << "--log-failed-queries | Log failed queries " @@ -1316,6 +1324,8 @@ void show_config_help(void) { "verbose = No\n" << "# Log all queries\n" << "log-all-queries = No\n" + << "# Log recent N queries\n" + << "log-N-queries = 5\n" << "# Log succeeded queries\n" << "log-succeeded-queries = No\n" << "# Log failed queries\n" diff --git a/src/node.hpp b/src/node.hpp index 03a9a23..de8f21f 100644 --- a/src/node.hpp +++ b/src/node.hpp @@ -51,6 +51,7 @@ enum LogLevel { LOG_FAILED_QUERIES = 1 << 4, LOG_SUCCEDED_QUERIES = 1 << 5, LOG_ALL_QUERIES = LOG_FAILED_QUERIES | LOG_SUCCEDED_QUERIES, + LOG_N_QUERIES = 1 << 6, // New log enum LOG_CURRENT = LOG_NOTHING }; diff --git a/src/random_test.cpp b/src/random_test.cpp index 9143219..f6db3b8 100644 --- a/src/random_test.cpp +++ b/src/random_test.cpp @@ -2679,11 +2679,17 @@ void Table::Compare_between_engine(const std::string &sql, Thd1 *thd) { set_default(); } + // Data structures for recent queries + static std::deque recent_queries; + std::mutex recent_queries_mutex; + bool execute_sql(const std::string &sql, Thd1 *thd) { auto query = sql.c_str(); static auto log_all = opt_bool(LOG_ALL_QUERIES); static auto log_failed = opt_bool(LOG_FAILED_QUERIES); static auto log_success = opt_bool(LOG_SUCCEDED_QUERIES); + static auto log_N = opt_bool(LOG_N_QUERIES); + static auto log_N_count = options->at(Option::LOG_N_QUERIES)->getInt(); static auto log_query_duration = opt_bool(LOG_QUERY_DURATION); static auto log_client_output = opt_bool(LOG_CLIENT_OUTPUT); static auto log_query_numbers = opt_bool(LOG_QUERY_NUMBERS); @@ -2716,6 +2722,16 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { if (res != 0) { // query failed thd->failed_queries_total++; thd->max_con_fail_count++; + // Manage recent queries for failed queries + if (!log_N) { + std::lock_guard lock(recent_queries_mutex); + recent_queries.push_back("FAILED: " + sql); + + // Maintain the recent queries count + if (recent_queries.size() > static_cast::size_type>(log_N_count)) { + recent_queries.pop_front(); + } + } if (log_all || log_failed) { thd->thread_log << " F " << sql << std::endl; thd->thread_log << "Error " << mysql_error(thd->conn) << std::endl; @@ -2749,6 +2765,15 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { if (r) mysql_free_result(r); }); + // Manage recent queries for successful queries + if (!log_N) { + std::lock_guard lock(recent_queries_mutex); + recent_queries.push_back("SUCCESS: " + sql); + // Maintain the recent queries count + if (recent_queries.size() > static_cast::size_type>(log_N_count)) { + recent_queries.pop_front(); + } + } if (log_client_output) { if (thd->result != nullptr) { @@ -2797,6 +2822,11 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { return (res == 0 ? 1 : 0); } +// Retrive the formed deque +std::deque get_recent_queries() { + std::lock_guard lock(recent_queries_mutex); + return recent_queries; +} const std::vector row_group_sizes = {2,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31}; diff --git a/src/random_test.hpp b/src/random_test.hpp index 0f30bd7..8e6c95c 100644 --- a/src/random_test.hpp +++ b/src/random_test.hpp @@ -11,6 +11,7 @@ #include #include //shared_ptr #include +#include #include #include #include @@ -42,6 +43,10 @@ std::string rand_float(float upper, float lower = 0); std::string rand_double(double upper, double lower = 0); std::string rand_string(int upper, int lower = 2); +// Introduction of new function to retrive deque of recent queries +std::deque get_recent_queries(); +extern std::mutex recent_queries_mutex; + struct Table; class Column { public: diff --git a/src/thread.cpp b/src/thread.cpp index fee031e..13dcb3e 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -9,6 +9,10 @@ std::atomic_flag lock_metadata = ATOMIC_FLAG_INIT; std::atomic metadata_loaded(false); +// Mutex for thread-safe logging +std::mutex general_log_mutex; + +// Get the number of affected rows safely inline unsigned long long Node::getAffectedRows(MYSQL *connection) { if (mysql_affected_rows(connection) == ~(unsigned long long)0) { return 0LL; @@ -32,16 +36,32 @@ void Node::workerThread(int number) { return; } } + bool log_all_queries = options->at(Option::LOG_ALL_QUERIES)->getBool(); + size_t n_queries = 5; + + if(options->at(Option::LOG_N_QUERIES) && options->at(Option::LOG_N_QUERIES)->getInt() >= 0) { + n_queries = options->at(Option::LOG_N_QUERIES)->getInt(); + } - std::ostringstream os; - os << myParams.logdir << "/" << myParams.myName << "_step_" - << std::to_string(options->at(Option::STEP)->getInt()) << "_thread-" - << number << ".sql"; - thread_log.open(os.str(), std::ios::out | std::ios::trunc); + // Prepare log filename based on logging mode + std::ostringstream log_filename; + log_filename<< myParams.logdir << "/" << myParams.myName << "_step_" + << std::to_string(options->at(Option::STEP)->getInt()) << "_thread-" + << number; + + // Construct full log filename + std::string full_log_filename = log_all_queries + ? (log_filename.str() + ".sql") // All queries log + : (log_filename.str() + "_recent" + + "-" + std::to_string(n_queries) + ".sql"); + + + // Thread log file setup + thread_log.open(full_log_filename, std::ios::out | std::ios::trunc); if (!thread_log.is_open()) { - general_log << "Unable to open thread logfile " << os.str() << ": " - << std::strerror(errno) << std::endl; - return; + general_log << "Unable to open thread logfile " << full_log_filename + << ": " << std::strerror(errno) << std::endl; + return; } if (options->at(Option::LOG_QUERY_DURATION)->getBool()) { @@ -160,6 +180,33 @@ void Node::workerThread(int number) { conn = thd->conn; delete thd; + if (!log_all_queries) { + // Retrieve recent queries from execute_sql + std::deque logDeque = get_recent_queries(); + + // Trim to N queries if necessary + if (options->at(Option::LOG_N_QUERIES) && + options->at(Option::LOG_N_QUERIES)->getInt() > 0) { + size_t max_queries = options->at(Option::LOG_N_QUERIES)->getInt(); + while (logDeque.size() > max_queries) { + logDeque.pop_front(); + } + } + + // Write logs to file + std::ofstream log_file_write(full_log_filename, std::ios::out | std::ios::trunc); + if (!log_file_write.is_open()) { + general_log << "Unable to open log file: " << full_log_filename << std::endl; + return; + } + + for (const auto &log : logDeque) { + log_file_write << log << std::endl; + } + log_file_write.close(); + } + + if (thread_log.is_open()) thread_log.close(); From bc2ecdc69f6bb0b8e848cb38bde9a1297231bd00 Mon Sep 17 00:00:00 2001 From: Jigyasu <133757043+jigyasumxkkxr@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:22:48 +0530 Subject: [PATCH 2/7] thd-specific --- .vscode/settings.json | 80 +++++++++++++++++++++++++++++++++++++++++++ src/random_test.cpp | 17 ++------- src/random_test.hpp | 21 ++++++++++-- src/thread.cpp | 18 ++++------ 4 files changed, 108 insertions(+), 28 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..781d680 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,80 @@ +{ + "files.associations": { + "*.tsx": "typescriptreact", + "algorithm": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "cctype": "cpp", + "charconv": "cpp", + "chrono": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "compare": "cpp", + "concepts": "cpp", + "condition_variable": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "deque": "cpp", + "exception": "cpp", + "format": "cpp", + "forward_list": "cpp", + "fstream": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "ios": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "iterator": "cpp", + "limits": "cpp", + "list": "cpp", + "locale": "cpp", + "map": "cpp", + "memory": "cpp", + "mutex": "cpp", + "new": "cpp", + "optional": "cpp", + "ostream": "cpp", + "random": "cpp", + "ratio": "cpp", + "regex": "cpp", + "set": "cpp", + "shared_mutex": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "stop_token": "cpp", + "streambuf": "cpp", + "string": "cpp", + "system_error": "cpp", + "thread": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "typeinfo": "cpp", + "unordered_map": "cpp", + "unordered_set": "cpp", + "utility": "cpp", + "vector": "cpp", + "xfacet": "cpp", + "xhash": "cpp", + "xiosbase": "cpp", + "xlocale": "cpp", + "xlocbuf": "cpp", + "xlocinfo": "cpp", + "xlocmes": "cpp", + "xlocmon": "cpp", + "xlocnum": "cpp", + "xloctime": "cpp", + "xmemory": "cpp", + "xstddef": "cpp", + "xstring": "cpp", + "xtr1common": "cpp", + "xtree": "cpp", + "xutility": "cpp" + } +} diff --git a/src/random_test.cpp b/src/random_test.cpp index f6db3b8..a0531ef 100644 --- a/src/random_test.cpp +++ b/src/random_test.cpp @@ -2689,7 +2689,7 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { static auto log_failed = opt_bool(LOG_FAILED_QUERIES); static auto log_success = opt_bool(LOG_SUCCEDED_QUERIES); static auto log_N = opt_bool(LOG_N_QUERIES); - static auto log_N_count = options->at(Option::LOG_N_QUERIES)->getInt(); + thd->max_recent_queries = options->at(Option::LOG_N_QUERIES)->getInt(); static auto log_query_duration = opt_bool(LOG_QUERY_DURATION); static auto log_client_output = opt_bool(LOG_CLIENT_OUTPUT); static auto log_query_numbers = opt_bool(LOG_QUERY_NUMBERS); @@ -2724,13 +2724,7 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { thd->max_con_fail_count++; // Manage recent queries for failed queries if (!log_N) { - std::lock_guard lock(recent_queries_mutex); - recent_queries.push_back("FAILED: " + sql); - - // Maintain the recent queries count - if (recent_queries.size() > static_cast::size_type>(log_N_count)) { - recent_queries.pop_front(); - } + thd->add_query("FAILED: " + sql); } if (log_all || log_failed) { thd->thread_log << " F " << sql << std::endl; @@ -2767,12 +2761,7 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { }); // Manage recent queries for successful queries if (!log_N) { - std::lock_guard lock(recent_queries_mutex); - recent_queries.push_back("SUCCESS: " + sql); - // Maintain the recent queries count - if (recent_queries.size() > static_cast::size_type>(log_N_count)) { - recent_queries.pop_front(); - } + thd->add_query("SUCCESS: " + sql); } if (log_client_output) { diff --git a/src/random_test.hpp b/src/random_test.hpp index 8e6c95c..31d8d49 100644 --- a/src/random_test.hpp +++ b/src/random_test.hpp @@ -192,9 +192,9 @@ struct Index { struct Thd1 { Thd1(int id, std::ofstream &tl, std::ofstream &ddl_l, std::ofstream &client_l, MYSQL *c, std::atomic &p, - std::atomic &f) + std::atomic &f,int log_N_count) : thread_id(id), thread_log(tl), ddl_logs(ddl_l), client_log(client_l), - conn(c), performed_queries_total(p), failed_queries_total(f){}; + conn(c), performed_queries_total(p), failed_queries_total(f), max_recent_queries(log_N_count){}; bool run_some_query(); // create default tables and run random queries bool load_metadata(); // load metada of tool in memory @@ -213,6 +213,10 @@ struct Thd1 { bool success = false; // if the sql is successfully executed int max_con_fail_count = 0; // consecutive failed queries + // For storing recent queries + std::deque recent_queries; + size_t max_recent_queries; + /* for loading Bulkdata, Primary key of current table is stored in this vector * which is used for the FK tables */ std::vector unique_keys; @@ -223,6 +227,19 @@ struct Thd1 { } struct workerParams *myParam; bool tryreconnet(); + + // Add a query to the deque + void add_query(const std::string &query) { + recent_queries.push_back(query); + if (recent_queries.size() > max_recent_queries) { + recent_queries.pop_front(); // Keep only the last N queries + } + } + + // Retrieve recent queries (for logging or debugging) + const std::deque& get_recent_queries() const { + return recent_queries; + } }; /* Table basic properties */ diff --git a/src/thread.cpp b/src/thread.cpp index 13dcb3e..3d67025 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -37,11 +37,6 @@ void Node::workerThread(int number) { } } bool log_all_queries = options->at(Option::LOG_ALL_QUERIES)->getBool(); - size_t n_queries = 5; - - if(options->at(Option::LOG_N_QUERIES) && options->at(Option::LOG_N_QUERIES)->getInt() >= 0) { - n_queries = options->at(Option::LOG_N_QUERIES)->getInt(); - } // Prepare log filename based on logging mode std::ostringstream log_filename; @@ -50,10 +45,7 @@ void Node::workerThread(int number) { << number; // Construct full log filename - std::string full_log_filename = log_all_queries - ? (log_filename.str() + ".sql") // All queries log - : (log_filename.str() + "_recent" - + "-" + std::to_string(n_queries) + ".sql"); + std::string full_log_filename = log_filename.str() + ".sql" // Thread log file setup @@ -109,11 +101,13 @@ void Node::workerThread(int number) { return; } + static auto log_N_count = options->at(Option::LOG_N_QUERIES)->getInt(); Thd1 *thd = new Thd1(number, thread_log, general_log, client_log, conn, - performed_queries_total, failed_queries_total); + performed_queries_total, failed_queries_total,log_N_count); thd->myParam = &myParams; + std::deque logDeque; //Initialise a Deque /* run pstress in with dynamic generator or infile */ if (options->at(Option::PQUERY)->getBool() == false) { static bool success = false; @@ -146,6 +140,7 @@ void Node::workerThread(int number) { } } } + logDeque = thd->get_recent_queries(); } else { std::random_device rd; @@ -175,14 +170,13 @@ void Node::workerThread(int number) { break; } } + logDeque = thd->get_recent_queries(); } /* connection can be changed if we thd->tryreconnect is called */ conn = thd->conn; delete thd; if (!log_all_queries) { - // Retrieve recent queries from execute_sql - std::deque logDeque = get_recent_queries(); // Trim to N queries if necessary if (options->at(Option::LOG_N_QUERIES) && From 4ac23b52be633d9cc4994518c3d73db39a1fc3c1 Mon Sep 17 00:00:00 2001 From: Jigyasu <133757043+jigyasumxkkxr@users.noreply.github.com> Date: Fri, 20 Dec 2024 12:12:19 +0530 Subject: [PATCH 3/7] log-updated --- .gitignore | 1 + .vscode/settings.json | 158 +++++++++++++++++++++--------------------- src/random_test.cpp | 2 - src/thread.cpp | 30 ++------ 4 files changed, 87 insertions(+), 104 deletions(-) diff --git a/.gitignore b/.gitignore index d0d0c33..4b89727 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ CMakeFiles Makefile *.a src/pstress-* +.vscode/* \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 781d680..371b6d4 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,80 +1,80 @@ { - "files.associations": { - "*.tsx": "typescriptreact", - "algorithm": "cpp", - "array": "cpp", - "atomic": "cpp", - "bit": "cpp", - "cctype": "cpp", - "charconv": "cpp", - "chrono": "cpp", - "clocale": "cpp", - "cmath": "cpp", - "compare": "cpp", - "concepts": "cpp", - "condition_variable": "cpp", - "cstddef": "cpp", - "cstdint": "cpp", - "cstdio": "cpp", - "cstdlib": "cpp", - "cstring": "cpp", - "ctime": "cpp", - "cwchar": "cpp", - "deque": "cpp", - "exception": "cpp", - "format": "cpp", - "forward_list": "cpp", - "fstream": "cpp", - "initializer_list": "cpp", - "iomanip": "cpp", - "ios": "cpp", - "iosfwd": "cpp", - "iostream": "cpp", - "istream": "cpp", - "iterator": "cpp", - "limits": "cpp", - "list": "cpp", - "locale": "cpp", - "map": "cpp", - "memory": "cpp", - "mutex": "cpp", - "new": "cpp", - "optional": "cpp", - "ostream": "cpp", - "random": "cpp", - "ratio": "cpp", - "regex": "cpp", - "set": "cpp", - "shared_mutex": "cpp", - "sstream": "cpp", - "stdexcept": "cpp", - "stop_token": "cpp", - "streambuf": "cpp", - "string": "cpp", - "system_error": "cpp", - "thread": "cpp", - "tuple": "cpp", - "type_traits": "cpp", - "typeinfo": "cpp", - "unordered_map": "cpp", - "unordered_set": "cpp", - "utility": "cpp", - "vector": "cpp", - "xfacet": "cpp", - "xhash": "cpp", - "xiosbase": "cpp", - "xlocale": "cpp", - "xlocbuf": "cpp", - "xlocinfo": "cpp", - "xlocmes": "cpp", - "xlocmon": "cpp", - "xlocnum": "cpp", - "xloctime": "cpp", - "xmemory": "cpp", - "xstddef": "cpp", - "xstring": "cpp", - "xtr1common": "cpp", - "xtree": "cpp", - "xutility": "cpp" - } -} + "files.associations": { + "*.tsx": "typescriptreact", + "algorithm": "cpp", + "array": "cpp", + "atomic": "cpp", + "bit": "cpp", + "cctype": "cpp", + "charconv": "cpp", + "chrono": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "compare": "cpp", + "concepts": "cpp", + "condition_variable": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "deque": "cpp", + "exception": "cpp", + "format": "cpp", + "forward_list": "cpp", + "fstream": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "ios": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "iterator": "cpp", + "limits": "cpp", + "list": "cpp", + "locale": "cpp", + "map": "cpp", + "memory": "cpp", + "mutex": "cpp", + "new": "cpp", + "optional": "cpp", + "ostream": "cpp", + "random": "cpp", + "ratio": "cpp", + "regex": "cpp", + "set": "cpp", + "shared_mutex": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "stop_token": "cpp", + "streambuf": "cpp", + "string": "cpp", + "system_error": "cpp", + "thread": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "typeinfo": "cpp", + "unordered_map": "cpp", + "unordered_set": "cpp", + "utility": "cpp", + "vector": "cpp", + "xfacet": "cpp", + "xhash": "cpp", + "xiosbase": "cpp", + "xlocale": "cpp", + "xlocbuf": "cpp", + "xlocinfo": "cpp", + "xlocmes": "cpp", + "xlocmon": "cpp", + "xlocnum": "cpp", + "xloctime": "cpp", + "xmemory": "cpp", + "xstddef": "cpp", + "xstring": "cpp", + "xtr1common": "cpp", + "xtree": "cpp", + "xutility": "cpp" + } +} \ No newline at end of file diff --git a/src/random_test.cpp b/src/random_test.cpp index a0531ef..80c9ae7 100644 --- a/src/random_test.cpp +++ b/src/random_test.cpp @@ -2681,7 +2681,6 @@ void Table::Compare_between_engine(const std::string &sql, Thd1 *thd) { // Data structures for recent queries static std::deque recent_queries; - std::mutex recent_queries_mutex; bool execute_sql(const std::string &sql, Thd1 *thd) { auto query = sql.c_str(); @@ -2813,7 +2812,6 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { } // Retrive the formed deque std::deque get_recent_queries() { - std::lock_guard lock(recent_queries_mutex); return recent_queries; } diff --git a/src/thread.cpp b/src/thread.cpp index 3d67025..843ae4f 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -45,7 +45,7 @@ void Node::workerThread(int number) { << number; // Construct full log filename - std::string full_log_filename = log_filename.str() + ".sql" + std::string full_log_filename = log_filename.str() + ".sql"; // Thread log file setup @@ -172,34 +172,18 @@ void Node::workerThread(int number) { } logDeque = thd->get_recent_queries(); } - /* connection can be changed if we thd->tryreconnect is called */ - conn = thd->conn; - delete thd; if (!log_all_queries) { - - // Trim to N queries if necessary - if (options->at(Option::LOG_N_QUERIES) && - options->at(Option::LOG_N_QUERIES)->getInt() > 0) { - size_t max_queries = options->at(Option::LOG_N_QUERIES)->getInt(); - while (logDeque.size() > max_queries) { - logDeque.pop_front(); - } - } - - // Write logs to file - std::ofstream log_file_write(full_log_filename, std::ios::out | std::ios::trunc); - if (!log_file_write.is_open()) { - general_log << "Unable to open log file: " << full_log_filename << std::endl; - return; - } - for (const auto &log : logDeque) { - log_file_write << log << std::endl; + thread_log << log << std::endl; } - log_file_write.close(); } + /* connection can be changed if we thd->tryreconnect is called */ + conn = thd->conn; + delete thd; + + if (thread_log.is_open()) thread_log.close(); From db09c6a25be48ec60ee4d897ae12a2ac7d55fb55 Mon Sep 17 00:00:00 2001 From: Jigyasu <133757043+jigyasumxkkxr@users.noreply.github.com> Date: Fri, 20 Dec 2024 12:16:05 +0530 Subject: [PATCH 4/7] .vscode-removed --- .vscode/settings.json | 80 ------------------------------------------- 1 file changed, 80 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 371b6d4..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,80 +0,0 @@ -{ - "files.associations": { - "*.tsx": "typescriptreact", - "algorithm": "cpp", - "array": "cpp", - "atomic": "cpp", - "bit": "cpp", - "cctype": "cpp", - "charconv": "cpp", - "chrono": "cpp", - "clocale": "cpp", - "cmath": "cpp", - "compare": "cpp", - "concepts": "cpp", - "condition_variable": "cpp", - "cstddef": "cpp", - "cstdint": "cpp", - "cstdio": "cpp", - "cstdlib": "cpp", - "cstring": "cpp", - "ctime": "cpp", - "cwchar": "cpp", - "deque": "cpp", - "exception": "cpp", - "format": "cpp", - "forward_list": "cpp", - "fstream": "cpp", - "initializer_list": "cpp", - "iomanip": "cpp", - "ios": "cpp", - "iosfwd": "cpp", - "iostream": "cpp", - "istream": "cpp", - "iterator": "cpp", - "limits": "cpp", - "list": "cpp", - "locale": "cpp", - "map": "cpp", - "memory": "cpp", - "mutex": "cpp", - "new": "cpp", - "optional": "cpp", - "ostream": "cpp", - "random": "cpp", - "ratio": "cpp", - "regex": "cpp", - "set": "cpp", - "shared_mutex": "cpp", - "sstream": "cpp", - "stdexcept": "cpp", - "stop_token": "cpp", - "streambuf": "cpp", - "string": "cpp", - "system_error": "cpp", - "thread": "cpp", - "tuple": "cpp", - "type_traits": "cpp", - "typeinfo": "cpp", - "unordered_map": "cpp", - "unordered_set": "cpp", - "utility": "cpp", - "vector": "cpp", - "xfacet": "cpp", - "xhash": "cpp", - "xiosbase": "cpp", - "xlocale": "cpp", - "xlocbuf": "cpp", - "xlocinfo": "cpp", - "xlocmes": "cpp", - "xlocmon": "cpp", - "xlocnum": "cpp", - "xloctime": "cpp", - "xmemory": "cpp", - "xstddef": "cpp", - "xstring": "cpp", - "xtr1common": "cpp", - "xtree": "cpp", - "xutility": "cpp" - } -} \ No newline at end of file From dddc2aefa87e85f7be598609754899467b982fad Mon Sep 17 00:00:00 2001 From: Jigyasu <133757043+jigyasumxkkxr@users.noreply.github.com> Date: Sat, 21 Dec 2024 10:55:02 +0530 Subject: [PATCH 5/7] final-resolved --- src/random_test.cpp | 12 ++---------- src/random_test.hpp | 3 --- src/thread.cpp | 5 ----- 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/src/random_test.cpp b/src/random_test.cpp index 80c9ae7..ca0e8eb 100644 --- a/src/random_test.cpp +++ b/src/random_test.cpp @@ -2679,16 +2679,12 @@ void Table::Compare_between_engine(const std::string &sql, Thd1 *thd) { set_default(); } - // Data structures for recent queries - static std::deque recent_queries; bool execute_sql(const std::string &sql, Thd1 *thd) { auto query = sql.c_str(); static auto log_all = opt_bool(LOG_ALL_QUERIES); static auto log_failed = opt_bool(LOG_FAILED_QUERIES); static auto log_success = opt_bool(LOG_SUCCEDED_QUERIES); - static auto log_N = opt_bool(LOG_N_QUERIES); - thd->max_recent_queries = options->at(Option::LOG_N_QUERIES)->getInt(); static auto log_query_duration = opt_bool(LOG_QUERY_DURATION); static auto log_client_output = opt_bool(LOG_CLIENT_OUTPUT); static auto log_query_numbers = opt_bool(LOG_QUERY_NUMBERS); @@ -2722,7 +2718,7 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { thd->failed_queries_total++; thd->max_con_fail_count++; // Manage recent queries for failed queries - if (!log_N) { + if (options->at(Option::LOG_N_QUERIES)->getInt()>0) { thd->add_query("FAILED: " + sql); } if (log_all || log_failed) { @@ -2759,7 +2755,7 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { mysql_free_result(r); }); // Manage recent queries for successful queries - if (!log_N) { + if (options->at(Option::LOG_N_QUERIES)->getInt()>0) { thd->add_query("SUCCESS: " + sql); } @@ -2810,10 +2806,6 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { return (res == 0 ? 1 : 0); } -// Retrive the formed deque -std::deque get_recent_queries() { - return recent_queries; -} const std::vector row_group_sizes = {2,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31}; diff --git a/src/random_test.hpp b/src/random_test.hpp index 31d8d49..6354c87 100644 --- a/src/random_test.hpp +++ b/src/random_test.hpp @@ -43,9 +43,6 @@ std::string rand_float(float upper, float lower = 0); std::string rand_double(double upper, double lower = 0); std::string rand_string(int upper, int lower = 2); -// Introduction of new function to retrive deque of recent queries -std::deque get_recent_queries(); -extern std::mutex recent_queries_mutex; struct Table; class Column { diff --git a/src/thread.cpp b/src/thread.cpp index 843ae4f..f66a2e8 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -9,8 +9,6 @@ std::atomic_flag lock_metadata = ATOMIC_FLAG_INIT; std::atomic metadata_loaded(false); -// Mutex for thread-safe logging -std::mutex general_log_mutex; // Get the number of affected rows safely inline unsigned long long Node::getAffectedRows(MYSQL *connection) { @@ -170,7 +168,6 @@ void Node::workerThread(int number) { break; } } - logDeque = thd->get_recent_queries(); } if (!log_all_queries) { @@ -183,8 +180,6 @@ void Node::workerThread(int number) { conn = thd->conn; delete thd; - - if (thread_log.is_open()) thread_log.close(); From 75ddee65e9a51e1a6f5b112f94c1e4b81fe737ef Mon Sep 17 00:00:00 2001 From: Jigyasu <133757043+jigyasumxkkxr@users.noreply.github.com> Date: Fri, 10 Jan 2025 12:19:05 +0530 Subject: [PATCH 6/7] DuckDB --- src/CMakeLists.txt | 38 ++++-- src/common.hpp | 1 + src/help.cpp | 6 + src/node.cpp | 136 +++++++++++++-------- src/node.hpp | 16 ++- src/pstress.cpp | 10 +- src/random_test.cpp | 291 ++++++++++++++++++++++++++++---------------- src/random_test.hpp | 35 +++++- src/thread.cpp | 24 +++- 9 files changed, 380 insertions(+), 177 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e7d3d3d..ffa18cd 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,11 @@ -IF (MYSQL_FOUND) - SET (BINARY_NAME "pstress") - ADD_SUBDIRECTORY(third_party) - INCLUDE_DIRECTORIES(third_party/inih++/include third_party/rapidjson/include) +option(USE_DUCKDB "Enable support for DuckDB" OFF) +option(USE_MYSQL "Enable support for MySQL" ON) +SET(BINARY_NAME "pstress") +ADD_SUBDIRECTORY(third_party) +INCLUDE_DIRECTORIES(third_party/inih++/include third_party/rapidjson/include) + +IF (USE_MYSQL AND MYSQL_FOUND) + MESSAGE(STATUS "Configuring for MySQL...") IF (MARIADB) INCLUDE_DIRECTORIES( ${MYSQL_INCLUDE_DIR} ${MYSQL_INCLUDE_DIR}/.. ) ELSE() @@ -9,10 +13,30 @@ IF (MYSQL_FOUND) ENDIF(MARIADB) ADD_EXECUTABLE(${BINARY_NAME}-${PSTRESS_EXT} pstress.cpp help.cpp node.cpp thread.cpp random_test.cpp) TARGET_LINK_LIBRARIES( ${BINARY_NAME}-${PSTRESS_EXT} ${MYSQL_LIBRARY} ${OTHER_LIBS} inih++) - FILE(COPY + INSTALL(TARGETS ${BINARY_NAME}-${PSTRESS_EXT} DESTINATION bin) +ENDIF(USE_MYSQL AND MYSQL_FOUND) + +IF (USE_DUCKDB) + MESSAGE(STATUS "Configuring for DuckDB...") + + # Manually set the paths for DuckDB if not found + set(DUCKDB_INCLUDE_DIR "/usr/local/lib") + set(DUCKDB_LIBRARY "/usr/local/lib/libduckdb.so") + + # Check if DuckDB includes and libraries are provided + if (DUCKDB_INCLUDE_DIR AND DUCKDB_LIBRARY) + INCLUDE_DIRECTORIES(${DUCKDB_INCLUDE_DIR}) + ADD_EXECUTABLE(${BINARY_NAME}-duckdb pstress.cpp help.cpp node.cpp thread.cpp random_test.cpp) + TARGET_LINK_LIBRARIES(${BINARY_NAME}-duckdb ${DUCKDB_LIBRARY} ${OTHER_LIBS} inih++) + INSTALL(TARGETS ${BINARY_NAME}-duckdb DESTINATION bin) + else() + message(FATAL_ERROR "DuckDB include or library not found") + endif() + +ENDIF(USE_DUCKDB) + +FILE(COPY grammar.sql DESTINATION .) INSTALL(FILES grammar.sql DESTINATION bin) - INSTALL(TARGETS ${BINARY_NAME}-${PSTRESS_EXT} DESTINATION bin) -ENDIF(MYSQL_FOUND) SET( CMAKE_EXPORT_COMPILE_COMMANDS ON ) diff --git a/src/common.hpp b/src/common.hpp index cbc4aca..ef13448 100644 --- a/src/common.hpp +++ b/src/common.hpp @@ -98,6 +98,7 @@ struct Option { DELETE_ALL_ROW, DELETE_ROW_USING_PKEY, INVALID_OPTION = 63, + DUCKDB, LOG_ALL_QUERIES = 'A', LOG_N_QUERIES, PQUERY = 'k', diff --git a/src/help.cpp b/src/help.cpp index bd6a638..948af9a 100644 --- a/src/help.cpp +++ b/src/help.cpp @@ -1030,6 +1030,12 @@ void add_options() { opt->setInt(5); opt->setArgs(required_argument); + /* Use DuckDb */ + opt = newOption(Option::BOOL, Option::DUCKDB, "duckdb"); + opt->help = "Use DuckDB as database backend"; + opt->setBool(false); + opt->setArgs(no_argument); + /* execute sql sequentially */ opt = newOption(Option::BOOL, Option::NO_SHUFFLE, "no-shuffle"); opt->help = "execute SQL sequentially | randomly\n"; diff --git a/src/node.cpp b/src/node.cpp index ae9eded..ca4d25f 100644 --- a/src/node.cpp +++ b/src/node.cpp @@ -44,8 +44,16 @@ bool Node::createGeneralLog() { } general_log.open(logName, std::ios::out | std::ios::trunc); general_log << "- PStress v" << PQVERSION << "-" << PQREVISION - << " compiled with " << FORK << "-" << mysql_get_client_info() - << std::endl; + << " compiled with " << FORK; + #ifdef USE_MYSQL + general_log << "-" << mysql_get_client_info(); + #endif + + #ifdef USE_DUCKDB + general_log << "-" << duckdb::LibraryVersion(); + #endif + + general_log << std::endl; if (!general_log.is_open()) { std::cout << "Unable to open log file " << logName << ": " @@ -128,62 +136,82 @@ int Node::startWork() { void Node::tryConnect() { node_mutex.lock(); - MYSQL *conn; - conn = mysql_init(NULL); - if (conn == NULL) { - std::cerr << "Error " << mysql_errno(conn) << ": " << mysql_error(conn) - << std::endl; - std::cerr << "* PSTRESS: Unable to continue [1], exiting" << std::endl; - general_log << "Error " << mysql_errno(conn) << ": " << mysql_error(conn) - << std::endl; - general_log << "* PSTRESS: Unable to continue [1], exiting" << std::endl; - mysql_close(conn); - mysql_library_end(); - exit(EXIT_FAILURE); - } - if (mysql_real_connect(conn, myParams.address.c_str(), - myParams.username.c_str(), myParams.password.c_str(), - options->at(Option::DATABASE)->getString().c_str(), - myParams.port, myParams.socket.c_str(), 0) == NULL) { - std::cerr << "Error " << mysql_errno(conn) << ": " << mysql_error(conn) - << std::endl; - std::cerr << "* PSTRESS: Unable to continue [2], exiting" << std::endl; - general_log << "Error " << mysql_errno(conn) << ": " << mysql_error(conn) - << std::endl; - general_log << "* PSTRESS: Unable to continue [2], exiting" << std::endl; - mysql_close(conn); - mysql_library_end(); - exit(EXIT_FAILURE); - } - general_log << "- Connected to " << mysql_get_host_info(conn) << "..." - << std::endl; - // getting the real server version - MYSQL_RES *result = NULL; - std::string server_version; - - if (!mysql_query(conn, "select @@version_comment limit 1") && - (result = mysql_use_result(conn))) { - MYSQL_ROW row = mysql_fetch_row(result); - if (row && row[0]) { + #ifdef MYSQL + MYSQL *conn = mysql_init(NULL); + if (conn == NULL) { + std::cerr << "Error: Unable to initialize MySQL connection." << std::endl; + general_log << "Error: Unable to initialize MySQL connection." << std::endl; + exit(EXIT_FAILURE); + } + + if (mysql_real_connect(conn, myParams.address.c_str(), + myParams.username.c_str(), myParams.password.c_str(), + options->at(Option::DATABASE)->getString().c_str(), + myParams.port, myParams.socket.c_str(), 0) == NULL) { + std::cerr << "MySQL Error " << mysql_errno(conn) << ": " << mysql_error(conn) << std::endl; + general_log << "MySQL Error " << mysql_errno(conn) << ": " << mysql_error(conn) << std::endl; + mysql_close(conn); + mysql_library_end(); + exit(EXIT_FAILURE); + } + + general_log << "- Connected to MySQL: " << mysql_get_host_info(conn) << std::endl; + + // Get MySQL server version + MYSQL_RES *result = NULL; + std::string server_version; + if (!mysql_query(conn, "SELECT @@version_comment LIMIT 1") && + (result = mysql_use_result(conn))) { + MYSQL_ROW row = mysql_fetch_row(result); + if (row && row[0]) { + server_version = mysql_get_server_info(conn); + server_version.append(" "); + server_version.append(row[0]); + } + } else { server_version = mysql_get_server_info(conn); - server_version.append(" "); - server_version.append(row[0]); } - } else { - server_version = mysql_get_server_info(conn); - } - general_log << "- Connected server version: " << server_version << std::endl; - if (strcmp(PLATFORM_ID, "Darwin") == 0) - general_log << "- Table compression is disabled as hole punching is not " - "supported on OSX" - << std::endl; - if (result != NULL) { - mysql_free_result(result); - } - mysql_close(conn); - mysql_thread_end(); + general_log << "- MySQL server version: " << server_version << std::endl; + + if (result != NULL) { + mysql_free_result(result); + } + mysql_close(conn); + mysql_thread_end(); + + #elif defined(DUCKDB) + duckdb_database db; + duckdb_connection conn; + if (duckdb_open(myParams.database_path.c_str(), &db) != DuckDBSuccess) { + std::cerr << "Error: Unable to open DuckDB database." << std::endl; + general_log << "Error: Unable to open DuckDB database." << std::endl; + exit(EXIT_FAILURE); + } + + if (duckdb_connect(db, &conn) != DuckDBSuccess) { + std::cerr << "Error: Unable to connect to DuckDB." << std::endl; + general_log << "Error: Unable to connect to DuckDB." << std::endl; + duckdb_close(&db); + exit(EXIT_FAILURE); + } + + general_log << "- Connected to DuckDB." << std::endl; + + // Get DuckDB version + general_log << "- DuckDB version: " << duckdb_library_version() << std::endl; + + duckdb_disconnect(&conn); + duckdb_close(&db); + + #else + std::cerr << "Error: No database backend defined." << std::endl; + general_log << "Error: No database backend defined." << std::endl; + exit(EXIT_FAILURE); + #endif + if (options->at(Option::TEST_CONNECTION)->getBool()) { exit(EXIT_SUCCESS); } + node_mutex.unlock(); } diff --git a/src/node.hpp b/src/node.hpp index de8f21f..93bb016 100644 --- a/src/node.hpp +++ b/src/node.hpp @@ -6,7 +6,13 @@ #include #include #include +#ifdef USE_MYSQL #include +#endif + +#ifdef USE_DUCKDB +#include "duckdb.hpp" +#endif #include #include #include @@ -69,8 +75,16 @@ class Node { private: // declaration for worker thread function void workerThread(int); - inline unsigned long long getAffectedRows(MYSQL *); void tryConnect(); + #ifdef USE_MYSQL + inline unsigned long long getAffectedRows(MYSQL *); + void tryConnect(); // MySQL-specific connection logic + #endif + + #ifdef USE_DUCKDB + inline unsigned long long getAffectedRows(duckdb::Connection *); + void tryConnect(); // DuckDB-specific connection logic + #endif bool createGeneralLog(); void readSettings(std::string); void writeFinalReport(); diff --git a/src/pstress.cpp b/src/pstress.cpp index ef05879..9d86d32 100644 --- a/src/pstress.cpp +++ b/src/pstress.cpp @@ -18,7 +18,13 @@ #include "random_test.hpp" #include #include //dirname() uses this +#ifdef USE_MYSQL #include +#endif + +#ifdef USE_DUCKDB +#include "duckdb.hpp" +#endif #include #include @@ -197,7 +203,9 @@ int main(int argc, char *argv[]) { save_metadata_to_file(); clean_up_at_end(); - mysql_library_end(); + #ifdef USE_MYSQL + mysql_library_end(); + #endif delete_options(); std::cout << "COMPLETED" << std::endl; if (run_query_failed) diff --git a/src/random_test.cpp b/src/random_test.cpp index ca0e8eb..44b1907 100644 --- a/src/random_test.cpp +++ b/src/random_test.cpp @@ -2681,130 +2681,211 @@ void Table::Compare_between_engine(const std::string &sql, Thd1 *thd) { bool execute_sql(const std::string &sql, Thd1 *thd) { - auto query = sql.c_str(); - static auto log_all = opt_bool(LOG_ALL_QUERIES); - static auto log_failed = opt_bool(LOG_FAILED_QUERIES); - static auto log_success = opt_bool(LOG_SUCCEDED_QUERIES); - static auto log_query_duration = opt_bool(LOG_QUERY_DURATION); - static auto log_client_output = opt_bool(LOG_CLIENT_OUTPUT); - static auto log_query_numbers = opt_bool(LOG_QUERY_NUMBERS); - std::chrono::system_clock::time_point begin, end; + auto query = sql.c_str(); + static auto log_all = opt_bool(LOG_ALL_QUERIES); + static auto log_failed = opt_bool(LOG_FAILED_QUERIES); + static auto log_success = opt_bool(LOG_SUCCEDED_QUERIES); + static auto log_query_duration = opt_bool(LOG_QUERY_DURATION); + static auto log_client_output = opt_bool(LOG_CLIENT_OUTPUT); + static auto log_query_numbers = opt_bool(LOG_QUERY_NUMBERS); + std::chrono::system_clock::time_point begin, end; + + if (log_query_duration) { + begin = std::chrono::system_clock::now(); + } - if (log_query_duration) { - begin = std::chrono::system_clock::now(); - } +#ifdef DUCKDB + try { + // Create DuckDB connection + duckdb::DuckDB db(nullptr); // In-memory database + duckdb::Connection con(db); + auto result = con.Query(sql); - auto res = mysql_real_query(thd->conn, query, strlen(query)); + if (log_query_duration) { + end = std::chrono::system_clock::now(); + auto te_start = std::chrono::duration_cast(begin - start_time); + auto te_query = std::chrono::duration_cast(end - begin); + auto in_time_t = std::chrono::system_clock::to_time_t(begin); - if (log_query_duration) { - end = std::chrono::system_clock::now(); + std::stringstream ss; + ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%dT%X"); - /* elpased time in micro-seconds */ - auto te_start = std::chrono::duration_cast( - begin - start_time); - auto te_query = - std::chrono::duration_cast(end - begin); - auto in_time_t = std::chrono::system_clock::to_time_t(begin); + thd->thread_log << ss.str() << " " << te_start.count() << "=>" << te_query.count() << "ms "; + } + thd->performed_queries_total++; - std::stringstream ss; - ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%dT%X"); + if (!result->success) { // Query failed + thd->failed_queries_total++; + thd->max_con_fail_count++; - thd->thread_log << ss.str() << " " << te_start.count() << "=>" - << te_query.count() << "ms "; - } - thd->performed_queries_total++; + // Manage recent queries for failed queries + if (options->at(Option::LOG_N_QUERIES)->getInt() > 0) { + thd->add_query("FAILED: " + sql); + } + if (log_all || log_failed) { + thd->thread_log << " F " << sql << std::endl; + thd->thread_log << "Error: " << result->error << std::endl; + } - if (res != 0) { // query failed - thd->failed_queries_total++; - thd->max_con_fail_count++; - // Manage recent queries for failed queries - if (options->at(Option::LOG_N_QUERIES)->getInt()>0) { - thd->add_query("FAILED: " + sql); - } - if (log_all || log_failed) { - thd->thread_log << " F " << sql << std::endl; - thd->thread_log << "Error " << mysql_error(thd->conn) << std::endl; - } - static std::set mysql_ignore_error = - splitStringToIntSet(options->at(Option::IGNORE_ERRORS)->getString()); + print_and_log("Fatal: " + sql, thd, true); + run_query_failed = true; + } else { // Query succeeded + thd->max_con_fail_count = 0; + thd->success = true; + + // Process query results + if (result->ColumnCount() > 0) { + thd->client_log << "Result Rows: "; + while (result->HasNext()) { + auto row = result->FetchRow(); + for (size_t i = 0; i < result->ColumnCount(); ++i) { + if (row[i].is_null) { + thd->client_log << "#NO DATA#"; + } else { + thd->client_log << row[i].ToString() << "#"; + } + } + if (log_query_numbers) { + thd->client_log << ++thd->query_number; + } + thd->client_log << '\n'; + } + } - if (options->at(Option::IGNORE_ERRORS)->getString() == "all" || - mysql_ignore_error.count(mysql_errno(thd->conn)) > 0) { - thd->thread_log << "Ignoring error " << mysql_error(thd->conn) - << std::endl; + // Manage recent queries for successful queries + if (options->at(Option::LOG_N_QUERIES)->getInt() > 0) { + thd->add_query("SUCCESS: " + sql); + } - if (mysql_errno(thd->conn) == CR_SERVER_GONE_ERROR || - mysql_errno(thd->conn) == CR_SERVER_LOST || - mysql_errno(thd->conn) == CR_WSREP_NOT_PREPARED) { - thd->tryreconnet(); - } + if (log_all || log_success) { + thd->thread_log << " S " << sql << " rows: " << result->row_count() << std::endl; + } + } - } else if (mysql_errno(thd->conn) == CR_SERVER_LOST || - mysql_errno(thd->conn) == CR_WSREP_NOT_PREPARED || - mysql_errno(thd->conn) == CR_SERVER_GONE_ERROR || - mysql_errno(thd->conn) == CR_SECONDARY_NOT_READY) { - print_and_log("Fatal: " + sql, thd, true); - run_query_failed = true; + if (thd->ddl_query) { + std::lock_guard lock(ddl_logs_write); + thd->ddl_logs << thd->thread_id << " " << sql << " " << result->error << std::endl; + } + + return result->success ? 1 : 0; + + } catch (std::exception &e) { + thd->failed_queries_total++; + thd->thread_log << "Exception: " << e.what() << std::endl; + return 0; } - } else { - thd->max_con_fail_count = 0; - thd->success = true; - auto result = mysql_store_result(thd->conn); - thd->result = std::shared_ptr(result, [](MYSQL_RES *r) { - if (r) - mysql_free_result(r); - }); - // Manage recent queries for successful queries - if (options->at(Option::LOG_N_QUERIES)->getInt()>0) { - thd->add_query("SUCCESS: " + sql); + +#else + // MySQL Logic + auto res = mysql_real_query(thd->conn, query, strlen(query)); + + if (log_query_duration) { + end = std::chrono::system_clock::now(); + auto te_start = std::chrono::duration_cast(begin - start_time); + auto te_query = std::chrono::duration_cast(end - begin); + auto in_time_t = std::chrono::system_clock::to_time_t(begin); + + std::stringstream ss; + ss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%dT%X"); + + thd->thread_log << ss.str() << " " << te_start.count() << "=>" << te_query.count() << "ms "; } + thd->performed_queries_total++; - if (log_client_output) { - if (thd->result != nullptr) { - unsigned int i, num_fields; - - num_fields = mysql_num_fields(thd->result.get()); - while (auto row = mysql_fetch_row_safe(thd)) { - for (i = 0; i < num_fields; i++) { - if (row[i]) { - if (strlen(row[i]) == 0) { - thd->client_log << "EMPTY" - << "#"; - } else { - thd->client_log << row[i] << "#"; - } - } else { - thd->client_log << "#NO DATA" - << "#"; + if (res != 0) { // Query failed + thd->failed_queries_total++; + thd->max_con_fail_count++; + + // Manage recent queries for failed queries + if (options->at(Option::LOG_N_QUERIES)->getInt() > 0) { + thd->add_query("FAILED: " + sql); + } + if (log_all || log_failed) { + thd->thread_log << " F " << sql << std::endl; + thd->thread_log << "Error: " << mysql_error(thd->conn) << std::endl; + } + + static std::set mysql_ignore_error = + splitStringToIntSet(options->at(Option::IGNORE_ERRORS)->getString()); + + if (options->at(Option::IGNORE_ERRORS)->getString() == "all" || + mysql_ignore_error.count(mysql_errno(thd->conn)) > 0) { + thd->thread_log << "Ignoring error " << mysql_error(thd->conn) << std::endl; + + if (mysql_errno(thd->conn) == CR_SERVER_GONE_ERROR || + mysql_errno(thd->conn) == CR_SERVER_LOST || + mysql_errno(thd->conn) == CR_WSREP_NOT_PREPARED) { + thd->tryreconnet(); } - } - if (log_query_numbers) { - thd->client_log << ++thd->query_number; - } - thd->client_log << '\n'; + } else if (mysql_errno(thd->conn) == CR_SERVER_LOST || + mysql_errno(thd->conn) == CR_WSREP_NOT_PREPARED || + mysql_errno(thd->conn) == CR_SERVER_GONE_ERROR || + mysql_errno(thd->conn) == CR_SECONDARY_NOT_READY) { + print_and_log("Fatal: " + sql, thd, true); + run_query_failed = true; + } + } else { // Query succeeded + thd->max_con_fail_count = 0; + thd->success = true; + + auto result = mysql_store_result(thd->conn); + thd->result = std::shared_ptr(result, [](MYSQL_RES *r) { + if (r) + mysql_free_result(r); + }); + + // Manage recent queries for successful queries + if (options->at(Option::LOG_N_QUERIES)->getInt() > 0) { + thd->add_query("SUCCESS: " + sql); } - } - } - /* log successful query */ - if (log_all || log_success) { - thd->thread_log << " S " << sql; - int number; - if (thd->result == nullptr) - number = mysql_affected_rows(thd->conn); - else - number = mysql_num_rows(thd->result.get()); - thd->thread_log << " rows:" << number << std::endl; + if (log_client_output) { + if (thd->result != nullptr) { + unsigned int i, num_fields; + + num_fields = mysql_num_fields(thd->result.get()); + while (auto row = mysql_fetch_row_safe(thd)) { + for (i = 0; i < num_fields; i++) { + if (row[i]) { + if (strlen(row[i]) == 0) { + thd->client_log << "EMPTY" + << "#"; + } else { + thd->client_log << row[i] << "#"; + } + } else { + thd->client_log << "#NO DATA" + << "#"; + } + } + if (log_query_numbers) { + thd->client_log << ++thd->query_number; + } + thd->client_log << '\n'; + } + } + } + + /* log successful query */ + if (log_all || log_success) { + thd->thread_log << " S " << sql; + int number; + if (thd->result == nullptr) + number = mysql_affected_rows(thd->conn); + else + number = mysql_num_rows(thd->result.get()); + thd->thread_log << " rows:" << number << std::endl; + } } - } - if (thd->ddl_query) { - std::lock_guard lock(ddl_logs_write); - thd->ddl_logs << thd->thread_id << " " << sql << " " - << mysql_error(thd->conn) << std::endl; - } + if (thd->ddl_query) { + std::lock_guard lock(ddl_logs_write); + thd->ddl_logs << thd->thread_id << " " << sql << " " + << mysql_error(thd->conn) << std::endl; + } - return (res == 0 ? 1 : 0); + return (res == 0 ? 1 : 0); +#endif } const std::vector row_group_sizes = {2,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31}; diff --git a/src/random_test.hpp b/src/random_test.hpp index 6354c87..91f7a8e 100644 --- a/src/random_test.hpp +++ b/src/random_test.hpp @@ -12,7 +12,13 @@ #include //shared_ptr #include #include +#ifdef USE_MYSQL #include +#endif + +#ifdef USE_DUCKDB +#include "duckdb.hpp" +#endif #include #include #include @@ -188,10 +194,22 @@ struct Index { struct Thd1 { Thd1(int id, std::ofstream &tl, std::ofstream &ddl_l, std::ofstream &client_l, - MYSQL *c, std::atomic &p, - std::atomic &f,int log_N_count) +#ifdef USE_MYSQL + MYSQL *c, +#endif +#ifdef USE_DUCKDB + duckdb::Connection *c, +#endif + std::atomic &p, + std::atomic &f, int log_N_count) : thread_id(id), thread_log(tl), ddl_logs(ddl_l), client_log(client_l), - conn(c), performed_queries_total(p), failed_queries_total(f), max_recent_queries(log_N_count){}; +#ifdef USE_MYSQL + conn(c), +#endif +#ifdef USE_DUCKDB + duckdb_conn(c), +#endif + performed_queries_total(p), failed_queries_total(f), max_recent_queries(log_N_count) {} bool run_some_query(); // create default tables and run random queries bool load_metadata(); // load metada of tool in memory @@ -202,10 +220,17 @@ struct Thd1 { std::ofstream &thread_log; std::ofstream &ddl_logs; std::ofstream &client_log; - MYSQL *conn; + #ifdef USE_MYSQL + MYSQL *conn; // MySQL connection + std::shared_ptr result; // Result set of SQL for MySQL + #endif + + #ifdef USE_DUCKDB + duckdb::Connection *duckdb_conn; // DuckDB connection + std::unique_ptr result; // Result set for DuckDB + #endif std::atomic &performed_queries_total; std::atomic &failed_queries_total; - std::shared_ptr result; // result set of sql bool ddl_query = false; // is the query ddl bool success = false; // if the sql is successfully executed int max_con_fail_count = 0; // consecutive failed queries diff --git a/src/thread.cpp b/src/thread.cpp index f66a2e8..36d0d05 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -186,13 +186,19 @@ void Node::workerThread(int number) { if (client_log.is_open()) client_log.close(); - mysql_close(conn); - mysql_thread_end(); + #ifdef USE_MYSQL + mysql_close(conn); + mysql_thread_end(); + #elif defined(USE_DUCKDB) + delete conn; + #endif } -bool Thd1::tryreconnet() { +bool Thd1::tryreconnect() { MYSQL *conn; auto myParams = *this->myParam; + +#ifdef USE_MYSQL conn = mysql_init(NULL); if (mysql_real_connect(conn, myParams.address.c_str(), myParams.username.c_str(), myParams.password.c_str(), @@ -200,11 +206,21 @@ bool Thd1::tryreconnet() { myParams.socket.c_str(), 0) == NULL) { thread_log << "Error Failed to reconnect " << mysql_errno(conn); mysql_close(conn); - return false; } MYSQL *old_conn = this->conn; mysql_close(old_conn); this->conn = conn; return true; +#elif defined(USE_DUCKDB) + duckdb::DuckDB *conn = new duckdb::DuckDB(myParams.database); + if (conn == nullptr) { + thread_log << "Error: Failed to reconnect to DuckDB database." << std::endl; + return false; + } + duckdb::DuckDB *old_conn = this->conn; + delete old_conn; + this->conn = conn; + return true; +#endif } From 83f3daa48efec07c6dd6ff6ef4cfb7ba30dc2e8a Mon Sep 17 00:00:00 2001 From: Jigyasu <133757043+jigyasumxkkxr@users.noreply.github.com> Date: Sun, 12 Jan 2025 20:14:27 +0530 Subject: [PATCH 7/7] DuckDB --- src/random_test.cpp | 25 +++++++++++++++++++--- src/random_test.hpp | 48 ++++++++++++++++++++++++----------------- src/thread.cpp | 52 ++++++++++++++++++++++++--------------------- 3 files changed, 78 insertions(+), 47 deletions(-) diff --git a/src/random_test.cpp b/src/random_test.cpp index 44b1907..21d8247 100644 --- a/src/random_test.cpp +++ b/src/random_test.cpp @@ -193,6 +193,7 @@ static query_result get_query_result(Thd1 *thd) { return result; } +#ifdef USE_MYSQL static void kill_query(Thd1 *thd) { auto on_exit = std::shared_ptr(nullptr, [&](...) { @@ -225,6 +226,7 @@ static void kill_query(Thd1 *thd) { } return; } +#endif /* return table pointer of matching table. This is only done during the * first step or during the prepare, so you would have only tables that are not @@ -2695,10 +2697,18 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { } #ifdef DUCKDB + if (!thd->db_initialized) { + try { + // Create DuckDB connection + duckdb::DuckDB db(nullptr); // In-memory database + thd->db_connection = std::make_shared(db); + thd->db_initialized = true; + } catch (const std::exception &e) { + thd->thread_log << "Error initializing DuckDB: " << e.what() << std::endl; + return 0; + } + } try { - // Create DuckDB connection - duckdb::DuckDB db(nullptr); // In-memory database - duckdb::Connection con(db); auto result = con.Query(sql); if (log_query_duration) { @@ -2777,6 +2787,14 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { #else // MySQL Logic + if (!thd->conn_initialized) { + thd->conn = mysql_init(nullptr); + if (!thd->conn) { + thd->thread_log << "Error initializing MySQL connection" << std::endl; + return 0; + } + thd->conn_initialized = true; + } auto res = mysql_real_query(thd->conn, query, strlen(query)); if (log_query_duration) { @@ -2888,6 +2906,7 @@ bool execute_sql(const std::string &sql, Thd1 *thd) { #endif } + const std::vector row_group_sizes = {2,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31}; const std::vector htable_sizes = {3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22}; diff --git a/src/random_test.hpp b/src/random_test.hpp index 91f7a8e..d02936b 100644 --- a/src/random_test.hpp +++ b/src/random_test.hpp @@ -19,6 +19,7 @@ #ifdef USE_DUCKDB #include "duckdb.hpp" #endif + #include #include #include @@ -195,24 +196,25 @@ struct Index { struct Thd1 { Thd1(int id, std::ofstream &tl, std::ofstream &ddl_l, std::ofstream &client_l, #ifdef USE_MYSQL - MYSQL *c, + MYSQL *mysql_conn, // Renamed to mysql_conn for clarity #endif #ifdef USE_DUCKDB - duckdb::Connection *c, + duckdb::Connection *duckdb_conn, // Renamed for consistency #endif std::atomic &p, std::atomic &f, int log_N_count) : thread_id(id), thread_log(tl), ddl_logs(ddl_l), client_log(client_l), #ifdef USE_MYSQL - conn(c), + conn(mysql_conn), // Initialize MySQL connection #endif #ifdef USE_DUCKDB - duckdb_conn(c), + duckdb_conn(duckdb_conn), // Initialize DuckDB connection #endif performed_queries_total(p), failed_queries_total(f), max_recent_queries(log_N_count) {} - bool run_some_query(); // create default tables and run random queries - bool load_metadata(); // load metada of tool in memory + bool run_some_query(); // Create default tables and run random queries + bool load_metadata(); // Load metadata of the tool in memory + bool tryreconnect(); int thread_id; long int seed; @@ -220,33 +222,38 @@ struct Thd1 { std::ofstream &thread_log; std::ofstream &ddl_logs; std::ofstream &client_log; - #ifdef USE_MYSQL - MYSQL *conn; // MySQL connection - std::shared_ptr result; // Result set of SQL for MySQL - #endif - - #ifdef USE_DUCKDB - duckdb::Connection *duckdb_conn; // DuckDB connection - std::unique_ptr result; // Result set for DuckDB - #endif + +#ifdef USE_MYSQL + MYSQL *conn; // MySQL connection + std::shared_ptr result; // Result set of SQL for MySQL +#endif + +#ifdef USE_DUCKDB + bool db_initialized = false; + duckdb::Connection *duckdb_conn; // DuckDB connection + std::unique_ptr result; // Result set for DuckDB +#endif + std::atomic &performed_queries_total; std::atomic &failed_queries_total; - bool ddl_query = false; // is the query ddl - bool success = false; // if the sql is successfully executed - int max_con_fail_count = 0; // consecutive failed queries + + bool ddl_query = false; // Is the query DDL + bool success = false; // If the SQL is successfully executed + int max_con_fail_count = 0; // Consecutive failed queries // For storing recent queries std::deque recent_queries; size_t max_recent_queries; - /* for loading Bulkdata, Primary key of current table is stored in this vector - * which is used for the FK tables */ + // For loading bulk data, primary key of current table is stored in this vector std::vector unique_keys; int query_number = 0; + std::string get_xid() { std::string xid = "\'xid" + std::to_string(thread_id) + "\'"; return xid; } + struct workerParams *myParam; bool tryreconnet(); @@ -264,6 +271,7 @@ struct Thd1 { } }; + /* Table basic properties */ struct Table { enum TABLE_TYPES { PARTITION, NORMAL, TEMPORARY, FK } type; diff --git a/src/thread.cpp b/src/thread.cpp index 36d0d05..ab3ad87 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -9,7 +9,7 @@ std::atomic_flag lock_metadata = ATOMIC_FLAG_INIT; std::atomic metadata_loaded(false); - +#ifdef USE_MYSQL // Get the number of affected rows safely inline unsigned long long Node::getAffectedRows(MYSQL *connection) { if (mysql_affected_rows(connection) == ~(unsigned long long)0) { @@ -17,6 +17,8 @@ inline unsigned long long Node::getAffectedRows(MYSQL *connection) { } return mysql_affected_rows(connection); } +#endif + void Node::workerThread(int number) { @@ -63,6 +65,7 @@ void Node::workerThread(int number) { std::cout << std::fixed; } + #ifdef USE_MYSQL MYSQL *conn; conn = mysql_init(NULL); @@ -77,13 +80,7 @@ void Node::workerThread(int number) { << std::endl; return; } - /* -#ifdef MAXPACKET - if (myParams.maxpacket != MAX_PACKET_DEFAULT) { - mysql_options(conn, MYSQL_OPT_MAX_ALLOWED_PACKET, &myParams.maxpacket); - } -#endif -*/ + if (mysql_real_connect(conn, myParams.address.c_str(), myParams.username.c_str(), myParams.password.c_str(), myParams.database.c_str(), myParams.port, @@ -98,10 +95,27 @@ void Node::workerThread(int number) { mysql_thread_end(); return; } +#endif + +#ifdef USE_DUCKDB + duckdb::DuckDB db(myParams.database); + duckdb::Connection conn(db); +#endif + +Thd1 *thd = nullptr; static auto log_N_count = options->at(Option::LOG_N_QUERIES)->getInt(); - Thd1 *thd = new Thd1(number, thread_log, general_log, client_log, conn, - performed_queries_total, failed_queries_total,log_N_count); + #ifdef USE_MYSQL + thd = new Thd1(number, thread_log, general_log, client_log, conn, + performed_queries_total, failed_queries_total, + log_N_count); +#endif + +#ifdef USE_DUCKDB + thd = new Thd1(number, thread_log, general_log, client_log, &conn, // Passing address for DuckDB + performed_queries_total, failed_queries_total, + log_N_count); +#endif thd->myParam = &myParams; @@ -194,11 +208,10 @@ void Node::workerThread(int number) { #endif } -bool Thd1::tryreconnect() { +#ifdef USE_MYSQL +bool Thd1::tryreconnet() { MYSQL *conn; auto myParams = *this->myParam; - -#ifdef USE_MYSQL conn = mysql_init(NULL); if (mysql_real_connect(conn, myParams.address.c_str(), myParams.username.c_str(), myParams.password.c_str(), @@ -206,21 +219,12 @@ bool Thd1::tryreconnect() { myParams.socket.c_str(), 0) == NULL) { thread_log << "Error Failed to reconnect " << mysql_errno(conn); mysql_close(conn); + return false; } MYSQL *old_conn = this->conn; mysql_close(old_conn); this->conn = conn; return true; -#elif defined(USE_DUCKDB) - duckdb::DuckDB *conn = new duckdb::DuckDB(myParams.database); - if (conn == nullptr) { - thread_log << "Error: Failed to reconnect to DuckDB database." << std::endl; - return false; - } - duckdb::DuckDB *old_conn = this->conn; - delete old_conn; - this->conn = conn; - return true; -#endif } +#endif \ No newline at end of file