Skip to content

Commit

Permalink
Merge pull request #158 from tvegas1/uct_read_plugin
Browse files Browse the repository at this point in the history
UCT plugin: Add read-based implementation
  • Loading branch information
tvegas1 authored May 27, 2024
2 parents 6b8d60f + c0abc27 commit 3fb6c8d
Show file tree
Hide file tree
Showing 9 changed files with 578 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .ci/run_nccl_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ for TEST_EXE in ${NCCL_TEST_EXE[@]}; do
#===================
# Enable ucx_rma tests once this is resolved: https://redmine.mellanox.com/issues/3037941
# for P2P_LAYER in ucx ucx_rma ib
for P2P_LAYER in ucx ib ucx_uct; do
for P2P_LAYER in ib ucx ucx_uct ucx_uct_read; do
MPIRUN_OPTIONS_PLUGIN_P2P_LAYER="-x NCCL_PLUGIN_P2P=${P2P_LAYER}"

#===================
Expand Down
1 change: 1 addition & 0 deletions include/p2p_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ typedef enum nccl_p2p_plugin {
NCCL_P2P_UCX,
NCCL_P2P_UCX_RMA,
NCCL_P2P_UCX_UCT,
NCCL_P2P_UCX_UCT_RD,
NCCL_P2P_LAST
} nccl_p2p_plugin_t;

Expand Down
8 changes: 8 additions & 0 deletions include/ucx_uct_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
#define NCCL_UCT_LISTEN_HANDLE_MAGIC 0x43cf19ed91abdb85
#define NCCL_UCT_REG_ALIGN 4096

typedef enum {
NCCL_UCT_AM_RTR = 14, /* Use particular values */
NCCL_UCT_AM_ATP = 15,
NCCL_UCT_AM_RTS = 16,
NCCL_UCT_AM_ATS = 17
} nccl_uct_am_type_t;

typedef enum {
NCCL_UCT_START = 0,
NCCL_UCT_CONNECT,
Expand Down Expand Up @@ -206,6 +213,7 @@ int nccl_uct_flush_index(nccl_uct_comm_t *base, int *sizes, int n);
ncclResult_t nccl_uct_flush(nccl_uct_comm_t *base_comm, void *data, int size,
nccl_uct_memh_t *uct_memh,
uct_completion_t *completion, void **request);
void nccl_uct_empty_callback(uct_completion_t *comp);

/* NCCL common plugin callbacks */
ncclResult_t nccl_uct_listen(int dev, void *listen_handle, void **listen_comm);
Expand Down
103 changes: 103 additions & 0 deletions include/ucx_uct_ring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*************************************************************************
* Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/

#ifndef NCCL_UCX_UCT_RING_H_
#define NCCL_UCX_UCT_RING_H_

#include "nccl.h"
#include <assert.h>

#define NCCL_UCT_RING_SIZE (1 << 7)
#define NCCL_UCT_RING_MASK (NCCL_UCT_RING_SIZE - 1)

typedef struct nccl_uct_ring {
unsigned first;
unsigned last;
unsigned size;
unsigned entry_size;
int tag[NCCL_UCT_RING_SIZE];
void *entry;
} nccl_uct_ring_t;

static inline ncclResult_t nccl_uct_ring_init(nccl_uct_ring_t *ring,
unsigned entry_size) {
int i;

ring->first = 0;
ring->last = 0;
ring->entry_size = entry_size;
ring->entry = malloc(entry_size * NCCL_UCT_RING_SIZE);
if (ring->entry == NULL) {
free(ring->entry);
return ncclSystemError;
}

for (i = 0; i < NCCL_UCT_RING_SIZE; i++) {
ring->tag[i] = INT_MAX;
}
return ncclSuccess;
}

static inline void nccl_uct_ring_deinit(nccl_uct_ring_t *ring) {
free(ring->entry);
}

static inline void *nccl_uct_ring_get_entry(nccl_uct_ring_t *ring, unsigned i) {
return (uint8_t*)ring->entry + (ring->entry_size * (i & NCCL_UCT_RING_MASK));
}

static inline void nccl_uct_ring_append(nccl_uct_ring_t *ring, int tag,
void *data, size_t len) {
int j = ring->last & NCCL_UCT_RING_MASK;

ring->last++;

assert((ring->last & NCCL_UCT_RING_MASK) !=
(ring->first & NCCL_UCT_RING_MASK));
assert(ring->tag[j] == INT_MAX);
assert(len == ring->entry_size);

ring->tag[j] = tag;
memcpy(nccl_uct_ring_get_entry(ring, j), data, len);
}

static inline int nccl_uct_ring_is_empty(const nccl_uct_ring_t *ring) {
return ring->first == ring->last;
}

static inline void nccl_uct_ring_consume(nccl_uct_ring_t *ring, unsigned i) {
unsigned j = i & NCCL_UCT_RING_MASK;

assert(ring->tag[j] != INT_MAX);
ring->tag[j] = INT_MAX;

/* Cleanup upon tag hit */
if (i == ring->first) {
for (; i != ring->last; i++) {
j = i & NCCL_UCT_RING_MASK;
if (ring->tag[j] != INT_MAX) {
break;
}
ring->first = i + 1;
}
}
}

static inline unsigned nccl_uct_ring_find(nccl_uct_ring_t *ring, int tag) {
unsigned i;

assert(tag != INT_MAX);

for (i = ring->first; i != ring->last; i++) {
if (ring->tag[i & NCCL_UCT_RING_MASK] == tag) {
return i;
}
}

return ring->last;
}

#endif /* NCCL_UCX_UCT_RING_H_ */
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ libnccl_net_la_SOURCES += \
ucx_plugin.c \
ucx_rma_plugin.c \
ucx_uct_lib.c \
ucx_uct_plugin.c
ucx_uct_plugin.c \
ucx_uct_rd_plugin.c
endif

if HAVE_SHARP_PLUGIN
Expand Down
20 changes: 18 additions & 2 deletions src/p2p_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ extern ncclNet_v8_t ucxUctPlugin_v8;
extern ncclNet_v7_t ucxUctPlugin_v7;
extern ncclNet_v6_t ucxUctPlugin_v6;
extern ncclNet_v5_t ucxUctPlugin_v5;
extern ncclNet_v8_t ucxUctRdPlugin_v8;
extern ncclNet_v7_t ucxUctRdPlugin_v7;
extern ncclNet_v6_t ucxUctRdPlugin_v6;
extern ncclNet_v5_t ucxUctRdPlugin_v5;
#endif

extern ncclNet_v8_t ibPlugin_v8;
Expand Down Expand Up @@ -77,6 +81,10 @@ ncclNet_v5_t ncclNetPlugin_v5 = {

static nccl_p2p_plugin_t p2p_plugin = NCCL_P2P_LAST;

static int nccl_p2p_is_uct_plugin(nccl_p2p_plugin_t plugin) {
return (plugin == NCCL_P2P_UCX_UCT) || (plugin == NCCL_P2P_UCX_UCT_RD);
}

static void pluginSetup()
{
p2p_plugin = NCCL_P2P_IB;
Expand All @@ -92,6 +100,7 @@ static void pluginSetup()
else if (!strcasecmp(p2p_layer, "ucx")) p2p_plugin = NCCL_P2P_UCX;
else if (!strcasecmp(p2p_layer, "ucx_rma")) p2p_plugin = NCCL_P2P_UCX_RMA;
else if (!strcasecmp(p2p_layer, "ucx_uct")) p2p_plugin = NCCL_P2P_UCX_UCT;
else if (!strcasecmp(p2p_layer, "ucx_uct_read")) p2p_plugin = NCCL_P2P_UCX_UCT_RD;
#endif
else {
WARN("Invalid value %s for NCCL_PLUGIN_P2P, using default", p2p_layer);
Expand All @@ -117,6 +126,12 @@ static void pluginSetup()
ncclNetPlugin_v6 = ucxUctPlugin_v6;
ncclNetPlugin_v5 = ucxUctPlugin_v5;
break;
case NCCL_P2P_UCX_UCT_RD:
ncclNetPlugin_v8 = ucxUctRdPlugin_v8;
ncclNetPlugin_v7 = ucxUctRdPlugin_v7;
ncclNetPlugin_v6 = ucxUctRdPlugin_v6;
ncclNetPlugin_v5 = ucxUctRdPlugin_v5;
break;
#endif
default:
ncclNetPlugin_v8 = ibPlugin_v8;
Expand Down Expand Up @@ -221,7 +236,8 @@ ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int dev, ncclNetPropert
INFO(NCCL_NET,"NET/IB : GPU Direct RDMA (nvidia-peermem) enabled for HCA %d '%s", dev, devs[dev].devName);
}
props->regIsGlobal = 1;
if (((p2p_plugin == NCCL_P2P_UCX_UCT) || (p2p_plugin == NCCL_P2P_IB)) && nccl_p2p_dmabuf_support(dev) == ncclSuccess) {
if ((nccl_p2p_is_uct_plugin(p2p_plugin) || (p2p_plugin == NCCL_P2P_IB)) &&
nccl_p2p_dmabuf_support(dev) == ncclSuccess) {
props->ptrSupport |= NCCL_PTR_DMABUF; // GDR support via DMA-BUF
INFO(NCCL_NET,"NET/IB : GPU Direct RDMA (DMABUF) enabled for HCA %d '%s", dev, devs[dev].devName);
}
Expand All @@ -231,7 +247,7 @@ ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int dev, ncclNetPropert
props->maxComms = ibDev->maxQp;

if (p2p_plugin == NCCL_P2P_IB || p2p_plugin == NCCL_P2P_UCX ||
p2p_plugin == NCCL_P2P_UCX_UCT) {
nccl_p2p_is_uct_plugin(p2p_plugin)) {
props->maxRecvs = NCCL_NET_IB_MAX_RECVS;
} else {
props->maxRecvs = 1;
Expand Down
14 changes: 13 additions & 1 deletion src/ucx_uct_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ nccl_uct_context_t context = {
.dev_count = -1
};

void nccl_uct_empty_callback(uct_completion_t *comp) {
assert(comp->count == 0);
}

ncclResult_t nccl_uct_iface_set_handler(nccl_uct_iface_t *uct_iface,
int id, uct_am_callback_t callback) {
UCXCHECK(uct_iface_set_am_handler(uct_iface->iface, id, callback, NULL, 0),
Expand Down Expand Up @@ -42,6 +46,14 @@ static uct_iface_h nccl_uct_resource_iface_open(uct_worker_h worker,
UCXCHECK(uct_md_iface_config_read(md, tl->tl_name, NULL, NULL, &config),
return NULL, "read MD iface config for TL '%s'", tl->tl_name);

status = uct_config_modify(config, "IB_TX_INLINE_RESP", "0");
if (status != UCS_OK) {
WARN("Failed to modify MD configuration for '%s', error %s",
tl->tl_name, ucs_status_string(status));
uct_config_release(config);
return NULL;
}

params.field_mask =
UCT_IFACE_PARAM_FIELD_OPEN_MODE | UCT_IFACE_PARAM_FIELD_DEVICE |
UCT_IFACE_PARAM_FIELD_STATS_ROOT | UCT_IFACE_PARAM_FIELD_RX_HEADROOM;
Expand Down Expand Up @@ -739,7 +751,7 @@ ncclResult_t nccl_uct_flush(nccl_uct_comm_t *base_comm, void *data, int size,
uct_iov_t iov;

iov.buffer = base_comm->gpu_flush.mem;
iov.length = base_comm->uct_iface->min_get_zcopy;
iov.length = base_comm->uct_iface->min_get_zcopy? : 1;
iov.memh = base_comm->gpu_flush.memh;
iov.stride = 0;
iov.count = 1;
Expand Down
9 changes: 0 additions & 9 deletions src/ucx_uct_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@

#include "ucx_uct_lib.h"

typedef enum {
NCCL_UCT_AM_RTR = 14, /* Use particular values */
NCCL_UCT_AM_ATP = 15
} nccl_uct_am_type_t;

typedef enum {
NCCL_UCT_REQ_IRECV = -1,
NCCL_UCT_REQ_IFLUSH = -2
Expand Down Expand Up @@ -137,10 +132,6 @@ static void nccl_uct_rdesc_set(nccl_uct_rdesc_t *rdesc, uint64_t id, int n,
}
}

static void nccl_uct_empty_callback(uct_completion_t *comp) {
assert(comp->count == 0);
}

static nccl_uct_req_t *nccl_uct_rdesc_get_req(nccl_uct_rdesc_t *rdesc, int i,
int size) {
nccl_uct_req_t *req;
Expand Down
Loading

0 comments on commit 3fb6c8d

Please sign in to comment.