Skip to content

Commit

Permalink
Merge pull request #4762 from sysown/v3.0_dynamic_fast_forward
Browse files Browse the repository at this point in the history
'COPY ... FROM STDIN' Support with Dynamic Fast Forward and Improved Error Handling - v3.0
  • Loading branch information
renecannao authored Dec 12, 2024
2 parents 90d83e8 + 7e20a59 commit 2c80f4a
Show file tree
Hide file tree
Showing 30 changed files with 2,004 additions and 246 deletions.
2 changes: 1 addition & 1 deletion deps/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ pcre/pcre/.libs/libpcre.a:

pcre: pcre/pcre/.libs/libpcre.a

postgresql/postgresql/src/interfaces/libpq/libpq.a :
postgresql/postgresql/src/interfaces/libpq/libpq.a:
cd postgresql && rm -rf postgresql-*/ || true
cd postgresql && tar -zxf postgresql-*.tar.gz
cd postgresql/postgresql && patch -p0 < ../get_result_from_pgconn.patch
Expand Down
12 changes: 10 additions & 2 deletions include/Base_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ class StmtLongDataHandler;
class MySQL_Session;
class PgSQL_Session;

enum SESSION_FORWARD_TYPE : uint8_t {
SESSION_FORWARD_TYPE_NONE = 0x00,
SESSION_FORWARD_TYPE_PERMANENT = 0x01,
SESSION_FORWARD_TYPE_TEMPORARY = 0x02,
SESSION_FORWARD_TYPE_COPY_STDIN = 0x04 | SESSION_FORWARD_TYPE_TEMPORARY,
SESSION_FORWARD_TYPE_START_REPLICATION = 0x08 | SESSION_FORWARD_TYPE_TEMPORARY,
};

template<typename S, typename DS, typename B, typename T>
class Base_Session {
public:
Expand Down Expand Up @@ -89,8 +97,8 @@ class Base_Session {
//bool stats;
bool schema_locked;
bool transaction_persistent;
bool session_fast_forward;
bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
SESSION_FORWARD_TYPE session_fast_forward;
//bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
bool use_ssl;
MySQL_STMTs_meta *sess_STMTs_meta;
StmtLongDataHandler *SLDH;
Expand Down
4 changes: 2 additions & 2 deletions include/MySQL_Data_Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class MySQL_Data_Stream
//
// we have a similar code in MySQL_Connection
// in case of ASYNC_CONNECT_SUCCESSFUL
if (sess != NULL && sess->session_fast_forward == true) {
if (sess != NULL && sess->session_fast_forward) {
// if frontend and backend connection use SSL we will set
// encrypted = true and we will start using the SSL structure
// directly from P_MARIADB_TLS structure.
Expand Down Expand Up @@ -260,7 +260,7 @@ class MySQL_Data_Stream
myconn->myds=NULL;
myconn=NULL;
if (encrypted == true) {
if (sess != NULL && sess->session_fast_forward == true) {
if (sess != NULL && sess->session_fast_forward) {
// it seems we are a connection with SSL on a fast_forward session.
// See attach_connection() for more details .
// We now disable SSL metadata from the Data Stream
Expand Down
2 changes: 1 addition & 1 deletion include/MySQL_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ class MySQL_Session: public Base_Session<MySQL_Session, MySQL_Data_Stream, MySQL
bool schema_locked;
bool transaction_persistent;
bool session_fast_forward;
bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
//bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
bool use_ssl;
#endif // 0
/**
Expand Down
8 changes: 7 additions & 1 deletion include/PgSQL_Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,10 +640,10 @@ class PgSQL_Connection : public PgSQL_Connection_Placeholder {
inline int get_pg_is_nonblocking() { return PQisnonblocking(pgsql_conn); }
inline int get_pg_is_threadsafe() { return PQisthreadsafe(); }
inline const char* get_pg_error_message() { return PQerrorMessage(pgsql_conn); }
inline SSL* get_pg_ssl_object() { return (SSL*)PQsslStruct(pgsql_conn, "OpenSSL"); }
const char* get_pg_server_version_str(char* buff, int buff_size);
const char* get_pg_connection_status_str();
const char* get_pg_transaction_status_str();

unsigned int get_memory_usage() const;

//PgSQL_Conn_Param conn_params;
Expand All @@ -655,10 +655,16 @@ class PgSQL_Connection : public PgSQL_Connection_Placeholder {
PgSQL_Query_Result* query_result;
PgSQL_Query_Result* query_result_reuse;
bool new_result;
bool is_copy_out;
//PgSQL_SrvC* parent;
//PgSQL_Connection_userinfo* userinfo;
//PgSQL_Data_Stream* myds;
//int fd;

private:
// Handles the COPY OUT response from the server.
// Returns true if it consumes all buffer data, or false if the threshold for result size is reached
bool handle_copy_out(const PGresult* result, uint64_t* processed_bytes);
};

#endif /* __CLASS_PGSQL_CONNECTION_H */
27 changes: 12 additions & 15 deletions include/PgSQL_Data_Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class PgSQL_Data_Stream
private:
int array2buffer();
int buffer2array();
void generate_compressed_packet();
enum pgsql_sslstatus do_ssl_handshake();
void queue_encrypted_bytes(const char* buf, size_t len);
public:
Expand Down Expand Up @@ -217,39 +216,37 @@ class PgSQL_Data_Stream
//
// we have a similar code in MySQL_Connection
// in case of ASYNC_CONNECT_SUCCESSFUL
if (sess != NULL && sess->session_fast_forward == true) {
if (sess != NULL && sess->session_fast_forward) {
// if frontend and backend connection use SSL we will set
// encrypted = true and we will start using the SSL structure
// directly from P_MARIADB_TLS structure.
// directly from PGconn SSL structure.
//
// For futher details:
// - without ssl: we use the file descriptor from pgsql connection
// - with ssl: we use the SSL structure from pgsql connection
if (myconn->pgsql && myconn->ret_mysql) {
if (myconn->pgsql->options.use_ssl == 1) {
if (myconn->is_connected() && myconn->get_pg_ssl_in_use()) {
if (ssl == NULL) {
encrypted = true;
if (ssl == NULL) {
// check the definition of P_MARIADB_TLS
// P_MARIADB_TLS* matls = (P_MARIADB_TLS*)myconn->pgsql->net.pvio->ctls;
// ssl = (SSL*)matls->ssl;
// rbio_ssl = BIO_new(BIO_s_mem());
// wbio_ssl = BIO_new(BIO_s_mem());
// SSL_set_bio(ssl, rbio_ssl, wbio_ssl);
}
SSL* ssl_obj = myconn->get_pg_ssl_object();
if (ssl_obj == NULL) assert(0); // Should not be null
ssl = ssl_obj;
rbio_ssl = BIO_new(BIO_s_mem());
wbio_ssl = BIO_new(BIO_s_mem());
SSL_set_bio(ssl, rbio_ssl, wbio_ssl);
}
}
}
}

// safe way to detach a MySQL Connection
// safe way to detach a PgSQL Connection
void detach_connection() {
assert(myconn);
myconn->statuses.pgconnpoll_put++;
statuses.pgconnpoll_put++;
myconn->myds = NULL;
myconn = NULL;
if (encrypted == true) {
if (sess != NULL && sess->session_fast_forward == true) {
if (sess != NULL && sess->session_fast_forward) {
// it seems we are a connection with SSL on a fast_forward session.
// See attach_connection() for more details .
// We now disable SSL metadata from the Data Stream
Expand Down
75 changes: 75 additions & 0 deletions include/PgSQL_Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ class PgSQL_Protocol;
#define PGSQL_QUERY_RESULT_READY 0x04
#define PGSQL_QUERY_RESULT_ERROR 0x08
#define PGSQL_QUERY_RESULT_EMPTY 0x10
#define PGSQL_QUERY_RESULT_COPY_OUT 0x20
#define PGSQL_QUERY_RESULT_COPY_IN 0x30

class PgSQL_Query_Result {
public:
Expand Down Expand Up @@ -435,6 +437,40 @@ class PgSQL_Query_Result {
*/
unsigned int add_ready_status(PGTransactionStatusType txn_status);

/**
* @brief Adds the start of a COPY OUT response to the packet.
*
* This function adds the initial part of a COPY OUT response to the packet.
* It uses the provided PGresult object to determine the necessary information
* to include in the response.
*
* @param result A pointer to the PGresult object containing the response data.
* @return The number of bytes added to the packet.
*/
unsigned int add_copy_out_response_start(const PGresult* result);

/**
* @brief Adds a row of data to the COPY OUT response.
*
* This function adds a row of data to the ongoing COPY OUT response. The data
* is provided as a pointer to the row data and its length.
*
* @param data A pointer to the row data to be added.
* @param len The length of the row data in bytes.
* @return The number of bytes added to the packet.
*/
unsigned int add_copy_out_row(const void* data, unsigned int len);

/**
* @brief Adds the end of a COPY OUT response to the packet.
*
* This function adds the final part of a COPY OUT response to the packet,
* indicating the end of the response.
*
* @return The number of bytes added to the packet.
*/
unsigned int add_copy_out_response_end();

/**
* @brief Retrieves the query result set and copies it to a PtrSizeArray.
*
Expand Down Expand Up @@ -870,6 +906,45 @@ class PgSQL_Protocol : public MySQL_Protocol {
*/
unsigned int copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result);

/**
* @brief Copies the start of a response to a PgSQL_Query_Result.
*
* This function copies the initial part of a response to the provided
* PgSQL_Query_Result object. It can optionally send the response.
*
* @param send Whether to send the response.
* @param pg_query_result The PgSQL_Query_Result object to copy the response to.
* @param result The PGresult object containing the response data.
* @return The number of bytes copied.
*/
unsigned int copy_out_response_start_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);

/**
* @brief Copies a row to a PgSQL_Query_Result.
*
* This function copies a single row of data to the provided PgSQL_Query_Result
* object. It can optionally send the row data.
*
* @param send Whether to send the row data.
* @param pg_query_result The PgSQL_Query_Result object to copy the row to.
* @param data The row data to copy.
* @param len The length of the row data.
* @return The number of bytes copied.
*/
unsigned int copy_out_row_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const unsigned char* data, unsigned int len);

/**
* @brief Copies the end of a response to a PgSQL_Query_Result.
*
* This function copies the final part of a response to the provided
* PgSQL_Query_Result object. It can optionally send the response.
*
* @param send Whether to send the response.
* @param pg_query_result The PgSQL_Query_Result object to copy the response to.
* @return The number of bytes copied.
*/
unsigned int copy_out_response_end_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result);

private:

/**
Expand Down
30 changes: 27 additions & 3 deletions include/PgSQL_Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ class PgSQL_Query_Info {

PgSQL_Query_Info();
~PgSQL_Query_Info();
void init(unsigned char* _p, int len, bool mysql_header = false);
void init(unsigned char* _p, int len, bool header = false);
void query_parser_init();
enum PGSQL_QUERY_command query_parser_command_type();
void query_parser_free();
unsigned long long query_parser_update_counters();
void begin(unsigned char* _p, int len, bool mysql_header = false);
void begin(unsigned char* _p, int len, bool header = false);
void end();
char* get_digest_text();
bool is_select_NOT_for_update();
Expand Down Expand Up @@ -256,6 +256,29 @@ class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQ
void handler_WCD_SS_MCQ_qpo_error_msg(PtrSize_t* pkt);
void handler_WCD_SS_MCQ_qpo_LargePacket(PtrSize_t* pkt);

/**
* @brief Switches session from normal mode to fast forward mode.
*
* This method transitions the session to fast forward mode based on session type.
* (Currently only supports SESSION_FORWARD_TYPE_TEMPORARY and extended types)
*
* @param pkt Used solely to push the packet back to client_myds PSarrayIN,
* allowing it to be forwarded to the backend via the fast forward session
* @param command Command that causes the session to switch to fast forward mode.
* @param session_type SESSION_FORWARD_TYPE indicating the type of session.
*
* @return void.
*/
void switch_normal_to_fast_forward_mode(PtrSize_t& pkt, std::string_view command, SESSION_FORWARD_TYPE session_type);

/**
* @brief Switches session from fast forward mode to normal mode.
*
* This method is used to revert session from fast forward mode back to normal mode.
*
*/
void switch_fast_forward_to_normal_mode();

public:
bool handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, const char* var_name, const char* var_value, bool no_quote = false, bool set_transaction = false);
#if 0
Expand Down Expand Up @@ -341,7 +364,7 @@ class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQ
bool schema_locked;
bool transaction_persistent;
bool session_fast_forward;
bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
//bool started_sending_data_to_client; // this status variable tracks if some result set was sent to the client, or if proxysql is still buffering everything
bool use_ssl;
#endif // 0
/**
Expand All @@ -359,6 +382,7 @@ class PgSQL_Session : public Base_Session<PgSQL_Session, PgSQL_Data_Stream, PgSQ
// StmtLongDataHandler* SLDH;

Session_Regex** match_regexes;
CopyCmdMatcher* copy_cmd_matcher;

ProxySQL_Node_Address* proxysql_node_address; // this is used ONLY for Admin, and only if the other party is another proxysql instance part of a cluster
bool use_ldap_auth;
Expand Down
21 changes: 20 additions & 1 deletion include/PgSQL_Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ enum PgSQL_Thread_status_variable {
PG_st_var_END = 42 // to avoid ASAN complaining. TO FIX
};


struct CopyCmdMatcher {
re2::RE2::Options options;
re2::RE2 pattern;

CopyCmdMatcher() :
options(RE2::Quiet),
pattern(
R"(((?is)(?:--.*?$|/\*[\s\S]*?\*/|\s)*\bCOPY\b\s+[^;]*?\bFROM\b\s+STDIN\b(?:\s+WITH\s*\([^)]*\))?))",
options) {
//((?is)(?:--.*?$|/\*[\s\S]*?\*/|\s)*\bCOPY\b\s+[^;]*?\bFROM\b\s+STDIN\b(?:\s+WITH\s*\([^)]*\))?)
}

inline
bool match(const char* query, re2::StringPiece* matched = nullptr) const {
return re2::RE2::PartialMatch(query, pattern, matched);
}
};

class __attribute__((aligned(64))) PgSQL_Thread : public Base_Thread
{
private:
Expand Down Expand Up @@ -196,7 +215,7 @@ class __attribute__((aligned(64))) PgSQL_Thread : public Base_Thread
#ifdef IDLE_THREADS
PtrArray* idle_mysql_sessions;
PtrArray* resume_mysql_sessions;

CopyCmdMatcher *copy_cmd_matcher;
pgsql_conn_exchange_t myexchange;
#endif // IDLE_THREADS

Expand Down
1 change: 1 addition & 0 deletions include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ enum PGSQL_QUERY_command {
PGSQL_QUERY_ALTER_TABLESPACE,
PGSQL_QUERY_DROP_TABLESPACE,
PGSQL_QUERY_CLUSTER,
PGSQL_QUERY_START_REPLICATION,
PGSQL_QUERY_UNKNOWN,
PGSQL_QUERY__UNINITIALIZED,
PGSQL_QUERY___NONE // Special marker.
Expand Down
4 changes: 4 additions & 0 deletions lib/Base_Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ void Base_Thread::register_session(T thr, S _sess, bool up_start) {
// assert(0);
// }
_sess->match_regexes=match_regexes;
if constexpr (std::is_same_v<T, PgSQL_Thread*>) {
_sess->copy_cmd_matcher = (static_cast<PgSQL_Thread*>(this))->copy_cmd_matcher;
}

if (up_start)
_sess->start_time=curtime;
proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Registered new session\n", _sess->thread, _sess);
Expand Down
2 changes: 1 addition & 1 deletion lib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ POSTGRES_LDIR=$(POSTGRES_IFACES)/libpq/

IDIR := ../include

IDIRS := -I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) -I$(CLICKHOUSE_CPP_DIR)/contrib/ $(MICROHTTPD_IDIR) $(LIBHTTPSERVER_IDIR) $(LIBINJECTION_IDIR) -I$(CURL_IDIR) -I$(EV_DIR) -I$(SSL_IDIR) -I$(PROMETHEUS_IDIR) -I$(LIBUSUAL_IDIR) -I$(LIBSCRAM_IDIR) -I$(POSTGRES_IFACE)
IDIRS := -I$(IDIR) -I$(JEMALLOC_IDIR) -I$(MARIADB_IDIR) $(LIBCONFIG_IDIR) -I$(RE2_IDIR) -I$(SQLITE3_DIR) -I$(PCRE_PATH) -I/usr/local/include -I$(CLICKHOUSE_CPP_DIR) -I$(CLICKHOUSE_CPP_DIR)/contrib/ $(MICROHTTPD_IDIR) $(LIBHTTPSERVER_IDIR) $(LIBINJECTION_IDIR) -I$(CURL_IDIR) -I$(EV_DIR) -I$(PROMETHEUS_IDIR) -I$(LIBUSUAL_IDIR) -I$(LIBSCRAM_IDIR) -I$(POSTGRES_IFACE) -I$(SSL_IDIR)
ifeq ($(UNAME_S),Linux)
IDIRS += -I$(COREDUMPER_IDIR)
endif
Expand Down
Loading

0 comments on commit 2c80f4a

Please sign in to comment.