Skip to content

Commit

Permalink
feat: Make unary ClientRPC's constructible and add start, read_initia…
Browse files Browse the repository at this point in the history
…l_metadata and finish member functions
  • Loading branch information
Tradias committed Oct 21, 2023
1 parent a0440f8 commit 8986c7a
Show file tree
Hide file tree
Showing 8 changed files with 357 additions and 198 deletions.
283 changes: 101 additions & 182 deletions src/agrpc/client_rpc.hpp

Large diffs are not rendered by default.

107 changes: 107 additions & 0 deletions src/agrpc/detail/client_rpc_base.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023 Dennis Hezel
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef AGRPC_DETAIL_CLIENT_RPC_BASE_HPP
#define AGRPC_DETAIL_CLIENT_RPC_BASE_HPP

#include <agrpc/detail/client_rpc_context_base.hpp>
#include <agrpc/detail/client_rpc_sender.hpp>
#include <agrpc/detail/config.hpp>
#include <agrpc/detail/initiate_sender_implementation.hpp>
#include <agrpc/detail/rpc_executor_base.hpp>

AGRPC_NAMESPACE_BEGIN()

namespace detail
{
/**
* @brief (experimental) ServerRPC base
*
* @since 2.7.0
*/
template <class Responder, class Executor>
class ClientRPCBase : public detail::RPCExecutorBase<Executor>, public detail::ClientRPCContextBase<Responder>
{
public:
/**
* @brief Construct from a GrpcContext
*/
explicit ClientRPCBase(agrpc::GrpcContext& grpc_context)
: detail::RPCExecutorBase<Executor>(grpc_context.get_executor())
{
}

/**
* @brief Construct from a GrpcContext and an init function
*
* @tparam ClientContextInitFunction A function with signature `void(grpc::ClientContext&)` which will be invoked
* during construction. It can, for example, be used to set this rpc's deadline.
*/
template <class ClientContextInitFunction>
ClientRPCBase(agrpc::GrpcContext& grpc_context, ClientContextInitFunction&& init_function)
: detail::RPCExecutorBase<Executor>(grpc_context.get_executor()),
detail::ClientRPCContextBase<Responder>(static_cast<ClientContextInitFunction&&>(init_function))
{
}

/**
* @brief Construct from an executor
*/
explicit ClientRPCBase(const Executor& executor) : detail::RPCExecutorBase<Executor>(executor) {}

/**
* @brief Construct from an executor and init function
*
* @tparam ClientContextInitFunction A function with signature `void(grpc::ClientContext&)` which will be invoked
* during construction. It can, for example, be used to set this rpc's deadline.
*/
template <class ClientContextInitFunction>
ClientRPCBase(const Executor& executor, ClientContextInitFunction&& init_function)
: detail::RPCExecutorBase<Executor>(executor),
detail::ClientRPCContextBase<Responder>(static_cast<ClientContextInitFunction&&>(init_function))
{
}

/**
* @brief Read initial metadata
*
* Request notification of the reading of the initial metadata.
*
* This call is optional.
*
* Side effect:
*
* @arg Upon receiving initial metadata from the server, the ClientContext associated with this call is updated, and
* the calling code can access the received metadata through the ClientContext.
*
* @attention If the server does not explicitly send initial metadata (e.g. by calling
* `agrpc::send_initial_metadata`) but waits for a message from the client instead then this function won't
* complete until `write()` is called.
*
* @param token A completion token like `asio::yield_context` or `agrpc::use_sender`. The completion signature is
* `void(bool)`. `true` indicates that the metadata was read. If it is `false`, then the call is dead.
*/
template <class CompletionToken = detail::DefaultCompletionTokenT<Executor>>
auto read_initial_metadata(CompletionToken&& token = detail::DefaultCompletionTokenT<Executor>{})
{
return detail::async_initiate_sender_implementation(
this->grpc_context(), detail::ReadInitialMetadataSenderInitiation<Responder>{*this},
detail::ReadInitialMetadataSenderImplementation{}, static_cast<CompletionToken&&>(token));
}
};
}

AGRPC_NAMESPACE_END

#endif // AGRPC_DETAIL_CLIENT_RPC_BASE_HPP
38 changes: 38 additions & 0 deletions src/agrpc/detail/client_rpc_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,44 @@ struct ClientFinishSenderInitiation
}
};

template <class Responder>
struct ClientFinishUnarySenderImplementation;

template <template <class> class Responder, class Response>
struct ClientFinishUnarySenderImplementation<Responder<Response>> : StatusSenderImplementationBase
{
explicit ClientFinishUnarySenderImplementation(detail::ClientRPCContextBase<Responder<Response>>& rpc,
Response& response)
: rpc_(rpc), response_(response)
{
}

template <class OnComplete>
void complete(OnComplete on_complete, bool)
{
ClientRPCAccess::set_finished(rpc_);
on_complete(static_cast<grpc::Status&&>(status_));
}

detail::ClientRPCContextBase<Responder<Response>>& rpc_;
Response& response_;
};

struct ClientFinishUnarySenderInitation
{
template <class Responder>
static auto& stop_function_arg(const ClientFinishUnarySenderImplementation<Responder>& impl) noexcept
{
return impl.rpc_.context();
}

template <class Responder>
static void initiate(const agrpc::GrpcContext&, ClientFinishUnarySenderImplementation<Responder>& impl, void* tag)
{
ClientRPCAccess::responder(impl.rpc_).Finish(&impl.response_, &impl.status_, tag);
}
};

template <class Responder>
struct ClientFinishServerStreamingSenderImplementation : StatusSenderImplementationBase
{
Expand Down
6 changes: 6 additions & 0 deletions src/agrpc/detail/forward.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ struct ServerRPCContextBaseAccess;

struct RPCExecutorBaseAccess;

template <class Responder, class Executor>
class ClientRPCBase;

template <auto, class>
class ClientRPCUnaryBase;

template <bool IsNotifyWhenDone, class Responder, class Executor>
class ServerRPCNotifyWhenDoneMixin;

Expand Down
6 changes: 6 additions & 0 deletions src/agrpc/detail/rpc_executor_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ class RPCExecutorBase
template <auto, class, class>
friend class agrpc::ServerRPC;

template <class, class>
friend class detail::ClientRPCBase;

template <auto, class>
friend class detail::ClientRPCUnaryBase;

template <auto, class>
friend class detail::ClientRPCServerStreamingBase;

Expand Down
21 changes: 15 additions & 6 deletions test/src/test_client_rpc_17.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ TEST_CASE_TEMPLATE("ClientRPC::request successfully", RPC, test::UnaryClientRPC,
});
}

TEST_CASE_TEMPLATE("Unary RPC::request automatically finishes RPC on error", RPC, test::UnaryClientRPC,
TEST_CASE_TEMPLATE("Unary ClientRPC::request automatically finishes rpc on error", RPC, test::UnaryClientRPC,
test::UnaryInterfaceClientRPC, test::GenericUnaryClientRPC)
{
ClientRPCRequestResponseTest<RPC> test;
Expand All @@ -128,16 +128,25 @@ TEST_CASE_TEMPLATE("Unary RPC::request automatically finishes RPC on error", RPC
test.grpc_context.run();
}

TEST_CASE_TEMPLATE("Streaming RPC can be destructed without being started", RPC, test::ClientStreamingClientRPC,
test::ClientStreamingInterfaceClientRPC, test::ServerStreamingClientRPC,
test::ServerStreamingInterfaceClientRPC, test::BidirectionalStreamingClientRPC,
test::BidirectionalStreamingInterfaceClientRPC, test::GenericStreamingClientRPC)
TEST_CASE_TEMPLATE("Unary ClientRPC can be destructed after start", RPC, test::UnaryClientRPC,
test::GenericUnaryClientRPC)
{
ClientRPCRequestResponseTest<RPC> test;
RPC rpc{test.get_executor()};
test.start_rpc(rpc, int{});
}

TEST_CASE_TEMPLATE("Streaming ClientRPC can be destructed without being started", RPC, test::UnaryClientRPC,
test::GenericUnaryClientRPC, test::ClientStreamingClientRPC, test::ClientStreamingInterfaceClientRPC,
test::ServerStreamingClientRPC, test::ServerStreamingInterfaceClientRPC,
test::BidirectionalStreamingClientRPC, test::BidirectionalStreamingInterfaceClientRPC,
test::GenericStreamingClientRPC)
{
agrpc::GrpcContext grpc_context;
CHECK_NOTHROW([[maybe_unused]] RPC rpc{grpc_context.get_executor()});
}

TEST_CASE_TEMPLATE("Streaming RPC::start returns false on error", RPC, test::ClientStreamingClientRPC,
TEST_CASE_TEMPLATE("Streaming ClientRPC::start returns false on error", RPC, test::ClientStreamingClientRPC,
test::ClientStreamingInterfaceClientRPC, test::ServerStreamingClientRPC,
test::ServerStreamingInterfaceClientRPC, test::BidirectionalStreamingClientRPC,
test::BidirectionalStreamingInterfaceClientRPC, test::GenericStreamingClientRPC)
Expand Down
80 changes: 70 additions & 10 deletions test/src/test_server_rpc_17.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,54 @@ TEST_CASE_TEMPLATE("ServerRPC unary success", RPC, test::UnaryServerRPC, test::N
});
}

TEST_CASE_TEMPLATE("ServerRPC unary start+finish success", RPC, test::UnaryServerRPC,
test::NotifyWhenDoneUnaryServerRPC)
{
ServerRPCTest<RPC> test{true};
bool use_finish_with_error{};
SUBCASE("finish") {}
SUBCASE("finish_with_error") { use_finish_with_error = true; }
test.register_and_perform_three_requests(
[&](RPC& rpc, test::msg::Request& request, const asio::yield_context& yield)
{
CHECK_EQ(42, request.integer());
if (use_finish_with_error)
{
CHECK(rpc.finish_with_error(test::create_already_exists_status(), yield));
}
else
{
typename RPC::Response response;
response.set_integer(21);
CHECK(rpc.finish(response, grpc::Status::OK, yield));
}
},
[&](auto& request, auto& response, const asio::yield_context& yield)
{
request.set_integer(42);
typename ServerRPCTest<RPC>::ClientRPC rpc{test.grpc_context, test::set_default_deadline};
rpc.start(*test.stub, request);
request = {};
const auto status = rpc.finish(response, yield);
if (use_finish_with_error)
{
CHECK_EQ(grpc::StatusCode::ALREADY_EXISTS, status.error_code());
}
else
{
CHECK(status.ok());
CHECK_EQ(21, response.integer());
}
});
}

TEST_CASE_TEMPLATE("Unary ClientRPC/ServerRPC read/send_initial_metadata successfully", RPC, test::UnaryServerRPC,
test::NotifyWhenDoneUnaryServerRPC)
{
ServerRPCTest<RPC> test{true};
bool use_start{};
SUBCASE("use request") {}
SUBCASE("use start") { use_start = true; }
test.register_and_perform_three_requests(
[&](RPC& rpc, auto&, const asio::yield_context& yield)
{
Expand All @@ -110,11 +154,20 @@ TEST_CASE_TEMPLATE("Unary ClientRPC/ServerRPC read/send_initial_metadata success
},
[&](auto& request, auto& response, const asio::yield_context& yield)
{
grpc::ClientContext client_context;
test::set_default_deadline(client_context);
CHECK_EQ(grpc::StatusCode::CANCELLED,
test.request_rpc(client_context, request, response, yield).error_code());
CHECK_EQ(0, client_context.GetServerInitialMetadata().find("test")->second.compare("a"));
if (use_start)
{
typename ServerRPCTest<RPC>::ClientRPC rpc{test.grpc_context, test::set_default_deadline};
rpc.start(*test.stub, request);
rpc.read_initial_metadata(yield);
CHECK_EQ(0, rpc.context().GetServerInitialMetadata().find("test")->second.compare("a"));
}
else
{
const auto client_context = test::create_client_context();
CHECK_EQ(grpc::StatusCode::CANCELLED,
test.request_rpc(*client_context, request, response, yield).error_code());
CHECK_EQ(0, client_context->GetServerInitialMetadata().find("test")->second.compare("a"));
}
});
}

Expand Down Expand Up @@ -314,9 +367,10 @@ TEST_CASE_TEMPLATE("ServerRPC/ClientRPC bidi streaming success", RPC, test::Bidi

TEST_CASE_FIXTURE(ServerRPCTest<test::GenericServerRPC>, "ServerRPC/ClientRPC generic unary RPC success")
{
bool use_executor_overload{};
int option{};
SUBCASE("executor overload") {}
SUBCASE("GrpcContext overload") { use_executor_overload = true; }
SUBCASE("GrpcContext overload") { option = 1; }
SUBCASE("start+finish") { option = 2; }
register_and_perform_three_requests(
[&](test::GenericServerRPC& rpc, const asio::yield_context& yield)
{
Expand All @@ -336,13 +390,19 @@ TEST_CASE_FIXTURE(ServerRPCTest<test::GenericServerRPC>, "ServerRPC/ClientRPC ge
request = test::message_to_grpc_buffer(typed_request);
auto status = [&]
{
if (use_executor_overload)
if (0 == option)
{
return test::GenericUnaryClientRPC::request(get_executor(), "/test.v1.Test/Unary", *stub,
client_context, request, response, yield);
}
return test::GenericUnaryClientRPC::request(grpc_context, "/test.v1.Test/Unary", *stub, client_context,
request, response, yield);
else if (1 == option)
{
return test::GenericUnaryClientRPC::request(grpc_context, "/test.v1.Test/Unary", *stub,
client_context, request, response, yield);
}
test::GenericUnaryClientRPC rpc{grpc_context, test::set_default_deadline};
rpc.start("/test.v1.Test/Unary", *stub, request);
return rpc.finish(response, yield);
}();
CHECK_EQ(grpc::StatusCode::OK, status.error_code());
CHECK_EQ(11, test::grpc_buffer_to_message<test::msg::Response>(response).integer());
Expand Down
14 changes: 14 additions & 0 deletions test/utils/utils/introspect_rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ struct IntrospectRPC<agrpc::ClientRPC<PrepareAsync, Executor>, agrpc::ClientRPCT
{
return ClientRPC::request(executor, stub, context, request, response, token);
}

template <class CompletionToken>
static auto start(ClientRPC& rpc, typename ClientRPC::Stub& stub, const typename ClientRPC::Request& request,
typename ClientRPC::Response&, CompletionToken&&)
{
return rpc.start(stub, request);
}
};

template <class Executor>
Expand All @@ -55,6 +62,13 @@ struct IntrospectRPC<agrpc::GenericUnaryClientRPC<Executor>, agrpc::ClientRPCTyp
{
return ClientRPC::request(executor, "/test.v1.Test/Unary", stub, context, request, response, token);
}

template <class CompletionToken>
static auto start(ClientRPC& rpc, typename ClientRPC::Stub& stub, const typename ClientRPC::Request& request,
typename ClientRPC::Response&, CompletionToken&&)
{
return rpc.start("/test.v1.Test/Unary", stub, request);
}
};

template <auto PrepareAsync, class Executor>
Expand Down

0 comments on commit 8986c7a

Please sign in to comment.