refactor(storage-manager): use hash in Replication key expressions

As we were using the key expression of the Storage to generate the key
expressions used in the Replication, it was possible to receive Digests
emitted by Replicas that were operating on a subset of the key space of
the Storage.

This commit changes the way the key expressions for the Replication are
generated: they now embed the hash of the Replication configuration,
which makes them unique and thus avoids the issue described above.
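
To make the new scheme concrete, here is a minimal sketch of how such a
key expression is generated (it mirrors the formatter and `keformat!`
call in the diff below; the import paths, the `u64` fingerprint type and
the helper function are illustrative assumptions, not the exact plugin
code):

    use zenoh::key_expr::format::{kedefine, keformat};
    use zenoh::key_expr::OwnedKeyExpr;

    kedefine!(
        // The last chunk carries the hash of the Replication configuration
        // instead of the Storage key expression, so two Replicas only match
        // if their configurations are identical.
        pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}",
    );

    fn digest_key_expr(zid: &str, fingerprint: u64) -> Option<OwnedKeyExpr> {
        match keformat!(
            digest_key_expr_formatter::formatter(),
            zid = zid,
            hash_configuration = fingerprint,
        ) {
            Ok(ke) => Some(ke),
            Err(e) => {
                eprintln!("failed to generate the Digest key expression: {e:?}");
                None
            }
        }
    }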

This uniqueness property is interesting for the initial Alignment:
without this change, we would have had to ensure that the Alignment is
performed with a Replica operating on exactly the same key space (and
not a subset) and with the same configuration (in particular, the same
`strip_prefix`).

NOTE: This does not solve the initial alignment step, which will still
contact a Storage operating on a subset of the key space (if there is no
better match on the network).

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Renamed `storage_ke` to `hash_configuration` in the key expression
    formatters of the Digest and the Aligner.
  - Removed the unnecessary clones when spawning the Digest Publisher +
    fixed the different call sites (see the sketch after this list).
  - Removed the scope that was only there to access the configuration,
    as it is now cloned earlier in the code + fixed the different call
    sites.
  - Used the hash of the configuration to generate the key expression
    for the Digest.
  - Removed the unnecessary clones when spawning the Digest Subscriber +
    fixed the different call sites.
  - Used the hash of the configuration to generate the key expression
    for the Digest.
  - Removed the unnecessary clones when spawning the Aligner + fixed the
    different call sites.
  - Used the hash of the configuration to generate the key expression
    for the Aligner Queryable.
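
The bullets about removing the per-field clones all follow the same
pattern, sketched below with simplified stand-in types (the real
`Replication`, `LogLatest` and `Configuration` live in the plugin; only
the shape of the change is shown):

    use std::sync::Arc;
    use tokio::{sync::RwLock, task::JoinHandle};

    // Stand-ins for the plugin's types: the fields are shared handles, so
    // the whole struct can derive `Clone` cheaply.
    #[derive(Clone)]
    struct Configuration {
        fingerprint: u64,
    }

    struct LogLatest {
        configuration: Configuration,
    }

    #[derive(Clone)]
    struct Replication {
        replication_log: Arc<RwLock<LogLatest>>,
    }

    impl Replication {
        fn spawn_digest_publisher(&self) -> JoinHandle<()> {
            // A single clone of `self` moves every shared handle into the
            // task, replacing the previous per-field clones.
            let replication = self.clone();

            tokio::task::spawn(async move {
                // The configuration is cloned once, up front: its hash
                // (fingerprint) is what the key expressions are built from,
                // and its fields (interval, propagation_delay) are used
                // directly instead of being extracted in a dedicated scope.
                let configuration = replication
                    .replication_log
                    .read()
                    .await
                    .configuration
                    .clone();
                let _hash_configuration = configuration.fingerprint;
                // ... build the key expression and run the publication loop ...
            })
        }
    }

`spawn_digest_subscriber` and `spawn_aligner_queryable` follow the same
shape in the diff below.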

Signed-off-by: Julien Loudet <[email protected]>
J-Loudet committed Sep 25, 2024
1 parent 1188b52 commit 1492a26
1 changed file: plugins/zenoh-plugin-storage-manager/src/replication/core.rs (55 additions, 50 deletions)

@@ -43,8 +43,8 @@ use super::{
use crate::{replication::aligner::AlignmentQuery, storages_mgt::LatestUpdates};

kedefine!(
pub digest_key_expr_formatter: "@-digest/${zid:*}/${storage_ke:**}",
pub aligner_key_expr_formatter: "@zid/${zid:*}/${storage_ke:**}/aligner",
pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}",
pub aligner_key_expr_formatter: "@zid/${zid:*}/${hash_configuration:*}/aligner",
);

#[derive(Clone)]
@@ -69,16 +69,20 @@ impl Replication {
///
/// [Log]: crate::replication::log::LogLatest
pub(crate) fn spawn_digest_publisher(&self) -> JoinHandle<()> {
let zenoh_session = self.zenoh_session.clone();
let storage_key_expr = self.storage_key_expr.clone();
let replication_log = self.replication_log.clone();
let latest_updates = self.latest_updates.clone();
let replication = self.clone();

tokio::task::spawn(async move {
let configuration = replication
.replication_log
.read()
.await
.configuration
.clone();

let digest_key_put = match keformat!(
digest_key_expr_formatter::formatter(),
zid = zenoh_session.zid(),
storage_ke = storage_key_expr
zid = replication.zenoh_session.zid(),
hash_configuration = *configuration.fingerprint(),
) {
Ok(key) => key,
Err(e) => {
@@ -89,33 +93,22 @@ impl Replication {
}
};

// Scope to not forget to release the lock.
let (publication_interval, propagation_delay, last_elapsed_interval) = {
let replication_log_guard = replication_log.read().await;
let configuration = replication_log_guard.configuration();
let last_elapsed_interval = match configuration.last_elapsed_interval() {
Ok(idx) => idx,
Err(e) => {
tracing::error!(
"Fatal error, call to `last_elapsed_interval` failed with: {e:?}"
);
return;
}
};

(
configuration.interval,
configuration.propagation_delay,
last_elapsed_interval,
)
let last_elapsed_interval = match configuration.last_elapsed_interval() {
Ok(idx) => idx,
Err(e) => {
tracing::error!(
"Fatal error, call to `last_elapsed_interval` failed with: {e:?}"
);
return;
}
};

// We have no control over when a replica is going to be started. The purpose is here
// is to try to align its publications and make it so that they happen more or less
// at every interval (+ δ).
let duration_until_next_interval = {
let millis_last_elapsed =
*last_elapsed_interval as u128 * publication_interval.as_millis();
*last_elapsed_interval as u128 * configuration.interval.as_millis();

if millis_last_elapsed > u64::MAX as u128 {
tracing::error!(
@@ -138,7 +131,7 @@ impl Replication {
};

Duration::from_millis(
(publication_interval.as_millis() - (millis_since_now - millis_last_elapsed))
(configuration.interval.as_millis() - (millis_since_now - millis_last_elapsed))
as u64,
)
};
@@ -148,7 +141,7 @@ impl Replication {
let mut events = HashMap::default();

// Internal delay to avoid an "update storm".
let max_publication_delay = (publication_interval.as_millis() / 3) as u64;
let max_publication_delay = (configuration.interval.as_millis() / 3) as u64;

let mut digest_update_start: Instant;
let mut digest: Digest;
@@ -160,15 +153,15 @@ impl Replication {
// Except that we want to take into account the time it takes for a publication to
// reach this Zenoh node. Hence, we sleep for `propagation_delay` to, hopefully,
// catch the publications that are in transit.
tokio::time::sleep(propagation_delay).await;
tokio::time::sleep(configuration.propagation_delay).await;

{
let mut latest_updates_guard = latest_updates.write().await;
let mut latest_updates_guard = replication.latest_updates.write().await;
std::mem::swap(&mut events, &mut latest_updates_guard);
}

{
let mut replication_guard = replication_log.write().await;
let mut replication_guard = replication.replication_log.write().await;
replication_guard.update(events.drain().map(|(_, event)| event));
digest = match replication_guard.digest() {
Ok(digest) => digest,
@@ -194,7 +187,8 @@ impl Replication {
// buffer that, hopefully, has enough memory.
let buffer_capacity = serialization_buffer.capacity();

match zenoh_session
match replication
.zenoh_session
.put(
&digest_key_put,
std::mem::replace(
@@ -209,17 +203,17 @@ impl Replication {
}

let digest_update_duration = digest_update_start.elapsed();
if digest_update_duration > publication_interval {
if digest_update_duration > configuration.interval {
tracing::warn!(
"The duration it took to update and publish the Digest is superior to the \
duration of an Interval ({} ms), we recommend increasing the duration of \
the latter. Digest update: {} ms (incl. delay: {} ms)",
publication_interval.as_millis(),
configuration.interval.as_millis(),
digest_update_duration.as_millis(),
publication_delay + propagation_delay.as_millis() as u64
publication_delay + configuration.propagation_delay.as_millis() as u64
);
} else {
tokio::time::sleep(publication_interval - digest_update_duration).await;
tokio::time::sleep(configuration.interval - digest_update_duration).await;
}
}
})
@@ -233,16 +227,20 @@ impl Replication {
///
/// [DigestDiff]: super::digest::DigestDiff
pub(crate) fn spawn_digest_subscriber(&self) -> JoinHandle<()> {
let zenoh_session = self.zenoh_session.clone();
let storage_key_expr = self.storage_key_expr.clone();
let replication_log = self.replication_log.clone();
let replication = self.clone();

tokio::task::spawn(async move {
let configuration = replication
.replication_log
.read()
.await
.configuration
.clone();

let digest_key_sub = match keformat!(
digest_key_expr_formatter::formatter(),
zid = "*",
storage_ke = &storage_key_expr
hash_configuration = *configuration.fingerprint()
) {
Ok(key) => key,
Err(e) => {
@@ -257,7 +255,8 @@ impl Replication {

let mut retry = 0;
let subscriber = loop {
match zenoh_session
match replication
.zenoh_session
.declare_subscriber(&digest_key_sub)
// NOTE: We need to explicitly set the locality to `Remote` as otherwise the
// Digest subscriber will also receive the Digest published by its own
@@ -325,7 +324,7 @@ impl Replication {

tracing::debug!("Replication digest received");

let digest = match replication_log.read().await.digest() {
let digest = match replication.replication_log.read().await.digest() {
Ok(digest) => digest,
Err(e) => {
tracing::error!(
@@ -340,7 +339,7 @@ impl Replication {

let replica_aligner_ke = match keformat!(
aligner_key_expr_formatter::formatter(),
storage_ke = &storage_key_expr,
hash_configuration = *configuration.fingerprint(),
zid = source_zid,
) {
Ok(key) => key,
Expand Down Expand Up @@ -373,15 +372,20 @@ impl Replication {
/// responsible for fetching in the Replication Log or in the Storage the relevant information
/// to send to the Replica such that it can align its own Storage.
pub(crate) fn spawn_aligner_queryable(&self) -> JoinHandle<()> {
let zenoh_session = self.zenoh_session.clone();
let storage_key_expr = self.storage_key_expr.clone();
let replication = self.clone();

tokio::task::spawn(async move {
let configuration = replication
.replication_log
.read()
.await
.configuration
.clone();

let aligner_ke = match keformat!(
aligner_key_expr_formatter::formatter(),
zid = zenoh_session.zid(),
storage_ke = storage_key_expr,
zid = replication.zenoh_session.zid(),
hash_configuration = *configuration.fingerprint(),
) {
Ok(ke) => ke,
Err(e) => {
@@ -395,7 +399,8 @@ impl Replication {

let mut retry = 0;
let queryable = loop {
match zenoh_session
match replication
.zenoh_session
.declare_queryable(&aligner_ke)
.allowed_origin(Locality::Remote)
.await