diff --git a/cmake/FDBComponents.cmake b/cmake/FDBComponents.cmake index e1fac0967f4..39f0fb18f2a 100644 --- a/cmake/FDBComponents.cmake +++ b/cmake/FDBComponents.cmake @@ -187,7 +187,7 @@ if(toml11_FOUND) add_library(toml11_target INTERFACE) target_link_libraries(toml11_target INTERFACE toml11::toml11) else() - include(ExternalProject) + include(ExternalProject) ExternalProject_add(toml11Project URL "https://github.com/ToruNiina/toml11/archive/v3.4.0.tar.gz" URL_HASH SHA256=bc6d733efd9216af8c119d8ac64a805578c79cc82b813e4d1d880ca128bd154d @@ -231,6 +231,12 @@ endif() ################################################################################ +# TODO (Vishesh): Replace with target_include_directories. +include_directories("${CMAKE_CURRENT_BINARY_DIR}/generated/") + +set(gRPC_DIR /usr/local/lib/cmake/grpc) +find_package(gRPC CONFIG REQUIRED) + file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/packages) add_custom_target(packages) diff --git a/cmake/utils.cmake b/cmake/utils.cmake index 9e09e51f164..671d65544e5 100644 --- a/cmake/utils.cmake +++ b/cmake/utils.cmake @@ -48,11 +48,11 @@ function(fdb_find_sources out) file(GLOB res LIST_DIRECTORIES false RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" - CONFIGURE_DEPENDS "*.cpp" "*.c" "*.h" "*.hpp") + CONFIGURE_DEPENDS "*.cpp" "*.cc" "*.c" "*.h" "*.hpp") file(GLOB_RECURSE res_includes LIST_DIRECTORIES false RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}/include" - CONFIGURE_DEPENDS "include/*.cpp" "include/*.c" "include/*.h" "include/*.hpp") + CONFIGURE_DEPENDS "include/*.cpp" "include/*.cc" "include/*.c" "include/*.h" "include/*.hpp") file(GLOB_RECURSE res_workloads LIST_DIRECTORIES false RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}/workloads" @@ -66,3 +66,42 @@ function(fdb_find_sources out) endforeach() set(${out} "${res}" PARENT_SCOPE) endfunction() + +function(package_name_to_path result_var pkg_name) + set(_pkg_name "${pkg_name}") + string(REPLACE "." "/" _pkg_name ${_pkg_name}) + set(${result_var} ${_pkg_name} PARENT_SCOPE) +endfunction() + +function(package_name_to_proto_target result_var pkg_name) + set(_pkg_name "${pkg_name}") + string(REPLACE "." "_" _pkg_name ${_pkg_name}) + set(${result_var} "proto_${_pkg_name}" PARENT_SCOPE) +endfunction() + +# Args: package_name proto_file ... +function(generate_grpc_protobuf pkg_name) + set(proto_files ${ARGN}) + package_name_to_proto_target(target_name ${pkg_name}) + package_name_to_path(out_rel_path ${pkg_name}) + + add_library(${target_name} ${proto_files}) + target_link_libraries(${target_name} PUBLIC gRPC::grpc++) + + set(protoc_out_dir "${CMAKE_BINARY_DIR}/generated/${out_rel_path}/") + protobuf_generate( + TARGET ${target_name} + PROTOC_OUT_DIR ${protoc_out_dir} + GENERATE_EXTENSIONS .pb.h .pb.cc + ) + + protobuf_generate( + TARGET ${target_name} + LANGUAGE grpc + PROTOC_OUT_DIR ${protoc_out_dir} + PLUGIN protoc-gen-grpc=$ + GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc + ) + + target_include_directories(${target_name} PUBLIC "${protoc_out_dir}") +endfunction() diff --git a/fdbrpc/CMakeLists.txt b/fdbrpc/CMakeLists.txt index bc9b4c5490e..ac111c4630d 100644 --- a/fdbrpc/CMakeLists.txt +++ b/fdbrpc/CMakeLists.txt @@ -64,8 +64,12 @@ if(${COROUTINE_IMPL} STREQUAL libcoro) endif() endif() -target_include_directories(fdbrpc PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio) -target_link_libraries(fdbrpc PUBLIC flow libb64 md5 PRIVATE rapidjson) +generate_grpc_protobuf(fdbrpc.file_transfer file_transfer.proto) + +target_include_directories(fdbrpc PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) +target_include_directories(fdbrpc PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio ${Protobuf_INCLUDE_DIRS} ${gRPC_INCLUDE_DIRS}) +target_link_libraries(fdbrpc PUBLIC proto_fdbrpc_test proto_fdbrpc_file_transfer) +target_link_libraries(fdbrpc PUBLIC flow libb64 md5 PRIVATE rapidjson gRPC::grpc++) target_include_directories(fdbrpc_sampling PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/libeio) target_link_libraries(fdbrpc_sampling PUBLIC flow_sampling libb64 md5 PRIVATE rapidjson) @@ -86,4 +90,4 @@ if(WIN32) add_dependencies(tokensign_actors fdbrpc_actors) endif() -add_subdirectory(tests) \ No newline at end of file +add_subdirectory(tests) diff --git a/fdbrpc/FileTransfer.cpp b/fdbrpc/FileTransfer.cpp new file mode 100644 index 00000000000..f59ed93c5de --- /dev/null +++ b/fdbrpc/FileTransfer.cpp @@ -0,0 +1,21 @@ +/** + * FileTransfer.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbrpc/FileTransfer.h" \ No newline at end of file diff --git a/fdbrpc/FlowGrpc.actor.cpp b/fdbrpc/FlowGrpc.actor.cpp new file mode 100644 index 00000000000..505cc2d1e66 --- /dev/null +++ b/fdbrpc/FlowGrpc.actor.cpp @@ -0,0 +1,57 @@ +/** + * gRPC.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "fdbrpc/FlowGrpc.h" + +#include "flow/actorcompiler.h" // This must be the last #include. + +GrpcServer::GrpcServer(const NetworkAddress& addr) : address_(addr) {} + +GrpcServer::~GrpcServer() { + if (server_) { + shutdown(); + } + + if (server_thread_.joinable()) { + server_thread_.join(); + } +} + +Future GrpcServer::run() { + grpc::ServerBuilder builder_; + builder_.AddListeningPort(address_.toString(), grpc::InsecureServerCredentials()); + for (auto& service : registered_services_) { + builder_.RegisterService(service.get()); + } + server_ = builder_.BuildAndStart(); + return server_promise_.getFuture(); +} + +void GrpcServer::shutdown() { + server_->Shutdown(); // TODO (Vishesh): This needs to be Future. + server_promise_.send(Void()); + server_ = nullptr; +} + +void GrpcServer::registerService(std::shared_ptr service) { + registered_services_.push_back(service); +} diff --git a/fdbrpc/FlowGrpcTests.actor.cpp b/fdbrpc/FlowGrpcTests.actor.cpp new file mode 100644 index 00000000000..10b2ca7f943 --- /dev/null +++ b/fdbrpc/FlowGrpcTests.actor.cpp @@ -0,0 +1,99 @@ +/** + * FlowGrpcTests.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "flow/UnitTest.h" +#include "fdbrpc/FlowGrpc.h" +#include "fdbrpc/FlowGrpcTests.h" + +#include "flow/actorcompiler.h" // This must be the last #include. + +// So that tests are not optimized out. :/ +void forceLinkGrpcTests() {} + +namespace fdbrpc_test { +namespace asio = boost::asio; + +TEST_CASE("/fdbrpc/grpc/basic_sync_client") { + state NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50001")); + state GrpcServer server(addr); + server.registerService(make_shared()); + state Future server_actor = server.run(); + + EchoClient client(grpc::CreateChannel(addr.toString(), grpc::InsecureChannelCredentials())); + std::string reply = client.Echo("Ping!"); + std::cout << "Echo received: " << reply << std::endl; + ASSERT_EQ(reply, "Echo: Ping!"); + + server.shutdown(); + wait(server_actor); + return Void(); +} + +TEST_CASE("/fdbrpc/grpc/basic_async_client") { + state NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50003")); + state GrpcServer server(addr); + server.registerService(make_shared()); + state Future _ = server.run(); + + state shared_ptr pool = make_shared(4); + state AsyncGrpcClient client(addr.toString(), pool); + + try { + state EchoRequest request; + request.set_message("Ping!"); + EchoResponse response = wait(client.call(&TestEchoService::Stub::Echo, request)); + std::cout << "Echo received: " << response.message() << std::endl; + ASSERT_EQ(response.message(), "Echo: Ping!"); + } catch (Error& e) { + ASSERT_EQ(e.code(), error_code_grpc_error); + ASSERT(false); + } + + return Void(); +} + +TEST_CASE("/fdbrpc/grpc/no_server_running") { + state NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50004")); + state shared_ptr pool(make_shared(4)); + state AsyncGrpcClient client(addr.toString(), pool); + + try { + state EchoRequest request; + request.set_message("Ping!"); + EchoResponse response = wait(client.call(&TestEchoService::Stub::Echo, request)); + ASSERT(false); // RPC should fail as there is no server running.; + } catch (Error& e) { + ASSERT_EQ(e.code(), error_code_grpc_error); + } + + return Void(); +} + +TEST_CASE("/fdbrpc/grpc/destroy_server_without_shutdown") { + state NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50005")); + state GrpcServer server(addr); + server.registerService(make_shared()); + state Future _ = server.run(); + return Void(); +} + +} // namespace fdbrpc_test \ No newline at end of file diff --git a/fdbrpc/FlowGrpcTests.cpp b/fdbrpc/FlowGrpcTests.cpp new file mode 100644 index 00000000000..d6559d777cb --- /dev/null +++ b/fdbrpc/FlowGrpcTests.cpp @@ -0,0 +1,190 @@ + +/** + * FlowGrpcTests.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "flow/UnitTest.h" +#include "fdbrpc/FlowGrpc.h" +#include "fdbrpc/FileTransfer.h" +#include "fdbrpc/FlowGrpcTests.h" +#include "flow/flow.h" + +// So that tests are not optimized out. :/ +void forceLinkGrpcTests2() {} + +namespace fdbrpc_test { +namespace asio = boost::asio; + +TEST_CASE("/fdbrpc/grpc/basic_coro") { + NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50500")); + GrpcServer server(addr); + server.registerService(make_shared()); + Future _ = server.run(); + + shared_ptr pool = make_shared(4); + AsyncGrpcClient client(addr.toString(), pool); + + try { + EchoRequest request; + request.set_message("Ping!"); + EchoResponse response = co_await client.call(&TestEchoService::Stub::Echo, request); + std::cout << "Echo received: " << response.message() << std::endl; + ASSERT_EQ(response.message(), "Echo: Ping!"); + } catch (Error& e) { + ASSERT_EQ(e.code(), error_code_grpc_error); + ASSERT(false); + } +} + +TEST_CASE("/fdbrpc/grpc/basic_stream_server") { + NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50501")); + GrpcServer server(addr); + server.registerService(make_shared()); + Future _ = server.run(); + + shared_ptr pool = make_shared(4); + AsyncGrpcClient client(addr.toString(), pool); + + int count = 0; + try { + EchoRequest request; + request.set_message("Ping!"); + auto stream = client.call(&TestEchoService::Stub::EchoRecv10, request); + loop { + auto response = co_await stream; + ASSERT_EQ(response.message(), "Echo: Ping!"); + count += 1; + } + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + ASSERT_EQ(count, 10); // Should send 10 reponses. + co_return; + } + ASSERT(false); + } + co_return; +} + +TEST_CASE("/fdbrpc/grpc/future_destroy") { + NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50500")); + GrpcServer server(addr); + server.registerService(make_shared()); + Future _ = server.run(); + + shared_ptr pool = make_shared(4); + AsyncGrpcClient client(addr.toString(), pool); + + try { + EchoRequest request; + request.set_message("Ping!"); + { + auto w = client.call(&TestEchoService::Stub::Echo, request); + // Out-of-scope + } + } catch (Error& e) { + ASSERT(false); + } + co_await delay(1); // So that lifetime of client stays. + co_return; +} + + +TEST_CASE("/fdbrpc/grpc/stream_destroy") { + NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50501")); + GrpcServer server(addr); + server.registerService(make_shared()); + Future _ = server.run(); + + shared_ptr pool = make_shared(4); + AsyncGrpcClient client(addr.toString(), pool); + + int count = 0; + try { + EchoRequest request; + request.set_message("Ping!"); + { + auto stream = client.call(&TestEchoService::Stub::EchoRecv10, request); + auto response = co_await stream; + ASSERT_EQ(response.message(), "Echo: Ping!"); + } + //TODO: Test if server cancels. + } catch (Error& e) { + if (e.code() == error_code_end_of_stream) { + ASSERT_EQ(count, 10); // Should send 10 reponses. + co_return; + } + ASSERT(false); + } + co_return; +} + +// T_EST_CASE("/fdbrpc/grpc/basic_stream_client") { +// NetworkAddress addr(NetworkAddress::parse("127.0.0.1:50502")); +// GrpcServer server(addr); +// server.registerService(make_shared()); +// Future _ = server.run(); + +// shared_ptr pool = make_shared(4); +// AsyncGrpcClient client(addr.toString(), pool); + +// int count = 0; +// try { +// EchoRequest request; +// request.set_message("Ping!"); +// auto stream = client.call(&TestEchoService::Stub::EchoSend10, request); +// loop { +// auto response = co_await stream; +// ASSERT_EQ(response.message(), "Echo: Ping!"); +// count += 1; +// } +// } catch (Error& e) { +// if (e.code() == error_code_end_of_stream) { +// ASSERT_EQ(count, 10); // Should send 10 reponses. +// co_return; +// } +// ASSERT(false); +// } +// co_return; +// } + +TEST_CASE("/fdbrpc/grpc/file_transfer") { + // -- Server -- + std::string server_address("127.0.0.1:50051"); + FileTransferServiceImpl service; + + grpc::ServerBuilder builder; + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + + std::unique_ptr server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << std::endl; + + // -- Client -- + auto channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()); + auto client = FileTransferClient(channel); + auto start = std::chrono::high_resolution_clock::now(); + client.DownloadFile("/root/ftexample.bin", "example.bin"); + auto end = std::chrono::high_resolution_clock::now(); + auto diff = std::chrono::duration_cast(end - start); + std::cout << "Time taken: " << diff.count() << std::endl; + + server->Shutdown(); + return Void(); +} + +} // namespace fdbrpc_test \ No newline at end of file diff --git a/fdbrpc/file_transfer.proto b/fdbrpc/file_transfer.proto new file mode 100644 index 00000000000..78b9e6d33ce --- /dev/null +++ b/fdbrpc/file_transfer.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package fdbrpc; + +service FileTransferService { + rpc DownloadFile(DownloadRequest) returns (stream DownloadChunk); +} + +message DownloadRequest { + string file_name = 1; + int32 chunk_size = 2; + int32 first_chunk_index = 3; + int32 last_chunk_index = 4; +} + +message DownloadChunk { + int32 offset = 2; + bytes data = 3; +} diff --git a/fdbrpc/include/fdbrpc/FileTransfer.h b/fdbrpc/include/fdbrpc/FileTransfer.h new file mode 100644 index 00000000000..085fcbaa46c --- /dev/null +++ b/fdbrpc/include/fdbrpc/FileTransfer.h @@ -0,0 +1,111 @@ +/** + * FileTransfer.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBRPC_FILE_TRANSFER_H +#define FDBRPC_FILE_TRANSFER_H + +#include +#include + +#include + +#include "fdbrpc/file_transfer/file_transfer.grpc.pb.h" + +using fdbrpc::DownloadChunk; +using fdbrpc::DownloadRequest; +using fdbrpc::FileTransferService; +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class FileTransferServiceImpl final : public FileTransferService::Service { +public: + grpc::Status DownloadFile(grpc::ServerContext* context, + const DownloadRequest* request, + grpc::ServerWriter* writer) override { + std::ifstream input_file(request->file_name(), std::ios::binary | std::ios::ate); + if (!input_file.is_open()) { + return grpc::Status(grpc::StatusCode::NOT_FOUND, "File not found"); + } + + // const size_t file_size = input_file.tellg(); + input_file.seekg(0); + + const size_t buffer_size = 1024 * 1024; // 1MB buffer + std::vector buffer(buffer_size); + + int64_t offset = 0; + while (input_file.good()) { + input_file.read(buffer.data(), buffer_size); + std::streamsize bytes_read = input_file.gcount(); + + DownloadChunk chunk; + chunk.set_offset(offset); + chunk.set_data(buffer.data(), bytes_read); + writer->Write(chunk); + + offset += bytes_read; + } + + input_file.close(); + return grpc::Status::OK; + } +}; + +class FileTransferClient { +public: + FileTransferClient(std::shared_ptr channel) : stub_(FileTransferService::NewStub(channel)) {} + + bool DownloadFile(const std::string& filename, const std::string& output_filename) { + ClientContext context; + DownloadRequest request; + request.set_file_name(filename); + + std::unique_ptr> reader(stub_->DownloadFile(&context, request)); + + std::ofstream output_file(output_filename, std::ios::binary); + if (!output_file.is_open()) { + std::cerr << "Failed to open file for writing: " << output_filename << std::endl; + return false; + } + + DownloadChunk chunk; + while (reader->Read(&chunk)) { + output_file.seekp(chunk.offset()); + output_file.write(chunk.data().data(), chunk.data().size()); + } + + output_file.close(); + + Status grpc_status = reader->Finish(); + if (grpc_status.ok()) { + std::cout << "File downloaded successfully: " << output_filename << std::endl; + return true; + } else { + std::cerr << "File download failed: " << grpc_status.error_message() << std::endl; + return false; + } + } + +private: + std::unique_ptr stub_; +}; + +#endif \ No newline at end of file diff --git a/fdbrpc/include/fdbrpc/FlowGrpc.h b/fdbrpc/include/fdbrpc/FlowGrpc.h new file mode 100644 index 00000000000..ba3b4a72b5b --- /dev/null +++ b/fdbrpc/include/fdbrpc/FlowGrpc.h @@ -0,0 +1,189 @@ +/** + * FlowGrpc.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBRPC_FLOW_GRPC_H +#define FDBRPC_FLOW_GRPC_H + +#include +#include +#include + +#include + +#include "flow/IThreadPool.h" +#include "flow/flow.h" + +class GrpcServer { +public: + GrpcServer(const NetworkAddress& addr); + ~GrpcServer(); + + Future run(); + void shutdown(); + void registerService(std::shared_ptr service); + + static GrpcServer* initInstance(const NetworkAddress& addr) { + GrpcServer* server = new GrpcServer(addr); + g_network->setGlobal(INetwork::enGrpcServer, (flowGlobalType)server); + return server; + } + + static GrpcServer* instance() { + return static_cast((void*)g_network->global(INetwork::enGrpcServer)); + } + +private: + NetworkAddress address_; + std::unique_ptr server_; + ThreadReturnPromise server_promise_; + std::thread server_thread_; + std::vector> registered_services_; +}; + +template +class AsyncGrpcClient { + template + using UnaryRpcFn = grpc::Status (ServiceType::Stub::*)(grpc::ClientContext*, const Request&, Response*); + + template + using ServerStreamingRpcFn = + std::unique_ptr> (ServiceType::Stub::*)(grpc::ClientContext*, const Request&); + + template + using ClientStreamingRpcFn = + std::unique_ptr> (ServiceType::Stub::*)(grpc::ClientContext*, Response*); + +public: + using Rpc = typename ServiceType::Stub; + + AsyncGrpcClient() {} + AsyncGrpcClient(const std::string& endpoint, std::shared_ptr pool) + : pool_(pool), channel_(grpc::CreateChannel(endpoint, grpc::InsecureChannelCredentials())), + stub_(ServiceType::NewStub(channel_)) {} + + template + Future call(UnaryRpcFn rpc, + const RequestType& request, + ResponseType* response) { + // ASSERT(g_network->isOnMainThread()); + auto promise = std::make_shared>(); + + boost::asio::post(*pool_, [this, promise, rpc, request, response]() { + grpc::ClientContext context; + auto status = (stub_.get()->*rpc)(&context, request, response); + if (promise->getFutureReferenceCount() == 0) { + return; + } + promise->send(status); + }); + + return promise->getFuture(); + } + + template + Future call(UnaryRpcFn rpc, const RequestType& request) { + // ASSERT(g_network->isOnMainThread()); + auto promise = std::make_shared>(); + + boost::asio::post(*pool_, [this, promise, rpc, request]() { + if (promise->getFutureReferenceCount() == 0) { + return; + } + + grpc::ClientContext context; + ResponseType response; + auto status = (stub_.get()->*rpc)(&context, request, &response); + + if (promise->getFutureReferenceCount() == 0) { + return; + } + + if (status.ok()) { + promise->send(response); + } else { + std::cout << "Error: " << status.error_message() << std::endl; + promise->sendError(grpc_error()); // TODO (Vishesh): Propogate the gRPC error codes. + } + }); + + return promise->getFuture(); + } + + template + FutureStream call(ServerStreamingRpcFn rpc, const RequestType& request) { + // ASSERT(g_network->isOnMainThread()); + auto promise = std::make_shared>(); + + boost::asio::post(*pool_, [this, promise, rpc, request]() { + grpc::ClientContext context; + ResponseType response; + auto reader = (stub_.get()->*rpc)(&context, request); + while (reader->Read(&response)) { + if (promise->getFutureReferenceCount() == 0) { + std::cout << "Stream cancelled.\n"; + context.TryCancel(); + return; + } + + promise->send(response); + } + + auto status = reader->Finish(); + if (status.ok()) { + promise->sendError(end_of_stream()); + } else { + std::cout << "Error: " << status.error_message() << std::endl; + promise->sendError(grpc_error()); // TODO (Vishesh): Propogate the gRPC error codes. + } + }); + + return promise->getFuture(); + } + + // template + // void call(ClientStreamingRpcFn rpc, const RequestType& request) { + // auto promise = std::make_shared>(); + + // boost::asio::post(*pool_, [this, promise, rpc, request]() { + // grpc::ClientContext context; + // ResponseType response; + // auto reader = (stub_.get()->*rpc)(&context, request); + // while (reader->Read(&response)) { + // promise->send(response); + // } + + // auto status = reader->Finish(); + // if (status.ok()) { + // promise->sendError(end_of_stream()); + // } else { + // promise->sendError(grpc_error()); // TODO (Vishesh): Propogate the gRPC error codes. + // } + // }); + + // return promise->getFuture(); + // } + +private: + std::shared_ptr pool_; + std::shared_ptr channel_; + std::unique_ptr stub_; +}; + +#endif // FDBRPC_FLOW_GRPC_H diff --git a/fdbrpc/include/fdbrpc/FlowGrpcTests.h b/fdbrpc/include/fdbrpc/FlowGrpcTests.h new file mode 100644 index 00000000000..3e315688a7e --- /dev/null +++ b/fdbrpc/include/fdbrpc/FlowGrpcTests.h @@ -0,0 +1,103 @@ +/** + * FlowGrpcTests.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "flow/Error.h" +#include "fdbrpc/test/echo.grpc.pb.h" + +namespace fdbrpc_test { + +using std::make_shared; +using std::shared_ptr; +using std::thread; + +using grpc::Channel; +using grpc::ClientContext; +using grpc::ServerContext; +using grpc::ServerReader; +using grpc::ServerWriter; +using grpc::Status; + +using fdbrpc::test::EchoRequest; +using fdbrpc::test::EchoResponse; +using fdbrpc::test::TestEchoService; + +// Service implementation +class TestEchoServiceImpl final : public TestEchoService::Service { + Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* reply) override { + reply->set_message("Echo: " + request->message()); + return Status::OK; + } + + Status EchoRecv10(ServerContext* context, const EchoRequest* request, ServerWriter* writer) override { + for (int ii = 0; ii < 10; ii++) { + if (context->IsCancelled()) { + std::cout << "Request Cancelled.\n"; + return Status::CANCELLED; + } + EchoResponse reply; + reply.set_message("Echo: " + request->message()); + writer->Write(reply); + } + return Status::OK; + } + + Status EchoSend10(ServerContext* context, ServerReader* reader, EchoResponse* reply) override { + EchoRequest request; + std::string res; + int count = 0; + while (reader->Read(&request)) { + count++; + res += request.message(); + } + reply->set_message(res); + ASSERT_EQ(count, 10); + return Status::OK; + } +}; + +class EchoClient { +public: + EchoClient(const shared_ptr& channel) : stub_(TestEchoService::NewStub(channel)) {} + + std::string Echo(const std::string& message) { + EchoRequest request; + request.set_message(message); + + EchoResponse reply; + ClientContext context; + + Status status = stub_->Echo(&context, request, &reply); + + if (status.ok()) { + return reply.message(); + } else { + std::cout << "RPC failed" << std::endl; + return "RPC failed"; + } + } + +private: + std::unique_ptr stub_; +}; + + +} // namespace fdbrpc_test \ No newline at end of file diff --git a/fdbrpc/include/fdbrpc/fdbrpc.h b/fdbrpc/include/fdbrpc/fdbrpc.h index 092ce41ec5b..ba47ea8c652 100644 --- a/fdbrpc/include/fdbrpc/fdbrpc.h +++ b/fdbrpc/include/fdbrpc/fdbrpc.h @@ -29,7 +29,6 @@ #include "fdbrpc/FailureMonitor.h" #include "fdbrpc/networksender.actor.h" #include "fdbrpc/simulator.h" - #ifdef WITH_SWIFT #include #endif /* WITH_SWIFT */ diff --git a/fdbrpc/tests/CMakeLists.txt b/fdbrpc/tests/CMakeLists.txt index ccc2909e087..78a0acbfda5 100644 --- a/fdbrpc/tests/CMakeLists.txt +++ b/fdbrpc/tests/CMakeLists.txt @@ -8,5 +8,7 @@ if(NOT WIN32) endif() endif() +include(AddFdbTest) +generate_grpc_protobuf(fdbrpc.test echo.proto) add_flow_target(EXECUTABLE NAME fdbrpc_bench SRCS fdbrpc_bench.actor.cpp) target_link_libraries(fdbrpc_bench PUBLIC flow fdbrpc boost_target_program_options) diff --git a/fdbrpc/tests/echo.proto b/fdbrpc/tests/echo.proto new file mode 100644 index 00000000000..a665d0fe087 --- /dev/null +++ b/fdbrpc/tests/echo.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package fdbrpc.test; + +service TestEchoService { + rpc Echo(EchoRequest) returns (EchoResponse); + rpc EchoRecv10(EchoRequest) returns (stream EchoResponse); + rpc EchoSend10(stream EchoRequest) returns (EchoResponse); +} + +message EchoRequest { + string message = 1; +} + +message EchoResponse { + string message = 1; +} \ No newline at end of file diff --git a/fdbserver/fdbserver.actor.cpp b/fdbserver/fdbserver.actor.cpp index f6f9843e93c..ffdaff50826 100644 --- a/fdbserver/fdbserver.actor.cpp +++ b/fdbserver/fdbserver.actor.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -53,6 +54,7 @@ #include "fdbrpc/Net2FileSystem.h" #include "fdbrpc/PerfMetric.h" #include "fdbrpc/fdbrpc.h" +#include "fdbrpc/FlowGrpc.h" #include "fdbrpc/simulator.h" #include "fdbserver/ConflictSet.h" #include "fdbserver/CoordinationInterface.h" @@ -1075,7 +1077,7 @@ struct CLIOptions { std::string testServersStr; std::string whitelistBinPaths; - std::vector publicAddressStrs, listenAddressStrs; + std::vector publicAddressStrs, listenAddressStrs, grpcAddressStrs; NetworkAddressList publicAddresses, listenAddresses; const char* targetKey = nullptr; @@ -1136,6 +1138,10 @@ struct CLIOptions { flushAndExit(FDB_EXIT_ERROR); } + for (auto& s : grpcAddressStrs) { + fmt::printf("gRPC Endpoint: %s\n", s); + } + if (role == ServerRole::ConsistencyCheck || role == ServerRole::ConsistencyCheckUrgent) { if (!publicAddressStrs.empty()) { fprintf(stderr, "ERROR: Public address cannot be specified for consistency check processes\n"); @@ -1341,7 +1347,13 @@ struct CLIOptions { case OPT_PUBLICADDR: argStr = args.OptionArg(); boost::split(tmpStrings, argStr, [](char c) { return c == ','; }); - publicAddressStrs.insert(publicAddressStrs.end(), tmpStrings.begin(), tmpStrings.end()); + for (auto& addr : tmpStrings) { + if (addr.ends_with(":grpc")) { + grpcAddressStrs.push_back(addr.substr(0, addr.size() - std::string(":grpc").size())); + } else { + publicAddressStrs.push_back(addr); + } + } break; case OPT_LISTEN: argStr = args.OptionArg(); @@ -2403,7 +2415,10 @@ int main(int argc, char* argv[]) { opts.consistencyCheckUrgentMode)); actors.push_back(histogramReport()); // actors.push_back( recurring( []{}, .001 ) ); // for ASIO latency measurement - + if (opts.grpcAddressStrs.size() > 0) { + auto grpcServer = GrpcServer::initInstance(NetworkAddress::parse(opts.grpcAddressStrs[0])); + actors.push_back(grpcServer->run()); + } f = stopAfter(waitForAll(actors)); g_network->run(); } diff --git a/fdbserver/workloads/UnitTests.actor.cpp b/fdbserver/workloads/UnitTests.actor.cpp index c0e9ecb979e..47e44cc3f3b 100644 --- a/fdbserver/workloads/UnitTests.actor.cpp +++ b/fdbserver/workloads/UnitTests.actor.cpp @@ -55,6 +55,8 @@ void forceLinkRandomKeyValueUtilsTests(); void forceLinkSimKmsVaultTests(); void forceLinkRESTSimKmsVaultTest(); void forceLinkActorFuzzUnitTests(); +void forceLinkGrpcTests(); +void forceLinkGrpcTests2(); struct UnitTestWorkload : TestWorkload { static constexpr auto NAME = "UnitTests"; @@ -125,6 +127,8 @@ struct UnitTestWorkload : TestWorkload { forceLinkSimKmsVaultTests(); forceLinkRESTSimKmsVaultTest(); forceLinkActorFuzzUnitTests(); + forceLinkGrpcTests(); + forceLinkGrpcTests2(); } Future setup(Database const& cx) override { diff --git a/flow/include/flow/IThreadPool.h b/flow/include/flow/IThreadPool.h index 04660f5f59f..c0ef985820e 100644 --- a/flow/include/flow/IThreadPool.h +++ b/flow/include/flow/IThreadPool.h @@ -110,6 +110,9 @@ class ThreadReturnPromise : NonCopyable { bool isValid() const { return promise.isValid(); } bool canBeSet() const { return promise.canBeSet(); } + int getFutureReferenceCount() const { return promise.getFutureReferenceCount(); } + int getPromiseReferenceCount() const { return promise.getPromiseReferenceCount(); } + private: Promise promise; }; @@ -140,6 +143,9 @@ class ThreadReturnPromiseStream : NonCopyable { : TaskPriority::DefaultOnMainThread); } + int getFutureReferenceCount() const { return promiseStream.getFutureReferenceCount(); } + int getPromiseReferenceCount() const { return promiseStream.getPromiseReferenceCount(); } + private: PromiseStream promiseStream; }; diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h index bd6ed384731..f7ecfa973dd 100755 --- a/flow/include/flow/error_definitions.h +++ b/flow/include/flow/error_definitions.h @@ -411,6 +411,10 @@ ERROR( digital_signature_ops_error, 6002, "Digital signature operation error" ) ERROR( authorization_token_verify_failed, 6003, "Failed to verify authorization token" ) ERROR( pkey_decode_error, 6004, "Failed to decode public/private key" ) ERROR( pkey_encode_error, 6005, "Failed to encode public/private key" ) + +// gRPC error +ERROR( grpc_error, 7000, "gRPC Error" ) + // clang-format on #undef ERROR diff --git a/flow/include/flow/flow.h b/flow/include/flow/flow.h index 49d5505bd04..f6a6e0f594b 100644 --- a/flow/include/flow/flow.h +++ b/flow/include/flow/flow.h @@ -1459,6 +1459,9 @@ class SWIFT_SENDABLE PromiseStream { return queue->onEmpty.getFuture(); } + int getFutureReferenceCount() const { return queue->getFutureReferenceCount(); } + int getPromiseReferenceCount() const { return queue->getPromiseReferenceCount(); } + private: NotifiedQueue* queue; }; diff --git a/flow/include/flow/network.h b/flow/include/flow/network.h index b84876d69d4..bb89b6c9385 100644 --- a/flow/include/flow/network.h +++ b/flow/include/flow/network.h @@ -170,6 +170,7 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE INetwork { enHistogram = 18, enTokenCache = 19, enMetrics = 20, + enGrpcServer = 21, COUNT // Add new fields before this enumerator }; diff --git a/flow/network.cpp b/flow/network.cpp index 4c25e20a01c..0bfa4316580 100644 --- a/flow/network.cpp +++ b/flow/network.cpp @@ -17,9 +17,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -#include - #include #include "flow/Arena.h"