diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index 20dd30b4b..3e4b0d696 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -254,9 +254,9 @@ impl Wait for LivelinessTokenBuilder<'_, '_> { session .0 .declare_liveliness_inner(&key_expr) - .map(|tok_state| LivelinessToken { + .map(|id| LivelinessToken { session: self.session.downgrade(), - state: tok_state, + id, undeclare_on_drop: true, }) } @@ -272,13 +272,6 @@ impl IntoFuture for LivelinessTokenBuilder<'_, '_> { } } -#[zenoh_macros::unstable] -#[derive(Debug)] -pub(crate) struct LivelinessTokenState { - pub(crate) id: Id, - pub(crate) key_expr: KeyExpr<'static>, -} - /// A token whose liveliness is tied to the Zenoh [`Session`](Session). /// /// A declared liveliness token will be seen as alive by any other Zenoh @@ -308,7 +301,7 @@ pub(crate) struct LivelinessTokenState { #[derive(Debug)] pub struct LivelinessToken { session: WeakSession, - state: Arc, + id: Id, undeclare_on_drop: bool, } @@ -382,7 +375,7 @@ impl LivelinessToken { fn undeclare_impl(&mut self) -> ZResult<()> { // set the flag first to avoid double panic if this function panic self.undeclare_on_drop = false; - self.session.undeclare_liveliness(self.state.id) + self.session.undeclare_liveliness(self.id) } } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 0c01bffdb..225a0d16c 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -71,7 +71,7 @@ use zenoh_task::TaskController; use crate::api::selector::ZenohParameters; #[cfg(feature = "unstable")] use crate::api::{ - liveliness::{Liveliness, LivelinessTokenState}, + liveliness::Liveliness, publisher::Publisher, publisher::{MatchingListenerState, MatchingStatus}, query::LivelinessQueryState, @@ -137,8 +137,6 @@ pub(crate) struct SessionState { pub(crate) liveliness_subscribers: HashMap>, pub(crate) queryables: HashMap>, #[cfg(feature = "unstable")] - pub(crate) tokens: HashMap>, - #[cfg(feature = "unstable")] pub(crate) matching_listeners: HashMap>, pub(crate) queries: HashMap, #[cfg(feature = "unstable")] @@ -170,8 +168,6 @@ impl SessionState { liveliness_subscribers: HashMap::new(), queryables: HashMap::new(), #[cfg(feature = "unstable")] - tokens: HashMap::new(), - #[cfg(feature = "unstable")] matching_listeners: HashMap::new(), queries: HashMap::new(), #[cfg(feature = "unstable")] @@ -1110,7 +1106,6 @@ impl SessionInner { // anyway, it doesn't really matter, and this code will be cleaned up when the APIs // will be stabilized. let mut state = zwrite!(self.state); - let _tokens = std::mem::take(&mut state.tokens); let _matching_listeners = std::mem::take(&mut state.matching_listeners); drop(state); } @@ -1548,21 +1543,10 @@ impl SessionInner { } #[zenoh_macros::unstable] - pub(crate) fn declare_liveliness_inner( - &self, - key_expr: &KeyExpr, - ) -> ZResult> { - let mut state = zwrite!(self.state); + pub(crate) fn declare_liveliness_inner(&self, key_expr: &KeyExpr) -> ZResult { tracing::trace!("declare_liveliness({:?})", key_expr); let id = self.runtime.next_id(); - let tok_state = Arc::new(LivelinessTokenState { - id, - key_expr: key_expr.clone().into_owned(), - }); - - state.tokens.insert(tok_state.id, tok_state.clone()); - let primitives = state.primitives()?; - drop(state); + let primitives = zread!(self.state).primitives()?; primitives.send_declare(Declare { interest_id: None, ext_qos: declare::ext::QoSType::DECLARE, @@ -1573,7 +1557,7 @@ impl SessionInner { wire_expr: key_expr.to_wire(self).to_owned(), }), }); - Ok(tok_state) + Ok(id) } #[cfg(feature = "unstable")] @@ -1679,32 +1663,21 @@ impl SessionInner { #[zenoh_macros::unstable] pub(crate) fn undeclare_liveliness(&self, tid: Id) -> ZResult<()> { - let mut state = zwrite!(self.state); - let Ok(primitives) = state.primitives() else { + let Ok(primitives) = zread!(self.state).primitives() else { return Ok(()); }; - if let Some(tok_state) = state.tokens.remove(&tid) { - trace!("undeclare_liveliness({:?})", tok_state); - // Note: there might be several Tokens on the same KeyExpr. - let key_expr = &tok_state.key_expr; - let twin_tok = state.tokens.values().any(|s| s.key_expr == *key_expr); - if !twin_tok { - drop(state); - primitives.send_declare(Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id: tok_state.id, - ext_wire_expr: WireExprType::null(), - }), - }); - } - Ok(()) - } else { - Err(zerror!("Unable to find liveliness token").into()) - } + trace!("undeclare_liveliness({:?})", tid); + primitives.send_declare(Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id: tid, + ext_wire_expr: WireExprType::null(), + }), + }); + Ok(()) } #[zenoh_macros::unstable] diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index a7de858c8..968b24303 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -4718,3 +4718,79 @@ async fn test_liveliness_issue_1470() { client0.close().await.unwrap(); client1.close().await.unwrap(); } + +/// ------------------------------------------------------- +/// DOUBLE UNDECLARE CLIQUE +/// ------------------------------------------------------- +#[cfg(feature = "unstable")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_liveliness_double_undeclare_clique() { + use std::time::Duration; + + use zenoh::{config::WhatAmI, sample::SampleKind}; + use zenoh_config::EndPoint; + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const PEER1_ENDPOINT: &str = "tcp/localhost:30515"; + const LIVELINESS_KEYEXPR: &str = "test/liveliness/double/undeclare/clique"; + + zenoh_util::init_log_from_env_or("error"); + + let peer1 = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (1) ZID: {}", s.zid()); + s + }; + + let peer2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(peer1.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + let token = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert!(sample.kind() == SampleKind::Put); + assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + + token.undeclare().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + + token2.undeclare().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert!(sample.kind() == SampleKind::Delete); + assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + + sub.undeclare().await.unwrap(); + + peer1.close().await.unwrap(); + peer2.close().await.unwrap(); +}