Skip to content

Commit

Permalink
Prevent liveliness change notifications during and after session close (
Browse files Browse the repository at this point in the history
#1466)

* Ignore incomming message during and after session close

* Add test
  • Loading branch information
OlivierHecart authored Sep 24, 2024
1 parent 3b6d773 commit bf010dc
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
30 changes: 30 additions & 0 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,9 @@ impl SessionInner {
) {
let mut callbacks = SingleOrVec::default();
let state = zread!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
if key_expr.suffix.is_empty() {
match state.get_res(&key_expr.scope, key_expr.mapping, local) {
Some(Resource::Node(res)) => {
Expand Down Expand Up @@ -2203,6 +2206,9 @@ impl SessionInner {
) {
let (primitives, key_expr, queryables) = {
let state = zread!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
let Ok(primitives) = state.primitives() else {
return;
};
Expand Down Expand Up @@ -2278,6 +2284,9 @@ impl Primitives for WeakSession {
zenoh_protocol::network::DeclareBody::DeclareKeyExpr(m) => {
trace!("recv DeclareKeyExpr {} {:?}", m.id, m.wire_expr);
let state = &mut zwrite!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
match state.remote_key_to_expr(&m.wire_expr) {
Ok(key_expr) => {
let mut res_node = ResourceNode::new(key_expr.clone().into());
Expand Down Expand Up @@ -2310,6 +2319,9 @@ impl Primitives for WeakSession {
#[cfg(feature = "unstable")]
{
let mut state = zwrite!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
match state
.wireexpr_to_keyexpr(&m.wire_expr, false)
.map(|e| e.into_owned())
Expand All @@ -2332,6 +2344,9 @@ impl Primitives for WeakSession {
#[cfg(feature = "unstable")]
{
let mut state = zwrite!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
if let Some(expr) = state.remote_subscribers.remove(&m.id) {
self.update_status_down(&state, &expr);
} else {
Expand All @@ -2350,6 +2365,9 @@ impl Primitives for WeakSession {
#[cfg(feature = "unstable")]
zenoh_protocol::network::DeclareBody::DeclareToken(m) => {
let mut state = zwrite!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
match state
.wireexpr_to_keyexpr(&m.wire_expr, false)
.map(|e| e.into_owned())
Expand Down Expand Up @@ -2407,6 +2425,9 @@ impl Primitives for WeakSession {
#[cfg(feature = "unstable")]
{
let mut state = zwrite!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
if let Some(key_expr) = state.remote_tokens.remove(&m.id) {
drop(state);

Expand Down Expand Up @@ -2540,6 +2561,9 @@ impl Primitives for WeakSession {
match msg.payload {
ResponseBody::Err(e) => {
let mut state = zwrite!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
match state.queries.get_mut(&msg.rid) {
Some(query) => {
let callback = query.callback.clone();
Expand All @@ -2562,6 +2586,9 @@ impl Primitives for WeakSession {
}
ResponseBody::Reply(m) => {
let mut state = zwrite!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
let key_expr = match state.remote_key_to_expr(&msg.wire_expr) {
Ok(key) => key.into_owned(),
Err(e) => {
Expand Down Expand Up @@ -2737,6 +2764,9 @@ impl Primitives for WeakSession {
fn send_response_final(&self, msg: ResponseFinal) {
trace!("recv ResponseFinal {:?}", msg);
let mut state = zwrite!(self.state);
if state.primitives.is_none() {
return; // Session closing or closed
}
match state.queries.get_mut(&msg.rid) {
Some(query) => {
query.nb_final -= 1;
Expand Down
56 changes: 56 additions & 0 deletions zenoh/tests/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,62 @@ async fn test_liveliness_query_local() {
peer.close().await.unwrap();
}

#[cfg(feature = "unstable")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_liveliness_after_close() {
use std::time::Duration;

use zenoh::{config::WhatAmI, sample::SampleKind};
use zenoh_config::EndPoint;
const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const PEER1_ENDPOINT: &str = "tcp/localhost:47447";
const LIVELINESS_KEYEXPR: &str = "test/liveliness/subscriber/clique";

zenoh_util::init_log_from_env_or("error");

let peer1 = {
let mut c = zenoh::Config::default();
c.listen
.endpoints
.set(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (1) ZID: {}", s.zid());
s
};

let peer2 = {
let mut c = zenoh::Config::default();
c.connect
.endpoints
.set(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (2) ZID: {}", s.zid());
s
};

let sub = ztimeout!(peer1.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

let _token = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert!(sample.kind() == SampleKind::Put);
assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR);

peer1.close().await.unwrap();
tokio::time::sleep(SLEEP).await;

assert!(sub.try_recv().is_err())
}

/// -------------------------------------------------------
/// DOUBLE CLIENT
/// -------------------------------------------------------
Expand Down

0 comments on commit bf010dc

Please sign in to comment.