diff --git a/examples/z_queryable.py b/examples/z_queryable.py index eb297e76..f9c6617e 100644 --- a/examples/z_queryable.py +++ b/examples/z_queryable.py @@ -120,7 +120,11 @@ def main(): print("Press CTRL-C to quit...") while True: - time.sleep(1) + try: + time.sleep(1) + except Exception as err: + print(err, flush=True) + raise if __name__ == "__main__": diff --git a/examples/z_sub_thr.py b/examples/z_sub_thr.py index 550e9db2..dc6be158 100644 --- a/examples/z_sub_thr.py +++ b/examples/z_sub_thr.py @@ -118,7 +118,7 @@ def main(): with zenoh.open(conf) as session: session.declare_subscriber( "test/thr", - zenoh.handlers.CallbackDrop(listener, report), + zenoh.handlers.Callback(listener, report), reliability=zenoh.Reliability.RELIABLE, ) diff --git a/src/handlers.rs b/src/handlers.rs index af4e2292..c50a7efb 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -11,19 +11,14 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{ - fmt, - marker::PhantomData, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{fmt, marker::PhantomData, sync::Arc, time::Duration}; use pyo3::{ - exceptions::{PyAttributeError, PyValueError}, + exceptions::PyValueError, prelude::*, - types::{PyDict, PyType}, + types::{PyCFunction, PyDict, PyType}, }; -use zenoh::handlers::{Callback, IntoHandler}; +use zenoh::handlers::{Callback as RustCallback, IntoHandler}; use crate::{ macros::{import, py_static}, @@ -32,9 +27,6 @@ use crate::{ }; const CHECK_SIGNALS_INTERVAL: Duration = Duration::from_millis(100); -fn check_signals_deadline() -> Instant { - Instant::now() + CHECK_SIGNALS_INTERVAL -} fn exec_callback(callback: impl FnOnce(Python) -> PyResult) { Python::with_gil(|gil| { @@ -157,16 +149,23 @@ impl Handler { #[pyclass] #[derive(Clone, Debug)] -pub(crate) struct CallbackDrop { +pub(crate) struct Callback { callback: PyObject, - drop: PyObject, + drop: Option, + #[pyo3(get)] + indirect: bool, } #[pymethods] -impl CallbackDrop { +impl Callback { #[new] - fn new(callback: PyObject, drop: PyObject) -> Self { - Self { callback, drop } + #[pyo3(signature = (callback, drop, *, indirect = true))] + fn new(callback: PyObject, drop: Option, indirect: bool) -> Self { + Self { + callback, + drop, + indirect, + } } fn __call__(&self, arg: &Bound) -> PyResult { @@ -174,7 +173,10 @@ impl CallbackDrop { } fn drop(&self, py: Python) -> PyResult { - self.drop.call0(py) + match &self.drop { + Some(drop) => drop.call0(py), + None => Ok(py.None()), + } } fn __repr__(&self) -> String { @@ -182,52 +184,40 @@ impl CallbackDrop { } } -pub(crate) enum PythonCallback { - Simple(PyObject), - WithDrop { callback: PyObject, drop: PyObject }, -} +pub(crate) struct PythonCallback(Callback); impl PythonCallback { - fn new(obj: &Bound) -> PyResult { - if let Ok(CallbackDrop { callback, drop }) = CallbackDrop::extract_bound(obj) { - return Ok(Self::WithDrop { callback, drop }); + fn new(obj: &Bound) -> Self { + if let Ok(cb) = Callback::extract_bound(obj) { + return Self(cb); } - let callback = obj.clone().unbind(); - Ok(if obj.hasattr("drop").unwrap_or(false) { - Self::WithDrop { - callback, - drop: obj.getattr("drop")?.unbind(), - } - } else { - PythonCallback::Simple(callback) - }) + Self(Callback::new(obj.clone().unbind(), None, true)) } + fn call(&self, t: T) { - let callback = match self { - Self::Simple(cb) => cb, - Self::WithDrop { callback, .. } => callback, - }; - exec_callback(|gil| callback.call1(gil, (t.into_pyobject(gil),))); + exec_callback(|gil| self.0.callback.call1(gil, (t.into_pyobject(gil),))); } } impl Drop for PythonCallback { fn drop(&mut self) { - if let Self::WithDrop { drop, .. } = self { - exec_callback(|gil| drop.call0(gil)); - } + exec_callback(|gil| self.0.drop(gil)); } } pub(crate) enum IntoHandlerImpl { Rust { - callback: Callback<'static, T::Into>, + callback: RustCallback<'static, T::Into>, handler: Py, }, Python { callback: PythonCallback, handler: PyObject, }, + PythonIndirect { + callback: RustCallback<'static, T::Into>, + handler: PyObject, + }, } impl IntoHandler<'static, T> for IntoHandlerImpl @@ -236,19 +226,19 @@ where { type Handler = HandlerImpl; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (RustCallback<'static, T>, Self::Handler) { match self { Self::Rust { callback, handler } => (callback, HandlerImpl::Rust(handler, PhantomData)), Self::Python { callback, handler } => ( Arc::new(move |t| callback.call(t)), HandlerImpl::Python(handler), ), + Self::PythonIndirect { callback, handler } => (callback, HandlerImpl::Python(handler)), } } } pub(crate) enum HandlerImpl { - // PhantomData just for documentation, until pyo3 accepts generic classes Rust(Py, PhantomData), Python(PyObject), } @@ -287,40 +277,18 @@ where pub(crate) fn try_recv(&self, py: Python) -> PyResult { match self { Self::Rust(handler, _) => handler.borrow(py).try_recv(py), - _ => Err(PyAttributeError::new_err( - "No method 'try_recv'. For Python receiver, use 'receiver' attribute", - )), + Self::Python(handler) => handler.call_method0(py, "try_recv"), } } pub(crate) fn recv(&self, py: Python) -> PyResult { match self { Self::Rust(handler, _) => handler.borrow(py).recv(py), - _ => Err(PyAttributeError::new_err( - "No method 'recv'. For Python receiver, use 'receiver' attribute", - )), + Self::Python(handler) => handler.call_method0(py, "recv"), } } } -fn rust_handler(py: Python, into_handler: H) -> IntoHandlerImpl -where - H::Into: IntoHandler<'static, T::Into>, - >::Handler: Send + Sync, - T::Into: IntoPython, - RustHandler: Receiver, -{ - let (callback, handler) = into_handler.into_rust().into_handler(); - let handler = RustHandler:: { - handler, - _phantom: PhantomData, - }; - IntoHandlerImpl::Rust { - callback, - handler: Py::new(py, Handler(Box::new(handler))).unwrap(), - } -} - struct RustHandler where H::Into: IntoHandler<'static, T::Into>, @@ -365,7 +333,7 @@ where fn recv(&self, py: Python) -> PyResult { recv( py, - || self.handler.recv_deadline(check_signals_deadline()), + || self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL), |err| matches!(err, flume::RecvTimeoutError::Timeout), ) } @@ -386,7 +354,7 @@ where fn recv(&self, py: Python) -> PyResult { recv( py, - || self.handler.recv_deadline(check_signals_deadline()), + || self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL), |err| matches!(err, flume::RecvTimeoutError::Timeout), ) } @@ -405,53 +373,108 @@ where } fn recv(&self, py: Python) -> PyResult { - // TODO use recv_deadline - try_recv(py, || self.handler.recv()) + enum DeadlineError { + Timeout, + Error(E), + } + impl fmt::Display for DeadlineError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Error(err) => write!(f, "{err}"), + Self::Timeout => unreachable!(), + } + } + } + recv( + py, + || match self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL) { + Ok(Some(x)) => Ok(x), + Ok(None) => Err(DeadlineError::Timeout), + Err(err) => Err(DeadlineError::Error(err)), + }, + |err| matches!(err, DeadlineError::Timeout), + ) } } -pub(crate) fn into_handler(obj: &Bound) -> PyResult>> +fn rust_handler(py: Python, into_handler: H) -> IntoHandlerImpl where + H::Into: IntoHandler<'static, T::Into>, + >::Handler: Send + Sync, T::Into: IntoPython, + RustHandler: Receiver, { - if obj.is_none() { - return Ok(None); + let (callback, handler) = into_handler.into_rust().into_handler(); + let handler = RustHandler:: { + handler, + _phantom: PhantomData, + }; + IntoHandlerImpl::Rust { + callback, + handler: Py::new(py, Handler(Box::new(handler))).unwrap(), } +} + +fn python_handler( + py: Python, + callback: &Bound, + handler: PyObject, +) -> PyResult> +where + T::Into: IntoPython, +{ + let callback = PythonCallback::new(callback); + if callback.0.indirect { + let (rust_callback, receiver) = DefaultHandler.into_rust().into_handler(); + let kwargs = PyDict::new_bound(py); + let target = PyCFunction::new_closure_bound(py, None, None, move |args, _| loop { + let py = args.py(); + match py.allow_threads(|| receiver.recv_timeout(CHECK_SIGNALS_INTERVAL)) { + Ok(x) => callback.call(x), + Err(flume::RecvTimeoutError::Timeout) => py.check_signals()?, + Err(flume::RecvTimeoutError::Disconnected) => return PyResult::Ok(()), + } + })?; + kwargs.set_item("target", target)?; + let thread = import!(py, threading.Thread).call((), Some(&kwargs))?; + thread.call_method0("start")?; + Ok(IntoHandlerImpl::PythonIndirect { + callback: rust_callback, + handler, + }) + } else { + Ok(IntoHandlerImpl::Python { callback, handler }) + } +} + +pub(crate) fn into_handler( + py: Python, + obj: Option<&Bound>, +) -> PyResult> +where + T::Into: IntoPython, +{ + let Some(obj) = obj else { + return Ok(rust_handler(py, DefaultHandler)); + }; if let Ok(handler) = obj.extract::() { - return Ok(Some(rust_handler(obj.py(), handler))); + return Ok(rust_handler(py, handler)); } if let Ok(handler) = obj.extract::() { - return Ok(Some(rust_handler(obj.py(), handler))); + return Ok(rust_handler(py, handler)); } if let Ok(handler) = obj.extract::() { - return Ok(Some(rust_handler(obj.py(), handler))); + return Ok(rust_handler(py, handler)); } if obj.is_callable() { - return Ok(Some(IntoHandlerImpl::Python { - callback: PythonCallback::new(obj)?, - handler: obj.py().None(), - })); - } - if let Ok((cb, handler)) = obj.extract::<(Bound, PyObject)>() { + return python_handler(py, obj, py.None()); + } else if let Ok((cb, handler)) = obj.extract::<(Bound, PyObject)>() { if cb.is_callable() { - return Ok(Some(IntoHandlerImpl::Python { - callback: PythonCallback::new(&cb)?, - handler, - })); + return python_handler(py, &cb, handler); } } Err(PyValueError::new_err(format!( - "Invalid handler {}", + "Invalid handler type {}", obj.get_type().name()? ))) } - -pub(crate) fn handler_or_default( - py: Python, - into_handler: Option>, -) -> IntoHandlerImpl -where - T::Into: IntoPython, -{ - into_handler.unwrap_or_else(|| rust_handler(py, DefaultHandler)) -} diff --git a/src/lib.rs b/src/lib.rs index 8184c3dc..0d001ef9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,7 +41,7 @@ pub(crate) mod zenoh { #[pymodule] mod handlers { #[pymodule_export] - use crate::handlers::{CallbackDrop, DefaultHandler, FifoChannel, Handler, RingChannel}; + use crate::handlers::{Callback, DefaultHandler, FifoChannel, Handler, RingChannel}; } #[pymodule_export] diff --git a/src/scouting.rs b/src/scouting.rs index 7e255e0d..4be8f002 100644 --- a/src/scouting.rs +++ b/src/scouting.rs @@ -20,7 +20,7 @@ use pyo3::{ use crate::{ config::{Config, WhatAmI, WhatAmIMatcher, ZenohId}, - handlers::{handler_or_default, into_handler, HandlerImpl, IntoHandlerImpl}, + handlers::{into_handler, HandlerImpl}, macros::{option_wrapper, wrapper}, utils::{generic, wait}, }; @@ -114,13 +114,13 @@ impl Scout { #[pyo3(signature = (handler = None, what = None, config = None))] pub(crate) fn scout( py: Python, - #[pyo3(from_py_with = "into_handler::")] handler: Option>, + handler: Option<&Bound>, #[pyo3(from_py_with = "WhatAmIMatcher::from_py_opt")] what: Option, config: Option, ) -> PyResult { let what = what.unwrap_or_default(); let config = config.unwrap_or_default(); - let handler = handler_or_default(py, handler); + let handler = into_handler(py, handler)?; let scout = wait(py, || zenoh::scout(what, config).with(handler))?; Ok(Scout(Some(scout))) } diff --git a/src/session.rs b/src/session.rs index ef9ab4e0..1934c101 100644 --- a/src/session.rs +++ b/src/session.rs @@ -24,14 +24,13 @@ use crate::{ bytes::ZBytes, config::{Config, ConfigInner, ZenohId}, encoding::Encoding, - handlers::{handler_or_default, into_handler, HandlerImpl, IntoHandlerImpl}, + handlers::{into_handler, HandlerImpl}, info::SessionInfo, key_expr::KeyExpr, macros::{bail, build, build_with, option_wrapper}, publisher::{CongestionControl, Priority, Publisher}, - query::{ConsolidationMode, Query, QueryTarget, Reply}, + query::{ConsolidationMode, QueryTarget, Reply}, queryable::Queryable, - sample::Sample, selector::Selector, subscriber::{Reliability, Subscriber}, utils::{wait, MapInto}, @@ -152,7 +151,7 @@ impl Session { &self, py: Python, #[pyo3(from_py_with = "Selector::from_py")] selector: Selector, - #[pyo3(from_py_with = "into_handler::")] handler: Option>, + handler: Option<&Bound>, target: Option, consolidation: Option, #[pyo3(from_py_with = "timeout")] timeout: Option, @@ -164,8 +163,9 @@ impl Session { #[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option, ) -> PyResult> { let this = self.get_ref()?; + let handler = into_handler(py, handler)?; let build = build_with!( - handler_or_default(py, handler), + handler, this.get(selector), target, consolidation, @@ -190,15 +190,12 @@ impl Session { &self, py: Python, #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, - #[pyo3(from_py_with = "into_handler::")] handler: Option>, + handler: Option<&Bound>, reliability: Option, ) -> PyResult> { let this = self.get_ref()?; - let build = build_with!( - handler_or_default(py, handler), - this.declare_subscriber(key_expr), - reliability, - ); + let handler = into_handler(py, handler)?; + let build = build_with!(handler, this.declare_subscriber(key_expr), reliability,); let subscriber = Subscriber { subscriber: Some(wait(py, build)?), session_pool: self.pool.clone_ref(py), @@ -213,15 +210,12 @@ impl Session { &self, py: Python, #[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr, - #[pyo3(from_py_with = "into_handler::")] handler: Option>, + handler: Option<&Bound>, complete: Option, ) -> PyResult> { let this = self.get_ref()?; - let build = build_with!( - handler_or_default(py, handler), - this.declare_queryable(key_expr), - complete, - ); + let handler = into_handler(py, handler)?; + let build = build_with!(handler, this.declare_queryable(key_expr), complete,); let queryable = Queryable { queryable: Some(wait(py, build)?), session_pool: self.pool.clone_ref(py), diff --git a/zenoh/__init__.pyi b/zenoh/__init__.pyi index fc0a3dd6..fd5711e5 100644 --- a/zenoh/__init__.pyi +++ b/zenoh/__init__.pyi @@ -24,7 +24,6 @@ from . import handlers as handlers from .handlers import Handler as Handler _T = TypeVar("_T") -_T_contra = TypeVar("_T_contra", contravariant=True) _H = TypeVar("_H") _F = TypeVar("_F", bound=Callable) @@ -32,12 +31,8 @@ _RustHandler = ( handlers.DefaultHandler[_T] | handlers.FifoChannel[_T] | handlers.RingChannel[_T] ) -class _CallableDrop(Protocol[_T_contra]): - def __call__(self, arg: _T_contra, /) -> Any: ... - def drop(self) -> Any: ... - -_PythonCallback = Callable[[_T_contra], Any] | _CallableDrop[_T_contra] -_PythonHandler = tuple[_PythonCallback[_T_contra], _H] +_PythonCallback = Callable[[_T], Any] +_PythonHandler = tuple[_PythonCallback[_T], _H] @final class ZError(Exception): ... diff --git a/zenoh/handlers.pyi b/zenoh/handlers.pyi index e625328a..0352bd23 100644 --- a/zenoh/handlers.pyi +++ b/zenoh/handlers.pyi @@ -42,3 +42,19 @@ class RingChannel(Generic[_T]): """A synchrounous ring channel with a limited size that allows users to keep the last N data.""" def __new__(cls, capacity: int) -> Self: ... + +RustHandler = DefaultHandler[_T] | FifoChannel[_T] | RingChannel[_T] + +@final +class Callback(Generic[_T]): + def __new__( + cls, + callback: Callable[[_T], Any], + drop: Callable[[], Any] | None = None, + *, + indirect: bool = True, + ) -> Self: ... + def __call__(self, arg: _T, /) -> Any: ... + def drop(self) -> Any: ... + @property + def indirect(self) -> bool: ...