Skip to content

Commit

Permalink
Merge commit 'a6c69946f7bd40b541ef78c9b7f417d0baa978a3'
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Sep 26, 2024
2 parents 99fac5f + a6c6994 commit 0d0b32a
Show file tree
Hide file tree
Showing 8 changed files with 413 additions and 283 deletions.
157 changes: 134 additions & 23 deletions plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ use std::{
};

use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use zenoh::{
bytes::ZBytes,
internal::Value,
key_expr::OwnedKeyExpr,
key_expr::{format::keformat, OwnedKeyExpr},
query::{ConsolidationMode, Query, Selector},
sample::{Sample, SampleKind},
session::ZenohId,
};
use zenoh_backend_traits::StorageInsertionResult;

use super::{
classification::{IntervalIdx, SubIntervalIdx},
core::Replication,
core::{aligner_key_expr_formatter, Replication},
digest::{DigestDiff, Fingerprint},
log::EventMetadata,
};
Expand All @@ -51,6 +53,8 @@ use super::{
/// hence directly skipping to the `SubIntervals` variant.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum AlignmentQuery {
Discovery,
All,
Diff(DigestDiff),
Intervals(HashSet<IntervalIdx>),
SubIntervals(HashMap<IntervalIdx, HashSet<SubIntervalIdx>>),
Expand All @@ -67,6 +71,7 @@ pub(crate) enum AlignmentQuery {
/// Not all replies are made, it depends on the Era when a misalignment was detected.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum AlignmentReply {
Discovery(ZenohId),
Intervals(HashMap<IntervalIdx, Fingerprint>),
SubIntervals(HashMap<IntervalIdx, HashMap<SubIntervalIdx, Fingerprint>>),
Events(Vec<EventMetadata>),
Expand Down Expand Up @@ -101,6 +106,47 @@ impl Replication {
};

match alignment_query {
AlignmentQuery::Discovery => {
tracing::trace!("Processing `AlignmentQuery::Discovery`");
reply_to_query(
&query,
AlignmentReply::Discovery(self.zenoh_session.zid()),
None,
)
.await;
}
AlignmentQuery::All => {
tracing::trace!("Processing `AlignmentQuery::All`");

let idx_intervals = self
.replication_log
.read()
.await
.intervals
.keys()
.copied()
.collect::<Vec<_>>();

for interval_idx in idx_intervals {
let mut events_to_send = Vec::default();
if let Some(interval) = self
.replication_log
.read()
.await
.intervals
.get(&interval_idx)
{
interval.sub_intervals.values().for_each(|sub_interval| {
events_to_send.extend(sub_interval.events.values().map(Into::into));
});
}

// NOTE: As we took the lock in the `if let` block, it is released here,
// diminishing contention.

self.reply_events(&query, events_to_send).await;
}
}
AlignmentQuery::Diff(digest_diff) => {
tracing::trace!("Processing `AlignmentQuery::Diff`");
if digest_diff.cold_eras_differ {
Expand Down Expand Up @@ -262,6 +308,11 @@ impl Replication {
/// is the reason why we need the consolidation to set to be `None` (⚠️).
pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec<EventMetadata>) {
for event_metadata in events_to_retrieve {
if event_metadata.action == SampleKind::Delete {
reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await;
continue;
}

let stored_data = {
let mut storage = self.storage.lock().await;
match storage.get(event_metadata.stripped_key.clone(), "").await {
Expand Down Expand Up @@ -323,7 +374,7 @@ impl Replication {
&self,
replica_aligner_ke: OwnedKeyExpr,
alignment_query: AlignmentQuery,
) {
) -> JoinHandle<()> {
let replication = self.clone();
tokio::task::spawn(async move {
let attachment = match bincode::serialize(&alignment_query) {
Expand All @@ -334,17 +385,29 @@ impl Replication {
}
};

// NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies are
// sent, they will be "consolidated" and only one of them will make it through.
//
// When we retrieve Samples from a Replica, each Sample is sent in a separate
// reply. Hence the need to have no consolidation.
let mut consolidation = ConsolidationMode::None;

if matches!(alignment_query, AlignmentQuery::Discovery) {
// NOTE: `Monotonic` means that Zenoh will forward the first answer it receives (and
// ensure that later answers are with a higher timestamp — we do not care
// about that last aspect).
//
// By setting the consolidation to this value when performing the initial
// alignment, we select the most reactive Replica (hopefully the closest as
// well).
consolidation = ConsolidationMode::Monotonic;
}

match replication
.zenoh_session
.get(Into::<Selector>::into(replica_aligner_ke.clone()))
.attachment(attachment)
// NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies
// are sent, they will be "consolidated" and only one of them will make it
// through.
//
// When we retrieve Samples from a Replica, each Sample is sent in a separate
// reply. Hence the need to have no consolidation.
.consolidation(ConsolidationMode::None)
.consolidation(consolidation)
.await
{
Err(e) => {
Expand Down Expand Up @@ -387,10 +450,17 @@ impl Replication {
sample,
)
.await;

// The consolidation mode `Monotonic`, used for sending out an
// `AlignmentQuery::Discovery`, will keep on sending replies. We only want
// to discover / align with a single Replica so we break here.
if matches!(alignment_query, AlignmentQuery::Discovery) {
return;
}
}
}
}
});
})
}

/// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is
Expand Down Expand Up @@ -438,6 +508,39 @@ impl Replication {
sample: Sample,
) {
match alignment_reply {
AlignmentReply::Discovery(replica_zid) => {
let parsed_ke = match aligner_key_expr_formatter::parse(&replica_aligner_ke) {
Ok(ke) => ke,
Err(e) => {
tracing::error!(
"Failed to parse < {replica_aligner_ke} > as a valid Aligner key \
expression: {e:?}"
);
return;
}
};

let replica_aligner_ke = match keformat!(
aligner_key_expr_formatter::formatter(),
hash_configuration = parsed_ke.hash_configuration(),
zid = replica_zid,
) {
Ok(ke) => ke,
Err(e) => {
tracing::error!("Failed to generate a valid Aligner key expression: {e:?}");
return;
}
};

tracing::debug!("Performing initial alignment with Replica < {replica_zid} >");

if let Err(e) = self
.spawn_query_replica_aligner(replica_aligner_ke, AlignmentQuery::All)
.await
{
tracing::error!("Error returned while performing the initial alignment: {e:?}");
}
}
AlignmentReply::Intervals(replica_intervals) => {
tracing::trace!("Processing `AlignmentReply::Intervals`");
let intervals_diff = {
Expand Down Expand Up @@ -616,18 +719,26 @@ impl Replication {
}
}

if matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
) {
// NOTE: This code can only be called with `action` set to `delete` on an initial
// alignment, in which case the Storage of the receiving Replica is empty => there
// is no need to actually call `storage.delete`.
//
// Outside of an initial alignment, the `delete` action will be performed at the
// step above, in `AlignmentReply::Events`.
if replica_event.action == SampleKind::Put
&& matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
)
{
return;
}

Expand Down
Loading

0 comments on commit 0d0b32a

Please sign in to comment.