From cbeffcf72649855d43b23893b4c8d499623cabc4 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 22 Apr 2024 13:06:58 +0200 Subject: [PATCH] Allow applications to load plugins (#953) * Move plugins loading in Runtime init * Plugins support is behind conditional feature * Cargo.toml format * Update config format * Update config format * Admin space config * Runtime::new_with_plugins_manager * RuntimeBuilder * RuntimeBuilder * zenohd uses zenoh::open --- Cargo.lock | 19 ++ DEFAULT_CONFIG.json5 | 16 +- commons/zenoh-config/src/lib.rs | 18 +- commons/zenoh-util/src/std_only/lib_loader.rs | 7 + .../zenoh-plugin-storage-manager/src/lib.rs | 6 +- plugins/zenoh-plugin-trait/src/manager.rs | 19 +- .../src/manager/dynamic_plugin.rs | 7 +- .../src/manager/static_plugin.rs | 7 +- plugins/zenoh-plugin-trait/src/plugin.rs | 2 +- zenoh/Cargo.toml | 2 +- zenoh/src/lib.rs | 5 + zenoh/src/net/runtime/adminspace.rs | 175 ++++++++++-------- zenoh/src/net/runtime/mod.rs | 119 +++++++++--- zenoh/src/plugins/loader.rs | 117 ++++++++++++ zenoh/src/plugins/mod.rs | 1 + zenoh/src/plugins/sealed.rs | 2 +- zenoh/src/session.rs | 36 ++-- zenohd/Cargo.toml | 2 +- zenohd/src/main.rs | 135 ++------------ 19 files changed, 432 insertions(+), 263 deletions(-) create mode 100644 zenoh/src/plugins/loader.rs diff --git a/Cargo.lock b/Cargo.lock index 8494abf28b..16f7b4d1a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -531,6 +531,9 @@ name = "bitflags" version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" +dependencies = [ + "serde", +] [[package]] name = "blake3" @@ -3004,6 +3007,18 @@ dependencies = [ "cache-padded", ] +[[package]] +name = "ron" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" +dependencies = [ + "base64 0.21.4", + "bitflags 2.4.2", + "serde", + "serde_derive", +] + [[package]] name = "route-recognizer" version = "0.2.0" @@ -5430,8 +5445,12 @@ version = "0.11.0-dev" dependencies = [ "futures", "lazy_static", + "libc", + "ron", + "serde", "tokio", "zenoh-collections", + "zenoh-macros", "zenoh-result", ] diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 974f3588df..bd3bbbaf6b 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -364,6 +364,8 @@ /// Configure the Admin Space /// Unstable: this configuration part works as advertised, but may change in a future release adminspace: { + // Enables the admin space + enabled: false, // read and/or write permissions on the admin space permissions: { read: true, @@ -374,9 +376,15 @@ /// /// Plugins configurations /// - // /// Directories where plugins configured by name should be looked for. Plugins configured by __path__ are not subject to lookup - // plugins_search_dirs: [], - // /// Plugins are only loaded if present in the configuration. When starting + // + // plugins_loading: { + // // Enable plugins loading. + // enabled: false, + // /// Directories where plugins configured by name should be looked for. Plugins configured by __path__ are not subject to lookup. + // /// If `enabled: true` and `search_dirs` is not specified then `search_dirs` falls back to the default value: ".:~/.zenoh/lib:/opt/homebrew/lib:/usr/local/lib:/usr/lib" + // search_dirs: [], + // }, + // /// Plugins are only loaded if `plugins_loading: { enabled: true }` and present in the configuration when starting. // /// Once loaded, they may react to changes in the configuration made through the zenoh instance's adminspace. // plugins: { // /// If no `__path__` is given to a plugin, zenohd will automatically search for a shared library matching the plugin's name (here, `libzenoh_plugin_rest.so` would be searched for on linux) @@ -385,7 +393,7 @@ // /// - If `__config__` is specified, it's content is merged into plugin configuration // /// - Properties loaded from `__config__` file overrides existing properties // /// - If json objects in loaded file contains `__config__` properties, they are processed recursively - // /// This is used in the 'storcge_manager' which supports subplugins, each with it's own config + // /// This is used in the 'storage_manager' which supports subplugins, each with it's own config // /// // /// See below exapmle of plugin configuration using `__config__` property // diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 1029446557..00ba5bca73 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -482,6 +482,9 @@ validated_struct::validator! { /// To use it, you must enable zenoh's unstable feature flag. /// AdminSpaceConf { + /// Enable the admin space + #[serde(default = "set_false")] + pub enabled: bool, /// Permissions on the admin space pub permissions: PermissionsConf { @@ -507,7 +510,11 @@ validated_struct::validator! { /// A list of directories where plugins may be searched for if no `__path__` was specified for them. /// The executable's current directory will be added to the search paths. - plugins_search_dirs: Vec, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well) + pub plugins_loading: #[derive(Default)] + PluginsLoading { + pub enabled: bool, + pub search_dirs: Option>, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well) + }, #[validated(recursive_accessors)] /// The configuration for plugins. /// @@ -721,10 +728,13 @@ impl Config { } pub fn libloader(&self) -> LibLoader { - if self.plugins_search_dirs.is_empty() { - LibLoader::default() + if self.plugins_loading.enabled { + match self.plugins_loading.search_dirs() { + Some(dirs) => LibLoader::new(dirs, true), + None => LibLoader::default(), + } } else { - LibLoader::new(&self.plugins_search_dirs, true) + LibLoader::empty() } } } diff --git a/commons/zenoh-util/src/std_only/lib_loader.rs b/commons/zenoh-util/src/std_only/lib_loader.rs index dec5bc07af..9c682e4343 100644 --- a/commons/zenoh-util/src/std_only/lib_loader.rs +++ b/commons/zenoh-util/src/std_only/lib_loader.rs @@ -36,6 +36,13 @@ pub struct LibLoader { } impl LibLoader { + /// Return an empty `LibLoader`. + pub fn empty() -> LibLoader { + LibLoader { + search_paths: Vec::new(), + } + } + /// Returns the list of search paths used by `LibLoader::default()` pub fn default_search_paths() -> &'static str { &LIB_DEFAULT_SEARCH_PATHS diff --git a/plugins/zenoh-plugin-storage-manager/src/lib.rs b/plugins/zenoh-plugin-storage-manager/src/lib.rs index b577a5f4ff..2c281144ce 100644 --- a/plugins/zenoh-plugin-storage-manager/src/lib.rs +++ b/plugins/zenoh-plugin-storage-manager/src/lib.rs @@ -112,7 +112,7 @@ impl StorageRuntimeInner { .unwrap_or_default(); let plugins_manager = PluginsManager::dynamic(lib_loader.clone(), BACKEND_LIB_PREFIX) - .declare_static_plugin::(); + .declare_static_plugin::(true); let session = Arc::new(zenoh::init(runtime.clone()).res_sync()?); @@ -198,10 +198,10 @@ impl StorageRuntimeInner { declared } else if let Some(paths) = config.paths() { self.plugins_manager - .declare_dynamic_plugin_by_paths(volume_id, paths)? + .declare_dynamic_plugin_by_paths(volume_id, paths, true)? } else { self.plugins_manager - .declare_dynamic_plugin_by_name(volume_id, backend_name)? + .declare_dynamic_plugin_by_name(volume_id, backend_name, true)? }; let loaded = declared.load()?; loaded.start(config)?; diff --git a/plugins/zenoh-plugin-trait/src/manager.rs b/plugins/zenoh-plugin-trait/src/manager.rs index 359d854d56..a205c3972d 100644 --- a/plugins/zenoh-plugin-trait/src/manager.rs +++ b/plugins/zenoh-plugin-trait/src/manager.rs @@ -31,6 +31,7 @@ pub trait DeclaredPlugin: PluginStatus { } pub trait LoadedPlugin: PluginStatus { fn as_status(&self) -> &dyn PluginStatus; + fn required(&self) -> bool; fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin>; fn started(&self) -> Option<&dyn StartedPlugin>; fn started_mut(&mut self) -> Option<&mut dyn StartedPlugin>; @@ -44,11 +45,11 @@ pub trait StartedPlugin: PluginStatus { } struct PluginRecord( - Box + Send>, + Box + Send + Sync>, ); impl PluginRecord { - fn new + Send + 'static>(plugin: P) -> Self { + fn new + Send + Sync + 'static>(plugin: P) -> Self { Self(Box::new(plugin)) } } @@ -126,8 +127,9 @@ impl P: Plugin + Send + Sync, >( mut self, + required: bool, ) -> Self { - let plugin_loader: StaticPlugin = StaticPlugin::new(); + let plugin_loader: StaticPlugin = StaticPlugin::new(required); self.plugins.push(PluginRecord::new(plugin_loader)); tracing::debug!( "Declared static plugin {}", @@ -141,6 +143,7 @@ impl &mut self, name: S, plugin_name: &str, + required: bool, ) -> ZResult<&mut dyn DeclaredPlugin> { let name = name.into(); let plugin_name = format!("{}{}", self.default_lib_prefix, plugin_name); @@ -150,8 +153,11 @@ impl .ok_or("Dynamic plugin loading is disabled")? .clone(); tracing::debug!("Declared dynamic plugin {} by name {}", &name, &plugin_name); - let loader = - DynamicPlugin::new(name, DynamicPluginSource::ByName((libloader, plugin_name))); + let loader = DynamicPlugin::new( + name, + DynamicPluginSource::ByName((libloader, plugin_name)), + required, + ); self.plugins.push(PluginRecord::new(loader)); Ok(self.plugins.last_mut().unwrap()) } @@ -161,11 +167,12 @@ impl &mut self, name: S, paths: &[P], + required: bool, ) -> ZResult<&mut dyn DeclaredPlugin> { let name = name.into(); let paths = paths.iter().map(|p| p.as_ref().into()).collect(); tracing::debug!("Declared dynamic plugin {} by paths {:?}", &name, &paths); - let loader = DynamicPlugin::new(name, DynamicPluginSource::ByPaths(paths)); + let loader = DynamicPlugin::new(name, DynamicPluginSource::ByPaths(paths), required); self.plugins.push(PluginRecord::new(loader)); Ok(self.plugins.last_mut().unwrap()) } diff --git a/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs b/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs index 1b3168d1cf..90008aad36 100644 --- a/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs +++ b/plugins/zenoh-plugin-trait/src/manager/dynamic_plugin.rs @@ -106,6 +106,7 @@ impl pub struct DynamicPlugin { name: String, + required: bool, report: PluginReport, source: DynamicPluginSource, starter: Option>, @@ -113,9 +114,10 @@ pub struct DynamicPlugin { } impl DynamicPlugin { - pub fn new(name: String, source: DynamicPluginSource) -> Self { + pub fn new(name: String, source: DynamicPluginSource, required: bool) -> Self { Self { name, + required, report: PluginReport::new(), source, starter: None, @@ -202,6 +204,9 @@ impl LoadedPlugin &dyn PluginStatus { self } + fn required(&self) -> bool { + self.required + } fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin> { let starter = self .starter diff --git a/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs index 577639261a..6d1bcae278 100644 --- a/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs +++ b/plugins/zenoh-plugin-trait/src/manager/static_plugin.rs @@ -20,6 +20,7 @@ where P: Plugin, { instance: Option, + required: bool, phantom: PhantomData

, } @@ -27,9 +28,10 @@ impl StaticPlugin, { - pub fn new() -> Self { + pub fn new(required: bool) -> Self { Self { instance: None, + required, phantom: PhantomData, } } @@ -92,6 +94,9 @@ where fn as_status(&self) -> &dyn PluginStatus { self } + fn required(&self) -> bool { + self.required + } fn start(&mut self, args: &StartArgs) -> ZResult<&mut dyn StartedPlugin> { if self.instance.is_none() { tracing::debug!("Plugin `{}` started", self.name()); diff --git a/plugins/zenoh-plugin-trait/src/plugin.rs b/plugins/zenoh-plugin-trait/src/plugin.rs index f1c2d09385..6911d614d5 100644 --- a/plugins/zenoh-plugin-trait/src/plugin.rs +++ b/plugins/zenoh-plugin-trait/src/plugin.rs @@ -155,7 +155,7 @@ pub trait PluginControl { pub trait PluginStartArgs: StructVersion {} -pub trait PluginInstance: StructVersion + PluginControl + Send {} +pub trait PluginInstance: StructVersion + PluginControl + Send + Sync {} /// Base plugin trait. The loaded plugin pub trait Plugin: Sized + 'static { diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index bb6087494b..7c9288731f 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -32,6 +32,7 @@ maintenance = { status = "actively-developed" } auth_pubkey = ["zenoh-transport/auth_pubkey"] auth_usrpwd = ["zenoh-transport/auth_usrpwd"] complete_n = ["zenoh-codec/complete_n"] +plugins = [] shared-memory = [ "zenoh-shm", "zenoh-protocol/shared-memory", @@ -70,7 +71,6 @@ ahash = { workspace = true } async-trait = { workspace = true } base64 = { workspace = true } const_format = { workspace = true } - event-listener = { workspace = true } flume = { workspace = true } form_urlencoded = { workspace = true } diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index d8820f7ad1..8a41464267 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -99,6 +99,10 @@ pub use zenoh_result::ZResult as Result; const GIT_VERSION: &str = git_version!(prefix = "v", cargo_prefix = "v"); +lazy_static::lazy_static!( + static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION")); +); + pub const FEATURES: &str = concat_enabled_features!( prefix = "zenoh", features = [ @@ -137,6 +141,7 @@ pub mod handlers; pub mod info; #[cfg(feature = "unstable")] pub mod liveliness; +#[cfg(all(feature = "unstable", feature = "plugins"))] pub mod plugins; pub mod prelude; pub mod publication; diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index dde776f463..f6e15ef113 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -14,6 +14,7 @@ use super::routing::dispatcher::face::Face; use super::Runtime; use crate::key_expr::KeyExpr; use crate::net::primitives::Primitives; +#[cfg(all(feature = "unstable", feature = "plugins"))] use crate::plugins::sealed::{self as plugins}; use crate::prelude::sync::{Sample, SyncResolve}; use crate::queryable::Query; @@ -28,7 +29,9 @@ use std::sync::Mutex; use tracing::{error, trace}; use zenoh_buffers::buffer::SplitBuffer; use zenoh_config::{ConfigValidator, ValidatedMap, WhatAmI}; +#[cfg(all(feature = "unstable", feature = "plugins"))] use zenoh_plugin_trait::{PluginControl, PluginStatus}; +#[cfg(all(feature = "unstable", feature = "plugins"))] use zenoh_protocol::core::key_expr::keyexpr; use zenoh_protocol::{ core::{key_expr::OwnedKeyExpr, ExprId, KnownEncoding, WireExpr, ZenohId, EMPTY_EXPR_ID}, @@ -44,7 +47,6 @@ use zenoh_transport::unicast::TransportUnicast; pub struct AdminContext { runtime: Runtime, - plugins_mgr: Mutex, zid_str: String, version: String, metadata: serde_json::Value, @@ -60,6 +62,7 @@ pub struct AdminSpace { context: Arc, } +#[cfg(all(feature = "unstable", feature = "plugins"))] #[derive(Debug, Clone)] enum PluginDiff { Delete(String), @@ -74,31 +77,41 @@ impl ConfigValidator for AdminSpace { current: &serde_json::Map, new: &serde_json::Map, ) -> ZResult>> { - let plugin_mgr = zlock!(self.context.plugins_mgr); - let Some(plugin) = plugin_mgr.started_plugin(name) else { - tracing::warn!("Plugin `{}` is not started", name); - // If plugin not started, just allow any config. The plugin `name` will be attempted to start with this config - // on config comparison (see `PluginDiff`) - return Ok(None); - }; - plugin.instance().config_checker(path, current, new) + #[cfg(all(feature = "unstable", feature = "plugins"))] + { + let plugins_mgr = self.context.runtime.plugins_manager(); + let Some(plugin) = plugins_mgr.started_plugin(name) else { + tracing::warn!("Plugin `{}` is not started", name); + // If plugin not started, just allow any config. The plugin `name` will be attempted to start with this config + // on config comparison (see `PluginDiff`) + return Ok(None); + }; + plugin.instance().config_checker(path, current, new) + } + #[cfg(not(all(feature = "unstable", feature = "plugins")))] + { + let _ = (name, path, current, new); + Ok(None) + } } } impl AdminSpace { + #[cfg(all(feature = "unstable", feature = "plugins"))] fn start_plugin( plugin_mgr: &mut plugins::PluginsManager, config: &crate::config::PluginLoad, start_args: &Runtime, + required: bool, ) -> ZResult<()> { let name = &config.name; let declared = if let Some(declared) = plugin_mgr.plugin_mut(name) { tracing::warn!("Plugin `{}` was already declared", declared.name()); declared } else if let Some(paths) = &config.paths { - plugin_mgr.declare_dynamic_plugin_by_paths(name, paths)? + plugin_mgr.declare_dynamic_plugin_by_paths(name, paths, required)? } else { - plugin_mgr.declare_dynamic_plugin_by_name(name, name)? + plugin_mgr.declare_dynamic_plugin_by_name(name, name, required)? }; let loaded = if let Some(loaded) = declared.loaded_mut() { @@ -126,7 +139,7 @@ impl AdminSpace { Ok(()) } - pub async fn start(runtime: &Runtime, plugins_mgr: plugins::PluginsManager, version: String) { + pub async fn start(runtime: &Runtime, version: String) { let zid_str = runtime.state.zid.to_string(); let metadata = runtime.state.metadata.clone(); let root_key: OwnedKeyExpr = format!("@/router/{zid_str}").try_into().unwrap(); @@ -161,10 +174,14 @@ impl AdminSpace { .unwrap(), Arc::new(queryables_data), ); + + #[cfg(all(feature = "unstable", feature = "plugins"))] handlers.insert( format!("@/router/{zid_str}/plugins/**").try_into().unwrap(), Arc::new(plugins_data), ); + + #[cfg(all(feature = "unstable", feature = "plugins"))] handlers.insert( format!("@/router/{zid_str}/status/plugins/**") .try_into() @@ -172,14 +189,15 @@ impl AdminSpace { Arc::new(plugins_status), ); - let mut active_plugins = plugins_mgr + #[cfg(all(feature = "unstable", feature = "plugins"))] + let mut active_plugins = runtime + .plugins_manager() .started_plugins_iter() .map(|rec| (rec.name().to_string(), rec.path().to_string())) .collect::>(); let context = Arc::new(AdminContext { runtime: runtime.clone(), - plugins_mgr: Mutex::new(plugins_mgr), zid_str, version, metadata, @@ -200,72 +218,77 @@ impl AdminSpace { .lock() .set_plugin_validator(Arc::downgrade(&admin)); - let cfg_rx = admin.context.runtime.state.config.subscribe(); - tokio::task::spawn({ - let admin = admin.clone(); - async move { - while let Ok(change) = cfg_rx.recv_async().await { - let change = change.strip_prefix('/').unwrap_or(&change); - if !change.starts_with("plugins") { - continue; - } - - let requested_plugins = { - let cfg_guard = admin.context.runtime.state.config.lock(); - cfg_guard.plugins().load_requests().collect::>() - }; - let mut diffs = Vec::new(); - for plugin in active_plugins.keys() { - if !requested_plugins.iter().any(|r| &r.name == plugin) { - diffs.push(PluginDiff::Delete(plugin.clone())) + #[cfg(all(feature = "unstable", feature = "plugins"))] + { + let cfg_rx = admin.context.runtime.state.config.subscribe(); + + tokio::task::spawn({ + let admin = admin.clone(); + async move { + while let Ok(change) = cfg_rx.recv_async().await { + let change = change.strip_prefix('/').unwrap_or(&change); + if !change.starts_with("plugins") { + continue; } - } - for request in requested_plugins { - if let Some(active) = active_plugins.get(&request.name) { - if request - .paths - .as_ref() - .map(|p| p.contains(active)) - .unwrap_or(true) - { - continue; + + let requested_plugins = { + let cfg_guard = admin.context.runtime.state.config.lock(); + cfg_guard.plugins().load_requests().collect::>() + }; + let mut diffs = Vec::new(); + for plugin in active_plugins.keys() { + if !requested_plugins.iter().any(|r| &r.name == plugin) { + diffs.push(PluginDiff::Delete(plugin.clone())) } - diffs.push(PluginDiff::Delete(request.name.clone())) } - diffs.push(PluginDiff::Start(request)) - } - let mut plugins_mgr = zlock!(admin.context.plugins_mgr); - for diff in diffs { - match diff { - PluginDiff::Delete(name) => { - active_plugins.remove(name.as_str()); - if let Some(running) = plugins_mgr.started_plugin_mut(&name) { - running.stop() + for request in requested_plugins { + if let Some(active) = active_plugins.get(&request.name) { + if request + .paths + .as_ref() + .map(|p| p.contains(active)) + .unwrap_or(true) + { + continue; } + diffs.push(PluginDiff::Delete(request.name.clone())) } - PluginDiff::Start(plugin) => { - if let Err(e) = Self::start_plugin( - &mut plugins_mgr, - &plugin, - &admin.context.runtime, - ) { - if plugin.required { - panic!("Failed to load plugin `{}`: {}", plugin.name, e) - } else { - tracing::error!( - "Failed to load plugin `{}`: {}", - plugin.name, - e - ) + diffs.push(PluginDiff::Start(request)) + } + let mut plugins_mgr = admin.context.runtime.plugins_manager(); + for diff in diffs { + match diff { + PluginDiff::Delete(name) => { + active_plugins.remove(name.as_str()); + if let Some(running) = plugins_mgr.started_plugin_mut(&name) { + running.stop() + } + } + PluginDiff::Start(plugin) => { + if let Err(e) = Self::start_plugin( + &mut plugins_mgr, + &plugin, + &admin.context.runtime, + plugin.required, + ) { + if plugin.required { + panic!("Failed to load plugin `{}`: {}", plugin.name, e) + } else { + tracing::error!( + "Failed to load plugin `{}`: {}", + plugin.name, + e + ) + } } } } } } + tracing::info!("Running plugins: {:?}", &active_plugins) } - tracing::info!("Running plugins: {:?}", &active_plugins) - } - }); + }); + } let primitives = runtime.state.router.new_primitives(admin.clone()); zlock!(admin.primitives).replace(primitives.clone()); @@ -493,13 +516,16 @@ fn router_data(context: &AdminContext, query: Query) { let transport_mgr = context.runtime.manager().clone(); // plugins info + #[cfg(all(feature = "unstable", feature = "plugins"))] let plugins: serde_json::Value = { - let plugins_mgr = zlock!(context.plugins_mgr); + let plugins_mgr = context.runtime.plugins_manager(); plugins_mgr .started_plugins_iter() .map(|rec| (rec.name(), json!({ "path": rec.path() }))) .collect() }; + #[cfg(not(all(feature = "unstable", feature = "plugins")))] + let plugins = serde_json::Value::Null; // locators info let locators: Vec = transport_mgr @@ -693,8 +719,9 @@ fn queryables_data(context: &AdminContext, query: Query) { } } +#[cfg(all(feature = "unstable", feature = "plugins"))] fn plugins_data(context: &AdminContext, query: Query) { - let guard = zlock!(context.plugins_mgr); + let guard = context.runtime.plugins_manager(); let root_key = format!("@/router/{}/plugins", &context.zid_str); let root_key = unsafe { keyexpr::from_str_unchecked(&root_key) }; tracing::debug!("requested plugins status {:?}", query.key_expr()); @@ -711,9 +738,10 @@ fn plugins_data(context: &AdminContext, query: Query) { } } +#[cfg(all(feature = "unstable", feature = "plugins"))] fn plugins_status(context: &AdminContext, query: Query) { - let selector = query.selector(); - let guard = zlock!(context.plugins_mgr); + let selector: crate::Selector<'_> = query.selector(); + let guard = context.runtime.plugins_manager(); let mut root_key = format!("@/router/{}/status/plugins/", &context.zid_str); for plugin in guard.started_plugins_iter() { @@ -779,6 +807,7 @@ fn plugins_status(context: &AdminContext, query: Query) { } } +#[cfg(all(feature = "unstable", feature = "plugins"))] fn with_extended_string R>( prefix: &mut String, suffixes: &[&str], diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 0f11e7cdb4..765eab91bb 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -24,12 +24,16 @@ use super::primitives::DeMux; use super::routing; use super::routing::router::Router; use crate::config::{unwrap_or_default, Config, ModeDependent, Notifier}; -use crate::GIT_VERSION; +#[cfg(all(feature = "unstable", feature = "plugins"))] +use crate::plugins::sealed::PluginsManager; +use crate::{GIT_VERSION, LONG_VERSION}; pub use adminspace::AdminSpace; use futures::stream::StreamExt; use futures::Future; use std::any::Any; use std::sync::{Arc, Weak}; +#[cfg(all(feature = "unstable", feature = "plugins"))] +use std::sync::{Mutex, MutexGuard}; use std::time::Duration; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -46,7 +50,7 @@ use zenoh_transport::{ TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, }; -struct RuntimeState { +pub(crate) struct RuntimeState { zid: ZenohId, whatami: WhatAmI, metadata: serde_json::Value, @@ -57,6 +61,8 @@ struct RuntimeState { locators: std::sync::RwLock>, hlc: Option>, task_controller: TaskController, + #[cfg(all(feature = "unstable", feature = "plugins"))] + plugins_manager: Mutex, } pub struct WeakRuntime { @@ -69,37 +75,37 @@ impl WeakRuntime { } } -#[derive(Clone)] -pub struct Runtime { - state: Arc, +pub struct RuntimeBuilder { + config: Config, + #[cfg(all(feature = "unstable", feature = "plugins"))] + plugins_manager: Option, } -impl StructVersion for Runtime { - fn struct_version() -> u64 { - 1 - } - fn struct_features() -> &'static str { - crate::FEATURES +impl RuntimeBuilder { + pub fn new(config: Config) -> Self { + Self { + config, + #[cfg(all(feature = "unstable", feature = "plugins"))] + plugins_manager: None, + } } -} - -impl PluginStartArgs for Runtime {} -impl Runtime { - pub async fn new(config: Config) -> ZResult { - let mut runtime = Runtime::init(config).await?; - match runtime.start().await { - Ok(()) => Ok(runtime), - Err(err) => Err(err), - } + #[cfg(all(feature = "unstable", feature = "plugins"))] + pub fn plugins_manager>>(mut self, plugins_manager: T) -> Self { + self.plugins_manager = plugins_manager.into(); + self } - pub(crate) async fn init(config: Config) -> ZResult { - tracing::debug!("Zenoh Rust API {}", GIT_VERSION); + pub async fn build(self) -> ZResult { + let RuntimeBuilder { + config, + #[cfg(all(feature = "unstable", feature = "plugins"))] + mut plugins_manager, + } = self; + tracing::debug!("Zenoh Rust API {}", GIT_VERSION); let zid = *config.id(); - - tracing::info!("Using PID: {}", zid); + tracing::info!("Using ZID: {}", zid); let whatami = unwrap_or_default!(config.mode()); let metadata = config.metadata().clone(); @@ -119,8 +125,15 @@ impl Runtime { .zid(zid) .build(handler.clone())?; - let config = Notifier::new(config); + // Plugins manager + #[cfg(all(feature = "unstable", feature = "plugins"))] + let plugins_manager = plugins_manager + .take() + .unwrap_or_else(|| crate::plugins::loader::load_plugins(&config)); + // Admin space creation flag + let start_admin_space = *config.adminspace.enabled(); + let config = Notifier::new(config); let runtime = Runtime { state: Arc::new(RuntimeState { zid, @@ -133,11 +146,18 @@ impl Runtime { locators: std::sync::RwLock::new(vec![]), hlc, task_controller: TaskController::default(), + #[cfg(all(feature = "unstable", feature = "plugins"))] + plugins_manager: Mutex::new(plugins_manager), }), }; *handler.runtime.write().unwrap() = Runtime::downgrade(&runtime); get_mut_unchecked(&mut runtime.state.router.clone()).init_link_state(runtime.clone()); + // Start plugins + #[cfg(all(feature = "unstable", feature = "plugins"))] + crate::plugins::loader::start_plugins(&runtime); + + // Start notifier task let receiver = config.subscribe(); let token = runtime.get_cancellation_token(); runtime.spawn({ @@ -164,20 +184,63 @@ impl Runtime { } }); + // Admin space + if start_admin_space { + AdminSpace::start(&runtime, LONG_VERSION.clone()).await; + } + Ok(runtime) } +} + +#[derive(Clone)] +pub struct Runtime { + state: Arc, +} + +impl StructVersion for Runtime { + fn struct_version() -> u64 { + 1 + } + fn struct_features() -> &'static str { + crate::FEATURES + } +} + +impl PluginStartArgs for Runtime {} + +impl Runtime { + pub async fn new(config: Config) -> ZResult { + // Create plugin_manager and load plugins + let mut runtime = Runtime::init(config).await?; + match runtime.start().await { + Ok(()) => Ok(runtime), + Err(err) => Err(err), + } + } + + pub(crate) async fn init(config: Config) -> ZResult { + RuntimeBuilder::new(config).build().await + } #[inline(always)] - pub fn manager(&self) -> &TransportManager { + pub(crate) fn manager(&self) -> &TransportManager { &self.state.manager } - pub fn new_handler(&self, handler: Arc) { + #[cfg(all(feature = "unstable", feature = "plugins"))] + #[inline(always)] + pub(crate) fn plugins_manager(&self) -> MutexGuard<'_, PluginsManager> { + zlock!(self.state.plugins_manager) + } + + pub(crate) fn new_handler(&self, handler: Arc) { zwrite!(self.state.transport_handlers).push(handler); } pub async fn close(&self) -> ZResult<()> { tracing::trace!("Runtime::close())"); + // TODO: Plugins should be stopped // TODO: Check this whether is able to terminate all spawned task by Runtime::spawn self.state .task_controller diff --git a/zenoh/src/plugins/loader.rs b/zenoh/src/plugins/loader.rs new file mode 100644 index 0000000000..084bae82b7 --- /dev/null +++ b/zenoh/src/plugins/loader.rs @@ -0,0 +1,117 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use super::sealed::{PluginsManager, PLUGIN_PREFIX}; +use crate::runtime::Runtime; +use zenoh_config::{Config, PluginLoad}; +use zenoh_result::ZResult; + +pub(crate) fn load_plugin( + plugin_mgr: &mut PluginsManager, + name: &str, + paths: &Option>, + required: bool, +) -> ZResult<()> { + let declared = if let Some(declared) = plugin_mgr.plugin_mut(name) { + tracing::warn!("Plugin `{}` was already declared", declared.name()); + declared + } else if let Some(paths) = paths { + plugin_mgr.declare_dynamic_plugin_by_paths(name, paths, required)? + } else { + plugin_mgr.declare_dynamic_plugin_by_name(name, name, required)? + }; + + if let Some(loaded) = declared.loaded_mut() { + tracing::warn!( + "Plugin `{}` was already loaded from {}", + loaded.name(), + loaded.path() + ); + } else { + let _ = declared.load()?; + }; + Ok(()) +} + +pub(crate) fn load_plugins(config: &Config) -> PluginsManager { + let mut manager = PluginsManager::dynamic(config.libloader(), PLUGIN_PREFIX.to_string()); + // Static plugins are to be added here, with `.add_static::()` + for plugin_load in config.plugins().load_requests() { + let PluginLoad { + name, + paths, + required, + } = plugin_load; + tracing::info!( + "Loading {req} plugin \"{name}\"", + req = if required { "required" } else { "" } + ); + if let Err(e) = load_plugin(&mut manager, &name, &paths, required) { + if required { + panic!("Plugin load failure: {}", e) + } else { + tracing::error!("Plugin load failure: {}", e) + } + } + } + manager +} + +pub(crate) fn start_plugins(runtime: &Runtime) { + let mut manager = runtime.plugins_manager(); + for plugin in manager.loaded_plugins_iter_mut() { + let required = plugin.required(); + tracing::info!( + "Starting {req} plugin \"{name}\"", + req = if required { "required" } else { "" }, + name = plugin.name() + ); + match plugin.start(runtime) { + Ok(_) => { + tracing::info!( + "Successfully started plugin {} from {:?}", + plugin.name(), + plugin.path() + ); + } + Err(e) => { + let report = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| e.to_string())) { + Ok(s) => s, + Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", plugin.name(), plugin.path()), + }; + if required { + panic!( + "Plugin \"{}\" failed to start: {}", + plugin.name(), + if report.is_empty() { + "no details provided" + } else { + report.as_str() + } + ); + } else { + tracing::error!( + "Required plugin \"{}\" failed to start: {}", + plugin.name(), + if report.is_empty() { + "no details provided" + } else { + report.as_str() + } + ); + } + } + } + tracing::info!("Finished loading plugins"); + } +} diff --git a/zenoh/src/plugins/mod.rs b/zenoh/src/plugins/mod.rs index d72139cc29..be70cf75ec 100644 --- a/zenoh/src/plugins/mod.rs +++ b/zenoh/src/plugins/mod.rs @@ -17,6 +17,7 @@ //! This module is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../../zenoh/index.html) +pub(crate) mod loader; pub(crate) mod sealed; #[zenoh_macros::unstable] diff --git a/zenoh/src/plugins/sealed.rs b/zenoh/src/plugins/sealed.rs index cc11fc213d..a3bfdc3aac 100644 --- a/zenoh/src/plugins/sealed.rs +++ b/zenoh/src/plugins/sealed.rs @@ -27,7 +27,7 @@ zconfigurable! { pub static ref PLUGIN_PREFIX: String = "zenoh_plugin_".to_string(); } /// A zenoh plugin, when started, must return this type. -pub type RunningPlugin = Box; +pub type RunningPlugin = Box; /// Zenoh plugins should implement this trait to ensure type-safety, even if the starting arguments and expected plugin types change in a future release. pub trait ZenohPlugin: Plugin {} diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 1791d39c77..37ae02fe8b 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -824,28 +824,20 @@ impl Session { tracing::debug!("Config: {:?}", &config); let aggregated_subscribers = config.aggregation().subscribers().clone(); let aggregated_publishers = config.aggregation().publishers().clone(); - match Runtime::init(config).await { - Ok(mut runtime) => { - let mut session = Self::init( - runtime.clone(), - aggregated_subscribers, - aggregated_publishers, - ) - .res_async() - .await; - session.owns_runtime = true; - match runtime.start().await { - Ok(()) => { - // Workaround for the declare_and_shoot problem - tokio::time::sleep(Duration::from_millis(*API_OPEN_SESSION_DELAY)) - .await; - Ok(session) - } - Err(err) => Err(err), - } - } - Err(err) => Err(err), - } + let mut runtime = Runtime::init(config).await?; + + let mut session = Self::init( + runtime.clone(), + aggregated_subscribers, + aggregated_publishers, + ) + .res_async() + .await; + session.owns_runtime = true; + runtime.start().await?; + // Workaround for the declare_and_shoot problem + tokio::time::sleep(Duration::from_millis(*API_OPEN_SESSION_DELAY)).await; + Ok(session) }) } diff --git a/zenohd/Cargo.toml b/zenohd/Cargo.toml index 9f471046a6..caf7169673 100644 --- a/zenohd/Cargo.toml +++ b/zenohd/Cargo.toml @@ -42,7 +42,7 @@ tracing = {workspace = true} tracing-subscriber = {workspace = true} tracing-loki = {workspace = true, optional = true } url = {workspace = true, optional = true } -zenoh = { workspace = true, features = ["unstable"] } +zenoh = { workspace = true, features = ["unstable", "plugins"] } [dev-dependencies] rand = { workspace = true, features = ["default"] } diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index e602d7c8a1..471b78380b 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -14,14 +14,11 @@ use clap::Parser; use futures::future; use git_version::git_version; -use std::collections::HashSet; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; -use zenoh::config::{Config, ModeDependentValue, PermissionsConf, PluginLoad, ValidatedMap}; -use zenoh::plugins::PluginsManager; -use zenoh::prelude::{EndPoint, WhatAmI}; -use zenoh::runtime::{AdminSpace, Runtime}; +use zenoh::config::{Config, ModeDependentValue, PermissionsConf, ValidatedMap}; +use zenoh::prelude::r#async::*; use zenoh::Result; #[cfg(feature = "loki")] @@ -94,129 +91,30 @@ struct Args { adminspace_permissions: Option, } -fn load_plugin( - plugin_mgr: &mut PluginsManager, - name: &str, - paths: &Option>, -) -> Result<()> { - let declared = if let Some(declared) = plugin_mgr.plugin_mut(name) { - tracing::warn!("Plugin `{}` was already declared", declared.name()); - declared - } else if let Some(paths) = paths { - plugin_mgr.declare_dynamic_plugin_by_paths(name, paths)? - } else { - plugin_mgr.declare_dynamic_plugin_by_name(name, name)? - }; - - if let Some(loaded) = declared.loaded_mut() { - tracing::warn!( - "Plugin `{}` was already loaded from {}", - loaded.name(), - loaded.path() - ); - } else { - let _ = declared.load()?; - }; - Ok(()) -} - fn main() { tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() .block_on(async { - init_logging().unwrap(); - - tracing::info!("zenohd {}", *LONG_VERSION); - - let args = Args::parse(); - let config = config_from_args(&args); - tracing::info!("Initial conf: {}", &config); + init_logging().unwrap(); - let mut plugin_mgr = PluginsManager::dynamic(config.libloader(), "zenoh_plugin_"); - // Static plugins are to be added here, with `.add_static::()` - let mut required_plugins = HashSet::new(); - for plugin_load in config.plugins().load_requests() { - let PluginLoad { - name, - paths, - required, - } = plugin_load; - tracing::info!( - "Loading {req} plugin \"{name}\"", - req = if required { "required" } else { "" } - ); - if let Err(e) = load_plugin(&mut plugin_mgr, &name, &paths) { - if required { - panic!("Plugin load failure: {}", e) - } else { - tracing::error!("Plugin load failure: {}", e) - } - } - if required { - required_plugins.insert(name); - } - } + tracing::info!("zenohd {}", *LONG_VERSION); - let runtime = match Runtime::new(config).await { - Ok(runtime) => runtime, - Err(e) => { - println!("{e}. Exiting..."); - std::process::exit(-1); - } - }; + let args = Args::parse(); + let config = config_from_args(&args); + tracing::info!("Initial conf: {}", &config); - for plugin in plugin_mgr.loaded_plugins_iter_mut() { - let required = required_plugins.contains(plugin.name()); - tracing::info!( - "Starting {req} plugin \"{name}\"", - req = if required { "required" } else { "" }, - name = plugin.name() - ); - match plugin.start(&runtime) { - Ok(_) => { - tracing::info!( - "Successfully started plugin {} from {:?}", - plugin.name(), - plugin.path() - ); - } + let _session = match zenoh::open(config).res().await { + Ok(runtime) => runtime, Err(e) => { - let report = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| e.to_string())) { - Ok(s) => s, - Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", plugin.name(), plugin.path()), - }; - if required { - panic!( - "Plugin \"{}\" failed to start: {}", - plugin.name(), - if report.is_empty() { - "no details provided" - } else { - report.as_str() - } - ); - } else { - tracing::error!( - "Required plugin \"{}\" failed to start: {}", - plugin.name(), - if report.is_empty() { - "no details provided" - } else { - report.as_str() - } - ); - } + println!("{e}. Exiting..."); + std::process::exit(-1); } - } - } - tracing::info!("Finished loading plugins"); + }; - AdminSpace::start(&runtime, plugin_mgr, LONG_VERSION.clone()).await; - - future::pending::<()>().await; - }); + future::pending::<()>().await; + }); } fn config_from_args(args: &Args) -> Config { @@ -246,9 +144,12 @@ fn config_from_args(args: &Args) -> Config { .unwrap(); } } + config.adminspace.set_enabled(true).unwrap(); + config.plugins_loading.set_enabled(true).unwrap(); if !args.plugin_search_dir.is_empty() { config - .set_plugins_search_dirs(args.plugin_search_dir.clone()) + .plugins_loading + .set_search_dirs(Some(args.plugin_search_dir.clone())) .unwrap(); } for plugin in &args.plugin {