asioexec::completion_token & ::use_sender #1503


Open
wants to merge 4 commits into main

Conversation

RobertLeahy

See commit message for description of functionality.

Addresses defects discussed in comment on #1501.

Adds two completion tokens for interop with Boost.Asio (standalone Asio
not currently supported but would be simple to support in the future).

asioexec::completion_token performs the most basic transformations
necessary to transform an Asio initiating function into a sender
factory:

- The initiating function returns a sender
- Initiation is deferred until the above-mentioned sender is connected
  and the resulting operation state is started
- The completion handler provided to the initiation (see
  boost::asio::async_initiate) has the following properties:
  - Invocation results in the arguments thereto being sent via a value
    completion signal (this means that errors transmitted via a leading
    boost::system::error_code parameter (i.e. in Asio style) are
    delivered in the value channel, see below)
  - Abandonment thereof (i.e. allowing the lifetime of the completion
    handler, and all objects transitively derived by moving therefrom,
    to end without invoking any of them) results in a stopped completion
    signal
  - Any exception thrown from any intermediate completion handler, or
    the final completion handler, is sent via an error completion signal
    with a std::exception_ptr representing that exception (this is
    accomplished by wrapping the associated executor)
  - The cancellation slot is connected to a cancellation signal which
    is sent when a stop request is received via the receiver's
    associated stop token

The fact that invocations of the completion handler are passed to the
value channel untouched reflects the design intent that the
above-described completion token perform only "the most basic
transformations necessary." This means that the full context of partial
success must be made available, and since the error channel is unary,
this must be transmitted in the value channel.
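
For illustration, a minimal sketch of what this looks like in use
(assuming asioexec::completion_token can be passed directly as the
token object, just as asioexec::use_sender is in the example further
down, and that context is a boost::asio::io_context being run on
another thread):

auto timer = boost::asio::steady_timer(context, std::chrono::seconds(1));
auto task  = timer.async_wait(asioexec::completion_token)
           | stdexec::then([] (boost::system::error_code ec) {
               // Asio-style errors arrive untouched in the value channel
               if (ec) std::cout << "wait failed: " << ec.message() << std::endl;
             });
stdexec::sync_wait(std::move(task));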

For a more ergonomic experience than that described above,
asioexec::use_sender is also provided. This uses
asioexec::completion_token to adapt an Asio initiating function into a
sender factory and wraps the returned sender with an additional layer
which performs the following transformations to value completion
signals with a leading boost::system::error_code parameter (a usage
sketch follows the list):

- If that argument compares equal to boost::asio::error::operation_aborted,
  transforms the value completion signal into a stopped completion
  signal, otherwise
- If that argument is truthy, transforms the value completion signal
  into an error completion signal with an appropriate std::exception_ptr
  (i.e. one which points to a boost::system::system_error), otherwise
- Sends the remainder of the arguments (i.e. all but the
  boost::system::error_code) as a value completion signal
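
A usage sketch of that mapping (illustrative only; sock is assumed to
be a connected boost::asio::ip::tcp::socket and buf a suitable buffer,
both tied to an io_context that is being run elsewhere):

auto task = sock.async_read_some(boost::asio::buffer(buf), asioexec::use_sender)
          | stdexec::then([] (std::size_t n) {
              // success: the leading error_code has been consumed, only the
              // remaining argument (the byte count) is delivered
            })
          | stdexec::upon_error([] (std::exception_ptr) {
              // other errors arrive as a std::exception_ptr holding a
              // boost::system::system_error
            })
          | stdexec::upon_stopped([] {
              // boost::asio::error::operation_aborted becomes a stopped signal
            });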

copy-pr-bot bot commented Mar 27, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.


@ericniebler
Collaborator

/ok to test

@AnonymousPC

AnonymousPC commented Mar 27, 2025

This PR successfully resolves the issues mentioned in #1501. However, the use_sender token still runs on legacy boost::asio contexts.

std::cout << "main: " << std::this_thread::get_id() << std::endl;

auto context = boost::asio::io_context();
auto runner = std::thread([&] {
    auto guard = boost::asio::make_work_guard(context);
    context.run();
});

auto timer = boost::asio::steady_timer(context, std::chrono::seconds(1));
auto task = timer.async_wait(asioexec::use_sender)
            | stdexec::then([] (auto&&...) { std::cout << "then: " << std::this_thread::get_id() << std::endl; });
stdexec::sync_wait(std::move(task));

context.stop();
runner.join();

The output (on my laptop, may vary across different platforms) appears to be:

main: 0x2090d4840
then: 0x16d5d3000

As far as I can tell, one of the core concepts in std::execution is that operations are independent from schedulers, which means that a function like std::execution::sender auto async_operation() should always execute within the caller-provided execution context (unless it explicitly uses schedule, starts_on, or continues_on in the function body).

@villevoutilainen
Contributor

One of the core concepts in std::execution is that operations are independent from schedulers, which means that a function like std::execution::sender auto async_operation() should always execute within the caller-provided execution context (unless explicitly schedule(particular_sched) in function body).

That's a strong preference and recommendation, but not a hard requirement. Some async work might not be able to be moved to another context, even if you can move some parts of its setup with starts_on and its whole continuations with continues_on. And then there are things that can't really move the setup either, such as this little beast:
https://git.qt.io/vivoutil/libunifex-with-qt/-/blob/main/thread_example/threadrunner.cpp?ref_type=heads#L89-96

The QTimer can't be started or stopped on any other thread than the one that owns it. So, its users will have to do that transfer (continues_on) dance before invoking such operations. That doesn't mean that QTimers cannot be used with the rest of the framework, they just happen to require such a QTimer-specific dance.

I can easily imagine that there are existing async facilities that are similarly thread-pinned to a particular i/o thread and can't be completely moved to another scheduler, even if you can move parts of operating them, like doing a starts_on; but that can't be assumed to mean that every part of it moves. It's named "starts_on", not "runs_completely_on".
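
A rough sketch of that dance, with assumed names (pinned_sched being a scheduler for the thread that owns the resource, caller_sched whatever scheduler the caller wants continuations on, and start_pinned_operation() a sender factory for the thread-pinned work):

auto work = stdexec::starts_on(pinned_sched, start_pinned_operation())
          | stdexec::continues_on(caller_sched)
          | stdexec::then([] { /* runs on caller_sched */ });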

@ericniebler
Collaborator

However, the use_sender token still runs on legacy boost::asio contexts.

playing the n00b card: what is the non-legacy way to do this? what are the advantages?

@AnonymousPC

AnonymousPC commented Mar 27, 2025

However, the use_sender token still runs on legacy boost::asio contexts.

playing the n00b card: what is the non-legacy way to do this? what are the advantages?

In this case:

boost::asio::io_context external_context;

auto function() {
  auto work = boost::asio::ssl_stream(external_context).async_connect(website)
            | std::execution::let_value([] { return async_ssl_handshake(); /*here*/ })
            | std::execution::let_value([] { return async_send_request(); /*and here*/ });
  std::execution::sync_wait(work);
}

I'm wondering which thread will execute the "here" scope:

  • The function caller's thread? (stdexec/examples use this one)
  • The external_context's thread? ("legacy" boost::asio use this one)
  • The main thread?
  • A system context thread?
  • Or could it be any of the above?

A reasonable assumption might be the caller's thread, because:

  1. std::execution only has access to the caller thread in this scenario.
  2. If the "here" scope were to run on the external_context's thread, the function caller's thread would still need to busy-wait for it (potentially causing ~2x CPU clock usage due to contention).

Would appreciate any insights or corrections on this understanding!
(The above was translated with GPT, as English is not my first language, and I've tried hard to make sure the grammar doesn't come across as offensive. Thank you!)

@RobertLeahy
Author

However, the use_sender token still runs on legacy boost::asio contexts.

playing the n00b card: what is the non-legacy way to do this? what are the advantages?

I think the boost::asio::io_context (in which work is running) is what's being called "legacy," rather than what's actually being done.

std::this_thread::sync_wait must generate a receiver to connect to the provided sender. This receiver has an associated scheduler and delegation scheduler obtained from a std::execution::run_loop execution context.

Asio asynchronous operations have one or two executors associated with them: the "I/O executor," which is associated with the "I/O object" upon which the operation is being performed, and the "associated executor," which is associated with the completion handler. When obtaining the executor which will actually be used, operations are expected to consult the completion handler association, provide the I/O executor as a fallback, and then perform all work (except that contained within the initiation) thereupon.
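
In Asio terms, that lookup is roughly (a sketch; handler stands in for an arbitrary completion handler and sock for an I/O object):

auto io_ex = sock.get_executor();  // the "I/O executor"
auto ex    = boost::asio::get_associated_executor(handler, io_ex);
// ex is the handler's associated executor if one was bound (e.g. via
// boost::asio::bind_executor), otherwise it falls back to the I/O
// executor; all non-initiation work is then performed on ex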

Note that modern versions of Asio actually have two hooks for providing an associated executor: one to be used in the case of deferred execution, and one to be used in the case of immediate execution. This distinction is important because, in the absence of an immediate executor association, Asio asynchronous operations which complete "inline" (i.e. from within the initiation) are required to complete non-reentrantly. This is possible because in the Asio executor model whether or not work is allowed to complete inline (i.e. from within the execute member function of the executor) can be controlled via the "blocking" property.
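
For reference, the Asio-side machinery being referred to looks roughly like this (a sketch, assuming io_ex is an io_context executor, handler is some completion handler, and a recent Boost.Asio release; not code from this PR):

// request a version of the executor that never runs submitted work inline
auto never_inline = boost::asio::require(
    io_ex, boost::asio::execution::blocking.never);
never_inline.execute([] { /* never invoked from within execute() */ });

// opt an operation into inline ("immediate") completion by binding an
// immediate executor to its completion handler
auto token = boost::asio::bind_immediate_executor(
    boost::asio::system_executor(), std::move(handler));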

It's important to note that S&R schedulers do not support this functionality. You cannot ask an S&R scheduler whether or not its schedule operation is permitted to complete inline within start, and you cannot ask an S&R scheduler to provide you with an updated version of itself which provides (or doesn't provide) a guarantee one way or another.

The above means that S&R schedulers lack the ability to express a fundamental Asio basis operation: A request to execute work non-reentrantly. Since asynchronous operations in the Asio model do not complete inline (unless the user has explicitly opted into this via the immediate executor association) many Asio asynchronous operations will assume this property about their intermediate asynchronous operations and will only be correct (i.e. not liable to overflow the stack) if this is satisfied.
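
To make the stack concern concrete, here is the classic shape of a composed operation (illustrative only, not code from this PR or from Asio):

template <class Handler>
void read_all(boost::asio::ip::tcp::socket& sock,
              boost::asio::mutable_buffer buf,
              Handler handler) {
  sock.async_read_some(buf,
    [&sock, buf, handler = std::move(handler)](
        boost::system::error_code ec, std::size_t n) mutable {
      if (ec || n == buf.size()) {
        std::move(handler)(ec);
        return;
      }
      // relies on the intermediate completion not running inline; if it
      // could, every immediately-available chunk would add a stack frame
      read_all(sock, buf + n, std::move(handler));
    });
}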

Which gives us the background to come back to the question at hand. The issue here is that asioexec::completion_token (which asioexec::use_sender is built on top of) does not obtain a scheduler from the receiver's environment and pass it through as the associated executor of the completion handler it synthesizes to interoperate with Asio asynchronous operations. The fundamental reason for this is, as described above, that S&R schedulers cannot provide all the same guarantees as Asio executors, and that doing so would therefore be unsafe and undesirable.

The argument could be made that an executor could be synthesized which wraps both a scheduler and the I/O executor, and which obtains non-reentrant execution by requesting such execution from the executor and then scheduling back to the scheduler, but this would be inefficient and would still require additional care and attention to satisfy Asio's guarantee that no move constructors of the completion handler are called except from the associated executor's execution context.

Ultimately, at least as of now, the S&R ecosystem does not provide guarantees around schedulers and execution contexts anywhere near as strict as Asio's. Therefore, while there is a scheduler available in the receiver's environment, operations are free within the S&R model to ignore it, and that's what this integration does (at least as of now).

@RobertLeahy
Author

In this case:

boost::asio::io_context external_context;

auto function() {
  auto work = boost::asio::ssl_stream(external_context).async_connect(website)
            | std::execution::let_value([] { return async_ssl_handshake(); /*here*/ })
            | std::execution::let_value([] { return async_send_request(); /*and here*/ });
  std::execution::sync_wait(work);
}

I'm wondering which thread will execute the "here" scope [...]

Assuming async_connect is parameterized with asioexec::use_sender from this PR, the first "here" will run within external_context. If we assume that async_ssl_handshake sources a sender from an I/O object also associated with external_context, the same will also be true for "and here".

@ericniebler
Collaborator

This is possible because in the Asio executor model whether or not work is allowed to complete inline (i.e. from within the execute member function of the executor) can be controlled via the "blocking" property.

we've long discussed the possibility of a "blocking" sender attribute. it needs a design and a champion. i would love to close this gap.

Collaborator

@ericniebler left a comment


love this, it's looking good. needs tests and a clang-format. i can help with the cmakery needed for the tests, if you like.

@@ -0,0 +1,411 @@
#pragma once
Collaborator


needs license and copyright header

Comment on lines +28 to +29
struct signature<void(Args...)> : std::type_identity<
::stdexec::set_value_t(Args...)> {};
Collaborator


Suggested change
struct signature<void(Args...)> : std::type_identity<
::stdexec::set_value_t(Args...)> {};
struct signature<void(Args...)> {
using type = ::stdexec::set_value_t(Args...);
};

it's clearer and saves a class template instantiation.

assert(!*prev);
}
try {
std::invoke(std::forward<F>(f));
Collaborator


Suggested change
std::invoke(std::forward<F>(f));
std::forward<F>(f)();

std::invoke is a piggy and isn't needed here.

::stdexec::stop_callback_for_t<
::stdexec::stop_token_of_t<
::stdexec::env_of_t<Receiver>>,
stop_callback>> callback_;
Collaborator


i prefer to define all the members in one place -- top or bottom of the class, i don't care which -- but pick one. and unless there's a good reason, they should all have the same accessibility.

finally, unless you expect this hierarchy to be openly extensible, i prefer private/friend over protected.

Author


The only reason this base class exists at all is to be SCARY.

Given that it's buried in a detail namespace I don't expect the hierarchy to be openly extensible, but I also don't expect anyone to look at or care about any of the members, so I was just using the accessibility that got me what I needed with the least amount of typing.

self_->completed_ = nullptr;
}
self_ = nullptr;
std::invoke(channel, std::move(r), std::forward<Args>(args)...);
Collaborator


Suggested change
std::invoke(channel, std::move(r), std::forward<Args>(args)...);
channel(std::move(r), std::forward<Args>(args)...);

Comment on lines +67 to +73
//template<typename ChildOp>
// requires
// execution::inlinable_receiver<Receiver, ChildOp> &&
// std::is_nothrow_move_constructible_v<Receiver>
//constexpr static receiver make_receiver_for(ChildOp* op) noexcept {
// return receiver(Receiver::make_receiver_for(op));
//}
Collaborator


Suggested change
//template<typename ChildOp>
// requires
// execution::inlinable_receiver<Receiver, ChildOp> &&
// std::is_nothrow_move_constructible_v<Receiver>
//constexpr static receiver make_receiver_for(ChildOp* op) noexcept {
// return receiver(Receiver::make_receiver_for(op));
//}

Comment on lines +26 to +31
try {
return std::make_exception_ptr(
::boost::system::system_error(std::move(ec)));
} catch (...) {
return std::current_exception();
}
Collaborator


ugh. your code is fine, i'm just appalled that constructing an exception can throw. i know that's true of std::runtime_error too. i feel like if you can't construct an exception, the program should terminate.

don't mind me, i'm just ranting.

Author

@RobertLeahy Mar 29, 2025


This code:

try {
  throw ::boost::system::system_error(std::move(ec));
} catch (...) {
  return std::current_exception();
}

doesn't make it obvious that constructing an exception can throw, and it also incurs the cost of a try/catch. I'd normally use the above, but I expect the code you commented on to be a bit "hotter" than other kinds of error handling code I've written.

Comment on lines +36 to +38
template<typename T>
requires std::constructible_from<Receiver, T>
constexpr explicit receiver(T&& t) noexcept(
Collaborator


it's a matter of taste, but i tend not to concept-check internal implementation details like receivers and operation states. a failed requirement there is a library bug, and the concept diagnostic is unlikely to be an improvement. and concept checks are extra work for the compiler.

but like i said, just a matter of taste.

template<typename T, typename... Args>
requires is_error_code<T>
struct transform_completion_signature<::stdexec::set_value_t(T, Args...)>
: std::type_identity<::stdexec::set_value_t(Args...)> {};
Collaborator


from the set_value on line 51, it looks like such a completion could result in a call to either set_value, set_stopped, or set_error. why is it mapped to only set_value_t here?

Author


We're building on top of completion_token and that unconditionally adds set_error_t(std::exception_ptr) (because intermediate completion handlers could throw) and set_stopped_t() (because of abandonment) so we don't need to additionally do it here (and then deduplicate).
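
Concretely, an assumed sketch of the resulting signature set for something like steady_timer::async_wait adapted with completion_token (not the library's exact spelling):

using sigs = stdexec::completion_signatures<
    stdexec::set_value_t(boost::system::error_code), // the Asio result, untouched
    stdexec::set_error_t(std::exception_ptr),        // an intermediate handler threw
    stdexec::set_stopped_t()>;                       // handler abandoned / stop requested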

{}
using sender_concept = ::stdexec::sender_t;
template<typename Self, typename Env>
constexpr auto get_completion_signatures(this Self&&, const Env&) noexcept ->
Collaborator


ditto for deducing this here.

Comment on lines +171 to +174
std::invoke(
std::move(init_),
completion_handler<Receiver>(*this),
std::forward<decltype(args)>(args)...);
Collaborator


if we are comfortable saying that init_ is never going to be a completion_handler member pointer, then we can change this to not use std::invoke also.

Author


I don't know that we can say that? I'd need to read very carefully the expectations that async_initiate has for the Initiation.

@ericniebler
Collaborator

/ok to test

@ericniebler
Collaborator

/ok to test

@ericniebler
Collaborator

/ok to test
