From 0897e69aa611907f8cab8f7325d4a7dd1ed21ec1 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Fri, 27 Sep 2024 08:08:25 +0200 Subject: [PATCH 1/3] fix: starting rest plugin like any other plugin Signed-off-by: Gabriele Baldoni --- plugins/zenoh-plugin-rest/src/lib.rs | 31 ++++++++++++---------------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index c06fc09951..adf6758b10 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -26,7 +26,6 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, - time::Duration, }; use base64::Engine; @@ -34,11 +33,10 @@ use futures::StreamExt; use http_types::Method; use serde::{Deserialize, Serialize}; use tide::{http::Mime, sse::Sender, Request, Response, Server, StatusCode}; -use tokio::time::timeout; +use tokio::{task::JoinHandle, time::timeout}; use zenoh::{ bytes::{Encoding, ZBytes}, internal::{ - bail, plugins::{RunningPluginTrait, ZenohPlugin}, runtime::Runtime, zerror, @@ -72,17 +70,22 @@ lazy_static::lazy_static! { .build() .expect("Unable to create runtime"); } -#[inline(always)] -pub(crate) fn blockon_runtime(task: F) -> F::Output { + +pub(crate) fn spawn_runtime(task: F) -> JoinHandle +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + println!("########### I'M HERE #############"); // Check whether able to get the current runtime match tokio::runtime::Handle::try_current() { Ok(rt) => { - // Able to get the current runtime (standalone binary), use the current runtime - tokio::task::block_in_place(|| rt.block_on(task)) + // Able to get the current runtime (standalone binary), spawn on the current runtime + rt.spawn(task) } Err(_) => { - // Unable to get the current runtime (dynamic plugins), reuse the global runtime - tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task)) + // Unable to get the current runtime (dynamic plugins), spawn on the global runtime + TOKIO_RUNTIME.spawn(task) } } } @@ -285,16 +288,8 @@ impl Plugin for RestPlugin { WORKER_THREAD_NUM.store(conf.work_thread_num, Ordering::SeqCst); MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst); - let task = run(runtime.clone(), conf.clone()); - let task = blockon_runtime(async { - timeout(Duration::from_millis(1), TOKIO_RUNTIME.spawn(task)).await - }); + spawn_runtime(run(runtime.clone(), conf.clone())); - // The spawn task (TOKIO_RUNTIME.spawn(task)) should not return immediately. The server should block inside. - // If it returns immediately (for example, address already in use), we can get the error inside Ok - if let Ok(Ok(Err(e))) = task { - bail!("REST server failed within 1ms: {e}") - } Ok(Box::new(RunningPlugin(conf))) } } From c5f708b8060e29bb71c0d2bbd5ed58dd5dd47006 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Fri, 27 Sep 2024 08:57:23 +0200 Subject: [PATCH 2/3] fix: using mix of blockon_runtime and runtime_spawn Signed-off-by: Gabriele Baldoni --- plugins/zenoh-plugin-rest/src/lib.rs | 30 ++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index adf6758b10..726c87b78f 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -26,6 +26,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, + time::Duration, }; use base64::Engine; @@ -37,6 +38,7 @@ use tokio::{task::JoinHandle, time::timeout}; use zenoh::{ bytes::{Encoding, ZBytes}, internal::{ + bail, plugins::{RunningPluginTrait, ZenohPlugin}, runtime::Runtime, zerror, @@ -71,12 +73,26 @@ lazy_static::lazy_static! { .expect("Unable to create runtime"); } +#[inline(always)] +pub(crate) fn blockon_runtime(task: F) -> F::Output { + // Check whether able to get the current runtime + match tokio::runtime::Handle::try_current() { + Ok(rt) => { + // Able to get the current runtime (standalone binary), use the current runtime + tokio::task::block_in_place(|| rt.block_on(task)) + } + Err(_) => { + // Unable to get the current runtime (dynamic plugins), reuse the global runtime + tokio::task::block_in_place(|| TOKIO_RUNTIME.block_on(task)) + } + } +} + pub(crate) fn spawn_runtime(task: F) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, { - println!("########### I'M HERE #############"); // Check whether able to get the current runtime match tokio::runtime::Handle::try_current() { Ok(rt) => { @@ -288,7 +304,17 @@ impl Plugin for RestPlugin { WORKER_THREAD_NUM.store(conf.work_thread_num, Ordering::SeqCst); MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst); - spawn_runtime(run(runtime.clone(), conf.clone())); + // spawn_runtime(run(runtime.clone(), conf.clone())); + + let task = run(runtime.clone(), conf.clone()); + let task = + blockon_runtime(async { timeout(Duration::from_millis(1), spawn_runtime(task)).await }); + + // The spawn task (spawn_runtime(task)).await) should not return immediately. The server should block inside. + // If it returns immediately (for example, address already in use), we can get the error inside Ok + if let Ok(Ok(Err(e))) = task { + bail!("REST server failed within 1ms: {e}") + } Ok(Box::new(RunningPlugin(conf))) } From 18c7103897ebb4e66f0afeccee25dc956f05b3b9 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Fri, 27 Sep 2024 09:09:59 +0200 Subject: [PATCH 3/3] chore: removing commented line Signed-off-by: Gabriele Baldoni --- plugins/zenoh-plugin-rest/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 726c87b78f..f02a47c9a5 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -304,8 +304,6 @@ impl Plugin for RestPlugin { WORKER_THREAD_NUM.store(conf.work_thread_num, Ordering::SeqCst); MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst); - // spawn_runtime(run(runtime.clone(), conf.clone())); - let task = run(runtime.clone(), conf.clone()); let task = blockon_runtime(async { timeout(Duration::from_millis(1), spawn_runtime(task)).await });