From 39fe29d93dac1e2c6918f3c98a981d5d27feaaee Mon Sep 17 00:00:00 2001 From: Devendar Bureddy Date: Mon, 16 Sep 2024 19:31:35 +0300 Subject: [PATCH] Update with nccl-2.23.4-1 --- include/core.h | 62 +++++++++------ include/p2p_plugin.h | 8 ++ include/timer.h | 14 ++-- src/ib_plugin.c | 183 ++++++++++++++++++++++++++++--------------- src/p2p_plugin.c | 168 +++++++++++++++++++++++++++++---------- src/param.c | 30 ++++--- src/socket.c | 16 ++-- 7 files changed, 332 insertions(+), 149 deletions(-) diff --git a/include/core.h b/include/core.h index 76b5da0c..e5e432c4 100644 --- a/include/core.h +++ b/include/core.h @@ -50,21 +50,17 @@ #include // Check system calls -#define SYSCHECK(call, name) do { \ +#define SYSCHECK(statement, name) do { \ int retval; \ - SYSCHECKVAL(call, name, retval); \ -} while (false) - -#define SYSCHECKVAL(call, name, retval) do { \ - SYSCHECKSYNC(call, name, retval); \ + SYSCHECKSYNC((statement), name, retval); \ if (retval == -1) { \ - WARN("Call to " name " failed : %s", strerror(errno)); \ + WARN("Call to " name " failed: %s", strerror(errno)); \ return ncclSystemError; \ } \ } while (false) -#define SYSCHECKSYNC(call, name, retval) do { \ - retval = call; \ +#define SYSCHECKSYNC(statement, name, retval) do { \ + retval = (statement); \ if (retval == -1 && (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)) { \ INFO(NCCL_ALL,"Call to " name " returned %s, retrying", strerror(errno)); \ } else { \ @@ -72,14 +68,34 @@ } \ } while(true) -#define SYSCHECKGOTO(statement, RES, label) do { \ - if ((statement) == -1) { \ - /* Print the back trace*/ \ - RES = ncclSystemError; \ - INFO(NCCL_ALL,"%s:%d -> %d (%s)", __FILE__, __LINE__, RES, strerror(errno)); \ +#define SYSCHECKGOTO(statement, name, RES, label) do { \ + int retval; \ + SYSCHECKSYNC((statement), name, retval); \ + if (retval == -1) { \ + WARN("Call to " name " failed: %s", strerror(errno)); \ + RES = ncclSystemError; \ goto label; \ } \ -} while (0); +} while (0) + +// Pthread calls don't set errno and never return EINTR. +#define PTHREADCHECK(statement, name) do { \ + int retval = (statement); \ + if (retval != 0) { \ + WARN("Call to " name " failed: %s", strerror(retval)); \ + return ncclSystemError; \ + } \ +} while (0) + +#define PTHREADCHECKGOTO(statement, name, RES, label) do { \ + int retval = (statement); \ + if (retval != 0) { \ + WARN("Call to " name " failed: %s", strerror(retval)); \ + RES = ncclSystemError; \ + goto label; \ + } \ +} while (0) + #define NEQCHECK(statement, value) do { \ if ((statement) != value) { \ @@ -87,7 +103,7 @@ INFO(NCCL_ALL,"%s:%d -> %d (%s)", __FILE__, __LINE__, ncclSystemError, strerror(errno)); \ return ncclSystemError; \ } \ -} while (0); +} while (0) #define NEQCHECKGOTO(statement, value, RES, label) do { \ if ((statement) != value) { \ @@ -96,7 +112,7 @@ INFO(NCCL_ALL,"%s:%d -> %d (%s)", __FILE__, __LINE__, RES, strerror(errno)); \ goto label; \ } \ -} while (0); +} while (0) #define EQCHECK(statement, value) do { \ if ((statement) == value) { \ @@ -104,7 +120,7 @@ INFO(NCCL_ALL,"%s:%d -> %d (%s)", __FILE__, __LINE__, ncclSystemError, strerror(errno)); \ return ncclSystemError; \ } \ -} while (0); +} while (0) #define EQCHECKGOTO(statement, value, RES, label) do { \ if ((statement) == value) { \ @@ -113,7 +129,7 @@ INFO(NCCL_ALL,"%s:%d -> %d (%s)", __FILE__, __LINE__, RES, strerror(errno)); \ goto label; \ } \ -} while (0); +} while (0) // Propagate errors up #define NCCLCHECK(call) do { \ @@ -122,7 +138,7 @@ /* Print the back trace*/ \ return RES; \ } \ -} while (0); +} while (0) #define NCCLCHECKGOTO(call, RES, label) do { \ RES = call; \ @@ -130,7 +146,7 @@ /* Print the back trace*/ \ goto label; \ } \ -} while (0); +} while (0) #define NCCLWAIT(call, cond, abortFlagPtr) do { \ volatile uint32_t* tmpAbortFlag = (abortFlagPtr); \ @@ -139,7 +155,7 @@ return ncclInternalError; \ } \ if (tmpAbortFlag) NEQCHECK(*tmpAbortFlag, 0); \ -} while (!(cond)); +} while (!(cond)) #define NCCLWAITGOTO(call, cond, abortFlagPtr, RES, label) do { \ volatile uint32_t* tmpAbortFlag = (abortFlagPtr); \ @@ -148,7 +164,7 @@ goto label; \ } \ if (tmpAbortFlag) NEQCHECKGOTO(*tmpAbortFlag, 0, RES, label); \ -} while (!(cond)); +} while (!(cond)) #define NCCLCHECKTHREAD(a, args) do { \ if (((args)->ret = (a)) != ncclSuccess && (args)->ret != ncclInProgress) { \ diff --git a/include/p2p_plugin.h b/include/p2p_plugin.h index 10493b7d..26d5a933 100644 --- a/include/p2p_plugin.h +++ b/include/p2p_plugin.h @@ -53,8 +53,13 @@ struct ncclIbMergedDev { int devs[NCCL_IB_MAX_DEVS_PER_NIC]; // Points to an index in ncclIbDevs int speed; char devName[MAX_MERGED_DEV_NAME]; // Up to NCCL_IB_MAX_DEVS_PER_NIC * name size, and a character for each '+' + int dmaBufSupported; // 0 = uninit, 1 = yes, -1 = no } __attribute__((aligned(64))); +struct ncclIbStats { + int fatalErrorCount; +}; + struct ncclIbRequest { struct ncclIbNetCommBase* base; int type; @@ -108,6 +113,7 @@ typedef struct ncclIbDev { struct ncclIbMrCache mrCache; int ar; // ADAPTIVE_ROUTING struct ibv_port_attr portAttr; + struct ncclIbStats stats; } __attribute__((aligned(64))) ncclIbDev; @@ -144,4 +150,6 @@ int ncclIbRelaxedOrderingCapable(void); nccl_p2p_plugin_t nccl_p2p_get_plugin_type(); +ncclResult_t ncclIbStatsInit(struct ncclIbStats* stat); + #endif diff --git a/include/timer.h b/include/timer.h index 7f4eb77b..c4f1c70d 100755 --- a/include/timer.h +++ b/include/timer.h @@ -33,15 +33,15 @@ static double startTimes[8]; #define TIME_START(index) do { \ counts[index]++; \ startTimes[index] = gettime(); \ -} while (0); +} while (0) #define TIME_STOP(index) do { \ times[index] += gettime() - startTimes[index]; \ -} while (0); +} while (0) #define TIME_CANCEL(index) do { \ counts[index]--; \ -} while (0); +} while (0) #define TIME_PRINT(name) do { \ printf("%s stats", name); \ @@ -50,11 +50,11 @@ static double startTimes[8]; counts[i] = 0; \ } \ printf("\n"); \ -} while (0); +} while (0) #else -#define TIME_START(index) do {} while(0); -#define TIME_STOP(index) do {} while(0); -#define TIME_CANCEL(index) do {} while(0); +#define TIME_START(index) do {} while(0) +#define TIME_STOP(index) do {} while(0) +#define TIME_CANCEL(index) do {} while(0) #define TIME_PRINT(name) #endif #endif diff --git a/src/ib_plugin.c b/src/ib_plugin.c index 9f56eea4..e6d6db21 100644 --- a/src/ib_plugin.c +++ b/src/ib_plugin.c @@ -39,7 +39,7 @@ NCCL_PARAM(IbGidIndex, "IB_GID_INDEX", -1); NCCL_PARAM(IbRoutableFlidIbGidIndex, "IB_ROUTABLE_FLID_GID_INDEX", 1); NCCL_PARAM(IbRoceVersionNum, "IB_ROCE_VERSION_NUM", 2); NCCL_PARAM(IbIsGlobal, "IB_IS_GLOBAL", 0); -NCCL_PARAM(IbTimeout, "IB_TIMEOUT", 18); +NCCL_PARAM(IbTimeout, "IB_TIMEOUT", 20); NCCL_PARAM(IbRetryCnt, "IB_RETRY_CNT", 7); NCCL_PARAM(IbPkey, "IB_PKEY", 0); NCCL_PARAM(IbUseInline, "IB_USE_INLINE", 0); @@ -48,6 +48,16 @@ NCCL_PARAM(IbTc, "IB_TC", 0); NCCL_PARAM(IbArThreshold, "IB_AR_THRESHOLD", 8192); NCCL_PARAM(IbPciRelaxedOrdering, "IB_PCI_RELAXED_ORDERING", 2); NCCL_PARAM(IbFifoTc, "IB_FIFO_TC", 0); +NCCL_PARAM(IbAsyncEvents,"IB_RETURN_ASYNC_EVENTS",1); +NCCL_PARAM(IbEceEnable,"IB_ECE_ENABLE",1); + +static ncclResult_t ncclIbStatsCheckFatalCount(struct ncclIbStats* stat, const char* funcName) { + if (ncclParamIbAsyncEvents() && __atomic_load_n(&stat->fatalErrorCount, __ATOMIC_RELAXED)) { + WARN("communicator encountered a fatal error (detected in %s)\n", funcName); + return ncclSystemError; + } + return ncclSuccess; +} static pthread_t ncclIbAsyncThread; @@ -100,11 +110,11 @@ static void* envIbAddrRange(sa_family_t af, int* mask) { char addrString[128] = { 0 }; snprintf(addrString, 128, "%s", env); char *addrStrPtr = addrString; - char *maskStrPtr = strstr(addrString, "/") + 1; + char *maskStrPtr = strstr(addrString, "/"); if (NULL == maskStrPtr) { return NULL; } - *(maskStrPtr - 1) = '\0'; + *(maskStrPtr++) = '\0'; if (inet_pton(af, addrStrPtr, ret) == 0) { WARN("NET/IB: Ip address '%s' is invalid for family %s, ignoring address", addrStrPtr, (af == AF_INET) ? "AF_INET" : "AF_INET6"); @@ -202,12 +212,14 @@ static ncclResult_t ncclIbRoceGetVersionNum(const char* deviceName, int portNum, int fd = open(roceTypePath, O_RDONLY); if (fd == -1) { + WARN("NET/IB: open failed in ncclIbRoceGetVersionNum: %s", strerror(errno)); return ncclSystemError; } int ret = read(fd, gidRoceVerStr, 15); close(fd); if (ret == -1) { + WARN("NET/IB: read failed in ncclIbRoceGetVersionNum: %s", strerror(errno)); return ncclSystemError; } @@ -485,16 +497,19 @@ typedef struct ncclIbNetCommBase { // Track necessary remDevInfo here int nRemDevs; struct ncclIbDevInfo remDevs[NCCL_IB_MAX_DEVS_PER_NIC]; + // statistics about the comm + struct ncclIbStats stats; } __attribute__((aligned(32))) ncclIbNetCommBase; struct ncclIbSendComm { struct ncclIbNetCommBase base; + // Start with fifo and ibv structs as they have alignment restrictions struct ncclIbSendFifo fifo[MAX_REQUESTS][NCCL_NET_IB_MAX_RECVS]; + struct ibv_sge sges[NCCL_NET_IB_MAX_RECVS]; + struct ibv_send_wr wrs[NCCL_NET_IB_MAX_RECVS + 1]; // Each dev correlates to a mergedIbDev struct ncclIbSendCommDev devs[NCCL_IB_MAX_DEVS_PER_NIC]; struct ncclIbRequest* fifoReqs[MAX_REQUESTS][NCCL_NET_IB_MAX_RECVS]; - struct ibv_sge sges[NCCL_NET_IB_MAX_RECVS]; - struct ibv_send_wr wrs[NCCL_NET_IB_MAX_RECVS+1]; struct ncclIbRemSizesFifo remSizesFifo; uint64_t fifoHead; int ar; // Use adaptive routing when all merged devices have it enabled @@ -554,7 +569,7 @@ static void ncclIbAddEvent(struct ncclIbRequest* req, int devIndex, struct ncclI req->devBases[devIndex] = base; } -ncclResult_t ncclIbInitCommDevBase(int ibDevN, struct ncclIbNetCommDevBase* base) { +ncclResult_t ncclIbInitCommDevBase(int ibDevN, struct ncclIbNetCommDevBase* base, void* cq_context) { base->ibDevN = ibDevN; ncclIbDev* ibDev = ncclIbDevs + ibDevN; pthread_mutex_lock(&ibDev->lock); @@ -571,7 +586,7 @@ ncclResult_t ncclIbInitCommDevBase(int ibDevN, struct ncclIbNetCommDevBase* base pthread_mutex_unlock(&ibDev->lock); // Recv requests can generate 2 completions (one for the post FIFO, one for the Recv). - NCCLCHECK(wrap_ibv_create_cq(&base->cq, ibDev->context, 2*MAX_REQUESTS*ncclParamIbQpsPerConn(), NULL, NULL, 0)); + NCCLCHECK(wrap_ibv_create_cq(&base->cq, ibDev->context, 2*MAX_REQUESTS*ncclParamIbQpsPerConn(), cq_context, NULL, 0)); return ncclSuccess; } @@ -590,9 +605,10 @@ ncclResult_t ncclIbDestroyBase(struct ncclIbNetCommDevBase* base) { return res; } -ncclResult_t ncclIbCreateQp(uint8_t ib_port, struct ncclIbNetCommDevBase* base, int access_flags, struct ncclIbQp* qp) { +ncclResult_t ncclIbCreateQp(uint8_t ib_port, struct ncclIbNetCommDevBase* base, int access_flags, void* qp_context, struct ncclIbQp* qp) { struct ibv_qp_init_attr qpInitAttr; memset(&qpInitAttr, 0, sizeof(struct ibv_qp_init_attr)); + qpInitAttr.qp_context = qp_context; qpInitAttr.send_cq = base->cq; qpInitAttr.recv_cq = base->cq; qpInitAttr.qp_type = IBV_QPT_RC; @@ -678,6 +694,7 @@ ncclResult_t ncclIbRtsQp(struct ibv_qp* qp) { } ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) { + ncclResult_t ret = ncclSuccess; struct ncclIbListenComm* comm; comm = malloc(sizeof(struct ncclIbListenComm)); memset(comm, 0, sizeof(struct ncclIbListenComm)); @@ -686,14 +703,20 @@ ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) { memset(handle, 0, sizeof(struct ncclIbHandle)); comm->dev = dev; handle->magic = NCCL_SOCKET_MAGIC; - NCCLCHECK(ncclSocketInit(&comm->sock, &ncclIbIfAddr, handle->magic, ncclSocketTypeNetIb, NULL, 1)); - NCCLCHECK(ncclSocketListen(&comm->sock)); - NCCLCHECK(ncclSocketGetAddr(&comm->sock, &handle->connectAddr)); + NCCLCHECKGOTO(ncclSocketInit(&comm->sock, &ncclIbIfAddr, handle->magic, ncclSocketTypeNetIb, NULL, 1), ret, fail); + NCCLCHECKGOTO(ncclSocketListen(&comm->sock), ret, fail); + NCCLCHECKGOTO(ncclSocketGetAddr(&comm->sock, &handle->connectAddr), ret, fail); *listenComm = comm; - return ncclSuccess; +exit: + return ret; +fail: + (void)ncclSocketClose(&comm->sock); + free(comm); + goto exit; } ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNetDeviceHandle_t** sendDevComm) { + ncclResult_t ret = ncclSuccess; struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle; struct ncclIbCommStage* stage = &handle->stage; struct ncclIbSendComm* comm = (struct ncclIbSendComm*)stage->comm; @@ -709,16 +732,18 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet WARN("Error: trying to connect already connected sendComm"); return ncclInternalError; } + stage->buffer = NULL; NCCLCHECK(ncclIbMalloc((void**)&comm, sizeof(struct ncclIbSendComm))); - NCCLCHECK(ncclSocketInit(&comm->base.sock, &handle->connectAddr, handle->magic, ncclSocketTypeNetIb, NULL, 1)); + NCCLCHECKGOTO(ncclIbStatsInit(&comm->base.stats), ret, fail); + NCCLCHECKGOTO(ncclSocketInit(&comm->base.sock, &handle->connectAddr, handle->magic, ncclSocketTypeNetIb, NULL, 1), ret, fail); stage->comm = comm; stage->state = ncclIbCommStateConnect; - NCCLCHECK(ncclSocketConnect(&comm->base.sock)); + NCCLCHECKGOTO(ncclSocketConnect(&comm->base.sock), ret, fail); ib_connect_check: /* since ncclSocketConnect is async, we must check if connection is complete */ - NCCLCHECK(ncclSocketReady(&comm->base.sock, &ready)); + NCCLCHECKGOTO(ncclSocketReady(&comm->base.sock, &ready), ret, fail); if (!ready) return ncclSuccess; // IB Setup @@ -732,7 +757,7 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet comm->ar = 1; // Set to 1 for logic for (int i = 0; i < mergedDev->ndevs; i++) { int ibDevN = mergedDev->devs[i]; - NCCLCHECK(ncclIbInitCommDevBase(ibDevN, &comm->devs[i].base)); + NCCLCHECKGOTO(ncclIbInitCommDevBase(ibDevN, &comm->devs[i].base, &comm->base.stats), ret, fail); comm->ar = comm->ar && ncclIbDevs[dev].ar; // ADAPTIVE_ROUTING - if all merged devs have it enabled } @@ -745,13 +770,17 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet for (int q = 0; q < comm->base.nqps; q++) { ncclIbSendCommDev* commDev = comm->devs + devIndex; ncclIbDev* ibDev = ncclIbDevs + commDev->base.ibDevN; - NCCLCHECK(ncclIbCreateQp(ibDev->portNum, &commDev->base, IBV_ACCESS_REMOTE_WRITE, comm->base.qps+q)); + NCCLCHECKGOTO(ncclIbCreateQp(ibDev->portNum, &commDev->base, IBV_ACCESS_REMOTE_WRITE, &comm->base.stats, comm->base.qps + q), ret, fail); comm->base.qps[q].devIndex = devIndex; meta.qpInfo[q].qpn = comm->base.qps[q].qp->qp_num; meta.qpInfo[q].devIndex = comm->base.qps[q].devIndex; - // Query ece capabilities (enhanced connection establishment) - NCCLCHECK(wrap_ibv_query_ece(comm->base.qps[q].qp, &meta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported)); + if (ncclParamIbEceEnable()) { + // Query ece capabilities (enhanced connection establishment) + NCCLCHECKGOTO(wrap_ibv_query_ece(comm->base.qps[q].qp, &meta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported), ret, fail); + } else { + meta.qpInfo[q].ece_supported = 0; + } devIndex = (devIndex + 1) % comm->base.ndevs; } @@ -766,7 +795,7 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet devInfo->lid = ibDev->portAttr.lid; // Prepare my fifo - NCCLCHECK(wrap_ibv_reg_mr(&commDev->fifoMr, commDev->base.pd, comm->fifo, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ)); + NCCLCHECKGOTO(wrap_ibv_reg_mr(&commDev->fifoMr, commDev->base.pd, comm->fifo, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ), ret, fail); devInfo->fifoRkey = commDev->fifoMr->rkey; devInfo->is_global = (ncclParamIbIsGlobal() @@ -777,8 +806,8 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet // Pack local GID info devInfo->link_layer = commDev->base.gidInfo.link_layer = ibDev->portAttr.link_layer; - NCCLCHECK(ncclIbGetGidIndex(ibDev->context, ibDev->portNum, &ibDev->portAttr, &commDev->base.gidInfo.localGidIndex)); - NCCLCHECK(wrap_ibv_query_gid(ibDev->context, ibDev->portNum, commDev->base.gidInfo.localGidIndex, &commDev->base.gidInfo.localGid)); + NCCLCHECKGOTO(ncclIbGetGidIndex(ibDev->context, ibDev->portNum, &ibDev->portAttr, &commDev->base.gidInfo.localGidIndex), ret, fail); + NCCLCHECKGOTO(wrap_ibv_query_gid(ibDev->context, ibDev->portNum, commDev->base.gidInfo.localGidIndex, &commDev->base.gidInfo.localGid), ret, fail); devInfo->gid.global.subnet_prefix = commDev->base.gidInfo.localGid.global.subnet_prefix; devInfo->gid.global.interface_id = commDev->base.gidInfo.localGid.global.interface_id; @@ -809,12 +838,12 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet stage->state = ncclIbCommStateSend; stage->offset = 0; - NCCLCHECK(ncclIbMalloc((void**)&stage->buffer, sizeof(meta))); + NCCLCHECKGOTO(ncclIbMalloc((void**)&stage->buffer, sizeof(meta)), ret, fail); memcpy(stage->buffer, &meta, sizeof(meta)); ib_send: - NCCLCHECK(ncclSocketProgress(NCCL_SOCKET_SEND, &comm->base.sock, stage->buffer, sizeof(meta), &stage->offset)); + NCCLCHECKGOTO(ncclSocketProgress(NCCL_SOCKET_SEND, &comm->base.sock, stage->buffer, sizeof(meta), &stage->offset), ret, fail); if (stage->offset != sizeof(meta)) return ncclSuccess; @@ -824,7 +853,7 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet memset(stage->buffer, 0, sizeof(meta)); ib_connect: - NCCLCHECK(ncclSocketProgress(NCCL_SOCKET_RECV, &comm->base.sock, stage->buffer, sizeof(ncclIbConnectionMetadata), &stage->offset)); + NCCLCHECKGOTO(ncclSocketProgress(NCCL_SOCKET_RECV, &comm->base.sock, stage->buffer, sizeof(ncclIbConnectionMetadata), &stage->offset), ret, fail); if (stage->offset != sizeof(remMeta)) return ncclSuccess; memcpy(&remMeta, stage->buffer, sizeof(ncclIbConnectionMetadata)); @@ -858,7 +887,7 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet } for (int i=0; i < comm->base.ndevs; i++) { - NCCLCHECK(wrap_ibv_reg_mr(comm->remSizesFifo.mrs+i, comm->devs[i].base.pd, &comm->remSizesFifo.elems, sizeof(int)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_READ)); + NCCLCHECKGOTO(wrap_ibv_reg_mr(comm->remSizesFifo.mrs+i, comm->devs[i].base.pd, &comm->remSizesFifo.elems, sizeof(int)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_READ), ret, fail); } comm->base.nRemDevs = remMeta.ndevs; for (int q = 0; q < comm->base.nqps; q++) { @@ -872,10 +901,10 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet struct ibv_qp* qp = comm->base.qps[q].qp; if (remQpInfo->ece_supported && remQpInfo->ece_supported) - NCCLCHECK(wrap_ibv_set_ece(qp, &remQpInfo->ece, &remQpInfo->ece_supported)); + NCCLCHECKGOTO(wrap_ibv_set_ece(qp, &remQpInfo->ece, &remQpInfo->ece_supported), ret, fail); - NCCLCHECK(ncclIbRtrQp(qp, &commDev->base.gidInfo, remQpInfo->qpn, remDevInfo, false)); - NCCLCHECK(ncclIbRtsQp(qp)); + NCCLCHECKGOTO(ncclIbRtrQp(qp, &commDev->base.gidInfo, remQpInfo->qpn, remDevInfo, false), ret, fail); + NCCLCHECKGOTO(ncclIbRtsQp(qp), ret, fail); } if (link_layer == IBV_LINK_LAYER_ETHERNET ) { // RoCE @@ -892,14 +921,17 @@ ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNet stage->offset = 0; ib_send_ready: - NCCLCHECK(ncclSocketProgress(NCCL_SOCKET_SEND, &comm->base.sock, &comm->base.ready, sizeof(int), &stage->offset)); + NCCLCHECKGOTO(ncclSocketProgress(NCCL_SOCKET_SEND, &comm->base.sock, &comm->base.ready, sizeof(int), &stage->offset), ret, fail); if (stage->offset != sizeof(int)) return ncclSuccess; - free(stage->buffer); - stage->state = ncclIbCommStateStart; - *sendComm = comm; - return ncclSuccess; +exit: + if (stage->buffer) free(stage->buffer); + stage->state = ncclIbCommStateStart; + return ret; +fail: + free(comm); + goto exit; } ncclResult_t ncclIbConnect_v6(int dev, void* opaqueHandle, void** sendComm) { @@ -910,6 +942,7 @@ ncclResult_t ncclIbConnect_v6(int dev, void* opaqueHandle, void** sendComm) { NCCL_PARAM(IbGdrFlushDisable, "GDR_FLUSH_DISABLE", 0); ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandle_t** recvDevComm) { + ncclResult_t ret = ncclSuccess; struct ncclIbListenComm* lComm = (struct ncclIbListenComm*)listenComm; struct ncclIbCommStage* stage = &lComm->stage; struct ncclIbRecvComm* rComm = (struct ncclIbRecvComm*)stage->comm; @@ -926,22 +959,23 @@ ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandl } NCCLCHECK(ncclIbMalloc((void**)&rComm, sizeof(struct ncclIbRecvComm))); + NCCLCHECKGOTO(ncclIbStatsInit(&rComm->base.stats), ret, fail); stage->comm = rComm; stage->state = ncclIbCommStateAccept; - NCCLCHECK(ncclSocketInit(&rComm->base.sock, NULL, NCCL_SOCKET_MAGIC, ncclSocketTypeUnknown, NULL, 0)); - NCCLCHECK(ncclSocketAccept(&rComm->base.sock, &lComm->sock)); + NCCLCHECKGOTO(ncclSocketInit(&rComm->base.sock, NULL, NCCL_SOCKET_MAGIC, ncclSocketTypeUnknown, NULL, 0), ret, fail); + NCCLCHECKGOTO(ncclSocketAccept(&rComm->base.sock, &lComm->sock), ret, fail); ib_accept_check: - NCCLCHECK(ncclSocketReady(&rComm->base.sock, &ready)); + NCCLCHECKGOTO(ncclSocketReady(&rComm->base.sock, &ready), ret, fail); if (!ready) return ncclSuccess; struct ncclIbConnectionMetadata remMeta; stage->state = ncclIbCommStateRecv; stage->offset = 0; - NCCLCHECK(ncclIbMalloc((void**)&stage->buffer, sizeof(remMeta))); + NCCLCHECKGOTO(ncclIbMalloc((void**)&stage->buffer, sizeof(remMeta)), ret, fail);; ib_recv: - NCCLCHECK(ncclSocketProgress(NCCL_SOCKET_RECV, &rComm->base.sock, stage->buffer, sizeof(remMeta), &stage->offset)); + NCCLCHECKGOTO(ncclSocketProgress(NCCL_SOCKET_RECV, &rComm->base.sock, stage->buffer, sizeof(remMeta), &stage->offset), ret, fail); if (stage->offset != sizeof(remMeta)) return ncclSuccess; /* copy back the received info */ @@ -972,10 +1006,10 @@ ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandl for (int i = 0; i < rComm->base.ndevs; i++) { rCommDev = rComm->devs + i; ibDevN = mergedDev->devs[i]; - NCCLCHECK(ncclIbInitCommDevBase(ibDevN, &rCommDev->base)); + NCCLCHECKGOTO(ncclIbInitCommDevBase(ibDevN, &rCommDev->base, &rComm->base.stats), ret, fail); ibDev = ncclIbDevs + ibDevN; - NCCLCHECK(ncclIbGetGidIndex(ibDev->context, ibDev->portNum, &ibDev->portAttr, &rCommDev->base.gidInfo.localGidIndex)); - NCCLCHECK(wrap_ibv_query_gid(ibDev->context, ibDev->portNum, rCommDev->base.gidInfo.localGidIndex, &rCommDev->base.gidInfo.localGid)); + NCCLCHECKGOTO(ncclIbGetGidIndex(ibDev->context, ibDev->portNum, &ibDev->portAttr, &rCommDev->base.gidInfo.localGidIndex), ret, fail); + NCCLCHECKGOTO(wrap_ibv_query_gid(ibDev->context, ibDev->portNum, rCommDev->base.gidInfo.localGidIndex, &rCommDev->base.gidInfo.localGid), ret, fail); } // Copy remDevInfo for things like remGidInfo, remFifoAddr, etc. @@ -1000,22 +1034,26 @@ ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandl // Local ibDevN ibDevN = rComm->devs[devIndex].base.ibDevN; ibDev = ncclIbDevs + ibDevN; - NCCLCHECK(ncclIbCreateQp(ibDev->portNum, &rCommDev->base, IBV_ACCESS_REMOTE_WRITE, qp)); + NCCLCHECKGOTO(ncclIbCreateQp(ibDev->portNum, &rCommDev->base, IBV_ACCESS_REMOTE_WRITE, &rComm->base.stats, qp), ret, fail); qp->devIndex = devIndex; devIndex = (devIndex + 1) % rComm->base.ndevs; // Set the ece (enhanced connection establishment) on this QP before RTR if (remMeta.qpInfo[q].ece_supported) { - NCCLCHECK(wrap_ibv_set_ece(qp->qp, &remMeta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported)); + // Coverity suspects a copy-paste error below due to the use of remMeta in one argument and meta in another. + // However, this has been confirmed to be intentional. + // coverity[copy_paste_error] + NCCLCHECKGOTO(wrap_ibv_set_ece(qp->qp, &remMeta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported), ret, fail); + // Query the reduced ece for this QP (matching enhancements between the requestor and the responder) // Store this in our own qpInfo for returning to the requestor if (meta.qpInfo[q].ece_supported) - NCCLCHECK(wrap_ibv_query_ece(qp->qp, &meta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported)); + NCCLCHECKGOTO(wrap_ibv_query_ece(qp->qp, &meta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported), ret, fail); } bool override_tc = (q == 0) ? true : false; - NCCLCHECK(ncclIbRtrQp(qp->qp, &rCommDev->base.gidInfo, remMeta.qpInfo[q].qpn, remDevInfo, override_tc)); - NCCLCHECK(ncclIbRtsQp(qp->qp)); + NCCLCHECKGOTO(ncclIbRtrQp(qp->qp, &rCommDev->base.gidInfo, remMeta.qpInfo[q].qpn, remDevInfo, override_tc), ret, fail); + NCCLCHECKGOTO(ncclIbRtsQp(qp->qp), ret, fail); } rComm->flushEnabled = ((nccl_p2p_gdr_support() == ncclSuccess || nccl_p2p_dmabuf_support(lComm->dev) == ncclSuccess) @@ -1029,17 +1067,17 @@ ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandl // Retain remote fifo info and prepare my RDMA ops rCommDev->fifoRkey = remMeta.devs[i].fifoRkey; rComm->remFifo.addr = remMeta.fifoAddr; - NCCLCHECK(wrap_ibv_reg_mr(&rCommDev->fifoMr, rCommDev->base.pd, &rComm->remFifo.elems, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_READ)); + NCCLCHECKGOTO(wrap_ibv_reg_mr(&rCommDev->fifoMr, rCommDev->base.pd, &rComm->remFifo.elems, sizeof(struct ncclIbSendFifo)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_READ), ret, fail); rCommDev->fifoSge.lkey = rCommDev->fifoMr->lkey; if (ncclParamIbUseInline()) rComm->remFifo.flags = IBV_SEND_INLINE; // Allocate Flush dummy buffer for GPU Direct RDMA if (rComm->flushEnabled) { - NCCLCHECK(wrap_ibv_reg_mr(&rCommDev->gpuFlush.hostMr, rCommDev->base.pd, &rComm->gpuFlushHostMem, sizeof(int), IBV_ACCESS_LOCAL_WRITE)); + NCCLCHECKGOTO(wrap_ibv_reg_mr(&rCommDev->gpuFlush.hostMr, rCommDev->base.pd, &rComm->gpuFlushHostMem, sizeof(int), IBV_ACCESS_LOCAL_WRITE), ret, fail); rCommDev->gpuFlush.sge.addr = (uint64_t)&rComm->gpuFlushHostMem; rCommDev->gpuFlush.sge.length = 1; rCommDev->gpuFlush.sge.lkey = rCommDev->gpuFlush.hostMr->lkey; - NCCLCHECK(ncclIbCreateQp(ibDev->portNum, &rCommDev->base, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ, &rCommDev->gpuFlush.qp)); + NCCLCHECKGOTO(ncclIbCreateQp(ibDev->portNum, &rCommDev->base, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ, &rComm->base.stats, &rCommDev->gpuFlush.qp), ret, fail);; struct ncclIbDevInfo devInfo; devInfo.lid = ibDev->portAttr.lid; devInfo.link_layer = ibDev->portAttr.link_layer; @@ -1052,8 +1090,8 @@ ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandl #endif ); devInfo.mtu = ibDev->portAttr.active_mtu; - NCCLCHECK(ncclIbRtrQp(rCommDev->gpuFlush.qp.qp, &rCommDev->base.gidInfo, rCommDev->gpuFlush.qp.qp->qp_num, &devInfo, false)); - NCCLCHECK(ncclIbRtsQp(rCommDev->gpuFlush.qp.qp)); + NCCLCHECKGOTO(ncclIbRtrQp(rCommDev->gpuFlush.qp.qp, &rCommDev->base.gidInfo, rCommDev->gpuFlush.qp.qp->qp_num, &devInfo, false), ret, fail); + NCCLCHECKGOTO(ncclIbRtsQp(rCommDev->gpuFlush.qp.qp), ret, fail); } // Fill Handle @@ -1073,7 +1111,7 @@ ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandl meta.devs[i].mtu = remMeta.devs[i].mtu; // Prepare sizes fifo - NCCLCHECK(wrap_ibv_reg_mr(&rComm->devs[i].sizesFifoMr, rComm->devs[i].base.pd, rComm->sizesFifo, sizeof(int)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ)); + NCCLCHECKGOTO(wrap_ibv_reg_mr(&rComm->devs[i].sizesFifoMr, rComm->devs[i].base.pd, rComm->sizesFifo, sizeof(int)*MAX_REQUESTS*NCCL_NET_IB_MAX_RECVS, IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ), ret, fail); meta.devs[i].fifoRkey = rComm->devs[i].sizesFifoMr->rkey; } meta.fifoAddr = (uint64_t)rComm->sizesFifo; @@ -1088,29 +1126,35 @@ ncclResult_t ncclIbAccept(void* listenComm, void** recvComm, ncclNetDeviceHandl stage->state = ncclIbCommStateSend; stage->offset = 0; - if (stage->buffer) free(stage->buffer); - NCCLCHECK(ncclIbMalloc((void**)&stage->buffer, sizeof(struct ncclIbConnectionMetadata))); + if (stage->buffer) { + free(stage->buffer); + stage->buffer = NULL; + } + NCCLCHECKGOTO(ncclIbMalloc((void**)&stage->buffer, sizeof(struct ncclIbConnectionMetadata)), ret, fail); memcpy(stage->buffer, &meta, sizeof(struct ncclIbConnectionMetadata)); ib_send: - NCCLCHECK(ncclSocketProgress(NCCL_SOCKET_SEND, &rComm->base.sock, stage->buffer, sizeof(struct ncclIbConnectionMetadata), &stage->offset)); + NCCLCHECKGOTO(ncclSocketProgress(NCCL_SOCKET_SEND, &rComm->base.sock, stage->buffer, sizeof(struct ncclIbConnectionMetadata), &stage->offset), ret, fail); if (stage->offset < sizeof(struct ncclIbConnectionMetadata)) return ncclSuccess; stage->offset = 0; stage->state = ncclIbCommStatePendingReady; ib_recv_ready: - NCCLCHECK(ncclSocketProgress(NCCL_SOCKET_RECV, &rComm->base.sock, &rComm->base.ready, sizeof(int), &stage->offset)); + NCCLCHECKGOTO(ncclSocketProgress(NCCL_SOCKET_RECV, &rComm->base.sock, &rComm->base.ready, sizeof(int), &stage->offset), ret, fail); if (stage->offset != sizeof(int)) return ncclSuccess; - free(stage->buffer); *recvComm = rComm; - +exit: /* reset lComm stage */ + if (stage->buffer) free(stage->buffer); stage->state = ncclIbCommStateStart; stage->offset = 0; stage->comm = NULL; stage->buffer = NULL; - return ncclSuccess; + return ret; +fail: + free(rComm); + goto exit; } ncclResult_t ncclIbAccept_v6(void* listenComm, void** recvComm) { @@ -1209,16 +1253,21 @@ struct ncclIbNetCommDevBase* ncclIbGetNetCommDevBase(ncclIbNetCommBase* base, in /* DMA-BUF support */ ncclResult_t ncclIbRegMrDmaBuf(void* comm, void* data, size_t size, int type, uint64_t offset, int fd, void** mhandle) { + ncclResult_t ret = ncclSuccess; assert(size > 0); struct ncclIbNetCommBase* base = (struct ncclIbNetCommBase*) comm; struct ncclIbMrHandle* mhandleWrapper = (struct ncclIbMrHandle*) malloc(sizeof(struct ncclIbMrHandle)); for (int i = 0; i < base->ndevs; i++) { // Each ncclIbNetCommDevBase is at different offset in send and recv netComms struct ncclIbNetCommDevBase* devComm = ncclIbGetNetCommDevBase(base, i); - NCCLCHECK(ncclIbRegMrDmaBufInternal(devComm, data, size, type, offset, fd, mhandleWrapper->mrs + i)); + NCCLCHECKGOTO(ncclIbRegMrDmaBufInternal(devComm, data, size, type, offset, fd, mhandleWrapper->mrs + i), ret, fail); } *mhandle = (void*) mhandleWrapper; - return ncclSuccess; +exit: + return ret; +fail: + free(mhandleWrapper); + goto exit; } ncclResult_t ncclIbRegMr(void* comm, void* data, size_t size, int type, void** mhandle) { @@ -1377,6 +1426,7 @@ ncclResult_t ncclIbIsend(void* sendComm, void* data, int size, int tag, void* mh struct ncclIbSendComm* comm = (struct ncclIbSendComm*)sendComm; if (comm->base.ready == 0) { WARN("NET/IB: ncclIbIsend() called when comm->base.ready == 0"); return ncclInternalError; } if (comm->base.ready == 0) { *request = NULL; return ncclSuccess; } + NCCLCHECK(ncclIbStatsCheckFatalCount(&comm->base.stats,__func__)); struct ncclIbMrHandle* mhandleWrapper = (struct ncclIbMrHandle*) mhandle; @@ -1541,6 +1591,7 @@ ncclResult_t ncclIbIrecv(void* recvComm, int n, void** data, int* sizes, int* ta struct ncclIbRecvComm* comm = (struct ncclIbRecvComm*)recvComm; if (comm->base.ready == 0) { WARN("NET/IB: ncclIbIrecv() called when comm->base.ready == 0"); return ncclInternalError; } if (comm->base.ready == 0) { *request = NULL; return ncclSuccess; } + NCCLCHECK(ncclIbStatsCheckFatalCount(&comm->base.stats,__func__)); if (n > NCCL_NET_IB_MAX_RECVS) return ncclInternalError; @@ -1621,11 +1672,14 @@ ncclResult_t ncclIbIflush(void* recvComm, int n, void** data, int* sizes, void** return ncclSuccess; } +#define HCA_NAME(req, index) ((req)->devBases[(index)]->pd->context->device->name) + ncclResult_t ncclIbTest(void* request, int* done, int* sizes) { struct ncclIbRequest *r = (struct ncclIbRequest*)request; *done = 0; while (1) { + NCCLCHECK(ncclIbStatsCheckFatalCount(&r->base->stats,__func__)); if (r->events[0] == 0 && r->events[1] == 0) { TRACE(NCCL_NET, "r=%p done", r); *done = 1; @@ -1685,7 +1739,7 @@ ncclResult_t ncclIbTest(void* request, int* done, int* sizes) { TRACE(NCCL_NET, "Got completion from peer %s with status=%d opcode=%d len=%d wr_id=%ld r=%p type=%d events={%d,%d}, i=%d", ncclSocketToString(&addr, line, 1), wc->status, wc->opcode,wc->byte_len, wc->wr_id, req, req->type, req->events[0], req->events[1], i); #endif - if (req->type == NCCL_NET_IB_REQ_SEND) { + if (req && req->type == NCCL_NET_IB_REQ_SEND) { for (int j = 0; j < req->nreqs; j++) { struct ncclIbRequest* sendReq = r->base->reqs+((wc->wr_id >> (j*8)) & 0xff); if ((sendReq->events[i] <= 0)) { @@ -1707,6 +1761,9 @@ ncclResult_t ncclIbTest(void* request, int* done, int* sizes) { req->events[i]--; } } + // Once the IB fatal event is reported in the async thread, we want to propagate this error + // to communicator and prevent further polling to reduce error pollution. + NCCLCHECK(ncclIbStatsCheckFatalCount(&ncclIbDevs[r->devBases[i]->ibDevN].stats,__func__)); } } diff --git a/src/p2p_plugin.c b/src/p2p_plugin.c index 500150ff..744f2ccf 100644 --- a/src/p2p_plugin.c +++ b/src/p2p_plugin.c @@ -171,54 +171,74 @@ ncclResult_t pluginInit_v5(ncclDebugLogger_t logFunction) { return ncclNetPlugin_v5.init(logFunction); } +// Detect whether GDR can work on a given NIC with the current CUDA device +// Returns : +// ncclSuccess : GDR works +// ncclSystemError : no module or module loaded but not supported by GPU +#define KNL_MODULE_LOADED(a) ((access(a, F_OK) == -1) ? 0 : 1) +static int ncclIbGdrModuleLoaded = 0; // 1 = true, 0 = false +static void ibGdrSupportInitOnce() { + // Check for the nv_peer_mem module being loaded + ncclIbGdrModuleLoaded = KNL_MODULE_LOADED("/sys/kernel/mm/memory_peers/nv_mem/version") || + KNL_MODULE_LOADED("/sys/kernel/mm/memory_peers/nv_mem_nc/version") || + KNL_MODULE_LOADED("/sys/module/nvidia_peermem/version"); +} + ncclResult_t nccl_p2p_gdr_support() { - static int module_loaded = -1; - - if (module_loaded == -1) { - module_loaded = (access("/sys/kernel/mm/memory_peers/nv_mem/version", F_OK) == -1) ? 0 : 1; - } + static pthread_once_t once = PTHREAD_ONCE_INIT; + pthread_once(&once, ibGdrSupportInitOnce); + if (!ncclIbGdrModuleLoaded) + return ncclSystemError; + return ncclSuccess; +} - if (module_loaded == 0) { - return ncclSystemError; +static __thread int ibDmaSupportInitDev; // which device to init, must be thread local +static void ibDmaBufSupportInitOnce(){ + ncclResult_t res; + // select the appropriate + struct ncclIbMergedDev* mergedDev = ncclIbMergedDevs + ibDmaSupportInitDev; + // Test each real devices + int dev_fail = 0; + for (int i = 0; i < mergedDev->ndevs; i++) { + int ibDev = mergedDev->devs[i]; + struct ibv_pd* pd; + struct ibv_context* ctx = ncclIbDevs[ibDev].context; + NCCLCHECKGOTO(wrap_ibv_alloc_pd(&pd, ctx), res, failure); + // Test kernel DMA-BUF support with a dummy call (fd=-1) + (void)wrap_direct_ibv_reg_dmabuf_mr(pd, 0ULL /*offset*/, 0ULL /*len*/, 0ULL /*iova*/, -1 /*fd*/, 0 /*flags*/); + // ibv_reg_dmabuf_mr() will fail with EOPNOTSUPP/EPROTONOSUPPORT if not supported (EBADF otherwise) + dev_fail |= (errno == EOPNOTSUPP) || (errno == EPROTONOSUPPORT); + NCCLCHECKGOTO(wrap_ibv_dealloc_pd(pd), res, failure); + // stop the search and goto failure + if (dev_fail) goto failure; } - - return ncclSuccess; + mergedDev->dmaBufSupported = 1; + return; +failure: + mergedDev->dmaBufSupported = -1; + return; } + +struct oncewrap { + pthread_once_t once; +}; +static struct oncewrap onces[MAX_IB_DEVS]; // Detect whether DMA-BUF support is present in the kernel // Returns : // ncclSuccess : DMA-BUF support is available // ncclSystemError : DMA-BUF is not supported by the kernel ncclResult_t nccl_p2p_dmabuf_support(int dev) { - static int dmaBufSupported = -1; - if (dmaBufSupported == -1) { - ncclResult_t res; - struct ibv_pd* pd; - struct ibv_context* ctx; - struct ncclIbMergedDev* mergedDev = ncclIbMergedDevs + dev; - - // Test each dev - for (int i = 0; i < mergedDev->ndevs; i++) { - int ibDev = mergedDev->devs[i]; - ctx = ncclIbDevs[ibDev].context; - NCCLCHECKGOTO(wrap_ibv_alloc_pd(&pd, ctx), res, failure); - // Test kernel DMA-BUF support with a dummy call (fd=-1) - (void) wrap_direct_ibv_reg_dmabuf_mr(pd, 0ULL/*offset*/, 0ULL/*len*/, 0ULL/*iova*/, -1/*fd*/, 0/*flags*/); - // ibv_reg_dmabuf_mr() will fail with EOPNOTSUPP/EPROTONOSUPPORT if not supported (EBADF otherwise) - dmaBufSupported = (errno != EOPNOTSUPP && errno != EPROTONOSUPPORT) ? 1 : 0; - NCCLCHECKGOTO(wrap_ibv_dealloc_pd(pd), res, failure); - } + // init the device only once + ibDmaSupportInitDev = dev; + pthread_once(&onces[dev].once, ibDmaBufSupportInitOnce); - } - if (dmaBufSupported == 0) return ncclSystemError; - return ncclSuccess; -failure: - dmaBufSupported = 0; + int dmaBufSupported = ncclIbMergedDevs[dev].dmaBufSupported; + if (dmaBufSupported == 1) return ncclSuccess; return ncclSystemError; } - ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int dev, ncclNetProperties_t* props) { struct ncclIbMergedDev* mergedDev = ncclIbMergedDevs+dev; @@ -258,15 +278,77 @@ ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int dev, ncclNetPropert return ncclSuccess; } +ncclResult_t ncclIbStatsInit(struct ncclIbStats* stat) { + __atomic_store_n(&stat->fatalErrorCount, 0, __ATOMIC_RELAXED); + return ncclSuccess; +} + +static void ncclIbStatsFatalError(struct ncclIbStats* stat){ + __atomic_fetch_add(&stat->fatalErrorCount, 1, __ATOMIC_RELAXED); +} +static void ncclIbQpFatalError(struct ibv_qp* qp) { + ncclIbStatsFatalError((struct ncclIbStats*)qp->qp_context); +} +static void ncclIbCqFatalError(struct ibv_cq* cq) { + ncclIbStatsFatalError((struct ncclIbStats*)cq->cq_context); +} +static void ncclIbDevFatalError(struct ncclIbDev* dev) { + ncclIbStatsFatalError(&dev->stats); +} + static void* ncclIbAsyncThreadMain(void* args) { struct ncclIbDev* dev = (struct ncclIbDev*)args; while (1) { struct ibv_async_event event; if (ncclSuccess != wrap_ibv_get_async_event(dev->context, &event)) { break; } char *str; + struct ibv_cq* cq = event.element.cq; // only valid if CQ error + struct ibv_qp* qp = event.element.qp; // only valid if QP error + struct ibv_srq* srq = event.element.srq; // only valid if SRQ error if (ncclSuccess != wrap_ibv_event_type_str(&str, event.event_type)) { break; } - if (event.event_type != IBV_EVENT_COMM_EST) - WARN("NET/IB : %s:%d Got async event : %s", dev->devName, dev->portNum, str); + switch (event.event_type) { + case IBV_EVENT_DEVICE_FATAL: + // the above is device fatal error + WARN("NET/IB : %s:%d async fatal event: %s", dev->devName, dev->portNum, str); + ncclIbDevFatalError(dev); + break; + case IBV_EVENT_CQ_ERR: + // the above is a CQ fatal error + WARN("NET/IB : %s:%d async fatal event on CQ (%p): %s", dev->devName, dev->portNum, cq, str); + ncclIbCqFatalError(cq); + break; + case IBV_EVENT_QP_FATAL: + case IBV_EVENT_QP_REQ_ERR: + case IBV_EVENT_QP_ACCESS_ERR: + // the above are QP fatal errors + WARN("NET/IB : %s:%d async fatal event on QP (%p): %s", dev->devName, dev->portNum, qp, str); + ncclIbQpFatalError(qp); + break; + case IBV_EVENT_SRQ_ERR: + // SRQ are not used in NCCL + WARN("NET/IB : %s:%d async fatal event on SRQ, unused for now (%p): %s", dev->devName, dev->portNum, srq, str); + break; + case IBV_EVENT_PATH_MIG_ERR: + case IBV_EVENT_PORT_ERR: + case IBV_EVENT_PATH_MIG: + case IBV_EVENT_PORT_ACTIVE: + case IBV_EVENT_SQ_DRAINED: + case IBV_EVENT_LID_CHANGE: + case IBV_EVENT_PKEY_CHANGE: + case IBV_EVENT_SM_CHANGE: + case IBV_EVENT_QP_LAST_WQE_REACHED: + case IBV_EVENT_CLIENT_REREGISTER: + case IBV_EVENT_SRQ_LIMIT_REACHED: + // the above are non-fatal + WARN("NET/IB : %s:%d Got async error event: %s", dev->devName, dev->portNum, str); + break; + case IBV_EVENT_COMM_EST: + break; + default: + WARN("NET/IB : %s:%d unknown event type (%d)", dev->devName, dev->portNum, event.event_type); + break; + } + // acknowledgment needs to happen last to avoid user-after-free if (ncclSuccess != wrap_ibv_ack_async_event(&event)) { break; } } return NULL; @@ -304,9 +386,11 @@ int ncclIbFindMatchingDev(int dev) { ncclResult_t nccl_p2p_ib_init(int *num_devs, ncclIbDev *ncclIbDevs, char *ncclIbIfName, union ncclSocketAddress *ncclIbIfAddr, pthread_t *ncclIbAsyncThread, ncclDebugLogger_t logFunction) { int ncclNIbDevs = *num_devs; - ncclResult_t ret; + ncclResult_t ret = ncclSuccess; pluginLogFunction = logFunction; if (ncclNIbDevs == -1) { + for (int i=0; i< MAX_IB_DEVS; i++) + onces[i].once = PTHREAD_ONCE_INIT; pthread_mutex_lock(&nccl_p2p_lock); wrap_ibv_fork_init(); if (ncclNIbDevs == -1) { @@ -374,11 +458,12 @@ ncclResult_t nccl_p2p_ib_init(int *num_devs, ncclIbDev *ncclIbDevs, char *ncclIb ncclIbDevs[ncclNIbDevs].pdRefs = 0; ncclIbDevs[ncclNIbDevs].pd = NULL; strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE); - NCCLCHECK(nccl_p2p_ib_pci_path(ncclIbDevs, ncclNIbDevs, ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort)); + NCCLCHECKGOTO(nccl_p2p_ib_pci_path(ncclIbDevs, ncclNIbDevs, ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort), ret, fail); ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp; ncclIbDevs[ncclNIbDevs].mrCache.capacity = 0; ncclIbDevs[ncclNIbDevs].mrCache.population = 0; ncclIbDevs[ncclNIbDevs].mrCache.slots = NULL; + NCCLCHECK(ncclIbStatsInit(&ncclIbDevs[ncclNIbDevs].stats)); // Enable ADAPTIVE_ROUTING by default on IB networks // But allow it to be overloaded by an env parameter @@ -395,9 +480,9 @@ ncclResult_t nccl_p2p_ib_init(int *num_devs, ncclIbDev *ncclIbDevs, char *ncclIb TRACE(NCCL_NET,"NET/IB: [%d] %s:%s:%d/%s speed=%d context=%p pciPath=%s ar=%d", d, devices[d]->name, devices[d]->dev_name, ncclIbDevs[ncclNIbDevs].portNum, portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE", ncclIbDevs[ncclNIbDevs].speed, context, ncclIbDevs[ncclNIbDevs].pciPath, ncclIbDevs[ncclNIbDevs].ar); if (ncclIbAsyncThread != NULL) { - pthread_create(ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, ncclIbDevs + ncclNIbDevs); + PTHREADCHECKGOTO(pthread_create(ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, ncclIbDevs + ncclNIbDevs), "pthread_create", ret, fail); ncclSetThreadName(*ncclIbAsyncThread, "NCCL IbAsync %2d", ncclNIbDevs); - pthread_detach(*ncclIbAsyncThread); // will not be pthread_join()'d + PTHREADCHECKGOTO(pthread_detach(*ncclIbAsyncThread), "pthread_detach", ret, fail); // will not be pthread_join()'d } int mergedDev = ncclNMergedIbDevs; @@ -490,10 +575,11 @@ ncclResult_t nccl_p2p_ib_init(int *num_devs, ncclIbDev *ncclIbDevs, char *ncclIb *num_devs = ncclNMergedIbDevs; pthread_mutex_unlock(&nccl_p2p_lock); } - return ncclSuccess; +exit: + return ret; fail: pthread_mutex_unlock(&nccl_p2p_lock); - return ret; + goto exit; } ncclResult_t nccl_p2p_ib_pci_path(ncclIbDev *devs, int num_devs, char* dev_name, char** path, int* real_port) diff --git a/src/param.c b/src/param.c index ebb8df19..98a6b5e0 100755 --- a/src/param.c +++ b/src/param.c @@ -38,7 +38,7 @@ void setEnvFile(const char* fileName) { while (line[s] != '\0' && line[s] != '=') s++; if (line[s] == '\0') continue; strncpy(envVar, line, MIN(1023,s)); - envVar[s] = '\0'; + envVar[MIN(1023,s)] = '\0'; s++; strncpy(envValue, line+s, 1023); envValue[1023]='\0'; @@ -49,17 +49,28 @@ void setEnvFile(const char* fileName) { fclose(file); } -void initEnv() { +static void initEnvFunc() { char confFilePath[1024]; - const char * userDir = userHomeDir(); - if (userDir) { - sprintf(confFilePath, "%s/.nccl.conf", userDir); + const char* userFile = getenv("NCCL_CONF_FILE"); + if (userFile && strlen(userFile) > 0) { + snprintf(confFilePath, sizeof(confFilePath), "%s", userFile); setEnvFile(confFilePath); + } else { + const char* userDir = userHomeDir(); + if (userDir) { + snprintf(confFilePath, sizeof(confFilePath), "%s/.nccl.conf", userDir); + setEnvFile(confFilePath); + } } - sprintf(confFilePath, "/etc/nccl.conf"); + snprintf(confFilePath, sizeof(confFilePath), "/etc/nccl.conf"); setEnvFile(confFilePath); } +void initEnv() { + static pthread_once_t once = PTHREAD_ONCE_INIT; + pthread_once(&once, initEnvFunc); +} + void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int64_t* cache) { static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_lock(&mutex); @@ -81,8 +92,7 @@ void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int6 pthread_mutex_unlock(&mutex); } -const char *ncclGetEnv(const char *name) { - static pthread_once_t once = PTHREAD_ONCE_INIT; - pthread_once(&once, initEnv); +const char* ncclGetEnv(const char* name) { + initEnv(); return getenv(name); -} +} \ No newline at end of file diff --git a/src/socket.c b/src/socket.c index 8d746c88..7c4a6fb2 100755 --- a/src/socket.c +++ b/src/socket.c @@ -286,6 +286,7 @@ ncclResult_t ncclSocketGetAddrFromString(union ncclSocketAddress* ua, const char sin6->sin6_scope_id = 0; // should be global scope, set to 0 } else { WARN("Net : unsupported IP family"); + freeaddrinfo(p); return ncclInvalidArgument; } @@ -412,7 +413,7 @@ ncclResult_t ncclSocketGetAddr(struct ncclSocket* sock, union ncclSocketAddress* static ncclResult_t socketTryAccept(struct ncclSocket* sock) { socklen_t socklen = sizeof(union ncclSocketAddress); - sock->fd = accept(sock->acceptFd, &sock->addr.sa, &socklen); + sock->fd = accept(sock->acceptFd, (struct sockaddr*)&sock->addr, &socklen); if (sock->fd != -1) { sock->state = ncclSocketStateAccepted; } else if (errno != EAGAIN && errno != EWOULDBLOCK) { @@ -505,8 +506,9 @@ static ncclResult_t socketPollConnect(struct ncclSocket* sock) { } else if (ret < 0) { WARN("socketPollConnect poll() failed with error %s", strerror(errno)); return ncclRemoteError; - } else { - EQCHECK(ret == 1 && (pfd.revents & POLLOUT), 0); + } else if (ret != 1 || (pfd.revents & POLLOUT) == 0) { + WARN("socketPollConnect poll() returned %d%s", ret, (pfd.revents & POLLOUT) ? "" : ", no POLLOUT events"); + return ncclSystemError;; } /* check socket status */ @@ -738,13 +740,17 @@ ncclResult_t ncclSocketInit(struct ncclSocket* sock, union ncclSocketAddress* ad /* Set socket as non-blocking if async or if we need to be able to abort */ if ((sock->asyncFlag || sock->abortFlag) && sock->fd >= 0) { int flags; - EQCHECKGOTO(flags = fcntl(sock->fd, F_GETFL), -1, ret, fail); - SYSCHECKGOTO(fcntl(sock->fd, F_SETFL, flags | O_NONBLOCK), ret, fail); + SYSCHECKGOTO(flags = fcntl(sock->fd, F_GETFL), "fcntl", ret, fail); + SYSCHECKGOTO(fcntl(sock->fd, F_SETFL, flags | O_NONBLOCK), "fcntl", ret, fail); } exit: return ret; fail: + if (sock->fd != -1) { + close(sock->fd); + sock->fd = -1; + } goto exit; }