From c2e9fdd8ece06bf09a492b5195c9de20a39e7190 Mon Sep 17 00:00:00 2001
From: Xiao Yang
Date: Wed, 16 Mar 2022 22:09:42 +0800
Subject: [PATCH] rpma: make gpspm server use separate RCQ

Also use epoll APIs when busy_wait_polling == 0.

Signed-off-by: Xiao Yang
---
 engines/librpma_gpspm.c | 323 ++++++++++++++++++++++++++++++----------
 1 file changed, 247 insertions(+), 76 deletions(-)

diff --git a/engines/librpma_gpspm.c b/engines/librpma_gpspm.c
index aae8a5bb7c..5de1ca61b2 100644
--- a/engines/librpma_gpspm.c
+++ b/engines/librpma_gpspm.c
@@ -17,6 +17,7 @@
 #include "librpma_fio.h"
 
 #include <libpmem.h>
+#include <sys/epoll.h>
 
 /* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
 #include "librpma_gpspm_flush.pb-c.h"
@@ -368,11 +369,17 @@ struct server_data {
 	/* resources for messaging buffer from DRAM allocated by fio */
 	struct rpma_mr_local *msg_mr;
 
-	uint32_t msg_sqe_available; /* # of free SQ slots */
-
-	/* in-memory queues */
-	struct ibv_wc *msgs_queued;
-	uint32_t msg_queued_nr;
+	/* # of free SQ slots */
+	uint32_t msg_sqe_available;
+	/* receive CQ */
+	struct rpma_cq *rcq;
+
+	/* epoll's file descriptor */
+	int ep_fd;
+	/* receive CQ's file descriptor */
+	int rcq_fd;
+	/* main CQ's file descriptor */
+	int cq_fd;
 };
 
 static int server_init(struct thread_data *td)
@@ -393,13 +400,6 @@ static int server_init(struct thread_data *td)
 		goto err_server_cleanup;
 	}
 
-	/* allocate in-memory queue */
-	sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
-	if (sd->msgs_queued == NULL) {
-		td_verror(td, errno, "calloc");
-		goto err_free_sd;
-	}
-
 	/*
 	 * Assure a single io_u buffer can store both SEND and RECV messages and
 	 * an io_us buffer allocation is page-size-aligned which is required
@@ -412,9 +412,6 @@ static int server_init(struct thread_data *td)
 
 	return 0;
 
-err_free_sd:
-	free(sd);
-
 err_server_cleanup:
 	librpma_fio_server_cleanup(td);
 
@@ -485,7 +482,6 @@ static void server_cleanup(struct thread_data *td)
 	if ((ret = rpma_mr_dereg(&sd->msg_mr)))
 		librpma_td_verror(td, ret, "rpma_mr_dereg");
 
-	free(sd->msgs_queued);
 	free(sd);
 }
 
@@ -515,9 +511,77 @@ static int prepare_connection(struct thread_data *td,
 	return 0;
 }
 
+static int epoll_init(struct thread_data *td)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct server_data *sd = csd->server_data;
+	struct epoll_event event = {
+		.events = EPOLLIN
+	};
+	int ret;
+
+	sd->ep_fd = epoll_create1(EPOLL_CLOEXEC);
+	if (sd->ep_fd == -1) {
+		td_verror(td, errno, "epoll_create1");
+		return -1;
+	}
+
+	if ((ret = rpma_cq_get_fd(csd->cq, &sd->cq_fd))) {
+		librpma_td_verror(td, ret, "rpma_cq_get_fd");
+		goto err_ep_fd_close;
+	}
+
+	event.data.fd = sd->cq_fd;
+	if ((ret = epoll_ctl(sd->ep_fd, EPOLL_CTL_ADD, sd->cq_fd, &event))) {
+		td_verror(td, errno, "epoll_ctl");
+		goto err_ep_fd_close;
+	}
+
+	if ((ret = rpma_cq_get_fd(sd->rcq, &sd->rcq_fd))) {
+		librpma_td_verror(td, ret, "rpma_cq_get_fd");
+		goto err_cq_fd_delete;
+	}
+
+	event.data.fd = sd->rcq_fd;
+	if ((ret = epoll_ctl(sd->ep_fd, EPOLL_CTL_ADD, sd->rcq_fd, &event))) {
+		td_verror(td, errno, "epoll_ctl");
+		goto err_cq_fd_delete;
+	}
+
+	return ret;
+
+err_cq_fd_delete:
+	(void) epoll_ctl(sd->ep_fd, EPOLL_CTL_DEL, sd->cq_fd, NULL);
+
+err_ep_fd_close:
+	(void) close(sd->ep_fd);
+
+	return ret;
+}
+
+static int epoll_cleanup(struct thread_data *td)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct server_data *sd = csd->server_data;
+	int ret;
+
+	if ((ret = epoll_ctl(sd->ep_fd, EPOLL_CTL_DEL, sd->rcq_fd, NULL)))
+		td_verror(td, errno, "epoll_ctl");
+
+	if ((ret = epoll_ctl(sd->ep_fd, EPOLL_CTL_DEL, sd->cq_fd, NULL)))
+		td_verror(td, errno, "epoll_ctl");
+
+	if ((ret = close(sd->ep_fd)))
+		td_verror(td, errno, "close");
+
+	return ret;
+}
+
 static int server_open_file(struct thread_data *td, struct fio_file *f)
 {
 	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct server_data *sd = csd->server_data;
+	struct librpma_fio_options_values *o = td->eo;
 	struct rpma_conn_cfg *cfg = NULL;
 	uint16_t max_msg_num = td->o.iodepth;
 	int ret;
@@ -531,13 +595,15 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
 	}
 
 	/*
-	 * Calculate the required queue sizes where:
+	 * The required queue sizes are:
 	 * - the send queue (SQ) has to be big enough to accommodate
 	 *   all possible flush requests (SENDs)
 	 * - the receive queue (RQ) has to be big enough to accommodate
 	 *   all flush responses (RECVs)
-	 * - the completion queue (CQ) has to be big enough to accommodate
-	 *   all success and error completions (sq_size + rq_size)
+	 * - the main completion queue (CQ) has to be big enough to
+	 *   accommodate all success and error completions (sq_size)
+	 * - the receive completion queue (RCQ) has to be big enough to
+	 *   accommodate all success and error completions (rq_size)
 	 */
 	if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
 		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
@@ -547,12 +613,24 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
 		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
 		goto err_cfg_delete;
 	}
-	if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
+	if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num))) {
 		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
 		goto err_cfg_delete;
 	}
+	if ((ret = rpma_conn_cfg_set_rcq_size(cfg, max_msg_num))) {
+		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rcq_size");
+		goto err_cfg_delete;
+	}
+
+	if ((ret = librpma_fio_server_open_file(td, f, cfg)))
+		goto err_cfg_delete;
+
+	/* get the connection's receive CQ */
+	if ((ret = rpma_conn_get_rcq(csd->conn, &sd->rcq)))
+		librpma_td_verror(td, ret, "rpma_conn_get_rcq");
 
-	ret = librpma_fio_server_open_file(td, f, cfg);
+	if (o->busy_wait_polling == 0)
+		ret = epoll_init(td);
 
 err_cfg_delete:
 	(void) rpma_conn_cfg_delete(&cfg);
@@ -560,6 +638,19 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
 	return ret;
 }
 
+static int server_close_file(struct thread_data *td, struct fio_file *f)
+{
+	struct librpma_fio_options_values *o = td->eo;
+	int ret;
+
+	ret = librpma_fio_server_close_file(td, f);
+
+	if (o->busy_wait_polling == 0)
+		ret |= epoll_cleanup(td);
+
+	return ret;
+}
+
 static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
 {
 	struct librpma_fio_server_data *csd = td->io_ops_data;
@@ -645,92 +736,172 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
 	return -1;
 }
 
-static inline int server_queue_process(struct thread_data *td)
+/*
+ * server_cmpl_poll - poll and process a completion
+ *
+ * Return value:
+ * 0 or 1 - number of received completions
+ * -1 - in case of an error
+ */
+static int server_cmpl_poll(struct thread_data *td, struct rpma_cq *cq,
+		struct ibv_wc *wc)
 {
 	struct librpma_fio_server_data *csd = td->io_ops_data;
 	struct server_data *sd = csd->server_data;
 	int ret;
-	int i;
 
-	/* min(# of queue entries, # of SQ entries available) */
-	uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
-	if (qes_to_process == 0)
+	ret = rpma_cq_get_wc(cq, 1, wc, NULL);
+	if (ret == RPMA_E_NO_COMPLETION) {
+		/* lack of completion is not an error */
 		return 0;
+	}
+	if (ret) {
+		librpma_td_verror(td, ret, "rpma_cq_get_wc");
+		goto err_terminate;
+	}
 
-	/* process queued completions */
-	for (i = 0; i < qes_to_process; ++i) {
-		if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
-			return ret;
+	/* validate the completion */
+	if (wc->status != IBV_WC_SUCCESS)
+		goto err_terminate;
+
+	if (wc->opcode == IBV_WC_SEND)
+		++sd->msg_sqe_available;
+
+	return 1;
+
+err_terminate:
+	td->terminate = true;
+
+	return -1;
+}
+
+/*
+ * server_cmpl_wait_and_poll - wait, poll and process a completion
+ *
+ * Return value:
+ * 0 or 1 - number of received completions
+ * -1 - in case of an error
+ */
+static int server_cmpl_wait_and_poll(struct thread_data *td, struct rpma_cq *cq,
+		struct ibv_wc *wc)
+{
+	int ret;
+
+	ret = rpma_cq_wait(cq);
+	if (ret == RPMA_E_NO_COMPLETION) {
+		/* lack of completion is not an error */
+		return 0;
+	}
+	if (ret) {
+		librpma_td_verror(td, ret, "rpma_cq_wait");
+		td->terminate = true;
+		return -1;
 	}
 
-	/* progress the queue */
-	for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
-		memcpy(&sd->msgs_queued[i],
-			&sd->msgs_queued[qes_to_process + i],
-			sizeof(sd->msgs_queued[i]));
+	return server_cmpl_poll(td, cq, wc);
+}
+
+static int server_queue_epoll_wait(struct thread_data *td)
+{
+	struct librpma_fio_server_data *csd = td->io_ops_data;
+	struct server_data *sd = csd->server_data;
+	struct epoll_event event;
+	struct ibv_wc cq_wc, rcq_wc;
+	int ret, rv;
+
+	ret = epoll_wait(sd->ep_fd, &event, 1, 10000 /* timeout: 10s */);
+	if (ret < 0)
+		return ret;
+
+	if (ret == 1 && event.data.fd == sd->rcq_fd) {
+		/* process the receive completion */
+		rv = server_cmpl_wait_and_poll(td, sd->rcq, &rcq_wc);
+		if (rv < 0)
+			return rv;
+
+		/* rv == 1 means rcq_wc.opcode == IBV_WC_RECV */
+		if (rv == 1) {
+			/* ensure that at least one SQ slot is available */
+			while (sd->msg_sqe_available == 0) {
+				/* process the send completion */
+				rv = server_cmpl_poll(td, csd->cq, &cq_wc);
+				if (rv < 0)
+					return rv;
+
+				if (sd->msg_sqe_available)
+					break;
+
+				rv = server_cmpl_wait_and_poll(td, csd->cq, &cq_wc);
+				if (rv < 0)
+					return rv;
+			}
+
+			if ((rv = server_qe_process(td, &rcq_wc)))
+				return rv;
+		}
 	}
 
-	sd->msg_queued_nr -= qes_to_process;
+	if (ret == 1 && event.data.fd == sd->cq_fd) {
+		/* process the send completion */
+		rv = server_cmpl_wait_and_poll(td, csd->cq, &cq_wc);
+		if (rv < 0)
+			return rv;
+	}
 
 	return 0;
 }
 
-static int server_cmpl_process(struct thread_data *td)
+static int server_queue_poll(struct thread_data *td)
 {
 	struct librpma_fio_server_data *csd = td->io_ops_data;
 	struct server_data *sd = csd->server_data;
-	struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
-	struct librpma_fio_options_values *o = td->eo;
+	struct ibv_wc cq_wc, rcq_wc;
 	int ret;
 
-	ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
-	if (ret == RPMA_E_NO_COMPLETION) {
-		if (o->busy_wait_polling)
-			return 0; /* lack of completion is not an error */
-
-		ret = rpma_cq_wait(csd->cq);
-		if (ret == RPMA_E_NO_COMPLETION)
-			return 0; /* lack of completion is not an error */
-		if (ret) {
-			librpma_td_verror(td, ret, "rpma_cq_wait");
-			goto err_terminate;
-		}
-
-		ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
-		if (ret == RPMA_E_NO_COMPLETION)
-			return 0; /* lack of completion is not an error */
-		if (ret) {
-			librpma_td_verror(td, ret, "rpma_cq_get_wc");
-			goto err_terminate;
-		}
-	} else if (ret) {
-		librpma_td_verror(td, ret, "rpma_cq_get_wc");
-		goto err_terminate;
+	/* process as many send completions as possible */
+	while ((ret = server_cmpl_poll(td, csd->cq, &cq_wc))) {
+		if (ret < 0)
+			return ret;
 	}
 
-	/* validate the completion */
-	if (wc->status != IBV_WC_SUCCESS)
-		goto err_terminate;
+	/* process the receive completion */
+	ret = server_cmpl_poll(td, sd->rcq, &rcq_wc);
+	if (ret < 0)
+		return ret;
 
-	if (wc->opcode == IBV_WC_RECV)
-		++sd->msg_queued_nr;
-	else if (wc->opcode == IBV_WC_SEND)
-		++sd->msg_sqe_available;
+	/* ret == 1 means rcq_wc.opcode == IBV_WC_RECV */
+	if (ret == 1) {
+		/* ensure that at least one SQ slot is available */
+		while (sd->msg_sqe_available == 0) {
+			/* process the send completion */
+			ret = server_cmpl_poll(td, csd->cq, &cq_wc);
+			if (ret < 0)
+				return ret;
		}
+
+		if ((ret = server_qe_process(td, &rcq_wc)))
+			return ret;
+	}
 
 	return 0;
+}
 
-err_terminate:
-	td->terminate = true;
+static inline int server_queue_process(struct thread_data *td)
+{
+	struct librpma_fio_options_values *o = td->eo;
+	int ret;
 
-	return -1;
+	if (o->busy_wait_polling)
+		ret = server_queue_poll(td);
+	else
+		ret = server_queue_epoll_wait(td);
+
+	return ret;
 }
 
 static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
 {
 	do {
-		if (server_cmpl_process(td))
-			return FIO_Q_BUSY;
-
 		if (server_queue_process(td))
 			return FIO_Q_BUSY;
 
@@ -745,7 +916,7 @@ FIO_STATIC struct ioengine_ops ioengine_server = {
 	.init			= server_init,
 	.post_init		= server_post_init,
 	.open_file		= server_open_file,
-	.close_file		= librpma_fio_server_close_file,
+	.close_file		= server_close_file,
 	.queue			= server_queue,
 	.invalidate		= librpma_fio_file_nop,
 	.cleanup		= server_cleanup,
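Note (not part of the patch): a minimal, self-contained sketch of the completion-waiting pattern the patch builds on — expose the main CQ and the separate RCQ as file descriptors with rpma_cq_get_fd(), register both with one epoll instance, sleep in epoll_wait(), and only then reap a completion with rpma_cq_wait() plus rpma_cq_get_wc(). The helper name wait_one_completion(), the use of epoll data.ptr instead of data.fd, and the trimmed error handling are illustrative assumptions, not code from fio or librpma.

#include <sys/epoll.h>
#include <unistd.h>
#include <librpma.h>

/* wait (without busy polling) for one completion on either cq or rcq */
static int wait_one_completion(struct rpma_cq *cq, struct rpma_cq *rcq)
{
	struct epoll_event ev = { .events = EPOLLIN };
	struct ibv_wc wc;
	int ep_fd, cq_fd, rcq_fd;

	/* expose both completion queues as file descriptors */
	if (rpma_cq_get_fd(cq, &cq_fd) || rpma_cq_get_fd(rcq, &rcq_fd))
		return -1;

	ep_fd = epoll_create1(EPOLL_CLOEXEC);
	if (ep_fd == -1)
		return -1;

	/* watch both descriptors with a single epoll instance */
	ev.data.ptr = cq;
	if (epoll_ctl(ep_fd, EPOLL_CTL_ADD, cq_fd, &ev))
		goto err_close;
	ev.data.ptr = rcq;
	if (epoll_ctl(ep_fd, EPOLL_CTL_ADD, rcq_fd, &ev))
		goto err_close;

	/* sleep until one of the queues signals its descriptor */
	if (epoll_wait(ep_fd, &ev, 1, -1) != 1)
		goto err_close;

	/* ack the event and reap exactly one completion from that queue */
	if (rpma_cq_wait((struct rpma_cq *)ev.data.ptr))
		goto err_close;
	if (rpma_cq_get_wc((struct rpma_cq *)ev.data.ptr, 1, &wc, NULL))
		goto err_close;

	close(ep_fd);
	return (wc.status == IBV_WC_SUCCESS) ? 0 : -1;

err_close:
	close(ep_fd);
	return -1;
}

In the patch itself this split is what lets the server account for the two completion streams independently: SEND completions on the main CQ only replenish msg_sqe_available, while RECV completions on the RCQ carry the GPSPM flush requests that server_qe_process() handles, so neither path has to sift through the other's completions.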