From c9433eee4f8eaca9df85a25ea328f926a06fdac6 Mon Sep 17 00:00:00 2001 From: Dennis Hezel Date: Thu, 12 Dec 2024 19:39:01 +0100 Subject: [PATCH] perf: Reduce callstack depths in GrpcContext run functions by 3 --- src/agrpc/detail/grpc_context_definition.hpp | 74 ++++++--------- .../detail/grpc_context_implementation.hpp | 22 ++--- ...grpc_context_implementation_definition.hpp | 95 +++++++------------ src/agrpc/run.hpp | 36 +++---- 4 files changed, 91 insertions(+), 136 deletions(-) diff --git a/src/agrpc/detail/grpc_context_definition.hpp b/src/agrpc/detail/grpc_context_definition.hpp index 80a10c88..178f35db 100644 --- a/src/agrpc/detail/grpc_context_definition.hpp +++ b/src/agrpc/detail/grpc_context_definition.hpp @@ -32,19 +32,23 @@ AGRPC_NAMESPACE_BEGIN() namespace detail { template -struct GrpcContextLoopFunction : Function +struct GrpcContextLoopCondition : Function { + static constexpr bool COMPLETION_QUEUE_ONLY = false; + using Function::operator(); [[nodiscard]] bool has_processed(detail::DoOneResult result) const noexcept { return bool{result}; } }; template -GrpcContextLoopFunction(Function) -> GrpcContextLoopFunction; +GrpcContextLoopCondition(Function) -> GrpcContextLoopCondition; template -struct GrpcContextCompletionQueueLoopFunction : Function +struct GrpcContextCompletionQueueLoopCondition : Function { + static constexpr bool COMPLETION_QUEUE_ONLY = true; + using Function::operator(); [[nodiscard]] bool has_processed(detail::DoOneResult result) const noexcept @@ -54,7 +58,14 @@ struct GrpcContextCompletionQueueLoopFunction : Function }; template -GrpcContextCompletionQueueLoopFunction(Function) -> GrpcContextCompletionQueueLoopFunction; +GrpcContextCompletionQueueLoopCondition(Function) -> GrpcContextCompletionQueueLoopCondition; + +struct GrpcContextIsNotStopped +{ + [[nodiscard]] bool operator()() const noexcept { return !grpc_context_.is_stopped(); } + + agrpc::GrpcContext& grpc_context_; +}; template inline void create_resources(T& resources, std::size_t concurrency_hint) @@ -127,70 +138,47 @@ inline GrpcContext::~GrpcContext() inline bool GrpcContext::run() { return detail::GrpcContextImplementation::process_work( - *this, detail::GrpcContextLoopFunction{[](auto& context) - { - return detail::GrpcContextImplementation::do_one_if_not_stopped( - context, detail::GrpcContextImplementation::INFINITE_FUTURE); - }}); + *this, detail::GrpcContextLoopCondition{detail::GrpcContextIsNotStopped{*this}}, + detail::GrpcContextImplementation::INFINITE_FUTURE); } inline bool GrpcContext::run_completion_queue() { return detail::GrpcContextImplementation::process_work( - *this, - detail::GrpcContextCompletionQueueLoopFunction{ - [](detail::GrpcContextThreadContext& context) - { - return detail::GrpcContextImplementation::do_one_completion_queue_if_not_stopped( - context, detail::GrpcContextImplementation::INFINITE_FUTURE); - }}); + *this, detail::GrpcContextCompletionQueueLoopCondition{detail::GrpcContextIsNotStopped{*this}}, + detail::GrpcContextImplementation::INFINITE_FUTURE); } inline bool GrpcContext::poll() { return detail::GrpcContextImplementation::process_work( - *this, detail::GrpcContextLoopFunction{[](auto& context) - { - return detail::GrpcContextImplementation::do_one_if_not_stopped( - context, detail::GrpcContextImplementation::TIME_ZERO); - }}); + *this, detail::GrpcContextLoopCondition{detail::GrpcContextIsNotStopped{*this}}, + detail::GrpcContextImplementation::TIME_ZERO); } inline bool GrpcContext::run_until_impl(::gpr_timespec deadline) { return detail::GrpcContextImplementation::process_work( - *this, detail::GrpcContextLoopFunction{[deadline](auto& context) - { - return detail::GrpcContextImplementation::do_one_if_not_stopped( - context, deadline); - }}); + *this, detail::GrpcContextLoopCondition{detail::GrpcContextIsNotStopped{*this}}, deadline); } template inline bool GrpcContext::run_while(Condition&& condition) { - return detail::GrpcContextImplementation::process_work( - *this, detail::GrpcContextLoopFunction{[&](auto& context) - { - if (!condition()) - { - return detail::DoOneResult{}; - } - return detail::GrpcContextImplementation::do_one_if_not_stopped( - context, detail::GrpcContextImplementation::INFINITE_FUTURE); - }}); + return detail::GrpcContextImplementation::process_work(*this, + detail::GrpcContextLoopCondition{[&] + { + return condition() && + !is_stopped(); + }}, + detail::GrpcContextImplementation::INFINITE_FUTURE); } inline bool GrpcContext::poll_completion_queue() { return detail::GrpcContextImplementation::process_work( - *this, - detail::GrpcContextCompletionQueueLoopFunction{ - [](detail::GrpcContextThreadContext& context) - { - return detail::GrpcContextImplementation::do_one_completion_queue_if_not_stopped( - context, detail::GrpcContextImplementation::TIME_ZERO); - }}); + *this, detail::GrpcContextCompletionQueueLoopCondition{detail::GrpcContextIsNotStopped{*this}}, + detail::GrpcContextImplementation::TIME_ZERO); } inline void GrpcContext::stop() diff --git a/src/agrpc/detail/grpc_context_implementation.hpp b/src/agrpc/detail/grpc_context_implementation.hpp index 94566464..7d4ff14f 100644 --- a/src/agrpc/detail/grpc_context_implementation.hpp +++ b/src/agrpc/detail/grpc_context_implementation.hpp @@ -128,9 +128,9 @@ struct GrpcContextImplementation static void add_operation(agrpc::GrpcContext& grpc_context, detail::QueueableOperationBase* op) noexcept; - static CompletionQueueEventResult handle_next_completion_queue_event(detail::GrpcContextThreadContext& context, - ::gpr_timespec deadline, - detail::InvokeHandler invoke); + static CompletionQueueEventResult do_one_completion_queue_event( + detail::GrpcContextThreadContext& context, ::gpr_timespec deadline, + detail::InvokeHandler invoke = detail::InvokeHandler::YES_); [[nodiscard]] static bool running_in_this_thread() noexcept; @@ -149,17 +149,8 @@ struct GrpcContextImplementation static DoOneResult do_one(detail::GrpcContextThreadContextImpl& context, ::gpr_timespec deadline, detail::InvokeHandler invoke = detail::InvokeHandler::YES_); - template - static DoOneResult do_one_if_not_stopped(detail::GrpcContextThreadContextImpl& context, - ::gpr_timespec deadline); - - static DoOneResult do_one_completion_queue(detail::GrpcContextThreadContext& context, ::gpr_timespec deadline); - - static DoOneResult do_one_completion_queue_if_not_stopped(detail::GrpcContextThreadContext& context, - ::gpr_timespec deadline); - - template - static bool process_work(agrpc::GrpcContext& grpc_context, LoopFunction loop_function); + template + static bool process_work(agrpc::GrpcContext& grpc_context, LoopCondition loop_condition, ::gpr_timespec deadline); static void drain_completion_queue(agrpc::GrpcContext& grpc_context) noexcept; @@ -167,8 +158,7 @@ struct GrpcContextImplementation static void push_resource(agrpc::GrpcContext& grpc_context, detail::ListablePoolResource& resource); - template - static decltype(auto) visit_is_multithreaded(const agrpc::GrpcContext& grpc_context, Function function); + static bool is_multithreaded(const agrpc::GrpcContext& grpc_context); }; void process_grpc_tag(void* tag, detail::OperationResult result, agrpc::GrpcContext& grpc_context); diff --git a/src/agrpc/detail/grpc_context_implementation_definition.hpp b/src/agrpc/detail/grpc_context_implementation_definition.hpp index 514281d1..d20c4ece 100644 --- a/src/agrpc/detail/grpc_context_implementation_definition.hpp +++ b/src/agrpc/detail/grpc_context_implementation_definition.hpp @@ -174,7 +174,7 @@ inline bool get_next_event(grpc::CompletionQueue* cq, detail::GrpcCompletionQueu return grpc::CompletionQueue::GOT_EVENT == cq->AsyncNext(&event.tag_, &event.ok_, deadline); } -inline CompletionQueueEventResult GrpcContextImplementation::handle_next_completion_queue_event( +inline CompletionQueueEventResult GrpcContextImplementation::do_one_completion_queue_event( detail::GrpcContextThreadContext& context, ::gpr_timespec deadline, detail::InvokeHandler invoke) { agrpc::GrpcContext& grpc_context = context.grpc_context_; @@ -236,62 +236,45 @@ inline DoOneResult GrpcContextImplementation::do_one(detail::GrpcContextThreadCo { return {{DoOneResult::PROCESSED_LOCAL_WORK}}; } - const auto handled_event = GrpcContextImplementation::handle_next_completion_queue_event( + const auto handled_event = GrpcContextImplementation::do_one_completion_queue_event( context, is_more_completed_work_pending ? GrpcContextImplementation::TIME_ZERO : deadline, invoke); return DoOneResult::from(handled_event, processed_local_work); } -template -inline DoOneResult GrpcContextImplementation::do_one_if_not_stopped( - detail::GrpcContextThreadContextImpl& context, ::gpr_timespec deadline) -{ - if (context.grpc_context_.is_stopped()) - { - return {}; - } - return {GrpcContextImplementation::do_one(context, deadline, detail::InvokeHandler::YES_)}; -} - -inline DoOneResult GrpcContextImplementation::do_one_completion_queue(detail::GrpcContextThreadContext& context, - ::gpr_timespec deadline) -{ - return { - GrpcContextImplementation::handle_next_completion_queue_event(context, deadline, detail::InvokeHandler::YES_)}; -} - -inline DoOneResult GrpcContextImplementation::do_one_completion_queue_if_not_stopped( - detail::GrpcContextThreadContext& context, ::gpr_timespec deadline) -{ - if (context.grpc_context_.is_stopped()) - { - return {}; - } - return { - GrpcContextImplementation::handle_next_completion_queue_event(context, deadline, detail::InvokeHandler::YES_)}; -} - -template -inline bool GrpcContextImplementation::process_work(agrpc::GrpcContext& grpc_context, LoopFunction loop_function) +template +inline bool GrpcContextImplementation::process_work(agrpc::GrpcContext& grpc_context, LoopCondition loop_condition, + ::gpr_timespec deadline) { - const auto run = [&loop_function](auto& thread_context) + const auto run = [&loop_condition, deadline](auto& thread_context) { bool processed{}; DoOneResult result; - while ((result = loop_function(thread_context))) + while (loop_condition()) { - processed = processed || loop_function.has_processed(result); + if constexpr (LoopCondition::COMPLETION_QUEUE_ONLY) + { + result = {GrpcContextImplementation::do_one_completion_queue_event(thread_context, deadline)}; + } + else + { + result = GrpcContextImplementation::do_one(thread_context, deadline); + } + if (!result) + { + break; + } + processed = processed || loop_condition.has_processed(result); } return processed; }; if (GrpcContextImplementation::running_in_this_thread(grpc_context)) { - return GrpcContextImplementation::visit_is_multithreaded( - grpc_context, - [&](auto v) - { - return run( - static_cast&>(*detail::thread_local_grpc_context)); - }); + auto& context = *detail::thread_local_grpc_context; + if (grpc_context.multithreaded_) + { + return run(static_cast&>(context)); + } + return run(static_cast&>(context)); } if (grpc_context.outstanding_work_.load(std::memory_order_relaxed) == 0) { @@ -299,13 +282,13 @@ inline bool GrpcContextImplementation::process_work(agrpc::GrpcContext& grpc_con return false; } grpc_context.reset(); - return GrpcContextImplementation::visit_is_multithreaded( - grpc_context, - [&](auto v) - { - detail::GrpcContextThreadContextImpl thread_context{grpc_context}; - return run(thread_context); - }); + if (grpc_context.multithreaded_) + { + detail::GrpcContextThreadContextImpl thread_context{grpc_context}; + return run(thread_context); + } + detail::GrpcContextThreadContextImpl thread_context{grpc_context}; + return run(thread_context); } inline void GrpcContextImplementation::drain_completion_queue(agrpc::GrpcContext& grpc_context) noexcept @@ -314,7 +297,7 @@ inline void GrpcContextImplementation::drain_completion_queue(agrpc::GrpcContext (void)grpc_context.remote_work_queue_.try_mark_active(); (void)GrpcContextImplementation::move_remote_work_to_local_queue(thread_context); GrpcContextImplementation::process_local_queue(thread_context, detail::InvokeHandler::NO_); - while (GrpcContextImplementation::handle_next_completion_queue_event( + while (GrpcContextImplementation::do_one_completion_queue_event( thread_context, detail::GrpcContextImplementation::INFINITE_FUTURE, detail::InvokeHandler::NO_) .handled_event()) { @@ -340,15 +323,9 @@ inline void GrpcContextImplementation::push_resource(agrpc::GrpcContext& grpc_co grpc_context.memory_resources_.push_front(resource); } -template -inline decltype(auto) GrpcContextImplementation::visit_is_multithreaded(const agrpc::GrpcContext& grpc_context, - Function function) +inline bool GrpcContextImplementation::is_multithreaded(const agrpc::GrpcContext& grpc_context) { - if (grpc_context.multithreaded_) - { - return function(std::true_type{}); - } - return function(std::false_type{}); + return grpc_context.multithreaded_; } inline void process_grpc_tag(void* tag, detail::OperationResult result, agrpc::GrpcContext& grpc_context) diff --git a/src/agrpc/run.hpp b/src/agrpc/run.hpp index 29a3abba..4e53dc66 100644 --- a/src/agrpc/run.hpp +++ b/src/agrpc/run.hpp @@ -217,18 +217,18 @@ struct AlwaysFalseCondition struct GrpcContextDoOne { template - static auto poll(detail::GrpcContextThreadContextImpl& context, ::gpr_timespec deadline) + static DoOneResult poll(detail::GrpcContextThreadContextImpl& context, ::gpr_timespec deadline) { - return detail::GrpcContextImplementation::do_one(context, deadline); + return GrpcContextImplementation::do_one(context, deadline); } }; struct GrpcContextDoOneCompletionQueue { template - static auto poll(detail::GrpcContextThreadContextImpl& context, ::gpr_timespec deadline) + static DoOneResult poll(detail::GrpcContextThreadContextImpl& context, ::gpr_timespec deadline) { - return detail::GrpcContextImplementation::do_one_completion_queue(context, deadline); + return {GrpcContextImplementation::do_one_completion_queue_event(context, deadline)}; } }; @@ -294,13 +294,13 @@ void run(agrpc::GrpcContext& grpc_context, ExecutionContext& execution_context) template void run(agrpc::GrpcContext& grpc_context, ExecutionContext& execution_context, StopCondition stop_condition) { - detail::GrpcContextImplementation::visit_is_multithreaded( - grpc_context, - [&](auto v) - { - detail::run_impl( - grpc_context, execution_context, static_cast(stop_condition)); - }); + if (detail::GrpcContextImplementation::is_multithreaded(grpc_context)) + { + return detail::run_impl(grpc_context, execution_context, + static_cast(stop_condition)); + } + return detail::run_impl(grpc_context, execution_context, + static_cast(stop_condition)); } template @@ -313,13 +313,13 @@ template void run_completion_queue(agrpc::GrpcContext& grpc_context, ExecutionContext& execution_context, StopCondition stop_condition) { - detail::GrpcContextImplementation::visit_is_multithreaded( - grpc_context, - [&](auto v) - { - detail::run_impl( - grpc_context, execution_context, static_cast(stop_condition)); - }); + if (detail::GrpcContextImplementation::is_multithreaded(grpc_context)) + { + return detail::run_impl( + grpc_context, execution_context, static_cast(stop_condition)); + } + return detail::run_impl( + grpc_context, execution_context, static_cast(stop_condition)); } AGRPC_NAMESPACE_END