Skip to content

Commit

Permalink
refactor(storage-manager): initial alignment on empty Storage
Browse files Browse the repository at this point in the history
This commit changes the way a replicated Storage starts: if it is empty
and configured to be replicated, it will attempt to align with an active
and compatible (i.e. same configuration) Replica before anything.

The previous behaviour made a query on the key expression of the
Storage. Although, it could, in some cases, actually perform the same
initial alignment, it could not guarantee to only query a Storage that
was configured to be replicated.

To perform this controlled initial alignment, new variants to the
`AlignmentQuery` and `AlignmentReply` enumerations were added:
`Discovery` to discover an active Replica and reply with its `ZenohId`,
`All` to request the data from the discovered Replica.

To avoid contention, this transfer is performed by batch, one `Interval`
at a time.

* plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs:
  - Added new variants `AlignmentQuery::All` and
    `AlignmentQuery::Discovery`.
  - Added new variant `AlignmentReply::Discovery`.
  - Updated the `aligner` method to:
    - Send the `ZenohId` as a reply to an `AlignmentQuery::Discovery`.
    - Send all the data of the Storage as a reply to an
      `AlignmentQuery::All`. This leverages the already existing
      `reply_events` method.
  - Updated the `reply_events` method to not attempt to fetch the
    content of the Storage if the action is set to `delete`. Before this
    commit, the only time this method was called was during an alignment
    which filters out the deleted events (hence not requiring this
    code).
  - Updated the `spawn_query_replica_aligner` method:
    - It now returns the handle of the newly created task as we want to
      wait for it to finish when performing the initial alignment.
    - Changed the consolidation to `ConsolidationMode::Monotonic` when
      sending an `AlignmentQuery::Discovery`: we only want one reply.
  - Updated the `process_alignment_reply`:
    - Process an `AlignmentReply::Discovery` by sending a follow-up
      `AlignmentQuery::All` to retrieve the content of the Storage of
      the discovered Replica.
    - It does not attempt to delete an entry in the Storage when
      processing an `AlignmentReply::Retrieval`. This could only happen
      when performing an initial alignment in which case the receiving
      Storage is empty. We basically only need to record the fact that a
      delete was performed in the Replication Log.

* plugins/zenoh-plugin-storage-manager/src/replication/core.rs:
  implemented the `initial_alignment` method that attempts to discover a
  Replica by sending out an `AlignmentQuery::Discovery` on the Aligner
  Queryable for all Replicas. Before making this query we wait a small
  delay to give enough time for Zenoh to propagate the routing tables.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Removed the constants `MAX_RETRY` and `WAIT_PERIOD_SECS` as they
    were no longer needed.
  - Updated the documentation of the `spawn_start` function.
  - Removed the previous implementation of the initial alignment that
    made a query on the key expression of the Storage.
  - Added a check after creating the `Replication` structure: if the
    Replication Log is empty, which indicates an empty Storage, then
    perform an initial alignment.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 26, 2024
1 parent 9abde33 commit a73ce15
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 79 deletions.
131 changes: 114 additions & 17 deletions plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ use std::{
};

use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use zenoh::{
bytes::ZBytes,
internal::Value,
key_expr::OwnedKeyExpr,
key_expr::{format::keformat, OwnedKeyExpr},
query::{ConsolidationMode, Query, Selector},
sample::{Sample, SampleKind},
session::ZenohId,
};
use zenoh_backend_traits::StorageInsertionResult;

use super::{
classification::{IntervalIdx, SubIntervalIdx},
core::Replication,
core::{aligner_key_expr_formatter, Replication},
digest::{DigestDiff, Fingerprint},
log::EventMetadata,
};
Expand All @@ -51,6 +53,8 @@ use super::{
/// hence directly skipping to the `SubIntervals` variant.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum AlignmentQuery {
Discovery,
All,
Diff(DigestDiff),
Intervals(HashSet<IntervalIdx>),
SubIntervals(HashMap<IntervalIdx, HashSet<SubIntervalIdx>>),
Expand All @@ -67,6 +71,7 @@ pub(crate) enum AlignmentQuery {
/// Not all replies are made, it depends on the Era when a misalignment was detected.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum AlignmentReply {
Discovery(ZenohId),
Intervals(HashMap<IntervalIdx, Fingerprint>),
SubIntervals(HashMap<IntervalIdx, HashMap<SubIntervalIdx, Fingerprint>>),
Events(Vec<EventMetadata>),
Expand Down Expand Up @@ -101,6 +106,47 @@ impl Replication {
};

match alignment_query {
AlignmentQuery::Discovery => {
tracing::trace!("Processing `AlignmentQuery::Discovery`");
reply_to_query(
&query,
AlignmentReply::Discovery(self.zenoh_session.zid()),
None,
)
.await;
}
AlignmentQuery::All => {
tracing::trace!("Processing `AlignmentQuery::All`");

let idx_intervals = self
.replication_log
.read()
.await
.intervals
.keys()
.copied()
.collect::<Vec<_>>();

for interval_idx in idx_intervals {
let mut events_to_send = Vec::default();
if let Some(interval) = self
.replication_log
.read()
.await
.intervals
.get(&interval_idx)
{
interval.sub_intervals.values().for_each(|sub_interval| {
events_to_send.extend(sub_interval.events.values().map(Into::into));
});
}

// NOTE: As we took the lock in the `if let` block, it is released here,
// diminishing contention.

self.reply_events(&query, events_to_send).await;
}
}
AlignmentQuery::Diff(digest_diff) => {
tracing::trace!("Processing `AlignmentQuery::Diff`");
if digest_diff.cold_eras_differ {
Expand Down Expand Up @@ -262,6 +308,11 @@ impl Replication {
/// is the reason why we need the consolidation to set to be `None` (⚠️).
pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec<EventMetadata>) {
for event_metadata in events_to_retrieve {
if event_metadata.action == SampleKind::Delete {
reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await;
continue;
}

let stored_data = {
let mut storage = self.storage.lock().await;
match storage.get(event_metadata.stripped_key.clone(), "").await {
Expand Down Expand Up @@ -323,7 +374,7 @@ impl Replication {
&self,
replica_aligner_ke: OwnedKeyExpr,
alignment_query: AlignmentQuery,
) {
) -> JoinHandle<()> {
let replication = self.clone();
tokio::task::spawn(async move {
let attachment = match bincode::serialize(&alignment_query) {
Expand All @@ -334,6 +385,11 @@ impl Replication {
}
};

let mut consolidation = ConsolidationMode::None;
if matches!(alignment_query, AlignmentQuery::Discovery) {
consolidation = ConsolidationMode::Monotonic;
}

match replication
.zenoh_session
.get(Into::<Selector>::into(replica_aligner_ke.clone()))
Expand All @@ -344,7 +400,7 @@ impl Replication {
//
// When we retrieve Samples from a Replica, each Sample is sent in a separate
// reply. Hence the need to have no consolidation.
.consolidation(ConsolidationMode::None)
.consolidation(consolidation)
.await
{
Err(e) => {
Expand Down Expand Up @@ -390,7 +446,7 @@ impl Replication {
}
}
}
});
})
}

/// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is
Expand Down Expand Up @@ -438,6 +494,39 @@ impl Replication {
sample: Sample,
) {
match alignment_reply {
AlignmentReply::Discovery(replica_zid) => {
let parsed_ke = match aligner_key_expr_formatter::parse(&replica_aligner_ke) {
Ok(ke) => ke,
Err(e) => {
tracing::error!(
"Failed to parse < {replica_aligner_ke} > as a valid Aligner key \
expression: {e:?}"
);
return;
}
};

let replica_aligner_ke = match keformat!(
aligner_key_expr_formatter::formatter(),
hash_configuration = parsed_ke.hash_configuration(),
zid = replica_zid,
) {
Ok(ke) => ke,
Err(e) => {
tracing::error!("Failed to generate a valid Aligner key expression: {e:?}");
return;
}
};

tracing::debug!("Performing initial alignment with Replica < {replica_zid} >");

if let Err(e) = self
.spawn_query_replica_aligner(replica_aligner_ke, AlignmentQuery::All)
.await
{
tracing::error!("Error returned while performing the initial alignment: {e:?}");
}
}
AlignmentReply::Intervals(replica_intervals) => {
tracing::trace!("Processing `AlignmentReply::Intervals`");
let intervals_diff = {
Expand Down Expand Up @@ -616,18 +705,26 @@ impl Replication {
}
}

if matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
) {
// NOTE: This code can only be called with `action` set to `delete` on an initial
// alignment, in which case the Storage of the receiving Replica is empty => there
// is no need to actually call `storage.delete`.
//
// Outside of an initial alignment, the `delete` action will be performed at the
// step above, in `AlignmentReply::Events`.
if replica_event.action == SampleKind::Put
&& matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
)
{
return;
}

Expand Down
54 changes: 54 additions & 0 deletions plugins/zenoh-plugin-storage-manager/src/replication/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,60 @@ pub(crate) struct Replication {
}

impl Replication {
/// Performs an initial alignment, skipping the comparison of Digest, asking directly the first
/// discovered Replica for all its entries.
///
/// # ⚠️ Assumption: empty Storage
///
/// We assume that this method will only be called if the underlying Storage is empty. This has
/// at least one consequence: if the Aligner receives a `delete` event from the Replica, it will
/// not attempt to delete anything from the Storage.
///
/// # Replica discovery
///
/// To discover a Replica, this method will create a Digest subscriber, wait to receive a
/// *valid* Digest and, upon reception, ask that Replica for all its entries.
///
/// To avoid waiting indefinitely (in case there are no other Replica on the network), the
/// subscriber will wait for, at most, the duration of two Intervals.
pub(crate) async fn initial_alignment(&self) {
let ke_all_replicas = match keformat!(
aligner_key_expr_formatter::formatter(),
hash_configuration = *self
.replication_log
.read()
.await
.configuration
.fingerprint(),
zid = "*",
) {
Ok(ke) => ke,
Err(e) => {
tracing::error!(
"Failed to generate key expression to query all Replicas: {e:?}. Skipping \
initial alignment."
);
return;
}
};

let delay = self
.zenoh_session
.config()
.lock()
.scouting
.delay()
.unwrap_or(500);
tokio::time::sleep(Duration::from_millis(delay)).await;

if let Err(e) = self
.spawn_query_replica_aligner(ke_all_replicas, AlignmentQuery::Discovery)
.await
{
tracing::error!("Initial alignment failed with: {e:?}");
}
}

/// Spawns a task that periodically publishes the [Digest] of the Replication [Log].
///
/// This task will perform the following steps:
Expand Down
Loading

0 comments on commit a73ce15

Please sign in to comment.