Skip to content

Commit

Permalink
refactor(storage-manager): dedicated method to query the Aligner
Browse files Browse the repository at this point in the history
This commit anticipates the need of the initial alignment: the logic to
query the Aligner of a Replica was placed in a method to not necessarily
spawn a task whenever doing so.

* plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs:
  - Moved the logic to query the Aligner of a Replica in the method
    `query_replica_aligner`. This method spawns no task.
  - The method `spawn_query_replica_aligner` now simply spawns a task
    and calls `query_replica_aligner`.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 25, 2024
1 parent e574ce1 commit ac40add
Showing 1 changed file with 63 additions and 50 deletions.
113 changes: 63 additions & 50 deletions plugins/zenoh-plugin-storage-manager/src/replication/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,23 @@ impl Replication {
/// Spawns a new task to query the Aligner of the Replica which potentially has data this
/// Storage is missing.
///
/// This method is for convenience to avoid having to repeat the code clone the Replication
/// structure and spawning a new task.
pub(crate) fn spawn_query_replica_aligner(
&self,
replica_aligner_ke: OwnedKeyExpr,
alignment_query: AlignmentQuery,
) {
let replication = self.clone();
tokio::task::spawn(async move {
replication
.query_replica_aligner(replica_aligner_ke, alignment_query)
.await;
});
}

/// Query the Aligner of the Replica which potentially has data this Storage is missing.
///
/// This method will:
/// 1. Serialise the AlignmentQuery.
/// 2. Send a Query to the Aligner of the Replica, adding the serialised AlignmentQuery as an
Expand All @@ -319,22 +336,20 @@ impl Replication {
/// information), spawning a new task.
///
/// This process is stateless and all the required information are carried in the query / reply.
pub(crate) fn spawn_query_replica_aligner(
pub(crate) async fn query_replica_aligner(
&self,
replica_aligner_ke: OwnedKeyExpr,
alignment_query: AlignmentQuery,
) {
let replication = self.clone();
tokio::task::spawn(async move {
let attachment = match bincode::serialize(&alignment_query) {
Ok(attachment) => attachment,
Err(e) => {
tracing::error!("Failed to serialize AlignmentQuery: {e:?}");
return;
}
};
let attachment = match bincode::serialize(&alignment_query) {
Ok(attachment) => attachment,
Err(e) => {
tracing::error!("Failed to serialize AlignmentQuery: {e:?}");
return;
}
};

match replication
match self
.zenoh_session
.get(Into::<Selector>::into(replica_aligner_ke.clone()))
.attachment(attachment)
Expand All @@ -346,51 +361,49 @@ impl Replication {
// reply. Hence the need to have no consolidation.
.consolidation(ConsolidationMode::None)
.await
{
Err(e) => {
tracing::error!("Failed to query Aligner < {replica_aligner_ke} >: {e:?}");
}
Ok(reply_receiver) => {
while let Ok(reply) = reply_receiver.recv_async().await {
let sample = match reply.into_result() {
Ok(sample) => sample,
{
Err(e) => {
tracing::error!("Failed to query Aligner < {replica_aligner_ke} >: {e:?}");
}
Ok(reply_receiver) => {
while let Ok(reply) = reply_receiver.recv_async().await {
let sample = match reply.into_result() {
Ok(sample) => sample,
Err(e) => {
tracing::warn!(
"Skipping reply to query to < {replica_aligner_ke} >: {e:?}"
);
continue;
}
};

let alignment_reply = match sample.attachment() {
None => {
tracing::debug!("Skipping reply without attachment");
continue;
}
Some(attachment) => match bincode::deserialize::<AlignmentReply>(
&attachment.into::<Cow<[u8]>>(),
) {
Err(e) => {
tracing::warn!(
"Skipping reply to query to < {replica_aligner_ke} >: {e:?}"
tracing::error!(
"Failed to deserialize attachment as AlignmentReply: {e:?}"
);
continue;
}
};

let alignment_reply = match sample.attachment() {
None => {
tracing::debug!("Skipping reply without attachment");
continue;
}
Some(attachment) => match bincode::deserialize::<AlignmentReply>(
&attachment.into::<Cow<[u8]>>(),
) {
Err(e) => {
tracing::error!(
"Failed to deserialize attachment as AlignmentReply: {e:?}"
);
continue;
}
Ok(alignment_reply) => alignment_reply,
},
};

replication
.process_alignment_reply(
replica_aligner_ke.clone(),
alignment_reply,
sample,
)
.await;
}
Ok(alignment_reply) => alignment_reply,
},
};

self.process_alignment_reply(
replica_aligner_ke.clone(),
alignment_reply,
sample,
)
.await;
}
}
});
}
}

/// Processes the [AlignmentReply] sent by the Replica that has potentially data this Storage is
Expand Down

0 comments on commit ac40add

Please sign in to comment.