Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage-manager): replication initial alignment #1468

Merged
merged 5 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 134 additions & 23 deletions plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ use std::{
};

use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use zenoh::{
bytes::ZBytes,
internal::Value,
key_expr::OwnedKeyExpr,
key_expr::{format::keformat, OwnedKeyExpr},
query::{ConsolidationMode, Query, Selector},
sample::{Sample, SampleKind},
session::ZenohId,
};
use zenoh_backend_traits::StorageInsertionResult;

use super::{
classification::{IntervalIdx, SubIntervalIdx},
core::Replication,
core::{aligner_key_expr_formatter, Replication},
digest::{DigestDiff, Fingerprint},
log::EventMetadata,
};
Expand All @@ -51,6 +53,8 @@ use super::{
/// hence directly skipping to the `SubIntervals` variant.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum AlignmentQuery {
Discovery,
All,
Diff(DigestDiff),
Intervals(HashSet<IntervalIdx>),
SubIntervals(HashMap<IntervalIdx, HashSet<SubIntervalIdx>>),
Expand All @@ -67,6 +71,7 @@ pub(crate) enum AlignmentQuery {
/// Not all replies are made, it depends on the Era when a misalignment was detected.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum AlignmentReply {
Discovery(ZenohId),
Intervals(HashMap<IntervalIdx, Fingerprint>),
SubIntervals(HashMap<IntervalIdx, HashMap<SubIntervalIdx, Fingerprint>>),
Events(Vec<EventMetadata>),
Expand Down Expand Up @@ -101,6 +106,47 @@ impl Replication {
};

match alignment_query {
AlignmentQuery::Discovery => {
tracing::trace!("Processing `AlignmentQuery::Discovery`");
reply_to_query(
&query,
AlignmentReply::Discovery(self.zenoh_session.zid()),
None,
)
.await;
}
AlignmentQuery::All => {
tracing::trace!("Processing `AlignmentQuery::All`");

let idx_intervals = self
.replication_log
.read()
.await
.intervals
.keys()
.copied()
.collect::<Vec<_>>();

for interval_idx in idx_intervals {
let mut events_to_send = Vec::default();
if let Some(interval) = self
.replication_log
.read()
.await
.intervals
.get(&interval_idx)
{
interval.sub_intervals.values().for_each(|sub_interval| {
events_to_send.extend(sub_interval.events.values().map(Into::into));
});
}

// NOTE: As we took the lock in the `if let` block, it is released here,
// diminishing contention.

self.reply_events(&query, events_to_send).await;
}
}
AlignmentQuery::Diff(digest_diff) => {
tracing::trace!("Processing `AlignmentQuery::Diff`");
if digest_diff.cold_eras_differ {
Expand Down Expand Up @@ -262,6 +308,11 @@ impl Replication {
/// is the reason why we need the consolidation to set to be `None` (⚠️).
pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec<EventMetadata>) {
for event_metadata in events_to_retrieve {
if event_metadata.action == SampleKind::Delete {
reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await;
continue;
}

let stored_data = {
let mut storage = self.storage.lock().await;
match storage.get(event_metadata.stripped_key.clone(), "").await {
Expand Down Expand Up @@ -323,7 +374,7 @@ impl Replication {
&self,
replica_aligner_ke: OwnedKeyExpr,
alignment_query: AlignmentQuery,
) {
) -> JoinHandle<()> {
let replication = self.clone();
tokio::task::spawn(async move {
let attachment = match bincode::serialize(&alignment_query) {
Expand All @@ -334,17 +385,29 @@ impl Replication {
}
};

// NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies are
// sent, they will be "consolidated" and only one of them will make it through.
//
// When we retrieve Samples from a Replica, each Sample is sent in a separate
// reply. Hence the need to have no consolidation.
let mut consolidation = ConsolidationMode::None;

if matches!(alignment_query, AlignmentQuery::Discovery) {
// NOTE: `Monotonic` means that Zenoh will forward the first answer it receives (and
// ensure that later answers are with a higher timestamp — we do not care
// about that last aspect).
//
// By setting the consolidation to this value when performing the initial
// alignment, we select the most reactive Replica (hopefully the closest as
// well).
consolidation = ConsolidationMode::Monotonic;
}

match replication
.zenoh_session
.get(Into::<Selector>::into(replica_aligner_ke.clone()))
.attachment(attachment)
// NOTE: We need to put the Consolidation to `None` as otherwise if multiple replies
// are sent, they will be "consolidated" and only one of them will make it
// through.
//
// When we retrieve Samples from a Replica, each Sample is sent in a separate
// reply. Hence the need to have no consolidation.
.consolidation(ConsolidationMode::None)
.consolidation(consolidation)
.await
{
Err(e) => {
Expand Down Expand Up @@ -387,10 +450,17 @@ impl Replication {
sample,
)
.await;

// The consolidation mode `Monotonic`, used for sending out an
// `AlignmentQuery::Discovery`, will keep on sending replies. We only want
// to discover / align with a single Replica so we break here.
if matches!(alignment_query, AlignmentQuery::Discovery) {
return;
}
}
}
}
});
})
}

/// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is
Expand Down Expand Up @@ -438,6 +508,39 @@ impl Replication {
sample: Sample,
) {
match alignment_reply {
AlignmentReply::Discovery(replica_zid) => {
let parsed_ke = match aligner_key_expr_formatter::parse(&replica_aligner_ke) {
Ok(ke) => ke,
Err(e) => {
tracing::error!(
"Failed to parse < {replica_aligner_ke} > as a valid Aligner key \
expression: {e:?}"
);
return;
}
};

let replica_aligner_ke = match keformat!(
aligner_key_expr_formatter::formatter(),
hash_configuration = parsed_ke.hash_configuration(),
zid = replica_zid,
) {
Ok(ke) => ke,
Err(e) => {
tracing::error!("Failed to generate a valid Aligner key expression: {e:?}");
return;
}
};

tracing::debug!("Performing initial alignment with Replica < {replica_zid} >");

if let Err(e) = self
.spawn_query_replica_aligner(replica_aligner_ke, AlignmentQuery::All)
.await
{
tracing::error!("Error returned while performing the initial alignment: {e:?}");
}
}
AlignmentReply::Intervals(replica_intervals) => {
tracing::trace!("Processing `AlignmentReply::Intervals`");
let intervals_diff = {
Expand Down Expand Up @@ -616,18 +719,26 @@ impl Replication {
}
}

if matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
) {
// NOTE: This code can only be called with `action` set to `delete` on an initial
// alignment, in which case the Storage of the receiving Replica is empty => there
// is no need to actually call `storage.delete`.
//
// Outside of an initial alignment, the `delete` action will be performed at the
// step above, in `AlignmentReply::Events`.
if replica_event.action == SampleKind::Put
&& matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
)
{
return;
}

Expand Down
Loading