From e259e5b9b97c04ca05244e91070460cd2576e01b Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Tue, 1 Oct 2024 22:07:23 -0700 Subject: [PATCH 01/11] Enforce active work to be a power of two This is the second attempt to mitigate send buffer misalignment. Previous one (#1307) didn't handle all the edge cases and was abandoned in favour of this PR. What I believe makes this change work is the new requirement for active work to be a power of two. With this constraint, it is much easier to align the read size with it. Given that `total_capacity = active * record_size`, we can represent `read_size` as a multiple of `record_size` too: `read_size = X * record_size`. If X is a power of two and active_work is a power of two, then they will always be aligned with each other. For example, if active work is 16, read size is 10 bytes and record size is 3 bytes, then: ``` total_capacity = 16*3 read_size = X*3 (close to 10) X = 2 (power of two that satisfies the requirement) ``` when picking up the read size, we are rounding down to avoid buffer overflows. In the example above, setting X=3 would make it closer to the desired read size, but it is greater than 10, so we pick 2 instead. --- ipa-core/src/helpers/gateway/mod.rs | 211 ++++++++++++++++++++++++++- ipa-core/src/helpers/gateway/send.rs | 103 +++++++++++-- 2 files changed, 296 insertions(+), 18 deletions(-) diff --git a/ipa-core/src/helpers/gateway/mod.rs b/ipa-core/src/helpers/gateway/mod.rs index e654f85f7..55d1b1ffc 100644 --- a/ipa-core/src/helpers/gateway/mod.rs +++ b/ipa-core/src/helpers/gateway/mod.rs @@ -73,6 +73,7 @@ pub struct State { pub struct GatewayConfig { /// The number of items that can be active at the one time. /// This is used to determine the size of sending and receiving buffers. + /// Any value that is not a power of two will be rejected pub active: NonZeroUsize, /// Number of bytes packed and sent together in one batch down to the network layer. This @@ -84,6 +85,10 @@ pub struct GatewayConfig { /// payload may not be exactly this, but it will be the closest multiple of record size to this /// number. For instance, having 14 bytes records and batch size of 4096 will result in /// 4088 bytes being sent in a batch. + /// + /// The actual size for read chunks may be bigger or smaller, depending on the record size + /// sent through each channel. Read size will be aligned with [`Self::active_work`] value to + /// prevent deadlocks. pub read_size: NonZeroUsize, /// Time to wait before checking gateway progress. If no progress has been made between @@ -279,7 +284,8 @@ impl GatewayConfig { // capabilities (see #ipa/1171) to allow that currently. usize::from(value.size), ), - ); + ) + .next_power_of_two(); // we set active to be at least 2, so unwrap is fine. self.active = NonZeroUsize::new(active).unwrap(); } @@ -299,23 +305,35 @@ mod tests { use std::{ iter::{repeat, zip}, num::NonZeroUsize, + sync::Arc, }; use futures::{ future::{join, try_join, try_join_all}, + stream, stream::StreamExt, }; + use proptest::proptest; use crate::{ - ff::{boolean_array::BA3, Fp31, Fp32BitPrime, Gf2, U128Conversions}, + ff::{ + boolean_array::{BA20, BA256, BA3, BA4, BA5, BA6, BA7, BA8}, + FieldType, Fp31, Fp32BitPrime, Gf2, U128Conversions, + }, helpers::{ - ChannelId, Direction, GatewayConfig, MpcMessage, Role, SendingEnd, TotalRecords, + gateway::QueryConfig, + query::{QuerySize, QueryType}, + ChannelId, Direction, GatewayConfig, MpcMessage, MpcReceivingEnd, Role, SendingEnd, + TotalRecords, }, protocol::{ context::{Context, ShardedContext}, Gate, RecordId, }, - secret_sharing::{replicated::semi_honest::AdditiveShare, SharedValue}, + secret_sharing::{ + replicated::semi_honest::AdditiveShare, SharedValue, SharedValueArray, StdArray, + }, + seq_join::seq_join, sharding::ShardConfiguration, test_executor::run, test_fixture::{Reconstruct, Runner, TestWorld, TestWorldConfig, WithShards}, @@ -569,6 +587,87 @@ mod tests { }); } + macro_rules! send_recv_test { + ( + message: $message:expr, + read_size: $read_size:expr, + active_work: $active_work:expr, + total_records: $total_records:expr, + $test_fn: ident + ) => { + #[test] + fn $test_fn() { + run(|| async { + send_recv($read_size, $active_work, $total_records, $message).await; + }); + } + }; + } + + send_recv_test! { + message: BA20::ZERO, + read_size: 5, + active_work: 8, + total_records: 25, + test_ba20_5_10_25 + } + + send_recv_test! { + message: StdArray::::ZERO_ARRAY, + read_size: 2048, + active_work: 16, + total_records: 43, + test_ba256_by_16_2048_10_43 + } + + send_recv_test! { + message: StdArray::::ZERO_ARRAY, + read_size: 2048, + active_work: 32, + total_records: 50, + test_ba8_by_16_2048_37_50 + } + + proptest! { + #[test] + fn send_recv_randomized( + total_records in 1_usize..500, + active in 2_usize..1000, + read_size in (1_usize..32768), + record_size in 1_usize..=8, + ) { + let active = active.next_power_of_two(); + run(move || async move { + match record_size { + 1 => send_recv(read_size, active, total_records, StdArray::::ZERO_ARRAY).await, + 2 => send_recv(read_size, active, total_records, StdArray::::ZERO_ARRAY).await, + 3 => send_recv(read_size, active, total_records, BA3::ZERO).await, + 4 => send_recv(read_size, active, total_records, BA4::ZERO).await, + 5 => send_recv(read_size, active, total_records, BA5::ZERO).await, + 6 => send_recv(read_size, active, total_records, BA6::ZERO).await, + 7 => send_recv(read_size, active, total_records, BA7::ZERO).await, + 8 => send_recv(read_size, active, total_records, StdArray::::ZERO_ARRAY).await, + _ => unreachable!(), + } + }); + } + } + + /// ensures when active work is set from query input, it is always a power of two + #[test] + fn gateway_config_active_work_power_of_two() { + let mut config = GatewayConfig { + active: 2.try_into().unwrap(), + ..Default::default() + }; + config.set_active_work_from_query_config(&QueryConfig { + size: QuerySize::try_from(5).unwrap(), + field_type: FieldType::Fp31, + query_type: QueryType::TestAddInPrimeField, + }); + assert_eq!(8, config.active_work().get()); + } + async fn shard_comms_test(test_world: &TestWorld>) { let input = vec![BA3::truncate_from(0_u32), BA3::truncate_from(1_u32)]; @@ -606,4 +705,108 @@ mod tests { let world_ptr = world as *mut _; (world, world_ptr) } + + /// This serves the purpose of randomized testing of our send channels by providing + /// variable sizes for read size, active work and record size + async fn send_recv(read_size: usize, active_work: usize, total_records: usize, sample: M) + where + M: MpcMessage + Clone + PartialEq, + { + fn duplex_channel( + world: &TestWorld, + left: Role, + right: Role, + total_records: usize, + active_work: usize, + ) -> (SendingEnd, MpcReceivingEnd) { + ( + world.gateway(left).get_mpc_sender::( + &ChannelId::new(right, Gate::default()), + TotalRecords::specified(total_records).unwrap(), + active_work.try_into().unwrap(), + ), + world + .gateway(right) + .get_mpc_receiver::(&ChannelId::new(left, Gate::default())), + ) + } + + async fn circuit( + send_channel: SendingEnd, + recv_channel: MpcReceivingEnd, + active_work: usize, + total_records: usize, + msg: M, + ) where + M: MpcMessage + Clone + PartialEq, + { + let send_notify = Arc::new(tokio::sync::Notify::new()); + + // perform "multiplication-like" operation (send + subsequent receive) + // and "validate": block the future until we have at least `active_work` + // futures pending and unblock them all at the same time + seq_join( + active_work.try_into().unwrap(), + stream::iter(std::iter::repeat(msg).take(total_records).enumerate()).map( + |(record_id, msg)| { + let send_channel = &send_channel; + let recv_channel = &recv_channel; + let send_notify = Arc::clone(&send_notify); + async move { + send_channel + .send(record_id.into(), msg.clone()) + .await + .unwrap(); + let r = recv_channel.receive(record_id.into()).await.unwrap(); + // this simulates validate_record API by forcing futures to wait + // until the entire batch is validated by the last future in that batch + if record_id % active_work == active_work - 1 + || record_id == total_records - 1 + { + send_notify.notify_waiters(); + } else { + send_notify.notified().await; + } + assert_eq!(msg, r); + } + }, + ), + ) + .collect::>() + .await; + } + + let config = TestWorldConfig { + gateway_config: GatewayConfig { + active: active_work.try_into().unwrap(), + read_size: read_size.try_into().unwrap(), + ..Default::default() + }, + ..Default::default() + }; + + let world = TestWorld::new_with(&config); + let (h1_send_channel, h1_recv_channel) = + duplex_channel(&world, Role::H1, Role::H2, total_records, active_work); + let (h2_send_channel, h2_recv_channel) = + duplex_channel(&world, Role::H2, Role::H1, total_records, active_work); + + join( + circuit( + h1_send_channel, + h1_recv_channel, + active_work, + total_records, + sample.clone(), + ), + circuit( + h2_send_channel, + h2_recv_channel, + active_work, + total_records, + sample, + ), + ) + .await; + } } diff --git a/ipa-core/src/helpers/gateway/send.rs b/ipa-core/src/helpers/gateway/send.rs index 07018fb14..dab292353 100644 --- a/ipa-core/src/helpers/gateway/send.rs +++ b/ipa-core/src/helpers/gateway/send.rs @@ -248,28 +248,50 @@ impl Stream for GatewaySendStream { impl SendChannelConfig { fn new(gateway_config: GatewayConfig, total_records: TotalRecords) -> Self { - debug_assert!(M::Size::USIZE > 0, "Message size cannot be 0"); + Self::new_with(gateway_config, total_records, M::Size::USIZE) + } + fn new_with( + gateway_config: GatewayConfig, + total_records: TotalRecords, + record_size: usize, + ) -> Self { + debug_assert!(record_size > 0, "Message size cannot be 0"); + debug_assert!( + gateway_config.active.is_power_of_two(), + "Active work {} must be a power of two", + gateway_config.active.get() + ); - let record_size = M::Size::USIZE; let total_capacity = gateway_config.active.get() * record_size; - Self { + // define read size as a multiplier of record size. The multiplier must be + // a power of two to align perfectly with total capacity. + let read_size_multiplier = { + // next_power_of_two returns a value that is greater than or equal to. + // That is not what we want here: if read_size / record_size is a power + // of two, then subsequent division will get us further away from desired target. + // For example: if read_size / record_size = 4, then prev_power_of_two = 2. + // In such cases, we want to stay where we are, so we add +1 for that. + let prev_power_of_two = + (gateway_config.read_size.get() / record_size + 1).next_power_of_two() / 2; + std::cmp::max(1, prev_power_of_two) + }; + + let this = Self { total_capacity: total_capacity.try_into().unwrap(), record_size: record_size.try_into().unwrap(), - read_size: if total_records.is_indeterminate() - || gateway_config.read_size.get() <= record_size - { + read_size: if total_records.is_indeterminate() { record_size } else { - std::cmp::min( - total_capacity, - // closest multiple of record_size to read_size - gateway_config.read_size.get() / record_size * record_size, - ) + std::cmp::min(total_capacity, read_size_multiplier * record_size) } .try_into() .unwrap(), total_records, - } + }; + + debug_assert!(this.total_capacity.get() >= record_size * gateway_config.active.get()); + + this } } @@ -277,6 +299,7 @@ impl SendChannelConfig { mod test { use std::num::NonZeroUsize; + use proptest::proptest; use typenum::Unsigned; use crate::{ @@ -379,15 +402,67 @@ mod test { fn config_read_size_closest_multiple_to_record_size() { assert_eq!( 6, - send_config::(TotalRecords::Specified(2.try_into().unwrap())) + send_config::(TotalRecords::Specified(2.try_into().unwrap())) .read_size .get() ); assert_eq!( 6, - send_config::(TotalRecords::Specified(2.try_into().unwrap())) + send_config::(TotalRecords::Specified(2.try_into().unwrap())) .read_size .get() ); } + + #[test] + fn config_read_size_record_size_misalignment() { + ensure_config(Some(15), 90, 16, 3); + } + + fn ensure_config( + total_records: Option, + active: usize, + read_size: usize, + record_size: usize, + ) { + let gateway_config = GatewayConfig { + active: active.next_power_of_two().try_into().unwrap(), + read_size: read_size.try_into().unwrap(), + // read_size: read_size.next_power_of_two().try_into().unwrap(), + ..Default::default() + }; + let config = SendChannelConfig::new_with( + gateway_config, + total_records.map_or(TotalRecords::Indeterminate, |v| { + TotalRecords::specified(v).unwrap() + }), + record_size, + ); + + // total capacity checks + assert!(config.total_capacity.get() > 0); + assert!(config.total_capacity.get() >= config.read_size.get()); + assert_eq!(0, config.total_capacity.get() % config.record_size.get()); + assert_eq!( + config.total_capacity.get(), + record_size * gateway_config.active.get() + ); + + // read size checks + assert!(config.read_size.get() > 0); + assert!(config.read_size.get() >= config.record_size.get()); + assert_eq!(0, config.total_capacity.get() % config.read_size.get()); + } + + proptest! { + #[test] + fn config_prop( + total_records in proptest::option::of(1_usize..1 << 32), + active in 1_usize..100_000, + read_size in 1_usize..32768, + record_size in 1_usize..4096, + ) { + ensure_config(total_records, active, read_size, record_size); + } + } } From d9a29f3483cf5d2b84b8cdeab78c1c0ff4cb9d15 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Wed, 2 Oct 2024 12:50:55 -0700 Subject: [PATCH 02/11] Add a type that enforces power of two constraint While working on changing the gateway and parameters, I ran into several issues where the power of two constraint was not enforced and breakages were hard to find. A better model for me is to gate the active work at the config level, prohibiting invalid constructions at the caller side. --- ipa-core/src/app.rs | 7 +- ipa-core/src/helpers/gateway/mod.rs | 29 +++-- ipa-core/src/helpers/gateway/send.rs | 28 +++-- .../src/helpers/gateway/stall_detection.rs | 4 +- ipa-core/src/helpers/prss_protocol.rs | 4 +- .../src/protocol/context/dzkp_malicious.rs | 7 +- ipa-core/src/protocol/context/malicious.rs | 7 +- ipa-core/src/protocol/context/mod.rs | 9 +- ipa-core/src/query/processor.rs | 6 +- ipa-core/src/test_fixture/circuit.rs | 10 +- ipa-core/src/utils/mod.rs | 5 + ipa-core/src/utils/power_of_two.rs | 110 ++++++++++++++++++ 12 files changed, 185 insertions(+), 41 deletions(-) create mode 100644 ipa-core/src/utils/power_of_two.rs diff --git a/ipa-core/src/app.rs b/ipa-core/src/app.rs index da56e67e3..f84aed06e 100644 --- a/ipa-core/src/app.rs +++ b/ipa-core/src/app.rs @@ -1,4 +1,4 @@ -use std::{num::NonZeroUsize, sync::Weak}; +use std::sync::Weak; use async_trait::async_trait; @@ -13,17 +13,18 @@ use crate::{ protocol::QueryId, query::{NewQueryError, QueryProcessor, QueryStatus}, sync::Arc, + utils::NonZeroU32PowerOfTwo, }; #[derive(Default)] pub struct AppConfig { - active_work: Option, + active_work: Option, key_registry: Option>, } impl AppConfig { #[must_use] - pub fn with_active_work(mut self, active_work: Option) -> Self { + pub fn with_active_work(mut self, active_work: Option) -> Self { self.active_work = active_work; self } diff --git a/ipa-core/src/helpers/gateway/mod.rs b/ipa-core/src/helpers/gateway/mod.rs index 55d1b1ffc..15d2580d2 100644 --- a/ipa-core/src/helpers/gateway/mod.rs +++ b/ipa-core/src/helpers/gateway/mod.rs @@ -30,6 +30,7 @@ use crate::{ protocol::QueryId, sharding::ShardIndex, sync::{Arc, Mutex}, + utils::NonZeroU32PowerOfTwo, }; /// Alias for the currently configured transport. @@ -73,8 +74,7 @@ pub struct State { pub struct GatewayConfig { /// The number of items that can be active at the one time. /// This is used to determine the size of sending and receiving buffers. - /// Any value that is not a power of two will be rejected - pub active: NonZeroUsize, + pub active: NonZeroU32PowerOfTwo, /// Number of bytes packed and sent together in one batch down to the network layer. This /// shouldn't be too small to keep the network throughput, but setting it large enough may @@ -155,7 +155,7 @@ impl Gateway { &self, channel_id: &HelperChannelId, total_records: TotalRecords, - active_work: NonZeroUsize, + active_work: NonZeroU32PowerOfTwo, ) -> send::SendingEnd { let transport = &self.transports.mpc; let channel = self.inner.mpc_senders.get::( @@ -265,6 +265,11 @@ impl GatewayConfig { /// The configured amount of active work. #[must_use] pub fn active_work(&self) -> NonZeroUsize { + self.active.to_non_zero_usize() + } + + #[must_use] + pub fn active_work_as_power_of_two(&self) -> NonZeroU32PowerOfTwo { self.active } @@ -287,12 +292,12 @@ impl GatewayConfig { ) .next_power_of_two(); // we set active to be at least 2, so unwrap is fine. - self.active = NonZeroUsize::new(active).unwrap(); + self.active = NonZeroU32PowerOfTwo::try_from(active).unwrap(); } /// Creates a new configuration by overriding the value of active work. #[must_use] - pub fn set_active_work(&self, active_work: NonZeroUsize) -> Self { + pub fn set_active_work(&self, active_work: NonZeroU32PowerOfTwo) -> Self { Self { active: active_work, ..*self @@ -304,7 +309,6 @@ impl GatewayConfig { mod tests { use std::{ iter::{repeat, zip}, - num::NonZeroUsize, sync::Arc, }; @@ -337,6 +341,7 @@ mod tests { sharding::ShardConfiguration, test_executor::run, test_fixture::{Reconstruct, Runner, TestWorld, TestWorldConfig, WithShards}, + utils::NonZeroU32PowerOfTwo, }; /// Verifies that [`Gateway`] send buffer capacity is adjusted to the message size. @@ -556,13 +561,19 @@ mod tests { run(|| async move { let world = TestWorld::new_with(TestWorldConfig { gateway_config: GatewayConfig { - active: 5.try_into().unwrap(), + active: 8.try_into().unwrap(), ..Default::default() }, ..Default::default() }); - let new_active_work = NonZeroUsize::new(3).unwrap(); - assert!(new_active_work < world.gateway(Role::H1).config().active_work()); + let new_active_work = NonZeroU32PowerOfTwo::try_from(4).unwrap(); + assert!( + new_active_work + < world + .gateway(Role::H1) + .config() + .active_work_as_power_of_two() + ); let sender = world.gateway(Role::H1).get_mpc_sender::( &ChannelId::new(Role::H2, Gate::default()), TotalRecords::specified(15).unwrap(), diff --git a/ipa-core/src/helpers/gateway/send.rs b/ipa-core/src/helpers/gateway/send.rs index dab292353..de9dcabd5 100644 --- a/ipa-core/src/helpers/gateway/send.rs +++ b/ipa-core/src/helpers/gateway/send.rs @@ -255,12 +255,7 @@ impl SendChannelConfig { total_records: TotalRecords, record_size: usize, ) -> Self { - debug_assert!(record_size > 0, "Message size cannot be 0"); - debug_assert!( - gateway_config.active.is_power_of_two(), - "Active work {} must be a power of two", - gateway_config.active.get() - ); + assert!(record_size > 0, "Message size cannot be 0"); let total_capacity = gateway_config.active.get() * record_size; // define read size as a multiplier of record size. The multiplier must be @@ -289,7 +284,8 @@ impl SendChannelConfig { total_records, }; - debug_assert!(this.total_capacity.get() >= record_size * gateway_config.active.get()); + assert!(this.total_capacity.get() >= record_size * gateway_config.active.get()); + assert_eq!(0, this.total_capacity.get() % this.read_size.get()); this } @@ -304,7 +300,7 @@ mod test { use crate::{ ff::{ - boolean_array::{BA16, BA20, BA256, BA3, BA7}, + boolean_array::{BA16, BA20, BA256, BA3, BA32, BA7}, Serializable, }, helpers::{gateway::send::SendChannelConfig, GatewayConfig, TotalRecords}, @@ -419,6 +415,21 @@ mod test { ensure_config(Some(15), 90, 16, 3); } + #[test] + fn config_read_size_multiple_of_record_size() { + // 4 bytes * 8 = 32 bytes total capacity. + // desired read size is 15 bytes, and the closest multiple of BA32 + // to it that is a power of two is 2 (4 gets us over 15 byte target) + assert_eq!(8, send_config::(50.into()).read_size.get()); + + // here, read size is already a power of two + assert_eq!(16, send_config::(50.into()).read_size.get()); + + // read size can be ridiculously small, config adjusts it to fit + // at least one record + assert_eq!(3, send_config::(50.into()).read_size.get()); + } + fn ensure_config( total_records: Option, active: usize, @@ -428,7 +439,6 @@ mod test { let gateway_config = GatewayConfig { active: active.next_power_of_two().try_into().unwrap(), read_size: read_size.try_into().unwrap(), - // read_size: read_size.next_power_of_two().try_into().unwrap(), ..Default::default() }; let config = SendChannelConfig::new_with( diff --git a/ipa-core/src/helpers/gateway/stall_detection.rs b/ipa-core/src/helpers/gateway/stall_detection.rs index 43706f450..4a844386f 100644 --- a/ipa-core/src/helpers/gateway/stall_detection.rs +++ b/ipa-core/src/helpers/gateway/stall_detection.rs @@ -67,7 +67,6 @@ impl Observed { } mod gateway { - use std::num::NonZeroUsize; use delegate::delegate; @@ -81,6 +80,7 @@ mod gateway { protocol::QueryId, sharding::ShardIndex, sync::Arc, + utils::NonZeroU32PowerOfTwo, }; pub struct InstrumentedGateway { @@ -154,7 +154,7 @@ mod gateway { &self, channel_id: &HelperChannelId, total_records: TotalRecords, - active_work: NonZeroUsize, + active_work: NonZeroU32PowerOfTwo, ) -> SendingEnd { Observed::wrap( Weak::clone(self.get_sn()), diff --git a/ipa-core/src/helpers/prss_protocol.rs b/ipa-core/src/helpers/prss_protocol.rs index f9284f9eb..850d6c733 100644 --- a/ipa-core/src/helpers/prss_protocol.rs +++ b/ipa-core/src/helpers/prss_protocol.rs @@ -24,12 +24,12 @@ pub async fn negotiate( let left_sender = gateway.get_mpc_sender::( &left_channel, TotalRecords::ONE, - gateway.config().active_work(), + gateway.config().active_work_as_power_of_two(), ); let right_sender = gateway.get_mpc_sender::( &right_channel, TotalRecords::ONE, - gateway.config().active_work(), + gateway.config().active_work_as_power_of_two(), ); let left_receiver = gateway.get_mpc_receiver::(&left_channel); let right_receiver = gateway.get_mpc_receiver::(&right_channel); diff --git a/ipa-core/src/protocol/context/dzkp_malicious.rs b/ipa-core/src/protocol/context/dzkp_malicious.rs index 9f28239ba..80762fb52 100644 --- a/ipa-core/src/protocol/context/dzkp_malicious.rs +++ b/ipa-core/src/protocol/context/dzkp_malicious.rs @@ -61,8 +61,11 @@ impl<'a> DZKPUpgraded<'a> { // This overrides the active work for this context and all children // created from it by using narrow, clone, etc. // This allows all steps participating in malicious validation - // to use the same active work window and prevent deadlocks - base_ctx: base_ctx.set_active_work(active_work), + // to use the same active work window and prevent deadlocks. + // + // This also checks that active work is a power of two and + // panics if it is not. + base_ctx: base_ctx.set_active_work(active_work.get().try_into().unwrap()), } } diff --git a/ipa-core/src/protocol/context/malicious.rs b/ipa-core/src/protocol/context/malicious.rs index 8c287b1f2..b11f6f5a8 100644 --- a/ipa-core/src/protocol/context/malicious.rs +++ b/ipa-core/src/protocol/context/malicious.rs @@ -80,7 +80,7 @@ impl<'a> Context<'a> { } #[must_use] - pub fn set_active_work(self, new_active_work: NonZeroUsize) -> Self { + pub fn set_active_work(self, new_active_work: NonZeroU32PowerOfTwo) -> Self { Self { inner: self.inner.set_active_work(new_active_work), } @@ -171,7 +171,10 @@ impl Debug for Context<'_> { } } -use crate::sync::{Mutex, Weak}; +use crate::{ + sync::{Mutex, Weak}, + utils::NonZeroU32PowerOfTwo, +}; pub(super) type MacBatcher<'a, F> = Mutex>>; diff --git a/ipa-core/src/protocol/context/mod.rs b/ipa-core/src/protocol/context/mod.rs index eead81a16..abf6f8476 100644 --- a/ipa-core/src/protocol/context/mod.rs +++ b/ipa-core/src/protocol/context/mod.rs @@ -44,6 +44,7 @@ use crate::{ secret_sharing::replicated::malicious::ExtendableField, seq_join::SeqJoin, sharding::{NotSharded, ShardBinding, ShardConfiguration, ShardIndex, Sharded}, + utils::NonZeroU32PowerOfTwo, }; /// Context used by each helper to perform secure computation. Provides access to shared randomness @@ -162,7 +163,7 @@ pub struct Base<'a, B: ShardBinding = NotSharded> { inner: Inner<'a>, gate: Gate, total_records: TotalRecords, - active_work: NonZeroUsize, + active_work: NonZeroU32PowerOfTwo, /// This indicates whether the system uses sharding or no. It's not ideal that we keep it here /// because it gets cloned often, a potential solution to that, if this shows up on flame graph, /// would be to move it to [`Inner`] struct. @@ -181,13 +182,13 @@ impl<'a, B: ShardBinding> Base<'a, B> { inner: Inner::new(participant, gateway), gate, total_records, - active_work: gateway.config().active_work(), + active_work: gateway.config().active_work_as_power_of_two(), sharding, } } #[must_use] - pub fn set_active_work(self, new_active_work: NonZeroUsize) -> Self { + pub fn set_active_work(self, new_active_work: NonZeroU32PowerOfTwo) -> Self { Self { active_work: new_active_work, ..self.clone() @@ -336,7 +337,7 @@ impl ShardConfiguration for Base<'_, Sharded> { impl<'a, B: ShardBinding> SeqJoin for Base<'a, B> { fn active_work(&self) -> NonZeroUsize { - self.active_work + self.active_work.to_non_zero_usize() } } diff --git a/ipa-core/src/query/processor.rs b/ipa-core/src/query/processor.rs index a8694012e..679b740fd 100644 --- a/ipa-core/src/query/processor.rs +++ b/ipa-core/src/query/processor.rs @@ -1,7 +1,6 @@ use std::{ collections::hash_map::Entry, fmt::{Debug, Formatter}, - num::NonZeroUsize, }; use futures::{future::try_join, stream}; @@ -22,6 +21,7 @@ use crate::{ CompletionHandle, ProtocolResult, }, sync::Arc, + utils::NonZeroU32PowerOfTwo, }; /// `Processor` accepts and tracks requests to initiate new queries on this helper party @@ -44,7 +44,7 @@ use crate::{ pub struct Processor { queries: RunningQueries, key_registry: Arc>, - active_work: Option, + active_work: Option, } impl Default for Processor { @@ -118,7 +118,7 @@ impl Processor { #[must_use] pub fn new( key_registry: KeyRegistry, - active_work: Option, + active_work: Option, ) -> Self { Self { queries: RunningQueries::default(), diff --git a/ipa-core/src/test_fixture/circuit.rs b/ipa-core/src/test_fixture/circuit.rs index 5a1ecd67e..17920591f 100644 --- a/ipa-core/src/test_fixture/circuit.rs +++ b/ipa-core/src/test_fixture/circuit.rs @@ -1,4 +1,4 @@ -use std::{array, num::NonZeroUsize}; +use std::array; use futures::{future::join3, stream, StreamExt}; use ipa_step::StepNarrow; @@ -17,7 +17,7 @@ use crate::{ secret_sharing::{replicated::semi_honest::AdditiveShare as Replicated, FieldSimd, IntoShares}, seq_join::seq_join, test_fixture::{ReconstructArr, TestWorld, TestWorldConfig}, - utils::array::zip3, + utils::{array::zip3, NonZeroU32PowerOfTwo}, }; pub struct Inputs, const N: usize> { @@ -76,7 +76,7 @@ pub async fn arithmetic( [F; N]: IntoShares>, Standard: Distribution, { - let active = NonZeroUsize::new(active_work).unwrap(); + let active = NonZeroU32PowerOfTwo::try_from(active_work.next_power_of_two()).unwrap(); let config = TestWorldConfig { gateway_config: GatewayConfig { active, @@ -85,7 +85,7 @@ pub async fn arithmetic( initial_gate: Some(Gate::default().narrow(&ProtocolStep::Test)), ..Default::default() }; - let world = TestWorld::new_with(config); + let world = TestWorld::new_with(&config); // Re-use contexts for the entire execution because record identifiers are contiguous. let contexts = world.contexts(); @@ -96,7 +96,7 @@ pub async fn arithmetic( // accumulated. This gives the best performance for vectorized operation. let ctx = ctx.set_total_records(TotalRecords::Indeterminate); seq_join( - active, + config.gateway_config.active_work(), stream::iter((0..(width / u32::try_from(N).unwrap())).zip(col_data)).map( move |(record, Inputs { a, b })| { circuit(ctx.clone(), RecordId::from(record), depth, a, b) diff --git a/ipa-core/src/utils/mod.rs b/ipa-core/src/utils/mod.rs index a3600e899..e8dfd95ae 100644 --- a/ipa-core/src/utils/mod.rs +++ b/ipa-core/src/utils/mod.rs @@ -1,2 +1,7 @@ pub mod array; pub mod arraychunks; +#[cfg(target_pointer_width = "64")] +mod power_of_two; + +#[cfg(target_pointer_width = "64")] +pub use power_of_two::NonZeroU32PowerOfTwo; diff --git a/ipa-core/src/utils/power_of_two.rs b/ipa-core/src/utils/power_of_two.rs new file mode 100644 index 000000000..abce8055e --- /dev/null +++ b/ipa-core/src/utils/power_of_two.rs @@ -0,0 +1,110 @@ +use std::{fmt::Display, num::NonZeroUsize, str::FromStr}; + +#[derive(Debug, thiserror::Error)] +#[error("{0} is not a power of two or not within the 1..u32::MAX range")] +pub struct ConvertError(I); + +impl PartialEq for ConvertError { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} + +/// This construction guarantees the value to be a power of two and +/// within the range 0..2^32-1 +#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub struct NonZeroU32PowerOfTwo(u32); + +impl Display for NonZeroU32PowerOfTwo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", u32::from(*self)) + } +} + +impl TryFrom for NonZeroU32PowerOfTwo { + type Error = ConvertError; + + fn try_from(value: usize) -> Result { + if value > 0 && value < usize::try_from(u32::MAX).unwrap() && value.is_power_of_two() { + Ok(NonZeroU32PowerOfTwo(u32::try_from(value).unwrap())) + } else { + Err(ConvertError(value)) + } + } +} + +impl From for usize { + fn from(value: NonZeroU32PowerOfTwo) -> Self { + // we are using 64 bit registers + usize::try_from(value.0).unwrap() + } +} + +impl From for u32 { + fn from(value: NonZeroU32PowerOfTwo) -> Self { + value.0 + } +} + +impl FromStr for NonZeroU32PowerOfTwo { + type Err = ConvertError; + + fn from_str(s: &str) -> Result { + let v = s.parse::().map_err(|_| ConvertError(s.to_owned()))?; + NonZeroU32PowerOfTwo::try_from(v).map_err(|_| ConvertError(s.to_owned())) + } +} + +impl NonZeroU32PowerOfTwo { + #[must_use] + pub fn to_non_zero_usize(self) -> NonZeroUsize { + let v = usize::from(self); + NonZeroUsize::new(v).unwrap_or_else(|| unreachable!()) + } + + #[must_use] + pub fn get(self) -> usize { + usize::from(self) + } +} + +#[cfg(test)] +mod tests { + use super::{ConvertError, NonZeroU32PowerOfTwo}; + + #[test] + fn rejects_invalid_values() { + assert!(matches!( + NonZeroU32PowerOfTwo::try_from(0), + Err(ConvertError(0)) + )); + assert!(matches!( + NonZeroU32PowerOfTwo::try_from(3), + Err(ConvertError(3)) + )); + + assert!(matches!( + NonZeroU32PowerOfTwo::try_from(1_usize << 33), + Err(ConvertError(_)) + )); + } + + #[test] + fn accepts_valid() { + assert_eq!(4, u32::from(NonZeroU32PowerOfTwo::try_from(4).unwrap())); + assert_eq!(16, u32::from(NonZeroU32PowerOfTwo::try_from(16).unwrap())); + } + + #[test] + fn parse_from_str() { + assert_eq!(NonZeroU32PowerOfTwo(4), "4".parse().unwrap()); + assert_eq!( + ConvertError("0".to_owned()), + "0".parse::().unwrap_err() + ); + assert_eq!( + ConvertError("3".to_owned()), + "3".parse::().unwrap_err() + ); + } +} From 46087f93f56c3ccf3d00719ec4cdbc8582e17fb9 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Wed, 2 Oct 2024 14:56:03 -0700 Subject: [PATCH 03/11] Fix batch size to be a power of two in tests --- ipa-core/src/bin/helper.rs | 5 ++--- ipa-core/src/helpers/gateway/send.rs | 1 + ipa-core/src/lib.rs | 2 +- ipa-core/src/protocol/basics/mul/dzkp_malicious.rs | 2 +- ipa-core/src/protocol/context/dzkp_validator.rs | 7 ++----- ipa-core/src/protocol/ipa_prf/mod.rs | 6 ++++-- ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs | 4 ++-- ipa-core/src/test_fixture/world.rs | 2 +- 8 files changed, 14 insertions(+), 15 deletions(-) diff --git a/ipa-core/src/bin/helper.rs b/ipa-core/src/bin/helper.rs index 790245587..884745180 100644 --- a/ipa-core/src/bin/helper.rs +++ b/ipa-core/src/bin/helper.rs @@ -2,7 +2,6 @@ use std::{ fs, io::BufReader, net::TcpListener, - num::NonZeroUsize, os::fd::{FromRawFd, RawFd}, path::{Path, PathBuf}, process, @@ -18,7 +17,7 @@ use ipa_core::{ error::BoxError, helpers::HelperIdentity, net::{ClientIdentity, HttpShardTransport, HttpTransport, MpcHelperClient}, - AppConfig, AppSetup, + AppConfig, AppSetup, NonZeroU32PowerOfTwo, }; use tracing::{error, info}; @@ -93,7 +92,7 @@ struct ServerArgs { /// Override the amount of active work processed in parallel #[arg(long)] - active_work: Option, + active_work: Option, } #[derive(Debug, Subcommand)] diff --git a/ipa-core/src/helpers/gateway/send.rs b/ipa-core/src/helpers/gateway/send.rs index de9dcabd5..b1dc11155 100644 --- a/ipa-core/src/helpers/gateway/send.rs +++ b/ipa-core/src/helpers/gateway/send.rs @@ -436,6 +436,7 @@ mod test { read_size: usize, record_size: usize, ) { + #[allow(clippy::needless_update)] // stall detection feature wants default value let gateway_config = GatewayConfig { active: active.next_power_of_two().try_into().unwrap(), read_size: read_size.try_into().unwrap(), diff --git a/ipa-core/src/lib.rs b/ipa-core/src/lib.rs index 59cae0106..f88ea718e 100644 --- a/ipa-core/src/lib.rs +++ b/ipa-core/src/lib.rs @@ -32,8 +32,8 @@ mod seq_join; mod serde; pub mod sharding; mod utils; - pub use app::{AppConfig, HelperApp, Setup as AppSetup}; +pub use utils::NonZeroU32PowerOfTwo; extern crate core; #[cfg(all(feature = "shuttle", test))] diff --git a/ipa-core/src/protocol/basics/mul/dzkp_malicious.rs b/ipa-core/src/protocol/basics/mul/dzkp_malicious.rs index 23a96c982..e024c4483 100644 --- a/ipa-core/src/protocol/basics/mul/dzkp_malicious.rs +++ b/ipa-core/src/protocol/basics/mul/dzkp_malicious.rs @@ -101,7 +101,7 @@ mod test { let res = world .malicious((a, b), |ctx, (a, b)| async move { - let validator = ctx.dzkp_validator(TEST_DZKP_STEPS, 10); + let validator = ctx.dzkp_validator(TEST_DZKP_STEPS, 8); let mctx = validator.context(); let result = a .multiply(&b, mctx.set_total_records(1), RecordId::from(0)) diff --git a/ipa-core/src/protocol/context/dzkp_validator.rs b/ipa-core/src/protocol/context/dzkp_validator.rs index 835a32e9d..a16ca32fd 100644 --- a/ipa-core/src/protocol/context/dzkp_validator.rs +++ b/ipa-core/src/protocol/context/dzkp_validator.rs @@ -1197,10 +1197,7 @@ mod tests { fn max_multiplications_per_gate_strategy(record_count: usize) -> impl Strategy { let max_max_mults = record_count.min(128); - prop_oneof![ - 1usize..=max_max_mults, - (0u32..=max_max_mults.ilog2()).prop_map(|i| 1usize << i) - ] + (0u32..=max_max_mults.ilog2()).prop_map(|i| 1usize << i) } prop_compose! { @@ -1546,7 +1543,7 @@ mod tests { let [h1_batch, h2_batch, h3_batch] = world .malicious((a, b), |ctx, (a, b)| async move { - let mut validator = ctx.dzkp_validator(TEST_DZKP_STEPS, 10); + let mut validator = ctx.dzkp_validator(TEST_DZKP_STEPS, 8); let mctx = validator.context(); let _ = a .multiply(&b, mctx.set_total_records(1), RecordId::from(0)) diff --git a/ipa-core/src/protocol/ipa_prf/mod.rs b/ipa-core/src/protocol/ipa_prf/mod.rs index cc3fa2633..754f179b6 100644 --- a/ipa-core/src/protocol/ipa_prf/mod.rs +++ b/ipa-core/src/protocol/ipa_prf/mod.rs @@ -308,8 +308,10 @@ where // We expect 2*256 = 512 gates in total for two additions per conversion. The vectorization factor // is CONV_CHUNK. Let `len` equal the number of converted shares. The total amount of // multiplications is CONV_CHUNK*512*len. We want CONV_CHUNK*512*len ≈ 50M, or len ≈ 381, for a -// reasonably-sized proof. -const CONV_PROOF_CHUNK: usize = 400; +// reasonably-sized proof. There is also a constraint on proof chunks to be powers of two, so +// we pick the closest power of two close to 381 but less than that value. 256 gives us around 33M +// multiplications per batch +const CONV_PROOF_CHUNK: usize = 256; #[tracing::instrument(name = "compute_prf_for_inputs", skip_all)] async fn compute_prf_for_inputs( diff --git a/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs b/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs index 9a1f8f278..e3c8cf49c 100644 --- a/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs +++ b/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs @@ -512,7 +512,7 @@ where }, // TODO: this should not be necessary, but probably can't be removed // until we align read_size with the batch size. - std::cmp::min(sh_ctx.active_work().get(), chunk_size), + std::cmp::min(sh_ctx.active_work().get(), chunk_size.next_power_of_two()), ); dzkp_validator.set_total_records(TotalRecords::specified(histogram[1]).unwrap()); let ctx_for_row_number = set_up_contexts(&dzkp_validator.context(), histogram)?; @@ -541,7 +541,7 @@ where protocol: &Step::Aggregate, validate: &Step::AggregateValidate, }, - aggregate_values_proof_chunk(B, usize::try_from(TV::BITS).unwrap()), + aggregate_values_proof_chunk(B, usize::try_from(TV::BITS).unwrap()).next_power_of_two(), ); let user_contributions = flattened_user_results.try_collect::>().await?; let result = diff --git a/ipa-core/src/test_fixture/world.rs b/ipa-core/src/test_fixture/world.rs index f92326c9b..bdcd7448e 100644 --- a/ipa-core/src/test_fixture/world.rs +++ b/ipa-core/src/test_fixture/world.rs @@ -676,7 +676,7 @@ impl Runner for TestWorld { R: Future + Send, { self.malicious(input, |ctx, share| async { - let v = ctx.dzkp_validator(TEST_DZKP_STEPS, 10); + let v = ctx.dzkp_validator(TEST_DZKP_STEPS, 8); let m_ctx = v.context(); let m_result = helper_fn(m_ctx, share).await; v.validate().await.unwrap(); From 831986c3c45c1c2fb6d43035804694185bb3ed8a Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Wed, 2 Oct 2024 16:28:08 -0700 Subject: [PATCH 04/11] Fix quicksort batch size and align it with a power of two --- ipa-core/src/protocol/ipa_prf/quicksort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipa-core/src/protocol/ipa_prf/quicksort.rs b/ipa-core/src/protocol/ipa_prf/quicksort.rs index 16c3f1c8d..30218e5c2 100644 --- a/ipa-core/src/protocol/ipa_prf/quicksort.rs +++ b/ipa-core/src/protocol/ipa_prf/quicksort.rs @@ -176,7 +176,7 @@ where }, // TODO: use something like this when validating in chunks // `TARGET_PROOF_SIZE / usize::try_from(K::BITS).unwrap() / SORT_CHUNK`` - total_records_usize, + total_records_usize.next_power_of_two(), ); let c = v.context(); let cmp_ctx = c.narrow(&QuicksortPassStep::Compare); From 27f65d01123503805484070c7432a977b2db77cc Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Wed, 2 Oct 2024 16:46:00 -0700 Subject: [PATCH 05/11] Fix oneshot bench --- ipa-core/benches/oneshot/ipa.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ipa-core/benches/oneshot/ipa.rs b/ipa-core/benches/oneshot/ipa.rs index 3d19836ee..b880c28d6 100644 --- a/ipa-core/benches/oneshot/ipa.rs +++ b/ipa-core/benches/oneshot/ipa.rs @@ -86,6 +86,7 @@ impl Args { self.active_work .map(NonZeroUsize::get) .unwrap_or_else(|| self.query_size.clamp(16, 1024)) + .next_power_of_two() } fn attribution_window(&self) -> Option { From 52fd07704335fe6f3ca4eb01f5022df39b235024 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 4 Oct 2024 13:50:36 -0700 Subject: [PATCH 06/11] Fix shuttle tests --- ipa-core/src/helpers/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ipa-core/src/helpers/mod.rs b/ipa-core/src/helpers/mod.rs index e33a2ec99..2c43ccd53 100644 --- a/ipa-core/src/helpers/mod.rs +++ b/ipa-core/src/helpers/mod.rs @@ -817,7 +817,7 @@ mod concurrency_tests { let input = (0u32..11).map(TestField::truncate_from).collect::>(); let config = TestWorldConfig { gateway_config: GatewayConfig { - active: input.len().try_into().unwrap(), + active: input.len().next_power_of_two().try_into().unwrap(), ..Default::default() }, ..Default::default() @@ -875,7 +875,7 @@ mod concurrency_tests { let input = (0u32..11).map(TestField::truncate_from).collect::>(); let config = TestWorldConfig { gateway_config: GatewayConfig { - active: input.len().try_into().unwrap(), + active: input.len().next_power_of_two().try_into().unwrap(), ..Default::default() }, ..Default::default() From 279c3063dfe17e7436055fde9bc749eb78a0567e Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 4 Oct 2024 13:52:45 -0700 Subject: [PATCH 07/11] Fix flaky `send_recv_randomized` test When using more than one thread, this test was failing if futures were scheduled out of order, because `Notify` couldn't wake up futures scheduled after `notify_all` call. Using barriers solves the issue --- ipa-core/src/helpers/gateway/mod.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/ipa-core/src/helpers/gateway/mod.rs b/ipa-core/src/helpers/gateway/mod.rs index 15d2580d2..42982abc0 100644 --- a/ipa-core/src/helpers/gateway/mod.rs +++ b/ipa-core/src/helpers/gateway/mod.rs @@ -318,6 +318,7 @@ mod tests { stream::StreamExt, }; use proptest::proptest; + use tokio::sync::Barrier; use crate::{ ff::{ @@ -751,7 +752,11 @@ mod tests { ) where M: MpcMessage + Clone + PartialEq, { - let send_notify = Arc::new(tokio::sync::Notify::new()); + let last_batch_size = total_records % active_work; + let last_batch = total_records / active_work; + + let barrier = Arc::new(Barrier::new(active_work)); + let last_batch_barrier = Arc::new(Barrier::new(last_batch_size)); // perform "multiplication-like" operation (send + subsequent receive) // and "validate": block the future until we have at least `active_work` @@ -762,7 +767,8 @@ mod tests { |(record_id, msg)| { let send_channel = &send_channel; let recv_channel = &recv_channel; - let send_notify = Arc::clone(&send_notify); + let barrier = Arc::clone(&barrier); + let last_batch_barrier = Arc::clone(&last_batch_barrier); async move { send_channel .send(record_id.into(), msg.clone()) @@ -771,13 +777,12 @@ mod tests { let r = recv_channel.receive(record_id.into()).await.unwrap(); // this simulates validate_record API by forcing futures to wait // until the entire batch is validated by the last future in that batch - if record_id % active_work == active_work - 1 - || record_id == total_records - 1 - { - send_notify.notify_waiters(); + if record_id >= last_batch * active_work { + last_batch_barrier.wait().await; } else { - send_notify.notified().await; + barrier.wait().await; } + assert_eq!(msg, r); } }, From 5e228244f151492f0f03c5b167ad146eaf680a11 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 4 Oct 2024 14:54:27 -0700 Subject: [PATCH 08/11] Change how we compute the previous power of two. Using bitshift turns out to be much easier to understand --- ipa-core/src/helpers/gateway/send.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/ipa-core/src/helpers/gateway/send.rs b/ipa-core/src/helpers/gateway/send.rs index b1dc11155..9e0753ab7 100644 --- a/ipa-core/src/helpers/gateway/send.rs +++ b/ipa-core/src/helpers/gateway/send.rs @@ -259,17 +259,12 @@ impl SendChannelConfig { let total_capacity = gateway_config.active.get() * record_size; // define read size as a multiplier of record size. The multiplier must be - // a power of two to align perfectly with total capacity. - let read_size_multiplier = { - // next_power_of_two returns a value that is greater than or equal to. - // That is not what we want here: if read_size / record_size is a power - // of two, then subsequent division will get us further away from desired target. - // For example: if read_size / record_size = 4, then prev_power_of_two = 2. - // In such cases, we want to stay where we are, so we add +1 for that. - let prev_power_of_two = - (gateway_config.read_size.get() / record_size + 1).next_power_of_two() / 2; - std::cmp::max(1, prev_power_of_two) - }; + // a power of two to align perfectly with total capacity. We don't want to exceed + // the target read size, so we pick a power of two <= read_size. + let read_size_multiplier = + // this computes the highest power of 2 less than or equal to read_size / record_size. + // Note, that if record_size is greater than read_size, we round it to 1 + 1 << (std::cmp::max(1, usize::BITS - (gateway_config.read_size.get() / record_size).leading_zeros()) - 1); let this = Self { total_capacity: total_capacity.try_into().unwrap(), From 4aeb686c3ba1df34da30b4e77b16df4eb16b7070 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 4 Oct 2024 14:54:52 -0700 Subject: [PATCH 09/11] Change comment on `read_size` --- ipa-core/src/helpers/gateway/mod.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/ipa-core/src/helpers/gateway/mod.rs b/ipa-core/src/helpers/gateway/mod.rs index 42982abc0..2d48380db 100644 --- a/ipa-core/src/helpers/gateway/mod.rs +++ b/ipa-core/src/helpers/gateway/mod.rs @@ -82,13 +82,20 @@ pub struct GatewayConfig { /// A rule of thumb is that this should get as close to network packet size as possible. /// /// This will be set for all channels and because they send records of different side, the actual - /// payload may not be exactly this, but it will be the closest multiple of record size to this - /// number. For instance, having 14 bytes records and batch size of 4096 will result in - /// 4088 bytes being sent in a batch. + /// payload may not be exactly this, but it will be the closest multiple of record size smaller than + /// or equal to number. For alignment reasons, this multiple will be a power of two, otherwise + /// a deadlock is possible. See ipa/#1300 for details how it can happen. /// - /// The actual size for read chunks may be bigger or smaller, depending on the record size - /// sent through each channel. Read size will be aligned with [`Self::active_work`] value to - /// prevent deadlocks. + /// For instance, having 14 bytes records and batch size of 4096 will result in + /// 3584 bytes being sent in a batch (`2^8 * 14 < 4096, 2^9 * 14 > 4096`). + /// + /// The consequence is that HTTP buffer size may not be perfectly aligned with the target. + /// As long as we use TCP it does not matter, but if we want to switch to UDP and have + /// precise control over the size of chunk sent, we should tune the buffer size at the + /// HTTP layer instead (using Hyper/H3 API or something like that). If we do this, then + /// read size becomes obsolete and should be removed in favor of flushing the entire + /// buffer chunks from the application layer down to HTTP and let network to figure out + /// the best way to slice this data before sending it to a peer. pub read_size: NonZeroUsize, /// Time to wait before checking gateway progress. If no progress has been made between From ac31cbbfa612e3dbeab23cba8d7093ee46d76ac5 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 4 Oct 2024 14:55:21 -0700 Subject: [PATCH 10/11] Don't run send config tests under Shuttle There is no reason to do that. --- ipa-core/src/helpers/gateway/send.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipa-core/src/helpers/gateway/send.rs b/ipa-core/src/helpers/gateway/send.rs index 9e0753ab7..3a0fc1ad8 100644 --- a/ipa-core/src/helpers/gateway/send.rs +++ b/ipa-core/src/helpers/gateway/send.rs @@ -286,7 +286,7 @@ impl SendChannelConfig { } } -#[cfg(test)] +#[cfg(all(test, unit_test))] mod test { use std::num::NonZeroUsize; From 34838feb2873400f01c65e1d90c464b7581e5147 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Fri, 4 Oct 2024 15:11:04 -0700 Subject: [PATCH 11/11] Don't run non zero power of two tests under Shuttle There is no reason to do that. --- ipa-core/src/utils/power_of_two.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipa-core/src/utils/power_of_two.rs b/ipa-core/src/utils/power_of_two.rs index abce8055e..a84455c92 100644 --- a/ipa-core/src/utils/power_of_two.rs +++ b/ipa-core/src/utils/power_of_two.rs @@ -68,7 +68,7 @@ impl NonZeroU32PowerOfTwo { } } -#[cfg(test)] +#[cfg(all(test, unit_test))] mod tests { use super::{ConvertError, NonZeroU32PowerOfTwo};