-
Notifications
You must be signed in to change notification settings - Fork 791
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add
PyFuture
to await Python awaitables
- Loading branch information
Showing
11 changed files
with
605 additions
and
131 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
# Awaiting Python awaitables | ||
|
||
Python awaitable can be awaited on Rust side using [`PyFuture`]({{#PYO3_DOCS_URL}}/pyo3/types/struct.PyFuture.html). | ||
|
||
```rust | ||
# #![allow(dead_code)] | ||
use pyo3::{prelude::*, types::PyFuture}; | ||
|
||
#[pyfunction] | ||
async fn wrap_awaitable(awaitable: PyObject) -> PyResult<PyObject> { | ||
let future: Py<PyFuture> = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; | ||
future.await | ||
} | ||
``` | ||
|
||
`PyFuture::from_object` construct a `PyFuture` from a Python awaitable object, by calling its `__await__` method (or `__iter__` for generator-based coroutine). | ||
|
||
## Restrictions | ||
|
||
`PyFuture` can only be awaited in the context of a PyO3 coroutine. Otherwise, it panics. | ||
|
||
```rust | ||
# #![allow(dead_code)] | ||
use pyo3::{prelude::*, types::PyFuture}; | ||
|
||
#[pyfunction] | ||
fn block_on(awaitable: PyObject) -> PyResult<PyObject> { | ||
let future: Py<PyFuture> = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; | ||
futures::executor::block_on(future) // ERROR: PyFuture must be awaited in coroutine context | ||
} | ||
``` | ||
|
||
`PyFuture` must be the only Rust future awaited; it means that it's forbidden to `select!` a `Pyfuture`. Otherwise, it panics. | ||
|
||
```rust | ||
# #![allow(dead_code)] | ||
use std::future; | ||
use futures::FutureExt; | ||
use pyo3::{prelude::*, types::PyFuture}; | ||
|
||
#[pyfunction] | ||
async fn select(awaitable: PyObject) -> PyResult<PyObject> { | ||
let future: Py<PyFuture> = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; | ||
futures::select_biased! { | ||
_ = future::pending::<()>().fuse() => unreachable!(), | ||
res = future.fuse() => res, // ERROR: Python awaitable mixed with Rust future | ||
} | ||
} | ||
``` | ||
|
||
These restrictions exist because awaiting a `PyFuture` strongly binds it to the enclosing coroutine. The coroutine will then delegate its `send`/`throw`/`close` methods to the awaited `PyFuture`. If it was awaited in a `select!`, `Coroutine::send` would no able to know if the value passed would have to be delegated to the `Pyfuture` or not. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
//! Coroutine implementation compatible with asyncio. | ||
use crate::sync::GILOnceCell; | ||
use crate::types::{PyCFunction, PyIterator}; | ||
use crate::{intern, wrap_pyfunction, IntoPy, Py, PyAny, PyObject, PyResult, Python}; | ||
use pyo3_macros::pyfunction; | ||
|
||
/// `asyncio.get_running_loop` | ||
fn get_running_loop(py: Python<'_>) -> PyResult<&PyAny> { | ||
static GET_RUNNING_LOOP: GILOnceCell<PyObject> = GILOnceCell::new(); | ||
let import = || -> PyResult<_> { | ||
let module = py.import("asyncio")?; | ||
Ok(module.getattr("get_running_loop")?.into()) | ||
}; | ||
GET_RUNNING_LOOP | ||
.get_or_try_init(py, import)? | ||
.as_ref(py) | ||
.call0() | ||
} | ||
|
||
/// Asyncio-compatible coroutine waker. | ||
/// | ||
/// Polling a Rust future yields an `asyncio.Future`, whose `set_result` method is called | ||
/// when `Waker::wake` is called. | ||
pub(super) struct AsyncioWaker { | ||
event_loop: PyObject, | ||
future: PyObject, | ||
} | ||
|
||
impl AsyncioWaker { | ||
pub(super) fn new(py: Python<'_>) -> PyResult<Self> { | ||
let event_loop = get_running_loop(py)?.into_py(py); | ||
let future = event_loop.call_method0(py, "create_future")?; | ||
Ok(Self { event_loop, future }) | ||
} | ||
|
||
pub(super) fn yield_(&self, py: Python<'_>) -> PyResult<PyObject> { | ||
let __await__; | ||
// `asyncio.Future` must be awaited; in normal case, it implements `__iter__ = __await__`, | ||
// but `create_future` may have been overriden | ||
let mut iter = match PyIterator::from_object(self.future.as_ref(py)) { | ||
Ok(iter) => iter, | ||
Err(_) => { | ||
__await__ = self.future.call_method0(py, intern!(py, "__await__"))?; | ||
PyIterator::from_object(__await__.as_ref(py))? | ||
} | ||
}; | ||
// future has not been waken (because `yield_waken` would have been called | ||
// otherwise), so it is expected to yield itself | ||
Ok(iter.next().expect("future didn't yield")?.into_py(py)) | ||
} | ||
|
||
pub(super) fn yield_waken(py: Python<'_>) -> PyResult<PyObject> { | ||
Ok(py.None().into()) | ||
} | ||
|
||
pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> { | ||
static RELEASE_WAITER: GILOnceCell<Py<PyCFunction>> = GILOnceCell::new(); | ||
let release_waiter = RELEASE_WAITER | ||
.get_or_try_init(py, || wrap_pyfunction!(release_waiter, py).map(Into::into))?; | ||
// `Future.set_result` must be called in event loop thread, | ||
// so it requires `call_soon_threadsafe` | ||
let call_soon_threadsafe = self.event_loop.call_method1( | ||
py, | ||
intern!(py, "call_soon_threadsafe"), | ||
(release_waiter, self.future.as_ref(py)), | ||
); | ||
if let Err(err) = call_soon_threadsafe { | ||
// `call_soon_threadsafe` will raise if the event loop is closed; | ||
// instead of catching an unspecific `RuntimeError`, check directly if it's closed. | ||
let is_closed = self.event_loop.call_method0(py, "is_closed")?; | ||
if !is_closed.extract(py)? { | ||
return Err(err); | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
/// Call `future.set_result` if the future is not done. | ||
/// | ||
/// Future can be cancelled by the event loop before being waken. | ||
/// See <https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L452C5-L452C5> | ||
#[pyfunction(crate = "crate")] | ||
fn release_waiter(future: &PyAny) -> PyResult<()> { | ||
let done = future.call_method0(intern!(future.py(), "done"))?; | ||
if !done.extract::<bool>()? { | ||
future.call_method1(intern!(future.py(), "set_result"), (future.py().None(),))?; | ||
} | ||
Ok(()) | ||
} |
Oops, something went wrong.