From 1188b5282b8a13675f1b208a77092c0392d991da Mon Sep 17 00:00:00 2001
From: Julien Loudet
Date: Tue, 24 Sep 2024 13:19:12 +0200
Subject: [PATCH] refactor(storage-manager): separate creation/start of
 StorageService

This commit separates creating the `StorageService` from starting it.

This change is motivated by the Replication feature: we want to prevent the
Storage from answering queries until its initial alignment has been
performed. To support this, we need to be able to dissociate creating the
`StorageService` from starting it.

As the method `start_storage_queryable_subscriber` takes ownership of the
`StorageService`, it became mandatory to first create the `StorageService`,
then start the Replication and lastly start the Storage. Since the
Replication code already ran inside a task, the code creating and starting
the Storage was moved inside that task as well.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Take a reference to the `StorageService` structure as it is only needed
    before spawning the different Replication tasks. The `StorageService` is
    still needed to call `process_sample`.
  - Clone the `Arc` of the underlying Storage before spawning the
    Replication tasks.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Moved the logging to just before the Storage is started.
  - Moved the code starting the Storage inside the task.
  - The `StorageService` is now started after the `ReplicationService`.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Renamed the function `start` to `new` as it now only creates an instance
    of the `StorageService`.
  - Removed the parameter `rx` from `new` as it no longer also starts the
    Storage.
  - Removed the call to `start_storage_queryable_subscriber` from `new`.
  - Changed the visibility of the method `start_storage_queryable_subscriber`
    to `pub(crate)` as it is called from outside the `service` module.
  - Added logging information before the Storage "loop" is started (to help
    confirm, with the logs, the order in which the different elements are
    started).

Signed-off-by: Julien Loudet
---
 .../src/replication/service.rs  |  6 +-
 .../src/storages_mgt/mod.rs     | 67 ++++++++++---------
 .../src/storages_mgt/service.rs | 15 +++--
 3 files changed, 46 insertions(+), 42 deletions(-)

diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs
index 06ec31d9a2..f3b87c6c3f 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs
@@ -44,7 +44,7 @@ impl ReplicationService {
     /// received.
     pub async fn spawn_start(
         zenoh_session: Arc,
-        storage_service: StorageService,
+        storage_service: &StorageService,
         storage_key_expr: OwnedKeyExpr,
         replication_log: Arc>,
         latest_updates: Arc>,
@@ -98,13 +98,15 @@ impl ReplicationService {
             );
         }
 
+        let storage = storage_service.storage.clone();
+
         tokio::task::spawn(async move {
             let replication = Replication {
                 zenoh_session,
                 replication_log,
                 storage_key_expr,
                 latest_updates,
-                storage: storage_service.storage.clone(),
+                storage,
             };
 
             let replication_service = Self {
diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
index 7dcc60bf32..3e5a2e1f09 100644
--- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
@@ -68,9 +68,8 @@ pub(crate) async fn create_and_start_storage(
     let storage_name = parts[7];
     let name = format!("{uuid}/{storage_name}");
 
-    tracing::trace!("Start storage '{}' on keyexpr '{}'", name, config.key_expr);
-
     let (tx, rx_storage) = tokio::sync::broadcast::channel(1);
+    let rx_replication = tx.subscribe();
 
     let mut entries = match storage.get_all_entries().await {
         Ok(entries) => entries
@@ -111,50 +110,52 @@ pub(crate) async fn create_and_start_storage(
     let latest_updates = Arc::new(RwLock::new(latest_updates));
 
     let storage = Arc::new(Mutex::new(storage));
-    let storage_service = StorageService::start(
-        zenoh_session.clone(),
-        config.clone(),
-        &name,
-        storage,
-        capability,
-        rx_storage,
-        CacheLatest::new(latest_updates.clone(), replication_log.clone()),
-    )
-    .await;
-
-    // Testing if the `replication_log` is set is equivalent to testing if the `replication` is
-    // set: the `replication_log` is only set when the latter is.
-    if let Some(replication_log) = replication_log {
-        let rx_replication = tx.subscribe();
-
-        // NOTE Although the function `ReplicationService::spawn_start` spawns its own tasks, we
-        //      still need to call it within a dedicated task because the Zenoh routing tables are
-        //      populated only after the plugins have been loaded.
-        //
-        //      If we don't wait for the routing tables to be populated the initial alignment
-        //      (i.e. querying any Storage on the network handling the same key expression), will
-        //      never work.
-        //
-        // TODO Do we really want to perform such an initial alignment? Because this query will
-        //      target any Storage that matches the same key expression, regardless of if they have
-        //      been configured to be replicated.
-        tokio::task::spawn(async move {
+
+    // NOTE The StorageService method `start_storage_queryable_subscriber` does not spawn its own
+    //      task to loop/wait on the Subscriber and Queryable it creates. Thus we spawn the task
+    //      here.
+    //
+    //      Doing so also allows us to return early from the creation of the Storage, creation which
+    //      blocks populating the routing tables.
+    //
+    // TODO Do we really want to perform such an initial alignment? Because this query will
+    //      target any Storage that matches the same key expression, regardless of if they have
+    //      been configured to be replicated.
+    tokio::task::spawn(async move {
+        let storage_service = StorageService::new(
+            zenoh_session.clone(),
+            config.clone(),
+            &name,
+            storage,
+            capability,
+            CacheLatest::new(latest_updates.clone(), replication_log.clone()),
+        )
+        .await;
+
+        // Testing if the `replication_log` is set is equivalent to testing if the `replication` is
+        // set: the `replication_log` is only set when the latter is.
+        if let Some(replication_log) = replication_log {
             tracing::debug!(
                 "Starting replication of storage '{}' on keyexpr '{}'",
                 name,
                 config.key_expr,
             );
+
             ReplicationService::spawn_start(
                 zenoh_session,
-                storage_service,
+                &storage_service,
                 config.key_expr,
                 replication_log,
                 latest_updates,
                 rx_replication,
             )
             .await;
-        });
-    }
+        }
+
+        storage_service
+            .start_storage_queryable_subscriber(rx_storage)
+            .await;
+    });
 
     Ok(tx)
 }
diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
index 4d493ec8b2..e96320051f 100644
--- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
@@ -71,13 +71,12 @@ pub struct StorageService {
 }
 
 impl StorageService {
-    pub async fn start(
+    pub async fn new(
         session: Arc,
         config: StorageConfig,
         name: &str,
         storage: Arc>>,
         capability: Capability,
-        rx: Receiver,
         cache_latest: CacheLatest,
     ) -> Self {
         let storage_service = StorageService {
@@ -117,15 +116,11 @@ impl StorageService {
                 }
             }
         }
-        storage_service
-            .clone()
-            .start_storage_queryable_subscriber(rx)
-            .await;
 
         storage_service
     }
 
-    async fn start_storage_queryable_subscriber(self, mut rx: Receiver) {
+    pub(crate) async fn start_storage_queryable_subscriber(self, mut rx: Receiver) {
         // start periodic GC event
         let t = Timer::default();
 
@@ -173,6 +168,12 @@ impl StorageService {
             }
         };
 
+        tracing::debug!(
+            "Starting storage '{}' on keyexpr '{}'",
+            self.name,
+            storage_key_expr
+        );
+
         tokio::task::spawn(async move {
             loop {
                 tokio::select!(
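
Note: the sketch below is not part of the patch. It is a minimal, self-contained
Rust illustration of the ordering the commit puts in place (create the
`StorageService`, start the `ReplicationService`, then start the Storage loop,
all inside the same spawned task). Every type and signature here is a
simplified, hypothetical stand-in for the plugin's real ones; only the call
order mirrors the change.

    use tokio::sync::broadcast;

    struct StorageService;

    impl StorageService {
        // Creation only: nothing is declared on the Zenoh side yet.
        async fn new() -> Self {
            StorageService
        }

        // Starting consumes the service, mirroring the signature in the patch.
        async fn start_storage_queryable_subscriber(self, _rx: broadcast::Receiver<()>) {
            // The real code declares the Queryable/Subscriber here and loops on `_rx`.
        }
    }

    struct ReplicationService;

    impl ReplicationService {
        // Borrows the StorageService: it is only needed before the tasks spawn.
        async fn spawn_start(_storage_service: &StorageService) {
            // The real code performs the initial alignment here.
        }
    }

    async fn create_and_start_storage(replication_enabled: bool) -> broadcast::Sender<()> {
        let (tx, rx_storage) = broadcast::channel(1);

        tokio::task::spawn(async move {
            // 1. Create the StorageService; it does not answer queries yet.
            let storage_service = StorageService::new().await;

            // 2. Start the Replication first so the initial alignment can run.
            if replication_enabled {
                ReplicationService::spawn_start(&storage_service).await;
            }

            // 3. Only then start the Storage loop so it can answer queries.
            storage_service
                .start_storage_queryable_subscriber(rx_storage)
                .await;
        });

        tx
    }

    #[tokio::main]
    async fn main() {
        let _tx = create_and_start_storage(true).await;
        // Let the spawned task run before this toy example exits.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    }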