diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs index e805bda26..3a80b0a07 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs @@ -51,6 +51,7 @@ use super::{ /// hence directly skipping to the `SubIntervals` variant. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) enum AlignmentQuery { + All, Diff(DigestDiff), Intervals(HashSet), SubIntervals(HashMap>), @@ -101,6 +102,38 @@ impl Replication { }; match alignment_query { + AlignmentQuery::All => { + tracing::trace!("Processing `AlignmentQuery::All`"); + + let idx_intervals = self + .replication_log + .read() + .await + .intervals + .keys() + .copied() + .collect::>(); + + for interval_idx in idx_intervals { + let mut events_to_send = Vec::default(); + if let Some(interval) = self + .replication_log + .read() + .await + .intervals + .get(&interval_idx) + { + interval.sub_intervals.values().for_each(|sub_interval| { + events_to_send.extend(sub_interval.events.values().map(Into::into)); + }); + } + + // NOTE: As we took the lock in the `if let` block, it is released here, + // diminishing contention. + + self.reply_events(&query, events_to_send).await; + } + } AlignmentQuery::Diff(digest_diff) => { tracing::trace!("Processing `AlignmentQuery::Diff`"); if digest_diff.cold_eras_differ { @@ -262,6 +295,11 @@ impl Replication { /// is the reason why we need the consolidation to set to be `None` (⚠️). pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec) { for event_metadata in events_to_retrieve { + if event_metadata.action == SampleKind::Delete { + reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await; + continue; + } + let stored_data = { let mut storage = self.storage.lock().await; match storage.get(event_metadata.stripped_key.clone(), "").await { @@ -306,6 +344,19 @@ impl Replication { } } + pub(crate) fn spawn_query_replica_aligner( + &self, + replica_aligner_ke: OwnedKeyExpr, + alignment_query: AlignmentQuery, + ) { + let replication = self.clone(); + tokio::task::spawn(async move { + replication + .query_replica_aligner(replica_aligner_ke, alignment_query) + .await; + }); + } + /// Spawns a new task to query the Aligner of the Replica which potentially has data this /// Storage is missing. /// @@ -319,22 +370,20 @@ impl Replication { /// information), spawning a new task. /// /// This process is stateless and all the required information are carried in the query / reply. - pub(crate) fn spawn_query_replica_aligner( + pub(crate) async fn query_replica_aligner( &self, replica_aligner_ke: OwnedKeyExpr, alignment_query: AlignmentQuery, ) { - let replication = self.clone(); - tokio::task::spawn(async move { - let attachment = match bincode::serialize(&alignment_query) { - Ok(attachment) => attachment, - Err(e) => { - tracing::error!("Failed to serialize AlignmentQuery: {e:?}"); - return; - } - }; + let attachment = match bincode::serialize(&alignment_query) { + Ok(attachment) => attachment, + Err(e) => { + tracing::error!("Failed to serialize AlignmentQuery: {e:?}"); + return; + } + }; - match replication + match self .zenoh_session .get(Into::::into(replica_aligner_ke.clone())) .attachment(attachment) @@ -346,51 +395,49 @@ impl Replication { // reply. Hence the need to have no consolidation. .consolidation(ConsolidationMode::None) .await - { - Err(e) => { - tracing::error!("Failed to query Aligner < {replica_aligner_ke} >: {e:?}"); - } - Ok(reply_receiver) => { - while let Ok(reply) = reply_receiver.recv_async().await { - let sample = match reply.into_result() { - Ok(sample) => sample, + { + Err(e) => { + tracing::error!("Failed to query Aligner < {replica_aligner_ke} >: {e:?}"); + } + Ok(reply_receiver) => { + while let Ok(reply) = reply_receiver.recv_async().await { + let sample = match reply.into_result() { + Ok(sample) => sample, + Err(e) => { + tracing::warn!( + "Skipping reply to query to < {replica_aligner_ke} >: {e:?}" + ); + continue; + } + }; + + let alignment_reply = match sample.attachment() { + None => { + tracing::debug!("Skipping reply without attachment"); + continue; + } + Some(attachment) => match bincode::deserialize::( + &attachment.into::>(), + ) { Err(e) => { - tracing::warn!( - "Skipping reply to query to < {replica_aligner_ke} >: {e:?}" + tracing::error!( + "Failed to deserialize attachment as AlignmentReply: {e:?}" ); continue; } - }; - - let alignment_reply = match sample.attachment() { - None => { - tracing::debug!("Skipping reply without attachment"); - continue; - } - Some(attachment) => match bincode::deserialize::( - &attachment.into::>(), - ) { - Err(e) => { - tracing::error!( - "Failed to deserialize attachment as AlignmentReply: {e:?}" - ); - continue; - } - Ok(alignment_reply) => alignment_reply, - }, - }; - - replication - .process_alignment_reply( - replica_aligner_ke.clone(), - alignment_reply, - sample, - ) - .await; - } + Ok(alignment_reply) => alignment_reply, + }, + }; + + self.process_alignment_reply( + replica_aligner_ke.clone(), + alignment_reply, + sample, + ) + .await; } } - }); + } } /// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is @@ -616,18 +663,20 @@ impl Replication { } } - if matches!( - self.storage - .lock() - .await - .put( - replica_event.stripped_key.clone(), - sample.into(), - replica_event.timestamp, - ) - .await, - Ok(StorageInsertionResult::Outdated) | Err(_) - ) { + if replica_event.action == SampleKind::Put + && matches!( + self.storage + .lock() + .await + .put( + replica_event.stripped_key.clone(), + sample.into(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) + { return; } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs index 0ef0824c6..de8f99e1d 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs @@ -57,6 +57,72 @@ pub(crate) struct Replication { } impl Replication { + pub(crate) async fn initial_alignment(&self) { + let Ok(digest_key_sub) = keformat!( + digest_key_expr_formatter::formatter(), + zid = "*", + storage_ke = &self.storage_key_expr + ) else { + tracing::error!( + "Failed to generate key expression to perform initial alignment. Skipping." + ); + return; + }; + + let Ok(subscriber) = self + .zenoh_session + .declare_subscriber(&digest_key_sub) + .allowed_origin(Locality::Remote) + .await + else { + tracing::error!( + "Failed to declare subscriber on {digest_key_sub} to perform initial alignment. \ + Skipping." + ); + return; + }; + + // NOTE: If we are in this method it means an initial alignment has to be performed. That + // means that the tasks operating on the Replication Log have not been started yet, which + // means that there is no contention! + // + // NOTE: We multiple the duration of an interval by 2 to be (relatively) sure to catch the + // publication of a Digest by a Replica. + let timeout = self.replication_log.read().await.configuration.interval * 2; + + match subscriber.recv_timeout(timeout) { + Ok(sample) => { + let Ok(parsed_ke) = digest_key_expr_formatter::parse(sample.key_expr()) else { + tracing::error!( + "Failed to parse key expression associated with Digest publication < {} > \ + for initial alignment. Skipping.", + sample.key_expr() + ); + return; + }; + let replica_zid = parsed_ke.zid(); + + let Ok(replica_aligner_ke) = keformat!( + aligner_key_expr_formatter::formatter(), + storage_ke = &self.storage_key_expr, + zid = replica_zid, + ) else { + tracing::error!( + "Failed to generate a key expression to contact Aligner of Replica < \ + {replica_zid} > for initial alignment. Skipping." + ); + return; + }; + + self.query_replica_aligner(replica_aligner_ke, AlignmentQuery::All) + .await; + } + Err(e) => { + tracing::debug!("Found no Replica to perform initial alignment: {e:?}"); + } + } + } + /// Spawns a task that periodically publishes the [Digest] of the Replication [Log]. /// /// This task will perform the following steps: diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs index f3b87c6c3..b16d3da57 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs @@ -12,13 +12,13 @@ // ZettaScale Zenoh Team, // -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tokio::{ sync::{broadcast::Receiver, RwLock}, task::JoinHandle, }; -use zenoh::{key_expr::OwnedKeyExpr, query::QueryTarget, sample::Locality, session::Session}; +use zenoh::{key_expr::OwnedKeyExpr, session::Session}; use super::{core::Replication, LogLatest}; use crate::storages_mgt::{LatestUpdates, StorageMessage, StorageService}; @@ -35,6 +35,11 @@ pub(crate) const WAIT_PERIOD_SECS: u64 = 4; impl ReplicationService { /// Starts the `ReplicationService`, spawning multiple tasks. /// + /// # Initial alignment + /// + /// - Determine if the underlying Storage is empty. + /// - If it is empty, make a query to a replicated Storage. + /// /// # Tasks spawned /// /// This function will spawn two tasks: @@ -50,65 +55,26 @@ impl ReplicationService { latest_updates: Arc>, mut rx: Receiver, ) { - // We perform a "wait-try" policy because Zenoh needs some time to propagate the routing - // information and, here, we need to have the queryables propagated. - // - // 4 seconds is an arbitrary value. - let mut attempt = 0; - let mut received_reply = false; - - while attempt < MAX_RETRY { - attempt += 1; - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - - match zenoh_session - .get(&storage_key_expr) - // `BestMatching`, the default option for `target`, will try to minimise the storage - // that are queried and their distance while trying to maximise the key space - // covered. - // - // In other words, if there is a close and complete storage, it will only query this - // one. - .target(QueryTarget::BestMatching) - // The value `Remote` is self-explanatory but why it is needed deserves an - // explanation: we do not want to query the local database as the purpose is to get - // the data from other replicas (if there is one). - .allowed_destination(Locality::Remote) - .await - { - Ok(replies) => { - while let Ok(reply) = replies.recv_async().await { - received_reply = true; - if let Ok(sample) = reply.into_result() { - if let Err(e) = storage_service.process_sample(sample).await { - tracing::error!("{e:?}"); - } - } - } - } - Err(e) => tracing::error!("Initial alignment Query failed with: {e:?}"), - } - - if received_reply { - break; - } + let storage = storage_service.storage.clone(); + let replication = Replication { + zenoh_session, + replication_log, + storage_key_expr, + latest_updates, + storage, + }; - tracing::debug!( - "Found no Queryable matching '{storage_key_expr}'. Attempt {attempt}/{MAX_RETRY}." - ); + if replication + .replication_log + .read() + .await + .intervals + .is_empty() + { + replication.initial_alignment().await; } - let storage = storage_service.storage.clone(); - tokio::task::spawn(async move { - let replication = Replication { - zenoh_session, - replication_log, - storage_key_expr, - latest_updates, - storage, - }; - let replication_service = Self { digest_publisher_handle: replication.spawn_digest_publisher(), digest_subscriber_handle: replication.spawn_digest_subscriber(),