From 62fd8915ea559e19d86685a83e8559c7ad454850 Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Mon, 21 Oct 2024 14:39:29 +0800 Subject: [PATCH 01/10] Abstract set/rewrite config bind option Originally, special config 'bind' is used for socket and TLS, however multiple addresses handling is also workable for RDMA(QUIC in the future). Abstract bind option as helper functions to apply more connection types. rewriteConfigBindOption is a local function, declare it as 'static' function. Signed-off-by: zhenwei pi --- src/config.c | 50 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/src/config.c b/src/config.c index c4009adefa..dd4e06b229 100644 --- a/src/config.c +++ b/src/config.c @@ -1536,10 +1536,27 @@ void rewriteConfigOOMScoreAdjValuesOption(standardConfig *config, const char *na } /* Rewrite the bind option. */ -void rewriteConfigBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) { +static void rewriteConfigBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state, char **bindaddr, int bindaddr_count) { UNUSED(config); int force = 1; sds line, addresses; + + /* Rewrite as bind ... */ + if (bindaddr_count > 0) + addresses = sdsjoin(bindaddr, bindaddr_count, " "); + else + addresses = sdsnew("\"\""); + line = sdsnew(name); + line = sdscatlen(line, " ", 1); + line = sdscatsds(line, addresses); + sdsfree(addresses); + + rewriteConfigRewriteLine(state, name, line, force); +} + +/* Rewrite the bind option. */ +static void rewriteConfigSocketBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) { + UNUSED(config); int is_default = 0; /* Compare server.bindaddr with CONFIG_DEFAULT_BINDADDR */ @@ -1559,17 +1576,7 @@ void rewriteConfigBindOption(standardConfig *config, const char *name, struct re return; } - /* Rewrite as bind ... 
*/ - if (server.bindaddr_count > 0) - addresses = sdsjoin(server.bindaddr, server.bindaddr_count, " "); - else - addresses = sdsnew("\"\""); - line = sdsnew(name); - line = sdscatlen(line, " ", 1); - line = sdscatsds(line, addresses); - sdsfree(addresses); - - rewriteConfigRewriteLine(state, name, line, force); + rewriteConfigBindOption(config, name, state, server.bindaddr, server.bindaddr_count); } /* Rewrite the loadmodule option. */ @@ -2922,8 +2929,9 @@ static sds getConfigNotifyKeyspaceEventsOption(standardConfig *config) { return keyspaceEventsFlagsToString(server.notify_keyspace_events); } -static int setConfigBindOption(standardConfig *config, sds *argv, int argc, const char **err) { +static int setConfigBindOption(standardConfig *config, sds *argv, int argc, const char **err, char **bindaddr, int *bindaddr_count) { UNUSED(config); + int orig_bindaddr_count = *bindaddr_count; int j; if (argc > CONFIG_BINDADDR_MAX) { @@ -2935,15 +2943,21 @@ static int setConfigBindOption(standardConfig *config, sds *argv, int argc, cons if (argc == 1 && sdslen(argv[0]) == 0) argc = 0; /* Free old bind addresses */ - for (j = 0; j < server.bindaddr_count; j++) { - zfree(server.bindaddr[j]); + for (j = 0; j < orig_bindaddr_count; j++) { + zfree(bindaddr[j]); } - for (j = 0; j < argc; j++) server.bindaddr[j] = zstrdup(argv[j]); - server.bindaddr_count = argc; + for (j = 0; j < argc; j++) bindaddr[j] = zstrdup(argv[j]); + *bindaddr_count = argc; return 1; } +static int setConfigSocketBindOption(standardConfig *config, sds *argv, int argc, const char **err) { + UNUSED(config); + + return setConfigBindOption(config, argv, argc, err, server.bindaddr, &server.bindaddr_count); +} + static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc, const char **err) { UNUSED(config); @@ -3316,7 +3330,7 @@ standardConfig static_configs[] = { createSpecialConfig("client-output-buffer-limit", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, 
setConfigClientOutputBufferLimitOption, getConfigClientOutputBufferLimitOption, rewriteConfigClientOutputBufferLimitOption, NULL), createSpecialConfig("oom-score-adj-values", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigOOMScoreAdjValuesOption, getConfigOOMScoreAdjValuesOption, rewriteConfigOOMScoreAdjValuesOption, updateOOMScoreAdj), createSpecialConfig("notify-keyspace-events", NULL, MODIFIABLE_CONFIG, setConfigNotifyKeyspaceEventsOption, getConfigNotifyKeyspaceEventsOption, rewriteConfigNotifyKeyspaceEventsOption, NULL), - createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigBindOption, getConfigBindOption, rewriteConfigBindOption, applyBind), + createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigSocketBindOption, getConfigBindOption, rewriteConfigSocketBindOption, applyBind), createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL), createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL), From feeb58939ce49ae5d5e3ee85cec6dc51e5e7eb54 Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Mon, 21 Oct 2024 19:43:09 +0800 Subject: [PATCH 02/10] Introduce closeListener for connection type Socket family use close() syscall to close listener, however RDMA has another style. Use an abstract function handler instead of hard code syscall style. 
Signed-off-by: zhenwei pi --- src/config.c | 4 ++-- src/connection.h | 8 ++++++++ src/rdma.c | 36 +++++++++++++++++++----------------- src/server.c | 17 ++--------------- src/server.h | 1 - src/socket.c | 14 ++++++++++++++ src/tls.c | 5 +++++ src/unix.c | 5 +++++ 8 files changed, 55 insertions(+), 35 deletions(-) diff --git a/src/config.c b/src/config.c index dd4e06b229..2b2a42e4ec 100644 --- a/src/config.c +++ b/src/config.c @@ -2644,7 +2644,7 @@ static int applyBind(const char **err) { tcp_listener->ct = connectionByType(CONN_TYPE_SOCKET); if (changeListener(tcp_listener) == C_ERR) { *err = "Failed to bind to specified addresses."; - if (tls_listener) closeListener(tls_listener); /* failed with TLS together */ + if (tls_listener) connCloseListener(tls_listener); /* failed with TLS together */ return 0; } @@ -2656,7 +2656,7 @@ static int applyBind(const char **err) { tls_listener->ct = connectionByType(CONN_TYPE_TLS); if (changeListener(tls_listener) == C_ERR) { *err = "Failed to bind to specified addresses."; - closeListener(tcp_listener); /* failed with TCP together */ + connCloseListener(tcp_listener); /* failed with TCP together */ return 0; } } diff --git a/src/connection.h b/src/connection.h index 0762441732..099bbc558d 100644 --- a/src/connection.h +++ b/src/connection.h @@ -79,6 +79,7 @@ typedef struct ConnectionType { int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote); int (*is_local)(connection *conn); int (*listen)(connListener *listener); + void (*closeListener)(connListener *listener); /* create/shutdown/close connection */ connection *(*conn_create)(void); @@ -442,6 +443,13 @@ static inline int connListen(connListener *listener) { return listener->ct->listen(listener); } +/* Close a listener; does nothing when listener->count is 0 */ +static inline void connCloseListener(connListener *listener) { + if (listener->count) { + listener->ct->closeListener(listener); + } +} + /* Get accept_handler of a connection type */ static inline aeFileProc 
*connAcceptHandler(ConnectionType *ct) { if (ct) return ct->accept_handler; diff --git a/src/rdma.c b/src/rdma.c index 7cdcb24913..8f8b0df8b4 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -1613,6 +1613,23 @@ int connRdmaListen(connListener *listener) { return C_OK; } +static void connRdmaCloseListener(connListener *listener) { + /* Close old servers */ + for (int i = 0; i < listener->count; i++) { + if (listener->fd[i] == -1) continue; + + aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE); + listener->fd[i] = -1; + struct rdma_listener *rdma_listener = &rdma_listeners[i]; + rdma_destroy_id(rdma_listener->cm_id); + rdma_destroy_event_channel(rdma_listener->cm_channel); + } + + listener->count = 0; + zfree(rdma_listeners); + rdma_listeners = NULL; +} + static int connRdmaAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { rdma_connection *rdma_conn = (rdma_connection *)conn; struct rdma_cm_id *cm_id = rdma_conn->cm_id; @@ -1740,6 +1757,7 @@ static ConnectionType CT_RDMA = { //.cluster_accept_handler = NULL, .is_local = connRdmaIsLocal, .listen = connRdmaListen, + .closeListener = connRdmaCloseListener, .addr = connRdmaAddr, /* create/close connection */ @@ -1796,23 +1814,7 @@ ConnectionType *connectionTypeRdma(void) { static int rdmaChangeListener(void) { struct connListener *listener = rdmaListener(); - /* Close old servers */ - for (int i = 0; i < listener->count; i++) { - if (listener->fd[i] == -1) continue; - - aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE); - listener->fd[i] = -1; - struct rdma_listener *rdma_listener = &rdma_listeners[i]; - rdma_destroy_id(rdma_listener->cm_id); - rdma_destroy_event_channel(rdma_listener->cm_channel); - } - - listener->count = 0; - zfree(rdma_listeners); - rdma_listeners = NULL; - - closeListener(listener); - + connRdmaCloseListener(listener); /* Just close the server if port disabled */ if (listener->port == 0) { if (server.set_proc_title) serverSetProcTitle(NULL); diff --git 
a/src/server.c b/src/server.c index aebbb57a93..7a472de65f 100644 --- a/src/server.c +++ b/src/server.c @@ -2481,19 +2481,6 @@ void checkTcpBacklogSettings(void) { #endif } -void closeListener(connListener *sfd) { - int j; - - for (j = 0; j < sfd->count; j++) { - if (sfd->fd[j] == -1) continue; - - aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE); - close(sfd->fd[j]); - } - - sfd->count = 0; -} - /* Create an event handler for accepting new connections in TCP or TLS domain sockets. * This works atomically for all socket fds */ int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) { @@ -2557,7 +2544,7 @@ int listenToPort(connListener *sfd) { continue; /* Rollback successful listens before exiting */ - closeListener(sfd); + connCloseListener(sfd); return C_ERR; } if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id); @@ -6295,7 +6282,7 @@ connListener *listenerByType(const char *typename) { /* Close original listener, re-create a new listener from the updated bind address & port */ int changeListener(connListener *listener) { /* Close old servers */ - closeListener(listener); + connCloseListener(listener); /* Just close the server if port disabled */ if (listener->port == 0) { diff --git a/src/server.h b/src/server.h index 531ca8e7c8..6a960b22e1 100644 --- a/src/server.h +++ b/src/server.h @@ -3288,7 +3288,6 @@ void setupSignalHandlers(void); int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler); connListener *listenerByType(const char *typename); int changeListener(connListener *listener); -void closeListener(connListener *listener); struct serverCommand *lookupSubcommand(struct serverCommand *container, sds sub_name); struct serverCommand *lookupCommand(robj **argv, int argc); struct serverCommand *lookupCommandBySdsLogic(dict *commands, sds s); diff --git a/src/socket.c b/src/socket.c index 7344d66ad8..d89e6c8767 100644 --- a/src/socket.c +++ b/src/socket.c @@ -339,6 
+339,19 @@ static int connSocketListen(connListener *listener) { return listenToPort(listener); } +static void connSocketCloseListener(connListener *listener) { + int j; + + for (j = 0; j < listener->count; j++) { + if (listener->fd[j] == -1) continue; + + aeDeleteFileEvent(server.el, listener->fd[j], AE_READABLE); + close(listener->fd[j]); + } + + listener->count = 0; +} + static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) { int fd = anetTcpNonBlockConnect(NULL, addr, port); if (fd == -1) { @@ -395,6 +408,7 @@ static ConnectionType CT_Socket = { .addr = connSocketAddr, .is_local = connSocketIsLocal, .listen = connSocketListen, + .closeListener = connSocketCloseListener, /* create/shutdown/close connection */ .conn_create = connCreateSocket, diff --git a/src/tls.c b/src/tls.c index a1fda2a7ae..fc3aaf58ce 100644 --- a/src/tls.c +++ b/src/tls.c @@ -805,6 +805,10 @@ static int connTLSListen(connListener *listener) { return listenToPort(listener); } +static void connTLSCloseListener(connListener *listener) { + connectionTypeTcp()->closeListener(listener); +} + static void connTLSShutdown(connection *conn_) { tls_connection *conn = (tls_connection *)conn_; @@ -1136,6 +1140,7 @@ static ConnectionType CT_TLS = { .addr = connTLSAddr, .is_local = connTLSIsLocal, .listen = connTLSListen, + .closeListener = connTLSCloseListener, /* create/shutdown/close connection */ .conn_create = connCreateTLS, diff --git a/src/unix.c b/src/unix.c index 35778779f9..86df05bd52 100644 --- a/src/unix.c +++ b/src/unix.c @@ -74,6 +74,10 @@ static int connUnixListen(connListener *listener) { return C_OK; } +static void connUnixCloseListener(connListener *listener) { + connectionTypeTcp()->closeListener(listener); +} + static connection *connCreateUnix(void) { connection *conn = zcalloc(sizeof(connection)); conn->type = &CT_Unix; @@ -174,6 +178,7 @@ static ConnectionType CT_Unix = { .addr = connUnixAddr, .is_local = connUnixIsLocal, .listen = 
connUnixListen, + .closeListener = connUnixCloseListener, /* create/shutdown/close connection */ .conn_create = connCreateUnix, From ec730d053ce913200c539b4500e0ebdca577545b Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Thu, 21 Nov 2024 10:59:01 +0800 Subject: [PATCH 03/10] RDMA: Use valkey.conf style instead of module parameters Move 4 parameters from valkey-rdma.so to valkey-server, keep RDMA listener similar to TCP/TLS. Also prepare to build Valkey Over RDMA into builtin. Signed-off-by: zhenwei pi --- README.md | 17 ++-- src/config.c | 64 +++++++++++++ src/connection.h | 1 + src/rdma.c | 169 +++-------------------------------- src/server.c | 11 +++ src/server.h | 12 +++ tests/rdma/run.py | 2 +- tests/unit/introspection.tcl | 4 + valkey.conf | 45 ++++++++++ 9 files changed, 164 insertions(+), 161 deletions(-) diff --git a/README.md b/README.md index 94f38bccf7..c89bb1382c 100644 --- a/README.md +++ b/README.md @@ -203,20 +203,27 @@ Note that Valkey Over RDMA is an experimental feature. It may be changed or removed in any minor or major version. Currently, it is only supported on Linux. 
-To manually run a Valkey server with RDMA mode: +* RDMA built-in mode: + ``` + ./src/valkey-server --protected-mode no \ + --rdma-bind 192.168.122.100 --rdma-port 6379 + ``` - % ./src/valkey-server --protected-mode no \ - --loadmodule src/valkey-rdma.so bind=192.168.122.100 port=6379 +* RDMA module mode: + ``` + ./src/valkey-server --protected-mode no \ + --loadmodule src/valkey-rdma.so --rdma-bind 192.168.122.100 --rdma-port 6379 + ``` It's possible to change bind address/port of RDMA by runtime command: - 192.168.122.100:6379> CONFIG SET rdma.port 6380 + 192.168.122.100:6379> CONFIG SET rdma-port 6380 It's also possible to have both RDMA and TCP available, and there is no conflict of TCP(6379) and RDMA(6379), Ex: % ./src/valkey-server --protected-mode no \ - --loadmodule src/valkey-rdma.so bind=192.168.122.100 port=6379 \ + --loadmodule src/valkey-rdma.so --rdma-bind 192.168.122.100 --rdma-port 6379 \ --port 6379 Note that the network card (192.168.122.100 of this example) should support diff --git a/src/config.c b/src/config.c index 2b2a42e4ec..effda9a500 100644 --- a/src/config.c +++ b/src/config.c @@ -2958,6 +2958,66 @@ static int setConfigSocketBindOption(standardConfig *config, sds *argv, int argc return setConfigBindOption(config, argv, argc, err, server.bindaddr, &server.bindaddr_count); } +static int setConfigRdmaBindOption(standardConfig *config, sds *argv, int argc, const char **err) { + UNUSED(config); + + return setConfigBindOption(config, argv, argc, err, server.rdma_ctx_config.bindaddr, &server.rdma_ctx_config.bindaddr_count); +} + +static sds getConfigRdmaBindOption(standardConfig *config) { + UNUSED(config); + return sdsjoin(server.rdma_ctx_config.bindaddr, server.rdma_ctx_config.bindaddr_count, " "); +} + +static void rewriteConfigRdmaBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) { + UNUSED(config); + + if (server.rdma_ctx_config.bindaddr_count) { + rewriteConfigBindOption(config, name, state, 
server.rdma_ctx_config.bindaddr, + server.rdma_ctx_config.bindaddr_count); + } +} + +static int applyRdmaBind(const char **err) { + connListener *rdma_listener = listenerByType(CONN_TYPE_RDMA); + + if (!rdma_listener) { + *err = "No RDMA building support."; + return 0; + } + + rdma_listener->bindaddr = server.rdma_ctx_config.bindaddr; + rdma_listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count; + rdma_listener->port = server.rdma_ctx_config.port; + rdma_listener->ct = connectionByType(CONN_TYPE_RDMA); + if (changeListener(rdma_listener) == C_ERR) { + *err = "Failed to bind to specified addresses for RDMA."; + return 0; + } + + return 1; +} + +static int updateRdmaPort(const char **err) { + connListener *listener = listenerByType(CONN_TYPE_RDMA); + + if (listener == NULL) { + *err = "No RDMA building support."; + return 0; + } + + listener->bindaddr = server.rdma_ctx_config.bindaddr; + listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count; + listener->port = server.rdma_ctx_config.port; + listener->ct = connectionByType(CONN_TYPE_RDMA); + if (changeListener(listener) == C_ERR) { + *err = "Unable to listen on this port for RDMA. 
Check server logs."; + return 0; + } + + return 1; +} + static int setConfigReplicaOfOption(standardConfig *config, sds *argv, int argc, const char **err) { UNUSED(config); @@ -3251,6 +3311,9 @@ standardConfig static_configs[] = { createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod), createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL), createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL), + createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort), + createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL), + createIntConfig("rdma-comp-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.comp_vector, -1, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), @@ -3331,6 +3394,7 @@ standardConfig static_configs[] = { createSpecialConfig("oom-score-adj-values", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigOOMScoreAdjValuesOption, getConfigOOMScoreAdjValuesOption, rewriteConfigOOMScoreAdjValuesOption, updateOOMScoreAdj), createSpecialConfig("notify-keyspace-events", NULL, MODIFIABLE_CONFIG, setConfigNotifyKeyspaceEventsOption, getConfigNotifyKeyspaceEventsOption, rewriteConfigNotifyKeyspaceEventsOption, NULL), createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigSocketBindOption, getConfigBindOption, rewriteConfigSocketBindOption, applyBind), + createSpecialConfig("rdma-bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, 
setConfigRdmaBindOption, getConfigRdmaBindOption, rewriteConfigRdmaBindOption, applyRdmaBind), createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL), createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL), diff --git a/src/connection.h b/src/connection.h index 099bbc558d..76b5298e9a 100644 --- a/src/connection.h +++ b/src/connection.h @@ -60,6 +60,7 @@ typedef enum { #define CONN_TYPE_SOCKET "tcp" #define CONN_TYPE_UNIX "unix" #define CONN_TYPE_TLS "tls" +#define CONN_TYPE_RDMA "rdma" #define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */ typedef void (*ConnectionCallbackFunc)(struct connection *conn); diff --git a/src/rdma.c b/src/rdma.c index 8f8b0df8b4..d3c6f0acfb 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -128,12 +128,10 @@ typedef struct rdma_listener { static list *pending_list; static rdma_listener *rdma_listeners; +static serverRdmaContextConfig *rdma_config; static ConnectionType CT_RDMA; -static int valkey_rdma_rx_size = VALKEY_RDMA_DEFAULT_RX_SIZE; -static int valkey_rdma_comp_vector = -1; /* -1 means a random one */ - static void serverRdmaError(char *err, const char *fmt, ...) 
{ va_list ap; @@ -272,7 +270,7 @@ static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) { /* setup recv buf & MR */ access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; - length = valkey_rdma_rx_size; + length = rdma_config->rx_size; ctx->rx.addr = page_aligned_zalloc(length); ctx->rx.length = length; ctx->rx.mr = ibv_reg_mr(ctx->pd, ctx->rx.addr, length, access); @@ -295,6 +293,7 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) { struct ibv_comp_channel *comp_channel = NULL; struct ibv_cq *cq = NULL; struct ibv_pd *pd = NULL; + int comp_vector = rdma_config->comp_vector; if (ibv_query_device(cm_id->verbs, &device_attr)) { serverLog(LL_WARNING, "RDMA: ibv ibv query device failed"); @@ -317,8 +316,13 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) { ctx->comp_channel = comp_channel; + /* negative number means a random one */ + if (comp_vector < 0) { + comp_vector = abs((int)random()); + } + cq = ibv_create_cq(cm_id->verbs, VALKEY_RDMA_MAX_WQE * 2, NULL, comp_channel, - valkey_rdma_comp_vector % cm_id->verbs->num_comp_vectors); + comp_vector % cm_id->verbs->num_comp_vectors); if (!cq) { serverLog(LL_WARNING, "RDMA: ibv create cq failed"); return C_ERR; @@ -1610,6 +1614,7 @@ int connRdmaListen(connListener *listener) { rdma_listener++; } + rdma_config = listener->priv; return C_OK; } @@ -1628,6 +1633,7 @@ static void connRdmaCloseListener(connListener *listener) { listener->count = 0; zfree(rdma_listeners); rdma_listeners = NULL; + rdma_config = NULL; } static int connRdmaAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { @@ -1787,17 +1793,6 @@ static ConnectionType CT_RDMA = { .process_pending_data = rdmaProcessPendingData, }; -static struct connListener *rdmaListener(void) { - static struct connListener *listener = NULL; - - if (listener) return listener; - - listener = listenerByType(CONN_TYPE_RDMA); - serverAssert(listener != NULL); - - 
return listener; -} - ConnectionType *connectionTypeRdma(void) { static ConnectionType *ct_rdma = NULL; @@ -1809,117 +1804,15 @@ ConnectionType *connectionTypeRdma(void) { return ct_rdma; } -/* rdma listener has different create/close logic from TCP, we can't re-use 'int changeListener(connListener *listener)' - * directly */ -static int rdmaChangeListener(void) { - struct connListener *listener = rdmaListener(); - - connRdmaCloseListener(listener); - /* Just close the server if port disabled */ - if (listener->port == 0) { - if (server.set_proc_title) serverSetProcTitle(NULL); - return VALKEYMODULE_OK; - } - - /* Re-create listener */ - if (connListen(listener) != C_OK) { - return VALKEYMODULE_ERR; - } - - /* Create event handlers */ - if (createSocketAcceptHandler(listener, listener->ct->accept_handler) != C_OK) { - serverPanic("Unrecoverable error creating %s accept handler.", listener->ct->get_type(NULL)); - } - - if (server.set_proc_title) serverSetProcTitle(NULL); - - return VALKEYMODULE_OK; -} - #ifdef BUILD_RDMA_MODULE #include "release.h" -static long long rdmaGetPort(const char *name, void *privdata) { - UNUSED(name); - UNUSED(privdata); - struct connListener *listener = rdmaListener(); - - return listener->port; -} - -static int rdmaSetPort(const char *name, long long val, void *privdata, ValkeyModuleString **err) { - UNUSED(name); - UNUSED(privdata); - UNUSED(err); - struct connListener *listener = rdmaListener(); - listener->port = val; - - return VALKEYMODULE_OK; -} - -static ValkeyModuleString *rdma_bind; - -static void rdmaBuildBind(void *ctx) { - struct connListener *listener = rdmaListener(); - - if (rdma_bind) ValkeyModule_FreeString(NULL, rdma_bind); - - sds rdma_bind_str = sdsjoin(listener->bindaddr, listener->bindaddr_count, " "); - rdma_bind = ValkeyModule_CreateString(ctx, rdma_bind_str, sdslen(rdma_bind_str)); -} - -static ValkeyModuleString *rdmaGetBind(const char *name, void *privdata) { - UNUSED(name); - UNUSED(privdata); - - return 
rdma_bind; -} - -static int rdmaSetBind(const char *name, ValkeyModuleString *val, void *privdata, ValkeyModuleString **err) { - UNUSED(name); - UNUSED(err); - struct connListener *listener = rdmaListener(); - const char *bind = ValkeyModule_StringPtrLen(val, NULL); - int nexts; - sds *exts = sdssplitlen(bind, strlen(bind), " ", 1, &nexts); - - if (nexts > CONFIG_BINDADDR_MAX) { - serverLog(LL_WARNING, "RDMA: Unsupported bind ( > %d)", CONFIG_BINDADDR_MAX); - return VALKEYMODULE_ERR; - } - - /* Free old bind addresses */ - for (int j = 0; j < listener->bindaddr_count; j++) { - zfree(listener->bindaddr[j]); - } - - for (int j = 0; j < nexts; j++) listener->bindaddr[j] = zstrdup(exts[j]); - listener->bindaddr_count = nexts; - - sdsfreesplitres(exts, nexts); - rdmaBuildBind(privdata); - - return VALKEYMODULE_OK; -} - -static int rdmaApplyListener(ValkeyModuleCtx *ctx, void *privdata, ValkeyModuleString **err) { - UNUSED(ctx); - UNUSED(privdata); - UNUSED(err); - - return rdmaChangeListener(); -} - -static void rdmaListenerAddConfig(void *ctx) { - serverAssert(ValkeyModule_RegisterNumericConfig(ctx, "port", 0, VALKEYMODULE_CONFIG_DEFAULT, 0, 65535, rdmaGetPort, - rdmaSetPort, rdmaApplyListener, NULL) == VALKEYMODULE_OK); - serverAssert(ValkeyModule_RegisterStringConfig(ctx, "bind", "", VALKEYMODULE_CONFIG_DEFAULT, rdmaGetBind, - rdmaSetBind, rdmaApplyListener, ctx) == VALKEYMODULE_OK); - serverAssert(ValkeyModule_LoadConfigs(ctx) == VALKEYMODULE_OK); -} int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + /* Connection modules MUST be part of the same build as valkey. 
*/ if (strcmp(REDIS_BUILD_ID_RAW, serverBuildIdRaw())) { serverLog(LL_NOTICE, "Connection type %s was not built together with the valkey-server used.", CONN_TYPE_RDMA); @@ -1938,40 +1831,6 @@ int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) { if (connTypeRegister(&CT_RDMA) != C_OK) return VALKEYMODULE_ERR; - rdmaListenerAddConfig(ctx); - - struct connListener *listener = rdmaListener(); - listener->ct = connectionTypeRdma(); - listener->bindaddr = zcalloc_num(CONFIG_BINDADDR_MAX, sizeof(listener->bindaddr[0])); - - for (int i = 0; i < argc; i++) { - robj *str = (robj *)argv[i]; - int nexts; - sds *exts = sdssplitlen(str->ptr, strlen(str->ptr), "=", 1, &nexts); - if (nexts != 2) { - serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr); - return VALKEYMODULE_ERR; - } - - if (!strcasecmp(exts[0], "bind")) { - listener->bindaddr[listener->bindaddr_count++] = zstrdup(exts[1]); - } else if (!strcasecmp(exts[0], "port")) { - listener->port = atoi(exts[1]); - } else if (!strcasecmp(exts[0], "rx-size")) { - valkey_rdma_rx_size = atoi(exts[1]); - } else if (!strcasecmp(exts[0], "comp-vector")) { - valkey_rdma_comp_vector = atoi(exts[1]); - } else { - serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr); - return VALKEYMODULE_ERR; - } - - sdsfreesplitres(exts, nexts); - } - - rdmaBuildBind(ctx); - if (valkey_rdma_comp_vector == -1) valkey_rdma_comp_vector = abs((int)random()); - return VALKEYMODULE_OK; } diff --git a/src/server.c b/src/server.c index 7a472de65f..b519501ca3 100644 --- a/src/server.c +++ b/src/server.c @@ -2884,6 +2884,17 @@ void initListeners(void) { listener->priv = &server.unix_ctx_config; /* Unix socket specified */ } + if (server.rdma_ctx_config.port != 0) { + conn_index = connectionIndexByType(CONN_TYPE_RDMA); + if (conn_index < 0) serverPanic("Failed finding connection listener of %s", CONN_TYPE_RDMA); + listener = &server.listeners[conn_index]; + listener->bindaddr = 
server.rdma_ctx_config.bindaddr; + listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count; + listener->port = server.rdma_ctx_config.port; + listener->ct = connectionByType(CONN_TYPE_RDMA); + listener->priv = &server.rdma_ctx_config; + } + /* create all the configured listener, and add handler to start to accept */ int listen_fds = 0; for (int j = 0; j < CONN_TYPE_MAX; j++) { diff --git a/src/server.h b/src/server.h index 6a960b22e1..197ad3b46c 100644 --- a/src/server.h +++ b/src/server.h @@ -1612,6 +1612,17 @@ typedef struct serverUnixContextConfig { unsigned int perm; /* UNIX socket permission (see mode_t) */ } serverUnixContextConfig; +/*----------------------------------------------------------------------------- + * RDMA Context Configuration + *----------------------------------------------------------------------------*/ +typedef struct serverRdmaContextConfig { + char *bindaddr[CONFIG_BINDADDR_MAX]; + int bindaddr_count; + int port; + int rx_size; + int comp_vector; +} serverRdmaContextConfig; + /*----------------------------------------------------------------------------- * AOF manifest definition *----------------------------------------------------------------------------*/ @@ -2225,6 +2236,7 @@ struct valkeyServer { int tls_auth_clients; serverTLSContextConfig tls_ctx_config; serverUnixContextConfig unix_ctx_config; + serverRdmaContextConfig rdma_ctx_config; /* cpu affinity */ char *server_cpulist; /* cpu affinity list of server main/io thread. */ char *bio_cpulist; /* cpu affinity list of bio thread. 
*/ diff --git a/tests/rdma/run.py b/tests/rdma/run.py index 0724c27adc..09168f368a 100755 --- a/tests/rdma/run.py +++ b/tests/rdma/run.py @@ -63,7 +63,7 @@ def test_rdma(ipaddr): rdmapath = valkeydir + "/src/valkey-rdma.so" svrcmd = [svrpath, "--port", "0", "--loglevel", "verbose", "--protected-mode", "yes", "--appendonly", "no", "--daemonize", "no", "--dir", valkeydir + "/tests/rdma/tmp", - "--loadmodule", rdmapath, "port=6379", "bind=" + ipaddr] + "--loadmodule", rdmapath, "--rdma-port", "6379", "--rdma-bind", ipaddr] svr = subprocess.Popen(svrcmd, shell=False, stdout=subprocess.PIPE) try: diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 352f5f183e..5a9ddc00f4 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -558,6 +558,10 @@ start_server {tags {"introspection"}} { req-res-logfile client-default-resp dual-channel-replication-enabled + rdma-comp-vector + rdma-rx-size + rdma-bind + rdma-port } if {!$::tls} { diff --git a/valkey.conf b/valkey.conf index bf82b01874..747ef4fc12 100644 --- a/valkey.conf +++ b/valkey.conf @@ -300,6 +300,51 @@ tcp-keepalive 300 # # tls-session-cache-timeout 60 +################################### RDMA ###################################### + +# By default, RDMA is disabled. To enable it, the "rdma-port" configuration +# directive can be used to define RDMA-listening ports. +# +# rdma-port 6379 +# rdma-bind 192.168.1.100 + +# The RDMA receive transfer buffer is 1M by default. It can be set between 64K and 16M. +# Note that a size aligned to the page size is preferred. +# +# rdma-rx-size 1048576 + +# The RDMA completion queue will use the completion vector to signal completion events +# via hardware interrupts. A large number of hardware interrupts can affect CPU performance. +# It is possible to tune the performance using rdma-comp-vector. +# +# Example 1. a) Pin hardware interrupt vectors [0, 3] to CPU [0, 3]. +# b) Set CPU affinity for valkey to CPU [4, X]. 
+# c) Any valkey server uses a random RDMA completion vector. +# All valkey servers will not affect each other and will be isolated from kernel interrupts. +# +# SYS SYS SYS SYS VALKEY VALKEY VALKEY +# | | | | | | | +# CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 ... CPUX +# | | | | +# INTR0 INTR1 INTR2 INTR3 +# +# Example 2. a) Pin hardware interrupt vectors [0, X] to CPU [0, X]. +# b) Set CPU affinity for valkey to CPU [0, X]. +# c) Valkey server [M] uses RDMA completion vector [M]. +# A single CPU handles hardware interrupts, the RDMA completion queue, and the valkey server. +# This avoids overhead and function calls across multiple CPUs. +# +# VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY +# | | | | | | | +# CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 ... CPUX +# | | | | | | | +# INTR0 INTR1 INTR2 INTR3 INTR4 INTR5 INTRX +# +# Use 0 and positive numbers to specify the RDMA completion vector, or specify -1 to allow +# the server to use a random vector for a new connection. The default vector is -1. +# +# rdma-comp-vector 0 + ################################# GENERAL ##################################### # By default the server does not run as a daemon. Use 'yes' if you need it. From 3bdd658be0e34b5c93172e4ae6a76d707b3ca733 Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Tue, 22 Oct 2024 18:16:52 +0800 Subject: [PATCH 04/10] RDMA: Support builtin Support RDMA builtin and module together. To build a builtin version: $ make BUILD_RDMA=yes Or build it by cmake: $ cmake .. 
-DBUILD_RDMA=yes Signed-off-by: zhenwei pi --- README.md | 9 +++++++-- cmake/Modules/SourceFiles.cmake | 1 + cmake/Modules/ValkeySetup.cmake | 29 +++++++++++++++-------------- src/CMakeLists.txt | 2 +- src/Makefile | 30 +++++++++++++++--------------- src/connection.c | 3 +++ src/connection.h | 1 + src/rdma.c | 29 +++++++++++++++++++++++++---- 8 files changed, 68 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index c89bb1382c..5c74a41dd3 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,13 @@ To build TLS as Valkey module: Note that sentinel mode does not support TLS module. To build with experimental RDMA support you'll need RDMA development libraries -(e.g. librdmacm-dev and libibverbs-dev on Debian/Ubuntu). For now, Valkey only -supports RDMA as connection module mode. Run: +(e.g. librdmacm-dev and libibverbs-dev on Debian/Ubuntu). + +To build RDMA support as Valkey built-in: + + % make BUILD_RDMA=yes + +To build RDMA as Valkey module: % make BUILD_RDMA=module diff --git a/cmake/Modules/SourceFiles.cmake b/cmake/Modules/SourceFiles.cmake index d76f17625e..4d77c6b5ba 100644 --- a/cmake/Modules/SourceFiles.cmake +++ b/cmake/Modules/SourceFiles.cmake @@ -87,6 +87,7 @@ set(VALKEY_SERVER_SRCS ${CMAKE_SOURCE_DIR}/src/tracking.c ${CMAKE_SOURCE_DIR}/src/socket.c ${CMAKE_SOURCE_DIR}/src/tls.c + ${CMAKE_SOURCE_DIR}/src/rdma.c ${CMAKE_SOURCE_DIR}/src/sha256.c ${CMAKE_SOURCE_DIR}/src/timeout.c ${CMAKE_SOURCE_DIR}/src/setcpuaffinity.c diff --git a/cmake/Modules/ValkeySetup.cmake b/cmake/Modules/ValkeySetup.cmake index e935c3b308..bbd203f7f4 100644 --- a/cmake/Modules/ValkeySetup.cmake +++ b/cmake/Modules/ValkeySetup.cmake @@ -199,29 +199,30 @@ if (BUILD_RDMA) # RDMA support (Linux only) if (LINUX AND NOT APPLE) valkey_parse_build_option(${BUILD_RDMA} USE_RDMA) + find_package(PkgConfig REQUIRED) + # Locate librdmacm & libibverbs, fail if we can't find them + pkg_check_modules(RDMACM REQUIRED librdmacm) + pkg_check_modules(IBVERBS REQUIRED libibverbs) + 
message(STATUS "${RDMACM_LINK_LIBRARIES};${IBVERBS_LINK_LIBRARIES}") + list(APPEND RDMA_LIBS "${RDMACM_LIBRARIES};${IBVERBS_LIBRARIES}") + if (USE_RDMA EQUAL 2) # Module message(STATUS "Building RDMA as module") add_valkey_server_compiler_options("-DUSE_RDMA=2") - find_package(PkgConfig REQUIRED) - - # Locate librdmacm & libibverbs, fail if we can't find them - pkg_check_modules(RDMACM REQUIRED librdmacm) - pkg_check_modules(IBVERBS REQUIRED libibverbs) - - message(STATUS "${RDMACM_LINK_LIBRARIES};${IBVERBS_LINK_LIBRARIES}") - list(APPEND RDMA_LIBS "${RDMACM_LIBRARIES};${IBVERBS_LIBRARIES}") - unset(RDMACM_LINK_LIBRARIES CACHE) - unset(IBVERBS_LINK_LIBRARIES CACHE) set(BUILD_RDMA_MODULE 1) elseif (USE_RDMA EQUAL 1) - # RDMA can only be built as a module. So disable it - message(WARNING "BUILD_RDMA can be one of: [NO | 0 | MODULE], but '${BUILD_RDMA}' was provided") - message(STATUS "RDMA build is disabled") - set(USE_RDMA 0) + message(STATUS "Building RDMA as builtin") + add_valkey_server_compiler_options("-DUSE_RDMA=1") + add_valkey_server_compiler_options("-DBUILD_RDMA_MODULE=0") + list(APPEND SERVER_LIBS "${RDMA_LIBS}") endif () else () message(WARNING "RDMA is only supported on Linux platforms") endif () +else () + # By default, RDMA is disabled + message(STATUS "RDMA is disabled") + set(USE_RDMA 0) endif () set(BUILDING_ARM64 0) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b7e328163b..e3c4b1cd51 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -45,7 +45,7 @@ if (BUILD_RDMA_MODULE) set(MODULE_NAME "valkey-rdma") message(STATUS "Building RDMA module") add_library(${MODULE_NAME} SHARED "${VALKEY_RDMA_MODULE_SRCS}") - target_compile_options(${MODULE_NAME} PRIVATE -DBUILD_RDMA_MODULE -DUSE_RDMA=1) + target_compile_options(${MODULE_NAME} PRIVATE -DBUILD_RDMA_MODULE=2 -DUSE_RDMA=1) target_link_libraries(${MODULE_NAME} "${RDMA_LIBS}") # remove the "lib" prefix from the module set_target_properties(${MODULE_NAME} PROPERTIES PREFIX "") diff 
--git a/src/Makefile b/src/Makefile index a76356e9d5..91f1e19084 100644 --- a/src/Makefile +++ b/src/Makefile @@ -325,26 +325,26 @@ ifeq ($(BUILD_TLS),module) TLS_MODULE_CFLAGS+=-DUSE_OPENSSL=$(BUILD_MODULE) $(OPENSSL_CFLAGS) -DBUILD_TLS_MODULE=$(BUILD_MODULE) endif -BUILD_RDMA:=no -RDMA_MODULE= -RDMA_MODULE_NAME:=valkey-rdma$(PROG_SUFFIX).so -RDMA_MODULE_CFLAGS:=$(FINAL_CFLAGS) -ifeq ($(BUILD_RDMA),module) - FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE) - RDMA_PKGCONFIG := $(shell $(PKG_CONFIG) --exists librdmacm libibverbs && echo $$?) +RDMA_LIBS= +RDMA_PKGCONFIG := $(shell $(PKG_CONFIG) --exists librdmacm libibverbs && echo $$?) ifeq ($(RDMA_PKGCONFIG),0) RDMA_LIBS=$(shell $(PKG_CONFIG) --libs librdmacm libibverbs) else RDMA_LIBS=-lrdmacm -libverbs endif - RDMA_MODULE=$(RDMA_MODULE_NAME) - RDMA_MODULE_CFLAGS+=-DUSE_RDMA=$(BUILD_YES) -DBUILD_RDMA_MODULE $(RDMA_LIBS) -else -ifeq ($(BUILD_RDMA),no) - # disable RDMA, do nothing -else - $(error "RDMA is only supported as module (BUILD_RDMA=module), or disabled (BUILD_RDMA=no)") + +ifeq ($(BUILD_RDMA),yes) + FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_YES) -DBUILD_RDMA_MODULE=$(BUILD_NO) + FINAL_LIBS += $(RDMA_LIBS) endif + +RDMA_MODULE= +RDMA_MODULE_NAME:=valkey-rdma$(PROG_SUFFIX).so +RDMA_MODULE_CFLAGS:=$(FINAL_CFLAGS) +ifeq ($(BUILD_RDMA),module) + FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE) + RDMA_MODULE=$(RDMA_MODULE_NAME) + RDMA_MODULE_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE) -DBUILD_RDMA_MODULE=$(BUILD_MODULE) $(RDMA_LIBS) endif ifndef V @@ -411,7 +411,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o 
intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o 
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/connection.c b/src/connection.c index f0c1c2d364..8807541d77 100644 --- a/src/connection.c +++ b/src/connection.c @@ -66,6 +66,9 @@ int connTypeInitialize(void) { /* may fail if without BUILD_TLS=yes */ RedisRegisterConnectionTypeTLS(); + /* may fail if without BUILD_RDMA=yes */ + RegisterConnectionTypeRdma(); + return C_OK; } diff --git a/src/connection.h b/src/connection.h index 76b5298e9a..8a2775ee34 100644 --- a/src/connection.h +++ b/src/connection.h @@ -463,6 +463,7 @@ sds getListensInfoString(sds info); int RedisRegisterConnectionTypeSocket(void); int RedisRegisterConnectionTypeUnix(void); int RedisRegisterConnectionTypeTLS(void); +int RegisterConnectionTypeRdma(void); /* Return 1 if connection is using TLS protocol, 0 if otherwise. */ static inline int connIsTLS(connection *conn) { diff --git a/src/rdma.c b/src/rdma.c index d3c6f0acfb..09b7f69da2 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -10,9 +10,10 @@ #define VALKEYMODULE_CORE_MODULE #include "server.h" - -#if defined USE_RDMA && defined __linux__ /* currently RDMA is only supported on Linux */ #include "connection.h" + +#if defined __linux__ /* currently RDMA is only supported on Linux */ +#if (USE_RDMA == 1 /* BUILD_YES */) || ((USE_RDMA == 2 /* BUILD_MODULE */) && (BUILD_RDMA_MODULE == 2)) #include "connhelpers.h" #include @@ -1804,7 +1805,20 @@ ConnectionType *connectionTypeRdma(void) { return ct_rdma; } -#ifdef BUILD_RDMA_MODULE +int RegisterConnectionTypeRdma(void) { + return connTypeRegister(&CT_RDMA); +} + +#else + +int RegisterConnectionTypeRdma(void) { + serverLog(LL_VERBOSE, "Connection type %s not builtin", CONN_TYPE_RDMA); + return C_ERR; +} + +#endif + +#if BUILD_RDMA_MODULE == 2 /* BUILD_MODULE */ #include "release.h" @@ -1842,4 +1856,11 @@ int ValkeyModule_OnUnload(void *arg) { #endif /* BUILD_RDMA_MODULE */ -#endif /* USE_RDMA && __linux__ */ +#else /* __linux__ */ + +int RegisterConnectionTypeRdma(void) { + 
serverLog(LL_VERBOSE, "Connection type %s is supported on Linux only", CONN_TYPE_RDMA); + return C_ERR; +} + +#endif /* __linux__ */ From 78fd08b9da77a211e0ba990c728793d8ea0d4e4f Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Thu, 21 Nov 2024 17:01:14 +0800 Subject: [PATCH 05/10] Update src/config.c Co-authored-by: Binbin Signed-off-by: zhenwei pi --- src/config.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/config.c b/src/config.c index effda9a500..c9445142f3 100644 --- a/src/config.c +++ b/src/config.c @@ -2954,7 +2954,6 @@ static int setConfigBindOption(standardConfig *config, sds *argv, int argc, cons static int setConfigSocketBindOption(standardConfig *config, sds *argv, int argc, const char **err) { UNUSED(config); - return setConfigBindOption(config, argv, argc, err, server.bindaddr, &server.bindaddr_count); } From 24acb40253b3ebacace8d0c2aea081cb970a536f Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Thu, 21 Nov 2024 17:02:05 +0800 Subject: [PATCH 06/10] Update src/config.c Apply suggestion from Binbin. Co-authored-by: Binbin Signed-off-by: zhenwei pi --- src/config.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/config.c b/src/config.c index c9445142f3..b06a4cd227 100644 --- a/src/config.c +++ b/src/config.c @@ -2959,7 +2959,6 @@ static int setConfigSocketBindOption(standardConfig *config, sds *argv, int argc static int setConfigRdmaBindOption(standardConfig *config, sds *argv, int argc, const char **err) { UNUSED(config); - return setConfigBindOption(config, argv, argc, err, server.rdma_ctx_config.bindaddr, &server.rdma_ctx_config.bindaddr_count); } From ad5c1ea09b92926ad2a21c24220141233dc429df Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Thu, 21 Nov 2024 17:02:24 +0800 Subject: [PATCH 07/10] Update src/config.c Apply suggestion from Binbin. 
Co-authored-by: Binbin Signed-off-by: zhenwei pi --- src/config.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/config.c b/src/config.c index b06a4cd227..16afab34dc 100644 --- a/src/config.c +++ b/src/config.c @@ -2943,9 +2943,7 @@ static int setConfigBindOption(standardConfig *config, sds *argv, int argc, cons if (argc == 1 && sdslen(argv[0]) == 0) argc = 0; /* Free old bind addresses */ - for (j = 0; j < orig_bindaddr_count; j++) { - zfree(bindaddr[j]); - } + for (j = 0; j < orig_bindaddr_count; j++) zfree(bindaddr[j]); for (j = 0; j < argc; j++) bindaddr[j] = zstrdup(argv[j]); *bindaddr_count = argc; From 7c2a9713fc24f1fe77aa9e0f9f1a3a094b8d2807 Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Fri, 22 Nov 2024 11:20:06 +0800 Subject: [PATCH 08/10] Rename "rdma-comp-vector" to "rdma-completion-vector" Signed-off-by: zhenwei pi --- src/config.c | 2 +- src/rdma.c | 2 +- src/server.h | 2 +- valkey.conf | 17 ++++++++++------- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/config.c b/src/config.c index 16afab34dc..1e30b5f60e 100644 --- a/src/config.c +++ b/src/config.c @@ -3309,7 +3309,7 @@ standardConfig static_configs[] = { createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort), createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL), - createIntConfig("rdma-comp-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.comp_vector, -1, INTEGER_CONFIG, NULL, NULL), + createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.complection_vector, -1, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", 
NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), diff --git a/src/rdma.c b/src/rdma.c index 09b7f69da2..275608a117 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -294,7 +294,7 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) { struct ibv_comp_channel *comp_channel = NULL; struct ibv_cq *cq = NULL; struct ibv_pd *pd = NULL; - int comp_vector = rdma_config->comp_vector; + int comp_vector = rdma_config->complection_vector; if (ibv_query_device(cm_id->verbs, &device_attr)) { serverLog(LL_WARNING, "RDMA: ibv ibv query device failed"); diff --git a/src/server.h b/src/server.h index 197ad3b46c..90fbc93f24 100644 --- a/src/server.h +++ b/src/server.h @@ -1620,7 +1620,7 @@ typedef struct serverRdmaContextConfig { int bindaddr_count; int port; int rx_size; - int comp_vector; + int complection_vector; } serverRdmaContextConfig; /*----------------------------------------------------------------------------- diff --git a/valkey.conf b/valkey.conf index 747ef4fc12..8d3e11c515 100644 --- a/valkey.conf +++ b/valkey.conf @@ -302,6 +302,7 @@ tcp-keepalive 300 ################################### RDMA ###################################### +# Valkey Over RDMA is experimental, it may be changed or be removed in any minor or major version. # By default, RDMA is disabled. To enable it, the "rdma-port" configuration # directive can be used to define RDMA-listening ports. # @@ -315,11 +316,11 @@ tcp-keepalive 300 # The RDMA completion queue will use the completion vector to signal completion events # via hardware interrupts. A large number of hardware interrupts can affect CPU performance. -# It is possible to tune the performance using rdma-comp-vector. +# It is possible to tune the performance using rdma-completion-vector. # # Example 1. a) Pin hardware interrupt vectors [0, 3] to CPU [0, 3]. # b) Set CPU affinity for valkey to CPU [4, X]. -# c) Any valkey server uses a random RDMA completion vector. 
+# c) Any valkey server uses a random RDMA completion vector [-1]. # All valkey servers will not affect each other and will be isolated from kernel interrupts. # # SYS SYS SYS SYS VALKEY VALKEY VALKEY @@ -328,11 +329,13 @@ tcp-keepalive 300 # | | | | # INTR0 INTR1 INTR2 INTR3 # -# Example 2. a) Pin hardware interrupt vectors [0, X] to CPU [0, X]. -# b) Set CPU affinity for valkey to CPU [0, X]. +# Example 2. a) 1:1 pin hardware interrupt vectors [0, X] to CPU [0, X]. +# b) Set CPU affinity for valkey [M] to CPU [M]. # c) Valkey server [M] uses RDMA completion vector [M]. -# A single CPU handles hardware interrupts, the RDMA completion queue, and the valkey server. -# This avoids overhead and function calls across multiple CPUs. +# A single CPU [M] handles hardware interrupts, the RDMA completion vector [M], +# and the valkey server [M] within its context only. +# This avoids overhead and function calls across multiple CPUs, fully isolating +# each valkey server from one another. # # VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY # | | | | | | | @@ -343,7 +346,7 @@ tcp-keepalive 300 # Use 0 and positive numbers to specify the RDMA completion vector, or specify -1 to allow # the server to use a random vector for a new connection. The default vector is -1. 
# -# rdma-comp-vector 0 +# rdma-completion-vector 0 ################################# GENERAL ##################################### From 2644d71dd035652bec0689620593e5341e550a6b Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Fri, 22 Nov 2024 11:30:27 +0800 Subject: [PATCH 09/10] Rename "rdma-comp-vector" to "rdma-completion-vector" for test Signed-off-by: zhenwei pi --- tests/unit/introspection.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 5a9ddc00f4..d79bb1c7da 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -558,7 +558,7 @@ start_server {tags {"introspection"}} { req-res-logfile client-default-resp dual-channel-replication-enabled - rdma-comp-vector + rdma-completion-vector rdma-rx-size rdma-bind rdma-port From 4fec63d6cfa92b6fd7683b6b7e9acd67eef5e453 Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Fri, 22 Nov 2024 11:34:32 +0800 Subject: [PATCH 10/10] Typo fix Signed-off-by: zhenwei pi --- src/config.c | 2 +- src/rdma.c | 2 +- src/server.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/config.c b/src/config.c index 1e30b5f60e..7f0901c50a 100644 --- a/src/config.c +++ b/src/config.c @@ -3309,7 +3309,7 @@ standardConfig static_configs[] = { createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort), createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL), - createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.complection_vector, -1, INTEGER_CONFIG, NULL, NULL), + createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, 
server.rdma_ctx_config.completion_vector, -1, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), diff --git a/src/rdma.c b/src/rdma.c index 275608a117..de7ea396a1 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -294,7 +294,7 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) { struct ibv_comp_channel *comp_channel = NULL; struct ibv_cq *cq = NULL; struct ibv_pd *pd = NULL; - int comp_vector = rdma_config->complection_vector; + int comp_vector = rdma_config->completion_vector; if (ibv_query_device(cm_id->verbs, &device_attr)) { serverLog(LL_WARNING, "RDMA: ibv ibv query device failed"); diff --git a/src/server.h b/src/server.h index 43adaee407..a4141c33d4 100644 --- a/src/server.h +++ b/src/server.h @@ -1621,7 +1621,7 @@ typedef struct serverRdmaContextConfig { int bindaddr_count; int port; int rx_size; - int complection_vector; + int completion_vector; } serverRdmaContextConfig; /*-----------------------------------------------------------------------------