Skip to content

Commit

Permalink
perf: Avoid creating copies of the rpc_handler for each request in re…
Browse files Browse the repository at this point in the history
…gister_sender_rpc_handler
  • Loading branch information
Tradias committed Oct 22, 2023
1 parent 8986c7a commit 4a7fc42
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 33 deletions.
2 changes: 1 addition & 1 deletion example/snippets/server_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void server_rpc_unary(agrpc::GrpcContext& grpc_context, example::v1::Example::As
[](RPC& rpc, RPC::Request& request) -> asio::awaitable<void>
{
RPC::Response response;
response.set_integer(42);
response.set_integer(request.integer());
co_await rpc.finish(response, grpc::Status::OK);
},
asio::detached);
Expand Down
2 changes: 0 additions & 2 deletions src/agrpc/detail/register_rpc_handler_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ struct RegisterRPCHandlerOperationBase
return stop_context_.is_stopped() || has_error_.load(std::memory_order_relaxed);
}

void stop() noexcept { stop_context_.stop(); }

agrpc::GrpcContext& grpc_context() const noexcept { return detail::query_grpc_context(executor_); }

const ServerRPCExecutor& get_executor() const noexcept { return executor_; }
Expand Down
30 changes: 14 additions & 16 deletions src/agrpc/detail/register_sender_rpc_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ struct RPCHandlerOperationFinish
auto& rpc = op.rpc_;
if (eptr)
{
op.base_.stop();
op.base_.set_error(static_cast<std::exception_ptr&&>(*eptr));
op.base().set_error(static_cast<std::exception_ptr&&>(*eptr));
}
if (!detail::ServerRPCContextBaseAccess::is_finished(rpc))
{
Expand Down Expand Up @@ -157,18 +156,18 @@ struct RPCHandlerOperation
void set_value(bool ok)
{
auto& op = rpc_handler_op_;
detail::AllocationGuard ptr{&op, op.get_allocator()};
detail::AllocationGuard guard{&op, op.get_allocator()};
if (ok)
{
if (auto exception_ptr = op.emplace_rpc_handler_operation_state())
{
op.rpc_.cancel();
op.base_.set_error(static_cast<std::exception_ptr&&>(*exception_ptr));
op.base().set_error(static_cast<std::exception_ptr&&>(*exception_ptr));
return;
}
detail::create_and_start_rpc_handler_operation(op.base_, op.get_allocator());
detail::create_and_start_rpc_handler_operation(op.base(), op.get_allocator());
op.start_rpc_handler_operation_state();
ptr.release();
guard.release();
}
}

Expand Down Expand Up @@ -219,8 +218,7 @@ struct RPCHandlerOperation
using OperationState = std::variant<StartOperationState, FinishOperationState, WaitForDoneOperationState>;

explicit RPCHandlerOperation(RegisterRPCHandlerSenderOperationBase& operation, const Allocator& allocator)
: base_(operation),
impl1_(operation.rpc_handler()),
: impl1_(operation),
rpc_(detail::ServerRPCContextBaseAccess::construct<ServerRPC>(operation.get_executor())),
impl2_(detail::SecondThenVariadic{}, allocator, std::in_place_type<StartOperationState>,
detail::InplaceWithFunction{},
Expand All @@ -231,17 +229,17 @@ struct RPCHandlerOperation
.connect(StartReceiver{*this});
})
{
base_.increment_ref_count();
base().increment_ref_count();
}

RPCHandlerOperation(const RPCHandlerOperation& other) = delete;
RPCHandlerOperation(RPCHandlerOperation&& other) = delete;

~RPCHandlerOperation() noexcept
{
if (base_.decrement_ref_count())
if (base().decrement_ref_count())
{
base_.complete();
base().complete();
}
}

Expand All @@ -258,8 +256,7 @@ struct RPCHandlerOperation
detail::InplaceWithFunction{},
[&]
{
return exec::connect(initial_request().invoke(static_cast<RPCHandler&&>(rpc_handler()), rpc_),
FinishReceiver{*this});
return exec::connect(initial_request().invoke(rpc_handler(), rpc_), FinishReceiver{*this});
});
return {};
}
Expand All @@ -279,16 +276,17 @@ struct RPCHandlerOperation
state.value_.start();
}

auto& rpc_handler() noexcept { return impl1_.first(); }
auto& base() noexcept { return impl1_.first(); }

auto& rpc_handler() noexcept { return base().rpc_handler(); }

auto& initial_request() noexcept { return impl1_.second(); }

auto& operation_state() noexcept { return impl2_.first(); }

auto& get_allocator() noexcept { return impl2_.second(); }

RegisterRPCHandlerSenderOperationBase& base_;
detail::CompressedPair<RPCHandler, InitialRequest> impl1_;
detail::CompressedPair<RegisterRPCHandlerSenderOperationBase&, InitialRequest> impl1_;
ServerRPC rpc_;
detail::CompressedPair<OperationState, Allocator> impl2_;
};
Expand Down
2 changes: 1 addition & 1 deletion test/cmake/superbuild/src/out_var.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void run_out_var()
request.set_integer(42);

grpc::ServerAsyncResponseWriter<out_var::msg::Response> writer{&server_context};
const auto cb = boost::asio::bind_executor(grpc_context, [](bool) {});
auto cb = boost::asio::bind_executor(grpc_context, [](bool) {});
const auto is_void =
std::is_same_v<void, decltype(agrpc::request(out_var_v1_rpc, service, server_context, request, writer, cb))>;

Expand Down
56 changes: 43 additions & 13 deletions test/src/test_unifex_20.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,17 +671,21 @@ TEST_CASE_FIXTURE(UnifexClientServerTest, "unifex repeatedly_request client stre
CHECK_EQ(4, request_count);
}

struct UnifexClientRPCTest : UnifexTestMixin<test::ClientServerRPCTest<test::BidirectionalStreamingClientRPC>>
template <class RPC>
struct UnifexClientRPCTest : UnifexTestMixin<test::ClientServerRPCTest<RPC>>
{
template <class RPC, class RPCHandler, class... ClientFunctions>
using Base = test::ClientServerRPCTest<RPC>;

template <class RPCHandler, class... ClientFunctions>
void register_and_perform_requests(RPCHandler&& handler, ClientFunctions&&... client_functions)
{
int counter{};
run(agrpc::register_sender_rpc_handler<RPC>(this->grpc_context, this->service, handler),
this->run(
agrpc::register_sender_rpc_handler<typename Base::ServerRPC>(this->grpc_context, this->service, handler),
[&counter, &client_functions, &server_shutdown = this->server_shutdown]() -> unifex::task<void>
{
typename ClientRPC::Request request;
typename ClientRPC::Response response;
typename Base::ClientRPC::Request request;
typename Base::ClientRPC::Response response;
co_await client_functions(request, response);
++counter;
if (counter == sizeof...(client_functions))
Expand All @@ -692,12 +696,37 @@ struct UnifexClientRPCTest : UnifexTestMixin<test::ClientServerRPCTest<test::Bid
}
};

TEST_CASE_FIXTURE(UnifexClientRPCTest, "unifex BidirectionalStreamingClientRPC success")
TEST_CASE_FIXTURE(UnifexClientRPCTest<test::UnaryClientRPC>, "unifex UnaryClientRPC success")
{
auto client_func = [&](Request& request, Response& response) -> unifex::task<void>
{
grpc::ClientContext c;
test::set_default_deadline(c);
request.set_integer(42);
const auto status = co_await request_rpc(c, request, response, agrpc::use_sender);
CHECK_EQ(42, response.integer());
CHECK_EQ(grpc::StatusCode::OK, status.error_code());
};
register_and_perform_requests(
[&](ServerRPC& rpc, ServerRPC::Request& request)
{
return unifex::let_value_with(
[]
{
return ServerRPC::Response{};
},
[&](auto& response)
{
response.set_integer(request.integer());
return rpc.finish(response, grpc::Status::OK);
});
},
client_func, client_func, client_func);
}

TEST_CASE_FIXTURE(UnifexClientRPCTest<test::BidirectionalStreamingClientRPC>,
"unifex BidirectionalStreamingClientRPC success")
{
// ODR-use function to work around undefined reference bug in GCC 10
using RPC =
agrpc::ServerRPC<&test::v1::Test::WithAsyncMethod_BidirectionalStreaming<test::v1::Test::WithAsyncMethod_Unary<
test::v1::Test::WithAsyncMethod_Subscribe<test::v1::Test::Service>>>::RequestBidirectionalStreaming>;
auto client_func = [&](Request& request, Response& response) -> unifex::task<void>
{
auto rpc = create_rpc();
Expand All @@ -711,8 +740,8 @@ TEST_CASE_FIXTURE(UnifexClientRPCTest, "unifex BidirectionalStreamingClientRPC s
CHECK_EQ(1, response.integer());
CHECK_EQ(grpc::StatusCode::OK, (co_await rpc.finish()).error_code());
};
register_and_perform_requests<RPC>(
[&](RPC& rpc) -> unifex::task<void>
register_and_perform_requests(
[&](ServerRPC& rpc) -> unifex::task<void>
{
Response response;
response.set_integer(1);
Expand All @@ -726,7 +755,8 @@ TEST_CASE_FIXTURE(UnifexClientRPCTest, "unifex BidirectionalStreamingClientRPC s
client_func, client_func, client_func);
}

TEST_CASE_FIXTURE(UnifexClientRPCTest, "unifex BidirectionalStreamingClientRPC can be canelled")
TEST_CASE_FIXTURE(UnifexClientRPCTest<test::BidirectionalStreamingClientRPC>,
"unifex BidirectionalStreamingClientRPC can be cancelled")
{
const auto with_deadline = [&](std::chrono::system_clock::time_point deadline)
{
Expand Down

0 comments on commit 4a7fc42

Please sign in to comment.