diff --git a/lib/src/zap/ugni/zap_ugni.c b/lib/src/zap/ugni/zap_ugni.c
index 25695f9aa..0126626ee 100644
--- a/lib/src/zap/ugni/zap_ugni.c
+++ b/lib/src/zap/ugni/zap_ugni.c
@@ -81,17 +81,6 @@
 	(elm)->link.le_prev = 0; \
 } while(0)
 
-static int __set_sockbuf_sz(int sockfd)
-{
-	int rc;
-	size_t optval = UGNI_SOCKBUF_SZ;
-	rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval));
-	if (rc)
-		return rc;
-	rc = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval));
-	return rc;
-}
-
 static char *format_4tuple(struct zap_ep *ep, char *str, size_t len)
 {
 	struct sockaddr la = {0};
@@ -227,8 +216,10 @@ static int zap_ugni_stalled_timeout;
 static LIST_HEAD(, z_ugni_ep) deferred_list = LIST_HEAD_INITIALIZER(0);
 static pthread_mutex_t deferred_list_mutex = PTHREAD_MUTEX_INITIALIZER;
 static uint32_t ugni_io_count = 0;
-static uint32_t ugni_post_count = 0;
 #endif /* DEBUG */
+static uint32_t ugni_post_count;
+static uint32_t ugni_post_max;
+static uint32_t ugni_post_id;
 
 static pthread_mutex_t ugni_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t inst_id_cond = PTHREAD_COND_INITIALIZER;
@@ -277,6 +268,43 @@ const char *zap_ugni_type_str(zap_ugni_type_t type)
 	return __zap_ugni_type_str[type];
 }
 
+/*
+ * Use KEEP-ALIVE packets to shut down a connection if the remote peer fails
+ * to respond for 10 minutes
+ */
+#define ZAP_SOCK_KEEPCNT	10	/* Give up after 10 failed probes */
+#define ZAP_SOCK_KEEPIDLE	30	/* Start probing after 30s of inactivity */
+#define ZAP_SOCK_KEEPINTVL	60	/* Probe once a minute */
+
+static int __set_keep_alive(int sock)
+{
+	int rc;
+	int optval;
+
+	optval = ZAP_SOCK_KEEPCNT;
+	rc = setsockopt(sock, SOL_TCP, TCP_KEEPCNT, &optval, sizeof(int));
+
+	optval = ZAP_SOCK_KEEPIDLE;
+	rc = setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(int));
+
+	optval = ZAP_SOCK_KEEPINTVL;
+	rc = setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(int));
+
+	optval = 1;
+	rc = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(int));
+	return rc;
+}
+
+static int __set_sockbuf_sz(int sockfd)
+{
+	int rc;
+	size_t optval = UGNI_SOCKBUF_SZ;
+	rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval));
+	if (rc)
+		return rc;
+	rc = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval));
+	return rc;
+}
 static int __sock_nonblock(int fd)
 {
 	int rc;
@@ -460,6 +488,20 @@ void z_ugni_cleanup(void)
 	free(zap_ugni_ep_id);
 }
 
+static void __ep_release(struct z_ugni_ep *uep)
+{
+	gni_return_t grc;
+	if (uep->gni_ep) {
+		grc = GNI_EpUnbind(uep->gni_ep);
+		if (grc)
+			LOG_(uep, "GNI_EpUnbind() error: %s\n", gni_ret_str(grc));
+		grc = GNI_EpDestroy(uep->gni_ep);
+		if (grc != GNI_RC_SUCCESS)
+			LOG_(uep, "GNI_EpDestroy() error: %s\n", gni_ret_str(grc));
+		uep->gni_ep = NULL;
+	}
+}
+
 static zap_err_t z_ugni_close(zap_ep_t ep)
 {
 	struct z_ugni_ep *uep = (struct z_ugni_ep *)ep;
@@ -467,11 +509,11 @@ static zap_err_t z_ugni_close(zap_ep_t ep)
 
 	DLOG_(uep, "Closing xprt: %p, state: %s\n", uep, __zap_ep_state_str(uep->ep.state));
 	pthread_mutex_lock(&uep->ep.lock);
+	uep->ep.state = ZAP_EP_CLOSE;
 	switch (uep->ep.state) {
 	case ZAP_EP_LISTENING:
 	case ZAP_EP_CONNECTED:
 	case ZAP_EP_PEER_CLOSE:
-		uep->ep.state = ZAP_EP_CLOSE;
 		shutdown(uep->sock, SHUT_RDWR);
 		break;
 	case ZAP_EP_ERROR:
@@ -537,7 +579,14 @@ static zap_err_t z_ugni_connect(zap_ep_t ep,
 
 	if (__set_sockbuf_sz(uep->sock)) {
 		zerr = ZAP_ERR_TRANSPORT;
-		LOG_(uep, "Error %d: fail to set the sockbuf sz in %s.\n",
+		LOG_(uep, "Error %d: setting the sockbuf sz in %s.\n",
+		     errno, __func__);
+		goto out;
+	}
+
+	if (__set_keep_alive(uep->sock)) {
+		zerr = ZAP_ERR_TRANSPORT;
+		LOG_(uep, "Error %d: enabling keep-alive in %s.\n",
 		     errno, __func__);
 		goto out;
 	}
@@ -715,7 +764,6 @@ static void process_uep_msg_accepted(struct z_ugni_ep *uep)
 			"%s: Unexpected state '%s'. "
 			"Expected state 'ZAP_EP_CONNECTING'\n",
 			__func__, __zap_ep_state_str(uep->ep.state));
-	struct zap_event ev;
 	struct zap_ugni_msg_accepted *msg;
 	zap_err_t zerr;
 
@@ -757,9 +805,11 @@ static void process_uep_msg_accepted(struct z_ugni_ep *uep)
 #endif /* ZAP_UGNI_DEBUG */
 
 	uep->ugni_ep_bound = 1;
-	ev.type = ZAP_EVENT_CONNECTED;
-	ev.data_len = msg->data_len;
-	ev.data = (msg->data_len)?((void*)msg->data):(NULL);
+	struct zap_event ev = {
+		.type = ZAP_EVENT_CONNECTED,
+		.data_len = msg->data_len,
+		.data = (msg->data_len ? (void*)msg->data : NULL)
+	};
 	if (!zap_ep_change_state(&uep->ep, ZAP_EP_CONNECTING, ZAP_EP_CONNECTED)) {
 		uep->ep.cb((void*)uep, &ev);
 	} else {
@@ -777,7 +827,6 @@ static void process_uep_msg_accepted(struct z_ugni_ep *uep)
 static void process_uep_msg_connect(struct z_ugni_ep *uep)
 {
 	struct zap_ugni_msg_connect *msg;
-	struct zap_event ev;
 
 	msg = (void*)uep->rbuff->data;
 
@@ -817,9 +866,11 @@ static void process_uep_msg_connect(struct z_ugni_ep *uep)
 	uep->ugni_ep_bound = 1;
 	pthread_mutex_unlock(&uep->ep.lock);
 
-	ev.type = ZAP_EVENT_CONNECT_REQUEST;
-	ev.data_len = msg->data_len;
-	ev.data = (msg->data_len)?((void*)msg->data):(NULL);
+	struct zap_event ev = {
+		.type = ZAP_EVENT_CONNECT_REQUEST,
+		.data_len = msg->data_len,
+		.data = (msg->data_len)?((void*)msg->data):(NULL)
+	};
 	uep->ep.cb(&uep->ep, &ev);
 
 	return;
@@ -832,15 +883,15 @@ static void process_uep_msg_connect(struct z_ugni_ep *uep)
 static void process_uep_msg_rejected(struct z_ugni_ep *uep)
 {
 	struct zap_ugni_msg_regular *msg;
-	struct zap_event ev;
 	int rc;
 
 	msg = (void*)uep->rbuff->data;
-	ev.type = ZAP_EVENT_REJECTED;
-	ev.status = ZAP_ERR_OK;
-	ev.data_len = ntohl(msg->data_len);
-	ev.data = (ev.data_len)?((void*)msg->data):(NULL);
+	struct zap_event ev = {
+		.type = ZAP_EVENT_REJECTED,
+		.data_len = ntohl(msg->data_len),
+		.data = (ev.data_len ? (void*)msg->data : NULL)
+	};
 	rc = zap_ep_change_state(&uep->ep, ZAP_EP_CONNECTING, ZAP_EP_ERROR);
 	if (rc != ZAP_ERR_OK) {
 		return;
@@ -860,8 +911,7 @@ static void process_uep_msg_ack_accepted(struct z_ugni_ep *uep)
 		return;
 	}
 	struct zap_event ev = {
-		.type = ZAP_EVENT_CONNECTED,
-		.status = ZAP_ERR_OK,
+		.type = ZAP_EVENT_CONNECTED
 	};
 	zap_get_ep(&uep->ep); /* Release when receive disconnect/error event */
 	uep->ep.cb(&uep->ep, &ev);
@@ -1079,11 +1129,13 @@ int zap_ugni_err_handler(gni_cq_handle_t cq, gni_cq_entry_t cqe,
 
 static gni_return_t process_cq(gni_cq_handle_t cq, gni_cq_entry_t cqe_)
 {
+	struct zap_event zev;
 	gni_cq_entry_t cqe = cqe_;
 	gni_return_t grc;
 	gni_post_descriptor_t *post;
 	int count = 0;
 	do {
+		memset(&zev, 0, sizeof(zev));
 		count++;
 		if (GNI_CQ_GET_TYPE(cqe) != GNI_CQ_EVENT_TYPE_POST) {
 			zap_ugni_log("Unexepcted cqe type %d cqe"
@@ -1104,12 +1156,13 @@ static gni_return_t process_cq(gni_cq_handle_t cq, gni_cq_entry_t cqe_)
 			DLOG("process_cq: post is NULL\n");
 			goto skip;
 		}
+		__sync_sub_and_fetch(&ugni_post_count, 1);
 #ifdef DEBUG
 		assert(ugni_post_count >= 0);
-		__sync_sub_and_fetch(&ugni_post_count, 1);
 #endif /* DEBUG */
 		struct zap_ugni_post_desc *desc = (void*) post;
 		if (grc) {
+			zap_ugni_log("GNI_GetCompleted returned %s\n", gni_ret_str(grc));
 			if (0 == zap_ugni_err_handler(cq, cqe, desc))
 				__shutdown_on_error(desc->uep);
 			else
@@ -1124,7 +1177,7 @@ static gni_return_t process_cq(gni_cq_handle_t cq, gni_cq_entry_t cqe_)
 			 * has been flushed. The corresponding endpoint
 			 * might have been freed already.
 			 */
-			LOG("%s: Received a complete event of a stalled post "
+			LOG("%s: Received a CQ event for a stalled post "
 					"desc.\n", desc->ep_name);
 			ZUGNI_LIST_REMOVE(desc, stalled_link);
 			free(desc);
@@ -1148,14 +1201,12 @@ static gni_return_t process_cq(gni_cq_handle_t cq, gni_cq_entry_t cqe_)
 		if (uep->deferred_link.le_prev)
 			LOG_(uep, "uep %p: Doh!! I'm on the deferred list.\n", uep);
 #endif /* DEBUG */
-		struct zap_event zev = {0};
 		switch (desc->post.type) {
 		case GNI_POST_RDMA_GET:
-			DLOG_(uep, "RDMA_GET: Read complete %p with %s\n",
-					desc, gni_ret_str(grc));
+			DLOG_(uep, "RDMA_GET: Read complete %p with %s\n", desc, gni_ret_str(grc));
 			if (grc) {
 				zev.status = ZAP_ERR_RESOURCE;
-				DLOG_(uep, "RDMA_GET: completing "
+				LOG_(uep, "RDMA_GET: completing "
 					"with error %s.\n",
 					gni_ret_str(grc));
 			}
@@ -1163,7 +1214,7 @@ static gni_return_t process_cq(gni_cq_handle_t cq, gni_cq_entry_t cqe_)
 			zev.context = desc->context;
 			break;
 		case GNI_POST_RDMA_PUT:
-			DLOG_(uep, "RDMA_GET: Read complete %p with %s\n",
+			DLOG_(uep, "RDMA_PUT: Write complete %p with %s\n",
 					desc, gni_ret_str(grc));
 			if (grc) {
 				zev.status = ZAP_ERR_RESOURCE;
@@ -1188,10 +1239,11 @@ static gni_return_t process_cq(gni_cq_handle_t cq, gni_cq_entry_t cqe_)
 	skip:
 		pthread_mutex_lock(&ugni_lock);
 		grc = GNI_CqGetEvent(cq, &cqe);
+		pthread_mutex_unlock(&ugni_lock);
 		if (grc == GNI_RC_ERROR_RESOURCE) {
 			zap_ugni_log("CQ overrun!\n");
+			break;
 		}
-		pthread_mutex_unlock(&ugni_lock);
 	} while (grc != GNI_RC_NOT_DONE);
 	if (count > 1)
 		DLOG("process_cq: count %d\n", count);
@@ -1270,12 +1322,12 @@ static void *cq_thread_proc(void *arg)
 
 static void *error_thread_proc(void *args)
 {
-	gni_err_handle_t err_hndl;
-	gni_error_event_t ev;
-	gni_return_t status;
-	uint32_t num;
+	gni_err_handle_t err_hndl;
+	gni_error_event_t ev;
+	gni_return_t status;
+	uint32_t num;
 
-	gni_error_mask_t err =
+	gni_error_mask_t err =
 		GNI_ERRMASK_CORRECTABLE_MEMORY |
 		GNI_ERRMASK_CRITICAL |
 		GNI_ERRMASK_TRANSACTION |
@@ -1381,21 +1433,11 @@ __ugni_send(struct z_ugni_ep *uep, enum zap_ugni_msg_type type,
 
 static void __deliver_disconnect_ev(struct z_ugni_ep *uep)
 {
-	/* Make certain we release the NTT resources */
-	while (uep->ugni_ep_bound) {
-		gni_return_t grc = GNI_EpUnbind(uep->gni_ep);
-		if (grc != GNI_RC_NOT_DONE)
-			break;
-		zap_ugni_log("%s: GNI_EpUnbind returns GNI_RC_NOT_DONE...retrying\n");
-		sleep(1);
-	}
-
 	/* Deliver the disconnected event */
 	pthread_mutex_lock(&z_ugni_list_mutex);
#ifdef DEBUG
 	zap_ugni_ep_id[uep->ep_id] = -1;
#endif /* DEBUG */
-
 	pthread_mutex_lock(&uep->ep.lock);
#ifdef DEBUG
 	/* It is in the queue already. */
@@ -1528,6 +1570,9 @@ static void ugni_sock_event(ovis_event_t ev)
 
 	/* Reaching here means bev is one of the EOF, ERROR or TIMEOUT */
 	pthread_mutex_lock(&uep->ep.lock);
+	int defer = 0;
+	if (!LIST_EMPTY(&uep->post_desc_list))
+		defer = 1;
 	rc = ovis_scheduler_event_del(io_sched, ev);
 	assert(rc == 0); /* ev must be in the scheduler, otherwise it is a bug */
 	switch (uep->ep.state) {
@@ -1563,24 +1608,12 @@ static void ugni_sock_event(ovis_event_t ev)
 	}
 	DLOG_(uep, "%s: ep %p: state %s\n", __func__, uep,
 			__zap_ep_state_str[uep->ep.state]);
-	int defer = 0;
-	if (uep->ugni_ep_bound) {
-		gni_return_t grc = GNI_EpUnbind(uep->gni_ep);
-		if (grc)
-			DLOG_(uep, "GNI_EpUnbind() error: %s\n", gni_ret_str(grc));
-		if (grc == GNI_RC_NOT_DONE)
-			defer = 1;
-		else
-			uep->ugni_ep_bound = 0;
-	}
-	if (!LIST_EMPTY(&uep->post_desc_list))
-		defer = 1;
 	pthread_mutex_unlock(&uep->ep.lock);
 	if (defer) {
 		/*
-		 * Defer to give time to uGNI to flush the outstanding
-		 * completion events
-		 */
+		 * Allow time for uGNI to flush outstanding
+		 * completion events
+		 */
 		__ugni_defer_disconnect_event(uep);
 	} else {
 		__deliver_disconnect_ev(uep);
@@ -1798,7 +1831,7 @@ static zap_err_t z_ugni_send(zap_ep_t ep, char *buf, size_t len)
 	}
 
 	pthread_mutex_lock(&uep->ep.lock);
-	if (ep->state != ZAP_EP_CONNECTED) {
+	if (!uep->gni_ep || ep->state != ZAP_EP_CONNECTED) {
 		pthread_mutex_unlock(&uep->ep.lock);
 		return ZAP_ERR_NOT_CONNECTED;
 	}
@@ -1818,13 +1851,12 @@ static void stalled_timeout_cb(ovis_event_t ev)
 	gettimeofday(&now, NULL);
 	desc = LIST_FIRST(&stalled_desc_list);
 	while (desc) {
-#if 0
 		zap_ugni_log("%s: %s: Freeing stalled post desc:\n",
 				__func__, desc->ep_name);
-#endif
 		if (zap_ugni_stalled_timeout <= now.tv_sec - desc->stalled_time.tv_sec) {
 			ZUGNI_LIST_REMOVE(desc, stalled_link);
 			free(desc);
+			__sync_sub_and_fetch(&ugni_post_count, 1);
 		} else {
 			break;
 		}
@@ -2190,9 +2222,19 @@ static int z_ugni_init()
 			goto out;
 		}
 	}
-
+	uint32_t fma_mode;
+	int dedicated;
+	char *dedicated_s = getenv("ZAP_UGNI_FMA_DEDICATED");
+	if (dedicated_s)
+		dedicated = atoi(dedicated_s);
+	else
+		dedicated = 0;
+	if (dedicated)
+		fma_mode = GNI_CDM_MODE_FMA_DEDICATED;
+	else
+		fma_mode = GNI_CDM_MODE_FMA_SHARED;
 	grc = GNI_CdmCreate(_dom.inst_id, _dom.ptag, _dom.cookie,
-			GNI_CDM_MODE_FMA_SHARED, &_dom.cdm);
+			fma_mode, &_dom.cdm);
 	if (grc) {
 		LOG("ERROR: GNI_CdmCreate() failed: %s\n",
 				gni_ret_str(grc));
@@ -2384,7 +2426,6 @@ zap_ep_t z_ugni_new(zap_t z, zap_cb_fn_t cb)
 static void z_ugni_destroy(zap_ep_t ep)
 {
 	struct z_ugni_ep *uep = (void*)ep;
-	gni_return_t grc;
 	struct zap_ugni_send_wr *wr;
 	DLOG_(uep, "destroying endpoint %p\n", uep);
 	pthread_mutex_lock(&z_ugni_list_mutex);
@@ -2412,12 +2453,7 @@ static void z_ugni_destroy(zap_ep_t ep)
 		close(uep->sock);
 		uep->sock = -1;
 	}
-	if (uep->gni_ep) {
-		DLOG_(uep, "Destroying gni_ep: %p\n", uep->gni_ep);
-		grc = GNI_EpDestroy(uep->gni_ep);
-		if (grc)
-			LOG_(uep, "GNI_EpDestroy() error: %s\n", gni_ret_str(grc));
-	}
+	__ep_release(uep);
 	free(ep);
 }
 
@@ -2591,6 +2627,8 @@ static zap_err_t z_ugni_read(zap_ep_t ep, zap_map_t src_map, char *src,
 			     zap_map_t dst_map, char *dst, size_t sz,
 			     void *context)
 {
+	zap_err_t zerr;
+
 	if (((uint64_t)src) & 3)
 		return ZAP_ERR_PARAMETER;
 	if (((uint64_t)dst) & 3)
@@ -2630,16 +2668,16 @@ static zap_err_t z_ugni_read(zap_ep_t ep, zap_map_t src_map, char *src,
 	}
 
 	pthread_mutex_lock(&ep->lock);
-	if (ep->state != ZAP_EP_CONNECTED) {
-		pthread_mutex_unlock(&ep->lock);
-		return ZAP_ERR_ENDPOINT;
+	if (!uep->gni_ep || ep->state != ZAP_EP_CONNECTED) {
+		zerr = ZAP_ERR_NOT_CONNECTED;
+		goto out;
 	}
 
 	gni_return_t grc;
 	struct zap_ugni_post_desc *desc = __alloc_post_desc(uep);
 	if (!desc) {
-		pthread_mutex_unlock(&ep->lock);
-		return ZAP_ERR_RESOURCE;
+		zerr = ZAP_ERR_RESOURCE;
+		goto out;
 	}
 
 	desc->post.type = GNI_POST_RDMA_GET;
@@ -2650,45 +2688,47 @@ static zap_err_t z_ugni_read(zap_ep_t ep, zap_map_t src_map, char *src,
 	desc->post.remote_addr = (uint64_t)src;
 	desc->post.remote_mem_hndl = smap->gni_mh;
 	desc->post.length = sz;
-	/*
-	 * We can track the posted rdma using
-	 * the returned gni_post_descriptor_t address.
-	 *
-	 * We abuse the post_id field to store the endpoint context
-	 * so that we can check at the completion time
-	 * whether the endpoint still exists or not.
-	 */
-#ifdef DEBUG
-	desc->post.post_id = uep->ep_id;
-#endif /* DEBUG */
-	desc->context = context;
-	pthread_mutex_unlock(&ep->lock);
+	desc->post.post_id = __sync_fetch_and_add(&ugni_post_id, 1);
 
-	pthread_mutex_lock(&ugni_lock);
+	desc->context = context;
#ifdef DEBUG
 	__sync_fetch_and_add(&ugni_io_count, 1);
-	__sync_fetch_and_add(&ugni_post_count, 1);
#endif /* DEBUG */
+	uint32_t count = __sync_fetch_and_add(&ugni_post_count, 1);
+	if (count > ugni_post_max)
+		ugni_post_max = count;
+	if (count >= _dom.cq_depth)
+		LOG_(uep,
+		     "%s: posted desc count %d is greater than CQ depth %d\n",
+		     __func__, count, _dom.cq_depth);
+	pthread_mutex_lock(&ugni_lock);
 	grc = GNI_PostRdma(uep->gni_ep, &desc->post);
+	pthread_mutex_unlock(&ugni_lock);
 	if (grc != GNI_RC_SUCCESS) {
 		LOG_(uep, "%s: GNI_PostRdma() failed, grc: %s\n",
 				__func__, gni_ret_str(grc));
+		__shutdown_on_error(uep);
#ifdef DEBUG
 		__sync_sub_and_fetch(&ugni_io_count, 1);
-		__sync_sub_and_fetch(&ugni_post_count, 1);
#endif /* DEBUG */
+		__sync_sub_and_fetch(&ugni_post_count, 1);
 		__free_post_desc(desc);
-		pthread_mutex_unlock(&ugni_lock);
-		return ZAP_ERR_RESOURCE;
+		zerr = ZAP_ERR_RESOURCE;
+		goto out;
 	}
-	pthread_mutex_unlock(&ugni_lock);
-	return ZAP_ERR_OK;
+	zerr = ZAP_ERR_OK;
+ out:
+	pthread_mutex_unlock(&ep->lock);
+	return zerr;
 }
 
 static zap_err_t z_ugni_write(zap_ep_t ep, zap_map_t src_map, char *src,
 			      zap_map_t dst_map, char *dst, size_t sz,
 			      void *context)
 {
+	gni_return_t grc;
+	zap_err_t zerr;
+
 	if (((uint64_t)src) & 3)
 		return ZAP_ERR_PARAMETER;
 	if (((uint64_t)dst) & 3)
@@ -2704,7 +2744,6 @@ static zap_err_t z_ugni_write(zap_ep_t ep, zap_map_t src_map, char *src,
 	struct z_ugni_ep *uep = (void*)ep;
 	struct zap_ugni_map *smap = (void*)src_map;
 	struct zap_ugni_map *dmap = (void*)dst_map;
-	gni_return_t grc;
 
 	/* node state validation */
 	if (_node_state.check_state) {
@@ -2729,15 +2768,15 @@ static zap_err_t z_ugni_write(zap_ep_t ep, zap_map_t src_map, char *src,
 	}
 
 	pthread_mutex_lock(&ep->lock);
-	if (ep->state != ZAP_EP_CONNECTED) {
-		pthread_mutex_unlock(&ep->lock);
-		return ZAP_ERR_ENDPOINT;
+	if (!uep->gni_ep || ep->state != ZAP_EP_CONNECTED) {
+		zerr = ZAP_ERR_NOT_CONNECTED;
+		goto out;
 	}
 
 	struct zap_ugni_post_desc *desc = __alloc_post_desc(uep);
 	if (!desc) {
-		pthread_mutex_unlock(&ep->lock);
-		return ZAP_ERR_ENDPOINT;
+		zerr = ZAP_ERR_RESOURCE;
+		goto out;
 	}
 
 	desc->post.type = GNI_POST_RDMA_PUT;
@@ -2748,29 +2787,31 @@ static zap_err_t z_ugni_write(zap_ep_t ep, zap_map_t src_map, char *src,
 	desc->post.remote_addr = (uint64_t)dst;
 	desc->post.remote_mem_hndl = dmap->gni_mh;
 	desc->post.length = sz;
-	desc->post.post_id = (uint64_t)(unsigned long)desc;
+	desc->post.post_id = __sync_fetch_and_add(&ugni_post_id, 1);
 	desc->context = context;
-	pthread_mutex_unlock(&ep->lock);
 
-	pthread_mutex_lock(&ugni_lock);
#ifdef DEBUG
 	__sync_fetch_and_add(&ugni_io_count, 1);
-	__sync_fetch_and_add(&ugni_post_count, 1);
#endif /* DEBUG */
+	pthread_mutex_lock(&ugni_lock);
 	grc = GNI_PostRdma(uep->gni_ep, &desc->post);
+	pthread_mutex_unlock(&ugni_lock);
+	__sync_fetch_and_add(&ugni_post_count, 1);
 	if (grc != GNI_RC_SUCCESS) {
 		LOG_(uep, "%s: GNI_PostRdma() failed, grc: %s\n",
 				__func__, gni_ret_str(grc));
#ifdef DEBUG
 		__sync_sub_and_fetch(&ugni_io_count, 1);
-		__sync_sub_and_fetch(&ugni_post_count, 1);
#endif /* DEBUG */
+		__sync_sub_and_fetch(&ugni_post_count, 1);
 		__free_post_desc(desc);
-		pthread_mutex_unlock(&ugni_lock);
-		return ZAP_ERR_RESOURCE;
+		zerr = ZAP_ERR_RESOURCE;
+		goto out;
 	}
-	pthread_mutex_unlock(&ugni_lock);
-	return ZAP_ERR_OK;
+	zerr = ZAP_ERR_OK;
+out:
+	pthread_mutex_unlock(&ep->lock);
+	return zerr;
 }
 
 zap_err_t zap_transport_get(zap_t *pz, zap_log_fn_t log_fn,
@@ -2778,10 +2819,8 @@ zap_err_t zap_transport_get(zap_t *pz, zap_log_fn_t log_fn,
 {
 	zap_t z;
 	size_t sendrecv_sz, rendezvous_sz;
-#if 0
 	if (log_fn)
 		zap_ugni_log = log_fn;
-#endif
 
 	if (!init_complete && init_once())
 		goto err;
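For reference, the keep-alive values introduced by this patch give the kernel roughly 30 s of idle time followed by up to 10 probes at 60 s intervals, i.e. about 630 s before a silent peer is dropped, which matches the "10 minutes" noted in the comment. The standalone sketch below shows the same setsockopt() sequence outside of zap_ugni; the helper name enable_keepalive() and the hard-coded values are illustrative only (they are not part of the patch), and error handling is deliberately minimal.

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* Illustrative helper (not in the patch): enable TCP keep-alive on a
 * connected Linux socket so a silent peer is detected after roughly
 * KEEPIDLE + KEEPCNT * KEEPINTVL seconds (30 + 10 * 60 = 630 s here). */
int enable_keepalive(int fd)
{
	int cnt = 10;	/* probes before giving up */
	int idle = 30;	/* seconds of inactivity before probing starts */
	int intvl = 60;	/* seconds between probes */
	int on = 1;

	if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt)))
		return -1;
	if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)))
		return -1;
	if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl, sizeof(intvl)))
		return -1;
	return setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on));
}

The patch also adds an environment knob: setting ZAP_UGNI_FMA_DEDICATED to a non-zero integer makes z_ugni_init() pass GNI_CDM_MODE_FMA_DEDICATED to GNI_CdmCreate(), while leaving it unset keeps the previous GNI_CDM_MODE_FMA_SHARED behavior.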