From 1492a26fab6c8d37d61ddf5694024e94b5c38167 Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Wed, 25 Sep 2024 14:44:18 +0200 Subject: [PATCH] refactor(storage-manager): use hash in Replication key expressions As we were using the key expression of the Storage to generate the key expressions used in the Replication, it was possible to receive Digest emitted by Replicas that were operating on a subset of the key space of the Storage. This commit changes the way the key expressions for the Replication are generated by using the hash of the configuration of the Replication: this renders these key expressions unique, hence avoiding the issue just described. This property is interesting for the initial Alignment: had we not made that change, we would have had to ensure that we perform that Alignment on a Replica operating on exactly the same key space (and not a subset) and the same configuration (in particular, the `strip_prefix`). NOTE: This does not solve the initial alignment step that will still contact Storage that are operating on a subset (if there is no better match on the network). * plugins/zenoh-plugin-storage-manager/src/replication/core.rs: - Renamed `storage_ke` to `hash_configuration` in the key expression formatters of the Digest and the Aligner. - Removed the unnecessary clones when spawning the Digest Publisher + fixed the different call sites. - Removed the scope to access the configuration as we clone it earlier in the code + fixed the different call sites. - Used the hash of the configuration to generate the key expression for the Digest. - Removed the unnecessary clones when spawning the Digest Subscriber + fixed the different call sites. - Used the hash of the configuration to generate the key expression for the Digest. - Removed the unnecessary clones when spawning the Digest Publisher + fixed the different call sites. - Used the hash of the configuration to generate the key expression for the Digest. - Removed the unnecessary clones when spawning the Aligner + fixed the different call sites. - Used the hash of the configuration to generate the key expression for the Aligner Queryable. Signed-off-by: Julien Loudet --- .../src/replication/core.rs | 105 +++++++++--------- 1 file changed, 55 insertions(+), 50 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs index 0ef0824c65..522385938c 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs @@ -43,8 +43,8 @@ use super::{ use crate::{replication::aligner::AlignmentQuery, storages_mgt::LatestUpdates}; kedefine!( - pub digest_key_expr_formatter: "@-digest/${zid:*}/${storage_ke:**}", - pub aligner_key_expr_formatter: "@zid/${zid:*}/${storage_ke:**}/aligner", + pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}", + pub aligner_key_expr_formatter: "@zid/${zid:*}/${hash_configuration:*}/aligner", ); #[derive(Clone)] @@ -69,16 +69,20 @@ impl Replication { /// /// [Log]: crate::replication::log::LogLatest pub(crate) fn spawn_digest_publisher(&self) -> JoinHandle<()> { - let zenoh_session = self.zenoh_session.clone(); - let storage_key_expr = self.storage_key_expr.clone(); - let replication_log = self.replication_log.clone(); - let latest_updates = self.latest_updates.clone(); + let replication = self.clone(); tokio::task::spawn(async move { + let configuration = replication + .replication_log + .read() + .await + .configuration + .clone(); + let digest_key_put = match keformat!( digest_key_expr_formatter::formatter(), - zid = zenoh_session.zid(), - storage_ke = storage_key_expr + zid = replication.zenoh_session.zid(), + hash_configuration = *configuration.fingerprint(), ) { Ok(key) => key, Err(e) => { @@ -89,25 +93,14 @@ impl Replication { } }; - // Scope to not forget to release the lock. - let (publication_interval, propagation_delay, last_elapsed_interval) = { - let replication_log_guard = replication_log.read().await; - let configuration = replication_log_guard.configuration(); - let last_elapsed_interval = match configuration.last_elapsed_interval() { - Ok(idx) => idx, - Err(e) => { - tracing::error!( - "Fatal error, call to `last_elapsed_interval` failed with: {e:?}" - ); - return; - } - }; - - ( - configuration.interval, - configuration.propagation_delay, - last_elapsed_interval, - ) + let last_elapsed_interval = match configuration.last_elapsed_interval() { + Ok(idx) => idx, + Err(e) => { + tracing::error!( + "Fatal error, call to `last_elapsed_interval` failed with: {e:?}" + ); + return; + } }; // We have no control over when a replica is going to be started. The purpose is here @@ -115,7 +108,7 @@ impl Replication { // at every interval (+ δ). let duration_until_next_interval = { let millis_last_elapsed = - *last_elapsed_interval as u128 * publication_interval.as_millis(); + *last_elapsed_interval as u128 * configuration.interval.as_millis(); if millis_last_elapsed > u64::MAX as u128 { tracing::error!( @@ -138,7 +131,7 @@ impl Replication { }; Duration::from_millis( - (publication_interval.as_millis() - (millis_since_now - millis_last_elapsed)) + (configuration.interval.as_millis() - (millis_since_now - millis_last_elapsed)) as u64, ) }; @@ -148,7 +141,7 @@ impl Replication { let mut events = HashMap::default(); // Internal delay to avoid an "update storm". - let max_publication_delay = (publication_interval.as_millis() / 3) as u64; + let max_publication_delay = (configuration.interval.as_millis() / 3) as u64; let mut digest_update_start: Instant; let mut digest: Digest; @@ -160,15 +153,15 @@ impl Replication { // Except that we want to take into account the time it takes for a publication to // reach this Zenoh node. Hence, we sleep for `propagation_delay` to, hopefully, // catch the publications that are in transit. - tokio::time::sleep(propagation_delay).await; + tokio::time::sleep(configuration.propagation_delay).await; { - let mut latest_updates_guard = latest_updates.write().await; + let mut latest_updates_guard = replication.latest_updates.write().await; std::mem::swap(&mut events, &mut latest_updates_guard); } { - let mut replication_guard = replication_log.write().await; + let mut replication_guard = replication.replication_log.write().await; replication_guard.update(events.drain().map(|(_, event)| event)); digest = match replication_guard.digest() { Ok(digest) => digest, @@ -194,7 +187,8 @@ impl Replication { // buffer that, hopefully, has enough memory. let buffer_capacity = serialization_buffer.capacity(); - match zenoh_session + match replication + .zenoh_session .put( &digest_key_put, std::mem::replace( @@ -209,17 +203,17 @@ impl Replication { } let digest_update_duration = digest_update_start.elapsed(); - if digest_update_duration > publication_interval { + if digest_update_duration > configuration.interval { tracing::warn!( "The duration it took to update and publish the Digest is superior to the \ duration of an Interval ({} ms), we recommend increasing the duration of \ the latter. Digest update: {} ms (incl. delay: {} ms)", - publication_interval.as_millis(), + configuration.interval.as_millis(), digest_update_duration.as_millis(), - publication_delay + propagation_delay.as_millis() as u64 + publication_delay + configuration.propagation_delay.as_millis() as u64 ); } else { - tokio::time::sleep(publication_interval - digest_update_duration).await; + tokio::time::sleep(configuration.interval - digest_update_duration).await; } } }) @@ -233,16 +227,20 @@ impl Replication { /// /// [DigestDiff]: super::digest::DigestDiff pub(crate) fn spawn_digest_subscriber(&self) -> JoinHandle<()> { - let zenoh_session = self.zenoh_session.clone(); - let storage_key_expr = self.storage_key_expr.clone(); - let replication_log = self.replication_log.clone(); let replication = self.clone(); tokio::task::spawn(async move { + let configuration = replication + .replication_log + .read() + .await + .configuration + .clone(); + let digest_key_sub = match keformat!( digest_key_expr_formatter::formatter(), zid = "*", - storage_ke = &storage_key_expr + hash_configuration = *configuration.fingerprint() ) { Ok(key) => key, Err(e) => { @@ -257,7 +255,8 @@ impl Replication { let mut retry = 0; let subscriber = loop { - match zenoh_session + match replication + .zenoh_session .declare_subscriber(&digest_key_sub) // NOTE: We need to explicitly set the locality to `Remote` as otherwise the // Digest subscriber will also receive the Digest published by its own @@ -325,7 +324,7 @@ impl Replication { tracing::debug!("Replication digest received"); - let digest = match replication_log.read().await.digest() { + let digest = match replication.replication_log.read().await.digest() { Ok(digest) => digest, Err(e) => { tracing::error!( @@ -340,7 +339,7 @@ impl Replication { let replica_aligner_ke = match keformat!( aligner_key_expr_formatter::formatter(), - storage_ke = &storage_key_expr, + hash_configuration = *configuration.fingerprint(), zid = source_zid, ) { Ok(key) => key, @@ -373,15 +372,20 @@ impl Replication { /// responsible for fetching in the Replication Log or in the Storage the relevant information /// to send to the Replica such that it can align its own Storage. pub(crate) fn spawn_aligner_queryable(&self) -> JoinHandle<()> { - let zenoh_session = self.zenoh_session.clone(); - let storage_key_expr = self.storage_key_expr.clone(); let replication = self.clone(); tokio::task::spawn(async move { + let configuration = replication + .replication_log + .read() + .await + .configuration + .clone(); + let aligner_ke = match keformat!( aligner_key_expr_formatter::formatter(), - zid = zenoh_session.zid(), - storage_ke = storage_key_expr, + zid = replication.zenoh_session.zid(), + hash_configuration = *configuration.fingerprint(), ) { Ok(ke) => ke, Err(e) => { @@ -395,7 +399,8 @@ impl Replication { let mut retry = 0; let queryable = loop { - match zenoh_session + match replication + .zenoh_session .declare_queryable(&aligner_ke) .allowed_origin(Locality::Remote) .await