Skip to content

Commit

Permalink
v2.24 update
Browse files Browse the repository at this point in the history
  • Loading branch information
bureddy committed Oct 18, 2024
1 parent 2a632df commit dd45372
Show file tree
Hide file tree
Showing 19 changed files with 1,099 additions and 468 deletions.
2 changes: 2 additions & 0 deletions include/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ extern ncclDebugLogger_t pluginLogFunction;

void ncclSetThreadName(pthread_t thread, const char *fmt, ...);

void ncclResetDebugInit();

#endif
13 changes: 13 additions & 0 deletions include/ibvwrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#define NCCL_IBVWRAP_H_
#include "config.h"
#include "core.h"
#include "utils.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <infiniband/verbs.h>

#if !HAVE_DECL_IBV_ACCESS_RELAXED_ORDERING
Expand Down Expand Up @@ -82,4 +85,14 @@ ncclResult_t wrap_ibv_post_send(struct ibv_qp *qp, struct ibv_send_wr *wr, struc
ncclResult_t wrap_ibv_post_recv(struct ibv_qp *qp, struct ibv_recv_wr *wr, struct ibv_recv_wr **bad_wr);
ncclResult_t wrap_ibv_event_type_str(char **ret, enum ibv_event_type event);

// converts a GID into a readable string. On success, returns a non-null pointer to gidStr.
// NULL is returned if there was an error, with errno set to indicate the error.
// errno = ENOSPC if the converted string would exceed strLen.
static inline const char* ibvGetGidStr(union ibv_gid* gid, char* gidStr, size_t strLen) {
// GID is a 16B handle, to convert it to a readable form, we use inet_ntop
// sizeof(ibv_gid) == sizeof(struct in6_addr), so using AF_INET6
NCCL_STATIC_ASSERT(sizeof(union ibv_gid) == sizeof(struct in6_addr), "the sizeof struct ibv_gid must be the size of struct in6_addr");
return inet_ntop(AF_INET6, gid->raw, gidStr, strLen);
}

#endif //End include guard
16 changes: 11 additions & 5 deletions include/nccl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#if CUDART_VERSION >= 11000
#include <cuda_bf16.h>
#endif
#if CUDART_VERSION >= 11080
#include <cuda_fp8.h>
#endif

#define NCCL_MAJOR 2
#define NCCL_MINOR 20
Expand Down Expand Up @@ -146,6 +149,11 @@ const char* pncclGetErrorString(ncclResult_t result);
const char* ncclGetLastError(ncclComm_t comm);
const char* pncclGetLastError(ncclComm_t comm);

/* Reload environment variables that determine logging. */
void ncclResetDebugInit();
void pncclResetDebugInit();


/* Checks whether the comm has encountered any asynchronous errors */
ncclResult_t ncclCommGetAsyncError(ncclComm_t comm, ncclResult_t *asyncError);
ncclResult_t pncclCommGetAsyncError(ncclComm_t comm, ncclResult_t *asyncError);
Expand Down Expand Up @@ -201,12 +209,10 @@ typedef enum { ncclInt8 = 0, ncclChar = 0,
ncclFloat16 = 6, ncclHalf = 6,
ncclFloat32 = 7, ncclFloat = 7,
ncclFloat64 = 8, ncclDouble = 8,
#if CUDART_VERSION >= 11000
ncclBfloat16 = 9,
ncclNumTypes = 10
#else
ncclNumTypes = 9
#endif
ncclFloat8e4m3 = 10,
ncclFloat8e5m2 = 11,
ncclNumTypes = 12
} ncclDataType_t;

/* ncclScalarResidence_t: Location and dereferencing logic for scalar arguments. */
Expand Down
5 changes: 5 additions & 0 deletions include/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
#include <stdlib.h>

#define NCCL_NET_HANDLE_MAXSIZE 128
//Maximum value NCCL can accept for maxP2pBytes and maxCollBytes net properties
#define NCCL_MAX_NET_SIZE_BYTES (1*1024*1024*1024*1024L)
#define NCCL_NET_OPTIONAL_RECV_COMPLETION 0x1


#define NCCL_PTR_HOST 0x1
#define NCCL_PTR_CUDA 0x2
Expand All @@ -22,6 +26,7 @@ typedef enum {NCCL_INIT=1, NCCL_COLL=2, NCCL_P2P=4, NCCL_SHM=8, NCCL_NET=16, NCC

typedef void (*ncclDebugLogger_t)(ncclDebugLogLevel level, unsigned long flags, const char *file, int line, const char *fmt, ...);

#include "net_v9.h"
#include "net_v8.h"
#include "net_v7.h"
#include "net_v6.h"
Expand Down
3 changes: 2 additions & 1 deletion include/net_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ typedef struct {
} ncclNetDeviceHandle_v7_t;

typedef ncclNetDeviceHandle_v7_t ncclNetDeviceHandle_v8_t;
typedef ncclNetDeviceHandle_v8_t ncclNetDeviceHandle_t;
typedef ncclNetDeviceHandle_v8_t ncclNetDeviceHandle_v9_t;
typedef ncclNetDeviceHandle_v9_t ncclNetDeviceHandle_t;

#endif
2 changes: 0 additions & 2 deletions include/net_v8.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ typedef struct {
int netDeviceVersion; // Version number for network offload
} ncclNetProperties_v8_t;

typedef ncclNetProperties_v8_t ncclNetProperties_t;

typedef struct {
// Name of the network (mainly for logs)
const char* name;
Expand Down
25 changes: 15 additions & 10 deletions include/p2p_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,13 @@ struct ncclIbMrCache {
int capacity, population;
};

#define NCCL_IB_MAX_DEVS_PER_NIC 2
#define NCCL_IB_MAX_DEVS_PER_NIC 4
#define MAX_MERGED_DEV_NAME (MAXNAMESIZE*NCCL_IB_MAX_DEVS_PER_NIC)+NCCL_IB_MAX_DEVS_PER_NIC
struct ncclIbMergedDev {
int ndevs;
int devs[NCCL_IB_MAX_DEVS_PER_NIC]; // Points to an index in ncclIbDevs
typedef struct ncclIbMergedDev {
ncclNetVDeviceProps_t vProps;
int speed;
char devName[MAX_MERGED_DEV_NAME]; // Up to NCCL_IB_MAX_DEVS_PER_NIC * name size, and a character for each '+'
int dmaBufSupported; // 0 = uninit, 1 = yes, -1 = no
} __attribute__((aligned(64)));
} __attribute__((aligned(64))) ncclIbMergedDev;

struct ncclIbStats {
int fatalErrorCount;
Expand Down Expand Up @@ -108,17 +106,21 @@ typedef struct ncclIbDev {
struct ibv_pd* pd;
char devName[MAXNAMESIZE];
char *pciPath;
char* virtualPciPath;
int realPort;
int maxQp;
float latency;
struct ncclIbMrCache mrCache;
int ar; // ADAPTIVE_ROUTING
struct ibv_port_attr portAttr;
struct ncclIbStats stats;
int dmaBufSupported;
} __attribute__((aligned(64))) ncclIbDev;


#define MAX_IB_DEVS 32
extern struct ncclIbMergedDev ncclIbMergedDevs[MAX_IB_DEVS];
#define MAX_IB_DEVS 32
#define MAX_IB_VDEVS MAX_IB_DEVS*8
extern struct ncclIbMergedDev ncclIbMergedDevs[MAX_IB_VDEVS];
extern struct ncclIbDev ncclIbDevs[MAX_IB_DEVS];
/* Detect whether GDR can work on a given NIC with the current CUDA device
* Returns :
Expand All @@ -130,9 +132,10 @@ ncclResult_t nccl_p2p_dmabuf_support(int dev);

ncclResult_t nccl_p2p_ib_pci_path(ncclIbDev *devs, int num_devs, char* dev_name, char** path, int* real_port);

ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int dev, ncclNetProperties_t* props);
ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int ncclNMergedIbDevs, int dev, ncclNetProperties_t* props);

ncclResult_t nccl_p2p_ib_init(int *num_devs, ncclIbDev *ncclIbDevs, char *ncclIbIfName, union ncclSocketAddress *ncclIbIfAddr, pthread_t *ncclIbAsyncThread, ncclDebugLogger_t logFunction);
ncclResult_t nccl_p2p_ib_init(int *nDevs, int *nmDevs, ncclIbDev *ncclIbDevs, char *ncclIbIfName, union ncclSocketAddress *ncclIbIfAddr,
pthread_t *ncclIbAsyncThread, ncclDebugLogger_t logFunction, int disableMergeDevices);

/* Convert value returtned by ibv_query_port to actual link width */
int nccl_p2p_ib_width(int width);
Expand All @@ -152,4 +155,6 @@ nccl_p2p_plugin_t nccl_p2p_get_plugin_type();

ncclResult_t ncclIbStatsInit(struct ncclIbStats* stat);

ncclResult_t ncclIbMakeVDeviceInternal(int* d, ncclNetVDeviceProps_t* props, int nDevs, int *nmDevs, int disanleMergeDevices);

#endif
27 changes: 14 additions & 13 deletions include/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

#define MAX_IFS 16
#define MAX_IF_NAME_SIZE 16
#define SLEEP_INT 1000 // connection retry sleep interval in usec
#define RETRY_REFUSED_TIMES 2e4 // connection refused retry times before reporting a timeout (20 sec)
#define RETRY_TIMEDOUT_TIMES 3 // connection timed out retry times (each one can take 20s)
#define SOCKET_NAME_MAXLEN (NI_MAXHOST+NI_MAXSERV)
#define NCCL_SOCKET_MAGIC 0x564ab9f2fc4b9d6cULL

Expand All @@ -41,40 +38,44 @@ enum ncclSocketState {
ncclSocketStateConnectPolling = 5,
ncclSocketStateConnected = 6,
ncclSocketStateReady = 7,
ncclSocketStateClosed = 8,
ncclSocketStateError = 9,
ncclSocketStateNum = 10
ncclSocketStateTerminating = 8,
ncclSocketStateClosed = 9,
ncclSocketStateError = 10,
ncclSocketStateNum = 11

};

enum ncclSocketType {
ncclSocketTypeUnknown = 0,
ncclSocketTypeBootstrap = 1,
ncclSocketTypeProxy = 2,
ncclSocketTypeNetSocket = 3,
ncclSocketTypeNetIb = 4
ncclSocketTypeNetIb = 4,
ncclSocketTypeRasNetwork = 5
};

struct ncclSocket {
int fd;
int acceptFd;
int timedOutRetries;
int refusedRetries;
int errorRetries;
union ncclSocketAddress addr;
volatile uint32_t* abortFlag;
int asyncFlag;
enum ncclSocketState state;
int salen;
uint64_t magic;
enum ncclSocketType type;
int customRetry;
int finalizeCounter; // Used to keep track of initial handshake for async sockets.
char finalizeBuffer[sizeof(uint64_t)]; // Used to keep track of initial handshake for async sockets.
};

const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf, const int numericHostForm);
const char *ncclSocketToString(const union ncclSocketAddress *addr, char *buf, const int numericHostForm);
ncclResult_t ncclSocketGetAddrFromString(union ncclSocketAddress* ua, const char* ip_port_pair);
int ncclFindInterfaceMatchSubnet(char* ifNames, union ncclSocketAddress* localAddrs, union ncclSocketAddress* remoteAddr, int ifNameMaxSize, int maxIfs);
int ncclFindInterfaces(char* ifNames, union ncclSocketAddress *ifAddrs, int ifNameMaxSize, int maxIfs);

// Initialize a socket
ncclResult_t ncclSocketInit(struct ncclSocket* sock, union ncclSocketAddress* addr, uint64_t magic, enum ncclSocketType type, volatile uint32_t* abortFlag, int asyncFlag);
ncclResult_t ncclSocketInit(struct ncclSocket* sock, const union ncclSocketAddress* addr, uint64_t magic, enum ncclSocketType type, volatile uint32_t* abortFlag, int asyncFlag, int customRetry);
// Create a listening socket. sock->addr can be pre-filled with IP & port info. sock->fd is set after a successful call
ncclResult_t ncclSocketListen(struct ncclSocket* sock);
ncclResult_t ncclSocketGetAddr(struct ncclSocket* sock, union ncclSocketAddress* addr);
Expand All @@ -90,7 +91,7 @@ ncclResult_t ncclSocketSetFd(int fd, struct ncclSocket* sock);
#define NCCL_SOCKET_SEND 0
#define NCCL_SOCKET_RECV 1

ncclResult_t ncclSocketProgress(int op, struct ncclSocket* sock, void* ptr, int size, int* offset);
ncclResult_t ncclSocketProgress(int op, struct ncclSocket* sock, void* ptr, int size, int* offset, int* closed);
ncclResult_t ncclSocketWait(int op, struct ncclSocket* sock, void* ptr, int size, int* offset);
ncclResult_t ncclSocketSend(struct ncclSocket* sock, void* ptr, int size);
ncclResult_t ncclSocketRecv(struct ncclSocket* sock, void* ptr, int size);
Expand Down
29 changes: 20 additions & 9 deletions include/ucx_uct_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ typedef struct nccl_uct_context {

/* IB devices available */
int dev_count;
int merge_dev_count;

/* Use by common code to setup communicators */
struct nccl_uct_ops {
Expand Down Expand Up @@ -230,6 +231,8 @@ ncclResult_t nccl_uct_reg_mr(void *reg_comm, void *data, size_t size, int type,
ncclResult_t nccl_uct_dereg_mr(void *dereg_comm, void *mhandle);

/* Compatibility callback */
ncclResult_t nccl_uct_get_properties_v8(int dev,
ncclNetProperties_v8_t *props_v8);
ncclResult_t nccl_uct_get_properties_v7(int dev,
ncclNetProperties_v7_t *props_v7);
ncclResult_t nccl_uct_reg_mr_v7(void *comm, void *data, int size, int type,
Expand All @@ -242,7 +245,8 @@ ncclResult_t nccl_uct_get_properties(int dev, ncclNetProperties_t *props);


#define NCCL_UCT_PLUGIN_BASE(plugin_name, prefix, get_properties_func, \
connect_func, accept_func, reg_mr_func) \
connect_func, accept_func, reg_mr_func, \
isend_func, irecv_func) \
{ \
.name = plugin_name, \
.init = prefix##_init, \
Expand All @@ -254,27 +258,34 @@ ncclResult_t nccl_uct_get_properties(int dev, ncclNetProperties_t *props);
.regMr = reg_mr_func, \
.regMrDmaBuf = nccl_uct_reg_mr_dmabuf, \
.deregMr = nccl_uct_dereg_mr, \
.isend = prefix##_isend, \
.irecv = prefix##_irecv, \
.isend = prefix##_##isend_func, \
.irecv = prefix##_##irecv_func, \
.iflush = prefix##_iflush, \
.test = prefix##_test, \
.closeSend = prefix##_close, \
.closeRecv = prefix##_close, \
.closeListen = nccl_uct_close_listen \
}

#define NCCL_UCT_PLUGIN_V8(plugin_name, prefix) \
#define NCCL_UCT_PLUGIN_V9(plugin_name, prefix) \
NCCL_UCT_PLUGIN_BASE(plugin_name, prefix, nccl_uct_get_properties, \
nccl_uct_connect, nccl_uct_accept, nccl_uct_reg_mr)
nccl_uct_connect, nccl_uct_accept, nccl_uct_reg_mr, \
isend, irecv)

#define NCCL_UCT_PLUGIN_V8(plugin_name, prefix) \
NCCL_UCT_PLUGIN_BASE(plugin_name, prefix, nccl_uct_get_properties_v8, \
nccl_uct_connect, nccl_uct_accept, nccl_uct_reg_mr, \
isend_v8, irecv_v8)

#define NCCL_UCT_PLUGIN_V7(plugin_name, prefix) \
NCCL_UCT_PLUGIN_BASE(plugin_name, prefix, nccl_uct_get_properties_v7, \
nccl_uct_connect, nccl_uct_accept, nccl_uct_reg_mr_v7)
nccl_uct_connect, nccl_uct_accept, nccl_uct_reg_mr_v7, \
isend_v8, irecv_v8)

#define NCCL_UCT_PLUGIN_V6(plugin_name, prefix) \
NCCL_UCT_PLUGIN_BASE(plugin_name, prefix, nccl_uct_get_properties_v6, \
nccl_uct_connect_v6, nccl_uct_accept_v6, \
nccl_uct_reg_mr_v7)
nccl_uct_reg_mr_v7, isend_v8, irecv_v8)

#define NCCL_UCT_PLUGIN_V5(plugin_name, prefix) \
{ \
Expand All @@ -287,8 +298,8 @@ ncclResult_t nccl_uct_get_properties(int dev, ncclNetProperties_t *props);
.accept = nccl_uct_accept_v6, \
.regMr = nccl_uct_reg_mr_v7, \
.deregMr = nccl_uct_dereg_mr, \
.isend = prefix##_isend, \
.irecv = prefix##_irecv, \
.isend = prefix##_isend_v8, \
.irecv = prefix##_irecv_v8, \
.iflush = prefix##_iflush, \
.test = prefix##_test, \
.closeSend = prefix##_close, \
Expand Down
Loading

0 comments on commit dd45372

Please sign in to comment.