From ea2ba3d72b0a356d1af9bbf3fb3cf463325ed1d4 Mon Sep 17 00:00:00 2001 From: Pierre Avital Date: Tue, 25 Jul 2023 14:40:35 +0200 Subject: [PATCH] Allow plugins to fail at startup, and Zenohd to react to that failure (#517) Co-authored-by: Pierre Avital Co-authored-by: Luca Cominardi --- io/zenoh-transport/src/unicast/link.rs | 7 +++++-- plugins/zenoh-plugin-rest/src/lib.rs | 10 ++++++++-- zenohd/src/main.rs | 27 +++++++++++++++++++++++--- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index c3200c872d..6c99cc627d 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -590,8 +590,11 @@ fn set_uncompressed_batch_header( if is_streamed { let mut header = [0_u8, 0_u8]; header[..HEADER_BYTES_SIZE].copy_from_slice(&bytes[..HEADER_BYTES_SIZE]); - let mut batch_size = u16::from_le_bytes(header); - batch_size += 1; + let batch_size = if let Some(size) = u16::from_le_bytes(header).checked_add(1) { + size + } else { + bail!("Compression error: unable to convert compression size into u16",) + }; buff[0..HEADER_BYTES_SIZE].copy_from_slice(&batch_size.to_le_bytes()); buff[COMPRESSION_BYTE_INDEX_STREAMED] = COMPRESSION_DISABLED; let batch_size: usize = batch_size.into(); diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 6d6e5956eb..d9165d0b1d 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -198,7 +198,11 @@ impl Plugin for RestPlugin { let conf: Config = serde_json::from_value(plugin_conf.clone()) .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?; - async_std::task::spawn(run(runtime.clone(), conf.clone())); + let task = async_std::task::spawn(run(runtime.clone(), conf.clone())); + let task = async_std::task::block_on(task.timeout(std::time::Duration::from_millis(1))); + if let Ok(Err(e)) = task { + bail!("REST server failed within 1ms: {e}") + } Ok(Box::new(RunningPlugin(conf))) } } @@ -435,7 +439,7 @@ async fn write(mut req: Request<(Arc, String)>) -> tide::Result ZResult<()> { // Try to initiate login. // Required in case of dynamic lib, otherwise no logs. // But cannot be done twice in case of static link. @@ -461,7 +465,9 @@ pub async fn run(runtime: Runtime, conf: Config) { if let Err(e) = app.listen(conf.http_port).await { log::error!("Unable to start http server for REST: {:?}", e); + return Err(e.into()); } + Ok(()) } fn path_to_key_expr<'a>(path: &'a str, zid: &str) -> ZResult> { diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index ba61d8fa29..3d436cad6d 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -15,6 +15,7 @@ use async_std::task; use clap::{ArgMatches, Command}; use futures::future; use git_version::git_version; +use std::collections::HashSet; use zenoh::config::{ Config, EndPoint, ModeDependentValue, PermissionsConf, PluginLoad, ValidatedMap, }; @@ -78,15 +79,20 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na let mut plugins = PluginsManager::dynamic(config.libloader()); // Static plugins are to be added here, with `.add_static::()` + let mut required_plugins = HashSet::new(); for plugin_load in config.plugins().load_requests() { let PluginLoad { name, paths, required, } = plugin_load; + log::info!( + "Loading {req} plugin \"{name}\"", + req = if required { "required" } else { "" } + ); if let Err(e) = match paths { - None => plugins.load_plugin_by_name(name), - Some(paths) => plugins.load_plugin_by_paths(name, &paths), + None => plugins.load_plugin_by_name(name.clone()), + Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths), } { if required { panic!("Plugin load failure: {}", e) @@ -94,6 +100,9 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na log::error!("Plugin load failure: {}", e) } } + if required { + required_plugins.insert(name); + } } let runtime = match Runtime::new(config).await { @@ -105,6 +114,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na }; for (name, path, start_result) in plugins.start_all(&runtime) { + let required = required_plugins.contains(name); + log::info!( + "Starting {req} plugin \"{name}\"", + req = if required { "required" } else { "" } + ); match start_result { Ok(Some(_)) => log::info!("Successfully started plugin {} from {:?}", name, path), Ok(None) => log::warn!("Plugin {} from {:?} wasn't loaded, as an other plugin by the same name is already running", name, path), @@ -113,7 +127,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na Ok(s) => s, Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", name, path), }; - log::error!("Plugin start failure: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + if required { + panic!("Plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + }else { + log::error!("Required plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + } } } } @@ -157,6 +175,9 @@ fn config_from_args(args: &ArgMatches) -> Config { config .insert_json5("plugins/rest/http_port", &format!(r#""{value}""#)) .unwrap(); + config + .insert_json5("plugins/rest/__required__", "true") + .unwrap(); } } if let Some(plugins_search_dirs) = args.values_of("plugin-search-dir") {