Skip to content

Commit

Permalink
feat: Add register_callback_rpc_handler and ServerRPCPtr
Browse files Browse the repository at this point in the history
  • Loading branch information
Tradias committed Nov 8, 2023
1 parent 253fc6a commit f7f9e84
Show file tree
Hide file tree
Showing 22 changed files with 568 additions and 32 deletions.
2 changes: 1 addition & 1 deletion doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Feature overview, installation and performance benchmark can be found on [github
* Main workhorses of this library: `agrpc::GrpcContext`, `agrpc::GrpcExecutor`.
* Asynchronous gRPC clients: [cheat sheet](md_doc_client_rpc_cheat_sheet.html), `agrpc::ClientRPC`,
* Asynchronous gRPC servers: [cheat sheet](md_doc_server_rpc_cheat_sheet.html), `agrpc::ServerRPC`, `agrpc::register_awaitable_rpc_handler`,
`agrpc::register_yield_rpc_handler`, `agrpc::register_sender_rpc_handler`
`agrpc::register_yield_rpc_handler`, `agrpc::register_sender_rpc_handler`, `agrpc::register_callback_rpc_handler`
* GRPC Timer: `agrpc::Alarm`
* Combining GrpcContext and `asio::io_context`: `agrpc::run`, `agrpc::run_completion_queue`
* Faster, drop-in replacement for gRPC's [DefaultHealthCheckService](https://github.com/grpc/grpc/blob/v1.50.1/src/cpp/server/health/default_health_check_service.h): `agrpc::HealthCheckService`
Expand Down
2 changes: 1 addition & 1 deletion example/generic-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ bool writer(agrpc::GenericServerRPC& rpc, Channel& channel, asio::thread_pool& t

process_request(buffer);

// reader_writer is thread-safe so we can just interact with it from the thread_pool.
// rpc.write() is thread-safe so we can interact with it from the thread_pool.
ok = rpc.write(buffer, yield);
// Now we are back on the main thread.
}
Expand Down
18 changes: 18 additions & 0 deletions example/snippets/server_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "example/v1/example.grpc.pb.h"

#include <agrpc/asio_grpc.hpp>
#include <agrpc/register_callback_rpc_handler.hpp>
#include <agrpc/register_yield_rpc_handler.hpp>
#include <boost/asio/deferred.hpp>
#include <boost/asio/detached.hpp>
Expand Down Expand Up @@ -213,3 +214,20 @@ void server_rpc_unary_yield(agrpc::GrpcContext& grpc_context, example::v1::Examp
asio::detached);
}
/* [server-rpc-unary-yield] */

/* [server-rpc-unary-callback] */
void server_rpc_unary_callback(agrpc::GrpcContext& grpc_context, example::v1::Example::AsyncService& service)
{
using RPC = agrpc::ServerRPC<&example::v1::Example::AsyncService::RequestUnary>;
agrpc::register_callback_rpc_handler<RPC>(
grpc_context, service,
[](RPC::Ptr ptr, RPC::Request& request)
{
RPC::Response response;
response.set_integer(request.integer());
auto& rpc = *ptr;
rpc.finish(response, grpc::Status::OK, [p = std::move(ptr)](bool) {});
},
asio::detached);
}
/* [server-rpc-unary-callback] */
2 changes: 1 addition & 1 deletion example/streaming-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ asio::awaitable<bool> writer(BidiStreamingRPC& rpc, Channel& channel, asio::thre
example::v1::Response response;
response.set_integer(request.integer() * 2);

// reader_writer is thread-safe so we can just interact with it from the thread_pool.
// rpc.write() is thread-safe so we can interact with it from the thread_pool.
ok = co_await rpc.write(response);
// Now we are back on the main thread.
}
Expand Down
3 changes: 3 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ if(ASIO_GRPC_BUILD_TESTS AND ASIO_GRPC_ENABLE_CHECK_HEADER_SYNTAX_TARGET)
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/query_grpc_context.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/receiver.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/register_awaitable_rpc_handler.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/register_callback_rpc_handler.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/register_rpc_handler_asio_base.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/register_rpc_handler_base.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/register_sender_rpc_handler.hpp"
Expand All @@ -159,6 +160,7 @@ if(ASIO_GRPC_BUILD_TESTS AND ASIO_GRPC_ENABLE_CHECK_HEADER_SYNTAX_TARGET)
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/server_rpc_notify_when_done_base.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/server_rpc_notify_when_done_mixin.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/server_rpc_sender.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/server_rpc_with_request.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/server_write_reactor.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/serving_status.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/detail/start_server_rpc.hpp"
Expand All @@ -181,6 +183,7 @@ if(ASIO_GRPC_BUILD_TESTS AND ASIO_GRPC_ENABLE_CHECK_HEADER_SYNTAX_TARGET)
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/notify_when_done.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/read.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/register_awaitable_rpc_handler.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/register_callback_rpc_handler.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/register_sender_rpc_handler.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/register_yield_rpc_handler.hpp"
"${CMAKE_CURRENT_SOURCE_DIR}/agrpc/repeatedly_request.hpp"
Expand Down
7 changes: 5 additions & 2 deletions src/agrpc/detail/allocate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ class AllocationGuard
public:
AllocationGuard(Pointer ptr, const Allocator& allocator) noexcept : ptr_(ptr), allocator_(allocator) {}

AllocationGuard(const AllocationGuard&) = delete;
AllocationGuard(AllocationGuard&& other) noexcept
: ptr_(std::exchange(other.ptr_, nullptr)), allocator_(other.allocator_)
{
}

AllocationGuard& operator=(const AllocationGuard&) = delete;
AllocationGuard(AllocationGuard&&) = delete;
AllocationGuard& operator=(AllocationGuard&&) = delete;

~AllocationGuard() noexcept
Expand Down
6 changes: 6 additions & 0 deletions src/agrpc/detail/forward.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ struct DefaultServerRPCTraits;
template <auto RequestRPC, class Traits = agrpc::DefaultServerRPCTraits, class Executor = agrpc::BasicGrpcExecutor<>>
class ServerRPC;

template <class ServerRPCT>
class ServerRPCPtr;

template <class Signature, class Executor = agrpc::BasicGrpcExecutor<>>
class Waiter;

Expand Down Expand Up @@ -132,6 +135,9 @@ class ServerRPCNotifyWhenDoneMixin;

class NotifyWhenDoneEvent;

template <class ServerRPC, class RPCHandler, class CompletionHandler>
struct RegisterCallbackRPCHandlerOperation;

AGRPC_NAMESPACE_CPP20_BEGIN()

class RepeatedlyRequestFn;
Expand Down
156 changes: 156 additions & 0 deletions src/agrpc/detail/register_callback_rpc_handler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2023 Dennis Hezel
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef AGRPC_DETAIL_REGISTER_CALLBACK_RPC_HANDLER_HPP
#define AGRPC_DETAIL_REGISTER_CALLBACK_RPC_HANDLER_HPP

#include <agrpc/bind_allocator.hpp>
#include <agrpc/detail/config.hpp>
#include <agrpc/detail/register_rpc_handler_asio_base.hpp>
#include <agrpc/detail/server_rpc_with_request.hpp>
#include <agrpc/grpc_context.hpp>
#include <agrpc/server_rpc_ptr.hpp>

AGRPC_NAMESPACE_BEGIN()

namespace detail
{
template <class ServerRPC, class RPCHandler, class CompletionHandler>
struct RegisterCallbackRPCHandlerOperation
: detail::RegisterRPCHandlerOperationAsioBase<ServerRPC, RPCHandler, CompletionHandler>
{
using Base = detail::RegisterRPCHandlerOperationAsioBase<ServerRPC, RPCHandler, CompletionHandler>;
using typename Base::Allocator;
using typename Base::RefCountGuard;
using typename Base::RPCRequest;
using typename Base::ServerRPCExecutor;
using typename Base::Service;
using ServerRPCWithRequest = ServerRPCWithRequest<ServerRPC>;

Check failure on line 39 in src/agrpc/detail/register_callback_rpc_handler.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Default

declaration of ‘using ServerRPCWithRequest = struct agrpc::b::detail::ServerRPCWithRequest<ServerRPCT>’ changes meaning of ‘ServerRPCWithRequest’ [-fpermissive]

Check failure on line 39 in src/agrpc/detail/register_callback_rpc_handler.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Default

declaration of ‘using ServerRPCWithRequest = struct agrpc::s::detail::ServerRPCWithRequest<ServerRPCT>’ changes meaning of ‘ServerRPCWithRequest’ [-fpermissive]

Check failure on line 39 in src/agrpc/detail/register_callback_rpc_handler.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Default

declaration of ‘using ServerRPCWithRequest = struct agrpc::b::detail::ServerRPCWithRequest<ServerRPCT>’ changes meaning of ‘ServerRPCWithRequest’ [-fpermissive]

Check failure on line 39 in src/agrpc/detail/register_callback_rpc_handler.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/Default

declaration of ‘using ServerRPCWithRequest = struct agrpc::b::detail::ServerRPCWithRequest<ServerRPCT>’ changes meaning of ‘ServerRPCWithRequest’ [-fpermissive]

Check failure on line 39 in src/agrpc/detail/register_callback_rpc_handler.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/22.04/GCC

declaration of ‘using ServerRPCWithRequest = struct agrpc::b::detail::ServerRPCWithRequest<ServerRPCT>’ changes meaning of ‘ServerRPCWithRequest’ [-fpermissive]

Check failure on line 39 in src/agrpc/detail/register_callback_rpc_handler.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/GCC

declaration of ‘using ServerRPCWithRequest = struct agrpc::b::detail::ServerRPCWithRequest<ServerRPCT>’ [-fpermissive]

Check failure on line 39 in src/agrpc/detail/register_callback_rpc_handler.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/GCC

declaration of ‘using ServerRPCWithRequest = struct agrpc::s::detail::ServerRPCWithRequest<ServerRPCT>’ [-fpermissive]

Check failure on line 39 in src/agrpc/detail/register_callback_rpc_handler.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/GCC

declaration of ‘using ServerRPCWithRequest = struct agrpc::s::detail::ServerRPCWithRequest<ServerRPCT>’ [-fpermissive]
using ServerRPCPtr = agrpc::ServerRPCPtr<ServerRPC>;

struct ServerRPCAllocation : ServerRPCWithRequest
{
ServerRPCAllocation(const ServerRPCExecutor& executor, RegisterCallbackRPCHandlerOperation& self)
: ServerRPCWithRequest(executor), self_(self)
{
}

RegisterCallbackRPCHandlerOperation& self_;
};

static void wait_for_done_deleter(ServerRPCWithRequest* ptr) noexcept
{
auto& self = *static_cast<ServerRPCAllocation*>(ptr);
[[maybe_unused]] RefCountGuard a{self.self_};
[[maybe_unused]] detail::AllocationGuard b{&self, self.self_.get_allocator()};
}

static void deleter(ServerRPCWithRequest* ptr) noexcept
{
auto& self = *static_cast<ServerRPCAllocation*>(ptr);
RefCountGuard guard{self.self_};
detail::AllocationGuard alloc_guard{&self, self.self_.get_allocator()};
auto& rpc = ptr->rpc_;
if (!detail::ServerRPCContextBaseAccess::is_finished(rpc))
{
rpc.cancel();
}
if constexpr (ServerRPC::Traits::NOTIFY_WHEN_DONE)
{
if (!rpc.is_done())
{
rpc.wait_for_done([ptr = ServerRPCPtr{ptr, &wait_for_done_deleter}](const detail::ErrorCode&) {});
guard.release();
alloc_guard.release();
}
}
}

template <class Ch>
RegisterCallbackRPCHandlerOperation(const ServerRPCExecutor& executor, Service& service, RPCHandler&& rpc_handler,
Ch&& completion_handler)
: Base(executor, service, static_cast<RPCHandler&&>(rpc_handler), static_cast<Ch&&>(completion_handler),
&detail::register_rpc_handler_asio_do_complete<RegisterCallbackRPCHandlerOperation>)
{
initiate();
}

void initiate()
{
auto ptr = detail::allocate<ServerRPCAllocation>(this->get_allocator(), this->get_executor(), *this);
this->increment_ref_count();
perform_request_and_repeat({ptr.release(), &deleter});
}

void initiate_next()
{
if AGRPC_LIKELY (!this->is_stopped())
{
initiate();
}
}

void perform_request_and_repeat(ServerRPCPtr&& ptr)
{
auto& rpc = *ptr.server_rpc_;
rpc.start(rpc.rpc_, this->service(),
bind_allocator(
[this, ptr = static_cast<ServerRPCPtr&&>(ptr)](bool ok) mutable
{
if (ok)
{
initiate_next();
AGRPC_TRY
{
ptr.server_rpc_->invoke(this->rpc_handler(), static_cast<ServerRPCPtr&&>(ptr));
}
AGRPC_CATCH(...)
{
// Technically `this` could already be deallocated at this point but we rely on the
// fact that completing this operation is done in a manner similar to asio::post and
// therefore never before this lambda ends.
this->set_error(std::current_exception());
}
}
else
{
[[maybe_unused]] RefCountGuard a{*this};
[[maybe_unused]] detail::AllocationGuard b{
static_cast<ServerRPCAllocation*>(ptr.release()), this->get_allocator()};
}
}));
}

template <class Function>
decltype(auto) bind_allocator(Function&& function)
{
if constexpr (detail::IS_STD_ALLOCATOR<Allocator>)
{
return static_cast<Function&&>(function);
}
else
{
return agrpc::AllocatorBinder(this->get_allocator(), static_cast<Function&&>(function));
}
}
};

template <class ServerRPC>
using RegisterCallbackRPCHandlerInitiator =
detail::RegisterRPCHandlerInitiator<ServerRPC, RegisterCallbackRPCHandlerOperation>;
}

AGRPC_NAMESPACE_END

#endif // AGRPC_DETAIL_REGISTER_CALLBACK_RPC_HANDLER_HPP
2 changes: 1 addition & 1 deletion src/agrpc/detail/register_yield_rpc_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ struct RegisterYieldRPCHandlerOperation
}

template <class Yield>
constexpr decltype(auto) use_yield(const Yield& yield)
decltype(auto) use_yield(const Yield& yield)
{
if constexpr (detail::IS_STD_ALLOCATOR<Allocator>)
{
Expand Down
8 changes: 4 additions & 4 deletions src/agrpc/detail/rpc_request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ struct RPCRequest
}

template <class Handler, class RPC, class... Args>
decltype(auto) invoke(Handler&& handler, RPC& rpc, Args&&... args)
decltype(auto) invoke(Handler&& handler, RPC&& rpc, Args&&... args)
{
return static_cast<Handler&&>(handler)(rpc, request_, static_cast<Args&&>(args)...);
return static_cast<Handler&&>(handler)(static_cast<RPC&&>(rpc), request_, static_cast<Args&&>(args)...);
}

Request request_;
Expand All @@ -56,9 +56,9 @@ struct RPCRequest<Request, false>
}

template <class Handler, class RPC, class... Args>
decltype(auto) invoke(Handler&& handler, RPC& rpc, Args&&... args)
decltype(auto) invoke(Handler&& handler, RPC&& rpc, Args&&... args)
{
return static_cast<Handler&&>(handler)(rpc, static_cast<Args&&>(args)...);
return static_cast<Handler&&>(handler)(static_cast<RPC&&>(rpc), static_cast<Args&&>(args)...);
}
};
}
Expand Down
7 changes: 0 additions & 7 deletions src/agrpc/detail/server_rpc_context_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class ServerRPCContextBase : private ServerContextBase<Responder>
friend class detail::ServerRPCNotifyWhenDoneMixin;

Responder responder_{&this->server_context_};
bool is_started_{};
bool is_finished_{};
};

Expand Down Expand Up @@ -105,12 +104,6 @@ struct ServerRPCContextBaseAccess
return rpc.responder_;
}

template <class Responder>
static void set_started(ServerRPCContextBase<Responder>& rpc) noexcept
{
rpc.is_started_ = true;
}

template <class Responder>
[[nodiscard]] static bool is_finished(ServerRPCContextBase<Responder>& rpc) noexcept
{
Expand Down
1 change: 0 additions & 1 deletion src/agrpc/detail/server_rpc_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ struct ServerRequestSenderImplementation : detail::GrpcSenderImplementationBase
{
grpc_context.work_started();
}
ServerRPCAccess::set_started(rpc_);
}
}

Expand Down
40 changes: 40 additions & 0 deletions src/agrpc/detail/server_rpc_with_request.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 Dennis Hezel
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef AGRPC_DETAIL_SERVER_RPC_WITH_REQUEST_HPP
#define AGRPC_DETAIL_SERVER_RPC_WITH_REQUEST_HPP

#include <agrpc/detail/rpc_request.hpp>
#include <agrpc/detail/server_rpc_context_base.hpp>

AGRPC_NAMESPACE_BEGIN()

namespace detail
{
template <class ServerRPC>
struct ServerRPCWithRequest

Check failure on line 26 in src/agrpc/detail/server_rpc_with_request.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/GCC

changes meaning of ‘ServerRPCWithRequest’ from ‘struct agrpc::b::detail::ServerRPCWithRequest<ServerRPCT>’ [-fpermissive]

Check failure on line 26 in src/agrpc/detail/server_rpc_with_request.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/GCC

changes meaning of ‘ServerRPCWithRequest’ from ‘struct agrpc::s::detail::ServerRPCWithRequest<ServerRPCT>’ [-fpermissive]

Check failure on line 26 in src/agrpc/detail/server_rpc_with_request.hpp

View workflow job for this annotation

GitHub Actions / Ubuntu/20.04/GCC

changes meaning of ‘ServerRPCWithRequest’ from ‘struct agrpc::s::detail::ServerRPCWithRequest<ServerRPCT>’ [-fpermissive]
: detail::RPCRequest<typename ServerRPC::Request, detail::has_initial_request(ServerRPC::TYPE)>
{
ServerRPCWithRequest(const typename ServerRPC::executor_type& executor)
: rpc_(detail::ServerRPCContextBaseAccess::construct<ServerRPC>(executor))
{
}

ServerRPC rpc_;
};
}

AGRPC_NAMESPACE_END

#endif // AGRPC_DETAIL_SERVER_RPC_WITH_REQUEST_HPP
2 changes: 1 addition & 1 deletion src/agrpc/register_awaitable_rpc_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ auto register_awaitable_rpc_handler(const typename ServerRPC::executor_type& exe
}

/**
* @brief (experimental) Register a rpc handler for the given method (GrpcContext overload)
* @brief (experimental) Register an awaitable rpc handler for the given method (GrpcContext overload)
*
* @since 2.7.0
*/
Expand Down
Loading

0 comments on commit f7f9e84

Please sign in to comment.