-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix large volume test #7
base: main
Are you sure you want to change the base?
Fix large volume test #7
Conversation
…oid queue jumping
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got some questions around the wait queue processing, fearing it might become too expensive, can you shed some light on that? :)
src/io.rs
Outdated
@@ -719,6 +721,16 @@ where | |||
|
|||
/// Handles a new item to send out that arrived through the incoming channel. | |||
fn handle_incoming_item(&mut self, item: QueuedItem) -> Result<(), LocalProtocolViolation> { | |||
// Process the wait queue to avoid this new item "jumping the queue". | |||
match &item { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1dff662 seems to add starvation protection, i.e. newer data cannot consistently get in front of existing data. Was this behavior observed to be problematic?
My core issue with this is that if we process the wait queue each time anyway, it might be better to not even check if we can bypass it and just put everything in the wait queue every time. However, processing the wait queue is expensive, especially if the previously mentioned change is made. Queuing messages will then result in quadratic complexity!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was problematic in that Alice's requests started timing out, as ones in the wait queue weren't processed since newer ones kept getting preferential treatment.
I did consider just dumping everything in the wait queue, however I had the same reservation as you about the cost (and it also seemed to be somewhat abusing the intent of the wait queue - it would at least need renamed for clarity I think if we did that).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the very least add QueuedItem::is_request
:)
This may be less of an issue if the "new" WaitQueue
(see above) is added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the very least add
QueuedItem::is_request
:)
I can do, but tbh I don't see why we'd want that or where we'd use it?
@@ -835,11 +849,6 @@ where | |||
self.wait_queue[channel.get() as usize].push_back(item); | |||
} else { | |||
self.send_to_ready_queue(item)?; | |||
|
|||
// No need to look further if we have saturated the channel. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did 0253804#diff-76866598ce8fd16261a27ac58a84b2825e6e77fc37c163a6afa60f0f4477e569L852-L856 fix an issue? The code was supposed to bring down the potential
total complexity of processing the queue
times. What's the case that triggers this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wasn't an issue exposed via a test. Rather I thought it was a bug while following the logic during debugging.
The issue is that the wait queue can have not only requests but responses, so it would be wrong to exit early in the case where a bunch of responses could have been moved out of the wait queue.
As an aside, I wonder if it would be worthwhile creating a new enum just for the wait queue, similar to QueuedItem
but with only Request
and Response
variants?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As an aside, I wonder if it would be worthwhile creating a new enum just for the wait queue, similar to QueuedItem but with only Request and Response variants?
My guess is that the problem is likely best solved with two separate queues, one for requests and one for large messages (although I am not entirely sure yet how to handle the case where a message is both large and a request). Alternatively, we should keep some sort of state to ensure it can distinguish these cases quickly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the code being as-is in this PR, I'm still uncomfortable with the situation. Imagine queuing single-frame messages at a very high rate. Once we have saturated the ready queue, they will all go into the wait queue, and every call will process the now-growing entire queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could consider adding this:
struct WaitSubQueue {
single_frame: VecDeque<QueuedItem>,
multi_frame: VecDeque<QueuedItem>,
}
struct WaitQueue {
requests: WaitSubQueue,
other: Vec<QueuedItem>,
prefer_request: bool,
}
impl WaitSubQueue {
#[inline(always)]
fn next_item(&mut self, allow_multi_frame: bool) -> Option<QueuedItem> {
if allow_multi_frame && !self.multi_frame.is_empty() {
self.multi_frame.pop_front()
} else {
self.singe_frame.pop_front()
}
}
}
impl WaitQueue {
pub fn next_item(
&mut self,
request_allowed: bool,
multiframe_allowed: bool,
) -> Option<QueuedItem> {
if request_allowed {
self.next_item_allowing_request(multiframe_allowed)
} else {
self.other.next_item()
}
}
/// Returns the next item, assuming a request is allowed.
// Note: This function is separate out for readability.
#[inline(always)]
fn next_item_allowing_request(&mut self, multiframe_allowed: bool) {
let candidate = if prefer_request {
self.requests
.next_item(multiframe_allowed)
.or_else(|| self.other.next_item(multiframe_allowed))
} else {
self.other
.next_item(multiframe_allowed)
.or_else(|| self.requests.next_item(multiframe_allowed))
}?;
// Alternate, to prevent starvation is receiver is procesing at a rate
// that matches our production rate. This essentially subdivides the
// channel into request/non-request subchannels.
self.prefer_request = !candidate.is_request();
Some(candidate)
}
}
Since the logic gets more complex, it would be wise to separate it out. This is just a sketch, at least some comments would need to be filled in.
The key idea is to know what kind of item we can produce next by checking the state of our multiframe sends and request limits, then use the separated queue to optimize. This reorders items that weren't reordered before by separating the queues.
This PR fixes the failing
run_large_volume_test_with_default_values_10_channels
.The root cause of the failure was a deadlock in the test itself, but during investigations a couple of other potential issues in the production code were identified and addressed. Each has been separated into an individual commit so it can be reverted if required.
Closes #4575.