From 3e44c2e092c6f6214ea78533ae974812427c978a Mon Sep 17 00:00:00 2001
From: Julien Loudet <julien.loudet@zettascale.tech>
Date: Tue, 24 Sep 2024 12:32:12 +0200
Subject: [PATCH 1/5] refactor(storage-manager): keep configuration in
 StorageService

The `StorageService` was splitting up the `StorageConfig` used to
create it, keeping only some of its fields. Besides adding noise, this
prevented separating the creation of the structure from the spawning of
the subscriber and queryable associated with a Storage.

This commit changes the fields of the `StorageService` structure to
keep the entire `StorageConfig`, which makes it possible to separate
the creation of the structure from the spawning of the queryable and
subscriber.
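
For illustration, a minimal sketch of the resulting structure (other
fields elided, see the diff below for the exact definition):

    pub struct StorageService {
        session: Arc<Session>,
        // The whole configuration is kept instead of copies of
        // `key_expr`, `complete` and `strip_prefix`.
        configuration: StorageConfig,
        name: String,
        // ...
    }

    // Accesses now go through the configuration, e.g.:
    // let storage_key_expr = &self.configuration.key_expr;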

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Removed the following fields from the `StorageService` structure:
    - `key_expr`,
    - `complete`,
    - `strip_prefix`.
  - Added the field `configuration` to keep track of the associated
    `StorageConfig`.
  - Changed the signature of `start_storage_queryable_subscriber`,
    removing the `GarbageCollectionConfig` parameter as it is now
    accessible through `&self`.
  - Updated the accesses to `key_expr`, `complete` and `strip_prefix`
    to go through the `configuration` field.
  - Removed an `unwrap()`, logging an error instead.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
---
 .../src/storages_mgt/service.rs               | 98 +++++++++++--------
 1 file changed, 56 insertions(+), 42 deletions(-)

diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
index 42e82eae94..4d493ec8b2 100644
--- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
@@ -61,10 +61,8 @@ struct Update {
 #[derive(Clone)]
 pub struct StorageService {
     session: Arc<Session>,
-    key_expr: OwnedKeyExpr,
-    complete: bool,
+    configuration: StorageConfig,
     name: String,
-    strip_prefix: Option<OwnedKeyExpr>,
     pub(crate) storage: Arc<Mutex<Box<dyn zenoh_backend_traits::Storage>>>,
     capability: Capability,
     tombstones: Arc<RwLock<KeBoxTree<Timestamp, NonWild, KeyedSetProvider>>>,
@@ -84,10 +82,8 @@ impl StorageService {
     ) -> Self {
         let storage_service = StorageService {
             session,
-            key_expr: config.key_expr,
-            complete: config.complete,
+            configuration: config,
             name: name.to_string(),
-            strip_prefix: config.strip_prefix,
             storage,
             capability,
             tombstones: Arc::new(RwLock::new(KeBoxTree::default())),
@@ -123,20 +119,18 @@ impl StorageService {
         }
         storage_service
             .clone()
-            .start_storage_queryable_subscriber(rx, config.garbage_collection_config)
+            .start_storage_queryable_subscriber(rx)
             .await;
 
         storage_service
     }
 
-    async fn start_storage_queryable_subscriber(
-        self,
-        mut rx: Receiver<StorageMessage>,
-        gc_config: GarbageCollectionConfig,
-    ) {
+    async fn start_storage_queryable_subscriber(self, mut rx: Receiver<StorageMessage>) {
         // start periodic GC event
         let t = Timer::default();
 
+        let gc_config = self.configuration.garbage_collection_config.clone();
+
         let latest_updates = if self.cache_latest.replication_log.is_none() {
             Some(self.cache_latest.latest_updates.clone())
         } else {
@@ -154,8 +148,10 @@ impl StorageService {
         );
         t.add_async(gc).await;
 
+        let storage_key_expr = &self.configuration.key_expr;
+
         // subscribe on key_expr
-        let storage_sub = match self.session.declare_subscriber(&self.key_expr).await {
+        let storage_sub = match self.session.declare_subscriber(storage_key_expr).await {
             Ok(storage_sub) => storage_sub,
             Err(e) => {
                 tracing::error!("Error starting storage '{}': {}", self.name, e);
@@ -166,8 +162,8 @@ impl StorageService {
         // answer to queries on key_expr
         let storage_queryable = match self
             .session
-            .declare_queryable(&self.key_expr)
-            .complete(self.complete)
+            .declare_queryable(storage_key_expr)
+            .complete(self.configuration.complete)
             .await
         {
             Ok(storage_queryable) => storage_queryable,
@@ -249,6 +245,8 @@ impl StorageService {
             matching_keys
         );
 
+        let prefix = self.configuration.strip_prefix.as_ref();
+
         for k in matching_keys {
             if self.is_deleted(&k, sample_timestamp).await {
                 tracing::trace!("Skipping Sample < {} > deleted later on", k);
@@ -297,13 +295,12 @@ impl StorageService {
                 }
             };
 
-            let stripped_key =
-                match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) {
-                    Ok(stripped) => stripped,
-                    Err(e) => {
-                        bail!("{e:?}");
-                    }
-                };
+            let stripped_key = match crate::strip_prefix(prefix, sample_to_store.key_expr()) {
+                Ok(stripped) => stripped,
+                Err(e) => {
+                    bail!("{e:?}");
+                }
+            };
 
             // If the Storage was declared as only keeping the Latest value, we ensure that, for
             // each received Sample, it is indeed the Latest value that is processed.
@@ -436,19 +433,21 @@ impl StorageService {
         let wildcards = self.wildcard_updates.read().await;
         let mut ts = timestamp;
         let mut update = None;
+
+        let prefix = self.configuration.strip_prefix.as_ref();
+
         for node in wildcards.intersecting_keys(key_expr) {
             let weight = wildcards.weight_at(&node);
             if weight.is_some() && weight.unwrap().data.timestamp > *ts {
                 // if the key matches a wild card update, check whether it was saved in storage
                 // remember that wild card updates change only existing keys
-                let stripped_key =
-                    match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
-                        Ok(stripped) => stripped,
-                        Err(e) => {
-                            tracing::error!("{}", e);
-                            break;
-                        }
-                    };
+                let stripped_key = match crate::strip_prefix(prefix, &key_expr.into()) {
+                    Ok(stripped) => stripped,
+                    Err(e) => {
+                        tracing::error!("{}", e);
+                        break;
+                    }
+                };
                 let mut storage = self.storage.lock().await;
                 match storage.get(stripped_key, "").await {
                     Ok(stored_data) => {
@@ -531,20 +530,22 @@ impl StorageService {
             }
         };
         tracing::trace!("[STORAGE] Processing query on key_expr: {}", q.key_expr());
+
+        let prefix = self.configuration.strip_prefix.as_ref();
+
         if q.key_expr().is_wild() {
             // resolve key expr into individual keys
             let matching_keys = self.get_matching_keys(q.key_expr()).await;
             let mut storage = self.storage.lock().await;
             for key in matching_keys {
-                let stripped_key =
-                    match crate::strip_prefix(self.strip_prefix.as_ref(), &key.clone().into()) {
-                        Ok(k) => k,
-                        Err(e) => {
-                            tracing::error!("{}", e);
-                            // @TODO: return error when it is supported
-                            return;
-                        }
-                    };
+                let stripped_key = match crate::strip_prefix(prefix, &key.clone().into()) {
+                    Ok(k) => k,
+                    Err(e) => {
+                        tracing::error!("{}", e);
+                        // @TODO: return error when it is supported
+                        return;
+                    }
+                };
                 match storage.get(stripped_key, q.parameters().as_str()).await {
                     Ok(stored_data) => {
                         for entry in stored_data {
@@ -569,7 +570,7 @@ impl StorageService {
             }
             drop(storage);
         } else {
-            let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), q.key_expr()) {
+            let stripped_key = match crate::strip_prefix(prefix, q.key_expr()) {
                 Ok(k) => k,
                 Err(e) => {
                     tracing::error!("{}", e);
@@ -606,13 +607,26 @@ impl StorageService {
         let mut result = Vec::new();
         // @TODO: if cache exists, use that to get the list
         let storage = self.storage.lock().await;
+
+        let prefix = self.configuration.strip_prefix.as_ref();
+
         match storage.get_all_entries().await {
             Ok(entries) => {
                 for (k, _ts) in entries {
                     // @TODO: optimize adding back the prefix (possible inspiration from https://github.com/eclipse-zenoh/zenoh/blob/0.5.0-beta.9/backends/traits/src/utils.rs#L79)
                     let full_key = match k {
-                        Some(key) => crate::prefix(self.strip_prefix.as_ref(), &key),
-                        None => self.strip_prefix.clone().unwrap(),
+                        Some(key) => crate::prefix(prefix, &key),
+                        None => {
+                            let Some(prefix) = prefix else {
+                                // TODO Check if we have anything in place that would prevent such
+                                //      an error from happening.
+                                tracing::error!(
+                                    "Internal bug: empty key with no `strip_prefix` configured"
+                                );
+                                continue;
+                            };
+                            prefix.clone()
+                        }
                     };
                     if key_expr.intersects(&full_key.clone()) {
                         result.push(full_key);

From 1188b5282b8a13675f1b208a77092c0392d991da Mon Sep 17 00:00:00 2001
From: Julien Loudet <julien.loudet@zettascale.tech>
Date: Tue, 24 Sep 2024 13:19:12 +0200
Subject: [PATCH 2/5] refactor(storage-manager): separate creation/start of
 StorageService

This commit separates creating the `StorageService` from starting it.

This change is motivated by the Replication feature: we want to
prevent the Storage from answering queries until the initial alignment
has been performed.

To support this, creating the `StorageService` must be dissociated from
starting it.

As the method `start_storage_queryable_subscriber` takes ownership of
the `StorageService`, it became mandatory to first create the
`StorageService`, then start the Replication and, lastly, start the
Storage. Since the Replication code already ran inside a task, the code
creating and starting the Storage was moved inside that task as well.
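
A simplified sketch of the resulting order inside the task spawned in
`create_and_start_storage` (arguments elided, see the diff below):

    tokio::task::spawn(async move {
        // 1. Create the StorageService (no Subscriber / Queryable yet).
        let storage_service = StorageService::new(/* ... */).await;

        // 2. Start the Replication, if configured.
        if let Some(replication_log) = replication_log {
            ReplicationService::spawn_start(zenoh_session, &storage_service, /* ... */)
                .await;
        }

        // 3. Only then start the Storage (Subscriber + Queryable).
        storage_service
            .start_storage_queryable_subscriber(rx_storage)
            .await;
    });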

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Changed `spawn_start` to take a reference to the `StorageService`
    structure as it is only needed before spawning the different
    Replication tasks (the `StorageService` is still needed to call
    `process_sample`).
  - Cloned the `Arc` of the underlying Storage before spawning the
    Replication tasks.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Moved the log statement to just before the Storage is actually
    started.
  - Moved the code starting the Storage inside the spawned task.
  - Started the `StorageService` after having started the
    `ReplicationService`.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Renamed the function `start` to `new` as it now only creates an
    instance of the `StorageService`.
  - Removed the parameter `rx` from `new` as the method no longer
    starts the Storage.
  - Removed the call to `start_storage_queryable_subscriber` from `new`.
  - Changed the visibility of the method
    `start_storage_queryable_subscriber` to `pub(crate)` as it is called
    from outside the `service` module.
  - Added a log statement before the Storage "loop" is started (to
    help confirm, from the logs, the order in which the different
    elements are started).

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
---
 .../src/replication/service.rs                |  6 +-
 .../src/storages_mgt/mod.rs                   | 67 ++++++++++---------
 .../src/storages_mgt/service.rs               | 15 +++--
 3 files changed, 46 insertions(+), 42 deletions(-)

diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs
index 06ec31d9a2..f3b87c6c3f 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs
@@ -44,7 +44,7 @@ impl ReplicationService {
     ///    received.
     pub async fn spawn_start(
         zenoh_session: Arc<Session>,
-        storage_service: StorageService,
+        storage_service: &StorageService,
         storage_key_expr: OwnedKeyExpr,
         replication_log: Arc<RwLock<LogLatest>>,
         latest_updates: Arc<RwLock<LatestUpdates>>,
@@ -98,13 +98,15 @@ impl ReplicationService {
             );
         }
 
+        let storage = storage_service.storage.clone();
+
         tokio::task::spawn(async move {
             let replication = Replication {
                 zenoh_session,
                 replication_log,
                 storage_key_expr,
                 latest_updates,
-                storage: storage_service.storage.clone(),
+                storage,
             };
 
             let replication_service = Self {
diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
index 7dcc60bf32..3e5a2e1f09 100644
--- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
@@ -68,9 +68,8 @@ pub(crate) async fn create_and_start_storage(
     let storage_name = parts[7];
     let name = format!("{uuid}/{storage_name}");
 
-    tracing::trace!("Start storage '{}' on keyexpr '{}'", name, config.key_expr);
-
     let (tx, rx_storage) = tokio::sync::broadcast::channel(1);
+    let rx_replication = tx.subscribe();
 
     let mut entries = match storage.get_all_entries().await {
         Ok(entries) => entries
@@ -111,50 +110,52 @@ pub(crate) async fn create_and_start_storage(
     let latest_updates = Arc::new(RwLock::new(latest_updates));
 
     let storage = Arc::new(Mutex::new(storage));
-    let storage_service = StorageService::start(
-        zenoh_session.clone(),
-        config.clone(),
-        &name,
-        storage,
-        capability,
-        rx_storage,
-        CacheLatest::new(latest_updates.clone(), replication_log.clone()),
-    )
-    .await;
-
-    // Testing if the `replication_log` is set is equivalent to testing if the `replication` is
-    // set: the `replication_log` is only set when the latter is.
-    if let Some(replication_log) = replication_log {
-        let rx_replication = tx.subscribe();
-
-        // NOTE Although the function `ReplicationService::spawn_start` spawns its own tasks, we
-        //      still need to call it within a dedicated task because the Zenoh routing tables are
-        //      populated only after the plugins have been loaded.
-        //
-        //      If we don't wait for the routing tables to be populated the initial alignment
-        //      (i.e. querying any Storage on the network handling the same key expression), will
-        //      never work.
-        //
-        // TODO Do we really want to perform such an initial alignment? Because this query will
-        //      target any Storage that matches the same key expression, regardless of if they have
-        //      been configured to be replicated.
-        tokio::task::spawn(async move {
+
+    // NOTE The StorageService method `start_storage_queryable_subscriber` does not spawn its own
+    //      task to loop/wait on the Subscriber and Queryable it creates. Thus we spawn the task
+    //      here.
+    //
+    //      Doing so also allows us to return early from the creation of the Storage, creation which
+    //      blocks populating the routing tables.
+    //
+    // TODO Do we really want to perform such an initial alignment? Because this query will
+    //      target any Storage that matches the same key expression, regardless of if they have
+    //      been configured to be replicated.
+    tokio::task::spawn(async move {
+        let storage_service = StorageService::new(
+            zenoh_session.clone(),
+            config.clone(),
+            &name,
+            storage,
+            capability,
+            CacheLatest::new(latest_updates.clone(), replication_log.clone()),
+        )
+        .await;
+
+        // Testing if the `replication_log` is set is equivalent to testing if the `replication` is
+        // set: the `replication_log` is only set when the latter is.
+        if let Some(replication_log) = replication_log {
             tracing::debug!(
                 "Starting replication of storage '{}' on keyexpr '{}'",
                 name,
                 config.key_expr,
             );
+
             ReplicationService::spawn_start(
                 zenoh_session,
-                storage_service,
+                &storage_service,
                 config.key_expr,
                 replication_log,
                 latest_updates,
                 rx_replication,
             )
             .await;
-        });
-    }
+        }
+
+        storage_service
+            .start_storage_queryable_subscriber(rx_storage)
+            .await;
+    });
 
     Ok(tx)
 }
diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
index 4d493ec8b2..e96320051f 100644
--- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
@@ -71,13 +71,12 @@ pub struct StorageService {
 }
 
 impl StorageService {
-    pub async fn start(
+    pub async fn new(
         session: Arc<Session>,
         config: StorageConfig,
         name: &str,
         storage: Arc<Mutex<Box<dyn zenoh_backend_traits::Storage>>>,
         capability: Capability,
-        rx: Receiver<StorageMessage>,
         cache_latest: CacheLatest,
     ) -> Self {
         let storage_service = StorageService {
@@ -117,15 +116,11 @@ impl StorageService {
                 }
             }
         }
-        storage_service
-            .clone()
-            .start_storage_queryable_subscriber(rx)
-            .await;
 
         storage_service
     }
 
-    async fn start_storage_queryable_subscriber(self, mut rx: Receiver<StorageMessage>) {
+    pub(crate) async fn start_storage_queryable_subscriber(self, mut rx: Receiver<StorageMessage>) {
         // start periodic GC event
         let t = Timer::default();
 
@@ -173,6 +168,12 @@ impl StorageService {
             }
         };
 
+        tracing::debug!(
+            "Starting storage '{}' on keyexpr '{}'",
+            self.name,
+            storage_key_expr
+        );
+
         tokio::task::spawn(async move {
             loop {
                 tokio::select!(

From 1492a26fab6c8d37d61ddf5694024e94b5c38167 Mon Sep 17 00:00:00 2001
From: Julien Loudet <julien.loudet@zettascale.tech>
Date: Wed, 25 Sep 2024 14:44:18 +0200
Subject: [PATCH 3/5] refactor(storage-manager): use hash in Replication key
 expressions

As we were using the key expression of the Storage to generate the key
expressions used in the Replication, it was possible to receive Digests
emitted by Replicas that were operating on a subset of the key space of
the Storage.

This commit changes the way the key expressions for the Replication
are generated: they are now derived from the fingerprint (hash) of the
Replication configuration, which renders them unique and thus avoids
the issue just described.

This property is interesting for the initial Alignment: without this
change, we would have had to ensure that the Alignment is performed
against a Replica operating on exactly the same key space (and not a
subset) and with the same configuration (in particular, the
`strip_prefix`).

NOTE: This does not fix the initial alignment step, which will still
contact Storages that operate on a subset (if there is no better match
on the network).
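
Abridged from the diff below, the Replication key expressions are now
derived from the fingerprint of the configuration:

    kedefine!(
        // Previously: "...${storage_ke:**}" (the Storage key expression).
        pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}",
        pub aligner_key_expr_formatter: "@zid/${zid:*}/${hash_configuration:*}/aligner",
    );

    let digest_key_put = match keformat!(
        digest_key_expr_formatter::formatter(),
        zid = replication.zenoh_session.zid(),
        hash_configuration = *configuration.fingerprint(),
    ) {
        Ok(key) => key,
        Err(e) => {
            // log and stop the task
            return;
        }
    };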

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - Renamed `storage_ke` to `hash_configuration` in the key expression
    formatters of the Digest and the Aligner.
  - Removed the unnecessary clones when spawning the Digest Publisher,
    the Digest Subscriber and the Aligner + fixed the different call
    sites.
  - Removed the scope used to access the configuration as it is now
    cloned earlier in the code + fixed the different call sites.
  - Used the hash of the configuration to generate the key expressions
    of the Digest (Publisher and Subscriber) and of the Aligner
    Queryable.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
---
 .../src/replication/core.rs                   | 105 +++++++++---------
 1 file changed, 55 insertions(+), 50 deletions(-)

diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs
index 0ef0824c65..522385938c 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs
@@ -43,8 +43,8 @@ use super::{
 use crate::{replication::aligner::AlignmentQuery, storages_mgt::LatestUpdates};
 
 kedefine!(
-    pub digest_key_expr_formatter: "@-digest/${zid:*}/${storage_ke:**}",
-    pub aligner_key_expr_formatter: "@zid/${zid:*}/${storage_ke:**}/aligner",
+    pub digest_key_expr_formatter: "@-digest/${zid:*}/${hash_configuration:*}",
+    pub aligner_key_expr_formatter: "@zid/${zid:*}/${hash_configuration:*}/aligner",
 );
 
 #[derive(Clone)]
@@ -69,16 +69,20 @@ impl Replication {
     ///
     /// [Log]: crate::replication::log::LogLatest
     pub(crate) fn spawn_digest_publisher(&self) -> JoinHandle<()> {
-        let zenoh_session = self.zenoh_session.clone();
-        let storage_key_expr = self.storage_key_expr.clone();
-        let replication_log = self.replication_log.clone();
-        let latest_updates = self.latest_updates.clone();
+        let replication = self.clone();
 
         tokio::task::spawn(async move {
+            let configuration = replication
+                .replication_log
+                .read()
+                .await
+                .configuration
+                .clone();
+
             let digest_key_put = match keformat!(
                 digest_key_expr_formatter::formatter(),
-                zid = zenoh_session.zid(),
-                storage_ke = storage_key_expr
+                zid = replication.zenoh_session.zid(),
+                hash_configuration = *configuration.fingerprint(),
             ) {
                 Ok(key) => key,
                 Err(e) => {
@@ -89,25 +93,14 @@ impl Replication {
                 }
             };
 
-            // Scope to not forget to release the lock.
-            let (publication_interval, propagation_delay, last_elapsed_interval) = {
-                let replication_log_guard = replication_log.read().await;
-                let configuration = replication_log_guard.configuration();
-                let last_elapsed_interval = match configuration.last_elapsed_interval() {
-                    Ok(idx) => idx,
-                    Err(e) => {
-                        tracing::error!(
-                            "Fatal error, call to `last_elapsed_interval` failed with: {e:?}"
-                        );
-                        return;
-                    }
-                };
-
-                (
-                    configuration.interval,
-                    configuration.propagation_delay,
-                    last_elapsed_interval,
-                )
+            let last_elapsed_interval = match configuration.last_elapsed_interval() {
+                Ok(idx) => idx,
+                Err(e) => {
+                    tracing::error!(
+                        "Fatal error, call to `last_elapsed_interval` failed with: {e:?}"
+                    );
+                    return;
+                }
             };
 
             // We have no control over when a replica is going to be started. The purpose is here
@@ -115,7 +108,7 @@ impl Replication {
             // at every interval (+ δ).
             let duration_until_next_interval = {
                 let millis_last_elapsed =
-                    *last_elapsed_interval as u128 * publication_interval.as_millis();
+                    *last_elapsed_interval as u128 * configuration.interval.as_millis();
 
                 if millis_last_elapsed > u64::MAX as u128 {
                     tracing::error!(
@@ -138,7 +131,7 @@ impl Replication {
                     };
 
                 Duration::from_millis(
-                    (publication_interval.as_millis() - (millis_since_now - millis_last_elapsed))
+                    (configuration.interval.as_millis() - (millis_since_now - millis_last_elapsed))
                         as u64,
                 )
             };
@@ -148,7 +141,7 @@ impl Replication {
             let mut events = HashMap::default();
 
             // Internal delay to avoid an "update storm".
-            let max_publication_delay = (publication_interval.as_millis() / 3) as u64;
+            let max_publication_delay = (configuration.interval.as_millis() / 3) as u64;
 
             let mut digest_update_start: Instant;
             let mut digest: Digest;
@@ -160,15 +153,15 @@ impl Replication {
                 // Except that we want to take into account the time it takes for a publication to
                 // reach this Zenoh node. Hence, we sleep for `propagation_delay` to, hopefully,
                 // catch the publications that are in transit.
-                tokio::time::sleep(propagation_delay).await;
+                tokio::time::sleep(configuration.propagation_delay).await;
 
                 {
-                    let mut latest_updates_guard = latest_updates.write().await;
+                    let mut latest_updates_guard = replication.latest_updates.write().await;
                     std::mem::swap(&mut events, &mut latest_updates_guard);
                 }
 
                 {
-                    let mut replication_guard = replication_log.write().await;
+                    let mut replication_guard = replication.replication_log.write().await;
                     replication_guard.update(events.drain().map(|(_, event)| event));
                     digest = match replication_guard.digest() {
                         Ok(digest) => digest,
@@ -194,7 +187,8 @@ impl Replication {
                 // buffer that, hopefully, has enough memory.
                 let buffer_capacity = serialization_buffer.capacity();
 
-                match zenoh_session
+                match replication
+                    .zenoh_session
                     .put(
                         &digest_key_put,
                         std::mem::replace(
@@ -209,17 +203,17 @@ impl Replication {
                 }
 
                 let digest_update_duration = digest_update_start.elapsed();
-                if digest_update_duration > publication_interval {
+                if digest_update_duration > configuration.interval {
                     tracing::warn!(
                         "The duration it took to update and publish the Digest is superior to the \
                          duration of an Interval ({} ms), we recommend increasing the duration of \
                          the latter. Digest update: {} ms (incl. delay: {} ms)",
-                        publication_interval.as_millis(),
+                        configuration.interval.as_millis(),
                         digest_update_duration.as_millis(),
-                        publication_delay + propagation_delay.as_millis() as u64
+                        publication_delay + configuration.propagation_delay.as_millis() as u64
                     );
                 } else {
-                    tokio::time::sleep(publication_interval - digest_update_duration).await;
+                    tokio::time::sleep(configuration.interval - digest_update_duration).await;
                 }
             }
         })
@@ -233,16 +227,20 @@ impl Replication {
     ///
     /// [DigestDiff]: super::digest::DigestDiff
     pub(crate) fn spawn_digest_subscriber(&self) -> JoinHandle<()> {
-        let zenoh_session = self.zenoh_session.clone();
-        let storage_key_expr = self.storage_key_expr.clone();
-        let replication_log = self.replication_log.clone();
         let replication = self.clone();
 
         tokio::task::spawn(async move {
+            let configuration = replication
+                .replication_log
+                .read()
+                .await
+                .configuration
+                .clone();
+
             let digest_key_sub = match keformat!(
                 digest_key_expr_formatter::formatter(),
                 zid = "*",
-                storage_ke = &storage_key_expr
+                hash_configuration = *configuration.fingerprint()
             ) {
                 Ok(key) => key,
                 Err(e) => {
@@ -257,7 +255,8 @@ impl Replication {
 
             let mut retry = 0;
             let subscriber = loop {
-                match zenoh_session
+                match replication
+                    .zenoh_session
                     .declare_subscriber(&digest_key_sub)
                     // NOTE: We need to explicitly set the locality to `Remote` as otherwise the
                     //       Digest subscriber will also receive the Digest published by its own
@@ -325,7 +324,7 @@ impl Replication {
 
                         tracing::debug!("Replication digest received");
 
-                        let digest = match replication_log.read().await.digest() {
+                        let digest = match replication.replication_log.read().await.digest() {
                             Ok(digest) => digest,
                             Err(e) => {
                                 tracing::error!(
@@ -340,7 +339,7 @@ impl Replication {
 
                             let replica_aligner_ke = match keformat!(
                                 aligner_key_expr_formatter::formatter(),
-                                storage_ke = &storage_key_expr,
+                                hash_configuration = *configuration.fingerprint(),
                                 zid = source_zid,
                             ) {
                                 Ok(key) => key,
@@ -373,15 +372,20 @@ impl Replication {
     /// responsible for fetching in the Replication Log or in the Storage the relevant information
     /// to send to the Replica such that it can align its own Storage.
     pub(crate) fn spawn_aligner_queryable(&self) -> JoinHandle<()> {
-        let zenoh_session = self.zenoh_session.clone();
-        let storage_key_expr = self.storage_key_expr.clone();
         let replication = self.clone();
 
         tokio::task::spawn(async move {
+            let configuration = replication
+                .replication_log
+                .read()
+                .await
+                .configuration
+                .clone();
+
             let aligner_ke = match keformat!(
                 aligner_key_expr_formatter::formatter(),
-                zid = zenoh_session.zid(),
-                storage_ke = storage_key_expr,
+                zid = replication.zenoh_session.zid(),
+                hash_configuration = *configuration.fingerprint(),
             ) {
                 Ok(ke) => ke,
                 Err(e) => {
@@ -395,7 +399,8 @@ impl Replication {
 
             let mut retry = 0;
             let queryable = loop {
-                match zenoh_session
+                match replication
+                    .zenoh_session
                     .declare_queryable(&aligner_ke)
                     .allowed_origin(Locality::Remote)
                     .await

From 9abde33066bdfbd539be0f523831308509a2512b Mon Sep 17 00:00:00 2001
From: Julien Loudet <julien.loudet@zettascale.tech>
Date: Wed, 25 Sep 2024 16:03:36 +0200
Subject: [PATCH 4/5] refactor(storage-manager): remove unnecessary wait/retry
 policy

Waiting and retrying on error when attempting to declare a Queryable
or a Subscriber brings nothing: either the Session is established and
these operations will succeed, or the Session no longer exists, in
which case we should terminate.
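
Abridged from the diff below, the declarations now simply fail fast:

    let subscriber = match replication
        .zenoh_session
        .declare_subscriber(&digest_key_sub)
        .allowed_origin(Locality::Remote)
        .await
    {
        Ok(subscriber) => subscriber,
        Err(e) => {
            tracing::error!(
                "Could not declare Digest subscriber: {e:?}. The storage will not \
                 receive the Replication Digest of other replicas."
            );
            return;
        }
    };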

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  - The `MAX_RETRY` and `WAIT_PERIOD_SECS` constants are no longer
    needed.
  - Removed the wait/retry loops when creating the Digest Subscriber and
    Aligner Queryable.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
---
 .../src/replication/core.rs                   | 74 ++++++-------------
 1 file changed, 23 insertions(+), 51 deletions(-)

diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs
index 522385938c..bd96912a4b 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs
@@ -35,11 +35,7 @@ use zenoh::{
 };
 use zenoh_backend_traits::Storage;
 
-use super::{
-    digest::Digest,
-    log::LogLatest,
-    service::{MAX_RETRY, WAIT_PERIOD_SECS},
-};
+use super::{digest::Digest, log::LogLatest};
 use crate::{replication::aligner::AlignmentQuery, storages_mgt::LatestUpdates};
 
 kedefine!(
@@ -253,9 +249,7 @@ impl Replication {
                 }
             };
 
-            let mut retry = 0;
-            let subscriber = loop {
-                match replication
+            let subscriber = match replication
                     .zenoh_session
                     .declare_subscriber(&digest_key_sub)
                     // NOTE: We need to explicitly set the locality to `Remote` as otherwise the
@@ -263,24 +257,14 @@ impl Replication {
                     //       Digest publisher.
                     .allowed_origin(Locality::Remote)
                     .await
-                {
-                    Ok(subscriber) => break subscriber,
-                    Err(e) => {
-                        if retry < MAX_RETRY {
-                            retry += 1;
-                            tracing::warn!(
-                                "Failed to declare Digest subscriber: {e:?}. Attempt \
-                                 {retry}/{MAX_RETRY}."
-                            );
-                            tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await;
-                        } else {
-                            tracing::error!(
-                                "Could not declare Digest subscriber. The storage will not \
-                                 receive the Replication Digest of other replicas."
-                            );
-                            return;
-                        }
-                    }
+            {
+                Ok(subscriber) => subscriber,
+                Err(e) => {
+                    tracing::error!(
+                        "Could not declare Digest subscriber: {e:?}. The storage will not receive \
+                         the Replication Digest of other replicas."
+                    );
+                    return;
                 }
             };
 
@@ -397,31 +381,19 @@ impl Replication {
                 }
             };
 
-            let mut retry = 0;
-            let queryable = loop {
-                match replication
-                    .zenoh_session
-                    .declare_queryable(&aligner_ke)
-                    .allowed_origin(Locality::Remote)
-                    .await
-                {
-                    Ok(queryable) => break queryable,
-                    Err(e) => {
-                        if retry < MAX_RETRY {
-                            retry += 1;
-                            tracing::warn!(
-                                "Failed to declare the Aligner queryable: {e:?}. Attempt \
-                                 {retry}/{MAX_RETRY}."
-                            );
-                            tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await;
-                        } else {
-                            tracing::error!(
-                                "Could not declare the Aligner queryable. This storage will NOT \
-                                 align with other replicas."
-                            );
-                            return;
-                        }
-                    }
+            let queryable = match replication
+                .zenoh_session
+                .declare_queryable(&aligner_ke)
+                .allowed_origin(Locality::Remote)
+                .await
+            {
+                Ok(queryable) => queryable,
+                Err(e) => {
+                    tracing::error!(
+                        "Could not declare the Aligner queryable: {e:?}. This storage will NOT \
+                         align with other replicas."
+                    );
+                    return;
                 }
             };
 

From b21bb1e15b03336b68b408c8542c0ae95a50b587 Mon Sep 17 00:00:00 2001
From: Julien Loudet <julien.loudet@zettascale.tech>
Date: Wed, 25 Sep 2024 16:43:11 +0200
Subject: [PATCH 5/5] refactor(storage-manager): initial alignment on empty
 Storage

This commit changes the way a replicated Storage starts: if it is
empty and configured to be replicated, it will attempt to align with an
active and compatible (i.e. same configuration) Replica before anything
else.

The previous behaviour made a query on the key expression of the
Storage. Although it could, in some cases, actually perform the same
initial alignment, it could not guarantee that only Storages configured
to be replicated would be queried.

To perform this controlled initial alignment, new variants were added
to the `AlignmentQuery` and `AlignmentReply` enumerations: `Discovery`,
to discover an active Replica (which replies with its `ZenohId`), and
`All`, to request all the data from the discovered Replica.

To avoid contention, this transfer is performed in batches, one
`Interval` at a time.
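
As a rough sketch (other variants and the exchange details are in the
diff below):

    pub(crate) enum AlignmentQuery {
        Discovery,
        All,
        // ... existing variants (Diff, Intervals, SubIntervals, ...)
    }

    pub(crate) enum AlignmentReply {
        Discovery(ZenohId),
        // ... existing variants (Intervals, SubIntervals, Events, ...)
    }

    // Exchange for an empty, replicated Storage:
    //   1. Query `AlignmentQuery::Discovery` on the Aligner key expression
    //      of all Replicas (consolidation `Monotonic`: the first Replica to
    //      answer is selected).
    //   2. The discovered Replica replies `AlignmentReply::Discovery(zid)`.
    //   3. A follow-up `AlignmentQuery::All` is sent to that Replica, which
    //      replies with its events, one `Interval` at a time.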

* plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs:
  - Added new variants `AlignmentQuery::All` and
    `AlignmentQuery::Discovery`.
  - Added new variant `AlignmentReply::Discovery`.
  - Updated the `aligner` method to:
    - Send the `ZenohId` as a reply to an `AlignmentQuery::Discovery`.
    - Send all the data of the Storage as a reply to an
      `AlignmentQuery::All`. This leverages the already existing
      `reply_events` method.
  - Updated the `reply_events` method to not attempt to fetch the
    content of the Storage if the action is set to `delete`. Before
    this commit, the only time this method was called was during an
    alignment, which filters out the deleted events (hence not
    requiring this code).
  - Updated the `spawn_query_replica_aligner` method:
    - It now returns the handle of the newly created task as we want to
      wait for it to finish when performing the initial alignment.
    - Changed the consolidation to `ConsolidationMode::Monotonic` when
      sending an `AlignmentQuery::Discovery`: we want to contact the
      fastest answering Replica (hopefully the closest).
    - Stopped processing replies after the first one when sending an
      `AlignmentQuery::Discovery` as we only want to perform the
      initial alignment once.
  - Updated the `process_alignment_reply` method:
    - Process an `AlignmentReply::Discovery` by sending a follow-up
      `AlignmentQuery::All` to retrieve the content of the Storage of
      the discovered Replica.
    - It does not attempt to delete an entry in the Storage when
      processing an `AlignmentReply::Retrieval`: this can only happen
      during an initial alignment, in which case the receiving Storage
      is empty. We only need to record, in the Replication Log, the
      fact that a delete was performed.

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  implemented the `initial_alignment` method that attempts to discover
  a Replica by sending an `AlignmentQuery::Discovery` on the Aligner
  key expression covering all Replicas. Before making this query, we
  wait a small delay to give Zenoh enough time to propagate the routing
  tables.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Removed the constants `MAX_RETRY` and `WAIT_PERIOD_SECS` as they
    were no longer needed.
  - Updated the documentation of the `spawn_start` function.
  - Removed the previous implementation of the initial alignment that
    made a query on the key expression of the Storage.
  - Added a check after creating the `Replication` structure: if the
    Replication Log is empty, which indicates an empty Storage, then
    perform an initial alignment.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
---
 .../src/replication/aligner.rs                | 157 +++++++++++++++---
 .../src/replication/core.rs                   |  57 +++++++
 .../src/replication/service.rs                |  91 ++++------
 3 files changed, 220 insertions(+), 85 deletions(-)

diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs
index e805bda26c..b957fc50f6 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs
@@ -18,18 +18,20 @@ use std::{
 };
 
 use serde::{Deserialize, Serialize};
+use tokio::task::JoinHandle;
 use zenoh::{
     bytes::ZBytes,
     internal::Value,
-    key_expr::OwnedKeyExpr,
+    key_expr::{format::keformat, OwnedKeyExpr},
     query::{ConsolidationMode, Query, Selector},
     sample::{Sample, SampleKind},
+    session::ZenohId,
 };
 use zenoh_backend_traits::StorageInsertionResult;
 
 use super::{
     classification::{IntervalIdx, SubIntervalIdx},
-    core::Replication,
+    core::{aligner_key_expr_formatter, Replication},
     digest::{DigestDiff, Fingerprint},
     log::EventMetadata,
 };
@@ -51,6 +53,8 @@ use super::{
 /// hence directly skipping to the `SubIntervals` variant.
 #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub(crate) enum AlignmentQuery {
+    Discovery,
+    All,
     Diff(DigestDiff),
     Intervals(HashSet<IntervalIdx>),
     SubIntervals(HashMap<IntervalIdx, HashSet<SubIntervalIdx>>),
@@ -67,6 +71,7 @@ pub(crate) enum AlignmentQuery {
 /// Not all replies are made, it depends on the Era when a misalignment was detected.
 #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub(crate) enum AlignmentReply {
+    Discovery(ZenohId),
     Intervals(HashMap<IntervalIdx, Fingerprint>),
     SubIntervals(HashMap<IntervalIdx, HashMap<SubIntervalIdx, Fingerprint>>),
     Events(Vec<EventMetadata>),
@@ -101,6 +106,47 @@ impl Replication {
             };
 
         match alignment_query {
+            AlignmentQuery::Discovery => {
+                tracing::trace!("Processing `AlignmentQuery::Discovery`");
+                reply_to_query(
+                    &query,
+                    AlignmentReply::Discovery(self.zenoh_session.zid()),
+                    None,
+                )
+                .await;
+            }
+            AlignmentQuery::All => {
+                tracing::trace!("Processing `AlignmentQuery::All`");
+
+                let idx_intervals = self
+                    .replication_log
+                    .read()
+                    .await
+                    .intervals
+                    .keys()
+                    .copied()
+                    .collect::<Vec<_>>();
+
+                for interval_idx in idx_intervals {
+                    let mut events_to_send = Vec::default();
+                    if let Some(interval) = self
+                        .replication_log
+                        .read()
+                        .await
+                        .intervals
+                        .get(&interval_idx)
+                    {
+                        interval.sub_intervals.values().for_each(|sub_interval| {
+                            events_to_send.extend(sub_interval.events.values().map(Into::into));
+                        });
+                    }
+
+                    // NOTE: As we took the lock in the `if let` block, it is released here,
+                    // diminishing contention.
+
+                    self.reply_events(&query, events_to_send).await;
+                }
+            }
             AlignmentQuery::Diff(digest_diff) => {
                 tracing::trace!("Processing `AlignmentQuery::Diff`");
                 if digest_diff.cold_eras_differ {
@@ -262,6 +308,11 @@ impl Replication {
     /// is the reason why we need the consolidation to set to be `None` (⚠️).
     pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec<EventMetadata>) {
         for event_metadata in events_to_retrieve {
+            if event_metadata.action == SampleKind::Delete {
+                reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await;
+                continue;
+            }
+
             let stored_data = {
                 let mut storage = self.storage.lock().await;
                 match storage.get(event_metadata.stripped_key.clone(), "").await {
@@ -323,7 +374,7 @@ impl Replication {
         &self,
         replica_aligner_ke: OwnedKeyExpr,
         alignment_query: AlignmentQuery,
-    ) {
+    ) -> JoinHandle<()> {
         let replication = self.clone();
         tokio::task::spawn(async move {
             let attachment = match bincode::serialize(&alignment_query) {
@@ -334,17 +385,29 @@ impl Replication {
                 }
             };
 
+            // NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies are
+            //       sent, they will be "consolidated" and only one of them will make it through.
+            //
+            //       When we retrieve Samples from a Replica, each Sample is sent in a separate
+            //       reply. Hence the need to have no consolidation.
+            let mut consolidation = ConsolidationMode::None;
+
+            if matches!(alignment_query, AlignmentQuery::Discovery) {
+                // NOTE: `Monotonic` means that Zenoh will forward the first answer it receives (and
+                //       ensure that later answers are with a higher timestamp — we do not care
+                //       about that last aspect).
+                //
+                //       By setting the consolidation to this value when performing the initial
+                //       alignment, we select the most reactive Replica (hopefully the closest as
+                //       well).
+                consolidation = ConsolidationMode::Monotonic;
+            }
+
             match replication
                 .zenoh_session
                 .get(Into::<Selector>::into(replica_aligner_ke.clone()))
                 .attachment(attachment)
-                // NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies
-                //       are sent, they will be "consolidated" and only one of them will make it
-                //       through.
-                //
-                //       When we retrieve Samples from a Replica, each Sample is sent in a separate
-                //       reply. Hence the need to have no consolidation.
-                .consolidation(ConsolidationMode::None)
+                .consolidation(consolidation)
                 .await
             {
                 Err(e) => {
@@ -387,10 +450,17 @@ impl Replication {
                                 sample,
                             )
                             .await;
+
+                        // The consolidation mode `Monotonic`, used for sending out an
+                        // `AlignmentQuery::Discovery`, will keep on sending replies. We only want
+                        // to discover / align with a single Replica so we break here.
+                        if matches!(alignment_query, AlignmentQuery::Discovery) {
+                            return;
+                        }
                     }
                 }
             }
-        });
+        })
     }
 
     /// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is
@@ -438,6 +508,39 @@ impl Replication {
         sample: Sample,
     ) {
         match alignment_reply {
+            AlignmentReply::Discovery(replica_zid) => {
+                let parsed_ke = match aligner_key_expr_formatter::parse(&replica_aligner_ke) {
+                    Ok(ke) => ke,
+                    Err(e) => {
+                        tracing::error!(
+                            "Failed to parse < {replica_aligner_ke} > as a valid Aligner key \
+                             expression: {e:?}"
+                        );
+                        return;
+                    }
+                };
+
+                let replica_aligner_ke = match keformat!(
+                    aligner_key_expr_formatter::formatter(),
+                    hash_configuration = parsed_ke.hash_configuration(),
+                    zid = replica_zid,
+                ) {
+                    Ok(ke) => ke,
+                    Err(e) => {
+                        tracing::error!("Failed to generate a valid Aligner key expression: {e:?}");
+                        return;
+                    }
+                };
+
+                tracing::debug!("Performing initial alignment with Replica < {replica_zid} >");
+
+                if let Err(e) = self
+                    .spawn_query_replica_aligner(replica_aligner_ke, AlignmentQuery::All)
+                    .await
+                {
+                    tracing::error!("Error returned while performing the initial alignment: {e:?}");
+                }
+            }
             AlignmentReply::Intervals(replica_intervals) => {
                 tracing::trace!("Processing `AlignmentReply::Intervals`");
                 let intervals_diff = {
@@ -616,18 +719,26 @@ impl Replication {
                     }
                 }
 
-                if matches!(
-                    self.storage
-                        .lock()
-                        .await
-                        .put(
-                            replica_event.stripped_key.clone(),
-                            sample.into(),
-                            replica_event.timestamp,
-                        )
-                        .await,
-                    Ok(StorageInsertionResult::Outdated) | Err(_)
-                ) {
+                // NOTE: This code can only be called with `action` set to `delete` on an initial
+                // alignment, in which case the Storage of the receiving Replica is empty => there
+                // is no need to actually call `storage.delete`.
+                //
+                // Outside of an initial alignment, the `delete` action will be performed at the
+                // step above, in `AlignmentReply::Events`.
+                if replica_event.action == SampleKind::Put
+                    && matches!(
+                        self.storage
+                            .lock()
+                            .await
+                            .put(
+                                replica_event.stripped_key.clone(),
+                                sample.into(),
+                                replica_event.timestamp,
+                            )
+                            .await,
+                        Ok(StorageInsertionResult::Outdated) | Err(_)
+                    )
+                {
                     return;
                 }
 
diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs
index bd96912a4b..0db1459ec7 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replication/core.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replication/core.rs
@@ -53,6 +53,63 @@ pub(crate) struct Replication {
 }
 
 impl Replication {
+    /// Performs an initial alignment, skipping the comparison of Digest, asking directly the first
+    /// discovered Replica for all its entries.
+    ///
+    /// # ⚠️ Assumption: empty Storage
+    ///
+    /// We assume that this method will only be called if the underlying Storage is empty. This has
+    /// at least one consequence: if the Aligner receives a `delete` event from the Replica, it will
+    /// not attempt to delete anything from the Storage.
+    ///
+    /// # Replica discovery
+    ///
+    /// To discover a Replica, this method will create a Digest subscriber, wait to receive a
+    /// *valid* Digest and, upon reception, ask that Replica for all its entries.
+    ///
+    /// To avoid waiting indefinitely (in case there are no other Replica on the network), the
+    /// subscriber will wait for, at most, the duration of two Intervals.
+    pub(crate) async fn initial_alignment(&self) {
+        let ke_all_replicas = match keformat!(
+            aligner_key_expr_formatter::formatter(),
+            hash_configuration = *self
+                .replication_log
+                .read()
+                .await
+                .configuration
+                .fingerprint(),
+            zid = "*",
+        ) {
+            Ok(ke) => ke,
+            Err(e) => {
+                tracing::error!(
+                    "Failed to generate key expression to query all Replicas: {e:?}. Skipping \
+                     initial alignment."
+                );
+                return;
+            }
+        };
+
+        // NOTE: As discussed with @OlivierHecart, the plugins do not wait for the duration of the
+        // "scouting delay" before performing any Zenoh operation. Hence, we manually enforce this
+        // delay when performing the initial alignment.
+        let delay = self
+            .zenoh_session
+            .config()
+            .lock()
+            .scouting
+            .delay()
+            .unwrap_or(500);
+        tokio::time::sleep(Duration::from_millis(delay)).await;
+
+        if let Err(e) = self
+            .spawn_query_replica_aligner(ke_all_replicas, AlignmentQuery::Discovery)
+            .await
+        {
+            tracing::error!("Initial alignment failed with: {e:?}");
+        }
+    }
+
     /// Spawns a task that periodically publishes the [Digest] of the Replication [Log].
     ///
     /// This task will perform the following steps:
diff --git a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs
index f3b87c6c3f..e48fb4fecd 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replication/service.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replication/service.rs
@@ -12,13 +12,13 @@
 //   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
 //
 
-use std::{sync::Arc, time::Duration};
+use std::sync::Arc;
 
 use tokio::{
     sync::{broadcast::Receiver, RwLock},
     task::JoinHandle,
 };
-use zenoh::{key_expr::OwnedKeyExpr, query::QueryTarget, sample::Locality, session::Session};
+use zenoh::{key_expr::OwnedKeyExpr, session::Session};
 
 use super::{core::Replication, LogLatest};
 use crate::storages_mgt::{LatestUpdates, StorageMessage, StorageService};
@@ -29,17 +29,22 @@ pub(crate) struct ReplicationService {
     aligner_queryable_handle: JoinHandle<()>,
 }
 
-pub(crate) const MAX_RETRY: usize = 2;
-pub(crate) const WAIT_PERIOD_SECS: u64 = 4;
-
 impl ReplicationService {
     /// Starts the `ReplicationService`, spawning multiple tasks.
     ///
+    /// # Initial alignment
+    ///
+    /// To optimise network resources, if the Storage is empty an "initial alignment" will be
+    /// performed: if a Replica is detected, a query will be made to retrieve the entire content of
+    /// its Storage.
+    ///
     /// # Tasks spawned
     ///
-    /// This function will spawn two tasks:
+    /// This function will spawn four long-lived tasks:
     /// 1. One to publish the [Digest].
-    /// 2. One to wait on the provided [Receiver] in order to stop the Replication Service,
+    /// 2. One to receive the [Digest] of other Replica.
+    /// 3. One to receive alignment queries of other Replica.
+    /// 4. One to wait on the provided [Receiver] in order to stop the Replication Service,
     ///    attempting to abort all the tasks that were spawned, once a Stop message has been
     ///    received.
     pub async fn spawn_start(
@@ -50,65 +55,27 @@ impl ReplicationService {
         latest_updates: Arc<RwLock<LatestUpdates>>,
         mut rx: Receiver<StorageMessage>,
     ) {
-        // We perform a "wait-try" policy because Zenoh needs some time to propagate the routing
-        // information and, here, we need to have the queryables propagated.
-        //
-        // 4 seconds is an arbitrary value.
-        let mut attempt = 0;
-        let mut received_reply = false;
-
-        while attempt < MAX_RETRY {
-            attempt += 1;
-            tokio::time::sleep(Duration::from_secs(WAIT_PERIOD_SECS)).await;
-
-            match zenoh_session
-                .get(&storage_key_expr)
-                // `BestMatching`, the default option for `target`, will try to minimise the storage
-                // that are queried and their distance while trying to maximise the key space
-                // covered.
-                //
-                // In other words, if there is a close and complete storage, it will only query this
-                // one.
-                .target(QueryTarget::BestMatching)
-                // The value `Remote` is self-explanatory but why it is needed deserves an
-                // explanation: we do not want to query the local database as the purpose is to get
-                // the data from other replicas (if there is one).
-                .allowed_destination(Locality::Remote)
-                .await
-            {
-                Ok(replies) => {
-                    while let Ok(reply) = replies.recv_async().await {
-                        received_reply = true;
-                        if let Ok(sample) = reply.into_result() {
-                            if let Err(e) = storage_service.process_sample(sample).await {
-                                tracing::error!("{e:?}");
-                            }
-                        }
-                    }
-                }
-                Err(e) => tracing::error!("Initial alignment Query failed with: {e:?}"),
-            }
+        let storage = storage_service.storage.clone();
 
-            if received_reply {
-                break;
-            }
+        let replication = Replication {
+            zenoh_session,
+            replication_log,
+            storage_key_expr,
+            latest_updates,
+            storage,
+        };
 
-            tracing::debug!(
-                "Found no Queryable matching '{storage_key_expr}'. Attempt {attempt}/{MAX_RETRY}."
-            );
+        if replication
+            .replication_log
+            .read()
+            .await
+            .intervals
+            .is_empty()
+        {
+            replication.initial_alignment().await;
         }
 
-        let storage = storage_service.storage.clone();
-
         tokio::task::spawn(async move {
-            let replication = Replication {
-                zenoh_session,
-                replication_log,
-                storage_key_expr,
-                latest_updates,
-                storage,
-            };
-
             let replication_service = Self {
                 digest_publisher_handle: replication.spawn_digest_publisher(),
                 digest_subscriber_handle: replication.spawn_digest_subscriber(),
@@ -124,7 +91,7 @@ impl ReplicationService {
         });
     }
 
-    /// Stops all the tasks spawned by the `ReplicationService`.
+    /// Stops all the long-lived tasks spawned by the `ReplicationService`.
     pub fn stop(self) {
         self.digest_publisher_handle.abort();
         self.digest_subscriber_handle.abort();