Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove closing from hat trait #1469

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions io/zenoh-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ impl TransportEventHandler for DummyTransportEventHandler {
/*************************************/
pub trait TransportMulticastEventHandler: Send + Sync {
fn new_peer(&self, peer: TransportPeer) -> ZResult<Arc<dyn TransportPeerEventHandler>>;
fn closing(&self);
fn closed(&self);
fn as_any(&self) -> &dyn Any;
}
Expand All @@ -95,7 +94,6 @@ impl TransportMulticastEventHandler for DummyTransportMulticastEventHandler {
fn new_peer(&self, _peer: TransportPeer) -> ZResult<Arc<dyn TransportPeerEventHandler>> {
Ok(Arc::new(DummyTransportPeerEventHandler))
}
fn closing(&self) {}
fn closed(&self) {}
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -121,7 +119,6 @@ pub trait TransportPeerEventHandler: Send + Sync {
fn handle_message(&self, msg: NetworkMessage) -> ZResult<()>;
fn new_link(&self, src: Link);
fn del_link(&self, link: Link);
fn closing(&self);
fn closed(&self);
fn as_any(&self) -> &dyn Any;
}
Expand All @@ -137,7 +134,6 @@ impl TransportPeerEventHandler for DummyTransportPeerEventHandler {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
5 changes: 0 additions & 5 deletions io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,7 @@ impl TransportMulticastInner {
pub(super) async fn delete(&self) -> ZResult<()> {
tracing::debug!("Closing multicast transport on {:?}", self.locator);

// Notify the callback that we are going to close the transport
let callback = zwrite!(self.callback).take();
if let Some(cb) = callback.as_ref() {
cb.closing();
}

// Delete the transport on the manager
let _ = self.manager.del_transport_multicast(&self.locator).await;
Expand Down Expand Up @@ -441,7 +437,6 @@ impl TransportMulticastInner {

// TODO(yuyuan): Unify the termination
peer.token.cancel();
peer.handler.closing();
drop(guard);
peer.handler.closed();
}
Expand Down
5 changes: 0 additions & 5 deletions io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,7 @@ impl TransportUnicastLowlatency {
// to avoid concurrent new_transport and closing/closed notifications
let mut a_guard = self.get_alive().await;
*a_guard = false;

// Notify the callback that we are going to close the transport
let callback = zwrite!(self.callback).take();
if let Some(cb) = callback.as_ref() {
cb.closing();
}

// Delete the transport on the manager
let _ = self.manager.del_transport_unicast(&self.config.zid).await;
Expand Down
5 changes: 0 additions & 5 deletions io/zenoh-transport/src/unicast/universal/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,7 @@ impl TransportUnicastUniversal {
// to avoid concurrent new_transport and closing/closed notifications
let mut a_guard = self.get_alive().await;
*a_guard = false;

// Notify the callback that we are going to close the transport
let callback = zwrite!(self.callback).take();
if let Some(cb) = callback.as_ref() {
cb.closing();
}

// Delete the transport on the manager
let _ = self.manager.del_transport_unicast(&self.config.zid).await;
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl TransportPeerEventHandler for SC {
}
fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/multicast_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ mod tests {
count: self.count.clone(),
}))
}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand All @@ -127,7 +126,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/multicast_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ mod tests {
count: self.count.clone(),
}))
}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand All @@ -126,7 +125,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/transport_whitelist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl TransportPeerEventHandler for SCRouter {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl TransportPeerEventHandler for MHRouterAuthenticator {
}
fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/unicast_compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -150,7 +149,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ impl TransportPeerEventHandler for MHPeer {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_intermittent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ impl TransportPeerEventHandler for SCClient {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/unicast_priorities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ impl TransportPeerEventHandler for SCRouter {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -183,7 +182,6 @@ impl TransportPeerEventHandler for SCClient {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/tests/unicast_simultaneous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ mod tests {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
2 changes: 0 additions & 2 deletions io/zenoh-transport/tests/unicast_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ impl TransportPeerEventHandler for SCRouter {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down Expand Up @@ -336,7 +335,6 @@ impl TransportPeerEventHandler for SCClient {

fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closing(&self) {}
fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
Expand Down
4 changes: 0 additions & 4 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ impl TransportMulticastEventHandler for Handler {
}
}

fn closing(&self) {}

fn closed(&self) {}

fn as_any(&self) -> &dyn std::any::Any {
Expand Down Expand Up @@ -250,8 +248,6 @@ impl TransportPeerEventHandler for PeerHandler {
);
}

fn closing(&self) {}

fn closed(&self) {
let info = DataInfo {
kind: SampleKind::Delete,
Expand Down
17 changes: 1 addition & 16 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,25 +100,10 @@ impl TransportPeerEventHandler for DeMux {

fn del_link(&self, _link: Link) {}

fn closing(&self) {
fn closed(&self) {
self.face.send_close();
if let Some(transport) = self.transport.as_ref() {
let mut declares = vec![];
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut tables = zwrite!(self.face.tables.tables);
let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport, &mut |p, m| {
declares.push((p.clone(), m))
});
drop(tables);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
}

fn closed(&self) {}

fn as_any(&self) -> &dyn Any {
self
}
Expand Down
8 changes: 2 additions & 6 deletions zenoh/src/net/primitives/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,7 @@ impl Primitives for Mux {
}
}

fn send_close(&self) {
// self.handler.closing().await;
}
fn send_close(&self) {}
}

impl EPrimitives for Mux {
Expand Down Expand Up @@ -530,9 +528,7 @@ impl Primitives for McastMux {
}
}

fn send_close(&self) {
// self.handler.closing().await;
}
fn send_close(&self) {}
}

impl EPrimitives for McastMux {
Expand Down
28 changes: 25 additions & 3 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
collections::HashMap,
fmt,
sync::{Arc, Weak},
time::Duration,
};

use tokio_util::sync::CancellationToken;
Expand All @@ -37,13 +38,16 @@ use super::{
super::router::*,
interests::{declare_final, declare_interest, undeclare_interest, CurrentInterest},
resource::*,
tables::{self, TablesLock},
tables::TablesLock,
};
use crate::{
api::key_expr::KeyExpr,
net::{
primitives::{McastMux, Mux, Primitives},
routing::interceptor::{InterceptorTrait, InterceptorsChain},
routing::{
dispatcher::interests::finalize_pending_interests,
interceptor::{InterceptorTrait, InterceptorsChain},
},
},
};

Expand Down Expand Up @@ -421,7 +425,25 @@ impl Primitives for Face {
}

fn send_close(&self) {
tables::close_face(&self.tables, &Arc::downgrade(&self.state));
tracing::debug!("Close {}", self.state);
let mut state = self.state.clone();
state.task_controller.terminate_all(Duration::from_secs(10));
finalize_pending_queries(&self.tables, &mut state);
let mut declares = vec![];
let ctrl_lock = zlock!(self.tables.ctrl_lock);
finalize_pending_interests(&self.tables, &mut state, &mut |p, m| {
declares.push((p.clone(), m))
});
ctrl_lock.close_face(
&self.tables,
&self.tables.clone(),
&mut state,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
}

Expand Down
24 changes: 1 addition & 23 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::{
any::Any,
collections::HashMap,
sync::{Arc, Mutex, RwLock, Weak},
sync::{Arc, Mutex, RwLock},
time::Duration,
};

Expand All @@ -30,7 +30,6 @@ use zenoh_sync::get_mut_unchecked;
use super::face::FaceState;
pub use super::{pubsub::*, queries::*, resource::*};
use crate::net::routing::{
dispatcher::interests::finalize_pending_interests,
hat::{self, HatTrait},
interceptor::{interceptor_factories, InterceptorFactory},
};
Expand Down Expand Up @@ -169,27 +168,6 @@ impl Tables {
}
}

pub fn close_face(tables: &TablesLock, face: &Weak<FaceState>) {
match face.upgrade() {
Some(mut face) => {
tracing::debug!("Close {}", face);
face.task_controller.terminate_all(Duration::from_secs(10));
finalize_pending_queries(tables, &mut face);
let mut declares = vec![];
let ctrl_lock = zlock!(tables.ctrl_lock);
finalize_pending_interests(tables, &mut face, &mut |p, m| {
declares.push((p.clone(), m))
});
ctrl_lock.close_face(tables, &mut face, &mut |p, m| declares.push((p.clone(), m)));
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
None => tracing::error!("Face already closed!"),
}
}

pub struct TablesLock {
pub tables: RwLock<Tables>,
pub(crate) ctrl_lock: Mutex<Box<dyn HatTrait + Send + Sync>>,
Expand Down
Loading