Skip to content

Commit

Permalink
feat: Make register_sender_rpc_handler and ServerRPC::wait_for_done c…
Browse files Browse the repository at this point in the history
…ompatible with std::execution
  • Loading branch information
Tradias committed Dec 9, 2023
1 parent 6228e25 commit 50b73c5
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 47 deletions.
3 changes: 1 addition & 2 deletions src/agrpc/bind_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ class AllocatorBinder
constexpr Allocator get_allocator() const noexcept { return impl_.second(); }

#if defined(AGRPC_UNIFEX) || defined(AGRPC_STDEXEC)
friend Allocator tag_invoke(detail::exec::tag_t<detail::exec::get_allocator>,
const AllocatorBinder& binder) noexcept
friend Allocator tag_invoke(detail::exec::get_allocator_t, const AllocatorBinder& binder) noexcept
{
return binder.get_allocator();
}
Expand Down
11 changes: 6 additions & 5 deletions src/agrpc/detail/basic_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,18 @@ class BasicSender : public detail::SenderOf<typename Implementation::Signature>
#ifdef AGRPC_STDEXEC
template <class Receiver>
friend detail::BasicSenderOperationState<Initiation, Implementation, detail::RemoveCrefT<Receiver>> tag_invoke(
stdexec::connect_t, const BasicSender& s, Receiver&& r)
stdexec::connect_t, BasicSender&& s,
Receiver&& r) noexcept(noexcept(static_cast<BasicSender&&>(s).connect(static_cast<Receiver&&>(r))))
{
return {static_cast<Receiver&&>(r), s.grpc_context_, s.initiation_, s.implementation_};
return static_cast<BasicSender&&>(s).connect(static_cast<Receiver&&>(r));
}

template <class Receiver>
friend detail::BasicSenderOperationState<Initiation, Implementation, detail::RemoveCrefT<Receiver>> tag_invoke(
stdexec::connect_t, BasicSender&& s, Receiver&& r)
stdexec::connect_t, const BasicSender& s,
Receiver&& r) noexcept(noexcept(s.connect(static_cast<Receiver&&>(r))))
{
return {static_cast<Receiver&&>(r), s.grpc_context_, s.initiation_,
static_cast<Implementation&&>(s.implementation_)};
return s.connect(static_cast<Receiver&&>(r));
}
#endif

Expand Down
27 changes: 23 additions & 4 deletions src/agrpc/detail/execution_stdexec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,21 @@ AGRPC_NAMESPACE_BEGIN()

namespace detail::exec
{
using ::stdexec::get_allocator;
using ::stdexec::get_allocator_t;

template <class Receiver>
decltype(auto) get_allocator(const Receiver& receiver)
{
if constexpr (::stdexec::tag_invocable<::stdexec::get_allocator_t, ::stdexec::env_of_t<Receiver>>)
{
return ::stdexec::get_allocator(::stdexec::get_env(receiver));
}
else
{
return std::allocator<std::byte>{};
}
}

using ::stdexec::get_scheduler;
inline const auto& get_executor = get_scheduler;
using ::stdexec::scheduler;
Expand All @@ -40,7 +54,12 @@ inline constexpr bool is_sender_v = ::stdexec::sender<T>;
using ::exec::inline_scheduler;
using ::stdexec::connect;
using ::stdexec::connect_result_t;
using ::stdexec::get_stop_token;

template <class Receiver>
decltype(auto) get_stop_token(const Receiver& receiver)
{
return ::stdexec::get_stop_token(::stdexec::get_env(receiver));
}

template <class Receiver>
void set_done(Receiver&& receiver)
Expand All @@ -52,8 +71,8 @@ using ::stdexec::set_error;
using ::stdexec::set_value;
using ::stdexec::start;

template <class T>
using stop_token_type_t = ::stdexec::stop_token_of_t<T>;
template <class Receiver>
using stop_token_type_t = ::stdexec::stop_token_of_t<::stdexec::env_of_t<Receiver>>;

using ::stdexec::tag_t;
} // namespace exec
Expand Down
4 changes: 4 additions & 0 deletions src/agrpc/detail/execution_unifex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define AGRPC_DETAIL_EXECUTION_UNIFEX_HPP

#include <agrpc/detail/config.hpp>
#include <agrpc/detail/utility.hpp>
#include <unifex/config.hpp>
#include <unifex/get_allocator.hpp>
#include <unifex/get_stop_token.hpp>
Expand All @@ -31,6 +32,9 @@ AGRPC_NAMESPACE_BEGIN()
namespace detail::exec
{
using ::unifex::get_allocator;

using get_allocator_t = detail::RemoveCrefT<decltype(get_allocator)>;

using ::unifex::get_scheduler;
inline const auto& get_executor = get_scheduler;
using ::unifex::scheduler;
Expand Down
16 changes: 15 additions & 1 deletion src/agrpc/detail/manual_reset_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,19 @@ class ManualResetEventOperationState
state.complete();
return;
}
if (detail::stop_requested(exec::get_stop_token(state.receiver())))
auto stop_token = exec::get_stop_token(state.receiver());
if (detail::stop_requested(stop_token))
{
exec::set_done(static_cast<Receiver&&>(state.receiver()));
return;
}
state.start();
}

#ifdef AGRPC_STDEXEC
friend void tag_invoke(stdexec::start_t, ManualResetEventOperationState& o) noexcept { o.start(); }
#endif

private:
friend ManualResetEventSender<Signature>;

Expand All @@ -276,6 +281,15 @@ class ManualResetEventSender : public detail::SenderOf<void()>
return {static_cast<R&&>(receiver), event_};
}

#ifdef AGRPC_STDEXEC
template <class Receiver>
friend auto tag_invoke(stdexec::connect_t, ManualResetEventSender&& s, Receiver&& r) noexcept(
noexcept(static_cast<ManualResetEventSender&&>(s).connect(static_cast<Receiver&&>(r))))
{
return static_cast<ManualResetEventSender&&>(s).connect(static_cast<Receiver&&>(r));
}
#endif

private:
friend ManualResetEvent<Signature>;

Expand Down
73 changes: 62 additions & 11 deletions src/agrpc/detail/register_sender_rpc_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ class RPCHandlerSender : public detail::SenderOf<void()>
auto connect(Receiver&& receiver) && noexcept(detail::IS_NOTRHOW_DECAY_CONSTRUCTIBLE_V<Receiver> &&
std::is_nothrow_move_constructible_v<RPCHandler>);

#ifdef AGRPC_STDEXEC
template <class Receiver>
friend auto tag_invoke(stdexec::connect_t, RPCHandlerSender&& s, Receiver&& r) noexcept(
noexcept(static_cast<RPCHandlerSender&&>(s).connect(static_cast<Receiver&&>(r))))
{
return static_cast<RPCHandlerSender&&>(s).connect(static_cast<Receiver&&>(r));
}
#endif

private:
template <class, class, class>
friend struct detail::RegisterRPCHandlerSenderOperationBase;
Expand Down Expand Up @@ -97,7 +106,7 @@ using GetWaitForDoneOperationStateT =
struct RPCHandlerOperationFinish
{
template <class Operation>
static void perform(Operation& op, std::exception_ptr* eptr)
static void perform(Operation& op, std::exception_ptr* eptr) noexcept
{
auto& rpc = op.rpc_;
if (eptr)
Expand All @@ -123,7 +132,7 @@ struct RPCHandlerOperationFinish
struct RPCHandlerOperationWaitForDone
{
template <class Operation>
static void perform(Operation& op, const std::exception_ptr*)
static void perform(Operation& op, const std::exception_ptr*) noexcept
{
detail::destroy_deallocate(&op, op.get_allocator());
}
Expand All @@ -149,11 +158,15 @@ struct RPCHandlerOperation

struct StartReceiver
{
#ifdef AGRPC_STDEXEC
using receiver_concept = stdexec::receiver_t;
#endif

RPCHandlerOperation& rpc_handler_op_;

static constexpr void set_done() noexcept {}

void set_value(bool ok)
void set_value(bool ok) const noexcept
{
auto& op = rpc_handler_op_;
detail::AllocationGuard guard{&op, op.get_allocator()};
Expand All @@ -171,7 +184,15 @@ struct RPCHandlerOperation
}
}

static constexpr void set_error(const std::exception_ptr&) noexcept {}
static void set_error(const std::exception_ptr&) noexcept {}

#ifdef AGRPC_STDEXEC
friend void tag_invoke(stdexec::set_stopped_t, const StartReceiver&) noexcept {}

friend void tag_invoke(stdexec::set_value_t, const StartReceiver& r, bool ok) noexcept { r.set_value(ok); }

friend void tag_invoke(stdexec::set_error_t, const StartReceiver&, const std::exception_ptr&) noexcept {}
#endif

friend exec::inline_scheduler tag_invoke(exec::tag_t<exec::get_scheduler>, const StartReceiver&) noexcept
{
Expand All @@ -187,19 +208,38 @@ struct RPCHandlerOperation
template <class Action>
struct Receiver
{
#ifdef AGRPC_STDEXEC
using receiver_concept = stdexec::receiver_t;
#endif

RPCHandlerOperation& op_;

void perform(std::exception_ptr* eptr) noexcept { Action::perform(op_, eptr); }
void perform(std::exception_ptr* eptr) const noexcept { Action::perform(op_, eptr); }

void set_done() noexcept { perform(nullptr); }
void set_done() const noexcept { perform(nullptr); }

template <class... T>
void set_value(T&&...) noexcept
void set_value(T&&...) const noexcept
{
perform(nullptr);
}

void set_error(std::exception_ptr eptr) noexcept { perform(&eptr); }
void set_error(std::exception_ptr eptr) const noexcept { perform(&eptr); }

#ifdef AGRPC_STDEXEC
friend constexpr void tag_invoke(stdexec::set_stopped_t, const Receiver& r) noexcept { r.set_done(); }

template <class... T>
friend constexpr void tag_invoke(stdexec::set_value_t, const Receiver& r, T&&...) noexcept
{
r.set_value();
}

friend void tag_invoke(stdexec::set_error_t, const Receiver& r, std::exception_ptr e) noexcept
{
r.set_error(static_cast<std::exception_ptr&&>(e));
}
#endif

friend exec::inline_scheduler tag_invoke(exec::tag_t<exec::get_scheduler>, const Receiver&) noexcept
{
Expand Down Expand Up @@ -248,7 +288,11 @@ struct RPCHandlerOperation

void start() { std::get<StartOperationState>(operation_state()).value_.start(); }

std::optional<std::exception_ptr> emplace_rpc_handler_operation_state()
#ifdef AGRPC_STDEXEC
friend void tag_invoke(stdexec::start_t, RPCHandlerOperation& o) noexcept { o.start(); }
#endif

std::optional<std::exception_ptr> emplace_rpc_handler_operation_state() noexcept
{
AGRPC_TRY
{
Expand All @@ -263,9 +307,12 @@ struct RPCHandlerOperation
AGRPC_CATCH(...) { return std::current_exception(); }
}

void start_rpc_handler_operation_state() { exec::start(std::get<FinishOperationState>(operation_state()).value_); }
void start_rpc_handler_operation_state() noexcept
{
exec::start(std::get<FinishOperationState>(operation_state()).value_);
}

void start_wait_for_done()
void start_wait_for_done() noexcept
{
auto& state = operation_state().template emplace<WaitForDoneOperationState>(
detail::InplaceWithFunction{},
Expand Down Expand Up @@ -336,6 +383,10 @@ class RPCHandlerSenderOperation
detail::create_and_start_rpc_handler_operation(*this, get_allocator());
}

#ifdef AGRPC_STDEXEC
friend void tag_invoke(stdexec::start_t, RPCHandlerSenderOperation& o) noexcept { o.start(); }
#endif

private:
friend RPCHandlerSender;

Expand Down
4 changes: 4 additions & 0 deletions test/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ set(ASIO_GRPC_CPP17_TEST_SOURCE_FILES
set(ASIO_GRPC_CPP20_TEST_SOURCE_FILES
"test_asio_grpc_20.cpp" "test_repeatedly_request_20.cpp" "test_bind_allocator_20.cpp" "test_grpc_context_20.cpp"
"test_grpc_stream_20.cpp" "test_server_rpc_20.cpp")

set_source_files_properties("test_test_17.cpp" PROPERTIES SKIP_UNITY_BUILD_INCLUSION on)
set_source_files_properties(
"test_client_rpc_17.cpp" "test_server_rpc_17.cpp" "test_server_rpc_20.cpp" "test_unifex_20.cpp"
PROPERTIES COMPILE_OPTIONS "$<$<AND:$<CXX_COMPILER_ID:MSVC>,$<CONFIG:Debug>>:/bigobj>")

asio_grpc_add_test(asio-grpc-test-boost-cpp17 "BOOST_ASIO" "17" ${ASIO_GRPC_CPP17_TEST_SOURCE_FILES})
asio_grpc_add_test(asio-grpc-test-cpp17 "STANDALONE_ASIO" "17" ${ASIO_GRPC_CPP17_TEST_SOURCE_FILES})
Expand Down
Loading

0 comments on commit 50b73c5

Please sign in to comment.