Skip to content

Commit

Permalink
feat: Add support for Asio's associated_immediate_executor to ServerR…
Browse files Browse the repository at this point in the history
…PC::wait_for_done()
  • Loading branch information
Tradias committed Dec 7, 2023
1 parent 0ad0554 commit f0f2110
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 15 deletions.
12 changes: 12 additions & 0 deletions src/agrpc/detail/asio_forward.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@

#define AGRPC_ASIO_HAS_BIND_ALLOCATOR
#endif

#if (ASIO_VERSION >= 102700)
#include <asio/associated_immediate_executor.hpp>

#define AGRPC_ASIO_HAS_IMMEDIATE_EXECUTOR
#endif
#elif defined(AGRPC_BOOST_ASIO)
//
#include <boost/version.hpp>
Expand Down Expand Up @@ -99,6 +105,12 @@
#if (BOOST_VERSION >= 108000)
#define AGRPC_ASIO_HAS_NEW_SPAWN
#endif

#if (BOOST_VERSION >= 108200)
#include <boost/asio/associated_immediate_executor.hpp>

#define AGRPC_ASIO_HAS_IMMEDIATE_EXECUTOR
#endif
#endif

#ifdef AGRPC_UNIFEX
Expand Down
38 changes: 38 additions & 0 deletions src/agrpc/detail/association_asio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,44 @@ void post_with_allocator(Executor&& executor, Function&& function, const Allocat
static_cast<Function&&>(function));
}

template <class CompletionHandler, class Function, class IOExecutor>
void complete_immediately(CompletionHandler&& completion_handler, Function&& function, const IOExecutor& io_executor)
{
const auto allocator = asio::get_associated_allocator(completion_handler);
#ifdef AGRPC_ASIO_HAS_IMMEDIATE_EXECUTOR
auto executor = asio::get_associated_immediate_executor(
completion_handler,
[&]() -> decltype(auto)
{
// Ensure that the io_executor is not already blocking::never because asio will try to convert const& to &&
// due to chosing the `identity` overload of asio::require within the default_immediate_executor.
if constexpr (asio::traits::static_require<IOExecutor, asio::execution::blocking_t::never_t>::is_valid)
{
return asio::prefer(io_executor, asio::execution::blocking_t::possibly);
}
else
{
return (io_executor);
}
}());
detail::do_execute(
asio::prefer(std::move(executor), asio::execution::allocator(allocator)),
[ch = static_cast<CompletionHandler&&>(completion_handler), f = static_cast<Function&&>(function)]() mutable
{
static_cast<Function&&>(f)(static_cast<CompletionHandler&&>(ch));
});
#else
auto executor = asio::get_associated_executor(completion_handler, io_executor);
detail::post_with_allocator(
std::move(executor),
[ch = static_cast<CompletionHandler&&>(completion_handler), f = static_cast<Function&&>(function)]() mutable
{
static_cast<Function&&>(f)(static_cast<CompletionHandler&&>(ch));
},
allocator);
#endif
}

template <class CancellationSlot>
bool stop_possible(CancellationSlot& cancellation_slot)
{
Expand Down
8 changes: 6 additions & 2 deletions src/agrpc/detail/initiate_sender_implementation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ AGRPC_NAMESPACE_BEGIN()
namespace detail
{
#if defined(AGRPC_STANDALONE_ASIO) || defined(AGRPC_BOOST_ASIO)
struct SubmitToCompletionHandler
struct SubmitSenderImplementationOperation
{
using executor_type = agrpc::GrpcExecutor;

template <class CompletionHandler, class Initiation, class Implementation>
void operator()(CompletionHandler&& completion_handler, const Initiation& initiation,
Implementation&& implementation)
Expand All @@ -40,6 +42,8 @@ struct SubmitToCompletionHandler
static_cast<Implementation&&>(implementation));
}

executor_type get_executor() const noexcept { return grpc_context_.get_executor(); }

agrpc::GrpcContext& grpc_context_;
};
#endif
Expand All @@ -52,7 +56,7 @@ auto async_initiate_sender_implementation(agrpc::GrpcContext& grpc_context, cons
if constexpr (!std::is_same_v<agrpc::UseSender, detail::RemoveCrefT<CompletionToken>>)
{
return asio::async_initiate<CompletionToken, typename Implementation::Signature>(
detail::SubmitToCompletionHandler{grpc_context}, token, initiation,
detail::SubmitSenderImplementationOperation{grpc_context}, token, initiation,
static_cast<Implementation&&>(implementation));
}
else
Expand Down
13 changes: 6 additions & 7 deletions src/agrpc/detail/manual_reset_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,21 @@ class ManualResetEvent<void(Args...)> : private detail::Tuple<Args...>
template <class CompletionHandler, class IOExecutor>
void operator()(CompletionHandler&& completion_handler, const IOExecutor& io_executor) const
{
const auto allocator = asio::get_associated_allocator(completion_handler);
if (auto& event = event_; event.ready())
{
auto executor = asio::get_associated_executor(completion_handler, io_executor);
detail::post_with_allocator(
std::move(executor),
[&event, ch = static_cast<CompletionHandler&&>(completion_handler)]() mutable
detail::complete_immediately(
static_cast<CompletionHandler&&>(completion_handler),
[&event](auto&& ch)
{
detail::prepend_error_code_and_apply(static_cast<CompletionHandler&&>(ch),
detail::prepend_error_code_and_apply(std::move(ch),
static_cast<ManualResetEvent&&>(event).args());
},
allocator);
io_executor);
return;
}
using Ch = detail::RemoveCrefT<CompletionHandler>;
using Operation = ManualResetEventOperation<Signature, Ch>;
const auto allocator = asio::get_associated_allocator(completion_handler);
detail::allocate<Operation>(allocator, static_cast<CompletionHandler&&>(completion_handler), event_)
.release();
}
Expand Down
11 changes: 5 additions & 6 deletions src/agrpc/grpc_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ class BasicGrpcExecutor
*
* Thread-safe
*/
[[nodiscard]] static constexpr asio::execution::blocking_t query(asio::execution::blocking_t) noexcept
[[nodiscard]] static constexpr auto query(asio::execution::blocking_t) noexcept
{
return typename detail::QueryStaticBlocking<detail::is_blocking_never(Options)>::result_type();
}
Expand All @@ -409,9 +409,9 @@ class BasicGrpcExecutor
*
* Thread-safe
*/
[[nodiscard]] static constexpr asio::execution::mapping_t query(asio::execution::mapping_t) noexcept
[[nodiscard]] static constexpr detail::QueryStaticMapping::result_type query(asio::execution::mapping_t) noexcept
{
return detail::QueryStaticMapping::result_type();
return {};
}

/**
Expand All @@ -437,7 +437,7 @@ class BasicGrpcExecutor
*
* Thread-safe
*/
[[nodiscard]] static constexpr asio::execution::relationship_t::fork_t query(
[[nodiscard]] static constexpr detail::QueryStaticRelationship::result_type query(
asio::execution::relationship_t) noexcept
{
return {};
Expand All @@ -452,8 +452,7 @@ class BasicGrpcExecutor
*
* Thread-safe
*/
[[nodiscard]] static constexpr asio::execution::outstanding_work_t query(
asio::execution::outstanding_work_t) noexcept
[[nodiscard]] static constexpr auto query(asio::execution::outstanding_work_t) noexcept
{
return typename detail::QueryStaticWorkTracked<detail::is_outstanding_work_tracked(Options)>::result_type();
}
Expand Down

0 comments on commit f0f2110

Please sign in to comment.