From af3ac7bdca37682130f01bb982276a4b7223a4ef Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 27 Sep 2024 12:42:01 +0200 Subject: [PATCH] Remove closing from hat trait (#1469) * Remove closing from hat trait * Also move orchestrator session closing code to closed phase * Remove closing from TransportPeerEventHandler --- io/zenoh-transport/src/lib.rs | 4 - io/zenoh-transport/src/multicast/transport.rs | 5 - .../src/unicast/lowlatency/transport.rs | 5 - .../src/unicast/universal/transport.rs | 5 - io/zenoh-transport/tests/endpoints.rs | 1 - .../tests/multicast_compression.rs | 2 - .../tests/multicast_transport.rs | 2 - .../tests/transport_whitelist.rs | 1 - .../tests/unicast_authenticator.rs | 1 - .../tests/unicast_compression.rs | 2 - .../tests/unicast_concurrent.rs | 1 - .../tests/unicast_intermittent.rs | 1 - .../tests/unicast_priorities.rs | 2 - io/zenoh-transport/tests/unicast_shm.rs | 1 - .../tests/unicast_simultaneous.rs | 1 - io/zenoh-transport/tests/unicast_transport.rs | 2 - zenoh/src/api/admin.rs | 4 - zenoh/src/net/primitives/demux.rs | 17 +- zenoh/src/net/primitives/mux.rs | 8 +- zenoh/src/net/routing/dispatcher/face.rs | 28 ++- zenoh/src/net/routing/dispatcher/tables.rs | 24 +-- zenoh/src/net/routing/hat/client/mod.rs | 11 +- .../src/net/routing/hat/linkstate_peer/mod.rs | 45 ++--- zenoh/src/net/routing/hat/mod.rs | 9 +- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 27 +-- zenoh/src/net/routing/hat/router/mod.rs | 173 ++++++++---------- zenoh/src/net/runtime/mod.rs | 22 +-- zenoh/src/net/runtime/orchestrator.rs | 2 +- zenoh/src/net/tests/tables.rs | 64 +++---- 29 files changed, 166 insertions(+), 304 deletions(-) diff --git a/io/zenoh-transport/src/lib.rs b/io/zenoh-transport/src/lib.rs index e603563b6e..f004b4d511 100644 --- a/io/zenoh-transport/src/lib.rs +++ b/io/zenoh-transport/src/lib.rs @@ -82,7 +82,6 @@ impl TransportEventHandler for DummyTransportEventHandler { /*************************************/ pub trait TransportMulticastEventHandler: Send + Sync { fn new_peer(&self, peer: TransportPeer) -> ZResult>; - fn closing(&self); fn closed(&self); fn as_any(&self) -> &dyn Any; } @@ -95,7 +94,6 @@ impl TransportMulticastEventHandler for DummyTransportMulticastEventHandler { fn new_peer(&self, _peer: TransportPeer) -> ZResult> { Ok(Arc::new(DummyTransportPeerEventHandler)) } - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { self @@ -121,7 +119,6 @@ pub trait TransportPeerEventHandler: Send + Sync { fn handle_message(&self, msg: NetworkMessage) -> ZResult<()>; fn new_link(&self, src: Link); fn del_link(&self, link: Link); - fn closing(&self); fn closed(&self); fn as_any(&self) -> &dyn Any; } @@ -137,7 +134,6 @@ impl TransportPeerEventHandler for DummyTransportPeerEventHandler { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index 36b9dbbea0..bcccaa9a85 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -178,11 +178,7 @@ impl TransportMulticastInner { pub(super) async fn delete(&self) -> ZResult<()> { tracing::debug!("Closing multicast transport on {:?}", self.locator); - // Notify the callback that we are going to close the transport let callback = zwrite!(self.callback).take(); - if let Some(cb) = callback.as_ref() { - cb.closing(); - } // Delete the transport on the manager let _ = self.manager.del_transport_multicast(&self.locator).await; @@ -441,7 +437,6 @@ impl TransportMulticastInner { // TODO(yuyuan): Unify the termination peer.token.cancel(); - peer.handler.closing(); drop(guard); peer.handler.closed(); } diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index c602dcf806..69d88af636 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -117,12 +117,7 @@ impl TransportUnicastLowlatency { // to avoid concurrent new_transport and closing/closed notifications let mut a_guard = self.get_alive().await; *a_guard = false; - - // Notify the callback that we are going to close the transport let callback = zwrite!(self.callback).take(); - if let Some(cb) = callback.as_ref() { - cb.closing(); - } // Delete the transport on the manager let _ = self.manager.del_transport_unicast(&self.config.zid).await; diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index f01a4a8f18..fdaadaea66 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -129,12 +129,7 @@ impl TransportUnicastUniversal { // to avoid concurrent new_transport and closing/closed notifications let mut a_guard = self.get_alive().await; *a_guard = false; - - // Notify the callback that we are going to close the transport let callback = zwrite!(self.callback).take(); - if let Some(cb) = callback.as_ref() { - cb.closing(); - } // Delete the transport on the manager let _ = self.manager.del_transport_unicast(&self.config.zid).await; diff --git a/io/zenoh-transport/tests/endpoints.rs b/io/zenoh-transport/tests/endpoints.rs index 7fe2f949ef..3ebb015981 100644 --- a/io/zenoh-transport/tests/endpoints.rs +++ b/io/zenoh-transport/tests/endpoints.rs @@ -62,7 +62,6 @@ impl TransportPeerEventHandler for SC { } fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/multicast_compression.rs b/io/zenoh-transport/tests/multicast_compression.rs index 3b8715c0df..5e31aa6514 100644 --- a/io/zenoh-transport/tests/multicast_compression.rs +++ b/io/zenoh-transport/tests/multicast_compression.rs @@ -111,7 +111,6 @@ mod tests { count: self.count.clone(), })) } - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -127,7 +126,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/multicast_transport.rs b/io/zenoh-transport/tests/multicast_transport.rs index 664de47ffb..18c8468ecc 100644 --- a/io/zenoh-transport/tests/multicast_transport.rs +++ b/io/zenoh-transport/tests/multicast_transport.rs @@ -110,7 +110,6 @@ mod tests { count: self.count.clone(), })) } - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -126,7 +125,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/transport_whitelist.rs b/io/zenoh-transport/tests/transport_whitelist.rs index f8428b457d..66f5b58e3b 100644 --- a/io/zenoh-transport/tests/transport_whitelist.rs +++ b/io/zenoh-transport/tests/transport_whitelist.rs @@ -58,7 +58,6 @@ impl TransportPeerEventHandler for SCRouter { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_authenticator.rs b/io/zenoh-transport/tests/unicast_authenticator.rs index a9b22ad5bb..77f025717d 100644 --- a/io/zenoh-transport/tests/unicast_authenticator.rs +++ b/io/zenoh-transport/tests/unicast_authenticator.rs @@ -70,7 +70,6 @@ impl TransportPeerEventHandler for MHRouterAuthenticator { } fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_compression.rs b/io/zenoh-transport/tests/unicast_compression.rs index 7b18983b4b..b7f34fbf7f 100644 --- a/io/zenoh-transport/tests/unicast_compression.rs +++ b/io/zenoh-transport/tests/unicast_compression.rs @@ -110,7 +110,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -150,7 +149,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_concurrent.rs b/io/zenoh-transport/tests/unicast_concurrent.rs index ea124e1c05..410ac33955 100644 --- a/io/zenoh-transport/tests/unicast_concurrent.rs +++ b/io/zenoh-transport/tests/unicast_concurrent.rs @@ -98,7 +98,6 @@ impl TransportPeerEventHandler for MHPeer { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_intermittent.rs b/io/zenoh-transport/tests/unicast_intermittent.rs index 9564eeb865..3e2f8196f4 100644 --- a/io/zenoh-transport/tests/unicast_intermittent.rs +++ b/io/zenoh-transport/tests/unicast_intermittent.rs @@ -138,7 +138,6 @@ impl TransportPeerEventHandler for SCClient { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_priorities.rs b/io/zenoh-transport/tests/unicast_priorities.rs index d71cc845cc..87cf5b5e9e 100644 --- a/io/zenoh-transport/tests/unicast_priorities.rs +++ b/io/zenoh-transport/tests/unicast_priorities.rs @@ -133,7 +133,6 @@ impl TransportPeerEventHandler for SCRouter { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -183,7 +182,6 @@ impl TransportPeerEventHandler for SCClient { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_shm.rs b/io/zenoh-transport/tests/unicast_shm.rs index db5f719665..b03771a164 100644 --- a/io/zenoh-transport/tests/unicast_shm.rs +++ b/io/zenoh-transport/tests/unicast_shm.rs @@ -141,7 +141,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_simultaneous.rs b/io/zenoh-transport/tests/unicast_simultaneous.rs index 248ff2ef53..97d43fc672 100644 --- a/io/zenoh-transport/tests/unicast_simultaneous.rs +++ b/io/zenoh-transport/tests/unicast_simultaneous.rs @@ -126,7 +126,6 @@ mod tests { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 5a414d664d..e8f473b754 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -296,7 +296,6 @@ impl TransportPeerEventHandler for SCRouter { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { @@ -336,7 +335,6 @@ impl TransportPeerEventHandler for SCClient { fn new_link(&self, _link: Link) {} fn del_link(&self, _link: Link) {} - fn closing(&self) {} fn closed(&self) {} fn as_any(&self) -> &dyn Any { diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index b96fc75dd2..f7bcc06419 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -187,8 +187,6 @@ impl TransportMulticastEventHandler for Handler { } } - fn closing(&self) {} - fn closed(&self) {} fn as_any(&self) -> &dyn std::any::Any { @@ -250,8 +248,6 @@ impl TransportPeerEventHandler for PeerHandler { ); } - fn closing(&self) {} - fn closed(&self) { let info = DataInfo { kind: SampleKind::Delete, diff --git a/zenoh/src/net/primitives/demux.rs b/zenoh/src/net/primitives/demux.rs index e4774aab4a..8735ed0f32 100644 --- a/zenoh/src/net/primitives/demux.rs +++ b/zenoh/src/net/primitives/demux.rs @@ -100,25 +100,10 @@ impl TransportPeerEventHandler for DeMux { fn del_link(&self, _link: Link) {} - fn closing(&self) { + fn closed(&self) { self.face.send_close(); - if let Some(transport) = self.transport.as_ref() { - let mut declares = vec![]; - let ctrl_lock = zlock!(self.face.tables.ctrl_lock); - let mut tables = zwrite!(self.face.tables.tables); - let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport, &mut |p, m| { - declares.push((p.clone(), m)) - }); - drop(tables); - drop(ctrl_lock); - for (p, m) in declares { - p.send_declare(m); - } - } } - fn closed(&self) {} - fn as_any(&self) -> &dyn Any { self } diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index 47067f231e..63376a6d63 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -200,9 +200,7 @@ impl Primitives for Mux { } } - fn send_close(&self) { - // self.handler.closing().await; - } + fn send_close(&self) {} } impl EPrimitives for Mux { @@ -530,9 +528,7 @@ impl Primitives for McastMux { } } - fn send_close(&self) { - // self.handler.closing().await; - } + fn send_close(&self) {} } impl EPrimitives for McastMux { diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 654f5cf9c3..57d3dbb041 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -16,6 +16,7 @@ use std::{ collections::HashMap, fmt, sync::{Arc, Weak}, + time::Duration, }; use tokio_util::sync::CancellationToken; @@ -37,13 +38,16 @@ use super::{ super::router::*, interests::{declare_final, declare_interest, undeclare_interest, CurrentInterest}, resource::*, - tables::{self, TablesLock}, + tables::TablesLock, }; use crate::{ api::key_expr::KeyExpr, net::{ primitives::{McastMux, Mux, Primitives}, - routing::interceptor::{InterceptorTrait, InterceptorsChain}, + routing::{ + dispatcher::interests::finalize_pending_interests, + interceptor::{InterceptorTrait, InterceptorsChain}, + }, }, }; @@ -421,7 +425,25 @@ impl Primitives for Face { } fn send_close(&self) { - tables::close_face(&self.tables, &Arc::downgrade(&self.state)); + tracing::debug!("Close {}", self.state); + let mut state = self.state.clone(); + state.task_controller.terminate_all(Duration::from_secs(10)); + finalize_pending_queries(&self.tables, &mut state); + let mut declares = vec![]; + let ctrl_lock = zlock!(self.tables.ctrl_lock); + finalize_pending_interests(&self.tables, &mut state, &mut |p, m| { + declares.push((p.clone(), m)) + }); + ctrl_lock.close_face( + &self.tables, + &self.tables.clone(), + &mut state, + &mut |p, m| declares.push((p.clone(), m)), + ); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } } } diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 2c5cfffffb..4dd447360e 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -14,7 +14,7 @@ use std::{ any::Any, collections::HashMap, - sync::{Arc, Mutex, RwLock, Weak}, + sync::{Arc, Mutex, RwLock}, time::Duration, }; @@ -30,7 +30,6 @@ use zenoh_sync::get_mut_unchecked; use super::face::FaceState; pub use super::{pubsub::*, queries::*, resource::*}; use crate::net::routing::{ - dispatcher::interests::finalize_pending_interests, hat::{self, HatTrait}, interceptor::{interceptor_factories, InterceptorFactory}, }; @@ -169,27 +168,6 @@ impl Tables { } } -pub fn close_face(tables: &TablesLock, face: &Weak) { - match face.upgrade() { - Some(mut face) => { - tracing::debug!("Close {}", face); - face.task_controller.terminate_all(Duration::from_secs(10)); - finalize_pending_queries(tables, &mut face); - let mut declares = vec![]; - let ctrl_lock = zlock!(tables.ctrl_lock); - finalize_pending_interests(tables, &mut face, &mut |p, m| { - declares.push((p.clone(), m)) - }); - ctrl_lock.close_face(tables, &mut face, &mut |p, m| declares.push((p.clone(), m))); - drop(ctrl_lock); - for (p, m) in declares { - p.send_declare(m); - } - } - None => tracing::error!("Face already closed!"), - } -} - pub struct TablesLock { pub tables: RwLock, pub(crate) ctrl_lock: Mutex>, diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 04ab653d9f..169b6ccbf1 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -130,6 +130,7 @@ impl HatBaseTrait for HatCode { fn close_face( &self, tables: &TablesLock, + _tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -260,16 +261,6 @@ impl HatBaseTrait for HatCode { 0 } - fn closing( - &self, - _tables: &mut Tables, - _tables_ref: &Arc, - _transport: &TransportUnicast, - _send_declare: &mut SendDeclare, - ) -> ZResult<()> { - Ok(()) - } - #[inline] fn ingress_filter(&self, _tables: &Tables, _face: &FaceState, _expr: &mut RoutingExpr) -> bool { true diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 167d9ae58b..a5d1608274 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -261,6 +261,7 @@ impl HatBaseTrait for HatCode { fn close_face( &self, tables: &TablesLock, + tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -367,6 +368,21 @@ impl HatBaseTrait for HatCode { Resource::clean(&mut res); } wtables.faces.remove(&face.id); + + if face.whatami != WhatAmI::Client { + for (_, removed_node) in hat_mut!(wtables) + .linkstatepeers_net + .as_mut() + .unwrap() + .remove_link(&face.zid) + { + pubsub_remove_node(&mut wtables, &removed_node.zid, send_declare); + queries_remove_node(&mut wtables, &removed_node.zid, send_declare); + token_remove_node(&mut wtables, &removed_node.zid, send_declare); + } + + hat_mut!(wtables).schedule_compute_trees(tables_ref.clone()); + }; drop(wtables); } @@ -422,35 +438,6 @@ impl HatBaseTrait for HatCode { .get_local_context(routing_context, face_hat!(face).link_id) } - fn closing( - &self, - tables: &mut Tables, - tables_ref: &Arc, - transport: &TransportUnicast, - send_declare: &mut SendDeclare, - ) -> ZResult<()> { - match (transport.get_zid(), transport.get_whatami()) { - (Ok(zid), Ok(whatami)) => { - if whatami != WhatAmI::Client { - for (_, removed_node) in hat_mut!(tables) - .linkstatepeers_net - .as_mut() - .unwrap() - .remove_link(&zid) - { - pubsub_remove_node(tables, &removed_node.zid, send_declare); - queries_remove_node(tables, &removed_node.zid, send_declare); - token_remove_node(tables, &removed_node.zid, send_declare); - } - - hat_mut!(tables).schedule_compute_trees(tables_ref.clone()); - }; - } - (_, _) => tracing::error!("Closed transport in session closing!"), - } - Ok(()) - } - #[inline] fn ingress_filter(&self, _tables: &Tables, _face: &FaceState, _expr: &mut RoutingExpr) -> bool { true diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 3e5a81d259..74850ea4c1 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -131,17 +131,10 @@ pub(crate) trait HatBaseTrait { fn info(&self, tables: &Tables, kind: WhatAmI) -> String; - fn closing( - &self, - tables: &mut Tables, - tables_ref: &Arc, - transport: &TransportUnicast, - send_declare: &mut SendDeclare, - ) -> ZResult<()>; - fn close_face( &self, tables: &TablesLock, + tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ); diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 4d20204c19..b4da9a241d 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -205,6 +205,7 @@ impl HatBaseTrait for HatCode { fn close_face( &self, tables: &TablesLock, + _tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -311,6 +312,14 @@ impl HatBaseTrait for HatCode { Resource::clean(&mut res); } wtables.faces.remove(&face.id); + + if face.whatami != WhatAmI::Client { + hat_mut!(wtables) + .gossip + .as_mut() + .unwrap() + .remove_link(&face.zid); + }; drop(wtables); } @@ -354,24 +363,6 @@ impl HatBaseTrait for HatCode { 0 } - fn closing( - &self, - tables: &mut Tables, - _tables_ref: &Arc, - transport: &TransportUnicast, - _send_declare: &mut SendDeclare, - ) -> ZResult<()> { - match (transport.get_zid(), transport.get_whatami()) { - (Ok(zid), Ok(whatami)) => { - if whatami != WhatAmI::Client { - hat_mut!(tables).gossip.as_mut().unwrap().remove_link(&zid); - }; - } - (_, _) => tracing::error!("Closed transport in session closing!"), - } - Ok(()) - } - #[inline] fn ingress_filter(&self, _tables: &Tables, _face: &FaceState, _expr: &mut RoutingExpr) -> bool { true diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index c7ea567fb4..5e1906920f 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -430,6 +430,7 @@ impl HatBaseTrait for HatCode { fn close_face( &self, tables: &TablesLock, + tables_ref: &Arc, face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -536,6 +537,84 @@ impl HatBaseTrait for HatCode { Resource::clean(&mut res); } wtables.faces.remove(&face.id); + + match face.whatami { + WhatAmI::Router => { + for (_, removed_node) in hat_mut!(wtables) + .routers_net + .as_mut() + .unwrap() + .remove_link(&face.zid) + { + pubsub_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); + queries_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); + token_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); + } + + if hat!(wtables).full_net(WhatAmI::Peer) { + hat_mut!(wtables).shared_nodes = shared_nodes( + hat!(wtables).routers_net.as_ref().unwrap(), + hat!(wtables).linkstatepeers_net.as_ref().unwrap(), + ); + } + + hat_mut!(wtables).schedule_compute_trees(tables_ref.clone(), WhatAmI::Router); + } + WhatAmI::Peer => { + if hat!(wtables).full_net(WhatAmI::Peer) { + for (_, removed_node) in hat_mut!(wtables) + .linkstatepeers_net + .as_mut() + .unwrap() + .remove_link(&face.zid) + { + pubsub_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Peer, + send_declare, + ); + queries_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Peer, + send_declare, + ); + token_remove_node( + &mut wtables, + &removed_node.zid, + WhatAmI::Peer, + send_declare, + ); + } + + hat_mut!(wtables).shared_nodes = shared_nodes( + hat!(wtables).routers_net.as_ref().unwrap(), + hat!(wtables).linkstatepeers_net.as_ref().unwrap(), + ); + + hat_mut!(wtables).schedule_compute_trees(tables_ref.clone(), WhatAmI::Peer); + } else if let Some(net) = hat_mut!(wtables).linkstatepeers_net.as_mut() { + net.remove_link(&face.zid); + } + } + _ => (), + }; drop(wtables); } @@ -689,100 +768,6 @@ impl HatBaseTrait for HatCode { } } - fn closing( - &self, - tables: &mut Tables, - tables_ref: &Arc, - transport: &TransportUnicast, - send_declare: &mut SendDeclare, - ) -> ZResult<()> { - match (transport.get_zid(), transport.get_whatami()) { - (Ok(zid), Ok(whatami)) => { - match whatami { - WhatAmI::Router => { - for (_, removed_node) in hat_mut!(tables) - .routers_net - .as_mut() - .unwrap() - .remove_link(&zid) - { - pubsub_remove_node( - tables, - &removed_node.zid, - WhatAmI::Router, - send_declare, - ); - queries_remove_node( - tables, - &removed_node.zid, - WhatAmI::Router, - send_declare, - ); - token_remove_node( - tables, - &removed_node.zid, - WhatAmI::Router, - send_declare, - ); - } - - if hat!(tables).full_net(WhatAmI::Peer) { - hat_mut!(tables).shared_nodes = shared_nodes( - hat!(tables).routers_net.as_ref().unwrap(), - hat!(tables).linkstatepeers_net.as_ref().unwrap(), - ); - } - - hat_mut!(tables) - .schedule_compute_trees(tables_ref.clone(), WhatAmI::Router); - } - WhatAmI::Peer => { - if hat!(tables).full_net(WhatAmI::Peer) { - for (_, removed_node) in hat_mut!(tables) - .linkstatepeers_net - .as_mut() - .unwrap() - .remove_link(&zid) - { - pubsub_remove_node( - tables, - &removed_node.zid, - WhatAmI::Peer, - send_declare, - ); - queries_remove_node( - tables, - &removed_node.zid, - WhatAmI::Peer, - send_declare, - ); - token_remove_node( - tables, - &removed_node.zid, - WhatAmI::Peer, - send_declare, - ); - } - - hat_mut!(tables).shared_nodes = shared_nodes( - hat!(tables).routers_net.as_ref().unwrap(), - hat!(tables).linkstatepeers_net.as_ref().unwrap(), - ); - - hat_mut!(tables) - .schedule_compute_trees(tables_ref.clone(), WhatAmI::Peer); - } else if let Some(net) = hat_mut!(tables).linkstatepeers_net.as_mut() { - net.remove_link(&zid); - } - } - _ => (), - }; - } - (_, _) => tracing::error!("Closed transport in session closing!"), - } - Ok(()) - } - #[inline] fn ingress_filter(&self, tables: &Tables, face: &FaceState, expr: &mut RoutingExpr) -> bool { face.whatami != WhatAmI::Peer diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index aca6734d4d..301698eea6 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -457,16 +457,9 @@ impl TransportPeerEventHandler for RuntimeSession { } } - fn closing(&self) { - self.main_handler.closing(); - Runtime::closing_session(self); - for handler in &self.slave_handlers { - handler.closing(); - } - } - fn closed(&self) { self.main_handler.closed(); + Runtime::closed_session(self); for handler in &self.slave_handlers { handler.closed(); } @@ -500,12 +493,6 @@ impl TransportMulticastEventHandler for RuntimeMulticastGroup { })) } - fn closing(&self) { - for handler in &self.slave_handlers { - handler.closed(); - } - } - fn closed(&self) { for handler in &self.slave_handlers { handler.closed(); @@ -541,13 +528,6 @@ impl TransportPeerEventHandler for RuntimeMulticastSession { } } - fn closing(&self) { - self.main_handler.closing(); - for handler in &self.slave_handlers { - handler.closing(); - } - } - fn closed(&self) { self.main_handler.closed(); for handler in &self.slave_handlers { diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index c95f47a970..e09a4e812b 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -1184,7 +1184,7 @@ impl Runtime { } } - pub(super) fn closing_session(session: &RuntimeSession) { + pub(super) fn closed_session(session: &RuntimeSession) { if session.runtime.is_closed() { return; } diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 23c0d9c053..8953397b8d 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -32,10 +32,7 @@ use zenoh_protocol::{ use crate::net::{ primitives::{DummyPrimitives, EPrimitives, Primitives}, routing::{ - dispatcher::{ - pubsub::SubscriberInfo, - tables::{self, Tables}, - }, + dispatcher::{pubsub::SubscriberInfo, tables::Tables}, router::*, RoutingContext, }, @@ -189,15 +186,14 @@ fn multisub_test() { let tables = router.tables.clone(); let primitives = Arc::new(DummyPrimitives {}); - let face0 = Arc::downgrade(&router.new_primitives(primitives).state); - assert!(face0.upgrade().is_some()); + let face0 = &router.new_primitives(primitives); // -------------- let sub_info = SubscriberInfo; declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 0, &"sub".into(), &sub_info, @@ -213,7 +209,7 @@ fn multisub_test() { declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 1, &"sub".into(), &sub_info, @@ -225,7 +221,7 @@ fn multisub_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 0, &WireExpr::empty(), NodeId::default(), @@ -236,7 +232,7 @@ fn multisub_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 1, &WireExpr::empty(), NodeId::default(), @@ -244,7 +240,7 @@ fn multisub_test() { ); assert!(res.upgrade().is_none()); - tables::close_face(&tables, &face0); + face0.send_close(); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -260,11 +256,10 @@ async fn clean_test() { let tables = router.tables.clone(); let primitives = Arc::new(DummyPrimitives {}); - let face0 = Arc::downgrade(&router.new_primitives(primitives).state); - assert!(face0.upgrade().is_some()); + let face0 = &router.new_primitives(primitives); // -------------- - register_expr(&tables, &mut face0.upgrade().unwrap(), 1, &"todrop1".into()); + register_expr(&tables, &mut face0.state.clone(), 1, &"todrop1".into()); let optres1 = Resource::get_resource(zread!(tables.tables)._get_root(), "todrop1") .map(|res| Arc::downgrade(&res)); assert!(optres1.is_some()); @@ -273,7 +268,7 @@ async fn clean_test() { register_expr( &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 2, &"todrop1/todrop11".into(), ); @@ -283,30 +278,30 @@ async fn clean_test() { let res2 = optres2.unwrap(); assert!(res2.upgrade().is_some()); - register_expr(&tables, &mut face0.upgrade().unwrap(), 3, &"**".into()); + register_expr(&tables, &mut face0.state.clone(), 3, &"**".into()); let optres3 = Resource::get_resource(zread!(tables.tables)._get_root(), "**") .map(|res| Arc::downgrade(&res)); assert!(optres3.is_some()); let res3 = optres3.unwrap(); assert!(res3.upgrade().is_some()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 1); + unregister_expr(&tables, &mut face0.state.clone(), 1); assert!(res1.upgrade().is_some()); assert!(res2.upgrade().is_some()); assert!(res3.upgrade().is_some()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 2); + unregister_expr(&tables, &mut face0.state.clone(), 2); assert!(res1.upgrade().is_none()); assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_some()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 3); + unregister_expr(&tables, &mut face0.state.clone(), 3); assert!(res1.upgrade().is_none()); assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_none()); // -------------- - register_expr(&tables, &mut face0.upgrade().unwrap(), 1, &"todrop1".into()); + register_expr(&tables, &mut face0.state.clone(), 1, &"todrop1".into()); let optres1 = Resource::get_resource(zread!(tables.tables)._get_root(), "todrop1") .map(|res| Arc::downgrade(&res)); assert!(optres1.is_some()); @@ -318,7 +313,7 @@ async fn clean_test() { declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 0, &"todrop1/todrop11".into(), &sub_info, @@ -334,7 +329,7 @@ async fn clean_test() { declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 1, &WireExpr::from(1).with_suffix("/todrop12"), &sub_info, @@ -351,7 +346,7 @@ async fn clean_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 1, &WireExpr::empty(), NodeId::default(), @@ -367,7 +362,7 @@ async fn clean_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 0, &WireExpr::empty(), NodeId::default(), @@ -377,17 +372,17 @@ async fn clean_test() { assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_none()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 1); + unregister_expr(&tables, &mut face0.state.clone(), 1); assert!(res1.upgrade().is_none()); assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_none()); // -------------- - register_expr(&tables, &mut face0.upgrade().unwrap(), 2, &"todrop3".into()); + register_expr(&tables, &mut face0.state.clone(), 2, &"todrop3".into()); declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 2, &"todrop3".into(), &sub_info, @@ -403,7 +398,7 @@ async fn clean_test() { undeclare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 2, &WireExpr::empty(), NodeId::default(), @@ -411,16 +406,16 @@ async fn clean_test() { ); assert!(res1.upgrade().is_some()); - unregister_expr(&tables, &mut face0.upgrade().unwrap(), 2); + unregister_expr(&tables, &mut face0.state.clone(), 2); assert!(res1.upgrade().is_none()); // -------------- - register_expr(&tables, &mut face0.upgrade().unwrap(), 3, &"todrop4".into()); - register_expr(&tables, &mut face0.upgrade().unwrap(), 4, &"todrop5".into()); + register_expr(&tables, &mut face0.state.clone(), 3, &"todrop4".into()); + register_expr(&tables, &mut face0.state.clone(), 4, &"todrop5".into()); declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 3, &"todrop5".into(), &sub_info, @@ -430,7 +425,7 @@ async fn clean_test() { declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, - &mut face0.upgrade().unwrap(), + &mut face0.state.clone(), 4, &"todrop6".into(), &sub_info, @@ -455,8 +450,7 @@ async fn clean_test() { assert!(res2.upgrade().is_some()); assert!(res3.upgrade().is_some()); - tables::close_face(&tables, &face0); - assert!(face0.upgrade().is_none()); + face0.send_close(); assert!(res1.upgrade().is_none()); assert!(res2.upgrade().is_none()); assert!(res3.upgrade().is_none());