From b14697ef529d3a159eb6e25d3d38e63a0d1fb610 Mon Sep 17 00:00:00 2001
From: root
Date: Sun, 10 Dec 2023 15:32:40 +0000
Subject: [PATCH] Adding Com Channel fast data path capabilities

---
 src/client.cpp       |  18 +-
 src/common.h         |  44 ++++-
 src/defs.h           |   4 +-
 src/doca_cc_helper.h | 444 +++++++++++++++++++++++++++++++++++++++++--
 src/input_handlers.h |  30 ++-
 src/server.cpp       |  18 +-
 src/sockperf.cpp     |  19 +-
 7 files changed, 549 insertions(+), 28 deletions(-)

diff --git a/src/client.cpp b/src/client.cpp
index 767e5dc3..c005ca94 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -888,11 +888,25 @@ int Client::initBeforeLoop() {
 #if defined(USING_DOCA_COMM_CHANNEL_API)
         if (s_user_params.doca_comm_channel) {
             // Waiting for connection
-            struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)data->doca_cc_ctx;
-            while (ctx_client->state != CC_CONNECTED) {
+            doca_error_t result;
+            while (data->doca_cc_ctx->state != CC_CONNECTED) {
                 doca_pe_progress(s_user_params.pe);
             }
             log_dbg("[fd=%d] Client connected successfully", ifd);
+            if (s_user_params.doca_fast_path) {
+                struct cc_local_mem_bufs *local_producer_mem = &(data->doca_cc_ctx->ctx_fifo.producer_mem);
+                // Buf is needed for registering with memrange
+                local_producer_mem->mem = m_pMsgRequest->getBuf();
+                result = cc_init_local_mem_bufs(local_producer_mem, data->doca_cc_ctx);
+                if (result != DOCA_SUCCESS) {
+                    DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result));
+                    return result;
+                }
+                log_dbg("[fd=%d] Init producer memory succeeded", ifd);
+                while (data->doca_cc_ctx->ctx_fifo.fifo_state != CC_FIFO_CONNECTED) {
+                    doca_pe_progress(s_user_params.pe);
+                }
+            }
         }
         // Avoid Client binding in Com Channel mode
         if (p_client_bind_addr->addr.sa_family != AF_UNSPEC && !s_user_params.doca_comm_channel) {
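Note: the client-side bring-up above is a three-step sequence; a condensed sketch (identifiers exactly as in this patch, error handling trimmed):

    /* 1. Poll the progress engine until the Comm Channel control path connects */
    while (data->doca_cc_ctx->state != CC_CONNECTED)
        doca_pe_progress(s_user_params.pe);
    /* 2. Register the request buffer as the producer memory range */
    local_producer_mem->mem = m_pMsgRequest->getBuf();
    cc_init_local_mem_bufs(local_producer_mem, data->doca_cc_ctx);
    /* 3. Poll again until the peer's consumer ID arrives and the FIFO connects */
    while (data->doca_cc_ctx->ctx_fifo.fifo_state != CC_FIFO_CONNECTED)
        doca_pe_progress(s_user_params.pe);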
diff --git a/src/common.h b/src/common.h
index 31a82df3..eefc28b1 100644
--- a/src/common.h
+++ b/src/common.h
@@ -115,6 +115,7 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
 #if defined(USING_DOCA_COMM_CHANNEL_API)
     if (s_user_params.doca_comm_channel) {
         doca_error_t doca_error;
+        struct doca_cc_producer_send_task *producer_task;
         struct doca_cc_send_task *task;
         struct doca_task *task_obj;
         struct timespec ts = {
@@ -123,14 +124,26 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
         };
         struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx;
         do {
-            if (s_user_params.mode == MODE_SERVER) {
-                struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx;
-                doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf,
-                                                                 nbytes, &task);
-            } else { // MODE_CLIENT
-                struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx;
-                doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf,
-                                                                 nbytes, &task);
+            if (s_user_params.doca_fast_path) {
+                struct doca_buf *doca_buf;
+                doca_error = doca_buf_inventory_buf_get_by_data(ctx->ctx_fifo.producer_mem.buf_inv, ctx->ctx_fifo.producer_mem.mmap,
+                                                                buf, nbytes, &doca_buf);
+                if (doca_error != DOCA_SUCCESS) {
+                    log_err("Failed to get doca buf from producer mmap with error = %s", doca_error_get_name(doca_error));
+                }
+                doca_error = doca_cc_producer_send_task_alloc_init(ctx->ctx_fifo.producer, doca_buf,
+                                                                   ctx->ctx_fifo.remote_consumer_id, &producer_task);
+            } else { // Not doca fast path
+                if (s_user_params.mode == MODE_SERVER) {
+                    struct cc_ctx_server *ctx_server = (struct cc_ctx_server*)ctx;
+                    doca_error = doca_cc_server_send_task_alloc_init(ctx_server->server, ctx_server->ctx.connection, buf,
+                                                                     nbytes, &task);
+                } else { // MODE_CLIENT
+                    struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)ctx;
+                    doca_error = doca_cc_client_send_task_alloc_init(ctx_client->client, ctx_client->ctx.connection, buf,
+                                                                     nbytes, &task);
+                }
             }
             if (doca_error == DOCA_ERROR_NO_MEMORY) {
                 // Queue is full of tasks, need to free tasks with completion callback
@@ -147,7 +160,11 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
                     ret = RET_SOCKET_SHUTDOWN;
                 }
             } else { // task_alloc_init succeeded
-                task_obj = doca_cc_send_task_as_task(task);
+                if (s_user_params.doca_fast_path) {
+                    task_obj = doca_cc_producer_send_task_as_task(producer_task);
+                } else {
+                    task_obj = doca_cc_send_task_as_task(task);
+                }
                 do {
                     doca_error = doca_task_submit(task_obj);
                     if (doca_error == DOCA_ERROR_AGAIN) {
@@ -169,7 +186,14 @@ static inline int msg_sendto(int fd, uint8_t *buf, int nbytes,
             }
         }
         // Additional call for better performance - release pressure on send queue
-        doca_pe_progress(s_user_params.pe);
+        if (!s_user_params.doca_fast_path) {
+            doca_pe_progress(s_user_params.pe);
+        } else {
+            // For the fast data path we must see the send completion before the next ping-pong iteration
+            while (doca_pe_progress(s_user_params.pe) == 0) {
+                nanosleep(&ts, &ts);
+            }
+        }
     } else
 #endif /* USING_DOCA_COMM_CHANNEL_API */
     {

diff --git a/src/defs.h b/src/defs.h
index fea3ba3c..631bbc4b 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -301,7 +301,8 @@ enum {
 #if defined(USING_DOCA_COMM_CHANNEL_API)
     OPT_DOCA,
     OPT_PCI,
-    OPT_PCI_REP
+    OPT_PCI_REP,
+    OPT_DOCA_FAST_PATH
#endif /* USING_DOCA_COMM_CHANNEL_API */
 };

@@ -820,6 +821,7 @@ struct user_params_t {
 #endif /* DEFINED_TLS */
 #if defined(USING_DOCA_COMM_CHANNEL_API)
     bool doca_comm_channel = false;         /* Flag to indicate using Com Channel */
+    bool doca_fast_path = false;            /* Flag to indicate using fast path */
     char cc_dev_pci_addr[PCI_ADDR_LEN];     /* Comm Channel DOCA device PCI address */
     char cc_dev_rep_pci_addr[PCI_ADDR_LEN]; /* Comm Channel DOCA device representor PCI address */
     struct doca_pe *pe = nullptr;           /* Progress engine for doca, one per thread */
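Note: the src/common.h hunk above is zero-copy only because `buf` already lies inside the memory range registered with the producer mmap, so doca_buf_inventory_buf_get_by_data() can wrap it in place. A minimal sketch of the fast-path send, with `fifo` used as shorthand for &ctx->ctx_fifo (the shorthand is mine, not a name from the patch):

    struct doca_buf *doca_buf;
    struct doca_cc_producer_send_task *producer_task;
    /* Wrap the caller's buffer; no copy, buf must sit inside the producer memrange */
    doca_buf_inventory_buf_get_by_data(fifo->producer_mem.buf_inv, fifo->producer_mem.mmap,
                                       buf, nbytes, &doca_buf);
    doca_cc_producer_send_task_alloc_init(fifo->producer, doca_buf,
                                          fifo->remote_consumer_id, &producer_task);
    doca_task_submit(doca_cc_producer_send_task_as_task(producer_task));
    /* Ping-pong needs the completion before the next receive: spin on the engine */
    while (doca_pe_progress(s_user_params.pe) == 0)
        nanosleep(&ts, &ts);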
diff --git a/src/doca_cc_helper.h b/src/doca_cc_helper.h
index e13f3256..a53b64ed 100644
--- a/src/doca_cc_helper.h
+++ b/src/doca_cc_helper.h
@@ -39,9 +39,17 @@
 #include <doca_cc.h>
 #include <doca_pe.h>
 #include <doca_dev.h>
+#include <doca_cc_consumer.h>
+#include <doca_cc_producer.h>
+#include <doca_buf_inventory.h>
+#include <doca_mmap.h>
+
 #include "os_abstract.h"

 #define MSG_SIZE 4080
+#define MAX_BUF_SIZE (65507 * 2) /* Size of the producer/consumer memory range */
+#define MAX_BUFS 10
+#define CC_DATA_PATH_LOG_TASK_NUM 10 /* Maximum number of CC consumer and producer tasks */
 #define PCI_ADDR_LEN 8
 #define CC_MAX_QUEUE_SIZE 1024 /* Maximum amount of messages in queue */
 #define CC_REC_QUEUE_SIZE 10 /* Maximum amount of messages in queue */
@@ -54,10 +62,30 @@
 #define DOCA_LOG_ERR(format, ...) log_dbg(format, ##__VA_ARGS__)
 #define DOCA_LOG_DBG(format, ...) log_dbg(format, ##__VA_ARGS__)

-enum cc_client_state {
+enum cc_state {
     CONNECTION_IN_PROGRESS,
     CC_CONNECTED
 };
+
+enum cc_fifo_state {
+    FIFO_CONNECTION_IN_PROGRESS,
+    CC_FIFO_CONNECTED
+};
+
+struct cc_local_mem_bufs {
+    void *mem;                          /* Memory address for DOCA buf mmap */
+    struct doca_mmap *mmap;             /* DOCA mmap object */
+    struct doca_buf_inventory *buf_inv; /* DOCA buf inventory object */
+};
+
+struct cc_ctx_fifo {
+    struct doca_cc_consumer *consumer;     /**< CC consumer object >*/
+    struct cc_local_mem_bufs consumer_mem; /**< Mmap and DOCA buf objects for consumer >*/
+    struct doca_cc_producer *producer;     /**< CC producer object >*/
+    struct cc_local_mem_bufs producer_mem; /**< Mmap and DOCA buf objects for producer >*/
+    uint32_t remote_consumer_id;           /**< Consumer ID on the peer side >*/
+    struct doca_pe *pe;                    /**< Progress engine for the fast data path >*/
+    enum cc_fifo_state fifo_state;         /**< Holding state of fast path connection >*/
+};

 struct cc_ctx {
     struct doca_dev *hw_dev;               /**< Doca Device used per PCI address >*/
     struct doca_cc_connection *connection; /**< Connection object used for pairing a connection >*/
@@ -67,7 +95,10 @@ struct cc_ctx {
     bool recv_flag;                        /**< flag indicates when message received >*/
     int fd;                                /**< File Descriptor >*/
     os_mutex_t lock;                       /**< For underload mode only >*/
-    os_cond_t cond;                        /**< For underload mode only >*/
+    os_cond_t cond;                        /**< For underload mode only >*/
+    enum cc_state state;                   /**< Holding state of client connection >*/
+    bool fast_path;                        /**< Indicates fast data path usage >*/
+    struct cc_ctx_fifo ctx_fifo;           /**< Data path objects >*/
 };

 struct cc_ctx_server {
@@ -79,7 +110,6 @@ struct cc_ctx_server {
 struct cc_ctx_client {
     struct cc_ctx ctx;             /**< Base common ctx >*/
     struct doca_cc_client *client; /**< Client object >*/
-    enum cc_client_state state;    /**< Holding state of client connection >*/
     bool underload_mode;           /**< For using different callback >*/
 };

@@ -98,6 +128,8 @@ struct priv_doca_pci_bdf {
 };

 /************** General ******************/
+static doca_error_t cc_init_producer(struct cc_ctx *ctx);
+static doca_error_t cc_init_consumer(struct cc_ctx *ctx);

 static doca_error_t
 cc_parse_pci_addr(char const *pci_addr, struct priv_doca_pci_bdf *out_bdf)
 {
@@ -213,6 +245,58 @@ cc_open_doca_device_rep_with_pci(struct doca_dev *local, enum doca_devinfo_rep_f
     return DOCA_ERROR_NOT_FOUND;
 }

+static doca_error_t
+cc_init_local_mem_bufs(struct cc_local_mem_bufs *local_mem, struct cc_ctx *ctx)
+{
+    doca_error_t result;
+
+    result = doca_buf_inventory_create(MAX_BUFS, &(local_mem->buf_inv));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to create inventory: %s", doca_error_get_descr(result));
+        return result;
+    }
+
+    result = doca_buf_inventory_start(local_mem->buf_inv);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to start inventory: %s", doca_error_get_descr(result));
+        return result;
+    }
+
+    result = doca_mmap_create(&local_mem->mmap);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to create mmap: %s", doca_error_get_descr(result));
+        return result;
+    }
+
+    result = doca_mmap_add_dev(local_mem->mmap, ctx->hw_dev);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to add device to mmap: %s", doca_error_get_descr(result));
+        return result;
+    }
+
+    result = doca_mmap_set_permissions(local_mem->mmap, DOCA_ACCESS_FLAG_PCI_READ_WRITE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to set permission to mmap: %s", doca_error_get_descr(result));
+        return result;
+    }
+
+    /* The caller supplies the sockperf buffer as local_mem->mem */
+    result = doca_mmap_set_memrange(local_mem->mmap, local_mem->mem, MAX_BUF_SIZE);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to set memrange to mmap: %s", doca_error_get_descr(result));
+        return result;
+    }
+
+    result = doca_mmap_start(local_mem->mmap);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Unable to start mmap: %s", doca_error_get_descr(result));
+        return result;
+    }
+
+    return DOCA_SUCCESS;
+}
+
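Note: cc_init_local_mem_bufs() is the shared memory bring-up for both FIFO directions: buf inventory, then mmap, attach the device, set PCI read/write permissions, point the memrange at caller-supplied memory, and start. Only local_mem->mem differs between users; a sketch of the two kinds of call sites in this patch:

    /* Consumer side: receive straight into the sockperf receive buffer */
    local_consumer_mem->mem = ctx->recv_buffer;
    cc_init_local_mem_bufs(local_consumer_mem, ctx);
    /* Producer side (client): send straight out of the request message buffer */
    local_producer_mem->mem = m_pMsgRequest->getBuf();
    cc_init_local_mem_bufs(local_producer_mem, data->doca_cc_ctx);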
 /************** SERVER ******************/

 /**
@@ -230,7 +314,7 @@ cc_server_send_task_completion_callback(struct doca_cc_send_task *task, union d
     (void)user_data;
     (void)task_user_data;

-    // DOCA_LOG_INFO("Task sent successfully");
+    DOCA_LOG_INFO("Task sent successfully");
     doca_task_free(doca_cc_send_task_as_task(task));
 }

@@ -289,14 +373,117 @@ cc_server_message_recv_callback(struct doca_cc_event_msg_recv *event, uint8_t *
     /* Save the connection that the ping was sent over for sending the response */
     struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
     ctx_server->ctx.connection = cc_connection;
-
-    //DOCA_LOG_INFO("Message received: '%d, pointer is %p", (int)msg_len, recv_buffer);
+
+    DOCA_LOG_INFO("Message received: %d bytes, pointer is %p", (int)msg_len, recv_buffer);
     memcpy(ctx_server->ctx.recv_buffer, recv_buffer, msg_len);
     ctx_server->ctx.buf_size = (int)msg_len;
     ctx_server->ctx.recv_flag = true;
 }

+/**
+ * Callback for consumer post recv task successful completion
+ *
+ * @task [in]: Recv task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_consumer_recv_task_completion_callback(struct doca_cc_consumer_post_recv_task *task, union doca_data task_user_data,
+                                          union doca_data ctx_user_data)
+{
+    size_t recv_msg_len;
+    void *recv_msg;
+    struct doca_buf *buf;
+    doca_error_t result;
+
+    (void)task_user_data;
+    struct cc_ctx *ctx = (struct cc_ctx *)ctx_user_data.ptr;
+
+    buf = doca_cc_consumer_post_recv_task_get_buf(task);
+
+    result = doca_buf_get_data(buf, &recv_msg);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get data address from DOCA buf with error = %s", doca_error_get_name(result));
+    }
+
+    result = doca_buf_get_data_len(buf, &recv_msg_len);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get data length from DOCA buf with error = %s", doca_error_get_name(result));
+    }
+
+    ctx->buf_size = (int)recv_msg_len;
+    ctx->recv_flag = true;
+
+    DOCA_LOG_INFO("Message received: '%.*s'", (int)recv_msg_len, (char *)recv_msg);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_consumer_post_recv_task_as_task(task));
+}
+
+/**
+ * Callback for consumer post recv task completion with error
+ *
+ * @task [in]: Recv task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_consumer_recv_task_completion_err_callback(struct doca_cc_consumer_post_recv_task *task, union doca_data task_user_data,
+                                              union doca_data ctx_user_data)
+{
+    struct doca_buf *buf;
+    doca_error_t result;
+
+    (void)task_user_data;
+
+    struct cc_ctx *ctx = (struct cc_ctx *)ctx_user_data.ptr;
+    result = doca_task_get_status(doca_cc_consumer_post_recv_task_as_task(task));
+    DOCA_LOG_ERR("Consumer failed to recv message with error = %s", doca_error_get_name(result));
+
+    buf = doca_cc_consumer_post_recv_task_get_buf(task);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_consumer_post_recv_task_as_task(task));
+    (void)doca_ctx_stop(doca_cc_consumer_as_ctx(ctx->ctx_fifo.consumer));
+}
+
+static doca_error_t
+cc_init_consumer(struct cc_ctx *ctx)
+{
+    doca_error_t result;
+    union doca_data user_data;
+    struct doca_ctx *doca_ctx;
+    struct cc_local_mem_bufs *local_consumer_mem = &(ctx->ctx_fifo.consumer_mem);
+
+    result = doca_cc_consumer_create(ctx->connection, local_consumer_mem->mmap, &(ctx->ctx_fifo.consumer));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to create consumer with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    doca_ctx = doca_cc_consumer_as_ctx(ctx->ctx_fifo.consumer);
+    result = doca_pe_connect_ctx(ctx->ctx_fifo.pe, doca_ctx);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding pe context to consumer with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    result = doca_cc_consumer_post_recv_task_set_conf(ctx->ctx_fifo.consumer, cc_consumer_recv_task_completion_callback,
+                                                      cc_consumer_recv_task_completion_err_callback, CC_DATA_PATH_LOG_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting consumer recv task cbs with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    user_data.ptr = (void *)ctx;
+    result = doca_ctx_set_user_data(doca_ctx, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting consumer user data with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    result = doca_ctx_start(doca_ctx);
+    if (result != DOCA_ERROR_IN_PROGRESS) {
+        DOCA_LOG_ERR("Failed to start consumer context with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    return DOCA_SUCCESS;
+}
+
 /**
  * Callback for connection event
  *
@@ -331,6 +518,21 @@ cc_server_connection_event_callback(struct doca_cc_event_connection_status_chan
     }
     ctx_server->ctx.num_connected_clients++;
+    ctx_server->ctx.connection = cc_conn;
+
+    if (ctx_server->ctx.fast_path) {
+        /* Init a cc consumer */
+        result = cc_init_consumer(&ctx_server->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a consumer with error = %s", ctx_server->ctx.fd, doca_error_get_name(result));
+        }
+        /* Init a cc producer */
+        result = cc_init_producer(&ctx_server->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a producer with error = %s", ctx_server->ctx.fd, doca_error_get_name(result));
+        }
+        DOCA_LOG_INFO("Consumer & Producer were created successfully");
+    }

     DOCA_LOG_INFO("[fd=%d] New client connected to server", ctx_server->ctx.fd);
 }

@@ -369,8 +571,6 @@ cc_server_disconnection_event_callback(struct doca_cc_event_connection_status_c
     DOCA_LOG_INFO("[fd=%d] client was disconnected from server", ctx_server->ctx.fd);
 }

-
-
 /**
  * Callback triggered whenever CC server context state changes
  *
@@ -413,6 +613,54 @@ cc_server_state_changed_callback(const union doca_data user_data, struct doca_c
     }
 }

+/**
+ * Callback for new consumer arrival event
+ *
+ * @event [in]: New remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the new remote consumer
+ */
+static void
+cc_server_new_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    union doca_data user_data;
+    struct doca_cc_server *cc_server;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_server = doca_cc_server_get_server_ctx(cc_connection);
+
+    result = doca_ctx_get_user_data(doca_cc_server_as_ctx(cc_server), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+    struct cc_ctx_server *ctx_server = (struct cc_ctx_server *)user_data.ptr;
+    ctx_server->ctx.ctx_fifo.remote_consumer_id = id;
+    DOCA_LOG_INFO("[fd=%d] Got a new remote consumer with ID = [%d]", ctx_server->ctx.fd, id);
+    ctx_server->ctx.ctx_fifo.fifo_state = CC_FIFO_CONNECTED;
+}
+
+/**
+ * Callback for expired consumer arrival event
+ *
+ * @event [in]: Expired remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the expired remote consumer
+ */
+static void
+cc_server_expired_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    /* These arguments are not in use */
+    (void)event;
+    (void)cc_connection;
+    (void)id;
+}
+
 static doca_error_t
 cc_doca_server_set_params(struct cc_ctx_server *ctx_server)
 {
@@ -457,6 +705,21 @@ cc_doca_server_set_params(struct cc_ctx_server *ctx_server)
     if (result != DOCA_SUCCESS) {
         DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result));
     }
+    if (ctx_server->ctx.fast_path) { // Fast path option
+        result = doca_cc_server_event_consumer_register(ctx_server->server, cc_server_new_consumer_callback,
+                                                        cc_server_expired_consumer_callback);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed adding consumer event cb with error = %s", doca_error_get_name(result));
+        }
+        struct cc_local_mem_bufs *local_consumer_mem = &(ctx_server->ctx.ctx_fifo.consumer_mem);
+        local_consumer_mem->mem = ctx_server->ctx.recv_buffer;
+        result = cc_init_local_mem_bufs(local_consumer_mem, &ctx_server->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed to init consumer memory with error = %s", doca_error_get_name(result));
+            return result;
+        }
+        DOCA_LOG_DBG("Init consumer memory succeeded");
+    }

     user_data.ptr = (void *)ctx_server;
     result = doca_ctx_set_user_data(ctx, user_data);
@@ -468,7 +731,7 @@ cc_doca_server_set_params(struct cc_ctx_server *ctx_server)
     if (result != DOCA_SUCCESS) {
         DOCA_LOG_ERR("Failed to start server context with error = %s", doca_error_get_name(result));
     }
-    DOCA_LOG_DBG("[fd=%d] server properties setters succeeded", ctx_server->ctx.fd);
+    DOCA_LOG_INFO("[fd=%d] Server properties set successfully", ctx_server->ctx.fd);
     return result;
 }

@@ -580,6 +843,91 @@ cc_client_message_UL_recv_callback(struct doca_cc_event_msg_recv *event, uint8_
     os_mutex_unlock(&cc_client->ctx.lock);
 }

+
+/**
+ * Callback for producer send task successful completion
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_producer_send_task_completion_callback(struct doca_cc_producer_send_task *task, union doca_data task_user_data,
+                                          union doca_data ctx_user_data)
+{
+    (void)task_user_data;
+    (void)ctx_user_data;
+    struct doca_buf *buf;
+
+    DOCA_LOG_INFO("Producer task sent successfully");
+    buf = doca_cc_producer_send_task_get_buf(task);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_producer_send_task_as_task(task));
+}
+
+/**
+ * Callback for producer send task completion with error
+ *
+ * @task [in]: Send task object
+ * @task_user_data [in]: User data for task
+ * @ctx_user_data [in]: User data for context
+ */
+static void
+cc_producer_send_task_completion_err_callback(struct doca_cc_producer_send_task *task, union doca_data task_user_data,
+                                              union doca_data ctx_user_data)
+{
+    struct doca_buf *buf;
+    doca_error_t result;
+
+    (void)task_user_data;
+
+    struct cc_ctx *ctx = (struct cc_ctx *)ctx_user_data.ptr;
+    result = doca_task_get_status(doca_cc_producer_send_task_as_task(task));
+    DOCA_LOG_ERR("Producer message failed to send with error = %s",
+                 doca_error_get_name(result));
+
+    buf = doca_cc_producer_send_task_get_buf(task);
+    (void)doca_buf_dec_refcount(buf, NULL);
+    doca_task_free(doca_cc_producer_send_task_as_task(task));
+    (void)doca_ctx_stop(doca_cc_producer_as_ctx(ctx->ctx_fifo.producer));
+}
+
+static doca_error_t
+cc_init_producer(struct cc_ctx *ctx)
+{
+    doca_error_t result;
+    union doca_data user_data;
+    struct doca_ctx *doca_ctx;
+
+    result = doca_cc_producer_create(ctx->connection, &(ctx->ctx_fifo.producer));
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to create producer with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    doca_ctx = doca_cc_producer_as_ctx(ctx->ctx_fifo.producer);
+    result = doca_pe_connect_ctx(ctx->ctx_fifo.pe, doca_ctx);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed adding pe context to producer with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    result = doca_cc_producer_send_task_set_conf(ctx->ctx_fifo.producer, cc_producer_send_task_completion_callback,
+                                                 cc_producer_send_task_completion_err_callback, CC_DATA_PATH_LOG_TASK_NUM);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting producer send task cbs with error = %s", doca_error_get_name(result));
+        return result;
+    }
+
+    user_data.ptr = (void *)ctx;
+    result = doca_ctx_set_user_data(doca_ctx, user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed setting producer user data with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    result = doca_ctx_start(doca_ctx);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to start producer context with error = %s", doca_error_get_name(result));
+        return result;
+    }
+    return DOCA_SUCCESS;
+}
+
 /**
  * Init message on client
  *
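Note: cc_init_producer() and cc_init_consumer() mirror each other: create against the Comm Channel connection, bind to the per-thread progress engine via doca_pe_connect_ctx(), register completion callbacks sized by CC_DATA_PATH_LOG_TASK_NUM, stash the cc_ctx as user data, then start the context. One asymmetry worth noting in the code above: the consumer start is asynchronous, so its success value is DOCA_ERROR_IN_PROGRESS, while the producer starts synchronously and returns DOCA_SUCCESS. Every task then ends its life in a completion callback with the same cleanup pair:

    (void)doca_buf_dec_refcount(buf, NULL);                    /* release the doca_buf back to its inventory */
    doca_task_free(doca_cc_producer_send_task_as_task(task));  /* recycle the task object */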
@@ -606,7 +954,18 @@ cc_init_client_send_message(struct cc_ctx_client *cc_client)
         return result;
     }

-    cc_client->state = CC_CONNECTED;
+    if (cc_client->ctx.fast_path) {
+        /* Init a cc producer */
+        result = cc_init_producer(&cc_client->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a producer with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+            return result;
+        }
+        result = cc_init_consumer(&cc_client->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("[fd=%d] Failed to init a consumer with error = %s", cc_client->ctx.fd, doca_error_get_name(result));
+            return result;
+        }
+    }
+    cc_client->ctx.state = CC_CONNECTED;
     DOCA_LOG_INFO("[fd=%d] init_client_send_message succeeded", cc_client->ctx.fd);
     return DOCA_SUCCESS;
 }

@@ -657,6 +1016,53 @@ cc_client_state_changed_callback(const union doca_data user_data, struct doca_c
         break;
     }
 }
+
+/**
+ * Callback for new consumer arrival event
+ *
+ * @event [in]: New remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the new remote consumer
+ */
+static void
+cc_client_new_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    union doca_data user_data;
+    struct doca_cc_client *cc_client;
+    doca_error_t result;
+
+    /* This argument is not in use */
+    (void)event;
+
+    cc_client = doca_cc_client_get_client_ctx(cc_connection);
+
+    result = doca_ctx_get_user_data(doca_cc_client_as_ctx(cc_client), &user_data);
+    if (result != DOCA_SUCCESS) {
+        DOCA_LOG_ERR("Failed to get user data from ctx with error = %s", doca_error_get_name(result));
+        return;
+    }
+
+    struct cc_ctx_client *ctx_client = (struct cc_ctx_client *)(user_data.ptr);
+    ctx_client->ctx.ctx_fifo.remote_consumer_id = id;
+
+    ctx_client->ctx.ctx_fifo.fifo_state = CC_FIFO_CONNECTED;
+    DOCA_LOG_INFO("[fd=%d] Got a new remote consumer with ID = [%d]", ctx_client->ctx.fd, id);
+}
+
+/**
+ * Callback for expired consumer arrival event
+ *
+ * @event [in]: Expired remote consumer event object
+ * @cc_connection [in]: The connection related to the consumer
+ * @id [in]: The ID of the expired remote consumer
+ */
+static void
+cc_client_expired_consumer_callback(struct doca_cc_event_consumer *event, struct doca_cc_connection *cc_connection, uint32_t id)
+{
+    /* These arguments are not in use */
+    (void)event;
+    (void)cc_connection;
+    (void)id;
+}

 static doca_error_t
 cc_doca_client_set_params(struct cc_ctx_client *cc_client)
 {
@@ -697,8 +1103,24 @@ cc_doca_client_set_params(struct cc_ctx_client *cc_client)
     if (result != DOCA_SUCCESS) {
         DOCA_LOG_ERR("Failed to set msg size property with error = %s", doca_error_get_name(result));
     }
-    user_data.ptr = (void *)cc_client;
+    if (cc_client->ctx.fast_path) { // Fast path option
+        result = doca_cc_client_event_consumer_register(cc_client->client, cc_client_new_consumer_callback,
+                                                        cc_client_expired_consumer_callback);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed adding consumer event cb with error = %s", doca_error_get_name(result));
+        }
+        struct cc_local_mem_bufs *local_consumer_mem = &(cc_client->ctx.ctx_fifo.consumer_mem);
+        local_consumer_mem->mem = cc_client->ctx.recv_buffer;
+        result = cc_init_local_mem_bufs(local_consumer_mem, &cc_client->ctx);
+        if (result != DOCA_SUCCESS) {
+            DOCA_LOG_ERR("Failed to init consumer memory with error = %s", doca_error_get_name(result));
+            return result;
+        }
+        DOCA_LOG_DBG("Init consumer memory succeeded");
+    }
+
+    user_data.ptr = (void *)cc_client;
     result = doca_ctx_set_user_data(ctx, user_data);
     if (result != DOCA_SUCCESS) {
         DOCA_LOG_ERR("Failed to set ctx user data with error = %s", doca_error_get_name(result));
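Note: the new-consumer callbacks above are the actual FIFO handshake: each side learns the peer's consumer ID from the event, stores it in remote_consumer_id (the destination for its own producer sends), and flips fifo_state, which releases the busy-wait loops in client.cpp and server.cpp. The per-side state progression, using this patch's enums:

    ctx.state:               CONNECTION_IN_PROGRESS      -> CC_CONNECTED      /* control path */
    ctx.ctx_fifo.fifo_state: FIFO_CONNECTION_IN_PROGRESS -> CC_FIFO_CONNECTED /* data path */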
diff --git a/src/input_handlers.h b/src/input_handlers.h
index b9483a0a..6c7c86f9 100644
--- a/src/input_handlers.h
+++ b/src/input_handlers.h
@@ -78,6 +78,7 @@ class RecvFromInputHandler : public MessageParser {
         .tv_nsec = NANOS_10_X_1000,
     };
     struct cc_ctx *ctx = g_fds_array[fd]->doca_cc_ctx;
+    struct doca_buf *doca_buf; // TODO: array of doca_buf according to queue size
     if (s_user_params.mode == MODE_CLIENT && !g_pApp->m_const_params.b_client_ping_pong &&
         !g_pApp->m_const_params.b_stream) { // latency_under_load
         os_mutex_lock(&ctx->lock);
@@ -85,8 +86,33 @@ class RecvFromInputHandler : public MessageParser {
             // UL only-> wait for signal, once done copy buffer
             os_cond_wait(&ctx->cond, &ctx->lock);
         }
-    } else {
-        // Waiting for meesage receive callback - blocking mode
+    } else { // ping pong or throughput
+        if (s_user_params.doca_fast_path) {
+            struct doca_cc_consumer_post_recv_task *consumer_task;
+            struct doca_task *task_obj;
+            doca_error_t result;
+
+            result = doca_buf_inventory_buf_get_by_addr(ctx->ctx_fifo.consumer_mem.buf_inv, ctx->ctx_fifo.consumer_mem.mmap,
+                                                        buf, MSG_SIZE, &doca_buf);
+            if (result != DOCA_SUCCESS) {
+                DOCA_LOG_ERR("Failed to get doca buf from consumer mmap with error = %s", doca_error_get_name(result));
+                return RET_SOCKET_SHUTDOWN;
+            }
+            result = doca_cc_consumer_post_recv_task_alloc_init(ctx->ctx_fifo.consumer, doca_buf, &consumer_task);
+            if (result != DOCA_SUCCESS) {
+                DOCA_LOG_ERR("Failed to allocate task for consumer with error = %s", doca_error_get_name(result));
+                return RET_SOCKET_SHUTDOWN;
+            }
+            task_obj = doca_cc_consumer_post_recv_task_as_task(consumer_task);
+            result = doca_task_submit(task_obj);
+            if (result != DOCA_SUCCESS) {
+                DOCA_LOG_ERR("Failed submitting consumer post recv task with error = %s", doca_error_get_name(result));
+                return RET_SOCKET_SHUTDOWN;
+            }
+        } // end of doca fast path
+        // Waiting for message receive callback - blocking mode
         if (s_user_params.is_blocked) {
             while (!ctx->recv_flag) {
                 if (doca_pe_progress(s_user_params.pe) == 0) {

diff --git a/src/server.cpp b/src/server.cpp
index 401f19ec..a21e4702 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -96,7 +96,23 @@ int ServerBase::initBeforeLoop() {
         std::string hostport = sockaddr_to_hostport(p_bind_addr);
 #if defined(USING_DOCA_COMM_CHANNEL_API)
-        if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast<const struct sockaddr *>(p_bind_addr), bind_addr_len) < 0) {
+        if (s_user_params.doca_comm_channel && s_user_params.doca_fast_path) {
+            doca_error_t result;
+            struct cc_local_mem_bufs *local_producer_mem = &(g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.producer_mem);
+            // Buf is needed for registering with memrange
+            local_producer_mem->mem = g_fds_array[ifd]->doca_cc_ctx->recv_buffer;
+            result = cc_init_local_mem_bufs(local_producer_mem, g_fds_array[ifd]->doca_cc_ctx);
+            if (result != DOCA_SUCCESS) {
+                DOCA_LOG_ERR("Failed to init producer memory with error = %s", doca_error_get_name(result));
+                return result;
+            }
+            log_dbg("[fd=%d] Init producer memory succeeded", ifd);
+            // Waiting for the FIFO connection before using fast path
+            while (g_fds_array[ifd]->doca_cc_ctx->ctx_fifo.fifo_state != CC_FIFO_CONNECTED) {
+                doca_pe_progress(s_user_params.pe);
+            }
+            log_dbg("[fd=%d] New client connected successfully", ifd);
+        } else if (!s_user_params.doca_comm_channel && bind(ifd, reinterpret_cast<const struct sockaddr *>(p_bind_addr), bind_addr_len) < 0) {
 #else
         log_dbg("[fd=%d] Binding to: %s...", ifd, hostport.c_str());
         if (bind(ifd, reinterpret_cast<const struct sockaddr *>(p_bind_addr), bind_addr_len) < 0) {
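Note: on the receive side the fast path must re-arm the consumer before blocking, which is what the src/input_handlers.h hunk above does: wrap the destination address from the consumer mmap, post a recv task, then poll the progress engine until the completion callback raises recv_flag. A condensed sketch (names as in the patch):

    doca_buf_inventory_buf_get_by_addr(ctx->ctx_fifo.consumer_mem.buf_inv,
                                       ctx->ctx_fifo.consumer_mem.mmap, buf, MSG_SIZE, &doca_buf);
    doca_cc_consumer_post_recv_task_alloc_init(ctx->ctx_fifo.consumer, doca_buf, &consumer_task);
    doca_task_submit(doca_cc_consumer_post_recv_task_as_task(consumer_task));
    while (!ctx->recv_flag)                      /* set by the completion callback */
        if (doca_pe_progress(s_user_params.pe) == 0)
            nanosleep(&ts, &ts);

Also worth noting: the server registers the same recv_buffer with both its consumer mmap (in cc_doca_server_set_params) and its producer mmap (in ServerBase::initBeforeLoop above), so a ping lands in recv_buffer and the pong is produced from those same bytes without an extra copy.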
diff --git a/src/sockperf.cpp b/src/sockperf.cpp
index c568d43e..932b88f5 100644
--- a/src/sockperf.cpp
+++ b/src/sockperf.cpp
@@ -303,6 +303,8 @@ static const AOPT_DESC common_opt_desc[] = {
 #if defined(USING_DOCA_COMM_CHANNEL_API)
     { OPT_DOCA, AOPT_NOARG, aopt_set_literal(0), aopt_set_string("doca-comm-channel"),
       "Use Doca communication channel" },
+    { OPT_DOCA_FAST_PATH, AOPT_NOARG, aopt_set_literal(0),
+      aopt_set_string("doca-fast-path"), "Use Doca fast data path (requires the doca-comm-channel option)" },
     { OPT_PCI, AOPT_ARG, aopt_set_literal(0),
       aopt_set_string("pci-address"), "Comm Channel DOCA device PCI address"},
     { OPT_PCI_REP, AOPT_ARG, aopt_set_literal(0), aopt_set_string(
@@ -2265,6 +2267,14 @@ static int parse_common_opt(const AOPT_OBJECT *common_obj) {
             }
             log_dbg("doca_pe_create succeeded");
         }
+        if (!rc && aopt_check(common_obj, OPT_DOCA_FAST_PATH)) {
+            if (!aopt_check(common_obj, OPT_DOCA)) {
+                log_msg("--doca-comm-channel is required for the fast path option");
+                rc = SOCKPERF_ERR_BAD_ARGUMENT;
+            } else {
+                s_user_params.doca_fast_path = true;
+            }
+        }
 #endif /* USING_DOCA_COMM_CHANNEL_API */
     }

@@ -3624,6 +3634,13 @@ int bringup_for_doca(std::unique_ptr<fds_data> &tmp)
     os_mutex_init(&cc_ctx.lock);
     os_cond_init(&cc_ctx.cond);

+    if (s_user_params.doca_fast_path) {
+        cc_ctx.fast_path = true;
+        cc_ctx.ctx_fifo.fifo_state = FIFO_CONNECTION_IN_PROGRESS;
+        cc_ctx.ctx_fifo.pe = s_user_params.pe;
+    } else {
+        cc_ctx.fast_path = false;
+    }
     struct priv_doca_pci_bdf dev_pcie = {0};
     doca_error_t doca_error = DOCA_SUCCESS;
     struct doca_ctx *ctx;
@@ -3684,7 +3701,7 @@ int bringup_for_doca(std::unique_ptr<fds_data> &tmp)
     } else { // MODE_CLIENT
         ctx_client = (struct cc_ctx_client*)MALLOC(sizeof(struct cc_ctx_client));
-        ctx_client->state = CONNECTION_IN_PROGRESS;
+        cc_ctx.state = CONNECTION_IN_PROGRESS;
         cc_ctx.recv_flag = false;
         if (!s_user_params.b_client_ping_pong && !s_user_params.b_stream) { // latency_under_load
             ctx_client->underload_mode = true;
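Note: with this patch the fast data path is opt-in from the command line, and parse_common_opt() above rejects --doca-fast-path unless --doca-comm-channel is also given. A hypothetical invocation pair (PCI addresses are placeholders; the server additionally takes the device-representor option registered as OPT_PCI_REP, whose option string is truncated in this diff and therefore elided below):

    # server side, e.g. on the DPU
    sockperf sr --doca-comm-channel --doca-fast-path --pci-address 03:00.0 ...
    # client side, ping-pong mode
    sockperf pp --doca-comm-channel --doca-fast-path --pci-address 3b:00.0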