Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC support to FDB #11782

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e7629c7
Implement gRPC support
vishesh Aug 23, 2024
8cacc14
Move some CMake stuff around.
vishesh Aug 27, 2024
d322bb2
Fix typo
vishesh Aug 27, 2024
efc66d1
Add some test
vishesh Aug 27, 2024
e09d497
Add async client
vishesh Aug 27, 2024
0ad35ff
Add test for checking destroy
vishesh Aug 27, 2024
965cea6
[testing] Automatically discover unit-test and register as ctest
vishesh Aug 28, 2024
f7fcbdd
Merge branch 'discover_unit_tests' into grpc
vishesh Aug 28, 2024
f5bfe17
Fix some tests
vishesh Aug 28, 2024
780f53d
Use NetworkAddress
vishesh Aug 28, 2024
f4a6da3
Add another variant of call method
vishesh Aug 28, 2024
171d269
Add a failed call test
vishesh Aug 28, 2024
010a473
Refactor
vishesh Aug 28, 2024
86f9573
Cleanup shutdown
vishesh Aug 28, 2024
4272db5
Start working on streaming
vishesh Aug 28, 2024
c0a9baf
Implement server streaming
vishesh Aug 29, 2024
f6052bb
Cleanup some unnecessary templating
vishesh Aug 29, 2024
3d81782
Cleanup some tests
vishesh Aug 29, 2024
fc27c9b
WIP Client Streaming
vishesh Aug 29, 2024
02429ed
WIP
vishesh Sep 1, 2024
b69646d
File Transfer WIP
vishesh Sep 4, 2024
997d15b
Remove UnitTest.h
vishesh Sep 5, 2024
bf5ca81
Merge remote-tracking branch 'apple/main' into grpc
vishesh Sep 5, 2024
1b822c6
Take grpc addresses from command line
vishesh Sep 6, 2024
17eb913
startup grpc in fdbserver
vishesh Sep 12, 2024
b24bedb
Cancel if future ref is 0
vishesh Sep 18, 2024
ee8ccc7
noop
vishesh Sep 18, 2024
dc3c12a
Merge remote-tracking branch 'apple/main' into grpc
vishesh Nov 2, 2024
8814ab4
Update some Cmake files
vishesh Nov 7, 2024
8036f3b
Fix some build/run issues
vishesh Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmake/FDBComponents.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ if(toml11_FOUND)
add_library(toml11_target INTERFACE)
target_link_libraries(toml11_target INTERFACE toml11::toml11)
else()
include(ExternalProject)
include(ExternalProject)
ExternalProject_add(toml11Project
URL "https://github.com/ToruNiina/toml11/archive/v3.4.0.tar.gz"
URL_HASH SHA256=bc6d733efd9216af8c119d8ac64a805578c79cc82b813e4d1d880ca128bd154d
Expand Down Expand Up @@ -231,6 +231,12 @@ endif()

################################################################################

# TODO (Vishesh): Replace with target_include_directories.
include_directories("${CMAKE_CURRENT_BINARY_DIR}/generated/")

set(gRPC_DIR /usr/local/lib/cmake/grpc)
find_package(gRPC CONFIG REQUIRED)

file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/packages)
add_custom_target(packages)

Expand Down
43 changes: 41 additions & 2 deletions cmake/utils.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ function(fdb_find_sources out)
file(GLOB res
LIST_DIRECTORIES false
RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}"
CONFIGURE_DEPENDS "*.cpp" "*.c" "*.h" "*.hpp")
CONFIGURE_DEPENDS "*.cpp" "*.cc" "*.c" "*.h" "*.hpp")
file(GLOB_RECURSE res_includes
LIST_DIRECTORIES false
RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}/include"
CONFIGURE_DEPENDS "include/*.cpp" "include/*.c" "include/*.h" "include/*.hpp")
CONFIGURE_DEPENDS "include/*.cpp" "include/*.cc" "include/*.c" "include/*.h" "include/*.hpp")
file(GLOB_RECURSE res_workloads
LIST_DIRECTORIES false
RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}/workloads"
Expand All @@ -66,3 +66,42 @@ function(fdb_find_sources out)
endforeach()
set(${out} "${res}" PARENT_SCOPE)
endfunction()

function(package_name_to_path result_var pkg_name)
set(_pkg_name "${pkg_name}")
string(REPLACE "." "/" _pkg_name ${_pkg_name})
set(${result_var} ${_pkg_name} PARENT_SCOPE)
endfunction()

function(package_name_to_proto_target result_var pkg_name)
set(_pkg_name "${pkg_name}")
string(REPLACE "." "_" _pkg_name ${_pkg_name})
set(${result_var} "proto_${_pkg_name}" PARENT_SCOPE)
endfunction()

# Args: package_name proto_file ...
function(generate_grpc_protobuf pkg_name)
set(proto_files ${ARGN})
package_name_to_proto_target(target_name ${pkg_name})
package_name_to_path(out_rel_path ${pkg_name})

add_library(${target_name} ${proto_files})
target_link_libraries(${target_name} PUBLIC gRPC::grpc++)

set(protoc_out_dir "${CMAKE_BINARY_DIR}/generated/${out_rel_path}/")
protobuf_generate(
TARGET ${target_name}
PROTOC_OUT_DIR ${protoc_out_dir}
GENERATE_EXTENSIONS .pb.h .pb.cc
)

protobuf_generate(
TARGET ${target_name}
LANGUAGE grpc
PROTOC_OUT_DIR ${protoc_out_dir}
PLUGIN protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc
)

target_include_directories(${target_name} PUBLIC "${protoc_out_dir}")
endfunction()
10 changes: 7 additions & 3 deletions fdbrpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ if(${COROUTINE_IMPL} STREQUAL libcoro)
endif()
endif()

target_include_directories(fdbrpc PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio)
target_link_libraries(fdbrpc PUBLIC flow libb64 md5 PRIVATE rapidjson)
generate_grpc_protobuf(fdbrpc.file_transfer file_transfer.proto)

target_include_directories(fdbrpc PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
target_include_directories(fdbrpc PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio ${Protobuf_INCLUDE_DIRS} ${gRPC_INCLUDE_DIRS})
target_link_libraries(fdbrpc PUBLIC proto_fdbrpc_test proto_fdbrpc_file_transfer)
target_link_libraries(fdbrpc PUBLIC flow libb64 md5 PRIVATE rapidjson gRPC::grpc++)

target_include_directories(fdbrpc_sampling PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio)
target_link_libraries(fdbrpc_sampling PUBLIC flow_sampling libb64 md5 PRIVATE rapidjson)
Expand All @@ -86,4 +90,4 @@ if(WIN32)
add_dependencies(tokensign_actors fdbrpc_actors)
endif()

add_subdirectory(tests)
add_subdirectory(tests)
21 changes: 21 additions & 0 deletions fdbrpc/FileTransfer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* FileTransfer.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "fdbrpc/FileTransfer.h"
57 changes: 57 additions & 0 deletions fdbrpc/FlowGrpc.actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* gRPC.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cstdio>
#include <thread>
#include "fdbrpc/FlowGrpc.h"

#include "flow/actorcompiler.h" // This must be the last #include.

GrpcServer::GrpcServer(const NetworkAddress& addr) : address_(addr) {}

GrpcServer::~GrpcServer() {
if (server_) {
shutdown();
}

if (server_thread_.joinable()) {
server_thread_.join();
}
}

Future<Void> GrpcServer::run() {
grpc::ServerBuilder builder_;
builder_.AddListeningPort(address_.toString(), grpc::InsecureServerCredentials());
for (auto& service : registered_services_) {
builder_.RegisterService(service.get());
}
server_ = builder_.BuildAndStart();
return server_promise_.getFuture();
}

void GrpcServer::shutdown() {
server_->Shutdown(); // TODO (Vishesh): This needs to be Future.
server_promise_.send(Void());
server_ = nullptr;
}

void GrpcServer::registerService(std::shared_ptr<grpc::Service> service) {
registered_services_.push_back(service);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No ACTOR in here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why it would be actor?

99 changes: 99 additions & 0 deletions fdbrpc/FlowGrpcTests.actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* FlowGrpcTests.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cstdio>

#include "flow/UnitTest.h"
#include "fdbrpc/FlowGrpc.h"
#include "fdbrpc/FlowGrpcTests.h"

#include "flow/actorcompiler.h" // This must be the last #include.

// So that tests are not optimized out. :/
void forceLinkGrpcTests() {}

namespace fdbrpc_test {
namespace asio = boost::asio;

TEST_CASE("/fdbrpc/grpc/basic_sync_client") {
state NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50001"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test will fail if this port is already occupied?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I have make some sort of getFreePort() function. Currently, I'm just making sure each test has different port.

state GrpcServer server(addr);
server.registerService(make_shared<TestEchoServiceImpl>());
state Future<Void> server_actor = server.run();

EchoClient client(grpc::CreateChannel(addr.toString(), grpc::InsecureChannelCredentials()));
std::string reply = client.Echo("Ping!");
std::cout << "Echo received: " << reply << std::endl;
ASSERT_EQ(reply, "Echo: Ping!");

server.shutdown();
wait(server_actor);
return Void();
}

TEST_CASE("/fdbrpc/grpc/basic_async_client") {
state NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50003"));
state GrpcServer server(addr);
server.registerService(make_shared<TestEchoServiceImpl>());
state Future<Void> _ = server.run();

state shared_ptr<asio::thread_pool> pool = make_shared<asio::thread_pool>(4);
state AsyncGrpcClient<TestEchoService> client(addr.toString(), pool);

try {
state EchoRequest request;
request.set_message("Ping!");
EchoResponse response = wait(client.call(&TestEchoService::Stub::Echo, request));
std::cout << "Echo received: " << response.message() << std::endl;
ASSERT_EQ(response.message(), "Echo: Ping!");
} catch (Error& e) {
ASSERT_EQ(e.code(), error_code_grpc_error);
ASSERT(false);
}

return Void();
}

TEST_CASE("/fdbrpc/grpc/no_server_running") {
state NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50004"));
state shared_ptr<asio::thread_pool> pool(make_shared<asio::thread_pool>(4));
state AsyncGrpcClient<TestEchoService> client(addr.toString(), pool);

try {
state EchoRequest request;
request.set_message("Ping!");
EchoResponse response = wait(client.call(&TestEchoService::Stub::Echo, request));
ASSERT(false); // RPC should fail as there is no server running.;
} catch (Error& e) {
ASSERT_EQ(e.code(), error_code_grpc_error);
}

return Void();
}

TEST_CASE("/fdbrpc/grpc/destroy_server_without_shutdown") {
state NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50005"));
state GrpcServer server(addr);
server.registerService(make_shared<TestEchoServiceImpl>());
state Future<Void> _ = server.run();
return Void();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the expectation here? Can we assert for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If lose last server reference, it cleans up, shutsdown and doesn't cause any random segfaults etc.


} // namespace fdbrpc_test
Loading