Skip to content

Commit

Permalink
perf: Reduce callstack depths in GrpcContext run functions by 3
Browse files Browse the repository at this point in the history
  • Loading branch information
Tradias committed Dec 12, 2024
1 parent d01059f commit c9433ee
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 136 deletions.
74 changes: 31 additions & 43 deletions src/agrpc/detail/grpc_context_definition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,23 @@ AGRPC_NAMESPACE_BEGIN()
namespace detail
{
template <class Function>
struct GrpcContextLoopFunction : Function
struct GrpcContextLoopCondition : Function
{
static constexpr bool COMPLETION_QUEUE_ONLY = false;

using Function::operator();

[[nodiscard]] bool has_processed(detail::DoOneResult result) const noexcept { return bool{result}; }
};

template <class Function>
GrpcContextLoopFunction(Function) -> GrpcContextLoopFunction<Function>;
GrpcContextLoopCondition(Function) -> GrpcContextLoopCondition<Function>;

template <class Function>
struct GrpcContextCompletionQueueLoopFunction : Function
struct GrpcContextCompletionQueueLoopCondition : Function
{
static constexpr bool COMPLETION_QUEUE_ONLY = true;

using Function::operator();

[[nodiscard]] bool has_processed(detail::DoOneResult result) const noexcept
Expand All @@ -54,7 +58,14 @@ struct GrpcContextCompletionQueueLoopFunction : Function
};

template <class Function>
GrpcContextCompletionQueueLoopFunction(Function) -> GrpcContextCompletionQueueLoopFunction<Function>;
GrpcContextCompletionQueueLoopCondition(Function) -> GrpcContextCompletionQueueLoopCondition<Function>;

struct GrpcContextIsNotStopped
{
[[nodiscard]] bool operator()() const noexcept { return !grpc_context_.is_stopped(); }

agrpc::GrpcContext& grpc_context_;
};

template <class T>
inline void create_resources(T& resources, std::size_t concurrency_hint)
Expand Down Expand Up @@ -127,70 +138,47 @@ inline GrpcContext::~GrpcContext()
inline bool GrpcContext::run()
{
return detail::GrpcContextImplementation::process_work(
*this, detail::GrpcContextLoopFunction{[](auto& context)
{
return detail::GrpcContextImplementation::do_one_if_not_stopped(
context, detail::GrpcContextImplementation::INFINITE_FUTURE);
}});
*this, detail::GrpcContextLoopCondition{detail::GrpcContextIsNotStopped{*this}},
detail::GrpcContextImplementation::INFINITE_FUTURE);
}

inline bool GrpcContext::run_completion_queue()
{
return detail::GrpcContextImplementation::process_work(
*this,
detail::GrpcContextCompletionQueueLoopFunction{
[](detail::GrpcContextThreadContext& context)
{
return detail::GrpcContextImplementation::do_one_completion_queue_if_not_stopped(
context, detail::GrpcContextImplementation::INFINITE_FUTURE);
}});
*this, detail::GrpcContextCompletionQueueLoopCondition{detail::GrpcContextIsNotStopped{*this}},
detail::GrpcContextImplementation::INFINITE_FUTURE);
}

inline bool GrpcContext::poll()
{
return detail::GrpcContextImplementation::process_work(
*this, detail::GrpcContextLoopFunction{[](auto& context)
{
return detail::GrpcContextImplementation::do_one_if_not_stopped(
context, detail::GrpcContextImplementation::TIME_ZERO);
}});
*this, detail::GrpcContextLoopCondition{detail::GrpcContextIsNotStopped{*this}},
detail::GrpcContextImplementation::TIME_ZERO);
}

inline bool GrpcContext::run_until_impl(::gpr_timespec deadline)
{
return detail::GrpcContextImplementation::process_work(
*this, detail::GrpcContextLoopFunction{[deadline](auto& context)
{
return detail::GrpcContextImplementation::do_one_if_not_stopped(
context, deadline);
}});
*this, detail::GrpcContextLoopCondition{detail::GrpcContextIsNotStopped{*this}}, deadline);
}

template <class Condition>
inline bool GrpcContext::run_while(Condition&& condition)
{
return detail::GrpcContextImplementation::process_work(
*this, detail::GrpcContextLoopFunction{[&](auto& context)
{
if (!condition())
{
return detail::DoOneResult{};
}
return detail::GrpcContextImplementation::do_one_if_not_stopped(
context, detail::GrpcContextImplementation::INFINITE_FUTURE);
}});
return detail::GrpcContextImplementation::process_work(*this,
detail::GrpcContextLoopCondition{[&]
{
return condition() &&
!is_stopped();
}},
detail::GrpcContextImplementation::INFINITE_FUTURE);
}

inline bool GrpcContext::poll_completion_queue()
{
return detail::GrpcContextImplementation::process_work(
*this,
detail::GrpcContextCompletionQueueLoopFunction{
[](detail::GrpcContextThreadContext& context)
{
return detail::GrpcContextImplementation::do_one_completion_queue_if_not_stopped(
context, detail::GrpcContextImplementation::TIME_ZERO);
}});
*this, detail::GrpcContextCompletionQueueLoopCondition{detail::GrpcContextIsNotStopped{*this}},
detail::GrpcContextImplementation::TIME_ZERO);
}

inline void GrpcContext::stop()
Expand Down
22 changes: 6 additions & 16 deletions src/agrpc/detail/grpc_context_implementation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ struct GrpcContextImplementation

static void add_operation(agrpc::GrpcContext& grpc_context, detail::QueueableOperationBase* op) noexcept;

static CompletionQueueEventResult handle_next_completion_queue_event(detail::GrpcContextThreadContext& context,
::gpr_timespec deadline,
detail::InvokeHandler invoke);
static CompletionQueueEventResult do_one_completion_queue_event(
detail::GrpcContextThreadContext& context, ::gpr_timespec deadline,
detail::InvokeHandler invoke = detail::InvokeHandler::YES_);

[[nodiscard]] static bool running_in_this_thread() noexcept;

Expand All @@ -149,26 +149,16 @@ struct GrpcContextImplementation
static DoOneResult do_one(detail::GrpcContextThreadContextImpl<IsMultithreaded>& context, ::gpr_timespec deadline,
detail::InvokeHandler invoke = detail::InvokeHandler::YES_);

template <bool IsMultithreaded>
static DoOneResult do_one_if_not_stopped(detail::GrpcContextThreadContextImpl<IsMultithreaded>& context,
::gpr_timespec deadline);

static DoOneResult do_one_completion_queue(detail::GrpcContextThreadContext& context, ::gpr_timespec deadline);

static DoOneResult do_one_completion_queue_if_not_stopped(detail::GrpcContextThreadContext& context,
::gpr_timespec deadline);

template <class LoopFunction>
static bool process_work(agrpc::GrpcContext& grpc_context, LoopFunction loop_function);
template <class LoopCondition>
static bool process_work(agrpc::GrpcContext& grpc_context, LoopCondition loop_condition, ::gpr_timespec deadline);

static void drain_completion_queue(agrpc::GrpcContext& grpc_context) noexcept;

static detail::ListablePoolResource& pop_resource(agrpc::GrpcContext& grpc_context);

static void push_resource(agrpc::GrpcContext& grpc_context, detail::ListablePoolResource& resource);

template <class Function>
static decltype(auto) visit_is_multithreaded(const agrpc::GrpcContext& grpc_context, Function function);
static bool is_multithreaded(const agrpc::GrpcContext& grpc_context);
};

void process_grpc_tag(void* tag, detail::OperationResult result, agrpc::GrpcContext& grpc_context);
Expand Down
95 changes: 36 additions & 59 deletions src/agrpc/detail/grpc_context_implementation_definition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ inline bool get_next_event(grpc::CompletionQueue* cq, detail::GrpcCompletionQueu
return grpc::CompletionQueue::GOT_EVENT == cq->AsyncNext(&event.tag_, &event.ok_, deadline);
}

inline CompletionQueueEventResult GrpcContextImplementation::handle_next_completion_queue_event(
inline CompletionQueueEventResult GrpcContextImplementation::do_one_completion_queue_event(
detail::GrpcContextThreadContext& context, ::gpr_timespec deadline, detail::InvokeHandler invoke)
{
agrpc::GrpcContext& grpc_context = context.grpc_context_;
Expand Down Expand Up @@ -236,76 +236,59 @@ inline DoOneResult GrpcContextImplementation::do_one(detail::GrpcContextThreadCo
{
return {{DoOneResult::PROCESSED_LOCAL_WORK}};
}
const auto handled_event = GrpcContextImplementation::handle_next_completion_queue_event(
const auto handled_event = GrpcContextImplementation::do_one_completion_queue_event(
context, is_more_completed_work_pending ? GrpcContextImplementation::TIME_ZERO : deadline, invoke);
return DoOneResult::from(handled_event, processed_local_work);
}

template <bool IsMultithreaded>
inline DoOneResult GrpcContextImplementation::do_one_if_not_stopped(
detail::GrpcContextThreadContextImpl<IsMultithreaded>& context, ::gpr_timespec deadline)
{
if (context.grpc_context_.is_stopped())
{
return {};
}
return {GrpcContextImplementation::do_one(context, deadline, detail::InvokeHandler::YES_)};
}

inline DoOneResult GrpcContextImplementation::do_one_completion_queue(detail::GrpcContextThreadContext& context,
::gpr_timespec deadline)
{
return {
GrpcContextImplementation::handle_next_completion_queue_event(context, deadline, detail::InvokeHandler::YES_)};
}

inline DoOneResult GrpcContextImplementation::do_one_completion_queue_if_not_stopped(
detail::GrpcContextThreadContext& context, ::gpr_timespec deadline)
{
if (context.grpc_context_.is_stopped())
{
return {};
}
return {
GrpcContextImplementation::handle_next_completion_queue_event(context, deadline, detail::InvokeHandler::YES_)};
}

template <class LoopFunction>
inline bool GrpcContextImplementation::process_work(agrpc::GrpcContext& grpc_context, LoopFunction loop_function)
template <class LoopCondition>
inline bool GrpcContextImplementation::process_work(agrpc::GrpcContext& grpc_context, LoopCondition loop_condition,
::gpr_timespec deadline)
{
const auto run = [&loop_function](auto& thread_context)
const auto run = [&loop_condition, deadline](auto& thread_context)
{
bool processed{};
DoOneResult result;
while ((result = loop_function(thread_context)))
while (loop_condition())
{
processed = processed || loop_function.has_processed(result);
if constexpr (LoopCondition::COMPLETION_QUEUE_ONLY)
{
result = {GrpcContextImplementation::do_one_completion_queue_event(thread_context, deadline)};
}
else
{
result = GrpcContextImplementation::do_one(thread_context, deadline);
}
if (!result)
{
break;
}
processed = processed || loop_condition.has_processed(result);
}
return processed;
};
if (GrpcContextImplementation::running_in_this_thread(grpc_context))
{
return GrpcContextImplementation::visit_is_multithreaded(
grpc_context,
[&](auto v)
{
return run(
static_cast<GrpcContextThreadContextImpl<decltype(v)::value>&>(*detail::thread_local_grpc_context));
});
auto& context = *detail::thread_local_grpc_context;
if (grpc_context.multithreaded_)
{
return run(static_cast<GrpcContextThreadContextImpl<true>&>(context));
}
return run(static_cast<GrpcContextThreadContextImpl<false>&>(context));
}
if (grpc_context.outstanding_work_.load(std::memory_order_relaxed) == 0)
{
grpc_context.stopped_.store(true, std::memory_order_relaxed);
return false;
}
grpc_context.reset();
return GrpcContextImplementation::visit_is_multithreaded(
grpc_context,
[&](auto v)
{
detail::GrpcContextThreadContextImpl<decltype(v)::value> thread_context{grpc_context};
return run(thread_context);
});
if (grpc_context.multithreaded_)
{
detail::GrpcContextThreadContextImpl<true> thread_context{grpc_context};
return run(thread_context);
}
detail::GrpcContextThreadContextImpl<false> thread_context{grpc_context};
return run(thread_context);
}

inline void GrpcContextImplementation::drain_completion_queue(agrpc::GrpcContext& grpc_context) noexcept
Expand All @@ -314,7 +297,7 @@ inline void GrpcContextImplementation::drain_completion_queue(agrpc::GrpcContext
(void)grpc_context.remote_work_queue_.try_mark_active();
(void)GrpcContextImplementation::move_remote_work_to_local_queue(thread_context);
GrpcContextImplementation::process_local_queue(thread_context, detail::InvokeHandler::NO_);
while (GrpcContextImplementation::handle_next_completion_queue_event(
while (GrpcContextImplementation::do_one_completion_queue_event(
thread_context, detail::GrpcContextImplementation::INFINITE_FUTURE, detail::InvokeHandler::NO_)
.handled_event())
{
Expand All @@ -340,15 +323,9 @@ inline void GrpcContextImplementation::push_resource(agrpc::GrpcContext& grpc_co
grpc_context.memory_resources_.push_front(resource);
}

template <class Function>
inline decltype(auto) GrpcContextImplementation::visit_is_multithreaded(const agrpc::GrpcContext& grpc_context,
Function function)
inline bool GrpcContextImplementation::is_multithreaded(const agrpc::GrpcContext& grpc_context)
{
if (grpc_context.multithreaded_)
{
return function(std::true_type{});
}
return function(std::false_type{});
return grpc_context.multithreaded_;
}

inline void process_grpc_tag(void* tag, detail::OperationResult result, agrpc::GrpcContext& grpc_context)
Expand Down
36 changes: 18 additions & 18 deletions src/agrpc/run.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,18 +217,18 @@ struct AlwaysFalseCondition
struct GrpcContextDoOne
{
template <bool IsMultithreaded>
static auto poll(detail::GrpcContextThreadContextImpl<IsMultithreaded>& context, ::gpr_timespec deadline)
static DoOneResult poll(detail::GrpcContextThreadContextImpl<IsMultithreaded>& context, ::gpr_timespec deadline)
{
return detail::GrpcContextImplementation::do_one(context, deadline);
return GrpcContextImplementation::do_one(context, deadline);
}
};

struct GrpcContextDoOneCompletionQueue
{
template <bool IsMultithreaded>
static auto poll(detail::GrpcContextThreadContextImpl<IsMultithreaded>& context, ::gpr_timespec deadline)
static DoOneResult poll(detail::GrpcContextThreadContextImpl<IsMultithreaded>& context, ::gpr_timespec deadline)
{
return detail::GrpcContextImplementation::do_one_completion_queue(context, deadline);
return {GrpcContextImplementation::do_one_completion_queue_event(context, deadline)};
}
};

Expand Down Expand Up @@ -294,13 +294,13 @@ void run(agrpc::GrpcContext& grpc_context, ExecutionContext& execution_context)
template <class Traits, class ExecutionContext, class StopCondition>
void run(agrpc::GrpcContext& grpc_context, ExecutionContext& execution_context, StopCondition stop_condition)
{
detail::GrpcContextImplementation::visit_is_multithreaded(
grpc_context,
[&](auto v)
{
detail::run_impl<decltype(v)::value, detail::GrpcContextDoOne, Traits>(
grpc_context, execution_context, static_cast<StopCondition&&>(stop_condition));
});
if (detail::GrpcContextImplementation::is_multithreaded(grpc_context))
{
return detail::run_impl<true, detail::GrpcContextDoOne, Traits>(grpc_context, execution_context,
static_cast<StopCondition&&>(stop_condition));
}
return detail::run_impl<false, detail::GrpcContextDoOne, Traits>(grpc_context, execution_context,
static_cast<StopCondition&&>(stop_condition));
}

template <class Traits, class ExecutionContext>
Expand All @@ -313,13 +313,13 @@ template <class Traits, class ExecutionContext, class StopCondition>
void run_completion_queue(agrpc::GrpcContext& grpc_context, ExecutionContext& execution_context,
StopCondition stop_condition)
{
detail::GrpcContextImplementation::visit_is_multithreaded(
grpc_context,
[&](auto v)
{
detail::run_impl<decltype(v)::value, detail::GrpcContextDoOneCompletionQueue, Traits>(
grpc_context, execution_context, static_cast<StopCondition&&>(stop_condition));
});
if (detail::GrpcContextImplementation::is_multithreaded(grpc_context))
{
return detail::run_impl<true, detail::GrpcContextDoOneCompletionQueue, Traits>(
grpc_context, execution_context, static_cast<StopCondition&&>(stop_condition));
}
return detail::run_impl<false, detail::GrpcContextDoOneCompletionQueue, Traits>(
grpc_context, execution_context, static_cast<StopCondition&&>(stop_condition));
}

AGRPC_NAMESPACE_END
Expand Down

0 comments on commit c9433ee

Please sign in to comment.