diff --git a/README.md b/README.md index 94f38bccf7..5c74a41dd3 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,13 @@ To build TLS as Valkey module: Note that sentinel mode does not support TLS module. To build with experimental RDMA support you'll need RDMA development libraries -(e.g. librdmacm-dev and libibverbs-dev on Debian/Ubuntu). For now, Valkey only -supports RDMA as connection module mode. Run: +(e.g. librdmacm-dev and libibverbs-dev on Debian/Ubuntu). + +To build RDMA support as Valkey built-in: + + % make BUILD_RDMA=yes + +To build RDMA as Valkey module: % make BUILD_RDMA=module @@ -203,20 +208,27 @@ Note that Valkey Over RDMA is an experimental feature. It may be changed or removed in any minor or major version. Currently, it is only supported on Linux. -To manually run a Valkey server with RDMA mode: +* RDMA built-in mode: + ``` + ./src/valkey-server --protected-mode no \ + --rdma-bind 192.168.122.100 --rdma-port 6379 + ``` - % ./src/valkey-server --protected-mode no \ - --loadmodule src/valkey-rdma.so bind=192.168.122.100 port=6379 +* RDMA module mode: + ``` + ./src/valkey-server --protected-mode no \ + --loadmodule src/valkey-rdma.so --rdma-bind 192.168.122.100 --rdma-port 6379 + ``` It's possible to change bind address/port of RDMA by runtime command: - 192.168.122.100:6379> CONFIG SET rdma.port 6380 + 192.168.122.100:6379> CONFIG SET rdma-port 6380 It's also possible to have both RDMA and TCP available, and there is no conflict of TCP(6379) and RDMA(6379), Ex: % ./src/valkey-server --protected-mode no \ - --loadmodule src/valkey-rdma.so bind=192.168.122.100 port=6379 \ + --loadmodule src/valkey-rdma.so --rdma-bind 192.168.122.100 --rdma-port 6379 \ --port 6379 Note that the network card (192.168.122.100 of this example) should support diff --git a/cmake/Modules/SourceFiles.cmake b/cmake/Modules/SourceFiles.cmake index 873229d6f0..c34ae644a2 100644 --- a/cmake/Modules/SourceFiles.cmake +++ b/cmake/Modules/SourceFiles.cmake @@ -88,6 +88,7 @@ set(VALKEY_SERVER_SRCS ${CMAKE_SOURCE_DIR}/src/tracking.c ${CMAKE_SOURCE_DIR}/src/socket.c ${CMAKE_SOURCE_DIR}/src/tls.c + ${CMAKE_SOURCE_DIR}/src/rdma.c ${CMAKE_SOURCE_DIR}/src/sha256.c ${CMAKE_SOURCE_DIR}/src/timeout.c ${CMAKE_SOURCE_DIR}/src/setcpuaffinity.c diff --git a/cmake/Modules/ValkeySetup.cmake b/cmake/Modules/ValkeySetup.cmake index e935c3b308..bbd203f7f4 100644 --- a/cmake/Modules/ValkeySetup.cmake +++ b/cmake/Modules/ValkeySetup.cmake @@ -199,29 +199,30 @@ if (BUILD_RDMA) # RDMA support (Linux only) if (LINUX AND NOT APPLE) valkey_parse_build_option(${BUILD_RDMA} USE_RDMA) + find_package(PkgConfig REQUIRED) + # Locate librdmacm & libibverbs, fail if we can't find them + pkg_check_modules(RDMACM REQUIRED librdmacm) + pkg_check_modules(IBVERBS REQUIRED libibverbs) + message(STATUS "${RDMACM_LINK_LIBRARIES};${IBVERBS_LINK_LIBRARIES}") + list(APPEND RDMA_LIBS "${RDMACM_LIBRARIES};${IBVERBS_LIBRARIES}") + if (USE_RDMA EQUAL 2) # Module message(STATUS "Building RDMA as module") add_valkey_server_compiler_options("-DUSE_RDMA=2") - find_package(PkgConfig REQUIRED) - - # Locate librdmacm & libibverbs, fail if we can't find them - pkg_check_modules(RDMACM REQUIRED librdmacm) - pkg_check_modules(IBVERBS REQUIRED libibverbs) - - message(STATUS "${RDMACM_LINK_LIBRARIES};${IBVERBS_LINK_LIBRARIES}") - list(APPEND RDMA_LIBS "${RDMACM_LIBRARIES};${IBVERBS_LIBRARIES}") - unset(RDMACM_LINK_LIBRARIES CACHE) - unset(IBVERBS_LINK_LIBRARIES CACHE) set(BUILD_RDMA_MODULE 1) elseif (USE_RDMA EQUAL 1) - # RDMA can only be built as a module. So disable it - message(WARNING "BUILD_RDMA can be one of: [NO | 0 | MODULE], but '${BUILD_RDMA}' was provided") - message(STATUS "RDMA build is disabled") - set(USE_RDMA 0) + message(STATUS "Building RDMA as builtin") + add_valkey_server_compiler_options("-DUSE_RDMA=1") + add_valkey_server_compiler_options("-DBUILD_RDMA_MODULE=0") + list(APPEND SERVER_LIBS "${RDMA_LIBS}") endif () else () message(WARNING "RDMA is only supported on Linux platforms") endif () +else () + # By default, RDMA is disabled + message(STATUS "RDMA is disabled") + set(USE_RDMA 0) endif () set(BUILDING_ARM64 0) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b7e328163b..e3c4b1cd51 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -45,7 +45,7 @@ if (BUILD_RDMA_MODULE) set(MODULE_NAME "valkey-rdma") message(STATUS "Building RDMA module") add_library(${MODULE_NAME} SHARED "${VALKEY_RDMA_MODULE_SRCS}") - target_compile_options(${MODULE_NAME} PRIVATE -DBUILD_RDMA_MODULE -DUSE_RDMA=1) + target_compile_options(${MODULE_NAME} PRIVATE -DBUILD_RDMA_MODULE=2 -DUSE_RDMA=1) target_link_libraries(${MODULE_NAME} "${RDMA_LIBS}") # remove the "lib" prefix from the module set_target_properties(${MODULE_NAME} PROPERTIES PREFIX "") diff --git a/src/Makefile b/src/Makefile index f876f55dec..c13a21d58c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -325,26 +325,26 @@ ifeq ($(BUILD_TLS),module) TLS_MODULE_CFLAGS+=-DUSE_OPENSSL=$(BUILD_MODULE) $(OPENSSL_CFLAGS) -DBUILD_TLS_MODULE=$(BUILD_MODULE) endif -BUILD_RDMA:=no -RDMA_MODULE= -RDMA_MODULE_NAME:=valkey-rdma$(PROG_SUFFIX).so -RDMA_MODULE_CFLAGS:=$(FINAL_CFLAGS) -ifeq ($(BUILD_RDMA),module) - FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE) - RDMA_PKGCONFIG := $(shell $(PKG_CONFIG) --exists librdmacm libibverbs && echo $$?) +RDMA_LIBS= +RDMA_PKGCONFIG := $(shell $(PKG_CONFIG) --exists librdmacm libibverbs && echo $$?) ifeq ($(RDMA_PKGCONFIG),0) RDMA_LIBS=$(shell $(PKG_CONFIG) --libs librdmacm libibverbs) else RDMA_LIBS=-lrdmacm -libverbs endif - RDMA_MODULE=$(RDMA_MODULE_NAME) - RDMA_MODULE_CFLAGS+=-DUSE_RDMA=$(BUILD_YES) -DBUILD_RDMA_MODULE $(RDMA_LIBS) -else -ifeq ($(BUILD_RDMA),no) - # disable RDMA, do nothing -else - $(error "RDMA is only supported as module (BUILD_RDMA=module), or disabled (BUILD_RDMA=no)") + +ifeq ($(BUILD_RDMA),yes) + FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_YES) -DBUILD_RDMA_MODULE=$(BUILD_NO) + FINAL_LIBS += $(RDMA_LIBS) endif + +RDMA_MODULE= +RDMA_MODULE_NAME:=valkey-rdma$(PROG_SUFFIX).so +RDMA_MODULE_CFLAGS:=$(FINAL_CFLAGS) +ifeq ($(BUILD_RDMA),module) + FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE) + RDMA_MODULE=$(RDMA_MODULE_NAME) + RDMA_MODULE_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE) -DBUILD_RDMA_MODULE=$(BUILD_MODULE) $(RDMA_LIBS) endif ifndef V @@ -411,7 +411,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/config.c b/src/config.c index c4009adefa..7f0901c50a 100644 --- a/src/config.c +++ b/src/config.c @@ -1536,10 +1536,27 @@ void rewriteConfigOOMScoreAdjValuesOption(standardConfig *config, const char *na } /* Rewrite the bind option. */ -void rewriteConfigBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) { +static void rewriteConfigBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state, char **bindaddr, int bindaddr_count) { UNUSED(config); int force = 1; sds line, addresses; + + /* Rewrite as bind ... */ + if (bindaddr_count > 0) + addresses = sdsjoin(bindaddr, bindaddr_count, " "); + else + addresses = sdsnew("\"\""); + line = sdsnew(name); + line = sdscatlen(line, " ", 1); + line = sdscatsds(line, addresses); + sdsfree(addresses); + + rewriteConfigRewriteLine(state, name, line, force); +} + +/* Rewrite the bind option. */ +static void rewriteConfigSocketBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) { + UNUSED(config); int is_default = 0; /* Compare server.bindaddr with CONFIG_DEFAULT_BINDADDR */ @@ -1559,17 +1576,7 @@ void rewriteConfigBindOption(standardConfig *config, const char *name, struct re return; } - /* Rewrite as bind ... */ - if (server.bindaddr_count > 0) - addresses = sdsjoin(server.bindaddr, server.bindaddr_count, " "); - else - addresses = sdsnew("\"\""); - line = sdsnew(name); - line = sdscatlen(line, " ", 1); - line = sdscatsds(line, addresses); - sdsfree(addresses); - - rewriteConfigRewriteLine(state, name, line, force); + rewriteConfigBindOption(config, name, state, server.bindaddr, server.bindaddr_count); } /* Rewrite the loadmodule option. */ @@ -2637,7 +2644,7 @@ static int applyBind(const char **err) { tcp_listener->ct = connectionByType(CONN_TYPE_SOCKET); if (changeListener(tcp_listener) == C_ERR) { *err = "Failed to bind to specified addresses."; - if (tls_listener) closeListener(tls_listener); /* failed with TLS together */ + if (tls_listener) connCloseListener(tls_listener); /* failed with TLS together */ return 0; } @@ -2649,7 +2656,7 @@ static int applyBind(const char **err) { tls_listener->ct = connectionByType(CONN_TYPE_TLS); if (changeListener(tls_listener) == C_ERR) { *err = "Failed to bind to specified addresses."; - closeListener(tcp_listener); /* failed with TCP together */ + connCloseListener(tcp_listener); /* failed with TCP together */ return 0; } } @@ -2922,8 +2929,9 @@ static sds getConfigNotifyKeyspaceEventsOption(standardConfig *config) { return keyspaceEventsFlagsToString(server.notify_keyspace_events); } -static int setConfigBindOption(standardConfig *config, sds *argv, int argc, const char **err) { +static int setConfigBindOption(standardConfig *config, sds *argv, int argc, const char **err, char **bindaddr, int *bindaddr_count) { UNUSED(config); + int orig_bindaddr_count = *bindaddr_count; int j; if (argc > CONFIG_BINDADDR_MAX) { @@ -2935,11 +2943,73 @@ static int setConfigBindOption(standardConfig *config, sds *argv, int argc, cons if (argc == 1 && sdslen(argv[0]) == 0) argc = 0; /* Free old bind addresses */ - for (j = 0; j < server.bindaddr_count; j++) { - zfree(server.bindaddr[j]); + for (j = 0; j < orig_bindaddr_count; j++) zfree(bindaddr[j]); + for (j = 0; j < argc; j++) bindaddr[j] = zstrdup(argv[j]); + *bindaddr_count = argc; + + return 1; +} + +static int setConfigSocketBindOption(standardConfig *config, sds *argv, int argc, const char **err) { + UNUSED(config); + return setConfigBindOption(config, argv, argc, err, server.bindaddr, &server.bindaddr_count); +} + +static int setConfigRdmaBindOption(standardConfig *config, sds *argv, int argc, const char **err) { + UNUSED(config); + return setConfigBindOption(config, argv, argc, err, server.rdma_ctx_config.bindaddr, &server.rdma_ctx_config.bindaddr_count); +} + +static sds getConfigRdmaBindOption(standardConfig *config) { + UNUSED(config); + return sdsjoin(server.rdma_ctx_config.bindaddr, server.rdma_ctx_config.bindaddr_count, " "); +} + +static void rewriteConfigRdmaBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) { + UNUSED(config); + + if (server.rdma_ctx_config.bindaddr_count) { + rewriteConfigBindOption(config, name, state, server.rdma_ctx_config.bindaddr, + server.rdma_ctx_config.bindaddr_count); + } +} + +static int applyRdmaBind(const char **err) { + connListener *rdma_listener = listenerByType(CONN_TYPE_RDMA); + + if (!rdma_listener) { + *err = "No RDMA building support."; + return 0; + } + + rdma_listener->bindaddr = server.rdma_ctx_config.bindaddr; + rdma_listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count; + rdma_listener->port = server.rdma_ctx_config.port; + rdma_listener->ct = connectionByType(CONN_TYPE_RDMA); + if (changeListener(rdma_listener) == C_ERR) { + *err = "Failed to bind to specified addresses for RDMA."; + return 0; + } + + return 1; +} + +static int updateRdmaPort(const char **err) { + connListener *listener = listenerByType(CONN_TYPE_RDMA); + + if (listener == NULL) { + *err = "No RDMA building support."; + return 0; + } + + listener->bindaddr = server.rdma_ctx_config.bindaddr; + listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count; + listener->port = server.rdma_ctx_config.port; + listener->ct = connectionByType(CONN_TYPE_RDMA); + if (changeListener(listener) == C_ERR) { + *err = "Unable to listen on this port for RDMA. Check server logs."; + return 0; } - for (j = 0; j < argc; j++) server.bindaddr[j] = zstrdup(argv[j]); - server.bindaddr_count = argc; return 1; } @@ -3237,6 +3307,9 @@ standardConfig static_configs[] = { createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod), createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL), createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL), + createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort), + createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL), + createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.completion_vector, -1, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), @@ -3316,7 +3389,8 @@ standardConfig static_configs[] = { createSpecialConfig("client-output-buffer-limit", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigClientOutputBufferLimitOption, getConfigClientOutputBufferLimitOption, rewriteConfigClientOutputBufferLimitOption, NULL), createSpecialConfig("oom-score-adj-values", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigOOMScoreAdjValuesOption, getConfigOOMScoreAdjValuesOption, rewriteConfigOOMScoreAdjValuesOption, updateOOMScoreAdj), createSpecialConfig("notify-keyspace-events", NULL, MODIFIABLE_CONFIG, setConfigNotifyKeyspaceEventsOption, getConfigNotifyKeyspaceEventsOption, rewriteConfigNotifyKeyspaceEventsOption, NULL), - createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigBindOption, getConfigBindOption, rewriteConfigBindOption, applyBind), + createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigSocketBindOption, getConfigBindOption, rewriteConfigSocketBindOption, applyBind), + createSpecialConfig("rdma-bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigRdmaBindOption, getConfigRdmaBindOption, rewriteConfigRdmaBindOption, applyRdmaBind), createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL), createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL), diff --git a/src/connection.c b/src/connection.c index f0c1c2d364..8807541d77 100644 --- a/src/connection.c +++ b/src/connection.c @@ -66,6 +66,9 @@ int connTypeInitialize(void) { /* may fail if without BUILD_TLS=yes */ RedisRegisterConnectionTypeTLS(); + /* may fail if without BUILD_RDMA=yes */ + RegisterConnectionTypeRdma(); + return C_OK; } diff --git a/src/connection.h b/src/connection.h index 0762441732..8a2775ee34 100644 --- a/src/connection.h +++ b/src/connection.h @@ -60,6 +60,7 @@ typedef enum { #define CONN_TYPE_SOCKET "tcp" #define CONN_TYPE_UNIX "unix" #define CONN_TYPE_TLS "tls" +#define CONN_TYPE_RDMA "rdma" #define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */ typedef void (*ConnectionCallbackFunc)(struct connection *conn); @@ -79,6 +80,7 @@ typedef struct ConnectionType { int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote); int (*is_local)(connection *conn); int (*listen)(connListener *listener); + void (*closeListener)(connListener *listener); /* create/shutdown/close connection */ connection *(*conn_create)(void); @@ -442,6 +444,13 @@ static inline int connListen(connListener *listener) { return listener->ct->listen(listener); } +/* Close a listened listener */ +static inline void connCloseListener(connListener *listener) { + if (listener->count) { + listener->ct->closeListener(listener); + } +} + /* Get accept_handler of a connection type */ static inline aeFileProc *connAcceptHandler(ConnectionType *ct) { if (ct) return ct->accept_handler; @@ -454,6 +463,7 @@ sds getListensInfoString(sds info); int RedisRegisterConnectionTypeSocket(void); int RedisRegisterConnectionTypeUnix(void); int RedisRegisterConnectionTypeTLS(void); +int RegisterConnectionTypeRdma(void); /* Return 1 if connection is using TLS protocol, 0 if otherwise. */ static inline int connIsTLS(connection *conn) { diff --git a/src/rdma.c b/src/rdma.c index 7cdcb24913..de7ea396a1 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -10,9 +10,10 @@ #define VALKEYMODULE_CORE_MODULE #include "server.h" - -#if defined USE_RDMA && defined __linux__ /* currently RDMA is only supported on Linux */ #include "connection.h" + +#if defined __linux__ /* currently RDMA is only supported on Linux */ +#if (USE_RDMA == 1 /* BUILD_YES */) || ((USE_RDMA == 2 /* BUILD_MODULE */) && (BUILD_RDMA_MODULE == 2)) #include "connhelpers.h" #include @@ -128,12 +129,10 @@ typedef struct rdma_listener { static list *pending_list; static rdma_listener *rdma_listeners; +static serverRdmaContextConfig *rdma_config; static ConnectionType CT_RDMA; -static int valkey_rdma_rx_size = VALKEY_RDMA_DEFAULT_RX_SIZE; -static int valkey_rdma_comp_vector = -1; /* -1 means a random one */ - static void serverRdmaError(char *err, const char *fmt, ...) { va_list ap; @@ -272,7 +271,7 @@ static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) { /* setup recv buf & MR */ access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; - length = valkey_rdma_rx_size; + length = rdma_config->rx_size; ctx->rx.addr = page_aligned_zalloc(length); ctx->rx.length = length; ctx->rx.mr = ibv_reg_mr(ctx->pd, ctx->rx.addr, length, access); @@ -295,6 +294,7 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) { struct ibv_comp_channel *comp_channel = NULL; struct ibv_cq *cq = NULL; struct ibv_pd *pd = NULL; + int comp_vector = rdma_config->completion_vector; if (ibv_query_device(cm_id->verbs, &device_attr)) { serverLog(LL_WARNING, "RDMA: ibv ibv query device failed"); @@ -317,8 +317,13 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) { ctx->comp_channel = comp_channel; + /* negative number means a random one */ + if (comp_vector < 0) { + comp_vector = abs((int)random()); + } + cq = ibv_create_cq(cm_id->verbs, VALKEY_RDMA_MAX_WQE * 2, NULL, comp_channel, - valkey_rdma_comp_vector % cm_id->verbs->num_comp_vectors); + comp_vector % cm_id->verbs->num_comp_vectors); if (!cq) { serverLog(LL_WARNING, "RDMA: ibv create cq failed"); return C_ERR; @@ -1610,9 +1615,28 @@ int connRdmaListen(connListener *listener) { rdma_listener++; } + rdma_config = listener->priv; return C_OK; } +static void connRdmaCloseListener(connListener *listener) { + /* Close old servers */ + for (int i = 0; i < listener->count; i++) { + if (listener->fd[i] == -1) continue; + + aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE); + listener->fd[i] = -1; + struct rdma_listener *rdma_listener = &rdma_listeners[i]; + rdma_destroy_id(rdma_listener->cm_id); + rdma_destroy_event_channel(rdma_listener->cm_channel); + } + + listener->count = 0; + zfree(rdma_listeners); + rdma_listeners = NULL; + rdma_config = NULL; +} + static int connRdmaAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) { rdma_connection *rdma_conn = (rdma_connection *)conn; struct rdma_cm_id *cm_id = rdma_conn->cm_id; @@ -1740,6 +1764,7 @@ static ConnectionType CT_RDMA = { //.cluster_accept_handler = NULL, .is_local = connRdmaIsLocal, .listen = connRdmaListen, + .closeListener = connRdmaCloseListener, .addr = connRdmaAddr, /* create/close connection */ @@ -1769,17 +1794,6 @@ static ConnectionType CT_RDMA = { .process_pending_data = rdmaProcessPendingData, }; -static struct connListener *rdmaListener(void) { - static struct connListener *listener = NULL; - - if (listener) return listener; - - listener = listenerByType(CONN_TYPE_RDMA); - serverAssert(listener != NULL); - - return listener; -} - ConnectionType *connectionTypeRdma(void) { static ConnectionType *ct_rdma = NULL; @@ -1791,133 +1805,28 @@ ConnectionType *connectionTypeRdma(void) { return ct_rdma; } -/* rdma listener has different create/close logic from TCP, we can't re-use 'int changeListener(connListener *listener)' - * directly */ -static int rdmaChangeListener(void) { - struct connListener *listener = rdmaListener(); - - /* Close old servers */ - for (int i = 0; i < listener->count; i++) { - if (listener->fd[i] == -1) continue; - - aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE); - listener->fd[i] = -1; - struct rdma_listener *rdma_listener = &rdma_listeners[i]; - rdma_destroy_id(rdma_listener->cm_id); - rdma_destroy_event_channel(rdma_listener->cm_channel); - } - - listener->count = 0; - zfree(rdma_listeners); - rdma_listeners = NULL; - - closeListener(listener); - - /* Just close the server if port disabled */ - if (listener->port == 0) { - if (server.set_proc_title) serverSetProcTitle(NULL); - return VALKEYMODULE_OK; - } - - /* Re-create listener */ - if (connListen(listener) != C_OK) { - return VALKEYMODULE_ERR; - } - - /* Create event handlers */ - if (createSocketAcceptHandler(listener, listener->ct->accept_handler) != C_OK) { - serverPanic("Unrecoverable error creating %s accept handler.", listener->ct->get_type(NULL)); - } - - if (server.set_proc_title) serverSetProcTitle(NULL); - - return VALKEYMODULE_OK; -} - -#ifdef BUILD_RDMA_MODULE - -#include "release.h" - -static long long rdmaGetPort(const char *name, void *privdata) { - UNUSED(name); - UNUSED(privdata); - struct connListener *listener = rdmaListener(); - - return listener->port; -} - -static int rdmaSetPort(const char *name, long long val, void *privdata, ValkeyModuleString **err) { - UNUSED(name); - UNUSED(privdata); - UNUSED(err); - struct connListener *listener = rdmaListener(); - listener->port = val; - - return VALKEYMODULE_OK; -} - -static ValkeyModuleString *rdma_bind; - -static void rdmaBuildBind(void *ctx) { - struct connListener *listener = rdmaListener(); - - if (rdma_bind) ValkeyModule_FreeString(NULL, rdma_bind); - - sds rdma_bind_str = sdsjoin(listener->bindaddr, listener->bindaddr_count, " "); - rdma_bind = ValkeyModule_CreateString(ctx, rdma_bind_str, sdslen(rdma_bind_str)); +int RegisterConnectionTypeRdma(void) { + return connTypeRegister(&CT_RDMA); } -static ValkeyModuleString *rdmaGetBind(const char *name, void *privdata) { - UNUSED(name); - UNUSED(privdata); +#else - return rdma_bind; +int RegisterConnectionTypeRdma(void) { + serverLog(LL_VERBOSE, "Connection type %s not builtin", CONN_TYPE_RDMA); + return C_ERR; } -static int rdmaSetBind(const char *name, ValkeyModuleString *val, void *privdata, ValkeyModuleString **err) { - UNUSED(name); - UNUSED(err); - struct connListener *listener = rdmaListener(); - const char *bind = ValkeyModule_StringPtrLen(val, NULL); - int nexts; - sds *exts = sdssplitlen(bind, strlen(bind), " ", 1, &nexts); - - if (nexts > CONFIG_BINDADDR_MAX) { - serverLog(LL_WARNING, "RDMA: Unsupported bind ( > %d)", CONFIG_BINDADDR_MAX); - return VALKEYMODULE_ERR; - } - - /* Free old bind addresses */ - for (int j = 0; j < listener->bindaddr_count; j++) { - zfree(listener->bindaddr[j]); - } - - for (int j = 0; j < nexts; j++) listener->bindaddr[j] = zstrdup(exts[j]); - listener->bindaddr_count = nexts; - - sdsfreesplitres(exts, nexts); - rdmaBuildBind(privdata); - - return VALKEYMODULE_OK; -} +#endif -static int rdmaApplyListener(ValkeyModuleCtx *ctx, void *privdata, ValkeyModuleString **err) { - UNUSED(ctx); - UNUSED(privdata); - UNUSED(err); +#if BUILD_RDMA_MODULE == 2 /* BUILD_MODULE */ - return rdmaChangeListener(); -} +#include "release.h" -static void rdmaListenerAddConfig(void *ctx) { - serverAssert(ValkeyModule_RegisterNumericConfig(ctx, "port", 0, VALKEYMODULE_CONFIG_DEFAULT, 0, 65535, rdmaGetPort, - rdmaSetPort, rdmaApplyListener, NULL) == VALKEYMODULE_OK); - serverAssert(ValkeyModule_RegisterStringConfig(ctx, "bind", "", VALKEYMODULE_CONFIG_DEFAULT, rdmaGetBind, - rdmaSetBind, rdmaApplyListener, ctx) == VALKEYMODULE_OK); - serverAssert(ValkeyModule_LoadConfigs(ctx) == VALKEYMODULE_OK); -} int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) { + UNUSED(argv); + UNUSED(argc); + /* Connection modules MUST be part of the same build as valkey. */ if (strcmp(REDIS_BUILD_ID_RAW, serverBuildIdRaw())) { serverLog(LL_NOTICE, "Connection type %s was not built together with the valkey-server used.", CONN_TYPE_RDMA); @@ -1936,40 +1845,6 @@ int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) { if (connTypeRegister(&CT_RDMA) != C_OK) return VALKEYMODULE_ERR; - rdmaListenerAddConfig(ctx); - - struct connListener *listener = rdmaListener(); - listener->ct = connectionTypeRdma(); - listener->bindaddr = zcalloc_num(CONFIG_BINDADDR_MAX, sizeof(listener->bindaddr[0])); - - for (int i = 0; i < argc; i++) { - robj *str = (robj *)argv[i]; - int nexts; - sds *exts = sdssplitlen(str->ptr, strlen(str->ptr), "=", 1, &nexts); - if (nexts != 2) { - serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr); - return VALKEYMODULE_ERR; - } - - if (!strcasecmp(exts[0], "bind")) { - listener->bindaddr[listener->bindaddr_count++] = zstrdup(exts[1]); - } else if (!strcasecmp(exts[0], "port")) { - listener->port = atoi(exts[1]); - } else if (!strcasecmp(exts[0], "rx-size")) { - valkey_rdma_rx_size = atoi(exts[1]); - } else if (!strcasecmp(exts[0], "comp-vector")) { - valkey_rdma_comp_vector = atoi(exts[1]); - } else { - serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr); - return VALKEYMODULE_ERR; - } - - sdsfreesplitres(exts, nexts); - } - - rdmaBuildBind(ctx); - if (valkey_rdma_comp_vector == -1) valkey_rdma_comp_vector = abs((int)random()); - return VALKEYMODULE_OK; } @@ -1981,4 +1856,11 @@ int ValkeyModule_OnUnload(void *arg) { #endif /* BUILD_RDMA_MODULE */ -#endif /* USE_RDMA && __linux__ */ +#else /* __linux__ */ + +int RegisterConnectionTypeRdma(void) { + serverLog(LL_VERBOSE, "Connection type %s is supported on Linux only", CONN_TYPE_RDMA); + return C_ERR; +} + +#endif /* __linux__ */ diff --git a/src/server.c b/src/server.c index 51de89ee53..dc8b6d7d6c 100644 --- a/src/server.c +++ b/src/server.c @@ -2481,19 +2481,6 @@ void checkTcpBacklogSettings(void) { #endif } -void closeListener(connListener *sfd) { - int j; - - for (j = 0; j < sfd->count; j++) { - if (sfd->fd[j] == -1) continue; - - aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE); - close(sfd->fd[j]); - } - - sfd->count = 0; -} - /* Create an event handler for accepting new connections in TCP or TLS domain sockets. * This works atomically for all socket fds */ int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) { @@ -2557,7 +2544,7 @@ int listenToPort(connListener *sfd) { continue; /* Rollback successful listens before exiting */ - closeListener(sfd); + connCloseListener(sfd); return C_ERR; } if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id); @@ -2897,6 +2884,17 @@ void initListeners(void) { listener->priv = &server.unix_ctx_config; /* Unix socket specified */ } + if (server.rdma_ctx_config.port != 0) { + conn_index = connectionIndexByType(CONN_TYPE_RDMA); + if (conn_index < 0) serverPanic("Failed finding connection listener of %s", CONN_TYPE_RDMA); + listener = &server.listeners[conn_index]; + listener->bindaddr = server.rdma_ctx_config.bindaddr; + listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count; + listener->port = server.rdma_ctx_config.port; + listener->ct = connectionByType(CONN_TYPE_RDMA); + listener->priv = &server.rdma_ctx_config; + } + /* create all the configured listener, and add handler to start to accept */ int listen_fds = 0; for (int j = 0; j < CONN_TYPE_MAX; j++) { @@ -6295,7 +6293,7 @@ connListener *listenerByType(const char *typename) { /* Close original listener, re-create a new listener from the updated bind address & port */ int changeListener(connListener *listener) { /* Close old servers */ - closeListener(listener); + connCloseListener(listener); /* Just close the server if port disabled */ if (listener->port == 0) { diff --git a/src/server.h b/src/server.h index 8962b04086..a4141c33d4 100644 --- a/src/server.h +++ b/src/server.h @@ -1613,6 +1613,17 @@ typedef struct serverUnixContextConfig { unsigned int perm; /* UNIX socket permission (see mode_t) */ } serverUnixContextConfig; +/*----------------------------------------------------------------------------- + * RDMA Context Configuration + *----------------------------------------------------------------------------*/ +typedef struct serverRdmaContextConfig { + char *bindaddr[CONFIG_BINDADDR_MAX]; + int bindaddr_count; + int port; + int rx_size; + int completion_vector; +} serverRdmaContextConfig; + /*----------------------------------------------------------------------------- * AOF manifest definition *----------------------------------------------------------------------------*/ @@ -2226,6 +2237,7 @@ struct valkeyServer { int tls_auth_clients; serverTLSContextConfig tls_ctx_config; serverUnixContextConfig unix_ctx_config; + serverRdmaContextConfig rdma_ctx_config; /* cpu affinity */ char *server_cpulist; /* cpu affinity list of server main/io thread. */ char *bio_cpulist; /* cpu affinity list of bio thread. */ @@ -3289,7 +3301,6 @@ void setupSignalHandlers(void); int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler); connListener *listenerByType(const char *typename); int changeListener(connListener *listener); -void closeListener(connListener *listener); struct serverCommand *lookupSubcommand(struct serverCommand *container, sds sub_name); struct serverCommand *lookupCommand(robj **argv, int argc); struct serverCommand *lookupCommandBySdsLogic(dict *commands, sds s); diff --git a/src/socket.c b/src/socket.c index 7344d66ad8..d89e6c8767 100644 --- a/src/socket.c +++ b/src/socket.c @@ -339,6 +339,19 @@ static int connSocketListen(connListener *listener) { return listenToPort(listener); } +static void connSocketCloseListener(connListener *listener) { + int j; + + for (j = 0; j < listener->count; j++) { + if (listener->fd[j] == -1) continue; + + aeDeleteFileEvent(server.el, listener->fd[j], AE_READABLE); + close(listener->fd[j]); + } + + listener->count = 0; +} + static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) { int fd = anetTcpNonBlockConnect(NULL, addr, port); if (fd == -1) { @@ -395,6 +408,7 @@ static ConnectionType CT_Socket = { .addr = connSocketAddr, .is_local = connSocketIsLocal, .listen = connSocketListen, + .closeListener = connSocketCloseListener, /* create/shutdown/close connection */ .conn_create = connCreateSocket, diff --git a/src/tls.c b/src/tls.c index d1dd567354..48b75553de 100644 --- a/src/tls.c +++ b/src/tls.c @@ -805,6 +805,10 @@ static int connTLSListen(connListener *listener) { return listenToPort(listener); } +static void connTLSCloseListener(connListener *listener) { + connectionTypeTcp()->closeListener(listener); +} + static void connTLSShutdown(connection *conn_) { tls_connection *conn = (tls_connection *)conn_; @@ -1147,6 +1151,7 @@ static ConnectionType CT_TLS = { .addr = connTLSAddr, .is_local = connTLSIsLocal, .listen = connTLSListen, + .closeListener = connTLSCloseListener, /* create/shutdown/close connection */ .conn_create = connCreateTLS, diff --git a/src/unix.c b/src/unix.c index 35778779f9..86df05bd52 100644 --- a/src/unix.c +++ b/src/unix.c @@ -74,6 +74,10 @@ static int connUnixListen(connListener *listener) { return C_OK; } +static void connUnixCloseListener(connListener *listener) { + connectionTypeTcp()->closeListener(listener); +} + static connection *connCreateUnix(void) { connection *conn = zcalloc(sizeof(connection)); conn->type = &CT_Unix; @@ -174,6 +178,7 @@ static ConnectionType CT_Unix = { .addr = connUnixAddr, .is_local = connUnixIsLocal, .listen = connUnixListen, + .closeListener = connUnixCloseListener, /* create/shutdown/close connection */ .conn_create = connCreateUnix, diff --git a/tests/rdma/run.py b/tests/rdma/run.py index 0724c27adc..09168f368a 100755 --- a/tests/rdma/run.py +++ b/tests/rdma/run.py @@ -63,7 +63,7 @@ def test_rdma(ipaddr): rdmapath = valkeydir + "/src/valkey-rdma.so" svrcmd = [svrpath, "--port", "0", "--loglevel", "verbose", "--protected-mode", "yes", "--appendonly", "no", "--daemonize", "no", "--dir", valkeydir + "/tests/rdma/tmp", - "--loadmodule", rdmapath, "port=6379", "bind=" + ipaddr] + "--loadmodule", rdmapath, "--rdma-port", "6379", "--rdma-bind", ipaddr] svr = subprocess.Popen(svrcmd, shell=False, stdout=subprocess.PIPE) try: diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 352f5f183e..d79bb1c7da 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -558,6 +558,10 @@ start_server {tags {"introspection"}} { req-res-logfile client-default-resp dual-channel-replication-enabled + rdma-completion-vector + rdma-rx-size + rdma-bind + rdma-port } if {!$::tls} { diff --git a/valkey.conf b/valkey.conf index bf82b01874..8d3e11c515 100644 --- a/valkey.conf +++ b/valkey.conf @@ -300,6 +300,54 @@ tcp-keepalive 300 # # tls-session-cache-timeout 60 +################################### RDMA ###################################### + +# Valkey Over RDMA is experimental, it may be changed or be removed in any minor or major version. +# By default, RDMA is disabled. To enable it, the "rdma-port" configuration +# directive can be used to define RDMA-listening ports. +# +# rdma-port 6379 +# rdma-bind 192.168.1.100 + +# The RDMA receive transfer buffer is 1M by default. It can be set between 64K and 16M. +# Note that page size aligned size is preferred. +# +# rdma-rx-size 1048576 + +# The RDMA completion queue will use the completion vector to signal completion events +# via hardware interrupts. A large number of hardware interrupts can affect CPU performance. +# It is possible to tune the performance using rdma-completion-vector. +# +# Example 1. a) Pin hardware interrupt vectors [0, 3] to CPU [0, 3]. +# b) Set CPU affinity for valkey to CPU [4, X]. +# c) Any valkey server uses a random RDMA completion vector [-1]. +# All valkey servers will not affect each other and will be isolated from kernel interrupts. +# +# SYS SYS SYS SYS VALKEY VALKEY VALKEY +# | | | | | | | +# CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 ... CPUX +# | | | | +# INTR0 INTR1 INTR2 INTR3 +# +# Example 2. a) 1:1 pin hardware interrupt vectors [0, X] to CPU [0, X]. +# b) Set CPU affinity for valkey [M] to CPU [M]. +# c) Valkey server [M] uses RDMA completion vector [M]. +# A single CPU [M] handles hardware interrupts, the RDMA completion vector [M], +# and the valkey server [M] within its context only. +# This avoids overhead and function calls across multiple CPUs, fully isolating +# each valkey server from one another. +# +# VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY +# | | | | | | | +# CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 ... CPUX +# | | | | | | | +# INTR0 INTR1 INTR2 INTR3 INTR4 INTR5 INTRX +# +# Use 0 and positive numbers to specify the RDMA completion vector, or specify -1 to allow +# the server to use a random vector for a new connection. The default vector is -1. +# +# rdma-completion-vector 0 + ################################# GENERAL ##################################### # By default the server does not run as a daemon. Use 'yes' if you need it.