Storage Replication Align Queryable improvements #886
Conversation
```diff
@@ -119,7 +119,7 @@ impl Replica {
         // Create channels for communication between components
         // channel to queue digests to be aligned
-        let (tx_digest, rx_digest) = flume::unbounded();
+        let (tx_digest, rx_digest) = flume::bounded(10);
```
@chachi Could you give us more insight into this value of `10`? Was it providing a good trade-off when you tried the replication for your use-case?
In all cases, hard-coding a value is not something we are particularly keen on doing (there are rare cases of "one size fits all"). It should instead be part of the configuration of the replication.
If you want to add it to your PR I would happily review it; otherwise I will create a dedicated issue so that I can address this when I rework the replication.
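For reference, a minimal sketch of what making this configurable could look like, assuming a hypothetical `aligner_queue_size` option in the replication configuration (the field name and the unbounded fallback are illustrative, not existing Zenoh options):

```rust
use flume::{Receiver, Sender};

type Digest = String; // stand-in for the actual digest type

/// Hypothetical slice of the replication configuration; `aligner_queue_size`
/// is an assumed name, not an existing option.
struct ReplicaConfig {
    /// Maximum number of digests queued for the aligner.
    /// `None` keeps the current unbounded behaviour.
    aligner_queue_size: Option<usize>,
}

fn digest_channel(config: &ReplicaConfig) -> (Sender<(String, Digest)>, Receiver<(String, Digest)>) {
    match config.aligner_queue_size {
        // Bounded: the digest subscriber sees backpressure (or drops) when the
        // aligner falls behind.
        Some(capacity) => flume::bounded(capacity),
        // Unbounded: current behaviour, the queue can grow without limit.
        None => flume::unbounded(),
    }
}
```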
No, this is entirely a speculative first stab at an improvement. I certainly agree that a configurable value would be better than hardcoding anything.
Truthfully, it feels like it would be better for this buffer management to be done entirely at the Zenoh level, so that the backpressure happens when a Publisher tries to send its digest and it gets dropped because there's no space to receive.
Even better, frankly, would be to remove this middle queue entirely and just have all the digest processing happen on recv from the digest-sub. I'm not entirely sure what the value is of doing just the JSON parsing and `is_processed` checking separately from the rest of the handling.
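A rough sketch of that "no middle queue" idea, with `parse_digest`, `is_processed`, and `process_digest` standing in for the existing parsing, de-duplication, and alignment steps (the names, signatures, and channel types below are assumptions, not the actual replica code):

```rust
use flume::Receiver;

type Digest = String; // stand-in for the actual digest type

// Illustrative stand-ins for the existing parsing / de-duplication / alignment steps.
fn parse_digest(payload: &str) -> Option<Digest> {
    Some(payload.to_string()) // the real code does JSON parsing here
}
fn is_processed(_digest: &Digest) -> bool {
    false // the real code checks whether this digest was already handled
}
fn process_digest(_from: &str, _digest: Digest) {
    // alignment / query logic would live here
}

// Instead of forwarding parsed digests over a second channel, handle each
// sample end-to-end as it arrives from the digest subscriber.
fn digest_loop(samples: Receiver<(String, String)>) {
    for (from, payload) in samples.iter() {
        let digest = match parse_digest(&payload) {
            Some(digest) => digest,
            None => continue,
        };
        if is_processed(&digest) {
            continue;
        }
        // All remaining handling happens here, so any backpressure propagates
        // straight back to the subscriber instead of into a growing queue.
        process_digest(&from, digest);
    }
}
```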
```diff
 match tx.try_send((from.to_string(), digest)) {
     Ok(()) => {}
-    Err(e) => log::error!("[DIGEST_SUB] Error sending digest to aligner: {}", e),
+    Err(e) => {
+        // Trace because this can happen _a lot_ on busy channels.
+        log::trace!("[DIGEST_SUB] Error sending digest to aligner: {}", e)
+    }
```
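One possible refinement (purely illustrative, not part of this PR) would be to only downgrade the "queue full" case to trace and keep a disconnected aligner as an error, using flume's `TrySendError` variants:

```rust
use flume::{Sender, TrySendError};

type Digest = String; // stand-in for the actual digest type

// Illustrative only: forward a digest, tracing drops but keeping a
// disconnected aligner loud.
fn forward_digest(tx: &Sender<(String, Digest)>, from: &str, digest: Digest) {
    match tx.try_send((from.to_string(), digest)) {
        Ok(()) => {}
        // The aligner is simply behind: expected on busy channels, so only trace.
        Err(TrySendError::Full(_)) => {
            log::trace!("[DIGEST_SUB] Aligner queue full, dropping digest from {}", from)
        }
        // The aligner task is gone: this is a real problem, keep it as an error.
        Err(TrySendError::Disconnected(_)) => {
            log::error!("[DIGEST_SUB] Aligner channel closed, dropping digest from {}", from)
        }
    }
}
```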
I need to further investigate the implications of this change. As of today, I do not know if skipping digests could have unforeseen consequences on the replication (my first guess is that it doesn't, but I want to make sure).
Yup, it's not a small change. Ultimately this system needs to have some sort of backpressure and dropping: as a network of storages grows, if anything is out of sync it's impossible to parse and process every digest without dropping some.
Overall, this is more of a proof-of-concept PR than anything else. @J-Loudet if you'd like to just close it and make an issue, feel free to.
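For the backpressure side, a hedged sketch of blocking briefly before dropping; the 50 ms grace period is arbitrary, and whether blocking the digest-sub task at all is acceptable here is exactly the open question:

```rust
use std::time::Duration;

use flume::{SendTimeoutError, Sender};

type Digest = String; // stand-in for the actual digest type

// Illustrative backpressure variant: block the digest subscriber briefly
// before giving up, instead of dropping as soon as the queue is full.
fn forward_with_backpressure(tx: &Sender<(String, Digest)>, from: &str, digest: Digest) {
    match tx.send_timeout((from.to_string(), digest), Duration::from_millis(50)) {
        Ok(()) => {}
        Err(SendTimeoutError::Timeout(_)) => {
            // The aligner stayed full for the whole grace period; drop this digest.
            log::trace!("[DIGEST_SUB] Aligner still busy, dropping digest from {}", from)
        }
        Err(SendTimeoutError::Disconnected(_)) => {
            log::error!("[DIGEST_SUB] Aligner channel closed while sending digest from {}", from)
        }
    }
}
```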
See #937
2 commits:
#1 is definitely valuable for long queries to make incremental progress. #2 is a bit less obvious; I'm not sure it's what y'all desire upstream. I'd strongly suggest something that's not unbounded, as that's a recipe for disaster, but the sizing of it is where I don't know what y'all are thinking.