diff --git a/conanfile.py b/conanfile.py index ab6e0ce..cb2ee4a 100644 --- a/conanfile.py +++ b/conanfile.py @@ -10,7 +10,7 @@ class NuRaftMesgConan(ConanFile): name = "nuraft_mesg" - version = "2.4.1" + version = "3.0.1" homepage = "https://github.com/eBay/nuraft_mesg" description = "A gRPC service for NuRAFT" diff --git a/include/nuraft_mesg/common.hpp b/include/nuraft_mesg/common.hpp index 8967442..72bb88e 100644 --- a/include/nuraft_mesg/common.hpp +++ b/include/nuraft_mesg/common.hpp @@ -16,6 +16,10 @@ SISL_LOGGING_DECL(nuraft_mesg) #define NURAFTMESG_LOG_MODS nuraft_mesg, grpc_server +namespace sisl { +class GenericClientResponse; +} // namespace sisl + namespace nuraft_mesg { using peer_id_t = boost::uuids::uuid; diff --git a/include/nuraft_mesg/mesg_factory.hpp b/include/nuraft_mesg/mesg_factory.hpp index 082b2ef..ee7e136 100644 --- a/include/nuraft_mesg/mesg_factory.hpp +++ b/include/nuraft_mesg/mesg_factory.hpp @@ -107,8 +107,9 @@ class mesg_factory final : public grpc_factory { NullAsyncResult data_service_request_unidirectional(std::optional< Result< peer_id_t > > const& dest, std::string const& request_name, io_blob_list_t const& cli_buf); - AsyncResult< sisl::io_blob > data_service_request_bidirectional(std::optional< Result< peer_id_t > > const&, - std::string const&, io_blob_list_t const&); + AsyncResult< sisl::GenericClientResponse > + data_service_request_bidirectional(std::optional< Result< peer_id_t > > const&, std::string const&, + io_blob_list_t const&); }; } // namespace nuraft_mesg diff --git a/include/nuraft_mesg/mesg_state_mgr.hpp b/include/nuraft_mesg/mesg_state_mgr.hpp index 49c7bfd..4e9c238 100644 --- a/include/nuraft_mesg/mesg_state_mgr.hpp +++ b/include/nuraft_mesg/mesg_state_mgr.hpp @@ -11,11 +11,11 @@ namespace nuraft { class raft_server; class state_machine; -} +} // namespace nuraft namespace sisl { class GenericRpcData; -} +} // namespace sisl namespace nuraft_mesg { @@ -46,7 +46,7 @@ class repl_service_ctx { nuraft::raft_server* _server; bool is_raft_leader() const; const std::string& raft_leader_id() const; - std::vector< peer_info >get_raft_status() const; + std::vector< peer_info > get_raft_status() const; // return a list of replica configs for the peers of the raft group void get_cluster_config(std::list< replica_config >& cluster_config) const; @@ -55,9 +55,9 @@ class repl_service_ctx { virtual NullAsyncResult data_service_request_unidirectional(destination_t const& dest, std::string const& request_name, io_blob_list_t const& cli_buf) = 0; - virtual AsyncResult< sisl::io_blob > data_service_request_bidirectional(destination_t const& dest, - std::string const& request_name, - io_blob_list_t const& cli_buf) = 0; + virtual AsyncResult< sisl::GenericClientResponse > + data_service_request_bidirectional(destination_t const& dest, std::string const& request_name, + io_blob_list_t const& cli_buf) = 0; // Send response to a data service request and finish the async call. virtual void send_data_service_response(io_blob_list_t const& outgoing_buf, diff --git a/src/lib/common_lib.hpp b/src/lib/common_lib.hpp index 3ac9a61..df81f78 100644 --- a/src/lib/common_lib.hpp +++ b/src/lib/common_lib.hpp @@ -14,26 +14,6 @@ namespace nuraft_mesg { -[[maybe_unused]] static void serialize_to_byte_buffer(grpc::ByteBuffer& cli_byte_buf, io_blob_list_t const& cli_buf) { - folly::small_vector< grpc::Slice, 4 > slices; - for (auto const& blob : cli_buf) { - slices.emplace_back(blob.cbytes(), blob.size(), grpc::Slice::STATIC_SLICE); - } - cli_byte_buf.Clear(); - grpc::ByteBuffer tmp(slices.data(), cli_buf.size()); - cli_byte_buf.Swap(&tmp); -} - -[[maybe_unused]] static grpc::Status deserialize_from_byte_buffer(grpc::ByteBuffer const& cli_byte_buf, - sisl::io_blob& cli_buf) { - grpc::Slice slice; - auto status = cli_byte_buf.TrySingleSlice(&slice); - if (!status.ok()) { return status; } - cli_buf.set_bytes(slice.begin()); - cli_buf.set_size(slice.size()); - return status; -} - // generic rpc server looks up rpc name in a map and calls the corresponding callback. To avoid another lookup in this // layer, we registed one callback for each (group_id, request_name) pair. The rpc_name is their concatenation. [[maybe_unused]] static std::string get_generic_method_name(std::string const& request_name, diff --git a/src/lib/repl_service_ctx.cpp b/src/lib/repl_service_ctx.cpp index 5da798b..fd2046e 100644 --- a/src/lib/repl_service_ctx.cpp +++ b/src/lib/repl_service_ctx.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include "nuraft_mesg/mesg_factory.hpp" @@ -61,9 +62,9 @@ NullAsyncResult repl_service_ctx_grpc::data_service_request_unidirectional(desti : m_mesg_factory->data_service_request_unidirectional(get_peer_id(dest), request_name, cli_buf); } -AsyncResult< sisl::io_blob > repl_service_ctx_grpc::data_service_request_bidirectional(destination_t const& dest, - std::string const& request_name, - io_blob_list_t const& cli_buf) { +AsyncResult< sisl::GenericClientResponse > +repl_service_ctx_grpc::data_service_request_bidirectional(destination_t const& dest, std::string const& request_name, + io_blob_list_t const& cli_buf) { return (!m_mesg_factory) ? folly::makeUnexpected(nuraft::cmd_result_code::SERVER_NOT_FOUND) : m_mesg_factory->data_service_request_bidirectional(get_peer_id(dest), request_name, cli_buf); @@ -121,8 +122,7 @@ repl_service_ctx_grpc::repl_service_ctx_grpc(grpc_server* server, std::shared_pt void repl_service_ctx_grpc::send_data_service_response(io_blob_list_t const& outgoing_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) { - serialize_to_byte_buffer(rpc_data->response(), outgoing_buf); - rpc_data->send_response(); + rpc_data->send_response(outgoing_buf); } } // namespace nuraft_mesg diff --git a/src/lib/repl_service_ctx.hpp b/src/lib/repl_service_ctx.hpp index e1dd7c0..6e4785a 100644 --- a/src/lib/repl_service_ctx.hpp +++ b/src/lib/repl_service_ctx.hpp @@ -16,9 +16,9 @@ class repl_service_ctx_grpc : public repl_service_ctx { NullAsyncResult data_service_request_unidirectional(destination_t const& dest, std::string const& request_name, io_blob_list_t const& cli_buf) override; - AsyncResult< sisl::io_blob > data_service_request_bidirectional(destination_t const& dest, - std::string const& request_name, - io_blob_list_t const& cli_buf) override; + AsyncResult< sisl::GenericClientResponse > + data_service_request_bidirectional(destination_t const& dest, std::string const& request_name, + io_blob_list_t const& cli_buf) override; void send_data_service_response(io_blob_list_t const& outgoing_buf, boost::intrusive_ptr< sisl::GenericRpcData >& rpc_data) override; diff --git a/src/proto/proto_mesg_factory.cpp b/src/proto/proto_mesg_factory.cpp index 253a16f..ac22f76 100644 --- a/src/proto/proto_mesg_factory.cpp +++ b/src/proto/proto_mesg_factory.cpp @@ -79,11 +79,8 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha NullAsyncResult data_service_request_unidirectional(std::string const& request_name, io_blob_list_t const& cli_buf) { - grpc::ByteBuffer cli_byte_buf; - serialize_to_byte_buffer(cli_byte_buf, cli_buf); return _generic_stub - ->call_unary(cli_byte_buf, request_name, - NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs)) + ->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs)) .deferValue([](auto&& response) -> NullResult { if (response.hasError()) { LOGE("Failed to send data_service_request, error: {}", response.error().error_message()); @@ -93,21 +90,16 @@ class messaging_client : public grpc_client< Messaging >, public std::enable_sha }); } - AsyncResult< sisl::io_blob > data_service_request_bidirectional(std::string const& request_name, - io_blob_list_t const& cli_buf) { - grpc::ByteBuffer cli_byte_buf; - serialize_to_byte_buffer(cli_byte_buf, cli_buf); + AsyncResult< sisl::GenericClientResponse > data_service_request_bidirectional(std::string const& request_name, + io_blob_list_t const& cli_buf) { return _generic_stub - ->call_unary(cli_byte_buf, request_name, - NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs)) - .deferValue([](auto&& response) -> Result< sisl::io_blob > { + ->call_unary(cli_buf, request_name, NURAFT_MESG_CONFIG(mesg_factory_config->data_request_deadline_secs)) + .deferValue([](auto&& response) -> Result< sisl::GenericClientResponse > { if (response.hasError()) { LOGE("Failed to send data_service_request, error: {}", response.error().error_message()); return folly::makeUnexpected(nuraft::cmd_result_code::CANCELLED); } - sisl::io_blob svr_buf; - deserialize_from_byte_buffer(response.value(), svr_buf); - return svr_buf; + return std::move(response.value()); }); } @@ -157,8 +149,8 @@ class grpc_proto_client : public grpc_base_client { return _client->data_service_request_unidirectional(request_name, cli_buf); } - AsyncResult< sisl::io_blob > data_service_request_bidirectional(std::string const& request_name, - io_blob_list_t const& cli_buf) { + AsyncResult< sisl::GenericClientResponse > data_service_request_bidirectional(std::string const& request_name, + io_blob_list_t const& cli_buf) { return _client->data_service_request_bidirectional(request_name, cli_buf); } }; @@ -208,10 +200,10 @@ NullAsyncResult mesg_factory::data_service_request_unidirectional(std::optional< // We ignore the vector of future response from collect all and st the value as folly::unit. // This is because we do not have a use case to handle the errors that happen during the unidirectional call to all // the peers. - return folly::collectAll(calls).deferValue([](auto&&) -> NullResult { return folly::unit; }); + return folly::collectAll(calls).deferValue([](auto &&) -> NullResult { return folly::unit; }); } -AsyncResult< sisl::io_blob > +AsyncResult< sisl::GenericClientResponse > mesg_factory::data_service_request_bidirectional(std::optional< Result< peer_id_t > > const& dest, std::string const& request_name, io_blob_list_t const& cli_buf) { std::shared_lock< client_factory_lock_type > rl(_client_lock); diff --git a/src/tests/data_service_tests.cpp b/src/tests/data_service_tests.cpp index abfbf36..d04da5c 100644 --- a/src/tests/data_service_tests.cpp +++ b/src/tests/data_service_tests.cpp @@ -84,7 +84,7 @@ TEST_F(DataServiceFixture, BasicTest1) { results.push_back(sm4_1->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, REQUEST_DATA, cli_buf) .deferValue([](auto e) -> NullResult { - test_state_mgr::verify_data(e.value()); + test_state_mgr::verify_data(e.value().response_blob()); return folly::Unit(); })); diff --git a/src/tests/test_state_manager.cpp b/src/tests/test_state_manager.cpp index 4d002a9..fa44755 100644 --- a/src/tests/test_state_manager.cpp +++ b/src/tests/test_state_manager.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include "nuraft_mesg/nuraft_mesg.hpp" #include "jungle_logstore/jungle_log_store.h" @@ -176,7 +177,7 @@ void test_state_mgr::leave() {} ///// data service api helpers -nuraft_mesg::AsyncResult< sisl::io_blob > +nuraft_mesg::AsyncResult< sisl::GenericClientResponse > test_state_mgr::data_service_request_bidirectional(nuraft_mesg::destination_t const& dest, std::string const& request_name, nuraft_mesg::io_blob_list_t const& cli_buf) { diff --git a/src/tests/test_state_manager.h b/src/tests/test_state_manager.h index 449fe46..f967d78 100644 --- a/src/tests/test_state_manager.h +++ b/src/tests/test_state_manager.h @@ -46,7 +46,7 @@ class test_state_mgr : public nuraft_mesg::mesg_state_mgr { void leave() override; ///// data service helper apis - nuraft_mesg::AsyncResult< sisl::io_blob > + nuraft_mesg::AsyncResult< sisl::GenericClientResponse > data_service_request_bidirectional(nuraft_mesg::destination_t const& dest, std::string const& request_name, nuraft_mesg::io_blob_list_t const& cli_buf); nuraft_mesg::NullAsyncResult data_service_request_unidirectional(nuraft_mesg::destination_t const& dest,