Skip to content

Commit

Permalink
feat: add PyFuture to await Python awaitables
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Apr 7, 2024
1 parent 199d4f5 commit e15675f
Show file tree
Hide file tree
Showing 13 changed files with 686 additions and 173 deletions.
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- [Mapping of Rust types to Python types](conversions/tables.md)
- [Conversion traits](conversions/traits.md)
- [Using `async` and `await`](async-await.md)
- [Awaiting Python awaitables](async-await/pyfuture.md)
- [Parallelism](parallelism.md)
- [Debugging](debugging.md)
- [Features reference](features.md)
Expand Down
62 changes: 62 additions & 0 deletions guide/src/async-await/pyfuture.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Awaiting Python awaitables

Python awaitable can be awaited on Rust side using [`PyFuture`]({{#PYO3_DOCS_URL}}/pyo3/types/struct.PyFuture.html).

```rust
# # ![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use pyo3::{prelude::*, types::PyFuture};

#[pyfunction]
async fn wrap_awaitable(awaitable: PyObject) -> PyResult<PyObject> {
let future = Python::with_gil(|gil| PyFuture::from_unbound_object(gil, awaitable))?;
future.await
}
# }
```

`PyFuture` is constructed from a Python awaitablef by calling its `__await__` method
(or `__iter__` for generator-based coroutine).

## Restrictions

`PyFuture` can only be awaited in the context of a PyO3 coroutine. Otherwise, it panics.

```rust
# # ![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use pyo3::{prelude::*, types::PyFuture};

#[pyfunction]
fn block_on(awaitable: PyObject) -> PyResult<PyObject> {
let future = Python::with_gil(|gil| PyFuture::from_unbound_object(gil, awaitable))?;
futures::executor::block_on(future) // ERROR: PyFuture must be awaited in coroutine context
}
# }
```

`PyFuture` must be the only Rust future awaited; it means that it's forbidden to `select!` a `Pyfuture`. Otherwise, it
panics.

```rust
# # ![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use std::future;
use futures::FutureExt;
use pyo3::{prelude::*, types::PyFuture};

#[pyfunction]
async fn select(awaitable: PyObject) -> PyResult<PyObject> {
let future = Python::with_gil(|gil| PyFuture::from_unbound_object(gil, awaitable))?;
futures::select_biased! {
_ = future::pending::<()>().fuse() => unreachable!(),
res = future.fuse() => res, // ERROR: Python awaitable mixed with Rust future
}
}
# }
```

These restrictions exist because awaiting a `PyFuture` strongly binds it to the enclosing coroutine. The coroutine will
then delegate its `send`/`throw`/`close` methods to the awaited `PyFuture`. If it was awaited in
a `select!`, `Coroutine::send` would no able to know if the value passed would have to be delegated to the `Pyfuture` or
not.
1 change: 1 addition & 0 deletions newsfragments/3611.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `PyFuture` to await Python awaitables
6 changes: 5 additions & 1 deletion pyo3-ffi/src/abstract_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ extern "C" {
pub fn PyIter_Next(arg1: *mut PyObject) -> *mut PyObject;
#[cfg(all(not(PyPy), Py_3_10))]
#[cfg_attr(PyPy, link_name = "PyPyIter_Send")]
pub fn PyIter_Send(iter: *mut PyObject, arg: *mut PyObject, presult: *mut *mut PyObject);
pub fn PyIter_Send(
iter: *mut PyObject,
arg: *mut PyObject,
presult: *mut *mut PyObject,
) -> c_int;

#[cfg_attr(PyPy, link_name = "PyPyNumber_Check")]
pub fn PyNumber_Check(o: *mut PyObject) -> c_int;
Expand Down
107 changes: 57 additions & 50 deletions src/coroutine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,29 @@ use std::{
use pyo3_macros::{pyclass, pymethods};

use crate::{
coroutine::{cancel::ThrowCallback, waker::AsyncioWaker},
coroutine::{cancel::ThrowCallback, waker::CoroutineWaker},
exceptions::{PyAttributeError, PyRuntimeError, PyStopIteration},
panic::PanicException,
types::{string::PyStringMethods, PyIterator, PyString},
Bound, IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python,
types::{string::PyStringMethods, PyString},
IntoPy, Py, PyErr, PyObject, PyResult, Python,
};

mod asyncio;
pub(crate) mod cancel;
mod waker;
pub(crate) mod waker;

use crate::marker::Ungil;
pub use cancel::CancelHandle;

use crate::{exceptions::PyGeneratorExit, marker::Ungil};

const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine";

pub(crate) enum CoroOp {
Send(PyObject),
Throw(PyObject),
Close,
}

trait CoroutineFuture: Send {
fn poll(self: Pin<&mut Self>, py: Python<'_>, waker: &Waker) -> Poll<PyResult<PyObject>>;
}
Expand Down Expand Up @@ -69,7 +77,7 @@ pub struct Coroutine {
qualname_prefix: Option<&'static str>,
throw_callback: Option<ThrowCallback>,
future: Option<Pin<Box<dyn CoroutineFuture>>>,
waker: Option<Arc<AsyncioWaker>>,
waker: Option<Arc<CoroutineWaker>>,
}

impl Coroutine {
Expand Down Expand Up @@ -104,58 +112,55 @@ impl Coroutine {
}
}

fn poll(&mut self, py: Python<'_>, throw: Option<PyObject>) -> PyResult<PyObject> {
fn poll_inner(&mut self, py: Python<'_>, mut op: CoroOp) -> PyResult<PyObject> {
// raise if the coroutine has already been run to completion
let future_rs = match self.future {
Some(ref mut fut) => fut,
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)),
};
// reraise thrown exception it
match (throw, &self.throw_callback) {
(Some(exc), Some(cb)) => cb.throw(exc),
(Some(exc), None) => {
self.close();
return Err(PyErr::from_value_bound(exc.into_bound(py)));
}
(None, _) => {}
// if the future is not pending on a Python awaitable,
// execute throw callback or complete on close
if !matches!(self.waker, Some(ref w) if w.is_delegated(py)) {
match op {
send @ CoroOp::Send(_) => op = send,
CoroOp::Throw(exc) => match &self.throw_callback {
Some(cb) => {
cb.throw(exc.clone_ref(py));
op = CoroOp::Send(py.None());
}
None => return Err(PyErr::from_value_bound(exc.into_bound(py))),
},
CoroOp::Close => return Err(PyGeneratorExit::new_err(py.None())),
};
}
// create a new waker, or try to reset it in place
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) {
waker.reset();
waker.reset(op);
} else {
self.waker = Some(Arc::new(AsyncioWaker::new()));
self.waker = Some(Arc::new(CoroutineWaker::new(op)));
}
// poll the future and forward its results if ready
// poll the future and forward its results if ready; otherwise, yield from waker
// polling is UnwindSafe because the future is dropped in case of panic
let waker = Waker::from(self.waker.clone().unwrap());
let poll = || future_rs.as_mut().poll(py, &waker);
match panic::catch_unwind(panic::AssertUnwindSafe(poll)) {
Ok(Poll::Ready(res)) => {
self.close();
return Err(PyStopIteration::new_err(res?));
}
Err(err) => {
self.close();
return Err(PanicException::from_panic_payload(err));
}
_ => {}
Err(err) => Err(PanicException::from_panic_payload(err)),
Ok(Poll::Ready(res)) => Err(PyStopIteration::new_err(res?)),
Ok(Poll::Pending) => match self.waker.as_ref().unwrap().yield_(py) {
Ok(to_yield) => Ok(to_yield),
Err(err) => Err(err),
},
}
// otherwise, initialize the waker `asyncio.Future`
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? {
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__`
// and will yield itself if its result has not been set in polling above
if let Some(future) = PyIterator::from_bound_object(&future.as_borrowed())
.unwrap()
.next()
{
// future has not been leaked into Python for now, and Rust code can only call
// `set_result(None)` in `Wake` implementation, so it's safe to unwrap
return Ok(future.unwrap().into());
}
}

fn poll(&mut self, py: Python<'_>, op: CoroOp) -> PyResult<PyObject> {
let result = self.poll_inner(py, op);
if result.is_err() {
// the Rust future is dropped, and the field set to `None`
// to indicate the coroutine has been run to completion
drop(self.future.take());
}
// if waker has been waken during future polling, this is roughly equivalent to
// `await asyncio.sleep(0)`, so just yield `None`.
Ok(py.None().into_py(py))
result
}
}

Expand All @@ -180,25 +185,27 @@ impl Coroutine {
}
}

fn send(&mut self, py: Python<'_>, _value: &Bound<'_, PyAny>) -> PyResult<PyObject> {
self.poll(py, None)
fn send(&mut self, py: Python<'_>, value: PyObject) -> PyResult<PyObject> {
self.poll(py, CoroOp::Send(value))
}

fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> {
self.poll(py, Some(exc))
self.poll(py, CoroOp::Throw(exc))
}

fn close(&mut self) {
// the Rust future is dropped, and the field set to `None`
// to indicate the coroutine has been run to completion
drop(self.future.take());
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
match self.poll(py, CoroOp::Close) {
Ok(_) => Ok(()),
Err(err) if err.is_instance_of::<PyGeneratorExit>(py) => Ok(()),
Err(err) => Err(err),
}
}

fn __await__(self_: Py<Self>) -> Py<Self> {
self_
}

fn __next__(&mut self, py: Python<'_>) -> PyResult<PyObject> {
self.poll(py, None)
self.poll(py, CoroOp::Send(py.None()))
}
}
96 changes: 96 additions & 0 deletions src/coroutine/asyncio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//! Coroutine implementation compatible with asyncio.
use pyo3_macros::pyfunction;

use crate::{
intern,
sync::GILOnceCell,
types::{PyAnyMethods, PyCFunction, PyIterator},
wrap_pyfunction_bound, Bound, IntoPy, Py, PyAny, PyObject, PyResult, Python,
};

/// `asyncio.get_running_loop`
fn get_running_loop(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
static GET_RUNNING_LOOP: GILOnceCell<PyObject> = GILOnceCell::new();
let import = || -> PyResult<_> {
let module = py.import_bound("asyncio")?;
Ok(module.getattr("get_running_loop")?.into())
};
GET_RUNNING_LOOP
.get_or_try_init(py, import)?
.bind(py)
.call0()
}

/// Asyncio-compatible coroutine waker.
///
/// Polling a Rust future yields an `asyncio.Future`, whose `set_result` method is called
/// when `Waker::wake` is called.
pub(super) struct AsyncioWaker {
event_loop: PyObject,
future: PyObject,
}

impl AsyncioWaker {
pub(super) fn new(py: Python<'_>) -> PyResult<Self> {
let event_loop = get_running_loop(py)?.into_py(py);
let future = event_loop.call_method0(py, "create_future")?;
Ok(Self { event_loop, future })
}

pub(super) fn yield_(&self, py: Python<'_>) -> PyResult<PyObject> {
let __await__;
// `asyncio.Future` must be awaited; in normal case, it implements `__iter__ = __await__`,
// but `create_future` may have been overriden
let mut iter = match PyIterator::from_bound_object(self.future.bind(py)) {
Ok(iter) => iter,
Err(_) => {
__await__ = self.future.call_method0(py, intern!(py, "__await__"))?;
PyIterator::from_bound_object(__await__.bind(py))?
}
};
// future has not been wakened (because `yield_waken` would have been called
// otherwise), so it is expected to yield itself
Ok(iter.next().expect("future didn't yield")?.into_py(py))
}

#[allow(clippy::unnecessary_wraps)]
pub(super) fn yield_waken(py: Python<'_>) -> PyResult<PyObject> {
Ok(py.None().into())
}

pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> {
static RELEASE_WAITER: GILOnceCell<Py<PyCFunction>> = GILOnceCell::new();
let release_waiter = RELEASE_WAITER.get_or_try_init(py, || {
wrap_pyfunction_bound!(release_waiter, py).map(Into::into)
})?;
// `Future.set_result` must be called in event loop thread,
// so it requires `call_soon_threadsafe`
let call_soon_threadsafe = self.event_loop.call_method1(
py,
intern!(py, "call_soon_threadsafe"),
(release_waiter, &self.future),
);
if let Err(err) = call_soon_threadsafe {
// `call_soon_threadsafe` will raise if the event loop is closed;
// instead of catching an unspecific `RuntimeError`, check directly if it's closed.
let is_closed = self.event_loop.call_method0(py, "is_closed")?;
if !is_closed.extract(py)? {
return Err(err);
}
}
Ok(())
}
}

/// Call `future.set_result` if the future is not done.
///
/// Future can be cancelled by the event loop before being waken.
/// See <https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L452C5-L452C5>
#[pyfunction(crate = "crate")]
fn release_waiter(future: Bound<'_, PyAny>) -> PyResult<()> {
let done = future.call_method0(intern!(future.py(), "done"))?;
if !done.extract::<bool>()? {
future.call_method1(intern!(future.py(), "set_result"), (future.py().None(),))?;
}
Ok(())
}
Loading

0 comments on commit e15675f

Please sign in to comment.