Skip to content

Commit

Permalink
Narrow changes
Browse files Browse the repository at this point in the history
  • Loading branch information
tvegas1 committed Nov 7, 2023
1 parent 8b1bd3b commit 08dd346
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 20 deletions.
4 changes: 2 additions & 2 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ AC_ARG_ENABLE([debug],AS_HELP_STRING([--enable-debug], [Enable extra debugging c

if test $enable_debug = yes; then
AC_DEFINE([ENABLE_DEBUG], [1], [Enable debugging code])
CFLAGS="$CFLAGS -O0 -g3 -Wall -Werror"
CFLAGS="$CFLAGS -O0 -g3 -Werror"
else
CFLAGS="$CFLAGS -O3 -DNDEBUG -Wall -Werror"
CFLAGS="$CFLAGS -O3 -DNDEBUG -Werror"
fi

#check for cuda
Expand Down
2 changes: 2 additions & 0 deletions include/p2p_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include <stdint.h>
#include <unistd.h>
#define ENABLE_TIMER 0
#include "timer.h"
#include <assert.h>

#include "nccl.h"
Expand Down
6 changes: 3 additions & 3 deletions include/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ static double startTimes[8];
printf("\n"); \
} while (0);
#else
#define TIME_START(index) do {} while(0);
#define TIME_STOP(index) do {} while(0);
#define TIME_CANCEL(index) do {} while(0);
#define TIME_START(index) while(0);
#define TIME_STOP(index) while(0);
#define TIME_CANCEL(index) while(0);
#define TIME_PRINT(name)
#endif
#endif
24 changes: 22 additions & 2 deletions src/ib_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>

#define ENABLE_TIMER 0
#include "timer.h"

#include "p2p_plugin.h"
#include "core.h"
#include "socket.h"
Expand Down Expand Up @@ -46,11 +46,25 @@ NCCL_PARAM(IbTc, "IB_TC", 0);
NCCL_PARAM(IbArThreshold, "IB_AR_THRESHOLD", 8192);
NCCL_PARAM(IbPciRelaxedOrdering, "IB_PCI_RELAXED_ORDERING", 2);

static pthread_t ncclIbAsyncThread;
pthread_t ncclIbAsyncThread;
/*
 * Thread entry point that drains asynchronous events from a verbs context.
 *
 * args is cast to the struct ibv_context* the events are read from.
 * Each event other than IBV_EVENT_COMM_EST is reported through WARN, and
 * every event is acknowledged. The loop ends, and the function returns NULL,
 * as soon as one of the wrap_ibv_* calls does not return ncclSuccess.
 */
static void* ncclIbAsyncThreadMain(void* args) {
  struct ibv_context* context = (struct ibv_context*)args;
  for (;;) {
    struct ibv_async_event event;
    char *str = NULL;

    if (wrap_ibv_get_async_event(context, &event) != ncclSuccess) { break; }

    if (wrap_ibv_event_type_str(&str, event.event_type) != ncclSuccess) {
      /*
       * The event has already been taken from the context. libibverbs
       * requires every retrieved async event to be acknowledged, so ack it
       * before leaving instead of exiting with it still pending. The ack
       * result is ignored here because the loop is terminating anyway.
       */
      (void)wrap_ibv_ack_async_event(&event);
      break;
    }

    /* Connection-established notifications are routine; do not log them. */
    if (event.event_type != IBV_EVENT_COMM_EST) {
      WARN("NET/IB : Got async event : %s", str);
    }

    if (wrap_ibv_ack_async_event(&event) != ncclSuccess) { break; }
  }
  return NULL;
}

// Determine whether RELAXED_ORDERING is enabled and possible
int ncclIbRelaxedOrderingCapable(void) {
int roMode = ncclParamIbPciRelaxedOrdering();
ncclResult_t r = ncclInternalError;
if (roMode == 1 || roMode == 2) {
if (!IBV_ACCESS_RELAXED_ORDERING) {
if(roMode == 1)
Expand Down Expand Up @@ -96,6 +110,11 @@ ncclResult_t ncclIbGetProperties_v6(int dev, ncclNetProperties_v6_t* props_v6)
return ncclSuccess;
}

/*
 * Copy ncclIbIfAddr (declared outside this block) into the caller-supplied
 * address. Exactly sizeof(*addr) bytes are written through addr.
 * Always returns ncclSuccess.
 */
static ncclResult_t GetSocketAddr(union ncclSocketAddress* addr) {
  const void *from = &ncclIbIfAddr;
  void *to = addr;

  (void)memcpy(to, from, sizeof *addr);
  return ncclSuccess;
}

#define NCCL_IB_MAX_QPS 128

typedef struct ncclIbQpInfo {
Expand Down Expand Up @@ -346,6 +365,7 @@ ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) {

ncclResult_t ncclIbConnect(int dev, void* opaqueHandle, void** sendComm, ncclNetDeviceHandle_t** sendDevComm) {
struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
enum ncclSocketState conState;
struct ncclIbCommStage* stage = &handle->stage;
struct ncclIbSendComm* comm = (struct ncclIbSendComm*)stage->comm;
struct ncclIbQpInfo remQpInfo;
Expand Down
12 changes: 7 additions & 5 deletions src/p2p_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ static nccl_p2p_plugin_t p2p_plugin = NCCL_P2P_LAST;

static void pluginSetup()
{

p2p_plugin = NCCL_P2P_IB;
const char *plugin_path = get_plugin_lib_path();
if (plugin_path != NULL) {
Expand All @@ -83,6 +84,11 @@ static void pluginSetup()
}
}
switch (p2p_plugin) {
case NCCL_P2P_IB:
ncclNetPlugin_v7 = ibPlugin_v7;
ncclNetPlugin_v6 = ibPlugin_v6;
ncclNetPlugin_v5 = ibPlugin_v5;
break;
#ifdef HAVE_UCX_PLUGIN
case NCCL_P2P_UCX:
ncclNetPlugin_v7 = ucxPlugin_v7;
Expand All @@ -95,11 +101,6 @@ static void pluginSetup()
ncclNetPlugin_v5 = ucxRmaPlugin_v5;
break;
#endif
default:
ncclNetPlugin_v7 = ibPlugin_v7;
ncclNetPlugin_v6 = ibPlugin_v6;
ncclNetPlugin_v5 = ibPlugin_v5;
break;
}

}
Expand Down Expand Up @@ -412,3 +413,4 @@ nccl_p2p_plugin_t nccl_p2p_get_plugin_type()

struct ncclIbDev ncclIbDevs[MAX_IB_DEVS];
struct ncclIbDev userIbDevs[MAX_IB_DEVS];

2 changes: 1 addition & 1 deletion src/sharp_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ int ncclSharpAllGather(void *context, void *buf, int len) {
if (rrequest == NULL) NCCLCHECK(ncclNetPlugin_v7.irecv(cComm->recvComm, 1, &rbuf, &len, &tag, &rMhandle, &rrequest));
}
while (srequest || rrequest) {
int done = 0; /* silent uninitialized false positive */
int done;
if (rrequest) NCCLCHECK(ncclNetPlugin_v7.test(rrequest, &done, NULL));
if (done) rrequest = NULL;
if (srequest) NCCLCHECK(ncclNetPlugin_v7.test(srequest, &done, NULL));
Expand Down
15 changes: 8 additions & 7 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ int matchIfList(const char* string, int port, struct netIf* ifList, int listSize
return 0;
}

static int readFileVarArg(char *buffer, size_t max, const char *filename_fmt,
va_list ap) {
static size_t readFileVarArg(char *buffer, size_t max,
const char *filename_fmt, va_list ap)
{
char filename[PATH_MAX];
ssize_t read_bytes;
int fd;
Expand All @@ -125,30 +126,30 @@ static int readFileVarArg(char *buffer, size_t max, const char *filename_fmt,

read_bytes = read(fd, buffer, max - 1);
if (read_bytes < 0) {
close(fd);
return -1;
}

if (read_bytes < max) {
buffer[read_bytes] = '\0';
}

out_close:
close(fd);
return 0;
}

int readFileNumber(long *value, const char *filename_fmt, ...)
{
char buffer[64], *tail;
int ret;
ssize_t read_bytes;
va_list ap;
long n;

va_start(ap, filename_fmt);
ret = readFileVarArg(buffer, sizeof(buffer) - 1, filename_fmt, ap);
read_bytes = readFileVarArg(buffer, sizeof(buffer) - 1,
filename_fmt, ap);
va_end(ap);

if (ret < 0) {
if (read_bytes < 0) {
/* read error */
return -1;
}
Expand Down

0 comments on commit 08dd346

Please sign in to comment.