Skip to content

Commit

Permalink
Allow plugins to fail at startup, and Zenohd to react to that failure (
Browse files Browse the repository at this point in the history
…#517)

Co-authored-by: Pierre Avital <[email protected]>
Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
3 people authored Jul 25, 2023
1 parent 72e3ecb commit ea2ba3d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
7 changes: 5 additions & 2 deletions io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,11 @@ fn set_uncompressed_batch_header(
if is_streamed {
let mut header = [0_u8, 0_u8];
header[..HEADER_BYTES_SIZE].copy_from_slice(&bytes[..HEADER_BYTES_SIZE]);
let mut batch_size = u16::from_le_bytes(header);
batch_size += 1;
let batch_size = if let Some(size) = u16::from_le_bytes(header).checked_add(1) {
size
} else {
bail!("Compression error: unable to convert compression size into u16",)
};
buff[0..HEADER_BYTES_SIZE].copy_from_slice(&batch_size.to_le_bytes());
buff[COMPRESSION_BYTE_INDEX_STREAMED] = COMPRESSION_DISABLED;
let batch_size: usize = batch_size.into();
Expand Down
10 changes: 8 additions & 2 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,11 @@ impl Plugin for RestPlugin {

let conf: Config = serde_json::from_value(plugin_conf.clone())
.map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
async_std::task::spawn(run(runtime.clone(), conf.clone()));
let task = async_std::task::spawn(run(runtime.clone(), conf.clone()));
let task = async_std::task::block_on(task.timeout(std::time::Duration::from_millis(1)));
if let Ok(Err(e)) = task {
bail!("REST server failed within 1ms: {e}")
}
Ok(Box::new(RunningPlugin(conf)))
}
}
Expand Down Expand Up @@ -435,7 +439,7 @@ async fn write(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
}
}

pub async fn run(runtime: Runtime, conf: Config) {
pub async fn run(runtime: Runtime, conf: Config) -> ZResult<()> {
// Try to initiate login.
// Required in case of dynamic lib, otherwise no logs.
// But cannot be done twice in case of static link.
Expand All @@ -461,7 +465,9 @@ pub async fn run(runtime: Runtime, conf: Config) {

if let Err(e) = app.listen(conf.http_port).await {
log::error!("Unable to start http server for REST: {:?}", e);
return Err(e.into());
}
Ok(())
}

fn path_to_key_expr<'a>(path: &'a str, zid: &str) -> ZResult<KeyExpr<'a>> {
Expand Down
27 changes: 24 additions & 3 deletions zenohd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use async_std::task;
use clap::{ArgMatches, Command};
use futures::future;
use git_version::git_version;
use std::collections::HashSet;
use zenoh::config::{
Config, EndPoint, ModeDependentValue, PermissionsConf, PluginLoad, ValidatedMap,
};
Expand Down Expand Up @@ -78,22 +79,30 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na

let mut plugins = PluginsManager::dynamic(config.libloader());
// Static plugins are to be added here, with `.add_static::<PluginType>()`
let mut required_plugins = HashSet::new();
for plugin_load in config.plugins().load_requests() {
let PluginLoad {
name,
paths,
required,
} = plugin_load;
log::info!(
"Loading {req} plugin \"{name}\"",
req = if required { "required" } else { "" }
);
if let Err(e) = match paths {
None => plugins.load_plugin_by_name(name),
Some(paths) => plugins.load_plugin_by_paths(name, &paths),
None => plugins.load_plugin_by_name(name.clone()),
Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths),
} {
if required {
panic!("Plugin load failure: {}", e)
} else {
log::error!("Plugin load failure: {}", e)
}
}
if required {
required_plugins.insert(name);
}
}

let runtime = match Runtime::new(config).await {
Expand All @@ -105,6 +114,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na
};

for (name, path, start_result) in plugins.start_all(&runtime) {
let required = required_plugins.contains(name);
log::info!(
"Starting {req} plugin \"{name}\"",
req = if required { "required" } else { "" }
);
match start_result {
Ok(Some(_)) => log::info!("Successfully started plugin {} from {:?}", name, path),
Ok(None) => log::warn!("Plugin {} from {:?} wasn't loaded, as an other plugin by the same name is already running", name, path),
Expand All @@ -113,7 +127,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na
Ok(s) => s,
Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", name, path),
};
log::error!("Plugin start failure: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
if required {
panic!("Plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
}else {
log::error!("Required plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
}
}
}
}
Expand Down Expand Up @@ -157,6 +175,9 @@ fn config_from_args(args: &ArgMatches) -> Config {
config
.insert_json5("plugins/rest/http_port", &format!(r#""{value}""#))
.unwrap();
config
.insert_json5("plugins/rest/__required__", "true")
.unwrap();
}
}
if let Some(plugins_search_dirs) = args.values_of("plugin-search-dir") {
Expand Down

0 comments on commit ea2ba3d

Please sign in to comment.