Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TransferEngine] Add topology discovery #46

Merged
merged 2 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/en/transfer-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ For instance, as illustrated in figure above, to transfer data from buffer 0 (as
To further maximize bandwidth utilization, if a single request's transfer is internally divided into multiple slices if its length exeeds 16KB.
Each slice might use a different path, enabling collaborative work among all RDMA NICs.

If you do not want to manually configure the topology matrix, we also provide a function (`mooncake::discoverTopologyMatrix` in `topology.h`) to automatically discover the toplogy between CPU/CUDA and RDMA devices. Supports for more device types are working in progress. The automatic discovery mechanism might not always be accurate, and we welcome your feedbacks and improvement ideas!

### Endpoint Management
Mooncake Store employs a pair of end-
points to represent the connection between a local RDMA
Expand Down
2 changes: 2 additions & 0 deletions doc/zh/transfer-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ BatchTransfer API 使用请求(Request)对象数组传入用户请求,需

为了进一步最大化带宽利用率,如果单个请求的传输长度超过16KB,则其内部被划分为多个切片。每个切片可能使用不同的路径,使所有RDMA NIC能够协同工作。

如果不想手动配置拓扑矩阵,也可以直接调用`mooncake::discoverTopologyMatrix`(位于`topology.h`)来自动生成拓扑矩阵。该函数能够自动探查CPU/CUDA和RDMA网卡之间的拓扑关系。对于更多设备种类的支持正在开发中。目前,拓扑自动发现机制可能无法给出准确的硬件拓扑,我们欢迎您的反馈和改进建议!

### 端点管理
Transfer Engine 使用一对端点来表示本地RDMA NIC和远程RDMA NIC之间的连接。实际上,每个端点包括一个或多个RDMA QP对象。
Transfer Engine 中的连接是按需建立的;端点在第一次请求之前保持未配对状态。
Expand Down
5 changes: 5 additions & 0 deletions mooncake-transfer-engine/include/topology.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include <string>

namespace mooncake {
std::string discoverTopologyMatrix();
}
212 changes: 212 additions & 0 deletions mooncake-transfer-engine/src/topology.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
#include <glog/logging.h>
#include <jsoncpp/json/json.h>

#include <fstream>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

#ifdef USE_CUDA
#include <cuda_runtime.h>
#endif

#include <ctype.h>
#include <dirent.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#include "topology.h"

struct InfinibandDevice {
std::string name;
std::string pci_bus_id;
int numa_node;
};

struct TopologyEntry {
std::string name;
std::vector<std::string> preferred_hca;
std::vector<std::string> avail_hca;

Json::Value to_json() {
Json::Value matrix(Json::arrayValue);
Json::Value hca_list(Json::arrayValue);
for (auto &hca : preferred_hca) {
hca_list.append(hca);
}
matrix.append(hca_list);
hca_list.clear();
for (auto &hca : avail_hca) {
hca_list.append(hca);
}
matrix.append(hca_list);
return matrix;
}
};

static std::vector<InfinibandDevice> list_infiniband_devices() {
DIR *dir = opendir("/sys/class/infiniband");
struct dirent *entry;
std::vector<InfinibandDevice> devices;

if (dir == NULL) {
LOG(WARNING) << "failed to list /sys/class/infiniband";
return {};
}
while ((entry = readdir(dir))) {
if (entry->d_name[0] == '.') {
continue;
}

std::string device_name = entry->d_name;

char path[PATH_MAX];
char resolved_path[PATH_MAX];
// Get the PCI bus id for the infiniband device. Note that
// "/sys/class/infiniband/mlx5_X/" is a symlink to
// "/sys/devices/pciXXXX:XX/XXXX:XX:XX.X/infiniband/mlx5_X/".
snprintf(path, sizeof(path), "/sys/class/infiniband/%s/../..",
wx-csy marked this conversation as resolved.
Show resolved Hide resolved
entry->d_name);
if (realpath(path, resolved_path) == NULL) {
LOG(ERROR) << "realpath: " << strerror(errno);
continue;
}
std::string pci_bus_id = basename(resolved_path);

int numa_node = -1;
snprintf(path, sizeof(path), "%s/numa_node", resolved_path);
std::ifstream(path) >> numa_node;

devices.push_back(InfinibandDevice{.name = std::move(device_name),
.pci_bus_id = std::move(pci_bus_id),
.numa_node = numa_node});
}
(void)closedir(dir);
return devices;
}

static std::vector<TopologyEntry> discover_cpu_topology(
const std::vector<InfinibandDevice> &all_hca) {
DIR *dir = opendir("/sys/devices/system/node");
struct dirent *entry;
std::vector<TopologyEntry> topology;

if (dir == NULL) {
LOG(WARNING) << "failed to list /sys/devices/system/node";
return {};
}
while ((entry = readdir(dir))) {
const char *prefix = "node";
if (entry->d_type != DT_DIR ||
strncmp(entry->d_name, prefix, strlen(prefix)) != 0) {
continue;
}
int node_id = atoi(entry->d_name + strlen(prefix));
std::vector<std::string> preferred_hca;
std::vector<std::string> avail_hca;
// an HCA connected to the same cpu NUMA node is preferred
for (const auto &hca : all_hca) {
if (hca.numa_node == node_id) {
preferred_hca.push_back(hca.name);
} else {
avail_hca.push_back(hca.name);
}
}
topology.push_back(
TopologyEntry{.name = "cpu:" + std::to_string(node_id),
.preferred_hca = std::move(preferred_hca),
.avail_hca = std::move(avail_hca)});
}
(void)closedir(dir);
return topology;
}

#ifdef USE_CUDA

static int get_pci_distance(const char *bus1, const char *bus2) {
char buf[PATH_MAX];
char path1[PATH_MAX];
char path2[PATH_MAX];
snprintf(buf, sizeof(buf), "/sys/bus/pci/devices/%s", bus1);
if (realpath(buf, path1) == NULL) {
return -1;
}
snprintf(buf, sizeof(buf), "/sys/bus/pci/devices/%s", bus2);
if (realpath(buf, path2) == NULL) {
return -1;
}

char *ptr1 = path1;
char *ptr2 = path2;
while (*ptr1 && *ptr1 == *ptr2) {
ptr1++;
ptr2++;
}
int distance = 0;
for (; *ptr1; ptr1++) {
distance += (*ptr1 == '/');
}
for (; *ptr2; ptr2++) {
distance += (*ptr2 == '/');
}

return distance;
}

static std::vector<TopologyEntry> discover_cuda_topology(
const std::vector<InfinibandDevice> &all_hca) {
std::vector<TopologyEntry> topology;
int device_count;
if (cudaGetDeviceCount(&device_count) != cudaSuccess) {
device_count = 0;
}
for (int i = 0; i < device_count; i++) {
char pci_bus_id[20];
if (cudaDeviceGetPCIBusId(pci_bus_id, sizeof(pci_bus_id), i) !=
cudaSuccess) {
continue;
}
for (char *ch = pci_bus_id; (*ch = tolower(*ch)); ch++)
;

std::vector<std::string> preferred_hca;
std::vector<std::string> avail_hca;
for (const auto &hca : all_hca) {
// FIXME: currently we only identify the NICs connected to the same
// PCIe switch/RC with GPU as preferred.
if (get_pci_distance(hca.pci_bus_id.c_str(), pci_bus_id) == 0) {
preferred_hca.push_back(hca.name);
} else {
avail_hca.push_back(hca.name);
}
}
topology.push_back(
TopologyEntry{.name = "cuda:" + std::to_string(i),
.preferred_hca = std::move(preferred_hca),
.avail_hca = std::move(avail_hca)});
}
return topology;
}

#endif // USE_CUDA

namespace mooncake {
// TODO: add black/white lists for devices.
std::string discoverTopologyMatrix() {
auto all_hca = list_infiniband_devices();
Json::Value value(Json::objectValue);
for (auto &ent : discover_cpu_topology(all_hca)) {
value[ent.name] = ent.to_json();
}
#ifdef USE_CUDA
for (auto &ent : discover_cuda_topology(all_hca)) {
value[ent.name] = ent.to_json();
}
#endif
return value.toStyledString();
}
} // namespace mooncake
6 changes: 4 additions & 2 deletions mooncake-transfer-engine/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ target_link_libraries(rdma_transport_test PUBLIC transfer_engine)
add_executable(transport_uint_test transport_uint_test.cpp)
target_link_libraries(transport_uint_test PUBLIC transfer_engine gtest gtest_main )


add_executable(rdma_transport_test2 rdma_transport_test2.cpp)
target_link_libraries(rdma_transport_test2 PUBLIC transfer_engine gtest gtest_main )

add_executable(tcp_transport_test tcp_transport_test.cpp)
target_link_libraries(tcp_transport_test PUBLIC transfer_engine gtest gtest_main )

add_executable(transfer_metadata_test transfer_metadata_test.cpp)
target_link_libraries(transfer_metadata_test PUBLIC transfer_engine gtest gtest_main)
target_link_libraries(transfer_metadata_test PUBLIC transfer_engine gtest gtest_main)

add_executable(topology_test topology_test.cpp)
target_link_libraries(topology_test PUBLIC transfer_engine gtest gtest_main)
21 changes: 21 additions & 0 deletions mooncake-transfer-engine/tests/topology_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "topology.h"

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "transfer_metadata.h"

TEST(ToplogyTest, GetTopologyMatrix)
{
std::string topo = mooncake::discoverTopologyMatrix();
LOG(INFO) << topo;
mooncake::TransferMetadata::PriorityMatrix matrix;
std::vector<std::string> rnic_list;
mooncake::TransferMetadata::parseNicPriorityMatrix(topo, matrix, rnic_list);
}

int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}