Skip to content

Commit

Permalink
fix: Guard callback_rpc_handler include against unifex and fix compil…
Browse files Browse the repository at this point in the history
…ation. Use callback rpc handler in multi-threaded server example and remove when_all_bind_executor
  • Loading branch information
Tradias committed Nov 9, 2023
1 parent f7f9e84 commit 5ca49ed
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 38 deletions.
15 changes: 7 additions & 8 deletions example/generic-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ void make_generic_unary_request(agrpc::GrpcContext& grpc_context, grpc::GenericS

// begin-snippet: client-side-generic-bidirectional-request
// ---------------------------------------------------
// A generic bidirectional-streaming request that simply sends the response from the server back to it.
// Here we are using stackless coroutines and the low-level gRPC client API.
// A generic bidirectional-streaming request that simply sends the response from the server back to it using Asio's
// stackless coroutines.
// ---------------------------------------------------
// end-snippet
struct BidirectionalStreamingRequest
Expand Down Expand Up @@ -115,13 +115,13 @@ struct BidirectionalStreamingRequest
template <class Self>
void operator()(Self& self, const std::array<std::size_t, 2>&, bool read_ok, bool write_ok)
{
(*this)(self, read_ok, write_ok);
operator()(self, read_ok, write_ok);
}

template <class Self>
void operator()(Self& self, const grpc::Status& status)
{
(*this)(self, {}, {}, status);
operator()(self, {}, {}, status);
}

template <class Self>
Expand All @@ -147,19 +147,18 @@ struct BidirectionalStreamingRequest
BOOST_ASIO_CORO_YIELD
{
auto request_buffer = serialize(c.request);
const auto executor = asio::get_associated_executor(self);

// Reads and writes can be performed simultaneously.
example::when_all_bind_executor(
executor, std::move(self),
asio::experimental::make_parallel_group(
[&](auto&& token)
{
return c.rpc.read(c.response_buffer, std::move(token));
},
[&](auto&& token)
{
return c.rpc.write(request_buffer, std::move(token));
});
})
.async_wait(asio::experimental::wait_for_all(), std::move(self));
}
c.read_ok = ok;
c.write_ok = write_ok;
Expand Down
14 changes: 0 additions & 14 deletions example/helper/yield_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,6 @@ auto spawn_all_void(agrpc::GrpcContext& grpc_context, CompletionToken&& token, F
return boost::asio::async_compose<CompletionToken, void()>(SpawnAllVoid{std::move(function)...}, token,
grpc_context);
}

template <class Executor, class CompletionToken, class... Function>
auto when_all_bind_executor(const Executor& executor, CompletionToken&& token, Function&&... function)
{
return boost::asio::experimental::make_parallel_group(
[&](auto& f)
{
return [&](auto&& t)
{
return f(boost::asio::bind_executor(executor, std::forward<decltype(t)>(t)));
};
}(function)...)
.async_wait(boost::asio::experimental::wait_for_all(), std::forward<CompletionToken>(token));
}
} // namespace example

#endif // AGRPC_HELPER_YIELD_HELPER_HPP
26 changes: 15 additions & 11 deletions example/multi-threaded-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,31 @@ namespace asio = boost::asio;

// begin-snippet: server-side-multi-threaded
// ---------------------------------------------------
// Multi-threaded server performing 20 unary requests
// Multi-threaded server performing 20 unary requests using callback API
// ---------------------------------------------------
// end-snippet

void register_request_handler(agrpc::GrpcContext& grpc_context, helloworld::Greeter::AsyncService& service,
example::ServerShutdown& shutdown)
{
agrpc::register_awaitable_rpc_handler<agrpc::ServerRPC<&helloworld::Greeter::AsyncService::RequestSayHello>>(
using RPC = agrpc::ServerRPC<&helloworld::Greeter::AsyncService::RequestSayHello>;
agrpc::register_callback_rpc_handler<RPC>(
grpc_context, service,
[&](auto& rpc, helloworld::HelloRequest& request) -> asio::awaitable<void>
[&](RPC::Ptr ptr, helloworld::HelloRequest& request)
{
helloworld::HelloReply response;
response.set_message("Hello " + request.name());
co_await rpc.finish(response, grpc::Status::OK, asio::use_awaitable);

// In this example we shut down the server after 20 requests
static std::atomic_int counter{};
if (19 == counter.fetch_add(1))
{
shutdown.shutdown();
}
auto& rpc = *ptr;
rpc.finish(response, grpc::Status::OK,
[&, p = std::move(ptr)](bool)
{
// In this example we shut down the server after 20 requests
static std::atomic_int counter{};
if (19 == counter.fetch_add(1))
{
shutdown.shutdown();
}
});
},
asio::detached);
}
Expand Down
6 changes: 2 additions & 4 deletions example/streaming-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ using ExampleExtService = example::v1::ExampleExt::AsyncService;

// begin-snippet: server-side-client-streaming
// ---------------------------------------------------
// A simple client-streaming rpc handler using coroutines.
// A simple client-streaming rpc handler using C++20 coroutines.
// ---------------------------------------------------
// end-snippet
using ClientStreamingRPC =
Expand All @@ -47,8 +47,6 @@ using ClientStreamingRPC =
asio::awaitable<void> handle_client_streaming_request(ClientStreamingRPC& rpc)
{
// Optionally send initial metadata first.
// Since the default completion token in asio-grpc is asio::use_awaitable, this line is equivalent to:
// co_await agrpc::send_initial_metadata(reader, asio::use_awaitable);
if (!co_await rpc.send_initial_metadata())
{
// Connection lost
Expand All @@ -75,7 +73,7 @@ asio::awaitable<void> handle_client_streaming_request(ClientStreamingRPC& rpc)

// begin-snippet: server-side-server-streaming
// ---------------------------------------------------
// A simple server-streaming rpc handler using coroutines.
// A simple server-streaming rpc handler using C++20 coroutines.
// ---------------------------------------------------
// end-snippet
using ServerStreamingRPC =
Expand Down
1 change: 1 addition & 0 deletions src/agrpc/asio_grpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <agrpc/notify_on_state_change.hpp>
#include <agrpc/notify_when_done.hpp>
#include <agrpc/register_awaitable_rpc_handler.hpp>
#include <agrpc/register_callback_rpc_handler.hpp>
#include <agrpc/register_sender_rpc_handler.hpp>
#include <agrpc/repeatedly_request.hpp>
#include <agrpc/repeatedly_request_context.hpp>
Expand Down
2 changes: 1 addition & 1 deletion src/agrpc/detail/register_callback_rpc_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ struct RegisterCallbackRPCHandlerOperation
using typename Base::RPCRequest;
using typename Base::ServerRPCExecutor;
using typename Base::Service;
using ServerRPCWithRequest = ServerRPCWithRequest<ServerRPC>;
using ServerRPCWithRequest = detail::ServerRPCWithRequest<ServerRPC>;
using ServerRPCPtr = agrpc::ServerRPCPtr<ServerRPC>;

struct ServerRPCAllocation : ServerRPCWithRequest
Expand Down
5 changes: 5 additions & 0 deletions src/agrpc/register_callback_rpc_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#define AGRPC_AGRPC_REGISTER_CALLBACK_RPC_HANDLER_HPP

#include <agrpc/detail/config.hpp>

#if defined(AGRPC_STANDALONE_ASIO) || defined(AGRPC_BOOST_ASIO)

#include <agrpc/detail/register_callback_rpc_handler.hpp>

AGRPC_NAMESPACE_BEGIN()
Expand Down Expand Up @@ -71,4 +74,6 @@ auto register_callback_rpc_handler(agrpc::GrpcContext& grpc_context, detail::Get

AGRPC_NAMESPACE_END

#endif

#endif // AGRPC_AGRPC_REGISTER_CALLBACK_RPC_HANDLER_HPP

0 comments on commit 5ca49ed

Please sign in to comment.