Skip to content
This repository has been archived by the owner on Apr 17, 2024. It is now read-only.

Commit

Permalink
remote persistent memory pool
Browse files Browse the repository at this point in the history
1. support remote_alloc/remote_free/remote_write/remote_read
2. support RDMA protocol
3. support cpp and java interface

Signed-off-by: Haodong Tang <[email protected]>
  • Loading branch information
Haodong Tang committed Mar 5, 2020
1 parent 81f2220 commit 928c20c
Show file tree
Hide file tree
Showing 68 changed files with 11,401 additions and 7 deletions.
6 changes: 0 additions & 6 deletions .gitlab-ci.yml

This file was deleted.

4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[submodule "rpmp/include/spdlog"]
path = rpmp/include/spdlog
url = https://github.com/gabime/spdlog.git
branch = master
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ final class RdmaShuffleBlockFetcherIterator(context: TaskContext,
reqsInFlight.incrementAndGet
val blockManagerId = rdmaRequest.blockManagerId
val shuffleBlockIdName = rdmaRequest.shuffleBlockIdName
println("shuffle block name " + shuffleBlockIdName)

val pmofTransferService = shuffleClient.asInstanceOf[PmofTransferService]

Expand Down
45 changes: 45 additions & 0 deletions rpmp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
cmake_minimum_required(VERSION 3.11)
project(rpmof)

set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-std=c++14 -g -pthread -fPIC")

# Generate compile_commands.json
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# place binaries and libraries according to GNU standards
include(GNUInstallDirs)
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/${CMAKE_INSTALL_LIBDIR})
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/${CMAKE_INSTALL_LIBDIR})
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/${CMAKE_INSTALL_BINDIR})

if(CMAKE_CXX_COMPILER_ID MATCHES GNU)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fprofile-arcs -ftest-coverage")
endif()

include(cmake/googletest.cmake)

fetch_googletest(
${PROJECT_SOURCE_DIR}/cmake
${PROJECT_BINARY_DIR}/googletest
)

find_package(Boost REQUIRED COMPONENTS program_options)
if(Boost_FOUND)
include_directories(${Boost_INCLUDE_DIRS})
endif()

include_directories(${PROJECT_SOURCE_DIR}/)
include_directories(${PROJECT_SOURCE_DIR}/include)
include_directories(${PROJECT_SOURCE_DIR}/include/spdlog/include)
include_directories(${PROJECT_BINARY_DIR}/googletest/googletest-src/googletest/include)

enable_testing()

add_subdirectory(include/spdlog)
add_subdirectory(pmpool)
add_subdirectory(test)
add_subdirectory(benchmark)

add_executable(main main.cc)
target_link_libraries(main pmpool spdlog)
1 change: 1 addition & 0 deletions rpmp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# RPMP
17 changes: 17 additions & 0 deletions rpmp/benchmark/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
add_executable(local_allocate local_allocate.cc)
target_link_libraries(local_allocate pmpool)

add_executable(remote_allocate remote_allocate.cc)
target_link_libraries(remote_allocate pmpool)

add_executable(remote_write remote_write.cc)
target_link_libraries(remote_write pmpool)

add_executable(remote_allocate_write remote_allocate_write.cc)
target_link_libraries(remote_allocate_write pmpool)

add_executable(circularbuffer circularbuffer.cc)
target_link_libraries(circularbuffer pmpool)

add_executable(remote_read remote_read.cc)
target_link_libraries(remote_read pmpool)
35 changes: 35 additions & 0 deletions rpmp/benchmark/circularbuffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Filename: /mnt/spark-pmof/tool/rpmp/benchmark/circularbuffer.cc
* Path: /mnt/spark-pmof/tool/rpmp/benchmark
* Created Date: Monday, December 30th 2019, 9:57:10 am
* Author: root
*
* Copyright (c) 2019 Your Company
*/

#include <string.h>

#include "pmpool/buffer/CircularBuffer.h"

#include <chrono> // NOLINT

uint64_t timestamp_now() {
return std::chrono::high_resolution_clock::now().time_since_epoch() /
std::chrono::milliseconds(1);
}

int main() {
CircularBuffer circularbuffer(1024 * 1024, 2048);
uint64_t start = timestamp_now();
char str[1048576];
memset(str, '0', 1048576);
for (int i = 0; i < 20480; i++) {
char* buf = circularbuffer.get(1048576);
memcpy(buf, str, 1048576);
circularbuffer.put(buf, 1048576);
}
uint64_t end = timestamp_now();
std::cout << "pmemkv put test: 1048576 "
<< " bytes test, consumes " << (end - start) / 1000.0 << std::endl;
return 0;
}
85 changes: 85 additions & 0 deletions rpmp/benchmark/local_allocate.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Filename: /mnt/spark-pmof/tool/rpmp/benchmark/local_allocate.cc
* Path: /mnt/spark-pmof/tool/rpmp/benchmark
* Created Date: Tuesday, December 24th 2019, 8:54:38 am
* Author: root
*
* Copyright (c) 2019 Intel
*/

#include <string.h>

#include <iostream>
#include <memory>
#include <mutex> // NOLINT
#include <thread> // NOLINT
#include <vector>

#include "../pmpool/AllocatorProxy.h"
#include "../pmpool/Config.h"
#include "../pmpool/Log.h"
#include "gtest/gtest.h"

uint64_t timestamp_now() {
return std::chrono::high_resolution_clock::now().time_since_epoch() /
std::chrono::milliseconds(1);
}

std::mutex mtx;
uint64_t count = 0;
char str[1048576];

void func(AllocatorProxy *proxy, int index) {
while (true) {
std::unique_lock<std::mutex> lk(mtx);
uint64_t count_ = count++;
lk.unlock();
if (count_ < 20480) {
uint64_t addr = proxy->allocate_and_write(1048576, nullptr, index);
proxy->write(addr, str, 1048576);
} else {
break;
}
}
}

int main() {
std::shared_ptr<Config> config = std::make_shared<Config>();
config->init(0, nullptr);
std::shared_ptr<Log> log = std::make_shared<Log>(config.get());
auto allocatorProxy = new AllocatorProxy(config.get(), log.get(), nullptr);
allocatorProxy->init();
std::vector<std::thread *> threads;
memset(str, '0', 1048576);

uint64_t start = timestamp_now();
int num = 0;
for (int i = 0; i < 4; i++) {
num++;
auto t = new std::thread(func, allocatorProxy, i);
threads.push_back(t);
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
if (i == 0) {
CPU_SET(2, &cpuset);
} else if (i == 1) {
CPU_SET(40, &cpuset);
} else if (i == 2) {
CPU_SET(27, &cpuset);
} else {
CPU_SET(60, &cpuset);
}
int rc =
pthread_setaffinity_np(t->native_handle(), sizeof(cpu_set_t), &cpuset);
}
for (int i = 0; i < num; i++) {
threads[i]->join();
delete threads[i];
}
uint64_t end = timestamp_now();
std::cout << "pmemkv put test: 1048576 "
<< " bytes test, consumes " << (end - start) / 1000.0
<< "s, throughput is " << 20480 / ((end - start) / 1000.0) << "MB/s"
<< std::endl;
allocatorProxy->release_all();
}
86 changes: 86 additions & 0 deletions rpmp/benchmark/remote_allocate.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Filename: /mnt/spark-pmof/tool/rpmp/benchmark/allocate_perf.cc
* Path: /mnt/spark-pmof/tool/rpmp/benchmark
* Created Date: Friday, December 20th 2019, 8:29:23 am
* Author: root
*
* Copyright (c) 2019 Intel
*/

#include <thread> // NOLINT
#include <atomic>
#include "pmpool/client/PmPoolClient.h"

uint64_t timestamp_now() {
return std::chrono::high_resolution_clock::now().time_since_epoch() /
std::chrono::milliseconds(1);
}

std::atomic<uint64_t> count = {0};
std::mutex mtx;
std::vector<PmPoolClient *> clients;
std::map<int, std::vector<uint64_t>> addresses;

void func(int i) {
while (true) {
uint64_t count_ = count++;
if (count_ < 20480) {
clients[i]->begin_tx();
if (addresses.count(i) != 0) {
auto vec = addresses[i];
uint64_t addr = clients[i]->alloc(1048576);
vec.push_back(addr);
} else {
std::vector<uint64_t> vec;
uint64_t addr = clients[i]->alloc(1048576);
vec.push_back(addr);
addresses[i] = vec;
}
clients[i]->end_tx();
} else {
break;
}
}
}

int main() {
std::vector<std::thread*> threads;
int num = 0;
for (int i = 0; i < 4; i++) {
PmPoolClient *client = new PmPoolClient("172.168.0.40", "12346");
client->begin_tx();
client->init();
client->end_tx();
clients.push_back(client);
num++;
}
uint64_t start = timestamp_now();
for (int i = 0; i < num; i++) {
auto t = new std::thread(func, i);
threads.push_back(t);
}
for (int i = 0; i < num; i++) {
threads[i]->join();
delete threads[i];
}
uint64_t end = timestamp_now();
std::cout << "pmemkv put test: 1048576 "
<< " bytes test, consumes " << (end - start) / 1000.0
<< "s, throughput is " << 20480 / ((end - start) / 1000.0) << "MB/s"
<< std::endl;

for (int i = 0; i < num; i++) {
auto vec = addresses[i];
while (!vec.empty()) {
auto address = vec.back();
vec.pop_back();
clients[i]->free(address);
}
}
std::cout << "freed." << std::endl;
for (int i = 0; i < num; i++) {
clients[i]->wait();
delete clients[i];
}
return 0;
}
90 changes: 90 additions & 0 deletions rpmp/benchmark/remote_allocate_write.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Filename: /mnt/spark-pmof/tool/rpmp/benchmark/allocate_perf.cc
* Path: /mnt/spark-pmof/tool/rpmp/benchmark
* Created Date: Friday, December 20th 2019, 8:29:23 am
* Author: root
*
* Copyright (c) 2019 Intel
*/

#include <string.h>
#include <thread> // NOLINT
#include <atomic>
#include "pmpool/client/PmPoolClient.h"

uint64_t timestamp_now() {
return std::chrono::high_resolution_clock::now().time_since_epoch() /
std::chrono::milliseconds(1);
}

std::atomic<uint64_t> count = {0};
std::mutex mtx;
char str[1048576];
std::vector<PmPoolClient *> clients;
std::map<int, std::vector<uint64_t>> addresses;

void func1(int i) {
while (true) {
uint64_t count_ = count++;
if (count_ < 20480) {
clients[i]->begin_tx();
if (addresses.count(i) != 0) {
auto vec = addresses[i];
vec.push_back(clients[i]->write(str, 1048576));
} else {
std::vector<uint64_t> vec;
vec.push_back(clients[i]->write(str, 1048576));
addresses[i] = vec;
}
clients[i]->end_tx();
} else {
break;
}
}
}

int main() {
std::vector<std::thread*> threads;
memset(str, '0', 1048576);

int num = 0;
std::cout << "start write." << std::endl;
num = 0;
count = 0;
for (int i = 0; i < 4; i++) {
PmPoolClient *client = new PmPoolClient("172.168.0.40", "12346");
client->begin_tx();
client->init();
client->end_tx();
clients.push_back(client);
num++;
}
uint64_t start = timestamp_now();
for (int i = 0; i < num; i++) {
auto t = new std::thread(func1, i);
threads.push_back(t);
}
for (int i = 0; i < num; i++) {
threads[i]->join();
delete threads[i];
}
uint64_t end = timestamp_now();
std::cout << "pmemkv put test: 1048576 "
<< " bytes test, consumes " << (end - start) / 1000.0
<< "s, throughput is " << 20480 / ((end - start) / 1000.0) << "MB/s"
<< std::endl;
for (int i = 0; i < num; i++) {
auto vec = addresses[i];
while (!vec.empty()) {
auto address = vec.back();
vec.pop_back();
clients[i]->free(address);
}
}
std::cout << "freed." << std::endl;
for (int i = 0; i < num; i++) {
clients[i]->wait();
delete clients[i];
}
return 0;
}
Loading

0 comments on commit 928c20c

Please sign in to comment.