Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug declaring/undeclaring tokens with the same key #1619

Merged
merged 3 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
session
.0
.declare_liveliness_inner(&key_expr)
.map(|tok_state| LivelinessToken {
.map(|id| LivelinessToken {
session: self.session.downgrade(),
state: tok_state,
id,
undeclare_on_drop: true,
})
}
Expand All @@ -272,13 +272,6 @@ impl IntoFuture for LivelinessTokenBuilder<'_, '_> {
}
}

#[zenoh_macros::unstable]
#[derive(Debug)]
pub(crate) struct LivelinessTokenState {
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'static>,
}

/// A token whose liveliness is tied to the Zenoh [`Session`](Session).
///
/// A declared liveliness token will be seen as alive by any other Zenoh
Expand Down Expand Up @@ -308,7 +301,7 @@ pub(crate) struct LivelinessTokenState {
#[derive(Debug)]
pub struct LivelinessToken {
session: WeakSession,
state: Arc<LivelinessTokenState>,
id: Id,
undeclare_on_drop: bool,
}

Expand Down Expand Up @@ -382,7 +375,7 @@ impl LivelinessToken {
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.undeclare_on_drop = false;
self.session.undeclare_liveliness(self.state.id)
self.session.undeclare_liveliness(self.id)
}
}

Expand Down
61 changes: 17 additions & 44 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use zenoh_task::TaskController;
use crate::api::selector::ZenohParameters;
#[cfg(feature = "unstable")]
use crate::api::{
liveliness::{Liveliness, LivelinessTokenState},
liveliness::Liveliness,
publisher::Publisher,
publisher::{MatchingListenerState, MatchingStatus},
query::LivelinessQueryState,
Expand Down Expand Up @@ -137,8 +137,6 @@ pub(crate) struct SessionState {
pub(crate) liveliness_subscribers: HashMap<Id, Arc<SubscriberState>>,
pub(crate) queryables: HashMap<Id, Arc<QueryableState>>,
#[cfg(feature = "unstable")]
pub(crate) tokens: HashMap<Id, Arc<LivelinessTokenState>>,
#[cfg(feature = "unstable")]
pub(crate) matching_listeners: HashMap<Id, Arc<MatchingListenerState>>,
pub(crate) queries: HashMap<RequestId, QueryState>,
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -170,8 +168,6 @@ impl SessionState {
liveliness_subscribers: HashMap::new(),
queryables: HashMap::new(),
#[cfg(feature = "unstable")]
tokens: HashMap::new(),
#[cfg(feature = "unstable")]
matching_listeners: HashMap::new(),
queries: HashMap::new(),
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -1110,7 +1106,6 @@ impl SessionInner {
// anyway, it doesn't really matter, and this code will be cleaned up when the APIs
// will be stabilized.
let mut state = zwrite!(self.state);
let _tokens = std::mem::take(&mut state.tokens);
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
drop(state);
}
Expand Down Expand Up @@ -1548,21 +1543,10 @@ impl SessionInner {
}

#[zenoh_macros::unstable]
pub(crate) fn declare_liveliness_inner(
&self,
key_expr: &KeyExpr,
) -> ZResult<Arc<LivelinessTokenState>> {
let mut state = zwrite!(self.state);
pub(crate) fn declare_liveliness_inner(&self, key_expr: &KeyExpr) -> ZResult<Id> {
tracing::trace!("declare_liveliness({:?})", key_expr);
let id = self.runtime.next_id();
let tok_state = Arc::new(LivelinessTokenState {
id,
key_expr: key_expr.clone().into_owned(),
});

state.tokens.insert(tok_state.id, tok_state.clone());
let primitives = state.primitives()?;
drop(state);
let primitives = zread!(self.state).primitives()?;
primitives.send_declare(Declare {
interest_id: None,
ext_qos: declare::ext::QoSType::DECLARE,
Expand All @@ -1573,7 +1557,7 @@ impl SessionInner {
wire_expr: key_expr.to_wire(self).to_owned(),
}),
});
Ok(tok_state)
Ok(id)
}

#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -1679,32 +1663,21 @@ impl SessionInner {

#[zenoh_macros::unstable]
pub(crate) fn undeclare_liveliness(&self, tid: Id) -> ZResult<()> {
let mut state = zwrite!(self.state);
let Ok(primitives) = state.primitives() else {
let Ok(primitives) = zread!(self.state).primitives() else {
return Ok(());
};
if let Some(tok_state) = state.tokens.remove(&tid) {
trace!("undeclare_liveliness({:?})", tok_state);
// Note: there might be several Tokens on the same KeyExpr.
let key_expr = &tok_state.key_expr;
let twin_tok = state.tokens.values().any(|s| s.key_expr == *key_expr);
if !twin_tok {
drop(state);
primitives.send_declare(Declare {
interest_id: None,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareToken(UndeclareToken {
id: tok_state.id,
ext_wire_expr: WireExprType::null(),
}),
});
}
Ok(())
} else {
Err(zerror!("Unable to find liveliness token").into())
}
trace!("undeclare_liveliness({:?})", tid);
primitives.send_declare(Declare {
interest_id: None,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareToken(UndeclareToken {
id: tid,
ext_wire_expr: WireExprType::null(),
}),
});
Ok(())
}

#[zenoh_macros::unstable]
Expand Down
76 changes: 76 additions & 0 deletions zenoh/tests/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4718,3 +4718,79 @@ async fn test_liveliness_issue_1470() {
client0.close().await.unwrap();
client1.close().await.unwrap();
}

/// -------------------------------------------------------
/// DOUBLE UNDECLARE CLIQUE
/// -------------------------------------------------------
#[cfg(feature = "unstable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_liveliness_double_undeclare_clique() {
use std::time::Duration;

use zenoh::{config::WhatAmI, sample::SampleKind};
use zenoh_config::EndPoint;
const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const PEER1_ENDPOINT: &str = "tcp/localhost:30515";
const LIVELINESS_KEYEXPR: &str = "test/liveliness/double/undeclare/clique";

zenoh_util::init_log_from_env_or("error");

let peer1 = {
let mut c = zenoh::Config::default();
c.listen
.endpoints
.set(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (1) ZID: {}", s.zid());
s
};

let peer2 = {
let mut c = zenoh::Config::default();
c.connect
.endpoints
.set(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (2) ZID: {}", s.zid());
s
};

let sub = ztimeout!(peer1.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

let token = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Put);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

let token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

assert!(sub.try_recv().unwrap().is_none());

token.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

assert!(sub.try_recv().unwrap().is_none());

token2.undeclare().await.unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Delete);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

sub.undeclare().await.unwrap();

peer1.close().await.unwrap();
peer2.close().await.unwrap();
}