Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpma: make gpspm server use seperate RCQ #272

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 155 additions & 74 deletions engines/librpma_gpspm.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,10 @@ struct server_data {
/* resources for messaging buffer from DRAM allocated by fio */
struct rpma_mr_local *msg_mr;

uint32_t msg_sqe_available; /* # of free SQ slots */

/* in-memory queues */
struct ibv_wc *msgs_queued;
uint32_t msg_queued_nr;
/* # of free SQ slots */
uint32_t msg_sqe_available;
/* receive CQ */
struct rpma_cq *rcq;

librpma_fio_persist_fn persist;
};
Expand All @@ -401,13 +400,6 @@ static int server_init(struct thread_data *td)
goto err_server_cleanup;
}

/* allocate in-memory queue */
sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
if (sd->msgs_queued == NULL) {
td_verror(td, errno, "calloc");
goto err_free_sd;
}

#ifdef CONFIG_LIBPMEM2_INSTALLED
/* get libpmem2 persist function from pmem2_map */
sd->persist = pmem2_get_persist_fn(csd->mem.map);
Expand All @@ -427,9 +419,6 @@ static int server_init(struct thread_data *td)

return 0;

err_free_sd:
free(sd);

err_server_cleanup:
librpma_fio_server_cleanup(td);

Expand Down Expand Up @@ -500,7 +489,6 @@ static void server_cleanup(struct thread_data *td)
if ((ret = rpma_mr_dereg(&sd->msg_mr)))
librpma_td_verror(td, ret, "rpma_mr_dereg");

free(sd->msgs_queued);
free(sd);
}

Expand Down Expand Up @@ -533,6 +521,7 @@ static int prepare_connection(struct thread_data *td,
static int server_open_file(struct thread_data *td, struct fio_file *f)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
struct rpma_conn_cfg *cfg = NULL;
uint16_t max_msg_num = td->o.iodepth;
int ret;
Expand All @@ -546,7 +535,7 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
}

/*
* Calculate the required queue sizes where:
* The required queue sizes are:
* - the send queue (SQ) has to be big enough to accommodate
* all possible flush requests (SENDs)
* - the receive queue (RQ) has to be big enough to accommodate
Expand All @@ -562,12 +551,25 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
goto err_cfg_delete;
}
if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num))) {
librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
goto err_cfg_delete;
}
if ((ret = rpma_conn_cfg_set_rcq_size(cfg, max_msg_num))) {
librpma_td_verror(td, ret, "rpma_conn_cfg_set_rcq_size");
goto err_cfg_delete;
}
if ((ret = rpma_conn_cfg_set_compl_channel(cfg, true))) {
librpma_td_verror(td, ret, "rpma_conn_cfg_set_compl_channel");
goto err_cfg_delete;
}

if ((ret = librpma_fio_server_open_file(td, f, cfg)))
goto err_cfg_delete;

ret = librpma_fio_server_open_file(td, f, cfg);
/* get the connection's receive CQ */
if ((ret = rpma_conn_get_rcq(csd->conn, &sd->rcq)))
librpma_td_verror(td, ret, "rpma_conn_get_rcq");

err_cfg_delete:
(void) rpma_conn_cfg_delete(&cfg);
Expand Down Expand Up @@ -660,92 +662,171 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
return -1;
}

static inline int server_queue_process(struct thread_data *td)
/*
* server_cmpl_poll - poll and process a completion
*
* Return value:
* 0 or 1 - number of received completions
* -1 - in case of an error
*/
static int server_cmpl_poll(struct thread_data *td, struct rpma_cq *cq,
struct ibv_wc *wc)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
int ret;
int i;

/* min(# of queue entries, # of SQ entries available) */
uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
if (qes_to_process == 0)
ret = rpma_cq_get_wc(cq, 1, wc, NULL);
if (ret == RPMA_E_NO_COMPLETION) {
/* lack of completion is not an error */
return 0;
}
if (ret) {
librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err_terminate;
}

/* validate the completion */
if (wc->status != IBV_WC_SUCCESS)
goto err_terminate;

if (wc->opcode == IBV_WC_SEND)
++sd->msg_sqe_available;

return 1;

err_terminate:
td->terminate = true;

return -1;
}

static int server_queue_poll(struct thread_data *td)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
struct ibv_wc cq_wc, rcq_wc;
int ret;

/* process a receive completion */
ret = server_cmpl_poll(td, sd->rcq, &rcq_wc);
if (ret < 0)
return ret;

/* process queued completions */
for (i = 0; i < qes_to_process; ++i) {
if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
if (ret == 0) {
/* process a send completion if nothing to be done with RCQ */
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
if (ret < 0)
return ret;
}

/* progress the queue */
for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
memcpy(&sd->msgs_queued[i],
&sd->msgs_queued[qes_to_process + i],
sizeof(sd->msgs_queued[i]));
return 0;
}

sd->msg_queued_nr -= qes_to_process;
/* here means rcq_wc.opcode == IBV_WC_RECV */

return 0;
/* ensure that at least one SQ slot is available */
while (sd->msg_sqe_available == 0) {
/* process a send completion */
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
if (ret < 0)
return ret;
}

return server_qe_process(td, &rcq_wc);
}

static int server_cmpl_process(struct thread_data *td)
static int server_queue_wait_poll(struct thread_data *td)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
struct librpma_fio_options_values *o = td->eo;
struct rpma_cq *cq;
struct ibv_wc cq_wc, rcq_wc;
bool is_rcq;
int ret;

ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
if (ret == RPMA_E_NO_COMPLETION) {
if (o->busy_wait_polling)
return 0; /* lack of completion is not an error */

ret = rpma_cq_wait(csd->cq);
if (ret == RPMA_E_NO_COMPLETION)
return 0; /* lack of completion is not an error */
if (ret) {
librpma_td_verror(td, ret, "rpma_cq_wait");
goto err_terminate;
}
/* process a receive completion */
ret = server_cmpl_poll(td, sd->rcq, &rcq_wc);
if (ret < 0)
return ret;

ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
if (ret == RPMA_E_NO_COMPLETION)
return 0; /* lack of completion is not an error */
if (ret) {
librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err_terminate;
}
} else if (ret) {
librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err_terminate;
if (ret == 0) {
/* process all available send completions before wait for RCQ */
do {
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
if (ret < 0)
return ret;
} while (ret);

do {
ret = rpma_conn_wait(csd->conn, &cq, &is_rcq);
if (ret) {
librpma_td_verror(td, ret, "rpma_conn_wait");
td->terminate = true;
return ret;
}

if (!is_rcq) {
/* process all available send completions */
do {
ret = server_cmpl_poll(td, cq, &cq_wc);
if (ret < 0)
return ret;
} while (ret);
} else {
/* process a receive completion */
ret = server_cmpl_poll(td, cq, &rcq_wc);
if (ret < 0)
return ret;
}
} while (!is_rcq);

/* return if there is still no receive completion */
if (ret == 0)
return ret;
}

/* validate the completion */
if (wc->status != IBV_WC_SUCCESS)
goto err_terminate;
/* here means rcq_wc.opcode == IBV_WC_RECV */

if (wc->opcode == IBV_WC_RECV)
++sd->msg_queued_nr;
else if (wc->opcode == IBV_WC_SEND)
++sd->msg_sqe_available;
/* ensure that at least one SQ slot is available */
while (sd->msg_sqe_available == 0) {
/* process a send completion */
ret = server_cmpl_poll(td, csd->cq, &cq_wc);
if (ret < 0)
return ret;

return 0;
if (ret == 0) {
do {
ret = rpma_conn_wait(csd->conn, &cq, &is_rcq);
if (ret) {
librpma_td_verror(td, ret, "rpma_conn_wait");
td->terminate = true;
return ret;
}
} while (is_rcq);

/* process a send completion again */
ret = server_cmpl_poll(td, cq, &cq_wc);
if (ret < 0)
return ret;
}
}

err_terminate:
td->terminate = true;
return server_qe_process(td, &rcq_wc);
}

return -1;
static inline int server_queue_process(struct thread_data *td)
{
struct librpma_fio_options_values *o = td->eo;

if (o->busy_wait_polling)
return server_queue_poll(td);
else
return server_queue_wait_poll(td);
}

static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
{
do {
if (server_cmpl_process(td))
return FIO_Q_BUSY;

if (server_queue_process(td))
return FIO_Q_BUSY;

Expand Down