From 51f4f1ca083a8c4431f0bfa724f075de8ffb8723 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 1 Apr 2024 11:02:30 +0200 Subject: [PATCH] Review tokio runtime being used (#875) * Review tokio runtime being used * Fix cargo clippy --- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 2 +- io/zenoh-links/zenoh-link-tls/src/unicast.rs | 4 ++-- io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs | 2 +- io/zenoh-links/zenoh-link-ws/src/unicast.rs | 2 +- io/zenoh-transport/src/manager.rs | 4 ++-- io/zenoh-transport/src/unicast/universal/link.rs | 2 +- zenoh-ext/src/publication_cache.rs | 2 +- zenoh/src/net/routing/hat/p2p_peer/gossip.rs | 2 +- zenoh/src/scouting.rs | 2 +- zenoh/src/session.rs | 4 ++-- 10 files changed, 13 insertions(+), 13 deletions(-) diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 7137ac0212..361f4fe69e 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -171,7 +171,7 @@ impl LinkUnicastTrait for LinkUnicastTcp { // impl Drop for LinkUnicastTcp { // fn drop(&mut self) { // // Close the underlying TCP socket -// zenoh_runtime::ZRuntime::TX.block_in_place(async { +// zenoh_runtime::ZRuntime::Acceptor.block_in_place(async { // let _ = self.get_mut_socket().shutdown().await; // }); // } diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 7da711161e..b24ce4ac31 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -204,8 +204,8 @@ impl Drop for LinkUnicastTls { fn drop(&mut self) { // Close the underlying TCP stream let (tcp_stream, _) = self.get_sock_mut().get_mut(); - let _ = - zenoh_runtime::ZRuntime::TX.block_in_place(async move { tcp_stream.shutdown().await }); + let _ = zenoh_runtime::ZRuntime::Acceptor + .block_in_place(async move { tcp_stream.shutdown().await }); } } diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index 53441ab89c..b85cee9c66 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -144,7 +144,7 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { impl Drop for LinkUnicastUnixSocketStream { fn drop(&mut self) { // Close the underlying UnixSocketStream socket - let _ = zenoh_runtime::ZRuntime::TX + let _ = zenoh_runtime::ZRuntime::Acceptor .block_in_place(async move { self.get_mut_socket().shutdown().await }); } } diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 6a0cf64e6e..1a6d0fecf3 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -224,7 +224,7 @@ impl LinkUnicastTrait for LinkUnicastWs { impl Drop for LinkUnicastWs { fn drop(&mut self) { - zenoh_runtime::ZRuntime::TX.block_in_place(async { + zenoh_runtime::ZRuntime::Acceptor.block_in_place(async { let mut guard = zasynclock!(self.send); // Close the underlying TCP socket guard.close().await.unwrap_or_else(|e| { diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index f97c126f8b..a52a35af83 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -438,8 +438,8 @@ impl TransportManager { // TODO(yuyuan): Can we make this async as above? pub fn get_locators(&self) -> Vec { - let mut lsu = zenoh_runtime::ZRuntime::TX.block_in_place(self.get_locators_unicast()); - let mut lsm = zenoh_runtime::ZRuntime::TX.block_in_place(self.get_locators_multicast()); + let mut lsu = zenoh_runtime::ZRuntime::Net.block_in_place(self.get_locators_unicast()); + let mut lsm = zenoh_runtime::ZRuntime::Net.block_in_place(self.get_locators_multicast()); lsu.append(&mut lsm); lsu } diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 93a6c717dd..6e86a59aa7 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -102,7 +102,7 @@ impl TransportLinkUnicastUniversal { // to finish in the close() joining its handle // TODO(yuyuan): do more study to check which ZRuntime should be used or refine the // termination - zenoh_runtime::ZRuntime::TX + zenoh_runtime::ZRuntime::Net .spawn(async move { transport.del_link(tx.inner.link()).await }); } }; diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index eec398d592..aede6a2ee4 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -170,7 +170,7 @@ impl<'a> PublicationCache<'a> { let token = TerminatableTask::create_cancellation_token(); let token2 = token.clone(); let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::TX, + zenoh_runtime::ZRuntime::Application, async move { let mut cache: HashMap> = HashMap::with_capacity(resources_limit.unwrap_or(32)); diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index f651ccdc0d..bbe7bd9024 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -408,7 +408,7 @@ impl Network { if !self.autoconnect.is_empty() { // Connect discovered peers - if zenoh_runtime::ZRuntime::Net + if zenoh_runtime::ZRuntime::Acceptor .block_in_place(strong_runtime.manager().get_transport_unicast(&zid)) .is_none() && self.autoconnect.matches(whatami) diff --git a/zenoh/src/scouting.rs b/zenoh/src/scouting.rs index cd25393754..f2c90123ce 100644 --- a/zenoh/src/scouting.rs +++ b/zenoh/src/scouting.rs @@ -326,7 +326,7 @@ fn scout( let cancellation_token = TerminatableTask::create_cancellation_token(); let cancellation_token_clone = cancellation_token.clone(); let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, + zenoh_runtime::ZRuntime::Acceptor, async move { let scout = Runtime::scout(&sockets, what, &addr, move |hello| { let callback = callback.clone(); diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 5d508db815..e68c3ed955 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -1508,7 +1508,7 @@ impl Session { // Cannot hold session lock when calling tables (matching_status()) // TODO: check which ZRuntime should be used self.task_controller - .spawn_with_rt(zenoh_runtime::ZRuntime::RX, { + .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { let session = self.clone(); let msub = msub.clone(); async move { @@ -1546,7 +1546,7 @@ impl Session { // Cannot hold session lock when calling tables (matching_status()) // TODO: check which ZRuntime should be used self.task_controller - .spawn_with_rt(zenoh_runtime::ZRuntime::RX, { + .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { let session = self.clone(); let msub = msub.clone(); async move {