-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix send buffer misalignment issue #1307
Changes from 4 commits
899a744
c871ac7
a2e0249
fe6345c
ef2ba94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
use std::{ | ||
borrow::Borrow, | ||
cmp::min, | ||
fmt::Debug, | ||
marker::PhantomData, | ||
num::NonZeroUsize, | ||
|
@@ -249,35 +250,79 @@ impl<I: Debug> Stream for GatewaySendStream<I> { | |
|
||
impl SendChannelConfig { | ||
/// Builds the send channel configuration for messages of type `M`. | ||
/// The record size is taken from `M::Size::USIZE` and the remaining work is | ||
/// delegated to `Self::new_with`. | ||
fn new<M: Message>(gateway_config: GatewayConfig, total_records: TotalRecords) -> Self { | ||
// A zero-sized message is rejected here, but only in debug builds (`debug_assert!`). | ||
debug_assert!(M::Size::USIZE > 0, "Message size cannot be 0"); | ||
Self::new_with(gateway_config, total_records, M::Size::USIZE) | ||
} | ||
|
||
let record_size = M::Size::USIZE; | ||
let total_capacity = gateway_config.active.get() * record_size; | ||
Self { | ||
total_capacity: total_capacity.try_into().unwrap(), | ||
record_size: record_size.try_into().unwrap(), | ||
read_size: if total_records.is_indeterminate() | ||
|| gateway_config.read_size.get() <= record_size | ||
{ | ||
fn new_with( | ||
gateway_config: GatewayConfig, | ||
total_records: TotalRecords, | ||
record_size: usize, | ||
) -> Self { | ||
debug_assert!(record_size > 0, "Message size cannot be 0"); | ||
// The absolute minimum of capacity we reserve for this channel. We can't go | ||
// below that number, otherwise a deadlock is almost guaranteed. | ||
let min_capacity = gateway_config.active.get() * record_size; | ||
|
||
// first, compute the read size. It must be a multiple of `record_size` to prevent | ||
// misaligned reads and deadlocks. For indeterminate channels, read size must be | ||
// set to the size of one record, to trigger buffer flush on every write | ||
let read_size = | ||
if total_records.is_indeterminate() || gateway_config.read_size.get() <= record_size { | ||
record_size | ||
} else { | ||
std::cmp::min( | ||
total_capacity, | ||
// closest multiple of record_size to read_size | ||
// closest multiple of record_size to read_size | ||
let proposed_read_size = min( | ||
gateway_config.read_size.get() / record_size * record_size, | ||
) | ||
} | ||
.try_into() | ||
.unwrap(), | ||
min_capacity, | ||
); | ||
// If min capacity is not a multiple of read size, | ||
// we must adjust read size. Adjusting total capacity is not possible due to | ||
// possible deadlocks - it must be strictly aligned with active work. | ||
// Read size goes in `record_size` increments to keep it aligned. | ||
// The remainder `min_capacity % proposed_read_size` is aligned with both capacity and read_size, so subtracting | ||
Review comment: What is "rem"? Reply: should be `min_capacity % proposed_read_size` (the inline code was lost in extraction; reconstructed from the expression used below — confirm against the original thread). |
||
// it will keep read_size and capacity aligned | ||
// Here is an example of how it may work: | ||
// let's say the active work is set to 10, record size is 512 bytes | ||
// and read size in gateway config is set to 2048 bytes (default value). | ||
// The math above will compute min_capacity to be 5120 bytes and | ||
// proposed_read_size to be 2048 because it is aligned with the 512-byte record size. | ||
// Now, if we don't adjust, then we have an issue as 5120 % 2048 = 1024 != 0. | ||
// Keeping read size like this will cause a deadlock, so we adjust it to | ||
// 2048 - 1024 = 1024. | ||
proposed_read_size - min_capacity % proposed_read_size | ||
}; | ||
|
||
// Total capacity must be a multiple of both read size and record size. | ||
// Record size is easy to justify: misalignment here leads to either waste of memory | ||
// or deadlock on the last write. Aligning read size and total capacity | ||
// has the same reasoning behind it: reading less than total capacity | ||
// can leave the last chunk half-written, with backpressure from active work | ||
// preventing the protocols from making further progress. | ||
Review comment: This is too complicated. I worry that this code will become unmaintainable as this is so impenetrable that people simply will not understand it. I certainly don't understand it and we discussed it in person today and I've read this twice. Reply: Maybe the explanation is overly complicated here. All we need to say here is that all 3 parameters need to be aligned. |
||
let total_capacity = min_capacity / read_size * read_size; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
let this = Self { | ||
total_capacity: total_capacity.try_into().unwrap(), | ||
record_size: record_size.try_into().unwrap(), | ||
read_size: read_size.try_into().unwrap(), | ||
total_records, | ||
} | ||
}; | ||
|
||
// make sure we've set these values correctly. | ||
debug_assert_eq!(0, this.total_capacity.get() % this.read_size.get()); | ||
debug_assert_eq!(0, this.total_capacity.get() % this.record_size.get()); | ||
debug_assert!(this.total_capacity.get() >= this.read_size.get()); | ||
debug_assert!(this.total_capacity.get() >= this.record_size.get()); | ||
debug_assert!(this.read_size.get() >= this.record_size.get()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on the comments above, I think you also want to check that:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And there is another condition that the comments seem to suggest we should check, which is:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the addition of these two checks, I think this set of checks are equivalent to:
Where Can we just find
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The read size (2048) does not divide the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea, any prime number for active work makes this broken |
||
|
||
this | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use std::num::NonZeroUsize; | ||
|
||
use proptest::proptest; | ||
use typenum::Unsigned; | ||
|
||
use crate::{ | ||
|
@@ -286,7 +331,7 @@ mod test { | |
Serializable, | ||
}, | ||
helpers::{gateway::send::SendChannelConfig, GatewayConfig, TotalRecords}, | ||
secret_sharing::SharedValue, | ||
secret_sharing::{Sendable, StdArray}, | ||
}; | ||
|
||
impl Default for SendChannelConfig { | ||
|
@@ -301,7 +346,7 @@ mod test { | |
} | ||
|
||
#[allow(clippy::needless_update)] // to allow progress_check_interval to be conditionally compiled | ||
fn send_config<V: SharedValue, const A: usize, const R: usize>( | ||
fn send_config<V: Sendable, const A: usize, const R: usize>( | ||
total_records: TotalRecords, | ||
) -> SendChannelConfig { | ||
let gateway_config = GatewayConfig { | ||
|
@@ -391,4 +436,62 @@ mod test { | |
.get() | ||
); | ||
} | ||
|
||
/// This test reproduces ipa/#1300. PRF evaluation sent 32*16 = 512 (`record_size` * vectorization) | ||
/// chunks through a channel with total capacity 5120 (active work = 10 records) and read size | ||
/// of 2048 bytes. | ||
/// The problem was that read size of 2048 does not divide 5120, so the last chunk was not sent. | ||
#[test] | ||
fn total_capacity_is_a_multiple_of_read_size() { | ||
let config = | ||
send_config::<StdArray<BA256, 16>, 10, 2048>(TotalRecords::specified(43).unwrap()); | ||
|
||
assert_eq!(0, config.total_capacity.get() % config.read_size.get()); | ||
assert_eq!(config.total_capacity.get(), 10 * config.record_size.get()); | ||
Comment on lines
+448
to
+451
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the number |
||
} | ||
|
||
fn ensure_config( | ||
total_records: Option<usize>, | ||
active: usize, | ||
read_size: usize, | ||
record_size: usize, | ||
) { | ||
let gateway_config = GatewayConfig { | ||
active: active.try_into().unwrap(), | ||
read_size: read_size.try_into().unwrap(), | ||
..Default::default() | ||
}; | ||
let config = SendChannelConfig::new_with( | ||
gateway_config, | ||
total_records.map_or(TotalRecords::Indeterminate, |v| { | ||
TotalRecords::specified(v).unwrap() | ||
}), | ||
record_size, | ||
); | ||
|
||
// total capacity checks | ||
assert!(config.total_capacity.get() > 0); | ||
assert!(config.total_capacity.get() >= record_size); | ||
assert!(config.total_capacity.get() <= record_size * active); | ||
assert!(config.total_capacity.get() >= config.read_size.get()); | ||
assert_eq!(0, config.total_capacity.get() % config.record_size.get()); | ||
|
||
// read size checks | ||
assert!(config.read_size.get() > 0); | ||
assert!(config.read_size.get() >= config.record_size.get()); | ||
assert_eq!(0, config.total_capacity.get() % config.read_size.get()); | ||
assert_eq!(0, config.read_size.get() % config.record_size.get()); | ||
Comment on lines
+473
to
+484
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are duplicated between the test and the actual code. I don't think there is a need for this duplication. Just leaving them in the function seems sufficient. |
||
} | ||
|
||
// Property test: for randomly generated gateway parameters, the configuration | ||
// produced for the send channel must pass every check in `ensure_config`. | ||
proptest! { | ||
#[test] | ||
fn config_prop( | ||
// `None` or `Some(n)` with n in 1..(1 << 32); the shift binds tighter than `..` | ||
total_records in proptest::option::of(1_usize..1 << 32), | ||
// number of active work items, 1..100_000 | ||
active in 1_usize..100_000, | ||
// configured read size, 1..32768 | ||
read_size in 1_usize..32768, | ||
// size of a single record, 1..4096 | ||
record_size in 1_usize..4096, | ||
) { | ||
ensure_config(total_records, active, read_size, record_size); | ||
} | ||
} | ||
} |
Review comment: Are you sure this isn't supposed to be `max`?
Reply: It probably shouldn't. If `read_size` goes above `capacity`, then we will never read anything from that buffer, because even when it is 100% full, `read_size` is still larger, so HTTP will back off.