diff --git a/zenoh-ext/Cargo.toml b/zenoh-ext/Cargo.toml index 8eef5acf18..372eaf234a 100644 --- a/zenoh-ext/Cargo.toml +++ b/zenoh-ext/Cargo.toml @@ -44,6 +44,7 @@ zenoh-macros = { workspace = true } zenoh-result = { workspace = true } zenoh-sync = { workspace = true } zenoh-util = { workspace = true } +zenoh-runtime = { workspace = true } [dev-dependencies] clap = { workspace = true, features = ["derive"] } diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 2efc09e99d..c8c5679c91 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -165,8 +165,9 @@ impl<'a> PublicationCache<'a> { let resources_limit = conf.resources_limit; let history = conf.history; + // TODO(yuyuan): use CancellationToken to manage it let (stoptx, stoprx) = bounded::(1); - tokio::task::spawn(async move { + zenoh_runtime::ZRuntime::TX.spawn(async move { let mut cache: HashMap> = HashMap::with_capacity(resources_limit.unwrap_or(32)); let limit = resources_limit.unwrap_or(usize::MAX); diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 4aac565f78..298548f3b7 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -423,7 +423,11 @@ impl Runtime { // Must set to nonblocking according to the doc of tokio // https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html#notes socket.set_nonblocking(true)?; - Ok(UdpSocket::from_std(socket.into())?) + + // UdpSocket::from_std requires a runtime even though it's a sync function + let udp_socket = zenoh_runtime::ZRuntime::Net + .block_in_place(async { UdpSocket::from_std(socket.into()) })?; + Ok(udp_socket) } pub fn bind_ucast_port(addr: IpAddr) -> ZResult { @@ -453,7 +457,11 @@ impl Runtime { // Must set to nonblocking according to the doc of tokio // https://docs.rs/tokio/latest/tokio/net/struct.UdpSocket.html#notes socket.set_nonblocking(true)?; - Ok(UdpSocket::from_std(socket.into())?) + + // UdpSocket::from_std requires a runtime even though it's a sync function + let udp_socket = zenoh_runtime::ZRuntime::Net + .block_in_place(async { UdpSocket::from_std(socket.into()) })?; + Ok(udp_socket) } async fn spawn_peer_connector(&self, peer: EndPoint) -> ZResult<()> {