From 45eddd62f5c1c8727cd5289641280a0488779848 Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Wed, 25 Sep 2024 16:43:11 +0200 Subject: [PATCH] refactor(storage-manager): initial alignment on empty Storage This commit changes the way a replicated Storage starts: if it is empty and configured to be replicated, it will attempt to align with an active and compatible (i.e. same configuration) Replica before anything. The previous behaviour made a query on the key expression of the Storage. Although, it could, in some cases, actually perform the same initial alignment, it could not guarantee to only query a Storage that was configured to be replicated. To perform this controlled initial alignment, a new variant to the `AlignmentQuery` was added: `AlignmentQuery::All`. As indicated by its name, when an Aligner receives this query it will answer with the content of its Storage. To avoid contention, this transfer is performed by batch, one `Interval` at a time. * plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs: - Added new variant `AlignmentQuery::All`. - Updated the `aligner` method to send all the data of the Storage as a reply to an `AlignmentQuery::All`. This leverages the already existing `reply_events` method. - Updated the `reply_events` method to not attempt to fetch the content of the Storage if the action is set to `delete`. Before this commit, the only time this method was called was during an alignment which filters out the deleted events (hence not requiring this code). - Updated the `process_alignment_reply` to not attempt to delete an entry in the Storage when processing an `AlignmentReply::Retrieval` as it could only happen when performing an initial alignment in which case the receiving Storage is empty. We basically only need to record the fact that a delete was performed in the Replication Log. * plugins/zenoh-plugin-storage-manager/src/replication/core.rs: implemented the `initial_alignment` method that subscribe to the Digest, if an answer is received, compute the key expression to contact the Aligner of the Replica that generated the Digest and, finally, query the Aligner to retrieve all the content of the Storage of the Replica. As the Digest is published at every `interval`. We put a timeout on the Subscriber to twice this duration, to be sure to wait long enough. * plugins/zenoh-plugin-storage-manager/src/replication/service.rs: - Removed the constants `MAX_RETRY` and `WAIT_PERIOD_SECS` as they were no longer needed. - Updated the documentation of the `spawn_start` function. - Removed the previous implementation of the initial alignment that made a query on the key expression of the Storage. - Added a check after creating the `Replication` structure: if the Replication Log is empty, which indicates an empty Storage, then perform an initial alignment. Signed-off-by: Julien Loudet --- .../src/replication/aligner.rs | 70 +++++++++++--- .../src/replication/core.rs | 57 ++++++++++++ .../src/replication/service.rs | 91 ++++++------------- 3 files changed, 144 insertions(+), 74 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs index 1ccde4e87..6b2f213c3 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs @@ -51,6 +51,7 @@ use super::{ /// hence directly skipping to the `SubIntervals` variant. #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub(crate) enum AlignmentQuery { + All, Diff(DigestDiff), Intervals(HashSet), SubIntervals(HashMap>), @@ -101,6 +102,38 @@ impl Replication { }; match alignment_query { + AlignmentQuery::All => { + tracing::trace!("Processing `AlignmentQuery::All`"); + + let idx_intervals = self + .replication_log + .read() + .await + .intervals + .keys() + .copied() + .collect::>(); + + for interval_idx in idx_intervals { + let mut events_to_send = Vec::default(); + if let Some(interval) = self + .replication_log + .read() + .await + .intervals + .get(&interval_idx) + { + interval.sub_intervals.values().for_each(|sub_interval| { + events_to_send.extend(sub_interval.events.values().map(Into::into)); + }); + } + + // NOTE: As we took the lock in the `if let` block, it is released here, + // diminishing contention. + + self.reply_events(&query, events_to_send).await; + } + } AlignmentQuery::Diff(digest_diff) => { tracing::trace!("Processing `AlignmentQuery::Diff`"); if digest_diff.cold_eras_differ { @@ -262,6 +295,11 @@ impl Replication { /// is the reason why we need the consolidation to set to be `None` (⚠️). pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec) { for event_metadata in events_to_retrieve { + if event_metadata.action == SampleKind::Delete { + reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await; + continue; + } + let stored_data = { let mut storage = self.storage.lock().await; match storage.get(event_metadata.stripped_key.clone(), "").await { @@ -629,18 +667,26 @@ impl Replication { } } - if matches!( - self.storage - .lock() - .await - .put( - replica_event.stripped_key.clone(), - sample.into(), - replica_event.timestamp, - ) - .await, - Ok(StorageInsertionResult::Outdated) | Err(_) - ) { + // NOTE: This code can only be called with `action` set to `delete` on an initial + // alignment, in which case the Storage of the receiving Replica is empty => there + // is no need to actually call `storage.delete`. + // + // Outside of an initial alignment, the `delete` action will be performed at the + // step above, in `AlignmentReply::Events`. + if replica_event.action == SampleKind::Put + && matches!( + self.storage + .lock() + .await + .put( + replica_event.stripped_key.clone(), + sample.into(), + replica_event.timestamp, + ) + .await, + Ok(StorageInsertionResult::Outdated) | Err(_) + ) + { return; } diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs index 5b9709c01..38f9d0bd3 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs @@ -116,6 +116,63 @@ impl Replication { replica_ke.zid().to_string(), )) } + + /// Performs an initial alignment, skipping the comparison of Digest, asking directly the first + /// discovered Replica for all its entries. + /// + /// # ⚠️ Assumption: empty Storage + /// + /// We assume that this method will only be called if the underlying Storage is empty. This has + /// at least one consequence: if the Aligner receives a `delete` event from the Replica, it will + /// not attempt to delete anything from the Storage. + /// + /// # Replica discovery + /// + /// To discover a Replica, this method will create a Digest subscriber, wait to receive a + /// *valid* Digest and, upon reception, ask that Replica for all its entries. + /// + /// To avoid waiting indefinitely (in case there are no other Replica on the network), the + /// subscriber will wait for, at most, the duration of two Intervals. + pub(crate) async fn initial_alignment(&self) { + let digest_subscriber = match self.generate_digest_subscriber().await { + Ok(sub) => sub, + Err(e) => { + tracing::error!("Failed to generate Subscriber, skipping initial alignment: {e:?}"); + return; + } + }; + + // NOTE: If we are in this method it means an initial alignment has to be performed. That + // means that the tasks operating on the Replication Log have not been started yet, which + // means that there is no contention! + // + // NOTE: We multiple the duration of an interval by 2 to be (relatively) sure to catch the + // publication of a Digest by a Replica. + let timeout = self.replication_log.read().await.configuration.interval * 2; + + match digest_subscriber.recv_timeout(timeout) { + Ok(sample) => { + let (replica_aligner_ke, _replica_zid) = + match self.compute_replica_aligner_ke(&sample).await { + Ok(ke) => ke, + Err(e) => { + tracing::error!( + "Failed to generate a key expression to contact Replica, skipping \ + initial alignment: {e:?}" + ); + return; + } + }; + + self.query_replica_aligner(replica_aligner_ke, AlignmentQuery::All) + .await; + } + Err(e) => { + tracing::debug!("Found no Replica to perform initial alignment: {e:?}"); + } + } + } + /// Spawns a task that periodically publishes the [Digest] of the Replication [Log]. /// /// This task will perform the following steps: diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs index f3b87c6c3..e48fb4fec 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs @@ -12,13 +12,13 @@ // ZettaScale Zenoh Team, // -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tokio::{ sync::{broadcast::Receiver, RwLock}, task::JoinHandle, }; -use zenoh::{key_expr::OwnedKeyExpr, query::QueryTarget, sample::Locality, session::Session}; +use zenoh::{key_expr::OwnedKeyExpr, session::Session}; use super::{core::Replication, LogLatest}; use crate::storages_mgt::{LatestUpdates, StorageMessage, StorageService}; @@ -29,17 +29,22 @@ pub(crate) struct ReplicationService { aligner_queryable_handle: JoinHandle<()>, } -pub(crate) const MAX_RETRY: usize = 2; -pub(crate) const WAIT_PERIOD_SECS: u64 = 4; - impl ReplicationService { /// Starts the `ReplicationService`, spawning multiple tasks. /// + /// # Initial alignment + /// + /// To optimise network resources, if the Storage is empty an "initial alignment" will be + /// performed: if a Replica is detected, a query will be made to retrieve the entire content of + /// its Storage. + /// /// # Tasks spawned /// - /// This function will spawn two tasks: + /// This function will spawn four long-lived tasks: /// 1. One to publish the [Digest]. - /// 2. One to wait on the provided [Receiver] in order to stop the Replication Service, + /// 2. One to receive the [Digest] of other Replica. + /// 3. One to receive alignment queries of other Replica. + /// 4. One to wait on the provided [Receiver] in order to stop the Replication Service, /// attempting to abort all the tasks that were spawned, once a Stop message has been /// received. pub async fn spawn_start( @@ -50,65 +55,27 @@ impl ReplicationService { latest_updates: Arc>, mut rx: Receiver, ) { - // We perform a "wait-try" policy because Zenoh needs some time to propagate the routing - // information and, here, we need to have the queryables propagated. - // - // 4 seconds is an arbitrary value. - let mut attempt = 0; - let mut received_reply = false; - - while attempt < MAX_RETRY { - attempt += 1; - tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await; - - match zenoh_session - .get(&storage_key_expr) - // `BestMatching`, the default option for `target`, will try to minimise the storage - // that are queried and their distance while trying to maximise the key space - // covered. - // - // In other words, if there is a close and complete storage, it will only query this - // one. - .target(QueryTarget::BestMatching) - // The value `Remote` is self-explanatory but why it is needed deserves an - // explanation: we do not want to query the local database as the purpose is to get - // the data from other replicas (if there is one). - .allowed_destination(Locality::Remote) - .await - { - Ok(replies) => { - while let Ok(reply) = replies.recv_async().await { - received_reply = true; - if let Ok(sample) = reply.into_result() { - if let Err(e) = storage_service.process_sample(sample).await { - tracing::error!("{e:?}"); - } - } - } - } - Err(e) => tracing::error!("Initial alignment Query failed with: {e:?}"), - } + let storage = storage_service.storage.clone(); - if received_reply { - break; - } + let replication = Replication { + zenoh_session, + replication_log, + storage_key_expr, + latest_updates, + storage, + }; - tracing::debug!( - "Found no Queryable matching '{storage_key_expr}'. Attempt {attempt}/{MAX_RETRY}." - ); + if replication + .replication_log + .read() + .await + .intervals + .is_empty() + { + replication.initial_alignment().await; } - let storage = storage_service.storage.clone(); - tokio::task::spawn(async move { - let replication = Replication { - zenoh_session, - replication_log, - storage_key_expr, - latest_updates, - storage, - }; - let replication_service = Self { digest_publisher_handle: replication.spawn_digest_publisher(), digest_subscriber_handle: replication.spawn_digest_subscriber(), @@ -124,7 +91,7 @@ impl ReplicationService { }); } - /// Stops all the tasks spawned by the `ReplicationService`. + /// Stops all the long-lived tasks spawned by the `ReplicationService`. pub fn stop(self) { self.digest_publisher_handle.abort(); self.digest_subscriber_handle.abort();