From d21c9f4c17dfab93b0f213e30aa4e1a70bbac1a1 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Thu, 16 Jan 2025 00:43:05 +0100 Subject: [PATCH] feat: support anyio with a Cargo feature --- Cargo.toml | 3 + guide/src/async-await.md | 9 ++- guide/src/building-and-distribution.md | 1 + newsfragments/3612.added.md | 1 + pytests/Cargo.toml | 3 +- pytests/pyproject.toml | 1 + pytests/src/anyio.rs | 34 ++++++++++ pytests/src/lib.rs | 3 + pytests/tests/test_anyio.py | 14 ++++ src/coroutine.rs | 4 ++ src/coroutine/anyio.rs | 73 +++++++++++++++++++++ src/coroutine/trio.rs | 88 ++++++++++++++++++++++++++ src/coroutine/waker.rs | 15 +++-- 13 files changed, 242 insertions(+), 7 deletions(-) create mode 100644 newsfragments/3612.added.md create mode 100644 pytests/src/anyio.rs create mode 100644 pytests/tests/test_anyio.py create mode 100644 src/coroutine/anyio.rs create mode 100644 src/coroutine/trio.rs diff --git a/Cargo.toml b/Cargo.toml index 0bea4306ecf..02e05c10309 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,9 @@ default = ["macros"] # Enables support for `async fn` for `#[pyfunction]` and `#[pymethods]`. experimental-async = ["macros", "pyo3-macros/experimental-async"] +# Switch coroutine implementation to anyio instead of asyncio +anyio = ["experimental-async"] + # Enables pyo3::inspect module and additional type information on FromPyObject # and IntoPy traits experimental-inspect = [] diff --git a/guide/src/async-await.md b/guide/src/async-await.md index 27574181804..613aca5f97c 100644 --- a/guide/src/async-await.md +++ b/guide/src/async-await.md @@ -25,8 +25,6 @@ async fn sleep(seconds: f64, result: Option) -> Option { # } ``` -*Python awaitables instantiated with this method can only be awaited in *asyncio* context. Other Python async runtime may be supported in the future.* - ## `Send + 'static` constraint Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object. @@ -94,6 +92,13 @@ async fn cancellable(#[pyo3(cancel_handle)] mut cancel: CancelHandle) { # } ``` +## *asyncio* vs. *anyio* + +By default, Python awaitables instantiated with `async fn` can only be awaited in *asyncio* context. + +PyO3 can also target [*anyio*](https://github.com/agronholm/anyio) with the dedicated `anyio` Cargo feature. With it enabled, `async fn` become awaitable both in *asyncio* or [*trio*](https://github.com/python-trio/trio) context. +However, it requires to have the [*sniffio*](https://github.com/python-trio/sniffio) (or *anyio*) library installed. + ## The `Coroutine` type To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine). diff --git a/guide/src/building-and-distribution.md b/guide/src/building-and-distribution.md index d3474fedaf7..9c14e5782ad 100644 --- a/guide/src/building-and-distribution.md +++ b/guide/src/building-and-distribution.md @@ -62,6 +62,7 @@ There are many ways to go about this: it is possible to use `cargo` to build the PyO3 has some Cargo features to configure projects for building Python extension modules: - The `extension-module` feature, which must be enabled when building Python extension modules. - The `abi3` feature and its version-specific `abi3-pyXY` companions, which are used to opt-in to the limited Python API in order to support multiple Python versions in a single wheel. +- The `anyio` feature, making PyO3 coroutines target [*anyio*](https://github.com/agronholm/anyio) instead of *asyncio*; either [*sniffio*](https://github.com/python-trio/sniffio) or *anyio* should be added as dependency of the Python extension. This section describes each of these packaging tools before describing how to build manually without them. It then proceeds with an explanation of the `extension-module` feature. Finally, there is a section describing PyO3's `abi3` features. diff --git a/newsfragments/3612.added.md b/newsfragments/3612.added.md new file mode 100644 index 00000000000..4f5f2f24014 --- /dev/null +++ b/newsfragments/3612.added.md @@ -0,0 +1 @@ +Support anyio with a Cargo feature \ No newline at end of file diff --git a/pytests/Cargo.toml b/pytests/Cargo.toml index 1fee3093275..4c6d958dd46 100644 --- a/pytests/Cargo.toml +++ b/pytests/Cargo.toml @@ -8,7 +8,8 @@ publish = false rust-version = "1.63" [dependencies] -pyo3 = { path = "../", features = ["extension-module"] } +futures = "0.3.29" +pyo3 = { path = "../", features = ["extension-module", "anyio"] } [build-dependencies] pyo3-build-config = { path = "../pyo3-build-config" } diff --git a/pytests/pyproject.toml b/pytests/pyproject.toml index 5f78a573124..90d1867b88d 100644 --- a/pytests/pyproject.toml +++ b/pytests/pyproject.toml @@ -20,6 +20,7 @@ classifiers = [ [project.optional-dependencies] dev = [ + "anyio[trio]>=4.0", "hypothesis>=3.55", "pytest-asyncio>=0.21", "pytest-benchmark>=3.4", diff --git a/pytests/src/anyio.rs b/pytests/src/anyio.rs new file mode 100644 index 00000000000..e123a5ae2a3 --- /dev/null +++ b/pytests/src/anyio.rs @@ -0,0 +1,34 @@ +use std::{task::Poll, thread, time::Duration}; + +use futures::{channel::oneshot, future::poll_fn}; +use pyo3::prelude::*; + +#[pyfunction(signature = (seconds, result = None))] +async fn sleep(seconds: f64, result: Option) -> Option { + if seconds <= 0.0 { + let mut ready = false; + poll_fn(|cx| { + if ready { + return Poll::Ready(()); + } + ready = true; + cx.waker().wake_by_ref(); + Poll::Pending + }) + .await; + } else { + let (tx, rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_secs_f64(seconds)); + tx.send(()).unwrap(); + }); + rx.await.unwrap(); + } + result +} + +#[pymodule] +pub fn anyio(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_function(wrap_pyfunction!(sleep, m)?)?; + Ok(()) +} diff --git a/pytests/src/lib.rs b/pytests/src/lib.rs index b6c32230dac..e1589125484 100644 --- a/pytests/src/lib.rs +++ b/pytests/src/lib.rs @@ -2,6 +2,7 @@ use pyo3::prelude::*; use pyo3::types::PyDict; use pyo3::wrap_pymodule; +pub mod anyio; pub mod awaitable; pub mod buf_and_str; pub mod comparisons; @@ -19,6 +20,7 @@ pub mod subclassing; #[pymodule(gil_used = false)] fn pyo3_pytests(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_wrapped(wrap_pymodule!(anyio::anyio))?; m.add_wrapped(wrap_pymodule!(awaitable::awaitable))?; #[cfg(not(Py_LIMITED_API))] m.add_wrapped(wrap_pymodule!(buf_and_str::buf_and_str))?; @@ -41,6 +43,7 @@ fn pyo3_pytests(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { let sys = PyModule::import(py, "sys")?; let sys_modules = sys.getattr("modules")?.downcast_into::()?; + sys_modules.set_item("pyo3_pytests.anyio", m.getattr("anyio")?)?; sys_modules.set_item("pyo3_pytests.awaitable", m.getattr("awaitable")?)?; sys_modules.set_item("pyo3_pytests.buf_and_str", m.getattr("buf_and_str")?)?; sys_modules.set_item("pyo3_pytests.comparisons", m.getattr("comparisons")?)?; diff --git a/pytests/tests/test_anyio.py b/pytests/tests/test_anyio.py new file mode 100644 index 00000000000..c48435bd2fc --- /dev/null +++ b/pytests/tests/test_anyio.py @@ -0,0 +1,14 @@ +import asyncio + +from pyo3_pytests.anyio import sleep +import trio + + +def test_asyncio(): + assert asyncio.run(sleep(0)) is None + assert asyncio.run(sleep(0.1, 42)) == 42 + + +def test_trio(): + assert trio.run(sleep, 0) is None + assert trio.run(sleep, 0.1, 42) == 42 diff --git a/src/coroutine.rs b/src/coroutine.rs index 26bb69df432..8dbb4bd9512 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -18,9 +18,13 @@ use crate::{ Bound, IntoPyObject, IntoPyObjectExt, Py, PyErr, PyObject, PyResult, Python, }; +#[cfg(feature = "anyio")] +mod anyio; mod asyncio; mod awaitable; mod cancel; +#[cfg(feature = "anyio")] +mod trio; mod waker; pub use awaitable::await_in_coroutine; diff --git a/src/coroutine/anyio.rs b/src/coroutine/anyio.rs new file mode 100644 index 00000000000..bb120dd456f --- /dev/null +++ b/src/coroutine/anyio.rs @@ -0,0 +1,73 @@ +//! Coroutine implementation using sniffio to select the appropriate implementation, +//! compatible with anyio. +use crate::{ + coroutine::{asyncio::AsyncioWaker, trio::TrioWaker}, + exceptions::PyRuntimeError, + sync::GILOnceCell, + types::PyAnyMethods, + PyObject, PyResult, Python, +}; + +enum AsyncLib { + Asyncio, + Trio, +} + +fn current_async_library(py: Python<'_>) -> PyResult { + static CURRENT_ASYNC_LIBRARY: GILOnceCell> = GILOnceCell::new(); + let import = || -> PyResult<_> { + Ok(match py.import("sniffio") { + Ok(module) => Some(module.getattr("current_async_library")?.into()), + Err(_) => None, + }) + }; + let Some(func) = CURRENT_ASYNC_LIBRARY.get_or_try_init(py, import)? else { + return Ok(AsyncLib::Asyncio); + }; + match func.bind(py).call0()?.extract()? { + "asyncio" => Ok(AsyncLib::Asyncio), + "trio" => Ok(AsyncLib::Trio), + rt => Err(PyRuntimeError::new_err(format!("unsupported runtime {rt}"))), + } +} + +/// Sniffio/anyio-compatible coroutine waker. +/// +/// Polling a Rust future calls `sniffio.current_async_library` to select the appropriate +/// implementation, either asyncio or trio. +pub(super) enum AnyioWaker { + /// [`AsyncioWaker`] + Asyncio(AsyncioWaker), + /// [`TrioWaker`] + Trio(TrioWaker), +} + +impl AnyioWaker { + pub(super) fn new(py: Python<'_>) -> PyResult { + match current_async_library(py)? { + AsyncLib::Asyncio => Ok(Self::Asyncio(AsyncioWaker::new(py)?)), + AsyncLib::Trio => Ok(Self::Trio(TrioWaker::new(py)?)), + } + } + + pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { + match self { + AnyioWaker::Asyncio(w) => w.yield_(py), + AnyioWaker::Trio(w) => w.yield_(py), + } + } + + pub(super) fn yield_waken(py: Python<'_>) -> PyResult { + match current_async_library(py)? { + AsyncLib::Asyncio => AsyncioWaker::yield_waken(py), + AsyncLib::Trio => TrioWaker::yield_waken(py), + } + } + + pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> { + match self { + AnyioWaker::Asyncio(w) => w.wake(py), + AnyioWaker::Trio(w) => w.wake(py), + } + } +} diff --git a/src/coroutine/trio.rs b/src/coroutine/trio.rs new file mode 100644 index 00000000000..725e3ba56ef --- /dev/null +++ b/src/coroutine/trio.rs @@ -0,0 +1,88 @@ +//! Coroutine implementation compatible with trio. +use pyo3_macros::pyfunction; + +use crate::{ + intern, + sync::GILOnceCell, + types::{PyAnyMethods, PyCFunction, PyIterator}, + wrap_pyfunction, Bound, Py, PyAny, PyObject, PyResult, Python, +}; + +struct Trio { + cancel_shielded_checkpoint: PyObject, + current_task: PyObject, + current_trio_token: PyObject, + reschedule: PyObject, + succeeded: PyObject, + wait_task_rescheduled: PyObject, +} +impl Trio { + fn get(py: Python<'_>) -> PyResult<&Self> { + static TRIO: GILOnceCell = GILOnceCell::new(); + TRIO.get_or_try_init(py, || { + let module = py.import("trio.lowlevel")?; + Ok(Self { + cancel_shielded_checkpoint: module.getattr("cancel_shielded_checkpoint")?.into(), + current_task: module.getattr("current_task")?.into(), + current_trio_token: module.getattr("current_trio_token")?.into(), + reschedule: module.getattr("reschedule")?.into(), + succeeded: module.getattr("Abort")?.getattr("SUCCEEDED")?.into(), + wait_task_rescheduled: module.getattr("wait_task_rescheduled")?.into(), + }) + }) + } +} + +fn yield_from(coro_func: &Bound<'_, PyAny>) -> PyResult { + PyIterator::from_object(&coro_func.call_method0("__await__")?)? + .next() + .expect("cancel_shielded_checkpoint didn't yield") + .map(Into::into) +} + +/// Asyncio-compatible coroutine waker. +/// +/// Polling a Rust future yields `trio.lowlevel.wait_task_rescheduled()`, while `Waker::wake` +/// reschedule the current task. +pub(super) struct TrioWaker { + task: PyObject, + token: PyObject, +} + +impl TrioWaker { + pub(super) fn new(py: Python<'_>) -> PyResult { + let trio = Trio::get(py)?; + let task = trio.current_task.call0(py)?; + let token = trio.current_trio_token.call0(py)?; + Ok(Self { task, token }) + } + + pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { + static ABORT_FUNC: GILOnceCell> = GILOnceCell::new(); + let abort_func = + ABORT_FUNC.get_or_try_init(py, || wrap_pyfunction!(abort_func, py).map(Into::into))?; + let wait_task_rescheduled = Trio::get(py)? + .wait_task_rescheduled + .call1(py, (abort_func,))?; + yield_from(wait_task_rescheduled.bind(py)) + } + + pub(super) fn yield_waken(py: Python<'_>) -> PyResult { + let checkpoint = Trio::get(py)?.cancel_shielded_checkpoint.call0(py)?; + yield_from(checkpoint.bind(py)) + } + + pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> { + self.token.call_method1( + py, + intern!(py, "run_sync_soon"), + (&Trio::get(py)?.reschedule, &self.task), + )?; + Ok(()) + } +} + +#[pyfunction(crate = "crate")] +fn abort_func(py: Python<'_>, _arg: &Bound<'_, PyAny>) -> PyResult { + Ok(Trio::get(py)?.succeeded.clone_ref(py)) +} diff --git a/src/coroutine/waker.rs b/src/coroutine/waker.rs index 9ba6fbdaaa4..4132eeb76af 100644 --- a/src/coroutine/waker.rs +++ b/src/coroutine/waker.rs @@ -6,7 +6,6 @@ use std::{ use crate::{ coroutine::{ - asyncio::AsyncioWaker, awaitable::{delegate, YieldOrReturn}, CoroOp, }, @@ -17,10 +16,18 @@ use crate::{ Bound, PyObject, PyResult, Python, }; +cfg_if::cfg_if! { + if #[cfg(feature = "anyio")] { + type WakerImpl = crate::coroutine::anyio::AnyioWaker; + } else { + type WakerImpl = crate::coroutine::asyncio::AsyncioWaker; + } +} + const MIXED_AWAITABLE_AND_FUTURE_ERROR: &str = "Python awaitable mixed with Rust future"; enum State { - Pending(AsyncioWaker), + Pending(WakerImpl), Waken, Delegated(PyObject), } @@ -48,11 +55,11 @@ impl CoroutineWaker { } pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { - let init = || PyResult::Ok(State::Pending(AsyncioWaker::new(py)?)); + let init = || PyResult::Ok(State::Pending(WakerImpl::new(py)?)); let state = self.state.get_or_try_init(py, init)?; match state { State::Pending(waker) => waker.yield_(py), - State::Waken => AsyncioWaker::yield_waken(py), + State::Waken => WakerImpl::yield_waken(py), State::Delegated(obj) => Ok(obj.clone_ref(py)), } }