Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TransferEngine] Add topology discovery #46

Merged
merged 2 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/en/transfer-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ For instance, as illustrated in figure above, to transfer data from buffer 0 (as
To further maximize bandwidth utilization, a single request's transfer is internally divided into multiple slices if its length exceeds 16KB.
Each slice might use a different path, enabling collaborative work among all RDMA NICs.

If you do not want to manually configure the topology matrix, we also provide a function (`mooncake::discoverTopologyMatrix` in `topology.h`) to automatically discover the topology between CPU/CUDA and RDMA devices. The automatic discovery mechanism might not always be accurate, and we welcome your feedback and improvement ideas!
wx-csy marked this conversation as resolved.
Show resolved Hide resolved

### Endpoint Management
Mooncake Store employs a pair of endpoints
to represent the connection between a local RDMA
Expand Down
5 changes: 5 additions & 0 deletions mooncake-transfer-engine/include/topology.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#ifndef MOONCAKE_TOPOLOGY_H_
#define MOONCAKE_TOPOLOGY_H_

#include <string>

namespace mooncake {

/// @brief Discovers the topology between CPU/CUDA devices and RDMA devices on
///        the local machine.
/// @return The discovered topology matrix serialized as a JSON string.
std::string discoverTopologyMatrix();

}  // namespace mooncake

#endif  // MOONCAKE_TOPOLOGY_H_
205 changes: 205 additions & 0 deletions mooncake-transfer-engine/src/topology.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
#include <glog/logging.h>
#include <jsoncpp/json/json.h>

#include <fstream>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

#ifdef USE_CUDA
#include "cuda_runtime.h"
#endif

#include <ctype.h>
#include <dirent.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#include "topology.h"

// Description of one RDMA device found under /sys/class/infiniband.
struct InfinibandDevice {
    // Directory entry name of the device under /sys/class/infiniband.
    std::string name;
    // Final path component of the device's resolved sysfs directory.
    std::string pci_bus_id;
    // Value of the device's `numa_node` sysfs attribute; -1 means "unknown",
    // which is also the default so that an unset device never looks like it
    // belongs to NUMA node 0.
    int numa_node = -1;
};

// One row of the topology matrix: a named location (the discovery functions in
// this file use names such as "cpu:<node>" and "cuda:<index>") together with
// the RDMA devices it should prefer and the remaining devices it may use.
struct TopologyEntry {
    std::string name;
    std::vector<std::string> preferred_hca;
    std::vector<std::string> avail_hca;

    // Serializes the entry as a two-element JSON array:
    //   [ [preferred_hca...], [avail_hca...] ]
    // Either inner array may be empty. The entry itself is not modified.
    Json::Value to_json() const {
        Json::Value preferred(Json::arrayValue);
        for (const auto &hca : preferred_hca) {
            preferred.append(hca);
        }

        Json::Value available(Json::arrayValue);
        for (const auto &hca : avail_hca) {
            available.append(hca);
        }

        Json::Value matrix(Json::arrayValue);
        matrix.append(preferred);
        matrix.append(available);
        return matrix;
    }
};

// Enumerates the RDMA devices listed under /sys/class/infiniband.
//
// For every non-hidden entry, the directory two levels above the class entry
// is resolved with realpath(); its final path component is recorded as the PCI
// bus id and its `numa_node` attribute as the NUMA node (-1 when the attribute
// is missing or unreadable).
//
// Returns an empty vector (after logging a warning) when the class directory
// cannot be opened. Entries whose path cannot be resolved are logged and
// skipped.
static std::vector<InfinibandDevice> list_infiniband_devices() {
    DIR *dir = opendir("/sys/class/infiniband");
    if (dir == nullptr) {
        LOG(WARNING) << "failed to list /sys/class/infiniband";
        return {};
    }

    std::vector<InfinibandDevice> devices;
    struct dirent *entry;
    while ((entry = readdir(dir)) != nullptr) {
        // Skips ".", ".." and any other dot-prefixed entry.
        if (entry->d_name[0] == '.') {
            continue;
        }

        std::string device_name = entry->d_name;

        // Built as std::string so that a long name cannot be silently
        // truncated the way a fixed-size snprintf buffer could.
        const std::string class_path =
            "/sys/class/infiniband/" + device_name + "/../..";
        char resolved_path[PATH_MAX];
        if (realpath(class_path.c_str(), resolved_path) == nullptr) {
            LOG(ERROR) << "realpath: " << strerror(errno);
            continue;
        }

        // Last path component of the resolved directory. rfind() yields npos
        // when there is no '/', and npos + 1 == 0 keeps the whole string.
        const std::string device_dir = resolved_path;
        std::string pci_bus_id = device_dir.substr(device_dir.rfind('/') + 1);

        // A failed extraction would store 0 (a valid NUMA node) in the target,
        // so restore the "unknown" sentinel explicitly.
        int numa_node = -1;
        std::ifstream numa_file(device_dir + "/numa_node");
        if (!(numa_file >> numa_node)) {
            numa_node = -1;
        }

        devices.push_back(InfinibandDevice{.name = std::move(device_name),
                                           .pci_bus_id = std::move(pci_bus_id),
                                           .numa_node = numa_node});
    }
    (void)closedir(dir);
    return devices;
}

// Builds one TopologyEntry per NUMA node directory ("node<N>") found under
// /sys/devices/system/node.
//
// For node N the entry is named "cpu:N"; every device in `all_hca` whose
// numa_node equals N goes to preferred_hca and all other devices go to
// avail_hca.
//
// Returns an empty vector (after logging a warning) when the node directory
// cannot be opened.
static std::vector<TopologyEntry> discover_cpu_topology(
    const std::vector<InfinibandDevice> &all_hca) {
    DIR *dir = opendir("/sys/devices/system/node");
    if (dir == nullptr) {
        LOG(WARNING) << "failed to list /sys/devices/system/node";
        return {};
    }

    const char *prefix = "node";
    const size_t prefix_len = strlen(prefix);

    std::vector<TopologyEntry> topology;
    struct dirent *entry;
    while ((entry = readdir(dir)) != nullptr) {
        if (entry->d_type != DT_DIR ||
            strncmp(entry->d_name, prefix, prefix_len) != 0) {
            continue;
        }

        // Only accept "node" followed by one or more decimal digits; without
        // this check atoi() would turn any other "node*" directory into
        // NUMA node 0.
        const char *suffix = entry->d_name + prefix_len;
        bool is_numeric = (*suffix != '\0');
        for (const char *p = suffix; is_numeric && *p != '\0'; ++p) {
            is_numeric = isdigit(static_cast<unsigned char>(*p)) != 0;
        }
        if (!is_numeric) {
            continue;
        }
        const int node_id = atoi(suffix);

        std::vector<std::string> preferred_hca;
        std::vector<std::string> avail_hca;
        for (const auto &hca : all_hca) {
            if (hca.numa_node == node_id) {
                preferred_hca.push_back(hca.name);
            } else {
                avail_hca.push_back(hca.name);
            }
        }
        topology.push_back(
            TopologyEntry{.name = "cpu:" + std::to_string(node_id),
                          .preferred_hca = std::move(preferred_hca),
                          .avail_hca = std::move(avail_hca)});
    }
    (void)closedir(dir);
    return topology;
}

#ifdef USE_CUDA

// Returns a distance figure for two PCI devices derived from their sysfs
// paths, or -1 if either /sys/bus/pci/devices/<bus> path cannot be resolved.
//
// Both paths are resolved with realpath(), their longest common character
// prefix is skipped, and the result is the number of '/' characters left in
// the two remainders. Identical paths, and paths that differ only inside
// their final component, therefore yield 0.
static int get_pci_distance(const char *bus1, const char *bus2) {
    // Resolves the sysfs entry for `bus` into `out` (PATH_MAX bytes).
    auto resolve = [](const char *bus, char *out) -> bool {
        char sysfs_path[PATH_MAX];
        snprintf(sysfs_path, sizeof(sysfs_path), "/sys/bus/pci/devices/%s",
                 bus);
        return realpath(sysfs_path, out) != NULL;
    };

    // Counts the '/' characters in a NUL-terminated string.
    auto count_slashes = [](const char *text) -> int {
        int slashes = 0;
        while (*text != '\0') {
            if (*text == '/') {
                ++slashes;
            }
            ++text;
        }
        return slashes;
    };

    char first[PATH_MAX];
    char second[PATH_MAX];
    // Short-circuit keeps the original order: bus2 is only resolved when bus1
    // succeeded.
    if (!resolve(bus1, first) || !resolve(bus2, second)) {
        return -1;
    }

    size_t common = 0;
    while (first[common] != '\0' && first[common] == second[common]) {
        ++common;
    }

    return count_slashes(first + common) + count_slashes(second + common);
}

// Builds one TopologyEntry per CUDA device reported by cudaGetDeviceCount().
//
// For device i the entry is named "cuda:i"; every device in `all_hca` whose
// get_pci_distance() to the GPU is 0 goes to preferred_hca and all other
// devices (including those for which the distance could not be computed) go
// to avail_hca.
//
// Returns an empty vector when the device count cannot be queried. Devices
// whose PCI bus id cannot be queried are skipped.
static std::vector<TopologyEntry> discover_cuda_topology(
    const std::vector<InfinibandDevice> &all_hca) {
    std::vector<TopologyEntry> topology;

    int device_count = 0;
    if (cudaGetDeviceCount(&device_count) != cudaSuccess) {
        device_count = 0;
    }

    for (int i = 0; i < device_count; i++) {
        char pci_bus_id[20];
        if (cudaDeviceGetPCIBusId(pci_bus_id, sizeof(pci_bus_id), i) !=
            cudaSuccess) {
            continue;
        }

        // Lower-case the id in place before it is used as a sysfs path
        // component. NOTE(review): presumably sysfs names use lowercase hex
        // while CUDA may report uppercase -- confirm.
        // tolower() requires a value representable as unsigned char, hence
        // the casts.
        for (char *ch = pci_bus_id; *ch != '\0'; ++ch) {
            *ch = static_cast<char>(tolower(static_cast<unsigned char>(*ch)));
        }

        std::vector<std::string> preferred_hca;
        std::vector<std::string> avail_hca;
        for (const auto &hca : all_hca) {
            if (get_pci_distance(hca.pci_bus_id.c_str(), pci_bus_id) == 0) {
                preferred_hca.push_back(hca.name);
            } else {
                avail_hca.push_back(hca.name);
            }
        }
        topology.push_back(
            TopologyEntry{.name = "cuda:" + std::to_string(i),
                          .preferred_hca = std::move(preferred_hca),
                          .avail_hca = std::move(avail_hca)});
    }
    return topology;
}

#endif // USE_CUDA

namespace mooncake {
std::string discoverTopologyMatrix() {
auto all_hca = list_infiniband_devices();
Json::Value value(Json::objectValue);
for (auto &ent : discover_cpu_topology(all_hca)) {
value[ent.name] = ent.to_json();
}
#ifdef USE_CUDA
for (auto &ent : discover_cuda_topology(all_hca)) {
value[ent.name] = ent.to_json();
}
#endif
return value.toStyledString();
}
} // namespace mooncake
6 changes: 4 additions & 2 deletions mooncake-transfer-engine/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ target_link_libraries(rdma_transport_test PUBLIC transfer_engine)
add_executable(transport_uint_test transport_uint_test.cpp)
target_link_libraries(transport_uint_test PUBLIC transfer_engine gtest gtest_main )

add_executable(rdma_transport_test2 rdma_transport_test2.cpp)
target_link_libraries(rdma_transport_test2 PUBLIC transfer_engine gtest gtest_main )

add_executable(tcp_transport_test tcp_transport_test.cpp)
target_link_libraries(tcp_transport_test PUBLIC transfer_engine gtest gtest_main )

add_executable(transfer_metadata_test transfer_metadata_test.cpp)
target_link_libraries(transfer_metadata_test PUBLIC transfer_engine gtest gtest_main)

# Test for the automatic topology discovery in topology.cpp.
add_executable(topology_test topology_test.cpp)
target_link_libraries(topology_test PUBLIC transfer_engine gtest gtest_main)
21 changes: 21 additions & 0 deletions mooncake-transfer-engine/tests/topology_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "topology.h"

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "transfer_metadata.h"

// Runs topology discovery on the local machine and feeds the resulting JSON
// to the NIC priority-matrix parser.
TEST(TopologyTest, GetTopologyMatrix)
{
    std::string topo = mooncake::discoverTopologyMatrix();
    LOG(INFO) << topo;
    // The discovery result is a serialized JSON object, so it is never an
    // empty string even when no device is found.
    EXPECT_FALSE(topo.empty());

    mooncake::TransferMetadata::PriorityMatrix matrix;
    std::vector<std::string> rnic_list;
    // NOTE(review): parseNicPriorityMatrix's return value is not checked
    // because its contract is not visible from this file -- consider
    // asserting on it.
    mooncake::TransferMetadata::parseNicPriorityMatrix(topo, matrix, rnic_list);
}

// Test entry point: hands the command line to GoogleTest, runs every
// registered test, and returns the result of RUN_ALL_TESTS().
int main(int argc, char **argv)
{
    ::testing::InitGoogleTest(&argc, argv);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}