Skip to content

Commit

Permalink
Merge pull request #4703 from sysown/v3.0_refactor_query_cache
Browse files Browse the repository at this point in the history
Refactored and Optimized Query Cache with PgSQL Support
  • Loading branch information
renecannao authored Nov 13, 2024
2 parents 8742bfb + 6d614a4 commit 6a7c1cf
Show file tree
Hide file tree
Showing 32 changed files with 2,433 additions and 747 deletions.
16 changes: 10 additions & 6 deletions deps/postgresql/handle_row_data.patch
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ index 2265ab5..56883ec 100644
return conn->result;
}

+int PShandleRowData(PGconn *conn, PSresult* result) {
+int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result) {
+ if (!conn || !result)
+ return 1;
+ return psHandleRowData(conn, result);
+ return psHandleRowData(conn, is_first_packet, result);
+}
+
diff --git src/interfaces/libpq/fe-misc.c src/interfaces/libpq/fe-misc.c
Expand Down Expand Up @@ -60,7 +60,7 @@ diff --git src/interfaces/libpq/fe-protocol3.c src/interfaces/libpq/fe-protocol3
index 9c4aa7e..de0746c 100644
--- src/interfaces/libpq/fe-protocol3.c
+++ src/interfaces/libpq/fe-protocol3.c
@@ -2299,3 +2299,105 @@ build_startup_packet(const PGconn *conn, char *packet,
@@ -2299,3 +2299,109 @@ build_startup_packet(const PGconn *conn, char *packet,

return packet_len;
}
Expand All @@ -78,7 +78,7 @@ index 9c4aa7e..de0746c 100644
+ * -1 -> Not enough data to process the message; the next call should be to PQconsumeInput.
+ */
+int
+psHandleRowData(PGconn *conn, PSresult* result)
+psHandleRowData(PGconn *conn, bool isFirstPacket, PSresult* result)
+{
+ char id;
+ int msgLength;
Expand Down Expand Up @@ -122,6 +122,10 @@ index 9c4aa7e..de0746c 100644
+ return 1;
+ }
+
+ /* First data row should be skipped since it is part of PGresult, which contains row description */
+ if (isFirstPacket)
+ return 1;
+
+ if (conn->result != NULL &&
+ conn->result->resultStatus == PGRES_TUPLES_OK)
+ {
Expand Down Expand Up @@ -194,7 +198,7 @@ index c5170d1..3e3cc34 100644
extern const PGresult *PQgetResultFromPGconn(PGconn *conn);

+/* ProxySQL special handler function */
+extern int PShandleRowData(PGconn *conn, PSresult* result);
+extern int PShandleRowData(PGconn *conn, bool is_first_packet, PSresult* result);
+
#ifdef __cplusplus
}
Expand All @@ -210,7 +214,7 @@ index a951f49..e1df8b5 100644
+ /*
+ * ProxySQL light weight routines
+ */
+extern int psHandleRowData(PGconn *conn, PSresult* result);
+extern int psHandleRowData(PGconn *conn, bool is_first_packet, PSresult* result);
+
/* === in fe-misc.c === */

Expand Down
26 changes: 26 additions & 0 deletions include/MySQL_Query_Cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#ifndef __CLASS_MYSQL_QUERY_CACHE_H
#define __CLASS_MYSQL_QUERY_CACHE_H

#include "proxysql.h"
#include "cpp.h"
#include "query_cache.hpp"

typedef struct _MySQL_QC_entry : public QC_entry_t {
uint32_t column_eof_pkt_offset;
uint32_t row_eof_pkt_offset;
uint32_t ok_pkt_offset;
} MySQL_QC_entry_t;

class MySQL_Query_Cache : public Query_Cache<MySQL_Query_Cache> {
public:
MySQL_Query_Cache() = default;
~MySQL_Query_Cache() = default;

bool set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp, uint32_t vl,
uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms, bool deprecate_eof_active);
unsigned char* get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl, uint32_t* lv,
uint64_t curtime_ms, uint64_t cache_ttl, bool deprecate_eof_active);
//void* purgeHash_thread(void*);
};

#endif /* __CLASS_MYSQL_QUERY_CACHE_H */
9 changes: 5 additions & 4 deletions include/PgSQL_Data_Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ class PgSQL_Data_Stream
FixedSizeQueue data_packets_history_IN;
FixedSizeQueue data_packets_history_OUT;
//PtrSizeArray *PSarrayOUTpending;
PtrSizeArray* resultset;
unsigned int resultset_length;
//PtrSizeArray* resultset;
//unsigned int resultset_length;

ProxySQL_Poll<PgSQL_Data_Stream>* mypolls;
//int listener;
Expand Down Expand Up @@ -201,8 +201,9 @@ class PgSQL_Data_Stream
void check_data_flow();
int assign_fd_from_mysql_conn();

unsigned char* resultset2buffer(bool);
void buffer2resultset(unsigned char*, unsigned int);
static unsigned char* copy_array_to_buffer(PtrSizeArray* resultset, size_t resultset_length, bool del);
static void copy_buffer_to_resultset(PtrSizeArray* resultset, unsigned char* ptr, uint64_t size,
char current_transaction_state);

// safe way to attach a PgSQL Connection
void attach_connection(PgSQL_Connection* mc) {
Expand Down
22 changes: 22 additions & 0 deletions include/PgSQL_Query_Cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#ifndef __CLASS_PGSQL_QUERY_CACHE_H
#define __CLASS_PGSQL_QUERY_CACHE_H

#include "proxysql.h"
#include "cpp.h"
#include "query_cache.hpp"

typedef struct _PgSQL_QC_entry : public QC_entry_t {} PgSQL_QC_entry_t;

class PgSQL_Query_Cache : public Query_Cache<PgSQL_Query_Cache> {
public:
PgSQL_Query_Cache() = default;
~PgSQL_Query_Cache() = default;

bool set(uint64_t user_hash, const unsigned char* kp, uint32_t kl, unsigned char* vp, uint32_t vl,
uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms);
const std::shared_ptr<PgSQL_QC_entry_t> get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl,
uint64_t curtime_ms, uint64_t cache_ttl);
//void* purgeHash_thread(void*);
};

#endif /* __CLASS_PGSQL_QUERY_CACHE_H */
2 changes: 1 addition & 1 deletion include/cpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "PgSQL_Backend.h"
#include "ProxySQL_Poll.h"
//#include "MySQL_Data_Stream.h"
#include "query_cache.hpp"
//#include "MySQL_Query_Cache.h"
#include "mysql_connection.h"
#include "sqlite3db.h"
//#include "StatCounters.h"
Expand Down
11 changes: 10 additions & 1 deletion include/proxysql_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ class SimpleKV;
class AdvancedKV;
template <class T>
class ProxySQL_Poll;
class Query_Cache;
class MySQL_Query_Cache;
class PgSQL_Query_Cache;
class MySQL_Authentication;
class MySQL_Connection;
class PgSQL_Connection;
Expand Down Expand Up @@ -1101,6 +1102,10 @@ __thread char* pgsql_thread___monitor_username;
__thread char* pgsql_thread___monitor_password;
__thread char* pgsql_thread___monitor_dbname;

// PgSQL Query Cache
__thread int pgsql_thread___query_cache_size_MB;
__thread int pgsql_thread___query_cache_soft_ttl_pct;
__thread int pgsql_thread___query_cache_handle_warnings;
//---------------------------

__thread char *mysql_thread___default_schema;
Expand Down Expand Up @@ -1393,6 +1398,10 @@ extern __thread char* pgsql_thread___monitor_username;
extern __thread char* pgsql_thread___monitor_password;
extern __thread char* pgsql_thread___monitor_dbname;

// PgSQL Query Cache
extern __thread int pgsql_thread___query_cache_size_MB;
extern __thread int pgsql_thread___query_cache_soft_ttl_pct;
extern __thread int pgsql_thread___query_cache_handle_warnings;
//---------------------------

extern __thread char *mysql_thread___default_schema;
Expand Down
111 changes: 60 additions & 51 deletions include/query_cache.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#ifndef __CLASS_QUERY_CACHE_H
#define __CLASS_QUERY_CACHE_H

#include "proxysql.h"
#include "cpp.h"
#include <tuple>
#include "prometheus/counter.h"
#include "prometheus/gauge.h"

#define EXPIRE_DROPIT 0
#define SHARED_QUERY_CACHE_HASH_TABLES 32
Expand All @@ -13,30 +13,6 @@
#define DEFAULT_purge_threshold_pct_min 3
#define DEFAULT_purge_threshold_pct_max 90

#include "prometheus/counter.h"
#include "prometheus/gauge.h"

class KV_BtreeArray;

typedef struct __QC_entry_t QC_entry_t;

struct __QC_entry_t {
uint64_t key; // primary key
char *value; // pointer to value
KV_BtreeArray *kv; // pointer to the KV_BtreeArray where the entry is stored
QC_entry_t *self; // pointer to itself
uint32_t klen; // length of the key : FIXME: not sure if still relevant
uint32_t length; // length of the value
unsigned long long create_ms; // when the entry was created, monotonic, millisecond granularity
unsigned long long expire_ms; // when the entry will expire, monotonic , millisecond granularity
unsigned long long access_ms; // when the entry was read last , monotonic , millisecond granularity
bool refreshing; // true when a client will hit the backend to refresh the entry
uint32_t column_eof_pkt_offset = 0;
uint32_t row_eof_pkt_offset = 0;
uint32_t ok_pkt_offset = 0;
uint32_t ref_count; // reference counter
};

struct p_qc_counter {
enum metric {
query_cache_count_get = 0,
Expand Down Expand Up @@ -65,34 +41,67 @@ struct qc_metrics_map_idx {
};

class KV_BtreeArray;
class MySQL_Query_Cache;
class PgSQL_Query_Cache;
struct _MySQL_QC_entry;
struct _PgSQL_QC_entry;
typedef struct _MySQL_QC_entry MySQL_QC_entry_t;
typedef struct _PgSQL_QC_entry PgSQL_QC_entry_t;

typedef struct _QC_entry {
uint64_t key; // primary key
unsigned char *value; // pointer to value
uint32_t length; // length of the value
uint32_t klen; // length of the key : FIXME: not sure if still relevant
uint64_t create_ms; // when the entry was created, monotonic, millisecond granularity
uint64_t expire_ms; // when the entry will expire, monotonic , millisecond granularity
uint64_t access_ms; // when the entry was read last , monotonic , millisecond granularity
bool refreshing; // true when a client will hit the backend to refresh the entry
KV_BtreeArray* kv; // pointer to the KV_BtreeArray where the entry is stored (used for troubleshooting)
//struct _QC_entry* self; // pointer to itself
} QC_entry_t;

template <typename QC_DERIVED>
class Query_Cache {
private:
KV_BtreeArray * KVs[SHARED_QUERY_CACHE_HASH_TABLES];
uint64_t get_data_size_total();
unsigned int current_used_memory_pct();
struct {
std::array<prometheus::Counter*, p_qc_counter::__size> p_counter_array {};
std::array<prometheus::Gauge*, p_qc_gauge::__size> p_gauge_array {};
} metrics;
public:
static_assert(std::is_same_v<QC_DERIVED,MySQL_Query_Cache> || std::is_same_v<QC_DERIVED,PgSQL_Query_Cache>,
"Invalid QC_DERIVED Query Cache type");
using TypeQCEntry = typename std::conditional<std::is_same_v<QC_DERIVED, MySQL_Query_Cache>,
MySQL_QC_entry_t, PgSQL_QC_entry_t>::type;
public:
static bool shutting_down;
static pthread_t purge_thread_id;
constexpr static unsigned int purge_loop_time = DEFAULT_purge_loop_time;

void print_version();
uint64_t flush();
void p_update_metrics();
void * purgeHash_thread(void *);
int size;
int shutdown;
unsigned long long QCnow_ms;
pthread_t purge_thread_id;
unsigned int purge_loop_time;
unsigned int purge_total_time;
unsigned int purge_threshold_pct_min;
unsigned int purge_threshold_pct_max;
uint64_t max_memory_size;
SQLite3_result* SQL3_getStats();
void purgeHash(uint64_t max_memory_size);

protected:
Query_Cache();
~Query_Cache();
void print_version();
bool set(uint64_t user_hash, const unsigned char *kp, uint32_t kl, unsigned char *vp, uint32_t vl, unsigned long long create_ms, unsigned long long curtime_ms, unsigned long long expire_ms, bool deprecate_eof_active);
unsigned char * get(uint64_t , const unsigned char *, const uint32_t, uint32_t *, unsigned long long, unsigned long long, bool deprecate_eof_active);
uint64_t flush();
SQLite3_result * SQL3_getStats();

bool set(QC_entry_t* entry, uint64_t user_hash, const unsigned char *kp, uint32_t kl, unsigned char *vp,
uint32_t vl, uint64_t create_ms, uint64_t curtime_ms, uint64_t expire_ms);
std::shared_ptr<QC_entry_t> get(uint64_t user_hash, const unsigned char* kp, const uint32_t kl,
uint64_t curtime_ms, uint64_t cache_ttl);

constexpr static unsigned int purge_total_time = DEFAULT_purge_total_time;
constexpr static unsigned int purge_threshold_pct_min = DEFAULT_purge_threshold_pct_min;
constexpr static unsigned int purge_threshold_pct_max = DEFAULT_purge_threshold_pct_max;
//uint64_t max_memory_size;

private:
KV_BtreeArray* KVs[SHARED_QUERY_CACHE_HASH_TABLES];
uint64_t get_data_size_total();
unsigned int current_used_memory_pct(uint64_t max_memory_size);
void purgeHash(uint64_t QCnow_ms, unsigned int curr_pct);

struct {
std::array<prometheus::Counter*, p_qc_counter::__size> p_counter_array{};
std::array<prometheus::Gauge*, p_qc_gauge::__size> p_gauge_array{};
} metrics;
};
#endif /* __CLASS_QUERY_CACHE_H */

#endif /* __CLASS_QUERY_CACHE_H */
2 changes: 1 addition & 1 deletion include/query_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class Query_Processor_Output {
mirror_flagOUT=-1;
next_query_flagIN=-1;
cache_ttl=-1;
cache_empty_result=1;
cache_empty_result=-1;
cache_timeout=-1;
reconnect=-1;
timeout=-1;
Expand Down
2 changes: 1 addition & 1 deletion lib/Admin_Bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ struct cpu_timer
extern int admin_load_main_;
extern bool admin_nostart_;

extern Query_Cache *GloQC;
//extern MySQL_Query_Cache *GloMyQC;
extern MySQL_Authentication *GloMyAuth;
extern PgSQL_Authentication *GloPgAuth;
extern MySQL_LDAP_Authentication *GloMyLdapAuth;
Expand Down
2 changes: 1 addition & 1 deletion lib/Admin_FlushVariables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ extern char * proxysql_version;

#include "proxysql_find_charset.h"

extern Query_Cache *GloQC;
//extern MySQL_Query_Cache *GloMyQC;
extern MySQL_Authentication *GloMyAuth;
extern PgSQL_Authentication *GloPgAuth;
extern MySQL_LDAP_Authentication *GloMyLdapAuth;
Expand Down
33 changes: 30 additions & 3 deletions lib/Admin_Handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ using json = nlohmann::json;
#include <uuid/uuid.h>

#include "PgSQL_Protocol.h"
#include "MySQL_Query_Cache.h"
#include "PgSQL_Query_Cache.h"
//#include "usual/time.h"

using std::string;
Expand Down Expand Up @@ -135,7 +137,8 @@ extern bool admin_proxysql_pgsql_paused;
extern int admin_old_wait_timeout;


extern Query_Cache *GloQC;
extern MySQL_Query_Cache *GloMyQC;
extern PgSQL_Query_Cache* GloPgQC;
extern MySQL_Authentication *GloMyAuth;
extern PgSQL_Authentication *GloPgAuth;
extern MySQL_LDAP_Authentication *GloMyLdapAuth;
Expand Down Expand Up @@ -653,13 +656,37 @@ bool admin_handler_command_proxysql(char *query_no_space, unsigned int query_no_
if (query_no_space_length==strlen("PROXYSQL FLUSH QUERY CACHE") && !strncasecmp("PROXYSQL FLUSH QUERY CACHE",query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL FLUSH QUERY CACHE command\n");
ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa;
if (GloQC) {
GloQC->flush();
if (GloMyQC) {
GloMyQC->flush();
}
//if (GloPgQC) {
// GloPgQC->flush();
//}
SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
return false;
}

if (query_no_space_length == strlen("PROXYSQL FLUSH MYSQL QUERY CACHE") && !strncasecmp("PROXYSQL FLUSH MYSQL QUERY CACHE", query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL FLUSH MYSQL QUERY CACHE command\n");
ProxySQL_Admin* SPA = (ProxySQL_Admin*)pa;
if (GloMyQC) {
GloMyQC->flush();
}
SPA->send_ok_msg_to_client(sess, NULL, 0, query_no_space);
return false;
}

if (query_no_space_length == strlen("PROXYSQL FLUSH PGSQL QUERY CACHE") && !strncasecmp("PROXYSQL FLUSH PGSQL QUERY CACHE", query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL FLUSH PGSQL QUERY CACHE command\n");
ProxySQL_Admin* SPA = (ProxySQL_Admin*)pa;
uint64_t count = 0;
if (GloPgQC) {
count = GloPgQC->flush();
}
SPA->send_ok_msg_to_client(sess, NULL, (int)count, "DELETE ");
return false;
}

if (!strcasecmp("PROXYSQL FLUSH MYSQL CLIENT HOSTS", query_no_space)) {
proxy_info("Received PROXYSQL FLUSH MYSQL CLIENT HOSTS command\n");
ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa;
Expand Down
Loading

0 comments on commit 6a7c1cf

Please sign in to comment.