Skip to content

Commit

Permalink
Merge pull request #232 from ZettaScaleLabs/indirect_handler
Browse files Browse the repository at this point in the history
feat: make callbacks indirect by default
  • Loading branch information
milyin authored Jun 18, 2024
2 parents 91d46da + f1463a2 commit c3ce7ef
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 130 deletions.
6 changes: 5 additions & 1 deletion examples/z_queryable.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ def main():

print("Press CTRL-C to quit...")
while True:
time.sleep(1)
try:
time.sleep(1)
except Exception as err:
print(err, flush=True)
raise


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion examples/z_sub_thr.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def main():
with zenoh.open(conf) as session:
session.declare_subscriber(
"test/thr",
zenoh.handlers.CallbackDrop(listener, report),
zenoh.handlers.Callback(listener, report),
reliability=zenoh.Reliability.RELIABLE,
)

Expand Down
223 changes: 123 additions & 100 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,14 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{
fmt,
marker::PhantomData,
sync::Arc,
time::{Duration, Instant},
};
use std::{fmt, marker::PhantomData, sync::Arc, time::Duration};

use pyo3::{
exceptions::{PyAttributeError, PyValueError},
exceptions::PyValueError,
prelude::*,
types::{PyDict, PyType},
types::{PyCFunction, PyDict, PyType},
};
use zenoh::handlers::{Callback, IntoHandler};
use zenoh::handlers::{Callback as RustCallback, IntoHandler};

use crate::{
macros::{import, py_static},
Expand All @@ -32,9 +27,6 @@ use crate::{
};

const CHECK_SIGNALS_INTERVAL: Duration = Duration::from_millis(100);
fn check_signals_deadline() -> Instant {
Instant::now() + CHECK_SIGNALS_INTERVAL
}

fn exec_callback(callback: impl FnOnce(Python) -> PyResult<PyObject>) {
Python::with_gil(|gil| {
Expand Down Expand Up @@ -157,77 +149,75 @@ impl Handler {

#[pyclass]
#[derive(Clone, Debug)]
pub(crate) struct CallbackDrop {
pub(crate) struct Callback {
callback: PyObject,
drop: PyObject,
drop: Option<PyObject>,
#[pyo3(get)]
indirect: bool,
}

#[pymethods]
impl CallbackDrop {
impl Callback {
#[new]
fn new(callback: PyObject, drop: PyObject) -> Self {
Self { callback, drop }
#[pyo3(signature = (callback, drop, *, indirect = true))]
fn new(callback: PyObject, drop: Option<PyObject>, indirect: bool) -> Self {
Self {
callback,
drop,
indirect,
}
}

fn __call__(&self, arg: &Bound<PyAny>) -> PyResult<PyObject> {
self.callback.call1(arg.py(), (arg,))
}

fn drop(&self, py: Python) -> PyResult<PyObject> {
self.drop.call0(py)
match &self.drop {
Some(drop) => drop.call0(py),
None => Ok(py.None()),
}
}

fn __repr__(&self) -> String {
format!("{self:?}")
}
}

pub(crate) enum PythonCallback {
Simple(PyObject),
WithDrop { callback: PyObject, drop: PyObject },
}
pub(crate) struct PythonCallback(Callback);

impl PythonCallback {
fn new(obj: &Bound<PyAny>) -> PyResult<Self> {
if let Ok(CallbackDrop { callback, drop }) = CallbackDrop::extract_bound(obj) {
return Ok(Self::WithDrop { callback, drop });
fn new(obj: &Bound<PyAny>) -> Self {
if let Ok(cb) = Callback::extract_bound(obj) {
return Self(cb);
}
let callback = obj.clone().unbind();
Ok(if obj.hasattr("drop").unwrap_or(false) {
Self::WithDrop {
callback,
drop: obj.getattr("drop")?.unbind(),
}
} else {
PythonCallback::Simple(callback)
})
Self(Callback::new(obj.clone().unbind(), None, true))
}

fn call<T: IntoPython>(&self, t: T) {
let callback = match self {
Self::Simple(cb) => cb,
Self::WithDrop { callback, .. } => callback,
};
exec_callback(|gil| callback.call1(gil, (t.into_pyobject(gil),)));
exec_callback(|gil| self.0.callback.call1(gil, (t.into_pyobject(gil),)));
}
}

impl Drop for PythonCallback {
fn drop(&mut self) {
if let Self::WithDrop { drop, .. } = self {
exec_callback(|gil| drop.call0(gil));
}
exec_callback(|gil| self.0.drop(gil));
}
}

pub(crate) enum IntoHandlerImpl<T: IntoRust> {
Rust {
callback: Callback<'static, T::Into>,
callback: RustCallback<'static, T::Into>,
handler: Py<Handler>,
},
Python {
callback: PythonCallback,
handler: PyObject,
},
PythonIndirect {
callback: RustCallback<'static, T::Into>,
handler: PyObject,
},
}

impl<T: IntoPython> IntoHandler<'static, T> for IntoHandlerImpl<T::Into>
Expand All @@ -236,19 +226,19 @@ where
{
type Handler = HandlerImpl<T::Into>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (RustCallback<'static, T>, Self::Handler) {
match self {
Self::Rust { callback, handler } => (callback, HandlerImpl::Rust(handler, PhantomData)),
Self::Python { callback, handler } => (
Arc::new(move |t| callback.call(t)),
HandlerImpl::Python(handler),
),
Self::PythonIndirect { callback, handler } => (callback, HandlerImpl::Python(handler)),
}
}
}

pub(crate) enum HandlerImpl<T> {
// PhantomData just for documentation, until pyo3 accepts generic classes
Rust(Py<Handler>, PhantomData<T>),
Python(PyObject),
}
Expand Down Expand Up @@ -287,40 +277,18 @@ where
pub(crate) fn try_recv(&self, py: Python) -> PyResult<PyObject> {
match self {
Self::Rust(handler, _) => handler.borrow(py).try_recv(py),
_ => Err(PyAttributeError::new_err(
"No method 'try_recv'. For Python receiver, use 'receiver' attribute",
)),
Self::Python(handler) => handler.call_method0(py, "try_recv"),
}
}

pub(crate) fn recv(&self, py: Python) -> PyResult<PyObject> {
match self {
Self::Rust(handler, _) => handler.borrow(py).recv(py),
_ => Err(PyAttributeError::new_err(
"No method 'recv'. For Python receiver, use 'receiver' attribute",
)),
Self::Python(handler) => handler.call_method0(py, "recv"),
}
}
}

fn rust_handler<H: IntoRust, T: IntoRust>(py: Python, into_handler: H) -> IntoHandlerImpl<T>
where
H::Into: IntoHandler<'static, T::Into>,
<H::Into as IntoHandler<'static, T::Into>>::Handler: Send + Sync,
T::Into: IntoPython,
RustHandler<H, T>: Receiver,
{
let (callback, handler) = into_handler.into_rust().into_handler();
let handler = RustHandler::<H, T> {
handler,
_phantom: PhantomData,
};
IntoHandlerImpl::Rust {
callback,
handler: Py::new(py, Handler(Box::new(handler))).unwrap(),
}
}

struct RustHandler<H: IntoRust, T: IntoRust>
where
H::Into: IntoHandler<'static, T::Into>,
Expand Down Expand Up @@ -365,7 +333,7 @@ where
fn recv(&self, py: Python) -> PyResult<PyObject> {
recv(
py,
|| self.handler.recv_deadline(check_signals_deadline()),
|| self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL),
|err| matches!(err, flume::RecvTimeoutError::Timeout),
)
}
Expand All @@ -386,7 +354,7 @@ where
fn recv(&self, py: Python) -> PyResult<PyObject> {
recv(
py,
|| self.handler.recv_deadline(check_signals_deadline()),
|| self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL),
|err| matches!(err, flume::RecvTimeoutError::Timeout),
)
}
Expand All @@ -405,53 +373,108 @@ where
}

fn recv(&self, py: Python) -> PyResult<PyObject> {
// TODO use recv_deadline
try_recv(py, || self.handler.recv())
enum DeadlineError<E> {
Timeout,
Error(E),
}
impl<E: fmt::Display> fmt::Display for DeadlineError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Error(err) => write!(f, "{err}"),
Self::Timeout => unreachable!(),
}
}
}
recv(
py,
|| match self.handler.recv_timeout(CHECK_SIGNALS_INTERVAL) {
Ok(Some(x)) => Ok(x),
Ok(None) => Err(DeadlineError::Timeout),
Err(err) => Err(DeadlineError::Error(err)),
},
|err| matches!(err, DeadlineError::Timeout),
)
}
}

pub(crate) fn into_handler<T: IntoRust>(obj: &Bound<PyAny>) -> PyResult<Option<IntoHandlerImpl<T>>>
fn rust_handler<H: IntoRust, T: IntoRust>(py: Python, into_handler: H) -> IntoHandlerImpl<T>
where
H::Into: IntoHandler<'static, T::Into>,
<H::Into as IntoHandler<'static, T::Into>>::Handler: Send + Sync,
T::Into: IntoPython,
RustHandler<H, T>: Receiver,
{
if obj.is_none() {
return Ok(None);
let (callback, handler) = into_handler.into_rust().into_handler();
let handler = RustHandler::<H, T> {
handler,
_phantom: PhantomData,
};
IntoHandlerImpl::Rust {
callback,
handler: Py::new(py, Handler(Box::new(handler))).unwrap(),
}
}

fn python_handler<T: IntoRust>(
py: Python,
callback: &Bound<PyAny>,
handler: PyObject,
) -> PyResult<IntoHandlerImpl<T>>
where
T::Into: IntoPython,
{
let callback = PythonCallback::new(callback);
if callback.0.indirect {
let (rust_callback, receiver) = DefaultHandler.into_rust().into_handler();
let kwargs = PyDict::new_bound(py);
let target = PyCFunction::new_closure_bound(py, None, None, move |args, _| loop {
let py = args.py();
match py.allow_threads(|| receiver.recv_timeout(CHECK_SIGNALS_INTERVAL)) {
Ok(x) => callback.call(x),
Err(flume::RecvTimeoutError::Timeout) => py.check_signals()?,
Err(flume::RecvTimeoutError::Disconnected) => return PyResult::Ok(()),
}
})?;
kwargs.set_item("target", target)?;
let thread = import!(py, threading.Thread).call((), Some(&kwargs))?;
thread.call_method0("start")?;
Ok(IntoHandlerImpl::PythonIndirect {
callback: rust_callback,
handler,
})
} else {
Ok(IntoHandlerImpl::Python { callback, handler })
}
}

pub(crate) fn into_handler<T: IntoRust>(
py: Python,
obj: Option<&Bound<PyAny>>,
) -> PyResult<IntoHandlerImpl<T>>
where
T::Into: IntoPython,
{
let Some(obj) = obj else {
return Ok(rust_handler(py, DefaultHandler));
};
if let Ok(handler) = obj.extract::<DefaultHandler>() {
return Ok(Some(rust_handler(obj.py(), handler)));
return Ok(rust_handler(py, handler));
}
if let Ok(handler) = obj.extract::<FifoChannel>() {
return Ok(Some(rust_handler(obj.py(), handler)));
return Ok(rust_handler(py, handler));
}
if let Ok(handler) = obj.extract::<RingChannel>() {
return Ok(Some(rust_handler(obj.py(), handler)));
return Ok(rust_handler(py, handler));
}
if obj.is_callable() {
return Ok(Some(IntoHandlerImpl::Python {
callback: PythonCallback::new(obj)?,
handler: obj.py().None(),
}));
}
if let Ok((cb, handler)) = obj.extract::<(Bound<PyAny>, PyObject)>() {
return python_handler(py, obj, py.None());
} else if let Ok((cb, handler)) = obj.extract::<(Bound<PyAny>, PyObject)>() {
if cb.is_callable() {
return Ok(Some(IntoHandlerImpl::Python {
callback: PythonCallback::new(&cb)?,
handler,
}));
return python_handler(py, &cb, handler);
}
}
Err(PyValueError::new_err(format!(
"Invalid handler {}",
"Invalid handler type {}",
obj.get_type().name()?
)))
}

pub(crate) fn handler_or_default<T: IntoRust>(
py: Python,
into_handler: Option<IntoHandlerImpl<T>>,
) -> IntoHandlerImpl<T>
where
T::Into: IntoPython,
{
into_handler.unwrap_or_else(|| rust_handler(py, DefaultHandler))
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub(crate) mod zenoh {
#[pymodule]
mod handlers {
#[pymodule_export]
use crate::handlers::{CallbackDrop, DefaultHandler, FifoChannel, Handler, RingChannel};
use crate::handlers::{Callback, DefaultHandler, FifoChannel, Handler, RingChannel};
}

#[pymodule_export]
Expand Down
Loading

0 comments on commit c3ce7ef

Please sign in to comment.