Skip to content

Commit

Permalink
test: Improve "Awaitable ServerRPC resumable read can be cancelled"
Browse files Browse the repository at this point in the history
  • Loading branch information
Tradias committed Dec 8, 2024
1 parent a0bbfa0 commit 4f1ee9c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 21 deletions.
11 changes: 1 addition & 10 deletions test/src/test_server_rpc_17.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,15 +556,6 @@ TEST_CASE_TEMPLATE("ServerRPC resumable read can be cancelled", RPC, test::Clien
{
ServerRPCTest<RPC> test{true};
agrpc::Waiter<void()> client_waiter;
const auto complete_client_waiter = [&]
{
client_waiter.initiate(
[&](auto&& e, auto&& token)
{
asio::post(e, token);
},
test.grpc_context);
};
test.register_and_perform_requests(
[&](RPC& rpc, const asio::yield_context& yield)
{
Expand All @@ -588,7 +579,7 @@ TEST_CASE_TEMPLATE("ServerRPC resumable read can be cancelled", RPC, test::Clien
CHECK_EQ(asio::error::operation_aborted, ec);
CHECK_EQ(1, request.integer());
}
complete_client_waiter();
test::complete_immediately(test.grpc_context, client_waiter);
CHECK_FALSE(waiter.wait(yield));

if constexpr (agrpc::ServerRPCType::BIDIRECTIONAL_STREAMING == RPC::TYPE)
Expand Down
15 changes: 4 additions & 11 deletions test/src/test_server_rpc_20.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,8 @@ TEST_CASE_FIXTURE(ServerRPCAwaitableTest<test::BidirectionalStreamingServerRPC>,
"Awaitable ServerRPC resumable read can be cancelled")
{
using RPC = test::BidirectionalStreamingServerRPC;
register_and_perform_three_requests(
agrpc::Waiter<void()> client_waiter;
register_and_perform_requests(
[&](RPC& rpc) -> asio::awaitable<void>
{
typename RPC::Request request;
Expand All @@ -498,7 +499,6 @@ TEST_CASE_FIXTURE(ServerRPCAwaitableTest<test::BidirectionalStreamingServerRPC>,
CHECK_EQ(true, co_await waiter.wait(asio::use_awaitable));
CHECK_EQ(1, request.integer());

const auto not_to_exceed = test::two_hundred_milliseconds_from_now();
waiter.initiate(agrpc::read, rpc, request);
for (int i{}; i != 2; ++i)
{
Expand All @@ -507,10 +507,10 @@ TEST_CASE_FIXTURE(ServerRPCAwaitableTest<test::BidirectionalStreamingServerRPC>,
waiter.wait(test::ASIO_DEFERRED),
asio::post(asio::bind_executor(grpc_context, test::ASIO_DEFERRED)))
.async_wait(asio::experimental::wait_for_one(), asio::use_awaitable);
CHECK_LT(test::now(), not_to_exceed);
CHECK_EQ(asio::error::operation_aborted, ec);
CHECK_EQ(1, request.integer());
}
test::complete_immediately(grpc_context, client_waiter);
CHECK_EQ(false, co_await waiter.wait(asio::use_awaitable));
CHECK(co_await rpc.finish(grpc::Status::OK, asio::use_awaitable));
},
Expand All @@ -520,15 +520,8 @@ TEST_CASE_FIXTURE(ServerRPCAwaitableTest<test::BidirectionalStreamingServerRPC>,
start_rpc(rpc, request, response, yield);
request.set_integer(1);
CHECK(rpc.write(request, yield));
agrpc::Waiter<void(bool)> waiter;
waiter.initiate(agrpc::read, rpc, response);
auto [completion_order, ec, read_ok, wait_ok, alarm] =
asio::experimental::make_parallel_group(
waiter.wait(test::ASIO_DEFERRED),
agrpc::Alarm(grpc_context).wait(test::five_hundred_milliseconds_from_now(), test::ASIO_DEFERRED))
.async_wait(asio::experimental::wait_for_one(), yield);
client_waiter.wait(yield);
CHECK_EQ(grpc::StatusCode::OK, rpc.finish(yield).error_code());
waiter.wait(yield);
});
}
#endif
Expand Down
12 changes: 12 additions & 0 deletions test/utils/utils/asio_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <agrpc/alarm.hpp>
#include <agrpc/grpc_context.hpp>
#include <agrpc/grpc_executor.hpp>
#include <agrpc/waiter.hpp>

#include <functional>
#include <type_traits>
Expand Down Expand Up @@ -243,6 +244,17 @@ void post(agrpc::GrpcContext& grpc_context, const std::function<void()>& functio

void post(const agrpc::GrpcExecutor& executor, const std::function<void()>& function);

template <class Signature, class Executor>
void complete_immediately(agrpc::GrpcContext& grpc_context, agrpc::Waiter<Signature, Executor>& waiter)
{
waiter.initiate(
[&](auto&& context, auto&& token)
{
asio::post(context, token);
},
grpc_context);
}

#ifdef AGRPC_TEST_ASIO_HAS_CO_AWAIT
void co_spawn(agrpc::GrpcContext& grpc_context, const std::function<asio::awaitable<void>()>& function);

Expand Down

0 comments on commit 4f1ee9c

Please sign in to comment.