Allow plugins to fail at startup, and Zenohd to react to that failure #517

Merged · 3 commits · Jul 25, 2023
Changes from 1 commit
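In short: a plugin whose configuration carries `__required__: true` must load and start successfully, otherwise zenohd aborts instead of merely logging the failure. A minimal sketch of setting that flag programmatically, mirroring what `config_from_args` does in the zenohd diff below (the `Config::insert_json5` calls are the ones visible in this PR; the port value is illustrative):

```rust
fn main() {
    let mut config = zenoh::config::Config::default();
    // Enable the REST plugin on an illustrative port...
    config
        .insert_json5("plugins/rest/http_port", r#""8000""#)
        .unwrap();
    // ...and mark it required: if it fails to load or start, zenohd panics.
    config
        .insert_json5("plugins/rest/__required__", "true")
        .unwrap();
}
```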
37 changes: 17 additions & 20 deletions io/zenoh-transport/src/unicast/link.rs
@@ -227,7 +227,7 @@ async fn tx_task(
let (batch_size, _) = tx_compressed(
is_compressed,
link.is_streamed(),
&bytes,
bytes,
&mut compression_aux_buff,
)?;
bytes = &compression_aux_buff[..batch_size];
@@ -471,7 +471,7 @@ fn rx_decompress(
end_pos: &mut usize,
) -> ZResult<()> {
let is_compressed: bool = buffer[COMPRESSION_BYTE_INDEX] == COMPRESSION_ENABLED;
Ok(if is_compressed {
if is_compressed {
let mut aux_buff = pool.try_take().unwrap_or_else(|| pool.alloc());
*end_pos = lz4_flex::block::decompress_into(
&buffer[BATCH_PAYLOAD_START_INDEX..read_bytes],
@@ -482,7 +482,8 @@
} else {
*start_pos = BATCH_PAYLOAD_START_INDEX;
*end_pos = read_bytes;
})
};
Ok(())
}
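A side note on this refactor: both forms compile because each branch of the `if`/`else` evaluates to `()`, so wrapping the whole expression in `Ok(...)` type-checked; the new form just makes the success value explicit. A minimal illustration (names are made up):

```rust
fn implicit(flag: bool) -> Result<(), String> {
    Ok(if flag { /* decompress path */ } else { /* passthrough path */ })
}

fn explicit(flag: bool) -> Result<(), String> {
    if flag { /* decompress path */ } else { /* passthrough path */ }
    Ok(())
}

fn main() {
    assert_eq!(implicit(true), explicit(true));
}
```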

#[cfg(all(feature = "unstable", feature = "transport_compression"))]
@@ -589,14 +590,11 @@ fn set_uncompressed_batch_header(
if is_streamed {
let mut header = [0_u8, 0_u8];
header[..HEADER_BYTES_SIZE].copy_from_slice(&bytes[..HEADER_BYTES_SIZE]);
let mut batch_size = u16::from_le_bytes(header);
batch_size += 1;
let batch_size: u16 = batch_size.try_into().map_err(|e| {
zerror!(
"Compression error: unable to convert compression size into u16: {}",
e
)
})?;
let batch_size = if let Some(size) = u16::from_le_bytes(header).checked_add(1) {
size
} else {
bail!("Compression error: unable to convert compression size into u16",)
};
buff[0..HEADER_BYTES_SIZE].copy_from_slice(&batch_size.to_le_bytes());
buff[COMPRESSION_BYTE_INDEX_STREAMED] = COMPRESSION_DISABLED;
let batch_size: usize = batch_size.into();
@@ -612,7 +610,7 @@
// May happen when the payload size is itself the MTU and adding the header exceeds it.
Err(zerror!("Failed to send uncompressed batch, batch size ({}) exceeds the maximum batch size of {}.", final_batch_size, MAX_BATCH_SIZE))?;
}
return Ok(final_batch_size);
Ok(final_batch_size)
}
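The `checked_add` above replaces a `try_into` that could never fail (the value was already a `u16`); the real hazard is the `+= 1` overflowing. A small sketch of the guard under that assumption:

```rust
// `checked_add` returns None on overflow instead of wrapping, so a batch
// size of u16::MAX is rejected rather than silently becoming 0.
fn bump(batch_size: u16) -> Result<u16, String> {
    batch_size
        .checked_add(1)
        .ok_or_else(|| format!("batch size {batch_size} + 1 overflows u16"))
}

fn main() {
    assert_eq!(bump(41), Ok(42));
    assert!(bump(u16::MAX).is_err());
}
```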

#[cfg(all(feature = "transport_compression", feature = "unstable"))]
@@ -626,47 +624,46 @@ fn tx_compression_test() {
// Compression done for the sake of comparing the result.
let payload_compression_size = lz4_flex::block::compress_into(&payload, &mut buff).unwrap();

fn get_header_value(buff: &Box<[u8]>) -> u16 {
fn get_header_value(buff: &[u8]) -> u16 {
let mut header = [0_u8, 0_u8];
header[..HEADER_BYTES_SIZE].copy_from_slice(&buff[..HEADER_BYTES_SIZE]);
let batch_size = u16::from_le_bytes(header);
batch_size
u16::from_le_bytes(header)
}

// Streamed with compression enabled
let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
let header = get_header_value(&buff);
assert_eq!(was_compressed, true);
assert!(was_compressed);
assert_eq!(header as usize, payload_compression_size + COMPRESSION_BYTE);
assert!(batch_size < batch.len() + COMPRESSION_BYTE);
assert_eq!(batch_size, payload_compression_size + 3);

// Not streamed with compression enabled
let batch = payload;
let (batch_size, was_compressed) = tx_compressed(true, false, &batch, &mut buff).unwrap();
assert_eq!(was_compressed, true);
assert!(was_compressed);
assert!(batch_size < batch.len() + COMPRESSION_BYTE);
assert_eq!(batch_size, payload_compression_size + COMPRESSION_BYTE);

// Streamed with compression disabled
let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
let (batch_size, was_compressed) = tx_compressed(false, true, &batch, &mut buff).unwrap();
let header = get_header_value(&buff);
assert_eq!(was_compressed, false);
assert!(!was_compressed);
assert_eq!(header as usize, payload.len() + COMPRESSION_BYTE);
assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);

// Not streamed and compression disabled
let batch = payload;
let (batch_size, was_compressed) = tx_compressed(false, false, &batch, &mut buff).unwrap();
assert_eq!(was_compressed, false);
assert!(!was_compressed);
assert_eq!(batch_size, payload.len() + COMPRESSION_BYTE);

// Verify that if the compression result is bigger than the original payload size, then the uncompressed payload is returned.
let batch = [16, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; // a non-compressible payload with no repetitions
let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
assert_eq!(was_compressed, false);
assert!(!was_compressed);
assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);
}

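The test cleanups in this hunk follow two Clippy lints: `bool_assert_comparison` (prefer `assert!(x)` over `assert_eq!(x, true)`) and `borrowed_box` (take `&[u8]` instead of `&Box<[u8]>`, a needless double indirection). Callers need no change, as this sketch illustrates:

```rust
fn get_len(buff: &[u8]) -> usize {
    buff.len()
}

fn main() {
    let buff: Box<[u8]> = vec![1u8, 2, 3].into_boxed_slice();
    // A Box<[u8]> coerces to &[u8] at the call site:
    assert_eq!(get_len(&buff), 3);
    assert!(!buff.is_empty()); // preferred over assert_eq!(buff.is_empty(), false)
}
```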
10 changes: 8 additions & 2 deletions plugins/zenoh-plugin-rest/src/lib.rs
@@ -198,7 +198,11 @@ impl Plugin for RestPlugin {

let conf: Config = serde_json::from_value(plugin_conf.clone())
.map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
async_std::task::spawn(run(runtime.clone(), conf.clone()));
let task = async_std::task::spawn(run(runtime.clone(), conf.clone()));
let task = async_std::task::block_on(task.timeout(std::time::Duration::from_millis(1)));
if let Ok(Err(e)) = task {
bail!("REST server failed within 1ms: {e}")
}
Ok(Box::new(RunningPlugin(conf)))
}
}
@@ -435,7 +439,7 @@ async fn write(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Response>
}
}

pub async fn run(runtime: Runtime, conf: Config) {
pub async fn run(runtime: Runtime, conf: Config) -> ZResult<()> {
// Try to initiate login.
// Required in case of dynamic lib, otherwise no logs.
// But cannot be done twice in case of static link.
@@ -461,7 +465,9 @@ pub async fn run(runtime: Runtime, conf: Config) {

if let Err(e) = app.listen(conf.http_port).await {
log::error!("Unable to start http server for REST: {:?}", e);
return Err(e.into());
}
Ok(())
}

fn path_to_key_expr<'a>(path: &'a str, zid: &str) -> ZResult<KeyExpr<'a>> {
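A standalone sketch of the fail-fast probe the REST plugin gains above. It uses async-std's stable `future::timeout` free function rather than the `.timeout` combinator from the diff, and the failing server is a stand-in for `app.listen`:

```rust
use std::time::Duration;

async fn server() -> Result<(), std::io::Error> {
    // Stand-in for app.listen(): fail immediately, e.g. the port is taken.
    Err(std::io::Error::new(std::io::ErrorKind::AddrInUse, "port busy"))
}

fn main() {
    let handle = async_std::task::spawn(server());
    // Poll the spawned task for 1 ms: an early Err surfaces to the caller,
    // while a timeout means the server is still running and presumed healthy.
    match async_std::task::block_on(async_std::future::timeout(
        Duration::from_millis(1),
        handle,
    )) {
        Ok(Err(e)) => panic!("REST server failed within 1ms: {e}"),
        Ok(Ok(())) => println!("server exited cleanly"),
        Err(_elapsed) => println!("no failure within 1ms; assuming healthy"),
    }
}
```

The 1 ms window only catches immediate failures such as bind errors; anything slower is still logged (and now returned) by `run` itself.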
28 changes: 25 additions & 3 deletions zenohd/src/main.rs
@@ -1,3 +1,5 @@
use std::collections::HashSet;

//
// Copyright (c) 2023 ZettaScale Technology
//
@@ -78,22 +80,30 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na

let mut plugins = PluginsManager::dynamic(config.libloader());
// Static plugins are to be added here, with `.add_static::<PluginType>()`
let mut required_plugins = HashSet::new();
for plugin_load in config.plugins().load_requests() {
let PluginLoad {
name,
paths,
required,
} = plugin_load;
log::info!(
"Loading {req} plugin \"{name}\"",
req = if required { "required" } else { "" }
);
if let Err(e) = match paths {
None => plugins.load_plugin_by_name(name),
Some(paths) => plugins.load_plugin_by_paths(name, &paths),
None => plugins.load_plugin_by_name(name.clone()),
Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths),
} {
if required {
panic!("Plugin load failure: {}", e)
} else {
log::error!("Plugin load failure: {}", e)
}
}
if required {
required_plugins.insert(name);
}
}

let runtime = match Runtime::new(config).await {
@@ -105,6 +115,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na
};

for (name, path, start_result) in plugins.start_all(&runtime) {
let required = required_plugins.contains(name);
log::info!(
"Starting {req} plugin \"{name}\"",
req = if required { "required" } else { "" }
);
match start_result {
Ok(Some(_)) => log::info!("Successfully started plugin {} from {:?}", name, path),
Ok(None) => log::warn!("Plugin {} from {:?} wasn't loaded, as another plugin by the same name is already running", name, path),
@@ -113,7 +128,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na
Ok(s) => s,
Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI instability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", name, path),
};
log::error!("Plugin start failure: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
if required {
panic!("Required plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
} else {
log::error!("Plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
}
}
}
}
@@ -157,6 +176,9 @@ fn config_from_args(args: &ArgMatches) -> Config {
config
.insert_json5("plugins/rest/http_port", &format!(r#""{value}""#))
.unwrap();
config
.insert_json5("plugins/rest/__required__", "true")
.unwrap();
}
}
if let Some(plugins_search_dirs) = args.values_of("plugin-search-dir") {
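Putting the zenohd changes together, a minimal, self-contained sketch of the required-plugin gate; the plugin names and start results are stand-ins for the real `PluginsManager::start_all` output:

```rust
use std::collections::HashSet;

fn main() {
    // Names whose config carried `__required__: true` end up in this set.
    let mut required_plugins: HashSet<String> = HashSet::new();
    required_plugins.insert("rest".to_string());

    // Stand-in for plugins.start_all(&runtime): (name, start result) pairs.
    let start_results: Vec<(&str, Result<(), String>)> = vec![
        ("storages", Ok(())),
        ("rest", Err("port 8000 already in use".to_string())),
    ];

    for (name, result) in start_results {
        match result {
            Ok(()) => println!("Successfully started plugin {name}"),
            // A required plugin's failure aborts the router...
            Err(e) if required_plugins.contains(name) => {
                panic!("Required plugin \"{name}\" failed to start: {e}")
            }
            // ...while an optional one is only logged.
            Err(e) => eprintln!("Plugin \"{name}\" failed to start: {e}"),
        }
    }
}
```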