From 6070bac765141e9a5a06025c5efbc36e38b96423 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Thu, 8 Feb 2024 17:03:48 +0000 Subject: [PATCH] switch_to_fast_forward query rules attribute Just a POC at the moment. This allows a query rules to switch to fast_forward mode. It uses mysql_query_rules.attributes , JSON key switch_to_fast_forward --- include/query_processor.h | 2 + lib/MySQL_Session.cpp | 108 ++++++++++++++++++++++++++++++++++++++ lib/Query_Processor.cpp | 44 +++++++++++++++- 3 files changed, 153 insertions(+), 1 deletion(-) diff --git a/include/query_processor.h b/include/query_processor.h index c9dd826a3d..1c90d4a12e 100644 --- a/include/query_processor.h +++ b/include/query_processor.h @@ -73,6 +73,7 @@ class QP_query_digest_stats { struct _Query_Processor_rule_t { int rule_id; bool active; + bool switch_to_fast_forward; char *username; char *schemaname; int flagIN; @@ -145,6 +146,7 @@ class Query_Processor_Output { char *comment; // #643 char *min_gtid; bool create_new_conn; + bool switch_to_fast_forward; std::string *new_query; void * operator new(size_t size) { return l_alloc(size); diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index c6097ebca4..19e3cb414c 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -4007,6 +4007,114 @@ int MySQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) { (begint.tv_sec*1000000000+begint.tv_nsec); } assert(qpo); // GloQPro->process_mysql_query() should always return a qpo + + + + if (qpo->switch_to_fast_forward == true) { + // In this switch we handle commands that download binlog events from MySQL + // servers. For these commands a lot of the features provided by ProxySQL + // aren't useful, like multiplexing, query parsing, etc. For this reason, + // ProxySQL enables fast_forward when it receives these commands.  + { + // we use a switch to write the command in the info message + std::string q = ""; +/* + switch ((enum_mysql_command)c) { + case _MYSQL_COM_BINLOG_DUMP: + q += "MYSQL_COM_BINLOG_DUMP"; + break; + case _MYSQL_COM_BINLOG_DUMP_GTID: + q += "MYSQL_COM_BINLOG_DUMP_GTID"; + break; + case _MYSQL_COM_REGISTER_SLAVE: + q += "MYSQL_COM_REGISTER_SLAVE"; + break; + default: + assert(0); + break; + }; +*/ + // we add the client details in the info message + if (client_myds && client_myds->addr.addr) { + q += "Client " + std::string(client_myds->addr.addr) + ":" + std::to_string(client_myds->addr.port); + } + q += " : changing session fast_forward to true"; + proxy_info("%s\n", q.c_str()); + } + session_fast_forward = true; + + if (client_myds->PSarrayIN->len) { + proxy_error("UNEXPECTED PACKET FROM CLIENT -- PLEASE REPORT A BUG\n"); + assert(0); + } + client_myds->PSarrayIN->add(pkt.ptr, pkt.size); + + // The following code prepares the session as if it was configured with fast + // forward before receiving the command. This way the state machine will + // handle the command automatically. + current_hostgroup = previous_hostgroup; + mybe = find_or_create_backend(current_hostgroup); // set a backend + mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active + // We reinitialize the 'wait_until' since this session shouldn't wait for processing as + // we are now transitioning to 'FAST_FORWARD'. + mybe->server_myds->wait_until = 0; + if (mybe->server_myds->DSS==STATE_NOT_INITIALIZED) { + // NOTE: This section is entirely borrowed from 'STATE_SLEEP' for 'session_fast_forward'. + // Check comments there for extra information. + // ============================================================================= + if (mybe->server_myds->max_connect_time == 0) { + uint64_t connect_timeout = + mysql_thread___connect_timeout_server < mysql_thread___connect_timeout_server_max ? + mysql_thread___connect_timeout_server_max : mysql_thread___connect_timeout_server; + mybe->server_myds->max_connect_time = thread->curtime + connect_timeout * 1000; + } + mybe->server_myds->connect_retries_on_failure = mysql_thread___connect_retries_on_failure; + CurrentQuery.start_time=thread->curtime; + // ============================================================================= + + // we don't have a connection + previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD + set_status(CONNECTING_SERVER); // now we need a connection + } else { + // In case of having a connection, we need to make user to reset the state machine + // for current server 'MySQL_Data_Stream', setting it outside of any state handled + // by 'mariadb' library. Otherwise 'MySQL_Thread' will threat this + // 'MySQL_Data_Stream' as library handled. + mybe->server_myds->DSS = STATE_READY; + // myds needs to have encrypted value set correctly + { + MySQL_Data_Stream * myds = mybe->server_myds; + MySQL_Connection * myconn = myds->myconn; + assert(myconn != NULL); + // PMC-10005 + // if backend connection uses SSL we will set + // encrypted = true and we will start using the SSL structure + // directly from P_MARIADB_TLS structure. + MYSQL *mysql = myconn->mysql; + if (mysql && myconn->ret_mysql) { + if (mysql->options.use_ssl == 1) { + P_MARIADB_TLS * matls = (P_MARIADB_TLS *)mysql->net.pvio->ctls; + if (matls != NULL) { + myds->encrypted = true; + myds->ssl = (SSL *)matls->ssl; + myds->rbio_ssl = BIO_new(BIO_s_mem()); + myds->wbio_ssl = BIO_new(BIO_s_mem()); + SSL_set_bio(myds->ssl, myds->rbio_ssl, myds->wbio_ssl); + } else { + // if mysql->options.use_ssl == 1 but matls == NULL + // it means that ProxySQL tried to use SSL to connect to the backend + // but the backend didn't support SSL + } + } + } + } + set_status(FAST_FORWARD); // we can set status to FAST_FORWARD + } + break; + } + + + // This block was moved from 'handler_special_queries' to support // handling of 'USE' statements which are preceded by a comment. // For more context check issue: #3493. diff --git a/lib/Query_Processor.cpp b/lib/Query_Processor.cpp index 85f1b499db..ea68bbd996 100644 --- a/lib/Query_Processor.cpp +++ b/lib/Query_Processor.cpp @@ -745,9 +745,48 @@ QP_rule_t * Query_Processor::new_query_rule(int rule_id, bool active, char *user newQR->flagOUT_weights_total = 0; newQR->flagOUT_ids = NULL; newQR->flagOUT_weights = NULL; + newQR->switch_to_fast_forward = false; if (newQR->attributes != NULL) { if (strlen(newQR->attributes)) { - nlohmann::json j_attributes = nlohmann::json::parse(newQR->attributes); + nlohmann::json j_attributes = nlohmann::json::parse(newQR->attributes); + if ( j_attributes.find("switch_to_fast_forward") != j_attributes.end() ) { + bool parsed = false; + const nlohmann::json& j = j_attributes; + if (j["switch_to_fast_forward"].type() == nlohmann::json::value_t::number_unsigned) { + if (j["switch_to_fast_forward"] == 0 || j["switch_to_fast_forward"] == 1) { + if (j["switch_to_fast_forward"] == 1) { + newQR->switch_to_fast_forward = true; + } + parsed = true; + } + } + if (parsed == false) { + if (j["switch_to_fast_forward"].type() == nlohmann::json::value_t::boolean) { + if (j["switch_to_fast_forward"] == true) { + newQR->switch_to_fast_forward = true; + } + parsed = true; + } + } + if (parsed == false) { + if (j["switch_to_fast_forward"].type() == nlohmann::json::value_t::string) { + string s = j["switch_to_fast_forward"]; + const char *a = s.c_str(); + if ( + (strcasecmp(a,"yes") == 0) || (strcasecmp(a,"true") == 0) || (strcasecmp(a,"1") == 0) + || + (strcasecmp(a,"no") == 0) || (strcasecmp(a,"false") == 0) || (strcasecmp(a,"0") == 0) + ) { + if ( + (strcasecmp(a,"yes") == 0) || (strcasecmp(a,"true") == 0) || (strcasecmp(a,"1") == 0) + ) { + newQR->switch_to_fast_forward = true; + } + } + parsed = true; + } + } + } if ( j_attributes.find("flagOUTs") != j_attributes.end() ) { newQR->flagOUT_ids = new vector; newQR->flagOUT_weights = new vector; @@ -1978,6 +2017,9 @@ Query_Processor_Output * Query_Processor::process_mysql_query(MySQL_Session *ses // if we arrived here, we have a match qr->hits++; // this is done without atomic function because it updates only the local variables bool set_flagOUT=false; + if (qr->switch_to_fast_forward == true) { + ret->switch_to_fast_forward = true; + } if (qr->flagOUT_weights_total > 0) { int rnd = random() % qr->flagOUT_weights_total; for (unsigned int i=0; i< qr->flagOUT_weights->size(); i++) {