Storage Replication Align Queryable improvements #886

Closed
88 changes: 30 additions & 58 deletions plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs
@@ -19,6 +19,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
 use std::str;
 use std::str::FromStr;
 use zenoh::prelude::r#async::*;
+use zenoh::queryable::Query;
 use zenoh::time::Timestamp;
 use zenoh::Session;

@@ -89,87 +90,58 @@ impl AlignQueryable {
                 diff_required
             );
             if diff_required.is_some() {
-                let values = self.get_value(diff_required.unwrap()).await;
-                log::trace!("[ALIGN QUERYABLE] value for the query is {:?}", values);
-                for value in values {
-                    match value {
-                        AlignData::Interval(i, c) => {
-                            let sample = Sample::new(
-                                query.key_expr().clone(),
-                                serde_json::to_string(&(i, c)).unwrap(),
-                            );
-                            query.reply(Ok(sample)).res().await.unwrap();
-                        }
-                        AlignData::Subinterval(i, c) => {
-                            let sample = Sample::new(
-                                query.key_expr().clone(),
-                                serde_json::to_string(&(i, c)).unwrap(),
-                            );
-                            query.reply(Ok(sample)).res().await.unwrap();
-                        }
-                        AlignData::Content(i, c) => {
-                            let sample = Sample::new(
-                                query.key_expr().clone(),
-                                serde_json::to_string(&(i, c)).unwrap(),
-                            );
-                            query.reply(Ok(sample)).res().await.unwrap();
-                        }
-                        AlignData::Data(k, (v, ts)) => {
-                            let sample = Sample::new(k, v).with_timestamp(ts);
-                            query.reply(Ok(sample)).res().await.unwrap();
-                        }
-                    }
-                }
+                self.reply_diff(diff_required.unwrap(), query).await;
             }
         }
     }

-    async fn get_value(&self, diff_required: AlignComponent) -> Vec<AlignData> {
+    async fn reply_diff(&self, diff_required: AlignComponent, query: Query) {
+        let reply = |value| async {
+            let ke = query.key_expr().clone();
+            let sample = match value {
+                AlignData::Content(i, c) => {
+                    Sample::new(ke, serde_json::to_string(&(i, c)).unwrap())
+                }
+
+                AlignData::Subinterval(i, c) | AlignData::Interval(i, c) => {
+                    Sample::new(ke, serde_json::to_string(&(i, c)).unwrap())
+                }
+                AlignData::Data(k, (v, ts)) => Sample::new(k, v).with_timestamp(ts),
+            };
+            query.reply(Ok(sample)).res().await.unwrap();
+        };
+
         // TODO: Discuss if having timestamp is useful
         match diff_required {
             AlignComponent::Era(era) => {
-                let intervals = self.get_intervals(&era).await;
-                let mut result = Vec::new();
-                for (i, c) in intervals {
-                    result.push(AlignData::Interval(i, c));
+                for (i, c) in self.get_intervals(&era).await {
+                    reply(AlignData::Interval(i, c)).await;
                 }
-                result
             }
             AlignComponent::Intervals(intervals) => {
-                let mut subintervals = HashMap::new();
                 for each in intervals {
-                    subintervals.extend(self.get_subintervals(each).await);
-                }
-                let mut result = Vec::new();
-                for (i, c) in subintervals {
-                    result.push(AlignData::Subinterval(i, c));
+                    for (i, c) in self.get_subintervals(each).await {
+                        reply(AlignData::Subinterval(i, c)).await;
+                    }
                 }
-                result
             }
             AlignComponent::Subintervals(subintervals) => {
-                let mut content = HashMap::new();
                 for each in subintervals {
-                    content.extend(self.get_content(each).await);
-                }
-                let mut result = Vec::new();
-                for (i, c) in content {
-                    result.push(AlignData::Content(i, c));
+                    for (i, c) in self.get_content(each).await {
+                        reply(AlignData::Content(i, c)).await;
+                    }
                 }
-                result
             }
             AlignComponent::Contents(contents) => {
-                let mut result = Vec::new();
                 for each in contents {
-                    let entry = self.get_entry(&each).await;
-                    if entry.is_some() {
-                        let entry = entry.unwrap();
-                        result.push(AlignData::Data(
+                    if let Some(entry) = self.get_entry(&each).await {
+                        reply(AlignData::Data(
                             OwnedKeyExpr::from(entry.key_expr),
                             (entry.value, each.timestamp),
-                        ));
+                        ))
+                        .await;
                     }
                 }
-                result
             }
         }
     }
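The shape of this refactor — replacing a function that collects every `AlignData` into a `Vec` (which a second loop then serializes and replies to) with a single pass that replies through a local closure — can be sketched outside zenoh. This is a simplified, synchronous illustration: `AlignData` is a stand-in enum, and a `Vec<String>` plays the role of `query.reply(...)`.

```rust
// Simplified sketch of the PR's refactor: reply to each item as it is
// produced via a local closure, instead of building an intermediate Vec.
enum AlignData {
    Interval(u64, u64),
    Content(u64, String),
}

fn reply_diff(intervals: &[(u64, u64)], out: &mut Vec<String>) {
    // The closure plays the role of `query.reply(...)` in the real code.
    let mut reply = |value: AlignData| {
        let encoded = match value {
            AlignData::Interval(i, c) => format!("interval:{}:{}", i, c),
            AlignData::Content(i, c) => format!("content:{}:{}", i, c),
        };
        out.push(encoded);
    };

    for &(i, c) in intervals {
        // Each item is sent immediately; no intermediate Vec is built.
        reply(AlignData::Interval(i, c));
    }
    // Hypothetical trailing content item, for illustration only.
    reply(AlignData::Content(0, "payload".to_string()));
}

fn main() {
    let mut sent = Vec::new();
    reply_diff(&[(1, 10), (2, 20)], &mut sent);
    assert_eq!(sent, vec!["interval:1:10", "interval:2:20", "content:0:payload"]);
}
```

Besides shrinking the code, this streams replies out as soon as they are ready instead of buffering the whole diff in memory first.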
9 changes: 6 additions & 3 deletions plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
@@ -119,7 +119,7 @@ impl Replica {

         // Create channels for communication between components
         // channel to queue digests to be aligned
-        let (tx_digest, rx_digest) = flume::unbounded();
+        let (tx_digest, rx_digest) = flume::bounded(10);
Contributor:

@chachi Could you give us more insight into this value of 10? Did it provide a good trade-off when you tried the replication for your use case?

In any case, hard-coding a value is not something we are particularly keen on (there are few cases where "one size fits all"). It should instead be part of the replication configuration.

If you want to add it to your PR I would happily review it; otherwise I will create a dedicated issue so that I can address it when I rework the replication.

Author:

No, this is entirely a speculative first stab at an improvement. I certainly agree that a configurable value would be better than hardcoding anything.

Truthfully, it feels like this buffer management would be better done entirely at the Zenoh level, so that the backpressure happens when a Publisher tries to send its digest and it gets dropped because there's no space to receive it.

Even better, frankly, would be to remove this middle queue entirely and have all the digest processing happen on recv from the digest subscriber. I'm not entirely sure what value there is in doing just the JSON parsing and is_processed checking separately from the rest of the handling.
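The drop-on-full semantics this PR introduces can be illustrated without the flume crate. This sketch uses `std::sync::mpsc::sync_channel` in place of `flume::bounded` (capacity 2 instead of 10, purely for illustration): once the bounded queue is full, `try_send` fails immediately and the digest is dropped rather than growing an unbounded backlog.

```rust
// Illustration of bounded-queue backpressure: try_send drops items
// once the channel is full, instead of queueing them forever.
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Capacity 2, standing in for flume::bounded(10) in the PR.
    let (tx, rx) = sync_channel::<u32>(2);

    let mut dropped = 0;
    for digest in 0..5 {
        match tx.try_send(digest) {
            Ok(()) => {}
            // Consumer is behind: drop this digest on the floor.
            Err(TrySendError::Full(_)) => dropped += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    assert_eq!(dropped, 3); // 2 queued, 3 dropped
    assert_eq!(rx.try_recv(), Ok(0)); // oldest digest is still available
}
```

The trade-off under discussion is exactly this: a bounded queue caps memory and latency, at the cost of silently skipping digests when the aligner falls behind.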

         // channel for aligner to send missing samples to storage
         let (tx_sample, rx_sample) = flume::unbounded();
         // channel for storage to send logging information back
@@ -247,9 +247,12 @@ impl Replica {
                 .await;
             if to_be_processed {
                 log::trace!("[DIGEST_SUB] sending {} to aligner", digest.checksum);
-                match tx.send_async((from.to_string(), digest)).await {
+                match tx.try_send((from.to_string(), digest)) {
                     Ok(()) => {}
-                    Err(e) => log::error!("[DIGEST_SUB] Error sending digest to aligner: {}", e),
+                    Err(e) => {
+                        // Trace because this can happen _a lot_ on busy channels.
+                        log::trace!("[DIGEST_SUB] Error sending digest to aligner: {}", e)
+                    }
Contributor, on lines +250 to +255:

I need to further investigate the implications of this change. As of today, I do not know whether skipping digests could have unforeseen consequences on the replication (my first guess is that it doesn't, but I want to make sure).

Author:

Yup, it's not a small change. Ultimately this system needs some sort of backpressure and dropping, because as a network of storages grows, if anything is out of sync it becomes impossible to parse and process every digest without dropping some.
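The author's earlier suggestion — dropping the middle queue and doing all digest handling on recv from the subscriber — could look roughly like this. Everything here is a speculative stand-in, not the plugin's actual API: a handler struct that performs the `is_processed` check and alignment inline, so backpressure is applied directly to the subscriber instead of to an intermediate channel.

```rust
// Speculative sketch: handle each digest inline in the subscriber
// callback instead of forwarding it through a middle queue.
use std::collections::HashSet;

struct DigestHandler {
    processed: HashSet<u64>, // checksums already aligned (illustrative)
}

impl DigestHandler {
    // Called once per received digest; returns true if alignment ran.
    fn on_digest(&mut self, checksum: u64) -> bool {
        if self.processed.contains(&checksum) {
            return false; // already handled, skip re-alignment
        }
        // ... parsing and alignment would happen here, inline, so a slow
        // aligner naturally slows the subscriber instead of queueing ...
        self.processed.insert(checksum);
        true
    }
}

fn main() {
    let mut handler = DigestHandler { processed: HashSet::new() };
    assert!(handler.on_digest(42));
    assert!(!handler.on_digest(42)); // duplicate digest is skipped
}
```

Whether inline handling is acceptable depends on how long alignment takes relative to the digest publication rate, which is the open question in this thread.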

                 }
             };
             received.insert(from.to_string(), ts);