diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs index e805bda26c..b957fc50f6 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs @@ -18,18 +18,20 @@ use std::{ }; use serde::{Deserialize, Serialize}; +use tokio::task::JoinHandle; use zenoh::{ bytes::ZBytes, internal::Value, - key_expr::OwnedKeyExpr, + key_expr::{format::keformat, OwnedKeyExpr}, query::{ConsolidationMode, Query, Selector}, sample::{Sample, SampleKind}, + session::ZenohId, }; use zenoh_backend_traits::StorageInsertionResult; use super::{ classification::{IntervalIdx, SubIntervalIdx}, - core::Replication, + core::{aligner_key_expr_formatter, Replication}, digest::{DigestDiff, Fingerprint}, log::EventMetadata, }; @@ -51,6 +53,8 @@ use super::{ /// hence directly skipping to the `SubIntervals` variant. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) enum AlignmentQuery { + Discovery, + All, Diff(DigestDiff), Intervals(HashSet), SubIntervals(HashMap>), @@ -67,6 +71,7 @@ pub(crate) enum AlignmentQuery { /// Not all replies are made, it depends on the Era when a misalignment was detected. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) enum AlignmentReply { + Discovery(ZenohId), Intervals(HashMap), SubIntervals(HashMap>), Events(Vec), @@ -101,6 +106,47 @@ impl Replication { }; match alignment_query { + AlignmentQuery::Discovery => { + tracing::trace!("Processing `AlignmentQuery::Discovery`"); + reply_to_query( + &query, + AlignmentReply::Discovery(self.zenoh_session.zid()), + None, + ) + .await; + } + AlignmentQuery::All => { + tracing::trace!("Processing `AlignmentQuery::All`"); + + let idx_intervals = self + .replication_log + .read() + .await + .intervals + .keys() + .copied() + .collect::>(); + + for interval_idx in idx_intervals { + let mut events_to_send = Vec::default(); + if let Some(interval) = self + .replication_log + .read() + .await + .intervals + .get(&interval_idx) + { + interval.sub_intervals.values().for_each(|sub_interval| { + events_to_send.extend(sub_interval.events.values().map(Into::into)); + }); + } + + // NOTE: As we took the lock in the `if let` block, it is released here, + // diminishing contention. + + self.reply_events(&query, events_to_send).await; + } + } AlignmentQuery::Diff(digest_diff) => { tracing::trace!("Processing `AlignmentQuery::Diff`"); if digest_diff.cold_eras_differ { @@ -262,6 +308,11 @@ impl Replication { /// is the reason why we need the consolidation to set to be `None` (⚠️). pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec) { for event_metadata in events_to_retrieve { + if event_metadata.action == SampleKind::Delete { + reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await; + continue; + } + let stored_data = { let mut storage = self.storage.lock().await; match storage.get(event_metadata.stripped_key.clone(), "").await { @@ -323,7 +374,7 @@ impl Replication { &self, replica_aligner_ke: OwnedKeyExpr, alignment_query: AlignmentQuery, - ) { + ) -> JoinHandle<()> { let replication = self.clone(); tokio::task::spawn(async move { let attachment = match bincode::serialize(&alignment_query) { @@ -334,17 +385,29 @@ impl Replication { } }; + // NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies are + // sent, they will be "consolidated" and only one of them will make it through. + // + // When we retrieve Samples from a Replica, each Sample is sent in a separate + // reply. Hence the need to have no consolidation. + let mut consolidation = ConsolidationMode::None; + + if matches!(alignment_query, AlignmentQuery::Discovery) { + // NOTE: `Monotonic` means that Zenoh will forward the first answer it receives (and + // ensure that later answers are with a higher timestamp — we do not care + // about that last aspect). + // + // By setting the consolidation to this value when performing the initial + // alignment, we select the most reactive Replica (hopefully the closest as + // well). + consolidation = ConsolidationMode::Monotonic; + } + match replication .zenoh_session .get(Into::::into(replica_aligner_ke.clone())) .attachment(attachment) - // NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies - // are sent, they will be "consolidated" and only one of them will make it - // through. - // - // When we retrieve Samples from a Replica, each Sample is sent in a separate - // reply. Hence the need to have no consolidation. - .consolidation(ConsolidationMode::None) + .consolidation(consolidation) .await { Err(e) => { @@ -387,10 +450,17 @@ impl Replication { sample, ) .await; + + // The consolidation mode `Monotonic`, used for sending out an + // `AlignmentQuery::Discovery`, will keep on sending replies. We only want + // to discover / align with a single Replica so we break here. + if matches!(alignment_query, AlignmentQuery::Discovery) { + return; + } } } } - }); + }) } /// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is @@ -438,6 +508,39 @@ impl Replication { sample: Sample, ) { match alignment_reply { + AlignmentReply::Discovery(replica_zid) => { + let parsed_ke = match aligner_key_expr_formatter::parse(&replica_aligner_ke) { + Ok(ke) => ke, + Err(e) => { + tracing::error!( + "Failed to parse < {replica_aligner_ke} > as a valid Aligner key \ + expression: {e:?}" + ); + return; + } + }; + + let replica_aligner_ke = match keformat!( + aligner_key_expr_formatter::formatter(), + hash_configuration = parsed_ke.hash_configuration(), + zid = replica_zid, + ) { + Ok(ke) => ke, + Err(e) => { + tracing::error!("Failed to generate a valid Aligner key expression: {e:?}"); + return; + } + }; + + tracing::debug!("Performing initial alignment with Replica < {replica_zid} >"); + + if let Err(e) = self + .spawn_query_replica_aligner(replica_aligner_ke, AlignmentQuery::All) + .await + { + tracing::error!("Error returned while performing the initial alignment: {e:?}"); + } + } AlignmentReply::Intervals(replica_intervals) => { tracing::trace!("Processing `AlignmentReply::Intervals`"); let intervals_diff = { @@ -616,18 +719,26 @@ impl Replication { } } - if matches!( - self.storage - .lock() - .await - .put( - replica_event.stripped_key.clone(), - sample.into(), - replica_event.timestamp, - ) - .await, - Ok(StorageInsertionResult::Outdated) | Err(_) - ) { + // NOTE: This code can only be called with `action` set to `delete` on an initial + // alignment, in which case the Storage of the receiving Replica is empty => there + // is no need to actually call `storage.delete`. + // + // Outside of an initial alignment, the `delete` action will be performed at the + // step above, in `AlignmentReply::Events`. + if replica_event.action == SampleKind::Put + && matches!( + self.storage + .lock() + .await + .put( + replica_event.stripped_key.clone(), + sample.into(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) + { return; } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs index 0ef0824c65..0db1459ec7 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs @@ -35,16 +35,12 @@ use zenoh::{ }; use zenoh_backend_traits::Storage; -use super::{ - digest::Digest, - log::LogLatest, - service::{MAX_RETRY, WAIT_PERIOD_SECS}, -}; +use super::{digest::Digest, log::LogLatest}; use crate::{replication::aligner::AlignmentQuery, storages_mgt::LatestUpdates}; kedefine!( - pub digest_key_expr_formatter: "@-digest/${zid:*}/${storage_ke:**}", - pub aligner_key_expr_formatter: "@zid/${zid:*}/${storage_ke:**}/aligner", + pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}", + pub aligner_key_expr_formatter: "@zid/${zid:*}/${hash_configuration:*}/aligner", ); #[derive(Clone)] @@ -57,6 +53,63 @@ pub(crate) struct Replication { } impl Replication { + /// Performs an initial alignment, skipping the comparison of Digest, asking directly the first + /// discovered Replica for all its entries. + /// + /// # ⚠️ Assumption: empty Storage + /// + /// We assume that this method will only be called if the underlying Storage is empty. This has + /// at least one consequence: if the Aligner receives a `delete` event from the Replica, it will + /// not attempt to delete anything from the Storage. + /// + /// # Replica discovery + /// + /// To discover a Replica, this method will create a Digest subscriber, wait to receive a + /// *valid* Digest and, upon reception, ask that Replica for all its entries. + /// + /// To avoid waiting indefinitely (in case there are no other Replica on the network), the + /// subscriber will wait for, at most, the duration of two Intervals. + pub(crate) async fn initial_alignment(&self) { + let ke_all_replicas = match keformat!( + aligner_key_expr_formatter::formatter(), + hash_configuration = *self + .replication_log + .read() + .await + .configuration + .fingerprint(), + zid = "*", + ) { + Ok(ke) => ke, + Err(e) => { + tracing::error!( + "Failed to generate key expression to query all Replicas: {e:?}. Skipping \ + initial alignment." + ); + return; + } + }; + + // NOTE: As discussed with @OlivierHecart, the plugins do not wait for the duration of the + // "scouting delay" before performing any Zenoh operation. Hence, we manually enforce this + // delay when performing the initial alignment. + let delay = self + .zenoh_session + .config() + .lock() + .scouting + .delay() + .unwrap_or(500); + tokio::time::sleep(Duration::from_millis(delay)).await; + + if let Err(e) = self + .spawn_query_replica_aligner(ke_all_replicas, AlignmentQuery::Discovery) + .await + { + tracing::error!("Initial alignment failed with: {e:?}"); + } + } + /// Spawns a task that periodically publishes the [Digest] of the Replication [Log]. /// /// This task will perform the following steps: @@ -69,16 +122,20 @@ impl Replication { /// /// [Log]: crate::replication::log::LogLatest pub(crate) fn spawn_digest_publisher(&self) -> JoinHandle<()> { - let zenoh_session = self.zenoh_session.clone(); - let storage_key_expr = self.storage_key_expr.clone(); - let replication_log = self.replication_log.clone(); - let latest_updates = self.latest_updates.clone(); + let replication = self.clone(); tokio::task::spawn(async move { + let configuration = replication + .replication_log + .read() + .await + .configuration + .clone(); + let digest_key_put = match keformat!( digest_key_expr_formatter::formatter(), - zid = zenoh_session.zid(), - storage_ke = storage_key_expr + zid = replication.zenoh_session.zid(), + hash_configuration = *configuration.fingerprint(), ) { Ok(key) => key, Err(e) => { @@ -89,25 +146,14 @@ impl Replication { } }; - // Scope to not forget to release the lock. - let (publication_interval, propagation_delay, last_elapsed_interval) = { - let replication_log_guard = replication_log.read().await; - let configuration = replication_log_guard.configuration(); - let last_elapsed_interval = match configuration.last_elapsed_interval() { - Ok(idx) => idx, - Err(e) => { - tracing::error!( - "Fatal error, call to `last_elapsed_interval` failed with: {e:?}" - ); - return; - } - }; - - ( - configuration.interval, - configuration.propagation_delay, - last_elapsed_interval, - ) + let last_elapsed_interval = match configuration.last_elapsed_interval() { + Ok(idx) => idx, + Err(e) => { + tracing::error!( + "Fatal error, call to `last_elapsed_interval` failed with: {e:?}" + ); + return; + } }; // We have no control over when a replica is going to be started. The purpose is here @@ -115,7 +161,7 @@ impl Replication { // at every interval (+ δ). let duration_until_next_interval = { let millis_last_elapsed = - *last_elapsed_interval as u128 * publication_interval.as_millis(); + *last_elapsed_interval as u128 * configuration.interval.as_millis(); if millis_last_elapsed > u64::MAX as u128 { tracing::error!( @@ -138,7 +184,7 @@ impl Replication { }; Duration::from_millis( - (publication_interval.as_millis() - (millis_since_now - millis_last_elapsed)) + (configuration.interval.as_millis() - (millis_since_now - millis_last_elapsed)) as u64, ) }; @@ -148,7 +194,7 @@ impl Replication { let mut events = HashMap::default(); // Internal delay to avoid an "update storm". - let max_publication_delay = (publication_interval.as_millis() / 3) as u64; + let max_publication_delay = (configuration.interval.as_millis() / 3) as u64; let mut digest_update_start: Instant; let mut digest: Digest; @@ -160,15 +206,15 @@ impl Replication { // Except that we want to take into account the time it takes for a publication to // reach this Zenoh node. Hence, we sleep for `propagation_delay` to, hopefully, // catch the publications that are in transit. - tokio::time::sleep(propagation_delay).await; + tokio::time::sleep(configuration.propagation_delay).await; { - let mut latest_updates_guard = latest_updates.write().await; + let mut latest_updates_guard = replication.latest_updates.write().await; std::mem::swap(&mut events, &mut latest_updates_guard); } { - let mut replication_guard = replication_log.write().await; + let mut replication_guard = replication.replication_log.write().await; replication_guard.update(events.drain().map(|(_, event)| event)); digest = match replication_guard.digest() { Ok(digest) => digest, @@ -194,7 +240,8 @@ impl Replication { // buffer that, hopefully, has enough memory. let buffer_capacity = serialization_buffer.capacity(); - match zenoh_session + match replication + .zenoh_session .put( &digest_key_put, std::mem::replace( @@ -209,17 +256,17 @@ impl Replication { } let digest_update_duration = digest_update_start.elapsed(); - if digest_update_duration > publication_interval { + if digest_update_duration > configuration.interval { tracing::warn!( "The duration it took to update and publish the Digest is superior to the \ duration of an Interval ({} ms), we recommend increasing the duration of \ the latter. Digest update: {} ms (incl. delay: {} ms)", - publication_interval.as_millis(), + configuration.interval.as_millis(), digest_update_duration.as_millis(), - publication_delay + propagation_delay.as_millis() as u64 + publication_delay + configuration.propagation_delay.as_millis() as u64 ); } else { - tokio::time::sleep(publication_interval - digest_update_duration).await; + tokio::time::sleep(configuration.interval - digest_update_duration).await; } } }) @@ -233,16 +280,20 @@ impl Replication { /// /// [DigestDiff]: super::digest::DigestDiff pub(crate) fn spawn_digest_subscriber(&self) -> JoinHandle<()> { - let zenoh_session = self.zenoh_session.clone(); - let storage_key_expr = self.storage_key_expr.clone(); - let replication_log = self.replication_log.clone(); let replication = self.clone(); tokio::task::spawn(async move { + let configuration = replication + .replication_log + .read() + .await + .configuration + .clone(); + let digest_key_sub = match keformat!( digest_key_expr_formatter::formatter(), zid = "*", - storage_ke = &storage_key_expr + hash_configuration = *configuration.fingerprint() ) { Ok(key) => key, Err(e) => { @@ -255,33 +306,22 @@ impl Replication { } }; - let mut retry = 0; - let subscriber = loop { - match zenoh_session + let subscriber = match replication + .zenoh_session .declare_subscriber(&digest_key_sub) // NOTE: We need to explicitly set the locality to `Remote` as otherwise the // Digest subscriber will also receive the Digest published by its own // Digest publisher. .allowed_origin(Locality::Remote) .await - { - Ok(subscriber) => break subscriber, - Err(e) => { - if retry < MAX_RETRY { - retry += 1; - tracing::warn!( - "Failed to declare Digest subscriber: {e:?}. Attempt \ - {retry}/{MAX_RETRY}." - ); - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - } else { - tracing::error!( - "Could not declare Digest subscriber. The storage will not \ - receive the Replication Digest of other replicas." - ); - return; - } - } + { + Ok(subscriber) => subscriber, + Err(e) => { + tracing::error!( + "Could not declare Digest subscriber: {e:?}. The storage will not receive \ + the Replication Digest of other replicas." + ); + return; } }; @@ -325,7 +365,7 @@ impl Replication { tracing::debug!("Replication digest received"); - let digest = match replication_log.read().await.digest() { + let digest = match replication.replication_log.read().await.digest() { Ok(digest) => digest, Err(e) => { tracing::error!( @@ -340,7 +380,7 @@ impl Replication { let replica_aligner_ke = match keformat!( aligner_key_expr_formatter::formatter(), - storage_ke = &storage_key_expr, + hash_configuration = *configuration.fingerprint(), zid = source_zid, ) { Ok(key) => key, @@ -373,15 +413,20 @@ impl Replication { /// responsible for fetching in the Replication Log or in the Storage the relevant information /// to send to the Replica such that it can align its own Storage. pub(crate) fn spawn_aligner_queryable(&self) -> JoinHandle<()> { - let zenoh_session = self.zenoh_session.clone(); - let storage_key_expr = self.storage_key_expr.clone(); let replication = self.clone(); tokio::task::spawn(async move { + let configuration = replication + .replication_log + .read() + .await + .configuration + .clone(); + let aligner_ke = match keformat!( aligner_key_expr_formatter::formatter(), - zid = zenoh_session.zid(), - storage_ke = storage_key_expr, + zid = replication.zenoh_session.zid(), + hash_configuration = *configuration.fingerprint(), ) { Ok(ke) => ke, Err(e) => { @@ -393,30 +438,19 @@ impl Replication { } }; - let mut retry = 0; - let queryable = loop { - match zenoh_session - .declare_queryable(&aligner_ke) - .allowed_origin(Locality::Remote) - .await - { - Ok(queryable) => break queryable, - Err(e) => { - if retry < MAX_RETRY { - retry += 1; - tracing::warn!( - "Failed to declare the Aligner queryable: {e:?}. Attempt \ - {retry}/{MAX_RETRY}." - ); - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - } else { - tracing::error!( - "Could not declare the Aligner queryable. This storage will NOT \ - align with other replicas." - ); - return; - } - } + let queryable = match replication + .zenoh_session + .declare_queryable(&aligner_ke) + .allowed_origin(Locality::Remote) + .await + { + Ok(queryable) => queryable, + Err(e) => { + tracing::error!( + "Could not declare the Aligner queryable: {e:?}. This storage will NOT \ + align with other replicas." + ); + return; } }; diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs index 06ec31d9a2..e48fb4fecd 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs @@ -12,13 +12,13 @@ // ZettaScale Zenoh Team, // -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tokio::{ sync::{broadcast::Receiver, RwLock}, task::JoinHandle, }; -use zenoh::{key_expr::OwnedKeyExpr, query::QueryTarget, sample::Locality, session::Session}; +use zenoh::{key_expr::OwnedKeyExpr, session::Session}; use super::{core::Replication, LogLatest}; use crate::storages_mgt::{LatestUpdates, StorageMessage, StorageService}; @@ -29,84 +29,53 @@ pub(crate) struct ReplicationService { aligner_queryable_handle: JoinHandle<()>, } -pub(crate) const MAX_RETRY: usize = 2; -pub(crate) const WAIT_PERIOD_SECS: u64 = 4; - impl ReplicationService { /// Starts the `ReplicationService`, spawning multiple tasks. /// + /// # Initial alignment + /// + /// To optimise network resources, if the Storage is empty an "initial alignment" will be + /// performed: if a Replica is detected, a query will be made to retrieve the entire content of + /// its Storage. + /// /// # Tasks spawned /// - /// This function will spawn two tasks: + /// This function will spawn four long-lived tasks: /// 1. One to publish the [Digest]. - /// 2. One to wait on the provided [Receiver] in order to stop the Replication Service, + /// 2. One to receive the [Digest] of other Replica. + /// 3. One to receive alignment queries of other Replica. + /// 4. One to wait on the provided [Receiver] in order to stop the Replication Service, /// attempting to abort all the tasks that were spawned, once a Stop message has been /// received. pub async fn spawn_start( zenoh_session: Arc, - storage_service: StorageService, + storage_service: &StorageService, storage_key_expr: OwnedKeyExpr, replication_log: Arc>, latest_updates: Arc>, mut rx: Receiver, ) { - // We perform a "wait-try" policy because Zenoh needs some time to propagate the routing - // information and, here, we need to have the queryables propagated. - // - // 4 seconds is an arbitrary value. - let mut attempt = 0; - let mut received_reply = false; - - while attempt < MAX_RETRY { - attempt += 1; - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - - match zenoh_session - .get(&storage_key_expr) - // `BestMatching`, the default option for `target`, will try to minimise the storage - // that are queried and their distance while trying to maximise the key space - // covered. - // - // In other words, if there is a close and complete storage, it will only query this - // one. - .target(QueryTarget::BestMatching) - // The value `Remote` is self-explanatory but why it is needed deserves an - // explanation: we do not want to query the local database as the purpose is to get - // the data from other replicas (if there is one). - .allowed_destination(Locality::Remote) - .await - { - Ok(replies) => { - while let Ok(reply) = replies.recv_async().await { - received_reply = true; - if let Ok(sample) = reply.into_result() { - if let Err(e) = storage_service.process_sample(sample).await { - tracing::error!("{e:?}"); - } - } - } - } - Err(e) => tracing::error!("Initial alignment Query failed with: {e:?}"), - } + let storage = storage_service.storage.clone(); - if received_reply { - break; - } + let replication = Replication { + zenoh_session, + replication_log, + storage_key_expr, + latest_updates, + storage, + }; - tracing::debug!( - "Found no Queryable matching '{storage_key_expr}'. Attempt {attempt}/{MAX_RETRY}." - ); + if replication + .replication_log + .read() + .await + .intervals + .is_empty() + { + replication.initial_alignment().await; } tokio::task::spawn(async move { - let replication = Replication { - zenoh_session, - replication_log, - storage_key_expr, - latest_updates, - storage: storage_service.storage.clone(), - }; - let replication_service = Self { digest_publisher_handle: replication.spawn_digest_publisher(), digest_subscriber_handle: replication.spawn_digest_subscriber(), @@ -122,7 +91,7 @@ impl ReplicationService { }); } - /// Stops all the tasks spawned by the `ReplicationService`. + /// Stops all the long-lived tasks spawned by the `ReplicationService`. pub fn stop(self) { self.digest_publisher_handle.abort(); self.digest_subscriber_handle.abort(); diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs index 7dcc60bf32..3e5a2e1f09 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs @@ -68,9 +68,8 @@ pub(crate) async fn create_and_start_storage( let storage_name = parts[7]; let name = format!("{uuid}/{storage_name}"); - tracing::trace!("Start storage '{}' on keyexpr '{}'", name, config.key_expr); - let (tx, rx_storage) = tokio::sync::broadcast::channel(1); + let rx_replication = tx.subscribe(); let mut entries = match storage.get_all_entries().await { Ok(entries) => entries @@ -111,50 +110,52 @@ pub(crate) async fn create_and_start_storage( let latest_updates = Arc::new(RwLock::new(latest_updates)); let storage = Arc::new(Mutex::new(storage)); - let storage_service = StorageService::start( - zenoh_session.clone(), - config.clone(), - &name, - storage, - capability, - rx_storage, - CacheLatest::new(latest_updates.clone(), replication_log.clone()), - ) - .await; - - // Testing if the `replication_log` is set is equivalent to testing if the `replication` is - // set: the `replication_log` is only set when the latter is. - if let Some(replication_log) = replication_log { - let rx_replication = tx.subscribe(); - - // NOTE Although the function `ReplicationService::spawn_start` spawns its own tasks, we - // still need to call it within a dedicated task because the Zenoh routing tables are - // populated only after the plugins have been loaded. - // - // If we don't wait for the routing tables to be populated the initial alignment - // (i.e. querying any Storage on the network handling the same key expression), will - // never work. - // - // TODO Do we really want to perform such an initial alignment? Because this query will - // target any Storage that matches the same key expression, regardless of if they have - // been configured to be replicated. - tokio::task::spawn(async move { + + // NOTE The StorageService method `start_storage_queryable_subscriber` does not spawn its own + // task to loop/wait on the Subscriber and Queryable it creates. Thus we spawn the task + // here. + // + // Doing so also allows us to return early from the creation of the Storage, creation which + // blocks populating the routing tables. + // + // TODO Do we really want to perform such an initial alignment? Because this query will + // target any Storage that matches the same key expression, regardless of if they have + // been configured to be replicated. + tokio::task::spawn(async move { + let storage_service = StorageService::new( + zenoh_session.clone(), + config.clone(), + &name, + storage, + capability, + CacheLatest::new(latest_updates.clone(), replication_log.clone()), + ) + .await; + + // Testing if the `replication_log` is set is equivalent to testing if the `replication` is + // set: the `replication_log` is only set when the latter is. + if let Some(replication_log) = replication_log { tracing::debug!( "Starting replication of storage '{}' on keyexpr '{}'", name, config.key_expr, ); + ReplicationService::spawn_start( zenoh_session, - storage_service, + &storage_service, config.key_expr, replication_log, latest_updates, rx_replication, ) .await; - }); - } + } + + storage_service + .start_storage_queryable_subscriber(rx_storage) + .await; + }); Ok(tx) } diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs index 42e82eae94..e96320051f 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs @@ -61,10 +61,8 @@ struct Update { #[derive(Clone)] pub struct StorageService { session: Arc, - key_expr: OwnedKeyExpr, - complete: bool, + configuration: StorageConfig, name: String, - strip_prefix: Option, pub(crate) storage: Arc>>, capability: Capability, tombstones: Arc>>, @@ -73,21 +71,18 @@ pub struct StorageService { } impl StorageService { - pub async fn start( + pub async fn new( session: Arc, config: StorageConfig, name: &str, storage: Arc>>, capability: Capability, - rx: Receiver, cache_latest: CacheLatest, ) -> Self { let storage_service = StorageService { session, - key_expr: config.key_expr, - complete: config.complete, + configuration: config, name: name.to_string(), - strip_prefix: config.strip_prefix, storage, capability, tombstones: Arc::new(RwLock::new(KeBoxTree::default())), @@ -121,22 +116,16 @@ impl StorageService { } } } - storage_service - .clone() - .start_storage_queryable_subscriber(rx, config.garbage_collection_config) - .await; storage_service } - async fn start_storage_queryable_subscriber( - self, - mut rx: Receiver, - gc_config: GarbageCollectionConfig, - ) { + pub(crate) async fn start_storage_queryable_subscriber(self, mut rx: Receiver) { // start periodic GC event let t = Timer::default(); + let gc_config = self.configuration.garbage_collection_config.clone(); + let latest_updates = if self.cache_latest.replication_log.is_none() { Some(self.cache_latest.latest_updates.clone()) } else { @@ -154,8 +143,10 @@ impl StorageService { ); t.add_async(gc).await; + let storage_key_expr = &self.configuration.key_expr; + // subscribe on key_expr - let storage_sub = match self.session.declare_subscriber(&self.key_expr).await { + let storage_sub = match self.session.declare_subscriber(storage_key_expr).await { Ok(storage_sub) => storage_sub, Err(e) => { tracing::error!("Error starting storage '{}': {}", self.name, e); @@ -166,8 +157,8 @@ impl StorageService { // answer to queries on key_expr let storage_queryable = match self .session - .declare_queryable(&self.key_expr) - .complete(self.complete) + .declare_queryable(storage_key_expr) + .complete(self.configuration.complete) .await { Ok(storage_queryable) => storage_queryable, @@ -177,6 +168,12 @@ impl StorageService { } }; + tracing::debug!( + "Starting storage '{}' on keyexpr '{}'", + self.name, + storage_key_expr + ); + tokio::task::spawn(async move { loop { tokio::select!( @@ -249,6 +246,8 @@ impl StorageService { matching_keys ); + let prefix = self.configuration.strip_prefix.as_ref(); + for k in matching_keys { if self.is_deleted(&k, sample_timestamp).await { tracing::trace!("Skipping Sample < {} > deleted later on", k); @@ -297,13 +296,12 @@ impl StorageService { } }; - let stripped_key = - match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) { - Ok(stripped) => stripped, - Err(e) => { - bail!("{e:?}"); - } - }; + let stripped_key = match crate::strip_prefix(prefix, sample_to_store.key_expr()) { + Ok(stripped) => stripped, + Err(e) => { + bail!("{e:?}"); + } + }; // If the Storage was declared as only keeping the Latest value, we ensure that, for // each received Sample, it is indeed the Latest value that is processed. @@ -436,19 +434,21 @@ impl StorageService { let wildcards = self.wildcard_updates.read().await; let mut ts = timestamp; let mut update = None; + + let prefix = self.configuration.strip_prefix.as_ref(); + for node in wildcards.intersecting_keys(key_expr) { let weight = wildcards.weight_at(&node); if weight.is_some() && weight.unwrap().data.timestamp > *ts { // if the key matches a wild card update, check whether it was saved in storage // remember that wild card updates change only existing keys - let stripped_key = - match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) { - Ok(stripped) => stripped, - Err(e) => { - tracing::error!("{}", e); - break; - } - }; + let stripped_key = match crate::strip_prefix(prefix, &key_expr.into()) { + Ok(stripped) => stripped, + Err(e) => { + tracing::error!("{}", e); + break; + } + }; let mut storage = self.storage.lock().await; match storage.get(stripped_key, "").await { Ok(stored_data) => { @@ -531,20 +531,22 @@ impl StorageService { } }; tracing::trace!("[STORAGE] Processing query on key_expr: {}", q.key_expr()); + + let prefix = self.configuration.strip_prefix.as_ref(); + if q.key_expr().is_wild() { // resolve key expr into individual keys let matching_keys = self.get_matching_keys(q.key_expr()).await; let mut storage = self.storage.lock().await; for key in matching_keys { - let stripped_key = - match crate::strip_prefix(self.strip_prefix.as_ref(), &key.clone().into()) { - Ok(k) => k, - Err(e) => { - tracing::error!("{}", e); - // @TODO: return error when it is supported - return; - } - }; + let stripped_key = match crate::strip_prefix(prefix, &key.clone().into()) { + Ok(k) => k, + Err(e) => { + tracing::error!("{}", e); + // @TODO: return error when it is supported + return; + } + }; match storage.get(stripped_key, q.parameters().as_str()).await { Ok(stored_data) => { for entry in stored_data { @@ -569,7 +571,7 @@ impl StorageService { } drop(storage); } else { - let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), q.key_expr()) { + let stripped_key = match crate::strip_prefix(prefix, q.key_expr()) { Ok(k) => k, Err(e) => { tracing::error!("{}", e); @@ -606,13 +608,26 @@ impl StorageService { let mut result = Vec::new(); // @TODO: if cache exists, use that to get the list let storage = self.storage.lock().await; + + let prefix = self.configuration.strip_prefix.as_ref(); + match storage.get_all_entries().await { Ok(entries) => { for (k, _ts) in entries { // @TODO: optimize adding back the prefix (possible inspiration from https://github.com/eclipse-zenoh/zenoh/blob/0.5.0-beta.9/backends/traits/src/utils.rs#L79) let full_key = match k { - Some(key) => crate::prefix(self.strip_prefix.as_ref(), &key), - None => self.strip_prefix.clone().unwrap(), + Some(key) => crate::prefix(prefix, &key), + None => { + let Some(prefix) = prefix else { + // TODO Check if we have anything in place that would prevent such + // an error from happening. + tracing::error!( + "Internal bug: empty key with no `strip_prefix` configured" + ); + continue; + }; + prefix.clone() + } }; if key_expr.intersects(&full_key.clone()) { result.push(full_key); diff --git a/zenoh/tests/authentication.rs b/zenoh/tests/authentication.rs index a49261c9d1..3d6579d6a4 100644 --- a/zenoh/tests/authentication.rs +++ b/zenoh/tests/authentication.rs @@ -41,10 +41,10 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_usrpswd(37447).await; - test_pub_sub_allow_then_deny_usrpswd(37447).await; - test_get_qbl_allow_then_deny_usrpswd(37447).await; - test_get_qbl_deny_then_allow_usrpswd(37447).await; + test_pub_sub_deny_then_allow_usrpswd(29447).await; + test_pub_sub_allow_then_deny_usrpswd(29447).await; + test_get_qbl_allow_then_deny_usrpswd(29447).await; + test_get_qbl_deny_then_allow_usrpswd(29447).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -53,10 +53,10 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_tls(37448, false).await; - test_pub_sub_allow_then_deny_tls(37449).await; - test_get_qbl_allow_then_deny_tls(37450).await; - test_get_qbl_deny_then_allow_tls(37451).await; + test_pub_sub_deny_then_allow_tls(29448, false).await; + test_pub_sub_allow_then_deny_tls(29449).await; + test_get_qbl_allow_then_deny_tls(29450).await; + test_get_qbl_deny_then_allow_tls(29451).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -65,10 +65,10 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_quic(37452).await; - test_pub_sub_allow_then_deny_quic(37453).await; - test_get_qbl_deny_then_allow_quic(37454).await; - test_get_qbl_allow_then_deny_quic(37455).await; + test_pub_sub_deny_then_allow_quic(29452).await; + test_pub_sub_allow_then_deny_quic(29453).await; + test_get_qbl_deny_then_allow_quic(29454).await; + test_get_qbl_allow_then_deny_quic(29455).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -78,7 +78,7 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_pub_sub_deny_then_allow_tls(37456, true).await; + test_pub_sub_deny_then_allow_tls(29456, true).await; } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -87,8 +87,8 @@ mod test { create_new_files(TESTFILES_PATH.to_path_buf()) .await .unwrap(); - test_deny_allow_combination(37457).await; - test_allow_deny_combination(37458).await; + test_deny_allow_combination(29457).await; + test_allow_deny_combination(29458).await; } #[allow(clippy::all)] diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 76c0ccff41..25dac7ddb5 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -145,7 +145,7 @@ fn downsampling_test( fn downsampling_by_keyexpr_impl(flow: InterceptorFlow) { let ke_prefix = "test/downsamples_by_keyexp"; - let locator = "tcp/127.0.0.1:38446"; + let locator = "tcp/127.0.0.1:31446"; let ke_10hz: KeyExpr = format!("{ke_prefix}/10hz").try_into().unwrap(); let ke_20hz: KeyExpr = format!("{ke_prefix}/20hz").try_into().unwrap(); @@ -198,7 +198,7 @@ fn downsampling_by_keyexpr() { #[cfg(unix)] fn downsampling_by_interface_impl(flow: InterceptorFlow) { let ke_prefix = "test/downsamples_by_interface"; - let locator = "tcp/127.0.0.1:38447"; + let locator = "tcp/127.0.0.1:31447"; let ke_10hz: KeyExpr = format!("{ke_prefix}/10hz").try_into().unwrap(); let ke_no_effect: KeyExpr = format!("{ke_prefix}/no_effect").try_into().unwrap(); diff --git a/zenoh/tests/unicity.rs b/zenoh/tests/unicity.rs index d6a8c8f621..eefead014e 100644 --- a/zenoh/tests/unicity.rs +++ b/zenoh/tests/unicity.rs @@ -82,7 +82,7 @@ async fn open_router_session() -> Session { config .listen .endpoints - .set(vec!["tcp/127.0.0.1:37447".parse().unwrap()]) + .set(vec!["tcp/127.0.0.1:30447".parse().unwrap()]) .unwrap(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][00a] Opening router session"); @@ -100,7 +100,7 @@ async fn open_client_sessions() -> (Session, Session, Session) { config.set_mode(Some(WhatAmI::Client)).unwrap(); config .connect - .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:37447" + .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:30447" .parse::() .unwrap()])) .unwrap(); @@ -111,7 +111,7 @@ async fn open_client_sessions() -> (Session, Session, Session) { config.set_mode(Some(WhatAmI::Client)).unwrap(); config .connect - .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:37447" + .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:30447" .parse::() .unwrap()])) .unwrap(); @@ -122,7 +122,7 @@ async fn open_client_sessions() -> (Session, Session, Session) { config.set_mode(Some(WhatAmI::Client)).unwrap(); config .connect - .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:37447" + .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:30447" .parse::() .unwrap()])) .unwrap();