Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT plugin: Add read-based implementation #158

Merged
merged 1 commit into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/run_nccl_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ for TEST_EXE in ${NCCL_TEST_EXE[@]}; do
#===================
# Enable ucx_rma tests once this is resolved: https://redmine.mellanox.com/issues/3037941
# for P2P_LAYER in ucx ucx_rma ib
for P2P_LAYER in ucx ib ucx_uct; do
for P2P_LAYER in ib ucx ucx_uct ucx_uct_read; do
MPIRUN_OPTIONS_PLUGIN_P2P_LAYER="-x NCCL_PLUGIN_P2P=${P2P_LAYER}"

#===================
Expand Down
1 change: 1 addition & 0 deletions include/p2p_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ typedef enum nccl_p2p_plugin {
NCCL_P2P_UCX,
NCCL_P2P_UCX_RMA,
NCCL_P2P_UCX_UCT,
NCCL_P2P_UCX_UCT_RD,
NCCL_P2P_LAST
} nccl_p2p_plugin_t;

Expand Down
8 changes: 8 additions & 0 deletions include/ucx_uct_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
#define NCCL_UCT_LISTEN_HANDLE_MAGIC 0x43cf19ed91abdb85
#define NCCL_UCT_REG_ALIGN 4096

typedef enum {
NCCL_UCT_AM_RTR = 14, /* Use particular values */
NCCL_UCT_AM_ATP = 15,
NCCL_UCT_AM_RTS = 16,
NCCL_UCT_AM_ATS = 17
} nccl_uct_am_type_t;

typedef enum {
NCCL_UCT_START = 0,
NCCL_UCT_CONNECT,
Expand Down Expand Up @@ -206,6 +213,7 @@ int nccl_uct_flush_index(nccl_uct_comm_t *base, int *sizes, int n);
ncclResult_t nccl_uct_flush(nccl_uct_comm_t *base_comm, void *data, int size,
nccl_uct_memh_t *uct_memh,
uct_completion_t *completion, void **request);
void nccl_uct_empty_callback(uct_completion_t *comp);

/* NCCL common plugin callbacks */
ncclResult_t nccl_uct_listen(int dev, void *listen_handle, void **listen_comm);
Expand Down
103 changes: 103 additions & 0 deletions include/ucx_uct_ring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*************************************************************************
* Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/

#ifndef NCCL_UCX_UCT_RING_H_
#define NCCL_UCX_UCT_RING_H_

#include "nccl.h"
#include <assert.h>

#define NCCL_UCT_RING_SIZE (1 << 7)
#define NCCL_UCT_RING_MASK (NCCL_UCT_RING_SIZE - 1)

typedef struct nccl_uct_ring {
unsigned first;
unsigned last;
unsigned size;
unsigned entry_size;
int tag[NCCL_UCT_RING_SIZE];
void *entry;
} nccl_uct_ring_t;

static inline ncclResult_t nccl_uct_ring_init(nccl_uct_ring_t *ring,
unsigned entry_size) {
int i;

ring->first = 0;
ring->last = 0;
ring->entry_size = entry_size;
ring->entry = malloc(entry_size * NCCL_UCT_RING_SIZE);
if (ring->entry == NULL) {
free(ring->entry);
return ncclSystemError;
}

for (i = 0; i < NCCL_UCT_RING_SIZE; i++) {
ring->tag[i] = INT_MAX;
}
return ncclSuccess;
}

static inline void nccl_uct_ring_deinit(nccl_uct_ring_t *ring) {
free(ring->entry);
}

static inline void *nccl_uct_ring_get_entry(nccl_uct_ring_t *ring, unsigned i) {
return (uint8_t*)ring->entry + (ring->entry_size * (i & NCCL_UCT_RING_MASK));
}

static inline void nccl_uct_ring_append(nccl_uct_ring_t *ring, int tag,
void *data, size_t len) {
int j = ring->last & NCCL_UCT_RING_MASK;

ring->last++;

assert((ring->last & NCCL_UCT_RING_MASK) !=
(ring->first & NCCL_UCT_RING_MASK));
assert(ring->tag[j] == INT_MAX);
assert(len == ring->entry_size);

ring->tag[j] = tag;
memcpy(nccl_uct_ring_get_entry(ring, j), data, len);
}

static inline int nccl_uct_ring_is_empty(const nccl_uct_ring_t *ring) {
return ring->first == ring->last;
}

static inline void nccl_uct_ring_consume(nccl_uct_ring_t *ring, unsigned i) {
unsigned j = i & NCCL_UCT_RING_MASK;

assert(ring->tag[j] != INT_MAX);
ring->tag[j] = INT_MAX;

/* Cleanup upon tag hit */
if (i == ring->first) {
for (; i != ring->last; i++) {
j = i & NCCL_UCT_RING_MASK;
if (ring->tag[j] != INT_MAX) {
break;
}
ring->first = i + 1;
}
}
}

static inline unsigned nccl_uct_ring_find(nccl_uct_ring_t *ring, int tag) {
unsigned i;

assert(tag != INT_MAX);

for (i = ring->first; i != ring->last; i++) {
if (ring->tag[i & NCCL_UCT_RING_MASK] == tag) {
return i;
}
}

return ring->last;
}

#endif /* NCCL_UCX_UCT_RING_H_ */
3 changes: 2 additions & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ libnccl_net_la_SOURCES += \
ucx_plugin.c \
ucx_rma_plugin.c \
ucx_uct_lib.c \
ucx_uct_plugin.c
ucx_uct_plugin.c \
ucx_uct_rd_plugin.c
endif

if HAVE_SHARP_PLUGIN
Expand Down
20 changes: 18 additions & 2 deletions src/p2p_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ extern ncclNet_v8_t ucxUctPlugin_v8;
extern ncclNet_v7_t ucxUctPlugin_v7;
extern ncclNet_v6_t ucxUctPlugin_v6;
extern ncclNet_v5_t ucxUctPlugin_v5;
extern ncclNet_v8_t ucxUctRdPlugin_v8;
extern ncclNet_v7_t ucxUctRdPlugin_v7;
extern ncclNet_v6_t ucxUctRdPlugin_v6;
extern ncclNet_v5_t ucxUctRdPlugin_v5;
#endif

extern ncclNet_v8_t ibPlugin_v8;
Expand Down Expand Up @@ -77,6 +81,10 @@ ncclNet_v5_t ncclNetPlugin_v5 = {

static nccl_p2p_plugin_t p2p_plugin = NCCL_P2P_LAST;

static int nccl_p2p_is_uct_plugin(nccl_p2p_plugin_t plugin) {
return (plugin == NCCL_P2P_UCX_UCT) || (plugin == NCCL_P2P_UCX_UCT_RD);
}

static void pluginSetup()
{
p2p_plugin = NCCL_P2P_IB;
Expand All @@ -92,6 +100,7 @@ static void pluginSetup()
else if (!strcasecmp(p2p_layer, "ucx")) p2p_plugin = NCCL_P2P_UCX;
else if (!strcasecmp(p2p_layer, "ucx_rma")) p2p_plugin = NCCL_P2P_UCX_RMA;
else if (!strcasecmp(p2p_layer, "ucx_uct")) p2p_plugin = NCCL_P2P_UCX_UCT;
else if (!strcasecmp(p2p_layer, "ucx_uct_read")) p2p_plugin = NCCL_P2P_UCX_UCT_RD;
#endif
else {
WARN("Invalid value %s for NCCL_PLUGIN_P2P, using default", p2p_layer);
Expand All @@ -117,6 +126,12 @@ static void pluginSetup()
ncclNetPlugin_v6 = ucxUctPlugin_v6;
ncclNetPlugin_v5 = ucxUctPlugin_v5;
break;
case NCCL_P2P_UCX_UCT_RD:
ncclNetPlugin_v8 = ucxUctRdPlugin_v8;
ncclNetPlugin_v7 = ucxUctRdPlugin_v7;
ncclNetPlugin_v6 = ucxUctRdPlugin_v6;
ncclNetPlugin_v5 = ucxUctRdPlugin_v5;
break;
#endif
default:
ncclNetPlugin_v8 = ibPlugin_v8;
Expand Down Expand Up @@ -221,7 +236,8 @@ ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int dev, ncclNetPropert
INFO(NCCL_NET,"NET/IB : GPU Direct RDMA (nvidia-peermem) enabled for HCA %d '%s", dev, devs[dev].devName);
}
props->regIsGlobal = 1;
if (((p2p_plugin == NCCL_P2P_UCX_UCT) || (p2p_plugin == NCCL_P2P_IB)) && nccl_p2p_dmabuf_support(dev) == ncclSuccess) {
if ((nccl_p2p_is_uct_plugin(p2p_plugin) || (p2p_plugin == NCCL_P2P_IB)) &&
nccl_p2p_dmabuf_support(dev) == ncclSuccess) {
props->ptrSupport |= NCCL_PTR_DMABUF; // GDR support via DMA-BUF
INFO(NCCL_NET,"NET/IB : GPU Direct RDMA (DMABUF) enabled for HCA %d '%s", dev, devs[dev].devName);
}
Expand All @@ -231,7 +247,7 @@ ncclResult_t nccl_p2p_ib_get_properties(ncclIbDev *devs, int dev, ncclNetPropert
props->maxComms = ibDev->maxQp;

if (p2p_plugin == NCCL_P2P_IB || p2p_plugin == NCCL_P2P_UCX ||
p2p_plugin == NCCL_P2P_UCX_UCT) {
nccl_p2p_is_uct_plugin(p2p_plugin)) {
props->maxRecvs = NCCL_NET_IB_MAX_RECVS;
} else {
props->maxRecvs = 1;
Expand Down
14 changes: 13 additions & 1 deletion src/ucx_uct_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ nccl_uct_context_t context = {
.dev_count = -1
};

void nccl_uct_empty_callback(uct_completion_t *comp) {
assert(comp->count == 0);
}

ncclResult_t nccl_uct_iface_set_handler(nccl_uct_iface_t *uct_iface,
int id, uct_am_callback_t callback) {
UCXCHECK(uct_iface_set_am_handler(uct_iface->iface, id, callback, NULL, 0),
Expand Down Expand Up @@ -42,6 +46,14 @@ static uct_iface_h nccl_uct_resource_iface_open(uct_worker_h worker,
UCXCHECK(uct_md_iface_config_read(md, tl->tl_name, NULL, NULL, &config),
return NULL, "read MD iface config for TL '%s'", tl->tl_name);

status = uct_config_modify(config, "IB_TX_INLINE_RESP", "0");
if (status != UCS_OK) {
WARN("Failed to modify MD configuration for '%s', error %s",
tl->tl_name, ucs_status_string(status));
uct_config_release(config);
return NULL;
}

params.field_mask =
UCT_IFACE_PARAM_FIELD_OPEN_MODE | UCT_IFACE_PARAM_FIELD_DEVICE |
UCT_IFACE_PARAM_FIELD_STATS_ROOT | UCT_IFACE_PARAM_FIELD_RX_HEADROOM;
Expand Down Expand Up @@ -739,7 +751,7 @@ ncclResult_t nccl_uct_flush(nccl_uct_comm_t *base_comm, void *data, int size,
uct_iov_t iov;

iov.buffer = base_comm->gpu_flush.mem;
iov.length = base_comm->uct_iface->min_get_zcopy;
iov.length = base_comm->uct_iface->min_get_zcopy? : 1;
brminich marked this conversation as resolved.
Show resolved Hide resolved
iov.memh = base_comm->gpu_flush.memh;
iov.stride = 0;
iov.count = 1;
Expand Down
9 changes: 0 additions & 9 deletions src/ucx_uct_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@

#include "ucx_uct_lib.h"

typedef enum {
NCCL_UCT_AM_RTR = 14, /* Use particular values */
NCCL_UCT_AM_ATP = 15
} nccl_uct_am_type_t;

typedef enum {
NCCL_UCT_REQ_IRECV = -1,
NCCL_UCT_REQ_IFLUSH = -2
Expand Down Expand Up @@ -137,10 +132,6 @@ static void nccl_uct_rdesc_set(nccl_uct_rdesc_t *rdesc, uint64_t id, int n,
}
}

static void nccl_uct_empty_callback(uct_completion_t *comp) {
assert(comp->count == 0);
}

static nccl_uct_req_t *nccl_uct_rdesc_get_req(nccl_uct_rdesc_t *rdesc, int i,
int size) {
nccl_uct_req_t *req;
Expand Down
Loading
Loading