Skip to content

Commit

Permalink
Rename IntoCallbackReceiver trait to IntoHandler trait (#816)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets authored Mar 18, 2024
1 parent 2e9db3f commit 1d1d4ed
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 130 deletions.
36 changes: 18 additions & 18 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
handler: Handler,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
where
Handler: zenoh::prelude::IntoCallbackReceiverPair<'static, Sample>,
Handler: zenoh::prelude::IntoHandler<'static, Sample>,
{
let QueryingSubscriberBuilder {
session,
Expand Down Expand Up @@ -214,17 +214,17 @@ impl<'a, 'b, KeySpace, Handler> QueryingSubscriberBuilder<'a, 'b, KeySpace, Hand

impl<'a, KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Sample>,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample>,
Handler::Handler: Send,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Receiver>>;
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
}

impl<KeySpace, Handler> SyncResolve for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
let session = self.session.clone();
Expand Down Expand Up @@ -272,8 +272,8 @@ where
impl<'a, KeySpace, Handler> AsyncResolve for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
type Future = Ready<Self::To>;

Expand Down Expand Up @@ -462,7 +462,7 @@ where
handler: Handler,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: zenoh::prelude::IntoCallbackReceiverPair<'static, Sample>,
Handler: zenoh::prelude::IntoHandler<'static, Sample>,
{
let FetchingSubscriberBuilder {
session,
Expand Down Expand Up @@ -536,11 +536,11 @@ impl<
TryIntoSample,
> Resolvable for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoCallbackReceiverPair<'static, Sample>,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample>,
Handler::Handler: Send,
TryIntoSample: ExtractSample,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Receiver>>;
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
}

impl<
Expand All @@ -551,8 +551,8 @@ impl<
> SyncResolve for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
fn res_sync(self) -> <Self as Resolvable>::To {
Expand All @@ -569,8 +569,8 @@ impl<
> AsyncResolve for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
type Future = Ready<Self::To>;
Expand Down Expand Up @@ -643,14 +643,14 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
) -> ZResult<Self>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoCallbackReceiverPair<'static, Sample, Receiver = Receiver> + Send,
Handler: IntoHandler<'static, Sample, Handler = Receiver> + Send,
TryIntoSample: ExtractSample + Send + Sync,
{
let state = Arc::new(Mutex::new(InnerState {
pending_fetches: 0,
merge_queue: MergeQueue::new(),
}));
let (callback, receiver) = conf.handler.into_cb_receiver_pair();
let (callback, receiver) = conf.handler.into_handler();

let sub_callback = {
let state = state.clone();
Expand Down
60 changes: 34 additions & 26 deletions zenoh/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,36 @@ use crate::API_DATA_RECEPTION_CHANNEL_SIZE;

/// An alias for `Arc<T>`.
pub type Dyn<T> = std::sync::Arc<T>;

/// An immutable callback function.
pub type Callback<'a, T> = Dyn<dyn Fn(T) + Send + Sync + 'a>;

/// A type that can be converted into a [`Callback`]-receiver pair.
/// A type that can be converted into a [`Callback`]-handler pair.
///
/// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that,
/// while granting you access to the receiver through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`].
/// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`].
///
/// Any closure that accepts `T` can be converted into a pair of itself and `()`.
pub trait IntoCallbackReceiverPair<'a, T> {
type Receiver;
fn into_cb_receiver_pair(self) -> (Callback<'a, T>, Self::Receiver);
pub trait IntoHandler<'a, T> {
type Handler;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler);
}
impl<'a, T, F> IntoCallbackReceiverPair<'a, T> for F

impl<'a, T, F> IntoHandler<'a, T> for F
where
F: Fn(T) + Send + Sync + 'a,
{
type Receiver = ();
fn into_cb_receiver_pair(self) -> (Callback<'a, T>, Self::Receiver) {
type Handler = ();
fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self), ())
}
}
impl<T: Send + 'static> IntoCallbackReceiverPair<'static, T>
for (flume::Sender<T>, flume::Receiver<T>)
{
type Receiver = flume::Receiver<T>;

fn into_cb_receiver_pair(self) -> (Callback<'static, T>, Self::Receiver) {
impl<T: Send + 'static> IntoHandler<'static, T> for (flume::Sender<T>, flume::Receiver<T>) {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
Expand All @@ -56,18 +58,24 @@ impl<T: Send + 'static> IntoCallbackReceiverPair<'static, T>
)
}
}

/// The default handler in Zenoh is a FIFO queue.
pub struct DefaultHandler;
impl<T: Send + 'static> IntoCallbackReceiverPair<'static, T> for DefaultHandler {
type Receiver = flume::Receiver<T>;
fn into_cb_receiver_pair(self) -> (Callback<'static, T>, Self::Receiver) {
flume::bounded(*API_DATA_RECEPTION_CHANNEL_SIZE).into_cb_receiver_pair()

impl<T: Send + 'static> IntoHandler<'static, T> for DefaultHandler {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
flume::bounded(*API_DATA_RECEPTION_CHANNEL_SIZE).into_handler()
}
}
impl<T: Send + Sync + 'static> IntoCallbackReceiverPair<'static, T>

impl<T: Send + Sync + 'static> IntoHandler<'static, T>
for (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>)
{
type Receiver = std::sync::mpsc::Receiver<T>;
fn into_cb_receiver_pair(self) -> (Callback<'static, T>, Self::Receiver) {
type Handler = std::sync::mpsc::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
Expand Down Expand Up @@ -96,15 +104,15 @@ pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
/// - `callback` will never be called once `drop` has started.
/// - `drop` will only be called **once**, and **after every** `callback` has ended.
/// - The two previous guarantees imply that `call` and `drop` are never called concurrently.
pub struct CallbackPair<Callback, DropFn>
pub struct CallbackDrop<Callback, DropFn>
where
DropFn: FnMut() + Send + Sync + 'static,
{
pub callback: Callback,
pub drop: DropFn,
}

impl<Callback, DropFn> Drop for CallbackPair<Callback, DropFn>
impl<Callback, DropFn> Drop for CallbackDrop<Callback, DropFn>
where
DropFn: FnMut() + Send + Sync + 'static,
{
Expand All @@ -113,14 +121,14 @@ where
}
}

impl<'a, OnEvent, Event, DropFn> IntoCallbackReceiverPair<'a, Event>
for CallbackPair<OnEvent, DropFn>
impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop<OnEvent, DropFn>
where
OnEvent: Fn(Event) + Send + Sync + 'a,
DropFn: FnMut() + Send + Sync + 'static,
{
type Receiver = ();
fn into_cb_receiver_pair(self) -> (Callback<'a, Event>, Self::Receiver) {
type Handler = ();

fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) {
(Dyn::from(move |evt| (self.callback)(evt)), ())
}
}
40 changes: 20 additions & 20 deletions zenoh/src/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
self.callback(locked(callback))
}

/// Receive the samples for this subscription with a [`Handler`](crate::prelude::IntoCallbackReceiverPair).
/// Receive the samples for this subscription with a [`Handler`](crate::prelude::IntoHandler).
///
/// # Examples
/// ```no_run
Expand All @@ -507,7 +507,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
#[zenoh_macros::unstable]
pub fn with<Handler>(self, handler: Handler) -> LivelinessSubscriberBuilder<'a, 'b, Handler>
where
Handler: crate::handlers::IntoCallbackReceiverPair<'static, Sample>,
Handler: crate::handlers::IntoHandler<'static, Sample>,
{
let LivelinessSubscriberBuilder {
session,
Expand All @@ -525,23 +525,23 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
#[zenoh_macros::unstable]
impl<'a, Handler> Resolvable for LivelinessSubscriberBuilder<'a, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
type To = ZResult<Subscriber<'a, Handler::Receiver>>;
type To = ZResult<Subscriber<'a, Handler::Handler>>;
}

#[zenoh_macros::unstable]
impl<'a, Handler> SyncResolve for LivelinessSubscriberBuilder<'a, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
#[zenoh_macros::unstable]
fn res_sync(self) -> <Self as Resolvable>::To {
let key_expr = self.key_expr?;
let session = self.session;
let (callback, receiver) = self.handler.into_cb_receiver_pair();
let (callback, receiver) = self.handler.into_handler();
session
.declare_subscriber_inner(
&key_expr,
Expand All @@ -564,8 +564,8 @@ where
#[zenoh_macros::unstable]
impl<'a, Handler> AsyncResolve for LivelinessSubscriberBuilder<'a, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
type Future = Ready<Self::To>;

Expand Down Expand Up @@ -677,7 +677,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> {
self.callback(locked(callback))
}

/// Receive the replies for this query with a [`Handler`](crate::prelude::IntoCallbackReceiverPair).
/// Receive the replies for this query with a [`Handler`](crate::prelude::IntoHandler).
///
/// # Examples
/// ```
Expand All @@ -700,7 +700,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> {
#[inline]
pub fn with<Handler>(self, handler: Handler) -> LivelinessGetBuilder<'a, 'b, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Reply>,
Handler: IntoHandler<'static, Reply>,
{
let LivelinessGetBuilder {
session,
Expand Down Expand Up @@ -728,19 +728,19 @@ impl<'a, 'b, Handler> LivelinessGetBuilder<'a, 'b, Handler> {

impl<Handler> Resolvable for LivelinessGetBuilder<'_, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Reply> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Reply> + Send,
Handler::Handler: Send,
{
type To = ZResult<Handler::Receiver>;
type To = ZResult<Handler::Handler>;
}

impl<Handler> SyncResolve for LivelinessGetBuilder<'_, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Reply> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Reply> + Send,
Handler::Handler: Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
let (callback, receiver) = self.handler.into_cb_receiver_pair();
let (callback, receiver) = self.handler.into_handler();

self.session
.query(
Expand All @@ -761,8 +761,8 @@ where

impl<Handler> AsyncResolve for LivelinessGetBuilder<'_, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Reply> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Reply> + Send,
Handler::Handler: Send,
{
type Future = Ready<Self::To>;

Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ pub(crate) mod common {
pub use zenoh_protocol::core::{EntityGlobalId, EntityId};

pub use crate::config::{self, Config, ValidatedMap};
pub use crate::handlers::IntoCallbackReceiverPair;
pub use crate::handlers::IntoHandler;
pub use crate::selector::{Parameter, Parameters, Selector};
pub use crate::session::{Session, SessionDeclarations};

pub use crate::query::{ConsolidationMode, QueryConsolidation, QueryTarget};
pub use crate::selector::{Parameter, Parameters, Selector};

pub use crate::encoding::Encoding;
/// The encoding of a zenoh `Value`.
Expand Down
Loading

0 comments on commit 1d1d4ed

Please sign in to comment.