From dc51ccb6b85fc016aadc215d70e5ce1677c4c8b2 Mon Sep 17 00:00:00 2001 From: Pierre Avital Date: Fri, 3 Nov 2023 17:33:58 +0100 Subject: [PATCH] liveliness --- Cargo.toml | 2 +- include/zenoh_commons.h | 121 ++++++++++++++++++++ src/lib.rs | 5 + src/liveliness.rs | 246 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 373 insertions(+), 1 deletion(-) create mode 100644 src/liveliness.rs diff --git a/Cargo.toml b/Cargo.toml index 47af03b38..a689aa9a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ serde_yaml = "0.9.19" [lib] path="src/lib.rs" -name = "zenohc" +name = "zenohcd" crate-type = ["cdylib", "staticlib"] doctest = false diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 763c267aa..aecd40786 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -667,6 +667,34 @@ typedef struct z_owned_scouting_config_t { typedef struct z_subscriber_t { const struct z_owned_subscriber_t *_0; } z_subscriber_t; +/** + * The options for `zc_liveliness_declare_token` + */ +typedef struct zc_owned_liveliness_declaration_options_t { + uint8_t _inner; +} zc_owned_liveliness_declaration_options_t; +/** + * The options for `zc_liveliness_declare_subscriber` + */ +typedef struct zc_owned_liveliness_declare_subscriber_options_t { + uint8_t _inner; +} zc_owned_liveliness_declare_subscriber_options_t; +/** + * A liveliness token that can be used to provide the network with information about connectivity to its + * declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key + * expressions. + * + * A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost. + */ +typedef struct zc_owned_liveliness_token_t { + uintptr_t _inner[4]; +} zc_owned_liveliness_token_t; +/** + * The options for `zc_liveliness_declare_subscriber` + */ +typedef struct zc_owned_liveliness_get_options_t { + uint32_t timeout_ms; +} zc_owned_liveliness_get_options_t; /** * An owned payload, backed by a reference counted owner. * @@ -1679,6 +1707,99 @@ ZENOHC_API struct z_keyexpr_t zc_keyexpr_from_slice(const char *name, uintptr_t ZENOHC_API struct z_keyexpr_t zc_keyexpr_from_slice_unchecked(const char *start, uintptr_t len); +/** + * Returns `true` if the options are valid. + */ +ZENOHC_API +bool zc_liveliness_declaration_options_check(const struct zc_owned_liveliness_declaration_options_t *_opts); +/** + * Destroys the options. + */ +ZENOHC_API +void zc_liveliness_declaration_options_drop(struct zc_owned_liveliness_declaration_options_t *opts); +/** + * The gravestone value for `zc_owned_liveliness_declaration_options_t` + */ +ZENOHC_API +struct zc_owned_liveliness_declaration_options_t zc_liveliness_declaration_options_null(void); +/** + * Declares a subscriber on liveliness tokens that intersect `key`. + * + * Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. + */ +ZENOHC_API +struct z_owned_subscriber_t zc_liveliness_declare_subscriber(struct z_session_t session, + struct z_keyexpr_t key, + struct z_owned_closure_sample_t *callback, + const struct zc_owned_liveliness_declare_subscriber_options_t *_options); +/** + * Constructs and declares a liveliness token on the network. + * + * Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity + * is achieved, and a DELETE sample if it's lost. + * + * Passing `NULL` as options is valid and equivalent to a pointer to the default options. + */ +ZENOHC_API +struct zc_owned_liveliness_token_t zc_liveliness_declare_token(struct z_session_t session, + struct z_keyexpr_t key, + const struct zc_owned_liveliness_declaration_options_t *_options); +/** + * Queries liveliness tokens currently on the network with a key expression intersecting with `key`. + * + * Note that the same "value stealing" tricks apply as with a normal `z_get` + * + * Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. + */ +ZENOHC_API +int8_t zc_liveliness_get(struct z_session_t session, + struct z_keyexpr_t key, + struct z_owned_closure_reply_t *callback, + const struct zc_owned_liveliness_get_options_t *options); +/** + * Returns `true` if the options are valid. + */ +ZENOHC_API +bool zc_liveliness_get_options_check(const struct zc_owned_liveliness_get_options_t *_opts); +/** + * The gravestone value for `zc_owned_liveliness_get_options_t` + */ +ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_default(void); +/** + * Destroys the options. + */ +ZENOHC_API void zc_liveliness_get_options_drop(struct zc_owned_liveliness_get_options_t *opts); +/** + * The gravestone value for `zc_owned_liveliness_get_options_t` + */ +ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_null(void); +/** + * Returns `true` if the options are valid. + */ +ZENOHC_API +bool zc_liveliness_subscriber_options_check(const struct zc_owned_liveliness_declare_subscriber_options_t *_opts); +/** + * Destroys the options. + */ +ZENOHC_API +void zc_liveliness_subscriber_options_drop(struct zc_owned_liveliness_declare_subscriber_options_t *opts); +/** + * The gravestone value for `zc_owned_liveliness_declare_subscriber_options_t` + */ +ZENOHC_API +struct zc_owned_liveliness_declare_subscriber_options_t zc_liveliness_subscriber_options_null(void); +/** + * Returns `true` unless the token is at its gravestone value. + */ +ZENOHC_API bool zc_liveliness_token_check(const struct zc_owned_liveliness_token_t *token); +/** + * The gravestone value for liveliness tokens. + */ +ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void); +/** + * Destroys a liveliness token, notifying subscribers of its destruction. + */ +ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token); /** * Returns `false` if `payload` is the gravestone value. */ diff --git a/src/lib.rs b/src/lib.rs index c24a536a5..cc2a7bb3d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,9 @@ // Contributors: // ZettaScale Zenoh team, // + +#![allow(non_camel_case_types)] + mod collections; pub use crate::collections::*; mod config; @@ -39,6 +42,8 @@ mod publisher; pub use crate::publisher::*; mod closures; pub use closures::*; +mod liveliness; +pub use liveliness::*; #[cfg(feature = "shared-memory")] mod shm; diff --git a/src/liveliness.rs b/src/liveliness.rs new file mode 100644 index 000000000..ecf695010 --- /dev/null +++ b/src/liveliness.rs @@ -0,0 +1,246 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::{ + liveliness::{Liveliness, LivelinessToken}, + prelude::{SessionDeclarations, SplitBuffer}, +}; +use zenoh_util::core::{zresult::ErrNo, SyncResolve}; + +use crate::{ + z_closure_reply_call, z_closure_sample_call, z_keyexpr_t, z_owned_closure_reply_t, + z_owned_closure_sample_t, z_owned_subscriber_t, z_sample_t, z_session_t, +}; + +/// A liveliness token that can be used to provide the network with information about connectivity to its +/// declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key +/// expressions. +/// +/// A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost. +#[repr(C)] +pub struct zc_owned_liveliness_token_t { + _inner: [usize; 4], +} + +/// The gravestone value for liveliness tokens. +#[no_mangle] +pub extern "C" fn zc_liveliness_token_null() -> zc_owned_liveliness_token_t { + zc_owned_liveliness_token_t { _inner: [0; 4] } +} + +/// Returns `true` unless the token is at its gravestone value. +#[no_mangle] +pub extern "C" fn zc_liveliness_token_check(token: &zc_owned_liveliness_token_t) -> bool { + token._inner.iter().any(|v| *v != 0) +} +/// The options for `zc_liveliness_declare_token` +#[repr(C)] +pub struct zc_owned_liveliness_declaration_options_t { + _inner: u8, +} +/// The gravestone value for `zc_owned_liveliness_declaration_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_declaration_options_null( +) -> zc_owned_liveliness_declaration_options_t { + zc_owned_liveliness_declaration_options_t { _inner: 0 } +} +/// Returns `true` if the options are valid. +#[no_mangle] +pub extern "C" fn zc_liveliness_declaration_options_check( + _opts: &zc_owned_liveliness_declaration_options_t, +) -> bool { + true +} +/// Destroys the options. +#[no_mangle] +pub extern "C" fn zc_liveliness_declaration_options_drop( + opts: &mut zc_owned_liveliness_declaration_options_t, +) { + *opts = zc_liveliness_declaration_options_null() +} +impl From> for zc_owned_liveliness_token_t { + fn from(value: LivelinessToken<'static>) -> Self { + unsafe { core::mem::transmute(value) } + } +} +impl From for Option> { + fn from(value: zc_owned_liveliness_token_t) -> Self { + if value._inner.iter().all(|v| *v == 0) { + None + } else { + Some(unsafe { core::mem::transmute(value) }) + } + } +} +/// Constructs and declares a liveliness token on the network. +/// +/// Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity +/// is achieved, and a DELETE sample if it's lost. +/// +/// Passing `NULL` as options is valid and equivalent to a pointer to the default options. +#[no_mangle] +pub extern "C" fn zc_liveliness_declare_token( + session: z_session_t, + key: z_keyexpr_t, + _options: Option<&zc_owned_liveliness_declaration_options_t>, +) -> zc_owned_liveliness_token_t { + let Some(session) = session.upgrade() else { + log::error!("Failed to declare liveliness token: provided session was invalid"); + return zc_liveliness_token_null(); + }; + match session.liveliness().declare_token(key).res() { + Ok(token) => unsafe { core::mem::transmute(token) }, + Err(e) => { + log::error!("Failed to declare liveliness token: {e}"); + zc_liveliness_token_null() + } + } +} + +/// Destroys a liveliness token, notifying subscribers of its destruction. +#[no_mangle] +pub extern "C" fn zc_liveliness_undeclare_token(token: &mut zc_owned_liveliness_token_t) { + let Some(token): Option = + core::mem::replace(token, zc_liveliness_token_null()).into() + else { + return; + }; + if let Err(e) = token.undeclare().res() { + log::error!("Failed to undeclare token: {e}"); + } +} + +/// The options for `zc_liveliness_declare_subscriber` +#[repr(C)] +pub struct zc_owned_liveliness_declare_subscriber_options_t { + _inner: u8, +} +/// The gravestone value for `zc_owned_liveliness_declare_subscriber_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_subscriber_options_null( +) -> zc_owned_liveliness_declare_subscriber_options_t { + zc_owned_liveliness_declare_subscriber_options_t { _inner: 0 } +} +/// Returns `true` if the options are valid. +#[no_mangle] +pub extern "C" fn zc_liveliness_subscriber_options_check( + _opts: &zc_owned_liveliness_declare_subscriber_options_t, +) -> bool { + true +} +/// Destroys the options. +#[no_mangle] +pub extern "C" fn zc_liveliness_subscriber_options_drop( + opts: &mut zc_owned_liveliness_declare_subscriber_options_t, +) { + *opts = zc_liveliness_subscriber_options_null() +} + +/// Declares a subscriber on liveliness tokens that intersect `key`. +/// +/// Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. +#[no_mangle] +pub extern "C" fn zc_liveliness_declare_subscriber( + session: z_session_t, + key: z_keyexpr_t, + callback: &mut z_owned_closure_sample_t, + _options: Option<&zc_owned_liveliness_declare_subscriber_options_t>, +) -> z_owned_subscriber_t { + let Some(session) = session.upgrade() else { + log::error!("Failed to declare liveliness token: provided session was invalid"); + return z_owned_subscriber_t::null(); + }; + let callback = core::mem::replace(callback, z_owned_closure_sample_t::empty()); + match session + .liveliness() + .declare_subscriber(key) + .callback(move |sample| { + let payload = sample.payload.contiguous(); + let owner = match payload { + std::borrow::Cow::Owned(v) => zenoh::buffers::ZBuf::from(v), + _ => sample.payload.clone(), + }; + let sample = z_sample_t::new(&sample, &owner); + z_closure_sample_call(&callback, &sample) + }) + .res() + { + Ok(token) => z_owned_subscriber_t::new(token), + Err(e) => { + log::error!("Failed to subscribe to liveliness: {e}"); + z_owned_subscriber_t::null() + } + } +} + +/// The options for `zc_liveliness_declare_subscriber` +#[repr(C)] +pub struct zc_owned_liveliness_get_options_t { + timeout_ms: u32, +} +/// The gravestone value for `zc_owned_liveliness_get_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_null() -> zc_owned_liveliness_get_options_t { + zc_owned_liveliness_get_options_t { timeout_ms: 0 } +} +/// The gravestone value for `zc_owned_liveliness_get_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_default() -> zc_owned_liveliness_get_options_t { + zc_owned_liveliness_get_options_t { timeout_ms: 10000 } +} +/// Returns `true` if the options are valid. +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_check( + _opts: &zc_owned_liveliness_get_options_t, +) -> bool { + true +} +/// Destroys the options. +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_drop(opts: &mut zc_owned_liveliness_get_options_t) { + *opts = zc_liveliness_get_options_null() +} + +/// Queries liveliness tokens currently on the network with a key expression intersecting with `key`. +/// +/// Note that the same "value stealing" tricks apply as with a normal `z_get` +/// +/// Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. +#[no_mangle] +pub extern "C" fn zc_liveliness_get( + session: z_session_t, + key: z_keyexpr_t, + callback: &mut z_owned_closure_reply_t, + options: Option<&zc_owned_liveliness_get_options_t>, +) -> i8 { + let Some(session) = session.upgrade() else { + log::error!("Failed to declare liveliness token: provided session was invalid"); + return i8::MIN; + }; + let callback = core::mem::replace(callback, z_owned_closure_reply_t::empty()); + let liveliness: Liveliness<'static> = session.liveliness(); + let mut builder = liveliness + .get(key) + .callback(move |response| z_closure_reply_call(&callback, &mut response.into())); + if let Some(options) = options { + builder = builder.timeout(core::time::Duration::from_millis(options.timeout_ms as u64)) + } + match builder.res() { + Ok(()) => 0, + Err(e) => { + log::error!("Failed to subscribe to liveliness: {e}"); + e.errno().get() + } + } +}