diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs index 1ccde4e87..6b2f213c3 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs @@ -51,6 +51,7 @@ use super::{ /// hence directly skipping to the `SubIntervals` variant. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) enum AlignmentQuery { + All, Diff(DigestDiff), Intervals(HashSet), SubIntervals(HashMap>), @@ -101,6 +102,38 @@ impl Replication { }; match alignment_query { + AlignmentQuery::All => { + tracing::trace!("Processing `AlignmentQuery::All`"); + + let idx_intervals = self + .replication_log + .read() + .await + .intervals + .keys() + .copied() + .collect::>(); + + for interval_idx in idx_intervals { + let mut events_to_send = Vec::default(); + if let Some(interval) = self + .replication_log + .read() + .await + .intervals + .get(&interval_idx) + { + interval.sub_intervals.values().for_each(|sub_interval| { + events_to_send.extend(sub_interval.events.values().map(Into::into)); + }); + } + + // NOTE: As we took the lock in the `if let` block, it is released here, + // diminishing contention. + + self.reply_events(&query, events_to_send).await; + } + } AlignmentQuery::Diff(digest_diff) => { tracing::trace!("Processing `AlignmentQuery::Diff`"); if digest_diff.cold_eras_differ { @@ -262,6 +295,11 @@ impl Replication { /// is the reason why we need the consolidation to set to be `None` (⚠️). pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec) { for event_metadata in events_to_retrieve { + if event_metadata.action == SampleKind::Delete { + reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await; + continue; + } + let stored_data = { let mut storage = self.storage.lock().await; match storage.get(event_metadata.stripped_key.clone(), "").await { @@ -629,18 +667,26 @@ impl Replication { } } - if matches!( - self.storage - .lock() - .await - .put( - replica_event.stripped_key.clone(), - sample.into(), - replica_event.timestamp, - ) - .await, - Ok(StorageInsertionResult::Outdated) | Err(_) - ) { + // NOTE: This code can only be called with `action` set to `delete` on an initial + // alignment, in which case the Storage of the receiving Replica is empty => there + // is no need to actually call `storage.delete`. + // + // Outside of an initial alignment, the `delete` action will be performed at the + // step above, in `AlignmentReply::Events`. + if replica_event.action == SampleKind::Put + && matches!( + self.storage + .lock() + .await + .put( + replica_event.stripped_key.clone(), + sample.into(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) + { return; } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs index 5b9709c01..38f9d0bd3 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs @@ -116,6 +116,63 @@ impl Replication { replica_ke.zid().to_string(), )) } + + /// Performs an initial alignment, skipping the comparison of Digest, asking directly the first + /// discovered Replica for all its entries. + /// + /// # ⚠️ Assumption: empty Storage + /// + /// We assume that this method will only be called if the underlying Storage is empty. This has + /// at least one consequence: if the Aligner receives a `delete` event from the Replica, it will + /// not attempt to delete anything from the Storage. + /// + /// # Replica discovery + /// + /// To discover a Replica, this method will create a Digest subscriber, wait to receive a + /// *valid* Digest and, upon reception, ask that Replica for all its entries. + /// + /// To avoid waiting indefinitely (in case there are no other Replica on the network), the + /// subscriber will wait for, at most, the duration of two Intervals. + pub(crate) async fn initial_alignment(&self) { + let digest_subscriber = match self.generate_digest_subscriber().await { + Ok(sub) => sub, + Err(e) => { + tracing::error!("Failed to generate Subscriber, skipping initial alignment: {e:?}"); + return; + } + }; + + // NOTE: If we are in this method it means an initial alignment has to be performed. That + // means that the tasks operating on the Replication Log have not been started yet, which + // means that there is no contention! + // + // NOTE: We multiple the duration of an interval by 2 to be (relatively) sure to catch the + // publication of a Digest by a Replica. + let timeout = self.replication_log.read().await.configuration.interval * 2; + + match digest_subscriber.recv_timeout(timeout) { + Ok(sample) => { + let (replica_aligner_ke, _replica_zid) = + match self.compute_replica_aligner_ke(&sample).await { + Ok(ke) => ke, + Err(e) => { + tracing::error!( + "Failed to generate a key expression to contact Replica, skipping \ + initial alignment: {e:?}" + ); + return; + } + }; + + self.query_replica_aligner(replica_aligner_ke, AlignmentQuery::All) + .await; + } + Err(e) => { + tracing::debug!("Found no Replica to perform initial alignment: {e:?}"); + } + } + } + /// Spawns a task that periodically publishes the [Digest] of the Replication [Log]. /// /// This task will perform the following steps: diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs index f3b87c6c3..e48fb4fec 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs @@ -12,13 +12,13 @@ // ZettaScale Zenoh Team, // -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tokio::{ sync::{broadcast::Receiver, RwLock}, task::JoinHandle, }; -use zenoh::{key_expr::OwnedKeyExpr, query::QueryTarget, sample::Locality, session::Session}; +use zenoh::{key_expr::OwnedKeyExpr, session::Session}; use super::{core::Replication, LogLatest}; use crate::storages_mgt::{LatestUpdates, StorageMessage, StorageService}; @@ -29,17 +29,22 @@ pub(crate) struct ReplicationService { aligner_queryable_handle: JoinHandle<()>, } -pub(crate) const MAX_RETRY: usize = 2; -pub(crate) const WAIT_PERIOD_SECS: u64 = 4; - impl ReplicationService { /// Starts the `ReplicationService`, spawning multiple tasks. /// + /// # Initial alignment + /// + /// To optimise network resources, if the Storage is empty an "initial alignment" will be + /// performed: if a Replica is detected, a query will be made to retrieve the entire content of + /// its Storage. + /// /// # Tasks spawned /// - /// This function will spawn two tasks: + /// This function will spawn four long-lived tasks: /// 1. One to publish the [Digest]. - /// 2. One to wait on the provided [Receiver] in order to stop the Replication Service, + /// 2. One to receive the [Digest] of other Replica. + /// 3. One to receive alignment queries of other Replica. + /// 4. One to wait on the provided [Receiver] in order to stop the Replication Service, /// attempting to abort all the tasks that were spawned, once a Stop message has been /// received. pub async fn spawn_start( @@ -50,65 +55,27 @@ impl ReplicationService { latest_updates: Arc>, mut rx: Receiver, ) { - // We perform a "wait-try" policy because Zenoh needs some time to propagate the routing - // information and, here, we need to have the queryables propagated. - // - // 4 seconds is an arbitrary value. - let mut attempt = 0; - let mut received_reply = false; - - while attempt < MAX_RETRY { - attempt += 1; - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - - match zenoh_session - .get(&storage_key_expr) - // `BestMatching`, the default option for `target`, will try to minimise the storage - // that are queried and their distance while trying to maximise the key space - // covered. - // - // In other words, if there is a close and complete storage, it will only query this - // one. - .target(QueryTarget::BestMatching) - // The value `Remote` is self-explanatory but why it is needed deserves an - // explanation: we do not want to query the local database as the purpose is to get - // the data from other replicas (if there is one). - .allowed_destination(Locality::Remote) - .await - { - Ok(replies) => { - while let Ok(reply) = replies.recv_async().await { - received_reply = true; - if let Ok(sample) = reply.into_result() { - if let Err(e) = storage_service.process_sample(sample).await { - tracing::error!("{e:?}"); - } - } - } - } - Err(e) => tracing::error!("Initial alignment Query failed with: {e:?}"), - } + let storage = storage_service.storage.clone(); - if received_reply { - break; - } + let replication = Replication { + zenoh_session, + replication_log, + storage_key_expr, + latest_updates, + storage, + }; - tracing::debug!( - "Found no Queryable matching '{storage_key_expr}'. Attempt {attempt}/{MAX_RETRY}." - ); + if replication + .replication_log + .read() + .await + .intervals + .is_empty() + { + replication.initial_alignment().await; } - let storage = storage_service.storage.clone(); - tokio::task::spawn(async move { - let replication = Replication { - zenoh_session, - replication_log, - storage_key_expr, - latest_updates, - storage, - }; - let replication_service = Self { digest_publisher_handle: replication.spawn_digest_publisher(), digest_subscriber_handle: replication.spawn_digest_subscriber(), @@ -124,7 +91,7 @@ impl ReplicationService { }); } - /// Stops all the tasks spawned by the `ReplicationService`. + /// Stops all the long-lived tasks spawned by the `ReplicationService`. pub fn stop(self) { self.digest_publisher_handle.abort(); self.digest_subscriber_handle.abort();