Skip to content

Commit

Permalink
Fix bugs querying liveliness tokens (#1374)
Browse files Browse the repository at this point in the history
* Fix bug in liveliness get in client

* Fix bug treating token interests replies from routers in peers

* Peers propagate current token interests to remote peers with unfinalize initial declarations push

* Don't register current interests declaration replies

* Add comments

* Add comments

* Add comments
  • Loading branch information
OlivierHecart authored Sep 9, 2024
1 parent 55d5395 commit 7387e28
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 32 deletions.
9 changes: 4 additions & 5 deletions zenoh/src/net/routing/hat/client/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,9 @@ fn declare_simple_token(
interest_id: Option<InterestId>,
send_declare: &mut SendDeclare,
) {
register_simple_token(tables, face, id, res);

propagate_simple_token(tables, res, face, send_declare);

let wire_expr = Resource::decl_key(res, face, true);
if let Some(interest_id) = interest_id {
if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) {
let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
send_declare(
&interest.src_face.primitives,
RoutingContext::with_expr(
Expand All @@ -137,6 +133,9 @@ fn declare_simple_token(
),
)
}
} else {
register_simple_token(tables, face, id, res);
propagate_simple_token(tables, res, face, send_declare);
}
}

Expand Down
15 changes: 8 additions & 7 deletions zenoh/src/net/routing/hat/p2p_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use zenoh_protocol::{
use zenoh_sync::get_mut_unchecked;

use super::{
face_hat, face_hat_mut, pubsub::declare_sub_interest, queries::declare_qabl_interest,
token::declare_token_interest, HatCode, HatFace,
face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest,
queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace,
};
use crate::net::routing::{
dispatcher::{
Expand Down Expand Up @@ -132,11 +132,12 @@ impl HatInterestTrait for HatCode {
src_interest_id: id,
});

for dst_face in tables
.faces
.values_mut()
.filter(|f| f.whatami == WhatAmI::Router)
{
for dst_face in tables.faces.values_mut().filter(|f| {
f.whatami == WhatAmI::Router
|| (options.tokens()
&& f.whatami == WhatAmI::Peer
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true))
}) {
let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst);
get_mut_unchecked(dst_face).local_interests.insert(
id,
Expand Down
17 changes: 15 additions & 2 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl HatBaseTrait for HatCode {
}
if face.state.whatami == WhatAmI::Peer {
get_mut_unchecked(&mut face.state).local_interests.insert(
0,
INITIAL_INTEREST_ID,
InterestState {
options: InterestOptions::ALL,
res: None,
Expand Down Expand Up @@ -418,7 +418,7 @@ struct HatFace {
impl HatFace {
fn new() -> Self {
Self {
next_id: AtomicU32::new(0),
next_id: AtomicU32::new(1), // In p2p, id 0 is erserved for initial interest
remote_interests: HashMap::new(),
local_subs: HashMap::new(),
remote_subs: HashMap::new(),
Expand All @@ -440,3 +440,16 @@ fn get_routes_entries() -> RoutesIndexes {
clients: vec![0],
}
}

// In p2p, at connection, while no interest is sent on the network,
// peers act as if they received an interest CurrentFuture with id 0
// and send back a DeclareFinal with interest_id 0.
// This 'ghost' interest is registered locally to allow tracking if
// the DeclareFinal has been received or not (finalized).

const INITIAL_INTEREST_ID: u32 = 0;

#[inline]
fn initial_interest(face: &FaceState) -> Option<&InterestState> {
face.local_interests.get(&INITIAL_INTEREST_ID)
}
10 changes: 4 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ use crate::{
resource::{NodeId, Resource, SessionContext},
tables::{Route, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources},
hat::{
p2p_peer::initial_interest, CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources,
},
router::{update_data_routes_from, RoutesIndexes},
RoutingContext,
},
Expand Down Expand Up @@ -654,11 +656,7 @@ impl HatPubSubTrait for HatCode {

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true)
}) {
route.entry(face.id).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
Expand Down
8 changes: 2 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::net::routing::{
resource::{NodeId, Resource, SessionContext},
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources},
hat::{p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources},
router::{update_query_routes_from, RoutesIndexes},
RoutingContext,
};
Expand Down Expand Up @@ -602,11 +602,7 @@ impl HatQueriesTrait for HatCode {

for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
&& !f
.local_interests
.get(&0)
.map(|i| i.finalized)
.unwrap_or(true)
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
Expand Down
31 changes: 25 additions & 6 deletions zenoh/src/net/routing/hat/p2p_peer/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,30 @@ fn declare_simple_token(
face: &mut Arc<FaceState>,
id: TokenId,
res: &mut Arc<Resource>,
interest_id: Option<InterestId>,
send_declare: &mut SendDeclare,
) {
register_simple_token(tables, face, id, res);

propagate_simple_token(tables, res, face, send_declare);
if let Some(interest_id) = interest_id {
if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) {
let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
send_declare(
&interest.src_face.primitives,
RoutingContext::with_expr(
Declare {
interest_id: Some(interest.src_interest_id),
ext_qos: ext::QoSType::default(),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::default(),
body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
},
res.expr(),
),
)
}
} else {
register_simple_token(tables, face, id, res);
propagate_simple_token(tables, res, face, send_declare);
}
}

#[inline]
Expand Down Expand Up @@ -411,7 +430,7 @@ pub(crate) fn declare_token_interest(
aggregate: bool,
send_declare: &mut SendDeclare,
) {
if mode.current() && face.whatami == WhatAmI::Client {
if mode.current() {
let interest_id = (!mode.future()).then_some(id);
if let Some(res) = res.as_ref() {
if aggregate {
Expand Down Expand Up @@ -525,10 +544,10 @@ impl HatTokenTrait for HatCode {
id: TokenId,
res: &mut Arc<Resource>,
_node_id: NodeId,
_interest_id: Option<InterestId>,
interest_id: Option<InterestId>,
send_declare: &mut SendDeclare,
) {
declare_simple_token(tables, face, id, res, send_declare)
declare_simple_token(tables, face, id, res, interest_id, send_declare)
}

fn undeclare_token(
Expand Down

0 comments on commit 7387e28

Please sign in to comment.