Skip to content

Commit

Permalink
fix: Correctly use ServerRPC::executor_type passed to register_reques…
Browse files Browse the repository at this point in the history
…t_handler function
  • Loading branch information
Tradias committed Oct 16, 2023
1 parent 51125fc commit 5cd94f1
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 33 deletions.
10 changes: 5 additions & 5 deletions src/agrpc/detail/register_awaitable_request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct AwaitableRequestHandlerOperation
using typename Base::Allocator;
using typename Base::RefCountGuard;
using typename Base::RPCRequest;
using typename Base::ServerRPCExecutor;
using typename Base::Service;

using Awaitable = detail::RebindCoroutineT<decltype(std::declval<RPCRequest&>().invoke(
Expand All @@ -42,10 +43,9 @@ struct AwaitableRequestHandlerOperation
using UseAwaitable = detail::CoroutineCompletionTokenT<Awaitable>;

template <class Ch>
AwaitableRequestHandlerOperation(agrpc::GrpcContext& grpc_context, Service& service,
AwaitableRequestHandlerOperation(const ServerRPCExecutor& executor, Service& service,
RequestHandler&& request_handler, Ch&& completion_handler)
: Base(grpc_context, service, static_cast<RequestHandler&&>(request_handler),
static_cast<Ch&&>(completion_handler),
: Base(executor, service, static_cast<RequestHandler&&>(request_handler), static_cast<Ch&&>(completion_handler),
&detail::register_request_handler_asio_do_complete<AwaitableRequestHandlerOperation>)
{
initiate();
Expand All @@ -54,7 +54,7 @@ struct AwaitableRequestHandlerOperation
void initiate()
{
this->increment_ref_count();
asio::co_spawn(asio::get_associated_executor(this->completion_handler(), this->grpc_context()),
asio::co_spawn(asio::get_associated_executor(this->completion_handler(), this->get_executor()),
perform_request_and_repeat(),
[g = RefCountGuard{*this}](std::exception_ptr eptr)
{
Expand All @@ -76,7 +76,7 @@ struct AwaitableRequestHandlerOperation

Awaitable perform_request_and_repeat()
{
auto rpc = detail::ServerRPCContextBaseAccess::construct<ServerRPC>(this->grpc_context().get_executor());
auto rpc = detail::ServerRPCContextBaseAccess::construct<ServerRPC>(this->get_executor());
RPCRequest req;
if (!co_await req.start(rpc, this->service(), use_awaitable()))
{
Expand Down
18 changes: 10 additions & 8 deletions src/agrpc/detail/register_request_handler_asio_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,23 @@ class RegisterRequestHandlerOperationAsioBase
};

public:
using typename Base::ServerRPCExecutor;
using typename Base::Service;
using Executor = detail::AssociatedExecutorT<CompletionHandlerT, agrpc::GrpcExecutor>;
using Executor = detail::AssociatedExecutorT<CompletionHandlerT, ServerRPCExecutor>;
using Allocator = detail::AssociatedAllocatorT<CompletionHandlerT>;
using RPCRequest = detail::RPCRequest<typename ServerRPC::Request, detail::has_initial_request(ServerRPC::TYPE)>;
using RefCountGuard = detail::ScopeGuard<Decrementer>;

template <class Ch>
RegisterRequestHandlerOperationAsioBase(agrpc::GrpcContext& grpc_context, Service& service,
RegisterRequestHandlerOperationAsioBase(const ServerRPCExecutor& executor, Service& service,
RequestHandler&& request_handler, Ch&& completion_handler,
detail::OperationOnComplete on_complete)
: Base(grpc_context, service, static_cast<RequestHandler&&>(request_handler)),
WorkTracker(exec::get_executor(completion_handler)),
: Base(executor, service, static_cast<RequestHandler&&>(request_handler)),
detail::QueueableOperationBase(on_complete),
WorkTracker(exec::get_executor(completion_handler)),
completion_handler_(static_cast<Ch&&>(completion_handler))
{
grpc_context.work_started();
this->grpc_context().work_started();
this->stop_context_.emplace(exec::get_stop_token(completion_handler_));
}

Expand All @@ -100,17 +101,18 @@ struct RegisterRequestHandlerInitiator
{
template <class CompletionHandler, class RequestHandler>
void operator()(CompletionHandler&& completion_handler, const typename ServerRPC::executor_type& executor,
detail::GetServerRPCServiceT<ServerRPC>& service, RequestHandler&& request_handler) const
RequestHandler&& request_handler) const
{
using Ch = detail::RemoveCrefT<CompletionHandler>;
using DecayedRequestHandler = detail::RemoveCrefT<RequestHandler>;
auto& grpc_context = detail::query_grpc_context(executor);
const auto allocator = exec::get_allocator(completion_handler);
detail::allocate<Operation<ServerRPC, DecayedRequestHandler, Ch>>(
allocator, grpc_context, service, static_cast<RequestHandler&&>(request_handler),
allocator, executor, service_, static_cast<RequestHandler&&>(request_handler),
static_cast<CompletionHandler&&>(completion_handler))
.release();
}

detail::GetServerRPCServiceT<ServerRPC>& service_;
};

template <class Operation>
Expand Down
15 changes: 8 additions & 7 deletions src/agrpc/detail/register_request_handler_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ template <class ServerRPC, class RequestHandler, class StopToken>
struct RegisterRequestHandlerOperationBase
{
using Service = detail::GetServerRPCServiceT<ServerRPC>;
using ServerRPCExecutor = typename ServerRPC::executor_type;

RegisterRequestHandlerOperationBase(agrpc::GrpcContext& grpc_context, Service& service,
RegisterRequestHandlerOperationBase(const ServerRPCExecutor& executor, Service& service,
RequestHandler&& request_handler)
: grpc_context_(grpc_context),
service_(service),
request_handler_(static_cast<RequestHandler&&>(request_handler))
: executor_(executor), service_(service), request_handler_(static_cast<RequestHandler&&>(request_handler))
{
}

Expand All @@ -61,11 +60,13 @@ struct RegisterRequestHandlerOperationBase

void stop() noexcept { stop_context_.stop(); }

agrpc::GrpcContext& grpc_context() const noexcept { return grpc_context_; }
agrpc::GrpcContext& grpc_context() const noexcept { return detail::query_grpc_context(executor_); }

const ServerRPCExecutor& get_executor() const noexcept { return executor_; }

Service& service() const noexcept { return service_; }

const RequestHandler& request_handler() const noexcept { return request_handler_; }
RequestHandler& request_handler() noexcept { return request_handler_; }

void set_error(std::exception_ptr&& eptr) noexcept
{
Expand All @@ -81,7 +82,7 @@ struct RegisterRequestHandlerOperationBase

[[nodiscard]] bool decrement_ref_count() noexcept { return 0 == --reference_count_; }

agrpc::GrpcContext& grpc_context_;
ServerRPCExecutor executor_;
Service& service_;
std::atomic_size_t reference_count_{};
std::exception_ptr eptr_{};
Expand Down
4 changes: 2 additions & 2 deletions src/agrpc/detail/register_sender_request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct RegisterRequestHandlerSenderOperationBase
{
RegisterRequestHandlerSenderOperationBase(RequestHandlerSender<ServerRPC, RequestHandler>&& sender,
RegisterRequestHandlerOperationComplete::Complete complete)
: RegisterRequestHandlerOperationBase<ServerRPC, RequestHandler, StopToken>{sender.grpc_context_,
: RegisterRequestHandlerOperationBase<ServerRPC, RequestHandler, StopToken>{sender.grpc_context_.get_executor(),
sender.service_,
static_cast<RequestHandler&&>(
sender.request_handler_)},
Expand Down Expand Up @@ -224,7 +224,7 @@ struct RequestHandlerOperation
explicit RequestHandlerOperation(RegisterRequestHandlerSenderOperationBase& operation, const Allocator& allocator)
: base_(operation),
impl1_(operation.request_handler()),
rpc_(detail::ServerRPCContextBaseAccess::construct<ServerRPC>(operation.grpc_context().get_executor())),
rpc_(detail::ServerRPCContextBaseAccess::construct<ServerRPC>(operation.get_executor())),
impl2_(detail::SecondThenVariadic{}, allocator, std::in_place_type<StartOperationState>,
detail::InplaceWithFunction{},
[&]
Expand Down
10 changes: 5 additions & 5 deletions src/agrpc/detail/register_yield_request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ struct YieldRequestHandlerOperation
using typename Base::Allocator;
using typename Base::RefCountGuard;
using typename Base::RPCRequest;
using typename Base::ServerRPCExecutor;
using typename Base::Service;

template <class Ch>
YieldRequestHandlerOperation(agrpc::GrpcContext& grpc_context, Service& service, RequestHandler&& request_handler,
YieldRequestHandlerOperation(const ServerRPCExecutor& executor, Service& service, RequestHandler&& request_handler,
Ch&& completion_handler)
: Base(grpc_context, service, static_cast<RequestHandler&&>(request_handler),
static_cast<Ch&&>(completion_handler),
: Base(executor, service, static_cast<RequestHandler&&>(request_handler), static_cast<Ch&&>(completion_handler),
&detail::register_request_handler_asio_do_complete<YieldRequestHandlerOperation>)
{
initiate();
Expand All @@ -64,7 +64,7 @@ struct YieldRequestHandlerOperation
void initiate()
{
this->increment_ref_count();
detail::spawn(asio::get_associated_executor(this->completion_handler(), this->grpc_context()),
detail::spawn(asio::get_associated_executor(this->completion_handler(), this->get_executor()),
[g = RefCountGuard{*this}](const auto& yield)
{
static_cast<YieldRequestHandlerOperation&>(g.get().self_).perform_request_and_repeat(yield);
Expand All @@ -82,7 +82,7 @@ struct YieldRequestHandlerOperation
template <class Yield>
void perform_request_and_repeat(const Yield& yield)
{
auto rpc = detail::ServerRPCContextBaseAccess::construct<ServerRPC>(this->grpc_context().get_executor());
auto rpc = detail::ServerRPCContextBaseAccess::construct<ServerRPC>(this->get_executor());
RPCRequest req;
if (!req.start(rpc, this->service(), use_yield(yield)))
{
Expand Down
4 changes: 2 additions & 2 deletions src/agrpc/detail/work_tracking_completion_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void dispatch_with_args(Handler&& handler, Args&&... args)
std::move(executor),
[handler = static_cast<Handler&&>(handler), args = detail::Tuple{static_cast<Args&&>(args)...}]() mutable
{
detail::apply(static_cast<Handler&&>(handler), std::move(args));
detail::apply(std::move(handler), std::move(args));
});
}

Expand Down Expand Up @@ -120,7 +120,7 @@ template <class AllocationGuard, class... Args>
void dispatch_complete(AllocationGuard& guard, Args&&... args)
{
auto handler{std::move(guard->completion_handler())};
auto tracker{std::move(guard->work_tracker())};
[[maybe_unused]] auto tracker{std::move(guard->work_tracker())};
guard.reset();
detail::dispatch_with_args(std::move(handler), static_cast<Args&&>(args)...);
}
Expand Down
2 changes: 1 addition & 1 deletion src/agrpc/register_awaitable_request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ auto register_awaitable_request_handler(const typename ServerRPC::executor_type&
RequestHandler request_handler, CompletionToken token)
{
return asio::async_initiate<CompletionToken, void(std::exception_ptr)>(
detail::AwaitableRequestHandlerInitiator<ServerRPC>{}, token, executor, service,
detail::AwaitableRequestHandlerInitiator<ServerRPC>{service}, token, executor,
static_cast<RequestHandler&&>(request_handler));
}

Expand Down
4 changes: 2 additions & 2 deletions src/agrpc/register_sender_request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

AGRPC_NAMESPACE_BEGIN()

template <class ServerRPC, class Service, class RequestHandler>
template <class ServerRPC, class RequestHandler>
[[nodiscard]] detail::RequestHandlerSender<ServerRPC, RequestHandler> register_sender_request_handler(
agrpc::GrpcContext& grpc_context, Service& service, RequestHandler request_handler)
agrpc::GrpcContext& grpc_context, detail::GetServerRPCServiceT<ServerRPC>& service, RequestHandler request_handler)
{
return {grpc_context, service, static_cast<RequestHandler&&>(request_handler)};
}
Expand Down
2 changes: 1 addition & 1 deletion src/agrpc/register_yield_request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ auto register_yield_request_handler(const typename ServerRPC::executor_type& exe
CompletionToken token)
{
return asio::async_initiate<CompletionToken, void(std::exception_ptr)>(
detail::YieldRequestHandlerInitiator<ServerRPC>{}, token, executor, service,
detail::YieldRequestHandlerInitiator<ServerRPC>{service}, token, executor,
static_cast<RequestHandler&&>(request_handler));
}

Expand Down

0 comments on commit 5cd94f1

Please sign in to comment.