Skip to content

Commit

Permalink
rpma: make gpspm server use separate RCQ
Browse files Browse the repository at this point in the history
Also use shared completion channel.

Signed-off-by: Xiao Yang <[email protected]>
  • Loading branch information
yangx-jy committed May 13, 2022
1 parent 1563444 commit d64eeb3
Showing 1 changed file with 155 additions and 74 deletions.
229 changes: 155 additions & 74 deletions engines/librpma_gpspm.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,10 @@ struct server_data {
/* resources for messaging buffer from DRAM allocated by fio */
struct rpma_mr_local *msg_mr;

uint32_t msg_sqe_available; /* # of free SQ slots */

/* in-memory queues */
struct ibv_wc *msgs_queued;
uint32_t msg_queued_nr;
/* # of free SQ slots */
uint32_t msg_sqe_available;
/* receive CQ */
struct rpma_cq *rcq;

librpma_fio_persist_fn persist;
};
Expand All @@ -401,13 +400,6 @@ static int server_init(struct thread_data *td)
goto err_server_cleanup;
}

/* allocate in-memory queue */
sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
if (sd->msgs_queued == NULL) {
td_verror(td, errno, "calloc");
goto err_free_sd;
}

#ifdef CONFIG_LIBPMEM2_INSTALLED
/* get libpmem2 persist function from pmem2_map */
sd->persist = pmem2_get_persist_fn(csd->mem.map);
Expand All @@ -427,9 +419,6 @@ static int server_init(struct thread_data *td)

return 0;

err_free_sd:
free(sd);

err_server_cleanup:
librpma_fio_server_cleanup(td);

Expand Down Expand Up @@ -500,7 +489,6 @@ static void server_cleanup(struct thread_data *td)
if ((ret = rpma_mr_dereg(&sd->msg_mr)))
librpma_td_verror(td, ret, "rpma_mr_dereg");

free(sd->msgs_queued);
free(sd);
}

Expand Down Expand Up @@ -533,6 +521,7 @@ static int prepare_connection(struct thread_data *td,
static int server_open_file(struct thread_data *td, struct fio_file *f)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
struct rpma_conn_cfg *cfg = NULL;
uint16_t max_msg_num = td->o.iodepth;
int ret;
Expand All @@ -546,7 +535,7 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
}

/*
* Calculate the required queue sizes where:
* The required queue sizes are:
* - the send queue (SQ) has to be big enough to accommodate
* all possible flush requests (SENDs)
* - the receive queue (RQ) has to be big enough to accommodate
Expand All @@ -562,12 +551,25 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
goto err_cfg_delete;
}
if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num))) {
librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
goto err_cfg_delete;
}
if ((ret = rpma_conn_cfg_set_rcq_size(cfg, max_msg_num))) {
librpma_td_verror(td, ret, "rpma_conn_cfg_set_rcq_size");
goto err_cfg_delete;
}
if ((ret = rpma_conn_cfg_set_compl_channel(cfg, true))) {
librpma_td_verror(td, ret, "rpma_conn_cfg_set_compl_channel");
goto err_cfg_delete;
}

if ((ret = librpma_fio_server_open_file(td, f, cfg)))
goto err_cfg_delete;

ret = librpma_fio_server_open_file(td, f, cfg);
/* get the connection's receive CQ */
if ((ret = rpma_conn_get_rcq(csd->conn, &sd->rcq)))
librpma_td_verror(td, ret, "rpma_conn_get_rcq");

err_cfg_delete:
(void) rpma_conn_cfg_delete(&cfg);
Expand Down Expand Up @@ -660,92 +662,171 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
return -1;
}

static inline int server_queue_process(struct thread_data *td)
/*
* server_cmpl_poll - poll and process a completion
*
* Return value:
* 0 or 1 - number of received completions
* -1 - in case of an error
*/
static int server_cmpl_poll(struct thread_data *td, struct rpma_cq *cq,
struct ibv_wc *wc)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
int ret;
int i;

/* min(# of queue entries, # of SQ entries available) */
uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
if (qes_to_process == 0)
ret = rpma_cq_get_wc(cq, 1, wc, NULL);
if (ret == RPMA_E_NO_COMPLETION) {
/* lack of completion is not an error */
return 0;
}
if (ret) {
librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err_terminate;
}

/* validate the completion */
if (wc->status != IBV_WC_SUCCESS)
goto err_terminate;

if (wc->opcode == IBV_WC_SEND)
++sd->msg_sqe_available;

return 1;

err_terminate:
td->terminate = true;

return -1;
}

static int server_queue_poll(struct thread_data *td)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
struct ibv_wc cq_wc, rcq_wc;
int ret;

/* process a receive completion */
ret = server_cmpl_poll(td, sd->rcq, &rcq_wc);
if (ret < 0)
return ret;

/* process queued completions */
for (i = 0; i < qes_to_process; ++i) {
if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
if (ret == 0) {
/* process a send completion if nothing to be done with RCQ */
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
if (ret < 0)
return ret;
}

/* progress the queue */
for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
memcpy(&sd->msgs_queued[i],
&sd->msgs_queued[qes_to_process + i],
sizeof(sd->msgs_queued[i]));
return 0;
}

sd->msg_queued_nr -= qes_to_process;
/* here means rcq_wc.opcode == IBV_WC_RECV */

return 0;
/* ensure that at least one SQ slot is available */
while (sd->msg_sqe_available == 0) {
/* process a send completion */
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
if (ret < 0)
return ret;
}

return server_qe_process(td, &rcq_wc);
}

static int server_cmpl_process(struct thread_data *td)
static int server_queue_wait_poll(struct thread_data *td)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
struct librpma_fio_options_values *o = td->eo;
struct rpma_cq *cq;
struct ibv_wc cq_wc, rcq_wc;
bool is_rcq;
int ret;

ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
if (ret == RPMA_E_NO_COMPLETION) {
if (o->busy_wait_polling)
return 0; /* lack of completion is not an error */

ret = rpma_cq_wait(csd->cq);
if (ret == RPMA_E_NO_COMPLETION)
return 0; /* lack of completion is not an error */
if (ret) {
librpma_td_verror(td, ret, "rpma_cq_wait");
goto err_terminate;
}
/* process a receive completion */
ret = server_cmpl_poll(td, sd->rcq, &rcq_wc);
if (ret < 0)
return ret;

ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
if (ret == RPMA_E_NO_COMPLETION)
return 0; /* lack of completion is not an error */
if (ret) {
librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err_terminate;
}
} else if (ret) {
librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err_terminate;
if (ret == 0) {
/* process all available send completions before wait for RCQ */
do {
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
if (ret < 0)
return ret;
} while (ret);

do {
ret = rpma_conn_wait(csd->conn, &cq, &is_rcq);
if (ret) {
librpma_td_verror(td, ret, "rpma_conn_wait");
td->terminate = true;
return ret;
}

if (!is_rcq) {
/* process all available send completions */
do {
ret = server_cmpl_poll(td, cq, &cq_wc);
if (ret < 0)
return ret;
} while (ret);
} else {
/* process a receive completion */
ret = server_cmpl_poll(td, cq, &rcq_wc);
if (ret < 0)
return ret;
}
} while (!is_rcq);

/* return if there is still no receive completion */
if (ret == 0)
return ret;
}

/* validate the completion */
if (wc->status != IBV_WC_SUCCESS)
goto err_terminate;
/* here means rcq_wc.opcode == IBV_WC_RECV */

if (wc->opcode == IBV_WC_RECV)
++sd->msg_queued_nr;
else if (wc->opcode == IBV_WC_SEND)
++sd->msg_sqe_available;
/* ensure that at least one SQ slot is available */
while (sd->msg_sqe_available == 0) {
/* process a send completion */
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
if (ret < 0)
return ret;

return 0;
if (ret == 0) {
do {
ret = rpma_conn_wait(csd->conn, &cq, &is_rcq);
if (ret) {
librpma_td_verror(td, ret, "rpma_conn_wait");
td->terminate = true;
return ret;
}
} while (is_rcq);

/* process a send completion again */
ret = server_cmpl_poll(td, cq, &cq_wc);
if (ret < 0)
return ret;
}
}

err_terminate:
td->terminate = true;
return server_qe_process(td, &rcq_wc);
}

return -1;
static inline int server_queue_process(struct thread_data *td)
{
struct librpma_fio_options_values *o = td->eo;

if (o->busy_wait_polling)
return server_queue_poll(td);
else
return server_queue_wait_poll(td);
}

static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
{
do {
if (server_cmpl_process(td))
return FIO_Q_BUSY;

if (server_queue_process(td))
return FIO_Q_BUSY;

Expand Down

0 comments on commit d64eeb3

Please sign in to comment.