Skip to content

Commit

Permalink
WIP: initial alignment on empty Storage
Browse files Browse the repository at this point in the history
- [x] Validate functionality with manual tests.
- [x] Refactoring, repetition of code:
  - [x] Generation of Digest subscriber.
  - [x] Generation of Replica Aligner.
- [ ] Documentation.
- [ ] Maybe split commit?

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 25, 2024
1 parent 1188b52 commit 9410194
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 194 deletions.
173 changes: 111 additions & 62 deletions plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use super::{
/// hence directly skipping to the `SubIntervals` variant.
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) enum AlignmentQuery {
All,
Diff(DigestDiff),
Intervals(HashSet<IntervalIdx>),
SubIntervals(HashMap<IntervalIdx, HashSet<SubIntervalIdx>>),
Expand Down Expand Up @@ -101,6 +102,38 @@ impl Replication {
};

match alignment_query {
AlignmentQuery::All => {
tracing::trace!("Processing `AlignmentQuery::All`");

let idx_intervals = self
.replication_log
.read()
.await
.intervals
.keys()
.copied()
.collect::<Vec<_>>();

for interval_idx in idx_intervals {
let mut events_to_send = Vec::default();
if let Some(interval) = self
.replication_log
.read()
.await
.intervals
.get(&interval_idx)
{
interval.sub_intervals.values().for_each(|sub_interval| {
events_to_send.extend(sub_interval.events.values().map(Into::into));
});
}

// NOTE: As we took the lock in the `if let` block, it is released here,
// diminishing contention.

self.reply_events(&query, events_to_send).await;
}
}
AlignmentQuery::Diff(digest_diff) => {
tracing::trace!("Processing `AlignmentQuery::Diff`");
if digest_diff.cold_eras_differ {
Expand Down Expand Up @@ -262,6 +295,11 @@ impl Replication {
/// is the reason why we need the consolidation to set to be `None` (⚠️).
pub(crate) async fn reply_events(&self, query: &Query, events_to_retrieve: Vec<EventMetadata>) {
for event_metadata in events_to_retrieve {
if event_metadata.action == SampleKind::Delete {
reply_to_query(query, AlignmentReply::Retrieval(event_metadata), None).await;
continue;
}

let stored_data = {
let mut storage = self.storage.lock().await;
match storage.get(event_metadata.stripped_key.clone(), "").await {
Expand Down Expand Up @@ -306,6 +344,19 @@ impl Replication {
}
}

pub(crate) fn spawn_query_replica_aligner(
&self,
replica_aligner_ke: OwnedKeyExpr,
alignment_query: AlignmentQuery,
) {
let replication = self.clone();
tokio::task::spawn(async move {
replication
.query_replica_aligner(replica_aligner_ke, alignment_query)
.await;
});
}

/// Spawns a new task to query the Aligner of the Replica which potentially has data this
/// Storage is missing.
///
Expand All @@ -319,22 +370,20 @@ impl Replication {
/// information), spawning a new task.
///
/// This process is stateless and all the required information are carried in the query / reply.
pub(crate) fn spawn_query_replica_aligner(
pub(crate) async fn query_replica_aligner(
&self,
replica_aligner_ke: OwnedKeyExpr,
alignment_query: AlignmentQuery,
) {
let replication = self.clone();
tokio::task::spawn(async move {
let attachment = match bincode::serialize(&alignment_query) {
Ok(attachment) => attachment,
Err(e) => {
tracing::error!("Failed to serialize AlignmentQuery: {e:?}");
return;
}
};
let attachment = match bincode::serialize(&alignment_query) {
Ok(attachment) => attachment,
Err(e) => {
tracing::error!("Failed to serialize AlignmentQuery: {e:?}");
return;
}
};

match replication
match self
.zenoh_session
.get(Into::<Selector>::into(replica_aligner_ke.clone()))
.attachment(attachment)
Expand All @@ -346,51 +395,49 @@ impl Replication {
// reply. Hence the need to have no consolidation.
.consolidation(ConsolidationMode::None)
.await
{
Err(e) => {
tracing::error!("Failed to query Aligner < {replica_aligner_ke} >: {e:?}");
}
Ok(reply_receiver) => {
while let Ok(reply) = reply_receiver.recv_async().await {
let sample = match reply.into_result() {
Ok(sample) => sample,
{
Err(e) => {
tracing::error!("Failed to query Aligner < {replica_aligner_ke} >: {e:?}");
}
Ok(reply_receiver) => {
while let Ok(reply) = reply_receiver.recv_async().await {
let sample = match reply.into_result() {
Ok(sample) => sample,
Err(e) => {
tracing::warn!(
"Skipping reply to query to < {replica_aligner_ke} >: {e:?}"
);
continue;
}
};

let alignment_reply = match sample.attachment() {
None => {
tracing::debug!("Skipping reply without attachment");
continue;
}
Some(attachment) => match bincode::deserialize::<AlignmentReply>(
&attachment.into::<Cow<[u8]>>(),
) {
Err(e) => {
tracing::warn!(
"Skipping reply to query to < {replica_aligner_ke} >: {e:?}"
tracing::error!(
"Failed to deserialize attachment as AlignmentReply: {e:?}"
);
continue;
}
};

let alignment_reply = match sample.attachment() {
None => {
tracing::debug!("Skipping reply without attachment");
continue;
}
Some(attachment) => match bincode::deserialize::<AlignmentReply>(
&attachment.into::<Cow<[u8]>>(),
) {
Err(e) => {
tracing::error!(
"Failed to deserialize attachment as AlignmentReply: {e:?}"
);
continue;
}
Ok(alignment_reply) => alignment_reply,
},
};

replication
.process_alignment_reply(
replica_aligner_ke.clone(),
alignment_reply,
sample,
)
.await;
}
Ok(alignment_reply) => alignment_reply,
},
};

self.process_alignment_reply(
replica_aligner_ke.clone(),
alignment_reply,
sample,
)
.await;
}
}
});
}
}

/// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is
Expand Down Expand Up @@ -616,18 +663,20 @@ impl Replication {
}
}

if matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
) {
if replica_event.action == SampleKind::Put
&& matches!(
self.storage
.lock()
.await
.put(
replica_event.stripped_key.clone(),
sample.into(),
replica_event.timestamp,
)
.await,
Ok(StorageInsertionResult::Outdated) | Err(_)
)
{
return;
}

Expand Down
Loading

0 comments on commit 9410194

Please sign in to comment.