WinRT Futures should update their inner Waker #342
Comments
Thanks for the repro, Nathaniel! That's super helpful. A wrapper may be the way to go, but I really don't enjoy all the verbiage Rust requires with all the unwrapping so I'll think about generating the projection to possibly return the wrapper directly so that it doesn't require another method call. |
I was thinking that some of the verbiage could be avoided by making the conversion method not return a result, like this:

    // Work-around being unable to implement new functions on foreign types
    trait IAsyncActionExt {
        fn to_future(self) -> IAsyncActionFuture;
    }

    impl IAsyncActionExt for IAsyncAction {
        fn to_future(self) -> IAsyncActionFuture {
            IAsyncActionFuture {
                inner: self,
                shared_waker: Arc::new(Mutex::new(None)),
            }
        }
    }

    struct IAsyncActionFuture {
        inner: IAsyncAction,
        shared_waker: Arc<Mutex<Option<Waker>>>,
    }

    impl Future for IAsyncActionFuture {
        type Output = winrt::Result<()>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.inner.status()? == AsyncStatus::Started {
                let mut shared_waker = self.shared_waker.lock().unwrap();
                let shared_waker_is_none = shared_waker.is_none();
                *shared_waker = Some(cx.waker().clone());
                if shared_waker_is_none {
                    let shared_waker = self.shared_waker.clone();
                    self.inner.set_completed(AsyncActionCompletedHandler::new(
                        move |_sender, _args| {
                            // The waker will always be some here
                            shared_waker.lock().unwrap().take().unwrap().wake();
                            Ok(())
                        },
                    ))?;
                }
                Poll::Pending
            } else {
                Poll::Ready(self.inner.get_results())
            }
        }
    }

    fn main() {
        let mut tokio_rt = tokio::runtime::Builder::new()
            .threaded_scheduler()
            .build()
            .unwrap();
        tokio_rt.block_on(async {
            let fut_1 = TestRunner::create_async_action(1000).unwrap().to_future();
            let fut_2 = TestRunner::create_async_action(2000).unwrap().to_future();
            let mut fut_1_fuse = fut_1.fuse();
            let mut fut_2_fuse = fut_2.fuse();
            let incomplete_future = futures::select! {
                res_1 = fut_1_fuse => fut_2_fuse,
                res_2 = fut_2_fuse => fut_1_fuse,
            };
            println!("1 future complete, finishing other on background task...");
            // Work-around !Send future, this example still works with Send winrt futures and tokio::spawn.
            let local = tokio::task::LocalSet::new();
            local.spawn_local(async move {
                incomplete_future.await.unwrap();
            });
            local.await;
            println!("Both futures complete!");
        });
        println!("Done!");
    }

The only job of the conversion method is to essentially create a wrapper around an |
|
Thanks for the reminder. Yes, now that |
Closing this old issue - doesn't seem to be much demand for such API support. Let me know if you have thoughts to the contrary. 😊 |
This is still an issue and is a footgun for anyone who wants to use async APIs. I might take a crack at implementing a wrapper using IntoFuture, but not this week. |
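For readers wondering what an IntoFuture-based wrapper might look like, here is a minimal std-only sketch. It is not the windows crate's implementation: `AsyncHandle`, `HandleFuture`, `block_on`, and `await_handle` are hypothetical names invented for illustration, and a background thread stands in for a hot-started WinRT operation with a one-shot completion handler. The only real APIs used are from the Rust standard library (`IntoFuture` is stable since Rust 1.64).

```rust
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::{thread, time::Duration};

#[derive(Default)]
struct State {
    result: Option<u32>,
    completed: Option<Box<dyn FnOnce() + Send>>,
}

/// Hypothetical stand-in for an `IAsyncxxx` handle; it has no room for a waker.
pub struct AsyncHandle(Arc<Mutex<State>>);

impl AsyncHandle {
    /// "Hot start": the work begins on a background thread immediately.
    pub fn start(value: u32) -> Self {
        let state = Arc::new(Mutex::new(State::default()));
        let worker = state.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(20));
            let mut s = worker.lock().unwrap();
            s.result = Some(value);
            let callback = s.completed.take();
            drop(s); // release the lock before running the handler
            if let Some(cb) = callback {
                cb();
            }
        });
        AsyncHandle(state)
    }
}

/// Wrapper future that owns the shared waker slot the handle lacks.
pub struct HandleFuture {
    handle: AsyncHandle,
    waker: Option<Arc<Mutex<Waker>>>,
}

impl IntoFuture for AsyncHandle {
    type Output = u32;
    type IntoFuture = HandleFuture;
    fn into_future(self) -> HandleFuture {
        HandleFuture { handle: self, waker: None }
    }
}

impl Future for HandleFuture {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if let Some(shared) = self.waker.clone() {
            // Later polls: refresh the waker that the handler will signal.
            shared.lock().unwrap().clone_from(cx.waker());
        } else {
            // First poll: create the slot and register the handler once.
            let shared = Arc::new(Mutex::new(cx.waker().clone()));
            self.waker = Some(shared.clone());
            let handler: Box<dyn FnOnce() + Send> = Box::new(move || {
                shared.lock().unwrap().wake_by_ref();
            });
            self.handle.0.lock().unwrap().completed = Some(handler);
        }
        // Checking after the waker is stored closes the race with completion.
        let result = self.handle.0.lock().unwrap().result;
        match result {
            Some(value) => Poll::Ready(value),
            None => Poll::Pending,
        }
    }
}

struct ThreadWaker(thread::Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor so the sketch needs no external crates.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// `.await` works directly on the handle because `into_future` is called implicitly.
pub fn await_handle() -> u32 {
    block_on(async { AsyncHandle::start(42).await })
}
```

Calling `await_handle()` from a `main` function drives the handle to completion. The design point is that the handle type itself never implements `Future`, so the waker slot lives in the wrapper and callers still write a plain `.await`.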
I would also like to note that the issue isn't specifically the code I sent above, but the fact that the waker isn't updated. This is a violation of the futures contract. Updates to async executors in the future may break the current code in strange ways, such as hanging permanently as shown in the code above. If fixing this is off the table, would it be possible to at least gate the |
#3142 removes the |
Reopening as there's been some renewed interest in futures support. Here's a sketch of how this can be implemented correctly to accommodate both Rust futures and WinRT async.

    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll, Waker};
    use windows::{core::*, Foundation::*, System::Threading::*};

    // The actual async types can implement `IntoFuture` and return this `Future` with some extra state needed to
    // keep things moving along. This would just need to be a bit more generic to handle the four kinds of async
    // types.
    struct AsyncFuture {
        // Represents the async execution and provides the virtual methods for setting up a `Completed` handler and
        // calling `GetResults` when execution is completed.
        inner: IAsyncAction,

        // Provides the `Status` virtual method and saves repeated calls to `QueryInterface` during polling.
        status: IAsyncInfo,

        // A shared waker is needed to keep the `Completed` handler updated.
        waker: Option<Arc<Mutex<Waker>>>,
    }

    impl AsyncFuture {
        fn new(inner: IAsyncAction) -> Self {
            Self {
                status: inner.cast().unwrap(),
                inner,
                waker: None,
            }
        }
    }

    unsafe impl Send for AsyncFuture {}
    unsafe impl Sync for AsyncFuture {}

    impl Future for AsyncFuture {
        type Output = Result<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // A status of `Started` just means async execution is still in flight. Since WinRT async is always
            // "hot start", if it's not `Started` then it's ready for us to call `GetResults` so we can skip all of
            // the remaining set up.
            if self.status.Status()? != AsyncStatus::Started {
                return Poll::Ready(self.inner.GetResults());
            }

            if let Some(shared_waker) = &self.waker {
                // We have a shared waker which means we're either getting polled again or have been transferred
                // to another execution context. Either way, we need to update the shared waker to make sure
                // we've got the "current" waker.
                let mut guard = shared_waker.lock().unwrap();
                guard.clone_from(cx.waker());

                // It may be possible that the `Completed` handler acquired the lock and signaled the old waker
                // before we managed to acquire the lock to update it with the current waker. We check the status
                // again here just in case this happens.
                if self.status.Status()? != AsyncStatus::Started {
                    return Poll::Ready(self.inner.GetResults());
                }
            } else {
                // If we don't have a saved waker it means this is the first time we're getting polled and should
                // create the shared waker and set up a `Completed` handler.
                let shared_waker = Arc::new(Mutex::new(cx.waker().clone()));
                self.waker = Some(shared_waker.clone());

                // Note that the handler can only be set once, which is why we need a shared waker in the first
                // place. On the other hand, the handler will get called even if async execution has already
                // completed, so we can just return `Pending` after setting the `Completed` handler.
                self.inner
                    .SetCompleted(&AsyncActionCompletedHandler::new(move |_, _| {
                        shared_waker.lock().unwrap().wake_by_ref();
                        Ok(())
                    }))?;
            };

            Poll::Pending
        }
    }

    async fn async_test() -> Result<()> {
        let object = ThreadPool::RunAsync(&WorkItemHandler::new(|_| {
            println!("work");
            Ok(())
        }))?;

        AsyncFuture::new(object).await
    }

    fn main() -> Result<()> {
        futures::executor::block_on(async_test())
    }

|
This is now fixed by #3213, right? |
Yes. |
Opening this issue as a continuation of #322. While it is now possible to call poll on winrt futures more than once, consecutive poll calls will not update the Waker. As a result, these futures will hang forever if a future is transferred between tasks after being polled. A simple example of this behavior:

This example will hang indefinitely and never complete. Unfortunately, I don't think it's possible to fix this while maintaining the Future impl on IAsyncxxx, as some extra memory is needed to store the shared reference to the Waker to update if needed.

In my opinion, IntoFuture is probably the best way forward, however it is currently unstable. Implementing this trait will allow await-ing on that value by implicitly calling into_future, just as the IntoIterator trait allows iterating over a value that doesn't explicitly implement Iterator. In the meantime, maybe we could remove the Future impl from IAsyncxxx, replacing it with a method to get a wrapper struct implementing Future?
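To make the waker requirement concrete, here is a small std-only sketch of a future that refreshes its stored waker on every poll, which is the behavior the futures contract expects. Everything in it is hypothetical and invented for illustration (`Operation`, `Shared`, `complete`, `CountingWaker`, `wake_counts_after_migration`); it does not use or reproduce any winrt API. Two counting wakers stand in for two tasks: the future is polled under the first, then under the second, and only then completed.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// State shared between the future and its (hypothetical) completion callback.
#[derive(Default)]
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical stand-in for a wrapper around an async operation.
struct Operation {
    shared: Arc<Mutex<Shared>>,
}

impl Future for Operation {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.shared.lock().unwrap();
        if shared.done {
            return Poll::Ready(());
        }
        // Refresh the stored waker on every poll, not just the first one.
        match &mut shared.waker {
            Some(stored) => stored.clone_from(cx.waker()),
            None => shared.waker = Some(cx.waker().clone()),
        }
        Poll::Pending
    }
}

/// Plays the role of the completion handler: mark done, wake the latest waker.
fn complete(shared: &Arc<Mutex<Shared>>) {
    let waker = {
        let mut s = shared.lock().unwrap();
        s.done = true;
        s.waker.take()
    };
    if let Some(w) = waker {
        w.wake();
    }
}

/// A waker that only counts how often it was woken; one per simulated task.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls under task A, then under task B, then completes the operation.
/// Returns how many times each task was woken, as `(a, b)`.
pub fn wake_counts_after_migration() -> (usize, usize) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let mut op = Operation { shared: shared.clone() };
    let task_a = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let task_b = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker_a = Waker::from(task_a.clone());
    let waker_b = Waker::from(task_b.clone());

    assert!(Pin::new(&mut op).poll(&mut Context::from_waker(&waker_a)).is_pending());
    assert!(Pin::new(&mut op).poll(&mut Context::from_waker(&waker_b)).is_pending());
    complete(&shared);

    (task_a.0.load(Ordering::SeqCst), task_b.0.load(Ordering::SeqCst))
}
```

`wake_counts_after_migration()` returns `(0, 1)`: only the most recent task is woken. If `poll` kept the first waker and ignored later ones, the counts would be `(1, 0)`, and the task that now owns the future would never be scheduled again, which matches the hang described above.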