From 4a7fc4267f9a8a0f6ea904302082b2edbe3875d2 Mon Sep 17 00:00:00 2001 From: Dennis Hezel Date: Sun, 22 Oct 2023 10:57:48 +0200 Subject: [PATCH] perf: Avoid creating copies of the rpc_handler for each request in register_sender_rpc_handler --- example/snippets/server_rpc.cpp | 2 +- .../detail/register_rpc_handler_base.hpp | 2 - .../detail/register_sender_rpc_handler.hpp | 30 +++++----- test/cmake/superbuild/src/out_var.cpp | 2 +- test/src/test_unifex_20.cpp | 56 ++++++++++++++----- 5 files changed, 59 insertions(+), 33 deletions(-) diff --git a/example/snippets/server_rpc.cpp b/example/snippets/server_rpc.cpp index 39ed2b7a..89ddda86 100644 --- a/example/snippets/server_rpc.cpp +++ b/example/snippets/server_rpc.cpp @@ -72,7 +72,7 @@ void server_rpc_unary(agrpc::GrpcContext& grpc_context, example::v1::Example::As [](RPC& rpc, RPC::Request& request) -> asio::awaitable { RPC::Response response; - response.set_integer(42); + response.set_integer(request.integer()); co_await rpc.finish(response, grpc::Status::OK); }, asio::detached); diff --git a/src/agrpc/detail/register_rpc_handler_base.hpp b/src/agrpc/detail/register_rpc_handler_base.hpp index f6b3f26a..f6dc08f8 100644 --- a/src/agrpc/detail/register_rpc_handler_base.hpp +++ b/src/agrpc/detail/register_rpc_handler_base.hpp @@ -57,8 +57,6 @@ struct RegisterRPCHandlerOperationBase return stop_context_.is_stopped() || has_error_.load(std::memory_order_relaxed); } - void stop() noexcept { stop_context_.stop(); } - agrpc::GrpcContext& grpc_context() const noexcept { return detail::query_grpc_context(executor_); } const ServerRPCExecutor& get_executor() const noexcept { return executor_; } diff --git a/src/agrpc/detail/register_sender_rpc_handler.hpp b/src/agrpc/detail/register_sender_rpc_handler.hpp index aaa81fb7..06e1f125 100644 --- a/src/agrpc/detail/register_sender_rpc_handler.hpp +++ b/src/agrpc/detail/register_sender_rpc_handler.hpp @@ -102,8 +102,7 @@ struct RPCHandlerOperationFinish auto& rpc = op.rpc_; if (eptr) { - op.base_.stop(); - op.base_.set_error(static_cast(*eptr)); + op.base().set_error(static_cast(*eptr)); } if (!detail::ServerRPCContextBaseAccess::is_finished(rpc)) { @@ -157,18 +156,18 @@ struct RPCHandlerOperation void set_value(bool ok) { auto& op = rpc_handler_op_; - detail::AllocationGuard ptr{&op, op.get_allocator()}; + detail::AllocationGuard guard{&op, op.get_allocator()}; if (ok) { if (auto exception_ptr = op.emplace_rpc_handler_operation_state()) { op.rpc_.cancel(); - op.base_.set_error(static_cast(*exception_ptr)); + op.base().set_error(static_cast(*exception_ptr)); return; } - detail::create_and_start_rpc_handler_operation(op.base_, op.get_allocator()); + detail::create_and_start_rpc_handler_operation(op.base(), op.get_allocator()); op.start_rpc_handler_operation_state(); - ptr.release(); + guard.release(); } } @@ -219,8 +218,7 @@ struct RPCHandlerOperation using OperationState = std::variant; explicit RPCHandlerOperation(RegisterRPCHandlerSenderOperationBase& operation, const Allocator& allocator) - : base_(operation), - impl1_(operation.rpc_handler()), + : impl1_(operation), rpc_(detail::ServerRPCContextBaseAccess::construct(operation.get_executor())), impl2_(detail::SecondThenVariadic{}, allocator, std::in_place_type, detail::InplaceWithFunction{}, @@ -231,7 +229,7 @@ struct RPCHandlerOperation .connect(StartReceiver{*this}); }) { - base_.increment_ref_count(); + base().increment_ref_count(); } RPCHandlerOperation(const RPCHandlerOperation& other) = delete; @@ -239,9 +237,9 @@ struct RPCHandlerOperation ~RPCHandlerOperation() noexcept { - if (base_.decrement_ref_count()) + if (base().decrement_ref_count()) { - base_.complete(); + base().complete(); } } @@ -258,8 +256,7 @@ struct RPCHandlerOperation detail::InplaceWithFunction{}, [&] { - return exec::connect(initial_request().invoke(static_cast(rpc_handler()), rpc_), - FinishReceiver{*this}); + return exec::connect(initial_request().invoke(rpc_handler(), rpc_), FinishReceiver{*this}); }); return {}; } @@ -279,7 +276,9 @@ struct RPCHandlerOperation state.value_.start(); } - auto& rpc_handler() noexcept { return impl1_.first(); } + auto& base() noexcept { return impl1_.first(); } + + auto& rpc_handler() noexcept { return base().rpc_handler(); } auto& initial_request() noexcept { return impl1_.second(); } @@ -287,8 +286,7 @@ struct RPCHandlerOperation auto& get_allocator() noexcept { return impl2_.second(); } - RegisterRPCHandlerSenderOperationBase& base_; - detail::CompressedPair impl1_; + detail::CompressedPair impl1_; ServerRPC rpc_; detail::CompressedPair impl2_; }; diff --git a/test/cmake/superbuild/src/out_var.cpp b/test/cmake/superbuild/src/out_var.cpp index 5886f345..b8baf8a4 100644 --- a/test/cmake/superbuild/src/out_var.cpp +++ b/test/cmake/superbuild/src/out_var.cpp @@ -35,7 +35,7 @@ void run_out_var() request.set_integer(42); grpc::ServerAsyncResponseWriter writer{&server_context}; - const auto cb = boost::asio::bind_executor(grpc_context, [](bool) {}); + auto cb = boost::asio::bind_executor(grpc_context, [](bool) {}); const auto is_void = std::is_same_v; diff --git a/test/src/test_unifex_20.cpp b/test/src/test_unifex_20.cpp index 9729aabd..affdcd40 100644 --- a/test/src/test_unifex_20.cpp +++ b/test/src/test_unifex_20.cpp @@ -671,17 +671,21 @@ TEST_CASE_FIXTURE(UnifexClientServerTest, "unifex repeatedly_request client stre CHECK_EQ(4, request_count); } -struct UnifexClientRPCTest : UnifexTestMixin> +template +struct UnifexClientRPCTest : UnifexTestMixin> { - template + using Base = test::ClientServerRPCTest; + + template void register_and_perform_requests(RPCHandler&& handler, ClientFunctions&&... client_functions) { int counter{}; - run(agrpc::register_sender_rpc_handler(this->grpc_context, this->service, handler), + this->run( + agrpc::register_sender_rpc_handler(this->grpc_context, this->service, handler), [&counter, &client_functions, &server_shutdown = this->server_shutdown]() -> unifex::task { - typename ClientRPC::Request request; - typename ClientRPC::Response response; + typename Base::ClientRPC::Request request; + typename Base::ClientRPC::Response response; co_await client_functions(request, response); ++counter; if (counter == sizeof...(client_functions)) @@ -692,12 +696,37 @@ struct UnifexClientRPCTest : UnifexTestMixin, "unifex UnaryClientRPC success") +{ + auto client_func = [&](Request& request, Response& response) -> unifex::task + { + grpc::ClientContext c; + test::set_default_deadline(c); + request.set_integer(42); + const auto status = co_await request_rpc(c, request, response, agrpc::use_sender); + CHECK_EQ(42, response.integer()); + CHECK_EQ(grpc::StatusCode::OK, status.error_code()); + }; + register_and_perform_requests( + [&](ServerRPC& rpc, ServerRPC::Request& request) + { + return unifex::let_value_with( + [] + { + return ServerRPC::Response{}; + }, + [&](auto& response) + { + response.set_integer(request.integer()); + return rpc.finish(response, grpc::Status::OK); + }); + }, + client_func, client_func, client_func); +} + +TEST_CASE_FIXTURE(UnifexClientRPCTest, + "unifex BidirectionalStreamingClientRPC success") { - // ODR-use function to work around undefined reference bug in GCC 10 - using RPC = - agrpc::ServerRPC<&test::v1::Test::WithAsyncMethod_BidirectionalStreaming>>::RequestBidirectionalStreaming>; auto client_func = [&](Request& request, Response& response) -> unifex::task { auto rpc = create_rpc(); @@ -711,8 +740,8 @@ TEST_CASE_FIXTURE(UnifexClientRPCTest, "unifex BidirectionalStreamingClientRPC s CHECK_EQ(1, response.integer()); CHECK_EQ(grpc::StatusCode::OK, (co_await rpc.finish()).error_code()); }; - register_and_perform_requests( - [&](RPC& rpc) -> unifex::task + register_and_perform_requests( + [&](ServerRPC& rpc) -> unifex::task { Response response; response.set_integer(1); @@ -726,7 +755,8 @@ TEST_CASE_FIXTURE(UnifexClientRPCTest, "unifex BidirectionalStreamingClientRPC s client_func, client_func, client_func); } -TEST_CASE_FIXTURE(UnifexClientRPCTest, "unifex BidirectionalStreamingClientRPC can be canelled") +TEST_CASE_FIXTURE(UnifexClientRPCTest, + "unifex BidirectionalStreamingClientRPC can be cancelled") { const auto with_deadline = [&](std::chrono::system_clock::time_point deadline) {