Skip to content

Commit

Permalink
fix: Propagate GrpcContext.stop to other threads at the end of run/po…
Browse files Browse the repository at this point in the history
…ll calls
  • Loading branch information
Tradias committed Jul 29, 2024
1 parent 4a2fd4d commit e9e8549
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/agrpc/detail/grpc_context_implementation_definition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ inline GrpcContextThreadContext::~GrpcContextThreadContext() noexcept
{
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(*this);
}
if (GrpcContextImplementation::move_local_queue_to_remote_work(*this))
const bool moved_work_to_remove = GrpcContextImplementation::move_local_queue_to_remote_work(*this);
if (moved_work_to_remove)
{
GrpcContextImplementation::trigger_work_alarm(grpc_context_);
}
if (!moved_work_to_remove && grpc_context_.is_stopped() && grpc_context_.remote_work_queue_.try_mark_active() &&
!GrpcContextImplementation::is_shutdown(grpc_context_))
{
GrpcContextImplementation::trigger_work_alarm(grpc_context_);
}
Expand Down Expand Up @@ -220,8 +226,8 @@ inline bool GrpcContextImplementation::do_one(detail::GrpcContextThreadContext&
check_remote_work = GrpcContextImplementation::move_remote_work_to_local_queue(context);
}
context.check_remote_work_ = false;
const bool processed_local_operation = GrpcContextImplementation::process_local_queue(context, invoke);
processed = processed || processed_local_operation;
const bool processed_local_work = GrpcContextImplementation::process_local_queue(context, invoke);
processed = processed || processed_local_work;
const bool is_more_completed_work_pending = check_remote_work || !context.local_work_queue_.empty();
if (!is_more_completed_work_pending && stop_predicate(grpc_context))
{
Expand Down
32 changes: 32 additions & 0 deletions test/src/test_grpc_context_17.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,38 @@ TEST_CASE_FIXTURE(test::GrpcContextTest, "GrpcContext.run(_completion_queue) par
}
}

TEST_CASE_FIXTURE(test::GrpcContextTest, "GrpcContext.run(_completion_queue) parallel can run out of work")
{
bool run_completion_queue{};
SUBCASE("run()") {}
SUBCASE("run_completion_queue()") { run_completion_queue = true; }
const auto thread_count = 3;
grpc_context_lifetime.emplace(thread_count);
asio::thread_pool pool{thread_count};
agrpc::Alarm{grpc_context}.wait(test::hundred_milliseconds_from_now(),
[&](auto&&...)
{
if (run_completion_queue)
{
agrpc::Alarm{grpc_context}.wait(test::hundred_milliseconds_from_now(),
test::NoOp{});
}
else
{
test::post(grpc_context, test::NoOp{});
}
});
for (size_t i{}; i != thread_count; ++i)
{
asio::post(pool,
[&]
{
run_completion_queue ? grpc_context.run_completion_queue() : grpc_context.run();
});
}
pool.join();
}

TEST_CASE_FIXTURE(test::GrpcContextTest, "GrpcContext.poll() within run()")
{
int count{};
Expand Down

0 comments on commit e9e8549

Please sign in to comment.