Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename IntoCallbackReceiver trait to IntoHandler trait #816

Merged
merged 3 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
handler: Handler,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
where
Handler: zenoh::prelude::IntoCallbackReceiverPair<'static, Sample>,
Handler: zenoh::prelude::IntoHandler<'static, Sample>,
{
let QueryingSubscriberBuilder {
session,
Expand Down Expand Up @@ -214,17 +214,17 @@ impl<'a, 'b, KeySpace, Handler> QueryingSubscriberBuilder<'a, 'b, KeySpace, Hand

impl<'a, KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Sample>,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample>,
Handler::Handler: Send,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Receiver>>;
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
}

impl<KeySpace, Handler> SyncResolve for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
let session = self.session.clone();
Expand Down Expand Up @@ -272,8 +272,8 @@ where
impl<'a, KeySpace, Handler> AsyncResolve for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
type Future = Ready<Self::To>;

Expand Down Expand Up @@ -462,7 +462,7 @@ where
handler: Handler,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: zenoh::prelude::IntoCallbackReceiverPair<'static, Sample>,
Handler: zenoh::prelude::IntoHandler<'static, Sample>,
{
let FetchingSubscriberBuilder {
session,
Expand Down Expand Up @@ -536,11 +536,11 @@ impl<
TryIntoSample,
> Resolvable for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoCallbackReceiverPair<'static, Sample>,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample>,
Handler::Handler: Send,
TryIntoSample: ExtractSample,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Receiver>>;
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
}

impl<
Expand All @@ -551,8 +551,8 @@ impl<
> SyncResolve for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
fn res_sync(self) -> <Self as Resolvable>::To {
Expand All @@ -569,8 +569,8 @@ impl<
> AsyncResolve for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
type Future = Ready<Self::To>;
Expand Down Expand Up @@ -643,14 +643,14 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
) -> ZResult<Self>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoCallbackReceiverPair<'static, Sample, Receiver = Receiver> + Send,
Handler: IntoHandler<'static, Sample, Handler = Receiver> + Send,
TryIntoSample: ExtractSample + Send + Sync,
{
let state = Arc::new(Mutex::new(InnerState {
pending_fetches: 0,
merge_queue: MergeQueue::new(),
}));
let (callback, receiver) = conf.handler.into_cb_receiver_pair();
let (callback, receiver) = conf.handler.into_handler();

let sub_callback = {
let state = state.clone();
Expand Down
60 changes: 34 additions & 26 deletions zenoh/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,36 @@ use crate::API_DATA_RECEPTION_CHANNEL_SIZE;

/// An alias for `Arc<T>`.
pub type Dyn<T> = std::sync::Arc<T>;

/// An immutable callback function.
pub type Callback<'a, T> = Dyn<dyn Fn(T) + Send + Sync + 'a>;

/// A type that can be converted into a [`Callback`]-receiver pair.
/// A type that can be converted into a [`Callback`]-handler pair.
///
/// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that,
/// while granting you access to the receiver through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`].
/// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`].
///
/// Any closure that accepts `T` can be converted into a pair of itself and `()`.
pub trait IntoCallbackReceiverPair<'a, T> {
type Receiver;
fn into_cb_receiver_pair(self) -> (Callback<'a, T>, Self::Receiver);
pub trait IntoHandler<'a, T> {
type Handler;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler);
}
impl<'a, T, F> IntoCallbackReceiverPair<'a, T> for F

impl<'a, T, F> IntoHandler<'a, T> for F
where
F: Fn(T) + Send + Sync + 'a,
{
type Receiver = ();
fn into_cb_receiver_pair(self) -> (Callback<'a, T>, Self::Receiver) {
type Handler = ();
fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self), ())
}
}
impl<T: Send + 'static> IntoCallbackReceiverPair<'static, T>
for (flume::Sender<T>, flume::Receiver<T>)
{
type Receiver = flume::Receiver<T>;

fn into_cb_receiver_pair(self) -> (Callback<'static, T>, Self::Receiver) {
impl<T: Send + 'static> IntoHandler<'static, T> for (flume::Sender<T>, flume::Receiver<T>) {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
Expand All @@ -56,18 +58,24 @@ impl<T: Send + 'static> IntoCallbackReceiverPair<'static, T>
)
}
}

/// The default handler in Zenoh is a FIFO queue.
pub struct DefaultHandler;
impl<T: Send + 'static> IntoCallbackReceiverPair<'static, T> for DefaultHandler {
type Receiver = flume::Receiver<T>;
fn into_cb_receiver_pair(self) -> (Callback<'static, T>, Self::Receiver) {
flume::bounded(*API_DATA_RECEPTION_CHANNEL_SIZE).into_cb_receiver_pair()

impl<T: Send + 'static> IntoHandler<'static, T> for DefaultHandler {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
flume::bounded(*API_DATA_RECEPTION_CHANNEL_SIZE).into_handler()
}
}
impl<T: Send + Sync + 'static> IntoCallbackReceiverPair<'static, T>

impl<T: Send + Sync + 'static> IntoHandler<'static, T>
for (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>)
{
type Receiver = std::sync::mpsc::Receiver<T>;
fn into_cb_receiver_pair(self) -> (Callback<'static, T>, Self::Receiver) {
type Handler = std::sync::mpsc::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
Expand Down Expand Up @@ -96,15 +104,15 @@ pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
/// - `callback` will never be called once `drop` has started.
/// - `drop` will only be called **once**, and **after every** `callback` has ended.
/// - The two previous guarantees imply that `call` and `drop` are never called concurrently.
pub struct CallbackPair<Callback, DropFn>
pub struct CallbackDrop<Callback, DropFn>
where
DropFn: FnMut() + Send + Sync + 'static,
{
pub callback: Callback,
pub drop: DropFn,
}

impl<Callback, DropFn> Drop for CallbackPair<Callback, DropFn>
impl<Callback, DropFn> Drop for CallbackDrop<Callback, DropFn>
where
DropFn: FnMut() + Send + Sync + 'static,
{
Expand All @@ -113,14 +121,14 @@ where
}
}

impl<'a, OnEvent, Event, DropFn> IntoCallbackReceiverPair<'a, Event>
for CallbackPair<OnEvent, DropFn>
impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop<OnEvent, DropFn>
where
OnEvent: Fn(Event) + Send + Sync + 'a,
DropFn: FnMut() + Send + Sync + 'static,
{
type Receiver = ();
fn into_cb_receiver_pair(self) -> (Callback<'a, Event>, Self::Receiver) {
type Handler = ();

fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) {
(Dyn::from(move |evt| (self.callback)(evt)), ())
}
}
40 changes: 20 additions & 20 deletions zenoh/src/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
self.callback(locked(callback))
}

/// Receive the samples for this subscription with a [`Handler`](crate::prelude::IntoCallbackReceiverPair).
/// Receive the samples for this subscription with a [`Handler`](crate::prelude::IntoHandler).
///
/// # Examples
/// ```no_run
Expand All @@ -507,7 +507,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
#[zenoh_macros::unstable]
pub fn with<Handler>(self, handler: Handler) -> LivelinessSubscriberBuilder<'a, 'b, Handler>
where
Handler: crate::handlers::IntoCallbackReceiverPair<'static, Sample>,
Handler: crate::handlers::IntoHandler<'static, Sample>,
{
let LivelinessSubscriberBuilder {
session,
Expand All @@ -525,23 +525,23 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
#[zenoh_macros::unstable]
impl<'a, Handler> Resolvable for LivelinessSubscriberBuilder<'a, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
type To = ZResult<Subscriber<'a, Handler::Receiver>>;
type To = ZResult<Subscriber<'a, Handler::Handler>>;
}

#[zenoh_macros::unstable]
impl<'a, Handler> SyncResolve for LivelinessSubscriberBuilder<'a, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
#[zenoh_macros::unstable]
fn res_sync(self) -> <Self as Resolvable>::To {
let key_expr = self.key_expr?;
let session = self.session;
let (callback, receiver) = self.handler.into_cb_receiver_pair();
let (callback, receiver) = self.handler.into_handler();
session
.declare_subscriber_inner(
&key_expr,
Expand All @@ -564,8 +564,8 @@ where
#[zenoh_macros::unstable]
impl<'a, Handler> AsyncResolve for LivelinessSubscriberBuilder<'a, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Sample> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Sample> + Send,
Handler::Handler: Send,
{
type Future = Ready<Self::To>;

Expand Down Expand Up @@ -677,7 +677,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> {
self.callback(locked(callback))
}

/// Receive the replies for this query with a [`Handler`](crate::prelude::IntoCallbackReceiverPair).
/// Receive the replies for this query with a [`Handler`](crate::prelude::IntoHandler).
///
/// # Examples
/// ```
Expand All @@ -700,7 +700,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> {
#[inline]
pub fn with<Handler>(self, handler: Handler) -> LivelinessGetBuilder<'a, 'b, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Reply>,
Handler: IntoHandler<'static, Reply>,
{
let LivelinessGetBuilder {
session,
Expand Down Expand Up @@ -728,19 +728,19 @@ impl<'a, 'b, Handler> LivelinessGetBuilder<'a, 'b, Handler> {

impl<Handler> Resolvable for LivelinessGetBuilder<'_, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Reply> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Reply> + Send,
Handler::Handler: Send,
{
type To = ZResult<Handler::Receiver>;
type To = ZResult<Handler::Handler>;
}

impl<Handler> SyncResolve for LivelinessGetBuilder<'_, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Reply> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Reply> + Send,
Handler::Handler: Send,
{
fn res_sync(self) -> <Self as Resolvable>::To {
let (callback, receiver) = self.handler.into_cb_receiver_pair();
let (callback, receiver) = self.handler.into_handler();

self.session
.query(
Expand All @@ -761,8 +761,8 @@ where

impl<Handler> AsyncResolve for LivelinessGetBuilder<'_, '_, Handler>
where
Handler: IntoCallbackReceiverPair<'static, Reply> + Send,
Handler::Receiver: Send,
Handler: IntoHandler<'static, Reply> + Send,
Handler::Handler: Send,
{
type Future = Ready<Self::To>;

Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ pub(crate) mod common {
pub use zenoh_protocol::core::{EntityGlobalId, EntityId};

pub use crate::config::{self, Config, ValidatedMap};
pub use crate::handlers::IntoCallbackReceiverPair;
pub use crate::handlers::IntoHandler;
pub use crate::selector::{Parameter, Parameters, Selector};
pub use crate::session::{Session, SessionDeclarations};

pub use crate::query::{ConsolidationMode, QueryConsolidation, QueryTarget};
pub use crate::selector::{Parameter, Parameters, Selector};

pub use crate::encoding::Encoding;
/// The encoding of a zenoh `Value`.
Expand Down
Loading