Skip to content

Commit

Permalink
doc: Consistently use AwaitableClient/ServerRPC in examples. Document…
Browse files Browse the repository at this point in the history
… how the executor for spawn/co_spawn is obtained in register_rpc_handler functions
  • Loading branch information
Tradias committed Nov 17, 2023
1 parent ec62ce9 commit 12c5826
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 84 deletions.
5 changes: 3 additions & 2 deletions example/hello-world-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "awaitable_client_rpc.hpp"
#include "helloworld/helloworld.grpc.pb.h"
#include "helper.hpp"

Expand All @@ -37,12 +38,12 @@ int main(int argc, const char** argv)
grpc_context,
[&]() -> asio::awaitable<void>
{
using RPC = agrpc::ClientRPC<&helloworld::Greeter::Stub::PrepareAsyncSayHello>;
using RPC = example::AwaitableClientRPC<&helloworld::Greeter::Stub::PrepareAsyncSayHello>;
grpc::ClientContext client_context;
helloworld::HelloRequest request;
request.set_name("world");
helloworld::HelloReply response;
status = co_await RPC::request(grpc_context, stub, client_context, request, response, asio::use_awaitable);
status = co_await RPC::request(grpc_context, stub, client_context, request, response);
std::cout << status.ok() << " response: " << response.message() << std::endl;
},
asio::detached);
Expand Down
5 changes: 3 additions & 2 deletions example/hello-world-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "awaitable_server_rpc.hpp"
#include "helloworld/helloworld.grpc.pb.h"

#include <agrpc/asio_grpc.hpp>
Expand Down Expand Up @@ -42,14 +43,14 @@ int main(int argc, const char** argv)
builder.RegisterService(&service);
server = builder.BuildAndStart();

using RPC = agrpc::ServerRPC<&helloworld::Greeter::AsyncService::RequestSayHello>;
using RPC = example::AwaitableServerRPC<&helloworld::Greeter::AsyncService::RequestSayHello>;
agrpc::register_awaitable_rpc_handler<RPC>(
grpc_context, service,
[&](RPC& rpc, RPC::Request& request) -> asio::awaitable<void>
{
helloworld::HelloReply response;
response.set_message("Hello " + request.name());
co_await rpc.finish(response, grpc::Status::OK, asio::use_awaitable);
co_await rpc.finish(response, grpc::Status::OK);
server->Shutdown();
},
asio::detached);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef AGRPC_HELPER_CLIENT_RPC_HPP
#define AGRPC_HELPER_CLIENT_RPC_HPP
#ifndef AGRPC_HELPER_AWAITABLE_CLIENT_RPC_HPP
#define AGRPC_HELPER_AWAITABLE_CLIENT_RPC_HPP

#include <agrpc/client_rpc.hpp>
#include <boost/asio/use_awaitable.hpp>
Expand All @@ -24,4 +24,4 @@ template <auto PrepareAsync>
using AwaitableClientRPC = boost::asio::use_awaitable_t<>::as_default_on_t<agrpc::ClientRPC<PrepareAsync>>;
}

#endif // AGRPC_HELPER_CLIENT_RPC_HPP
#endif // AGRPC_HELPER_AWAITABLE_CLIENT_RPC_HPP
27 changes: 27 additions & 0 deletions example/helper/awaitable_server_rpc.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 Dennis Hezel
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef AGRPC_HELPER_AWAITABLE_SERVER_RPC_HPP
#define AGRPC_HELPER_AWAITABLE_SERVER_RPC_HPP

#include <agrpc/server_rpc.hpp>
#include <boost/asio/use_awaitable.hpp>

namespace example
{
template <auto RequestRPC>
using AwaitableServerRPC = boost::asio::use_awaitable_t<>::as_default_on_t<agrpc::ServerRPC<RequestRPC>>;
}

#endif // AGRPC_HELPER_AWAITABLE_SERVER_RPC_HPP
6 changes: 3 additions & 3 deletions example/multi-threaded-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "awaitable_client_rpc.hpp"
#include "helloworld/helloworld.grpc.pb.h"
#include "helper.hpp"

Expand Down Expand Up @@ -56,14 +57,13 @@ class RoundRobin

asio::awaitable<void> make_request(agrpc::GrpcContext& grpc_context, helloworld::Greeter::Stub& stub)
{
using RPC = agrpc::ClientRPC<&helloworld::Greeter::Stub::PrepareAsyncSayHello>;
using RPC = example::AwaitableClientRPC<&helloworld::Greeter::Stub::PrepareAsyncSayHello>;
grpc::ClientContext client_context;
client_context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5));
RPC::Request request;
request.set_name("world");
RPC::Response response;
const auto status =
co_await RPC::request(grpc_context, stub, client_context, request, response, asio::use_awaitable);
const auto status = co_await RPC::request(grpc_context, stub, client_context, request, response);

abort_if_not(status.ok());
}
Expand Down
8 changes: 3 additions & 5 deletions example/share-io-context-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "awaitable_client_rpc.hpp"
#include "example/v1/example.grpc.pb.h"
#include "helper.hpp"

Expand All @@ -27,8 +28,6 @@
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>

#include <optional>

namespace asio = boost::asio;

// begin-snippet: client-side-share-io-context
Expand All @@ -50,14 +49,13 @@ asio::awaitable<void> make_tcp_request(asio::ip::port_type port)
// A unary RPC request that will be handled by the GrpcContext
asio::awaitable<void> make_grpc_request(agrpc::GrpcContext& grpc_context, example::v1::Example::Stub& stub)
{
using RPC = agrpc::ClientRPC<&example::v1::Example::Stub::PrepareAsyncUnary>;
using RPC = example::AwaitableClientRPC<&example::v1::Example::Stub::PrepareAsyncUnary>;
grpc::ClientContext client_context;
client_context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5));
RPC::Request request;
request.set_integer(42);
RPC::Response response;
const auto status =
co_await RPC::request(grpc_context, stub, client_context, request, response, asio::use_awaitable);
const auto status = co_await RPC::request(grpc_context, stub, client_context, request, response);

abort_if_not(status.ok());
abort_if_not(42 == response.integer());
Expand Down
5 changes: 3 additions & 2 deletions example/share-io-context-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "awaitable_server_rpc.hpp"
#include "example/v1/example.grpc.pb.h"
#include "helper.hpp"
#include "server_shutdown_asio.hpp"
Expand Down Expand Up @@ -51,7 +52,7 @@ asio::awaitable<void> handle_tcp_request(asio::ip::port_type port)
}

// A unary RPC request that will be handled by the GrpcContext.
using RPC = agrpc::ServerRPC<&example::v1::Example::AsyncService::RequestUnary>;
using RPC = example::AwaitableServerRPC<&example::v1::Example::AsyncService::RequestUnary>;

int main(int argc, const char** argv)
{
Expand Down Expand Up @@ -79,7 +80,7 @@ int main(int argc, const char** argv)
{
example::v1::Response response;
response.set_integer(request.integer());
co_await rpc.finish(response, grpc::Status::OK, asio::use_awaitable);
co_await rpc.finish(response, grpc::Status::OK);
server_shutdown.shutdown();
},
asio::detached);
Expand Down
2 changes: 1 addition & 1 deletion example/streaming-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "client_rpc.hpp"
#include "awaitable_client_rpc.hpp"
#include "example/v1/example.grpc.pb.h"
#include "example/v1/example_ext.grpc.pb.h"
#include "helper.hpp"
Expand Down
13 changes: 6 additions & 7 deletions example/streaming-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "awaitable_server_rpc.hpp"
#include "example/v1/example.grpc.pb.h"
#include "example/v1/example_ext.grpc.pb.h"
#include "helper.hpp"
Expand Down Expand Up @@ -41,8 +42,7 @@ using ExampleExtService = example::v1::ExampleExt::AsyncService;
// A simple client-streaming rpc handler using C++20 coroutines.
// ---------------------------------------------------
// end-snippet
using ClientStreamingRPC =
asio::use_awaitable_t<>::as_default_on_t<agrpc::ServerRPC<&ExampleService::RequestClientStreaming>>;
using ClientStreamingRPC = example::AwaitableServerRPC<&ExampleService::RequestClientStreaming>;

asio::awaitable<void> handle_client_streaming_request(ClientStreamingRPC& rpc)
{
Expand Down Expand Up @@ -76,8 +76,7 @@ asio::awaitable<void> handle_client_streaming_request(ClientStreamingRPC& rpc)
// A simple server-streaming rpc handler using C++20 coroutines.
// ---------------------------------------------------
// end-snippet
using ServerStreamingRPC =
asio::use_awaitable_t<>::as_default_on_t<agrpc::ServerRPC<&ExampleService::RequestServerStreaming>>;
using ServerStreamingRPC = example::AwaitableServerRPC<&ExampleService::RequestServerStreaming>;

asio::awaitable<void> handle_server_streaming_request(ServerStreamingRPC& rpc, example::v1::Request& request)
{
Expand All @@ -98,8 +97,7 @@ asio::awaitable<void> handle_server_streaming_request(ServerStreamingRPC& rpc, e
// back to the client.
// ---------------------------------------------------
// end-snippet
using BidiStreamingRPC =
asio::use_awaitable_t<>::as_default_on_t<agrpc::ServerRPC<&ExampleService::RequestBidirectionalStreaming>>;
using BidiStreamingRPC = example::AwaitableServerRPC<&ExampleService::RequestBidirectionalStreaming>;

using Channel = asio::experimental::channel<void(boost::system::error_code, example::v1::Request)>;

Expand Down Expand Up @@ -176,6 +174,7 @@ auto bidirectional_streaming_rpc_handler(asio::thread_pool& thread_pool)

// ---------------------------------------------------
// The SlowUnary endpoint is used by the client to demonstrate per-RPC step cancellation. See streaming-client.cpp.
// It also demonstrates how to use an awaitable with a different executor type.
// ---------------------------------------------------
using SlowUnaryRPC =
asio::use_awaitable_t<agrpc::GrpcExecutor>::as_default_on_t<agrpc::ServerRPC<&ExampleExtService::RequestSlowUnary>>;
Expand All @@ -191,7 +190,7 @@ asio::awaitable<void, agrpc::GrpcExecutor> handle_slow_unary_request(SlowUnaryRP
// ---------------------------------------------------
//

using ShutdownRPC = asio::use_awaitable_t<>::as_default_on_t<agrpc::ServerRPC<&ExampleExtService::RequestShutdown>>;
using ShutdownRPC = example::AwaitableServerRPC<&ExampleExtService::RequestShutdown>;

int main(int argc, const char** argv)
{
Expand Down
73 changes: 33 additions & 40 deletions src/agrpc/detail/register_callback_rpc_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#ifndef AGRPC_DETAIL_REGISTER_CALLBACK_RPC_HANDLER_HPP
#define AGRPC_DETAIL_REGISTER_CALLBACK_RPC_HANDLER_HPP

#include <agrpc/bind_allocator.hpp>
#include <agrpc/detail/config.hpp>
#include <agrpc/detail/register_rpc_handler_asio_base.hpp>
#include <agrpc/detail/server_rpc_with_request.hpp>
Expand Down Expand Up @@ -49,6 +48,38 @@ struct RegisterCallbackRPCHandlerOperation
RegisterCallbackRPCHandlerOperation& self_;
};

struct StartCallback
{
using allocator_type = Allocator;

void operator()(bool ok)
{
if (ok)
{
self_.initiate_next();
AGRPC_TRY { ptr_.server_rpc_->invoke(self_.rpc_handler(), static_cast<ServerRPCPtr&&>(ptr_)); }
AGRPC_CATCH(...)
{
// Technically `this` could already be deallocated at this point but we rely on the
// fact that completing this operation is done in a manner similar to asio::post and
// therefore never before this lambda ends.
self_.set_error(std::current_exception());
}
}
else
{
[[maybe_unused]] RefCountGuard a{self_};
[[maybe_unused]] detail::AllocationGuard b{static_cast<ServerRPCAllocation*>(ptr_.release()),
self_.get_allocator()};
}
}

Allocator get_allocator() const noexcept { return self_.get_allocator(); }

RegisterCallbackRPCHandlerOperation& self_;
ServerRPCPtr ptr_;
};

static void wait_for_done_deleter(ServerRPCWithRequest* ptr) noexcept
{
auto& self = *static_cast<ServerRPCAllocation*>(ptr);
Expand Down Expand Up @@ -104,45 +135,7 @@ struct RegisterCallbackRPCHandlerOperation
void perform_request_and_repeat(ServerRPCPtr&& ptr)
{
auto& rpc = *ptr.server_rpc_;
rpc.start(rpc.rpc_, this->service(),
bind_allocator(
[this, ptr = static_cast<ServerRPCPtr&&>(ptr)](bool ok) mutable
{
if (ok)
{
initiate_next();
AGRPC_TRY
{
ptr.server_rpc_->invoke(this->rpc_handler(), static_cast<ServerRPCPtr&&>(ptr));
}
AGRPC_CATCH(...)
{
// Technically `this` could already be deallocated at this point but we rely on the
// fact that completing this operation is done in a manner similar to asio::post and
// therefore never before this lambda ends.
this->set_error(std::current_exception());
}
}
else
{
[[maybe_unused]] RefCountGuard a{*this};
[[maybe_unused]] detail::AllocationGuard b{
static_cast<ServerRPCAllocation*>(ptr.release()), this->get_allocator()};
}
}));
}

template <class Function>
decltype(auto) bind_allocator(Function&& function)
{
if constexpr (detail::IS_STD_ALLOCATOR<Allocator>)
{
return static_cast<Function&&>(function);
}
else
{
return agrpc::AllocatorBinder(this->get_allocator(), static_cast<Function&&>(function));
}
rpc.start(rpc.rpc_, this->service(), StartCallback{*this, static_cast<ServerRPCPtr&&>(ptr)});
}
};

Expand Down
12 changes: 8 additions & 4 deletions src/agrpc/register_awaitable_rpc_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ AGRPC_NAMESPACE_BEGIN()
*
* The rpc handler will be invoked for every incoming request of this gRPC method. It must take `ServerRPC&` as
* first argument and `ServerRPC::Request&` as second argument (only for unary and server-streaming rpcs). The ServerRPC
* is automatically cancelled at the end of the rpc handler if `finish()` was not called earlier.
* is automatically cancelled at the end of the rpc handler if `finish()` was not called earlier. The return value of
* the rpc handler is be `co_spawn`ed in a manner similar to:
* `asio::co_spawn(asio::get_associated_executor(completion_handler, executor), rpc_handler())`, where
* `completion_handler` is obtained from `token` and `executor` the first argument passed to this function.
*
* This asynchronous operation runs forever unless it is cancelled, the rpc handler throws an exception or the server is
* shutdown
Expand All @@ -44,9 +47,10 @@ AGRPC_NAMESPACE_BEGIN()
* @tparam ServerRPC An instantiation of `agrpc::ServerRPC`
* @param executor The executor used to handle each rpc
* @param service The service associated with the gRPC method of the ServerRPC
* @param rpc_handler A callable that produces an `asio::awaitable<void, CompletionExecutor>`. The awaitable's return
* value is ignored. The CompletionExecutor is the executor associated with the completion handler (defaulted to
* `executor`).
* @param rpc_handler A callable that produces an `asio::awaitable<void, Executor>`. The awaitable's return
* value is ignored. The Executor must be constructible from `asio::get_associated_executor(completion_handler,
* executor)`, where `completion_handler` is obtained from `token` and `executor` the first argument passed to this
* function.
* @param token A completion token for signature `void(std::exception_ptr)`.
*
* @since 2.7.0
Expand Down
13 changes: 8 additions & 5 deletions src/agrpc/register_yield_rpc_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ AGRPC_NAMESPACE_BEGIN()
*
* The rpc handler will be invoked for every incoming request of this gRPC method. It must take `ServerRPC&` as
* first, `ServerRPC::Request&` as second (only for unary and server-streaming rpcs) and
* `asio::basic_yield_context<CompletionExecutor>` as third argument. The ServerRPC is automatically cancelled at the
* end of the rpc handler if `finish()` was not called earlier.
* `asio::basic_yield_context<Executor>` as third argument. The Executor is obtained by calling
* `asio::get_associated_executor(completion_handler, executor)`, where `completion_handler` is obtained from `token`
* and `executor` the first argument passed to this function. The ServerRPC is automatically cancelled at the end of the
* rpc handler if `finish()` was not called earlier.
*
* This asynchronous operation runs forever unless it is cancelled, the rpc handler throws an exception or the server is
* shutdown
Expand All @@ -41,9 +43,10 @@ AGRPC_NAMESPACE_BEGIN()
* @tparam ServerRPC An instantiation of `agrpc::ServerRPC`
* @param executor The executor used to handle each rpc
* @param service The service associated with the gRPC method of the ServerRPC
* @param rpc_handler A callable that takes an `asio::basic_yield_context<CompletionExecutor>` as last argument. The
* return value is ignored. The CompletionExecutor is the executor associated with the completion handler (defaulted to
* `executor`).
* @param rpc_handler A callable that takes an `asio::basic_yield_context<Executor>` as last argument. The
* return value is ignored. The Executor must be constructible from `asio::get_associated_executor(completion_handler,
* executor)`, where `completion_handler` is obtained from `token` and `executor` the first argument passed to this
* function.
* @param token A completion token for signature `void(std::exception_ptr)`.
*
* @since 2.7.0
Expand Down
Loading

0 comments on commit 12c5826

Please sign in to comment.