diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 57d3dbb041..6e1db6bbf1 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -359,15 +359,19 @@ impl Primitives for Face { .entry(id) .and_modify(|interest| interest.finalized = true); + let mut wtables = zwrite!(self.tables.tables); let mut declares = vec![]; - declare_final(&mut self.state.clone(), id, &mut |p, m| { - declares.push((p.clone(), m)) - }); + declare_final( + ctrl_lock.as_ref(), + &mut wtables, + &mut self.state.clone(), + id, + &mut |p, m| declares.push((p.clone(), m)), + ); // recompute routes // TODO: disable routes and recompute them in parallel to avoid holding // tables write lock for a long time. - let mut wtables = zwrite!(self.tables.tables); let mut root_res = wtables.root_res.clone(); update_data_routes_from(&mut wtables, &mut root_res); update_query_routes_from(&mut wtables, &mut root_res); diff --git a/zenoh/src/net/routing/dispatcher/interests.rs b/zenoh/src/net/routing/dispatcher/interests.rs index 17f5883fca..5ee860da2f 100644 --- a/zenoh/src/net/routing/dispatcher/interests.rs +++ b/zenoh/src/net/routing/dispatcher/interests.rs @@ -34,7 +34,7 @@ use zenoh_util::Timed; use super::{ face::FaceState, - tables::{register_expr_interest, TablesLock}, + tables::{register_expr_interest, Tables, TablesLock}, }; use crate::net::routing::{ hat::{HatTrait, SendDeclare}, @@ -73,6 +73,8 @@ impl RemoteInterest { } pub(crate) fn declare_final( + hat_code: &(dyn HatTrait + Send + Sync), + wtables: &mut Tables, face: &mut Arc, id: InterestId, send_declare: &mut SendDeclare, @@ -83,6 +85,8 @@ pub(crate) fn declare_final( { finalize_pending_interest(interest, send_declare); } + + hat_code.declare_final(wtables, face, id); } pub(crate) fn finalize_pending_interests( diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 4dd447360e..6aac7d0a2d 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -29,9 +29,12 @@ use zenoh_sync::get_mut_unchecked; use super::face::FaceState; pub use super::{pubsub::*, queries::*, resource::*}; -use crate::net::routing::{ - hat::{self, HatTrait}, - interceptor::{interceptor_factories, InterceptorFactory}, +use crate::net::{ + routing::{ + hat::{self, HatTrait}, + interceptor::{interceptor_factories, InterceptorFactory}, + }, + runtime::WeakRuntime, }; pub(crate) struct RoutingExpr<'a> { @@ -62,6 +65,7 @@ impl<'a> RoutingExpr<'a> { pub struct Tables { pub(crate) zid: ZenohIdProto, pub(crate) whatami: WhatAmI, + pub(crate) runtime: Option, pub(crate) face_counter: usize, #[allow(dead_code)] pub(crate) hlc: Option>, @@ -93,6 +97,7 @@ impl Tables { Ok(Tables { zid, whatami, + runtime: None, face_counter: 0, hlc, drop_future_timestamp, diff --git a/zenoh/src/net/routing/hat/client/interests.rs b/zenoh/src/net/routing/hat/client/interests.rs index c6cfcf94f4..c534c65bad 100644 --- a/zenoh/src/net/routing/hat/client/interests.rs +++ b/zenoh/src/net/routing/hat/client/interests.rs @@ -231,4 +231,8 @@ impl HatInterestTrait for HatCode { } } } + + fn declare_final(&self, _tables: &mut Tables, _face: &mut Arc, _id: InterestId) { + // Nothing + } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/interests.rs b/zenoh/src/net/routing/hat/linkstate_peer/interests.rs index 27194c2762..c97fc1960f 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/interests.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/interests.rs @@ -107,4 +107,8 @@ impl HatInterestTrait for HatCode { fn undeclare_interest(&self, _tables: &mut Tables, face: &mut Arc, id: InterestId) { face_hat_mut!(face).remote_interests.remove(&id); } + + fn declare_final(&self, _tables: &mut Tables, _face: &mut Arc, _id: InterestId) { + // Nothing + } } diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 74850ea4c1..fb2ae44c3a 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -154,6 +154,7 @@ pub(crate) trait HatInterestTrait { send_declare: &mut SendDeclare, ); fn undeclare_interest(&self, tables: &mut Tables, face: &mut Arc, id: InterestId); + fn declare_final(&self, tables: &mut Tables, face: &mut Arc, id: InterestId); } pub(crate) trait HatPubSubTrait { diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index 5f1daeb83d..b9155b44b3 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -451,7 +451,7 @@ impl Network { } } } - if self.wait_declares && src_whatami != WhatAmI::Peer { + if (!self.wait_declares) || src_whatami != WhatAmI::Peer { zenoh_runtime::ZRuntime::Net.block_in_place( strong_runtime .start_conditions() diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 2853c2790f..b0d7c98426 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -24,8 +24,8 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, hat, initial_interest, pubsub::declare_sub_interest, - queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, HatTables, + face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest, + queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, INITIAL_INTEREST_ID, }; use crate::net::routing::{ @@ -256,10 +256,13 @@ impl HatInterestTrait for HatCode { } } } + } + + fn declare_final(&self, tables: &mut Tables, face: &mut Arc, id: InterestId) { if id == INITIAL_INTEREST_ID { zenoh_runtime::ZRuntime::Net.block_in_place(async move { - if let Some(net) = &hat!(tables).gossip { - if let Some(runtime) = net.runtime.upgrade() { + if let Some(runtime) = &tables.runtime { + if let Some(runtime) = runtime.upgrade() { runtime .start_conditions() .terminate_peer_connector_zid(face.zid) diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 8fb4e97c0d..3a33c28c3c 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -75,13 +75,6 @@ mod pubsub; mod queries; mod token; -macro_rules! hat { - ($t:expr) => { - $t.hat.downcast_ref::().unwrap() - }; -} -use hat; - macro_rules! hat_mut { ($t:expr) => { $t.hat.downcast_mut::().unwrap() @@ -132,16 +125,18 @@ impl HatBaseTrait for HatCode { unwrap_or_default!(config.routing().router().peers_failover_brokering()); drop(config_guard); - hat_mut!(tables).gossip = Some(Network::new( - "[Gossip]".to_string(), - tables.zid, - runtime, - router_peers_failover_brokering, - gossip, - gossip_multihop, - autoconnect, - wait_declares, - )); + if gossip { + hat_mut!(tables).gossip = Some(Network::new( + "[Gossip]".to_string(), + tables.zid, + runtime, + router_peers_failover_brokering, + gossip, + gossip_multihop, + autoconnect, + wait_declares, + )); + } } fn new_tables(&self, _router_peers_failover_brokering: bool) -> Box { @@ -326,11 +321,9 @@ impl HatBaseTrait for HatCode { wtables.faces.remove(&face.id); if face.whatami != WhatAmI::Client { - hat_mut!(wtables) - .gossip - .as_mut() - .unwrap() - .remove_link(&face.zid); + if let Some(net) = hat_mut!(wtables).gossip.as_mut() { + net.remove_link(&face.zid); + } }; drop(wtables); } @@ -346,15 +339,15 @@ impl HatBaseTrait for HatCode { if oam.id == OAM_LINKSTATE { if let ZExtBody::ZBuf(buf) = oam.body { if let Ok(zid) = transport.get_zid() { - use zenoh_buffers::reader::HasReader; - use zenoh_codec::RCodec; - let codec = Zenoh080Routing::new(); - let mut reader = buf.reader(); - let list: LinkStateList = codec.read(&mut reader).unwrap(); - let whatami = transport.get_whatami()?; if whatami != WhatAmI::Client { if let Some(net) = hat_mut!(tables).gossip.as_mut() { + use zenoh_buffers::reader::HasReader; + use zenoh_codec::RCodec; + let codec = Zenoh080Routing::new(); + let mut reader = buf.reader(); + let list: LinkStateList = codec.read(&mut reader).unwrap(); + net.link_states(list.link_states, zid, whatami); } }; diff --git a/zenoh/src/net/routing/hat/router/interests.rs b/zenoh/src/net/routing/hat/router/interests.rs index 18929ba72c..898fed5e62 100644 --- a/zenoh/src/net/routing/hat/router/interests.rs +++ b/zenoh/src/net/routing/hat/router/interests.rs @@ -117,6 +117,10 @@ impl HatInterestTrait for HatCode { fn undeclare_interest(&self, _tables: &mut Tables, face: &mut Arc, id: InterestId) { face_hat_mut!(face).remote_interests.remove(&id); } + + fn declare_final(&self, _tables: &mut Tables, _face: &mut Arc, _id: InterestId) { + // Nothing + } } #[inline] diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index cd525189d3..3a368f30ea 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -64,6 +64,7 @@ impl Router { pub fn init_link_state(&mut self, runtime: Runtime) { let ctrl_lock = zlock!(self.tables.ctrl_lock); let mut tables = zwrite!(self.tables.tables); + tables.runtime = Some(Runtime::downgrade(&runtime)); ctrl_lock.init(&mut tables, runtime) } diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 93a43c627f..1ed468bbf9 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -949,7 +949,16 @@ impl Runtime { let locators = scouted_locators .iter() - .filter(|l| !configured_locators.contains(l)); + .filter(|l| !configured_locators.contains(l)) + .collect::>(); + + if locators.is_empty() { + tracing::debug!( + "Already connecting to locators of {} (connect configuration). Ignore.", + zid + ); + return false; + } let manager = self.manager();