diff --git a/Cargo.lock b/Cargo.lock index 4ad3f6f7e7..b331a798b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4415,6 +4415,7 @@ dependencies = [ "zenoh-runtime", "zenoh-shm", "zenoh-sync", + "zenoh-task", "zenoh-transport", "zenoh-util", ] @@ -4543,6 +4544,7 @@ dependencies = [ "zenoh-result", "zenoh-runtime", "zenoh-sync", + "zenoh-task", "zenoh-util", ] @@ -4925,6 +4927,7 @@ dependencies = [ name = "zenoh-runtime" version = "0.11.0-dev" dependencies = [ + "futures", "lazy_static", "tokio", "zenoh-collections", @@ -4956,6 +4959,18 @@ dependencies = [ "zenoh-runtime", ] +[[package]] +name = "zenoh-task" +version = "0.11.0-dev" +dependencies = [ + "futures", + "log", + "tokio", + "tokio-util", + "zenoh-core", + "zenoh-runtime", +] + [[package]] name = "zenoh-transport" version = "0.11.0-dev" @@ -4987,6 +5002,7 @@ dependencies = [ "zenoh-runtime", "zenoh-shm", "zenoh-sync", + "zenoh-task", "zenoh-util", ] diff --git a/commons/zenoh-macros/build.rs b/commons/zenoh-macros/build.rs index 557593d00e..d5ce6632dc 100644 --- a/commons/zenoh-macros/build.rs +++ b/commons/zenoh-macros/build.rs @@ -23,6 +23,7 @@ fn main() { let version_rs = std::path::PathBuf::from(env::var_os("OUT_DIR").unwrap()).join("version.rs"); let mut version_rs = OpenOptions::new() .create(true) + .truncate(true) .write(true) .open(version_rs) .unwrap(); diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 7137ac0212..361f4fe69e 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -171,7 +171,7 @@ impl LinkUnicastTrait for LinkUnicastTcp { // impl Drop for LinkUnicastTcp { // fn drop(&mut self) { // // Close the underlying TCP socket -// zenoh_runtime::ZRuntime::TX.block_in_place(async { +// zenoh_runtime::ZRuntime::Acceptor.block_in_place(async { // let _ = self.get_mut_socket().shutdown().await; // }); // } diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 7da711161e..b24ce4ac31 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -204,8 +204,8 @@ impl Drop for LinkUnicastTls { fn drop(&mut self) { // Close the underlying TCP stream let (tcp_stream, _) = self.get_sock_mut().get_mut(); - let _ = - zenoh_runtime::ZRuntime::TX.block_in_place(async move { tcp_stream.shutdown().await }); + let _ = zenoh_runtime::ZRuntime::Acceptor + .block_in_place(async move { tcp_stream.shutdown().await }); } } diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index 53441ab89c..b85cee9c66 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -144,7 +144,7 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { impl Drop for LinkUnicastUnixSocketStream { fn drop(&mut self) { // Close the underlying UnixSocketStream socket - let _ = zenoh_runtime::ZRuntime::TX + let _ = zenoh_runtime::ZRuntime::Acceptor .block_in_place(async move { self.get_mut_socket().shutdown().await }); } } diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 6a0cf64e6e..1a6d0fecf3 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -224,7 +224,7 @@ impl LinkUnicastTrait for LinkUnicastWs { impl Drop for LinkUnicastWs { fn drop(&mut self) { - zenoh_runtime::ZRuntime::TX.block_in_place(async { + zenoh_runtime::ZRuntime::Acceptor.block_in_place(async { let mut guard = zasynclock!(self.send); // Close the underlying TCP socket guard.close().await.unwrap_or_else(|e| { diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index f97c126f8b..a52a35af83 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -438,8 +438,8 @@ impl TransportManager { // TODO(yuyuan): Can we make this async as above? pub fn get_locators(&self) -> Vec { - let mut lsu = zenoh_runtime::ZRuntime::TX.block_in_place(self.get_locators_unicast()); - let mut lsm = zenoh_runtime::ZRuntime::TX.block_in_place(self.get_locators_multicast()); + let mut lsu = zenoh_runtime::ZRuntime::Net.block_in_place(self.get_locators_unicast()); + let mut lsm = zenoh_runtime::ZRuntime::Net.block_in_place(self.get_locators_multicast()); lsu.append(&mut lsm); lsu } diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index 93a6c717dd..6e86a59aa7 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -102,7 +102,7 @@ impl TransportLinkUnicastUniversal { // to finish in the close() joining its handle // TODO(yuyuan): do more study to check which ZRuntime should be used or refine the // termination - zenoh_runtime::ZRuntime::TX + zenoh_runtime::ZRuntime::Net .spawn(async move { transport.del_link(tx.inner.link()).await }); } }; diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index eec398d592..aede6a2ee4 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -170,7 +170,7 @@ impl<'a> PublicationCache<'a> { let token = TerminatableTask::create_cancellation_token(); let token2 = token.clone(); let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::TX, + zenoh_runtime::ZRuntime::Application, async move { let mut cache: HashMap> = HashMap::with_capacity(resources_limit.unwrap_or(32)); diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index f651ccdc0d..bbe7bd9024 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -408,7 +408,7 @@ impl Network { if !self.autoconnect.is_empty() { // Connect discovered peers - if zenoh_runtime::ZRuntime::Net + if zenoh_runtime::ZRuntime::Acceptor .block_in_place(strong_runtime.manager().get_transport_unicast(&zid)) .is_none() && self.autoconnect.matches(whatami) diff --git a/zenoh/src/scouting.rs b/zenoh/src/scouting.rs index cd25393754..f2c90123ce 100644 --- a/zenoh/src/scouting.rs +++ b/zenoh/src/scouting.rs @@ -326,7 +326,7 @@ fn scout( let cancellation_token = TerminatableTask::create_cancellation_token(); let cancellation_token_clone = cancellation_token.clone(); let task = TerminatableTask::spawn( - zenoh_runtime::ZRuntime::Net, + zenoh_runtime::ZRuntime::Acceptor, async move { let scout = Runtime::scout(&sockets, what, &addr, move |hello| { let callback = callback.clone(); diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 7cd495d378..46a88db356 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -1508,7 +1508,7 @@ impl Session { // Cannot hold session lock when calling tables (matching_status()) // TODO: check which ZRuntime should be used self.task_controller - .spawn_with_rt(zenoh_runtime::ZRuntime::RX, { + .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { let session = self.clone(); let msub = msub.clone(); async move { @@ -1546,7 +1546,7 @@ impl Session { // Cannot hold session lock when calling tables (matching_status()) // TODO: check which ZRuntime should be used self.task_controller - .spawn_with_rt(zenoh_runtime::ZRuntime::RX, { + .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { let session = self.clone(); let msub = msub.clone(); async move { diff --git a/zenoh/tests/connection_retry.rs b/zenoh/tests/connection_retry.rs index db84d7bd5d..fcb071b489 100644 --- a/zenoh/tests/connection_retry.rs +++ b/zenoh/tests/connection_retry.rs @@ -36,8 +36,7 @@ fn retry_config_overriding() { .insert_json5("listen/exit_on_failure", "false") .unwrap(); - let expected = vec![ - // global value + let expected = [ ConnectionRetryConf { period_init_ms: 3000, period_max_ms: 6000,