Skip to content

Commit

Permalink
Bug/liveliness duplicate (#1404)
Browse files Browse the repository at this point in the history
* Don't resend identical declares with different ids

* Clients and peers don't return local tokens from routers to avoid duplicates

* User session answers LivelinessSubscribers history with locally known tokens

* Session deduplicates incomming tokens
  • Loading branch information
OlivierHecart authored and fuzzypixelz committed Sep 23, 2024
1 parent bd179cb commit f730ea3
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 232 deletions.
42 changes: 38 additions & 4 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[cfg(feature = "unstable")]
use std::collections::hash_map::Entry;
use std::{
collections::HashMap,
convert::TryInto,
Expand Down Expand Up @@ -1560,7 +1562,7 @@ impl SessionInner {
remote_id: id,
key_expr: key_expr.clone().into_owned(),
origin,
callback,
callback: callback.clone(),
};

let sub_state = Arc::new(sub_state);
Expand Down Expand Up @@ -1591,9 +1593,42 @@ impl SessionInner {
}
}

let known_tokens = if history {
state
.remote_tokens
.values()
.filter(|token| key_expr.intersects(token))
.cloned()
.collect::<Vec<KeyExpr<'static>>>()
} else {
vec![]
};

let primitives = state.primitives()?;
drop(state);

if !known_tokens.is_empty() {
self.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move {
for token in known_tokens {
callback(Sample {
key_expr: token,
payload: ZBytes::empty(),
kind: SampleKind::Put,
encoding: Encoding::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
reliability: Reliability::Reliable,
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
attachment: None,
})
}
});
}

primitives.send_interest(Interest {
id,
mode: if history {
Expand Down Expand Up @@ -2253,9 +2288,8 @@ impl Primitives for WeakSession {

query.callback.call(reply);
}
} else {
state.remote_tokens.insert(m.id, key_expr.clone());

} else if let Entry::Vacant(e) = state.remote_tokens.entry(m.id) {
e.insert(key_expr.clone());
drop(state);

self.execute_subscriber_callbacks(
Expand Down
41 changes: 20 additions & 21 deletions zenoh/src/net/routing/hat/client/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,21 @@ pub(super) fn token_new_face(
}
}

#[inline]
fn make_token_id(res: &Arc<Resource>, face: &mut Arc<FaceState>, mode: InterestMode) -> u32 {
if mode.future() {
if let Some(id) = face_hat!(face).local_tokens.get(res) {
*id
} else {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_tokens.insert(res.clone(), id);
id
}
} else {
0
}
}

pub(crate) fn declare_token_interest(
tables: &mut Tables,
face: &mut Arc<FaceState>,
Expand All @@ -304,13 +319,7 @@ pub(crate) fn declare_token_interest(
.values()
.any(|token| token.context.is_some() && token.matches(res))
}) {
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_tokens.insert((*res).clone(), id);
id
} else {
0
};
let id = make_token_id(res, face, mode);
let wire_expr = Resource::decl_key(res, face, true);
send_declare(
&face.primitives,
Expand All @@ -330,18 +339,13 @@ pub(crate) fn declare_token_interest(
for src_face in tables
.faces
.values()
.filter(|f| f.whatami == WhatAmI::Client)
.cloned()
.collect::<Vec<Arc<FaceState>>>()
{
for token in face_hat!(src_face).remote_tokens.values() {
if token.context.is_some() && token.matches(res) {
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_tokens.insert(token.clone(), id);
id
} else {
0
};
let id = make_token_id(token, face, mode);
let wire_expr = Resource::decl_key(token, face, true);
send_declare(
&face.primitives,
Expand All @@ -367,17 +371,12 @@ pub(crate) fn declare_token_interest(
for src_face in tables
.faces
.values()
.filter(|f| f.whatami == WhatAmI::Client)
.cloned()
.collect::<Vec<Arc<FaceState>>>()
{
for token in face_hat!(src_face).remote_tokens.values() {
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_tokens.insert(token.clone(), id);
id
} else {
0
};
let id = make_token_id(token, face, mode);
let wire_expr = Resource::decl_key(token, face, true);
send_declare(
&face.primitives,
Expand Down
39 changes: 18 additions & 21 deletions zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,21 @@ pub(super) fn pubsub_tree_change(tables: &mut Tables, new_children: &[Vec<NodeIn
update_data_routes_from(tables, &mut tables.root_res.clone());
}

#[inline]
fn make_sub_id(res: &Arc<Resource>, face: &mut Arc<FaceState>, mode: InterestMode) -> u32 {
if mode.future() {
if let Some(id) = face_hat!(face).local_subs.get(res) {
*id
} else {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_subs.insert(res.clone(), id);
id
}
} else {
0
}
}

pub(super) fn declare_sub_interest(
tables: &mut Tables,
face: &mut Arc<FaceState>,
Expand All @@ -690,13 +705,7 @@ pub(super) fn declare_sub_interest(
&& sub.matches(res)
&& (remote_simple_subs(sub, face) || remote_linkstatepeer_subs(tables, sub))
}) {
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_subs.insert((*res).clone(), id);
id
} else {
0
};
let id = make_sub_id(res, face, mode);
let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client);
send_declare(
&face.primitives,
Expand All @@ -721,13 +730,7 @@ pub(super) fn declare_sub_interest(
&& sub.matches(res)
&& (remote_simple_subs(sub, face) || remote_linkstatepeer_subs(tables, sub))
{
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_subs.insert(sub.clone(), id);
id
} else {
0
};
let id = make_sub_id(sub, face, mode);
let wire_expr =
Resource::decl_key(sub, face, face.whatami != WhatAmI::Client);
send_declare(
Expand All @@ -754,13 +757,7 @@ pub(super) fn declare_sub_interest(
if sub.context.is_some()
&& (remote_simple_subs(sub, face) || remote_linkstatepeer_subs(tables, sub))
{
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_subs.insert(sub.clone(), id);
id
} else {
0
};
let id = make_sub_id(sub, face, mode);
let wire_expr = Resource::decl_key(sub, face, face.whatami != WhatAmI::Client);
send_declare(
&face.primitives,
Expand Down
52 changes: 25 additions & 27 deletions zenoh/src/net/routing/hat/linkstate_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,28 @@ lazy_static::lazy_static! {
static ref EMPTY_ROUTE: Arc<QueryTargetQablSet> = Arc::new(Vec::new());
}

#[inline]
fn make_qabl_id(
res: &Arc<Resource>,
face: &mut Arc<FaceState>,
mode: InterestMode,
info: QueryableInfoType,
) -> u32 {
if mode.future() {
if let Some((id, _)) = face_hat!(face).local_qabls.get(res) {
*id
} else {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face)
.local_qabls
.insert(res.clone(), (id, info));
id
}
} else {
0
}
}

pub(super) fn declare_qabl_interest(
tables: &mut Tables,
face: &mut Arc<FaceState>,
Expand All @@ -758,15 +780,7 @@ pub(super) fn declare_qabl_interest(
|| remote_linkstatepeer_qabls(tables, qabl))
}) {
let info = local_qabl_info(tables, res, face);
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face)
.local_qabls
.insert((*res).clone(), (id, info));
id
} else {
0
};
let id = make_qabl_id(res, face, mode, info);
let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client);
send_declare(
&face.primitives,
Expand Down Expand Up @@ -794,15 +808,7 @@ pub(super) fn declare_qabl_interest(
|| remote_linkstatepeer_qabls(tables, qabl))
{
let info = local_qabl_info(tables, qabl, face);
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face)
.local_qabls
.insert(qabl.clone(), (id, info));
id
} else {
0
};
let id = make_qabl_id(qabl, face, mode, info);
let key_expr =
Resource::decl_key(qabl, face, face.whatami != WhatAmI::Client);
send_declare(
Expand Down Expand Up @@ -831,15 +837,7 @@ pub(super) fn declare_qabl_interest(
&& (remote_simple_qabls(qabl, face) || remote_linkstatepeer_qabls(tables, qabl))
{
let info = local_qabl_info(tables, qabl, face);
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face)
.local_qabls
.insert(qabl.clone(), (id, info));
id
} else {
0
};
let id = make_qabl_id(qabl, face, mode, info);
let key_expr = Resource::decl_key(qabl, face, face.whatami != WhatAmI::Client);
send_declare(
&face.primitives,
Expand Down
39 changes: 18 additions & 21 deletions zenoh/src/net/routing/hat/linkstate_peer/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,21 @@ pub(super) fn token_tree_change(tables: &mut Tables, new_clildren: &[Vec<NodeInd
}
}

#[inline]
fn make_token_id(res: &Arc<Resource>, face: &mut Arc<FaceState>, mode: InterestMode) -> u32 {
if mode.future() {
if let Some(id) = face_hat!(face).local_tokens.get(res) {
*id
} else {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_tokens.insert(res.clone(), id);
id
}
} else {
0
}
}

pub(crate) fn declare_token_interest(
tables: &mut Tables,
face: &mut Arc<FaceState>,
Expand All @@ -652,13 +667,7 @@ pub(crate) fn declare_token_interest(
&& (remote_simple_tokens(tables, token, face)
|| remote_linkstatepeer_tokens(tables, token))
}) {
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_tokens.insert((*res).clone(), id);
id
} else {
0
};
let id = make_token_id(res, face, mode);
let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client);
send_declare(
&face.primitives,
Expand All @@ -681,13 +690,7 @@ pub(crate) fn declare_token_interest(
&& (remote_simple_tokens(tables, token, face)
|| remote_linkstatepeer_tokens(tables, token))
{
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_tokens.insert(token.clone(), id);
id
} else {
0
};
let id = make_token_id(token, face, mode);
let wire_expr =
Resource::decl_key(token, face, face.whatami != WhatAmI::Client);
send_declare(
Expand All @@ -712,13 +715,7 @@ pub(crate) fn declare_token_interest(
&& (remote_simple_tokens(tables, token, face)
|| remote_linkstatepeer_tokens(tables, token))
{
let id = if mode.future() {
let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst);
face_hat_mut!(face).local_tokens.insert(token.clone(), id);
id
} else {
0
};
let id = make_token_id(token, face, mode);
let wire_expr =
Resource::decl_key(token, face, face.whatami != WhatAmI::Client);
send_declare(
Expand Down
Loading

0 comments on commit f730ea3

Please sign in to comment.