diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index c06fc09951..f02a47c9a5 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -34,7 +34,7 @@ use futures::StreamExt; use http_types::Method; use serde::{Deserialize, Serialize}; use tide::{http::Mime, sse::Sender, Request, Response, Server, StatusCode}; -use tokio::time::timeout; +use tokio::{task::JoinHandle, time::timeout}; use zenoh::{ bytes::{Encoding, ZBytes}, internal::{ @@ -72,6 +72,7 @@ lazy_static::lazy_static! { .build() .expect("Unable to create runtime"); } + #[inline(always)] pub(crate) fn blockon_runtime(task: F) -> F::Output { // Check whether able to get the current runtime @@ -87,6 +88,24 @@ pub(crate) fn blockon_runtime(task: F) -> F::Output { } } +pub(crate) fn spawn_runtime(task: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), spawn on the current runtime + rt.spawn(task) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + TOKIO_RUNTIME.spawn(task) + } + } +} + #[derive(Serialize, Deserialize)] struct JSONSample { key: String, @@ -286,15 +305,15 @@ impl Plugin for RestPlugin { MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst); let task = run(runtime.clone(), conf.clone()); - let task = blockon_runtime(async { - timeout(Duration::from_millis(1), TOKIO_RUNTIME.spawn(task)).await - }); + let task = + blockon_runtime(async { timeout(Duration::from_millis(1), spawn_runtime(task)).await }); - // The spawn task (TOKIO_RUNTIME.spawn(task)) should not return immediately. The server should block inside. + // The spawn task (spawn_runtime(task)).await) should not return immediately. The server should block inside. // If it returns immediately (for example, address already in use), we can get the error inside Ok if let Ok(Ok(Err(e))) = task { bail!("REST server failed within 1ms: {e}") } + Ok(Box::new(RunningPlugin(conf))) } }