Skip to content

Commit

Permalink
fix: starting rest plugin like any other plugin (#1478)
Browse files Browse the repository at this point in the history
* fix: starting rest plugin like any other plugin

Signed-off-by: Gabriele Baldoni <[email protected]>

* fix: using mix of blockon_runtime and runtime_spawn

Signed-off-by: Gabriele Baldoni <[email protected]>

* chore: removing commented line

Signed-off-by: Gabriele Baldoni <[email protected]>

---------

Signed-off-by: Gabriele Baldoni <[email protected]>
  • Loading branch information
gabrik authored Sep 27, 2024
1 parent a6c6994 commit f60a7f0
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use futures::StreamExt;
use http_types::Method;
use serde::{Deserialize, Serialize};
use tide::{http::Mime, sse::Sender, Request, Response, Server, StatusCode};
use tokio::time::timeout;
use tokio::{task::JoinHandle, time::timeout};
use zenoh::{
bytes::{Encoding, ZBytes},
internal::{
Expand Down Expand Up @@ -72,6 +72,7 @@ lazy_static::lazy_static! {
.build()
.expect("Unable to create runtime");
}

#[inline(always)]
pub(crate) fn blockon_runtime<F: Future>(task: F) -> F::Output {
// Check whether able to get the current runtime
Expand All @@ -87,6 +88,24 @@ pub(crate) fn blockon_runtime<F: Future>(task: F) -> F::Output {
}
}

pub(crate) fn spawn_runtime<F>(task: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
// Check whether able to get the current runtime
match tokio::runtime::Handle::try_current() {
Ok(rt) => {
// Able to get the current runtime (standalone binary), spawn on the current runtime
rt.spawn(task)
}
Err(_) => {
// Unable to get the current runtime (dynamic plugins), spawn on the global runtime
TOKIO_RUNTIME.spawn(task)
}
}
}

#[derive(Serialize, Deserialize)]
struct JSONSample {
key: String,
Expand Down Expand Up @@ -286,15 +305,15 @@ impl Plugin for RestPlugin {
MAX_BLOCK_THREAD_NUM.store(conf.max_block_thread_num, Ordering::SeqCst);

let task = run(runtime.clone(), conf.clone());
let task = blockon_runtime(async {
timeout(Duration::from_millis(1), TOKIO_RUNTIME.spawn(task)).await
});
let task =
blockon_runtime(async { timeout(Duration::from_millis(1), spawn_runtime(task)).await });

// The spawn task (TOKIO_RUNTIME.spawn(task)) should not return immediately. The server should block inside.
// The spawn task (spawn_runtime(task)).await) should not return immediately. The server should block inside.
// If it returns immediately (for example, address already in use), we can get the error inside Ok
if let Ok(Ok(Err(e))) = task {
bail!("REST server failed within 1ms: {e}")
}

Ok(Box::new(RunningPlugin(conf)))
}
}
Expand Down

0 comments on commit f60a7f0

Please sign in to comment.