diff --git a/ipa-core/src/ff/boolean_array.rs b/ipa-core/src/ff/boolean_array.rs index 706a79b93..a69bb86ae 100644 --- a/ipa-core/src/ff/boolean_array.rs +++ b/ipa-core/src/ff/boolean_array.rs @@ -27,7 +27,7 @@ macro_rules! store_impl { /// iterator for Boolean arrays pub struct BAIterator<'a> { - iterator: Iter<'a, u8, Lsb0>, + iterator: std::iter::Take>, } ///impl Iterator for all Boolean arrays @@ -228,7 +228,7 @@ macro_rules! boolean_array_impl { fn into_iter(self) -> Self::IntoIter { BAIterator { - iterator: self.0.iter(), + iterator: self.0.iter().take(usize::try_from(<$name>::BITS).unwrap()), } } } diff --git a/ipa-core/src/helpers/buffers/ordering_sender.rs b/ipa-core/src/helpers/buffers/ordering_sender.rs index 5ecc21c7f..ee73b3cb4 100644 --- a/ipa-core/src/helpers/buffers/ordering_sender.rs +++ b/ipa-core/src/helpers/buffers/ordering_sender.rs @@ -78,12 +78,13 @@ impl State { M::Size::USIZE, self.spare.get() ); + let open = self.accept_writes(); let b = &mut self.buf[self.written..]; - if M::Size::USIZE <= b.len() { + if open && M::Size::USIZE <= b.len() { self.written += M::Size::USIZE; m.serialize(GenericArray::from_mut_slice(&mut b[..M::Size::USIZE])); - if self.written + self.spare.get() >= self.buf.len() { + if !self.accept_writes() { Self::wake(&mut self.stream_ready); } Poll::Ready(()) @@ -111,6 +112,15 @@ impl State { self.closed = true; Self::wake(&mut self.stream_ready); } + + /// Returns `true` if more writes can be accepted by this sender. + /// If message size exceeds the remaining capacity, [`write`] may + /// still return `Poll::Pending` even if sender is open for writes. + /// + /// [`write`]: Self::write + fn accept_writes(&self) -> bool { + self.written + self.spare.get() < self.buf.len() + } } /// An saved waker for a given index. @@ -259,6 +269,11 @@ impl Waiting { /// Data less than the `write_size` threshold only becomes available to /// the stream when the sender is closed (with [`close`]). /// +/// Once `write_size` threshold has been reached, no subsequent writes +/// are allowed, until stream is polled. `OrderingSender` guarantees equal +/// size chunks will be sent to the stream when it is used to buffer +/// same-sized messages. +/// /// The `spare` capacity determines the size of messages that can be sent; /// see [`send`] for details. /// @@ -465,14 +480,14 @@ impl + Unpin> Stream for OrderedStream { #[cfg(all(test, any(unit_test, feature = "shuttle")))] mod test { - use std::{iter::zip, num::NonZeroUsize}; + use std::{future::poll_fn, iter::zip, num::NonZeroUsize, pin::pin}; use futures::{ future::{join, join3, join_all}, stream::StreamExt, FutureExt, }; - use futures_util::future::try_join_all; + use futures_util::future::{poll_immediate, try_join_all}; use generic_array::GenericArray; use rand::Rng; #[cfg(feature = "shuttle")] @@ -677,4 +692,41 @@ mod test { .unwrap(); }); } + + /// If sender is at capacity, but still have some bytes inside spare, we block the sends + /// until the stream is flushed. That ensures `OrderingSender` yields the equal-sized + /// chunks. + /// + /// This behavior is important for channels working in parallel `[parallel_join]` and wrapped + /// inside a windowed execution [`seq_join`]. Not enforcing this leads to some channels moving + /// forward faster and eventually getting outside of active work window. See [`issue`] for + /// more details. + /// + /// [`seq_join`]: crate::seq_join::SeqJoin::try_join + /// [`parallel_join`]: crate::seq_join::SeqJoin::parallel_join + /// [`issue`]: https://github.com/private-attribution/ipa/issues/843 + #[test] + fn reader_blocks_writers() { + const SZ: usize = <::Size as Unsigned>::USIZE; + run(|| async { + const CAPACITY: usize = SZ + 1; + const SPARE: usize = 2 * SZ; + let sender = + OrderingSender::new(CAPACITY.try_into().unwrap(), SPARE.try_into().unwrap()); + + // enough bytes in the buffer to hold 2 items + for i in 0..2 { + sender + .send(i, Fp32BitPrime::truncate_from(u128::try_from(i).unwrap())) + .await; + } + + // spare has enough capacity, but buffer is considered full. + let mut f = pin!(sender.send(2, Fp32BitPrime::truncate_from(2_u128))); + assert_eq!(None, poll_immediate(&mut f).await); + + drop(poll_fn(|ctx| sender.take_next(ctx)).await); + assert_eq!(Some(()), poll_immediate(f).await); + }); + } } diff --git a/ipa-core/src/helpers/gateway/stall_detection.rs b/ipa-core/src/helpers/gateway/stall_detection.rs index 236b09700..c2e288572 100644 --- a/ipa-core/src/helpers/gateway/stall_detection.rs +++ b/ipa-core/src/helpers/gateway/stall_detection.rs @@ -67,9 +67,10 @@ impl Observed { } mod gateway { + use delegate::delegate; - use super::*; + use super::{receive, send, AtomicUsize, Debug, Formatter, ObserveState, Observed, Weak}; use crate::{ helpers::{ gateway::{Gateway, State}, @@ -126,7 +127,7 @@ mod gateway { let mut last_sn_seen = 0; loop { ::tokio::time::sleep(config.progress_check_interval).await; - let now = gateway.get_sn().upgrade().map(|v| v.load(Ordering::Relaxed)); + let now = gateway.get_sn().upgrade().map(|v| v.load(core::sync::atomic::Ordering::Relaxed)); if let Some(now) = now { if now == last_sn_seen { if let Some(state) = gateway.get_state() { @@ -215,7 +216,7 @@ mod receive { fmt::{Debug, Formatter}, }; - use super::*; + use super::{ObserveState, Observed}; use crate::{ helpers::{ error::Error, @@ -273,7 +274,7 @@ mod send { fmt::{Debug, Formatter}, }; - use super::*; + use super::{ObserveState, Observed}; use crate::{ helpers::{ error::Error, diff --git a/ipa-core/src/protocol/ipa_prf/boolean_ops/addition_sequential.rs b/ipa-core/src/protocol/ipa_prf/boolean_ops/addition_sequential.rs index 1b50e95b3..71f571f5d 100644 --- a/ipa-core/src/protocol/ipa_prf/boolean_ops/addition_sequential.rs +++ b/ipa-core/src/protocol/ipa_prf/boolean_ops/addition_sequential.rs @@ -1,16 +1,10 @@ #[cfg(all(test, unit_test))] use ipa_macros::Step; -#[cfg(all(test, unit_test))] use crate::{ error::Error, ff::{ArrayAccess, CustomArray, Field}, - protocol::{ - basics::{if_else, SecureMul}, - context::Context, - step::BitOpStep, - RecordId, - }, + protocol::{basics::SecureMul, context::Context, step::BitOpStep, RecordId}, secret_sharing::{replicated::semi_honest::AdditiveShare, WeakSharedValue}, }; @@ -25,7 +19,7 @@ pub(crate) enum Step { /// adds y to x, Output has same length as x (carries and indices of y too large for x are ignored) /// # Errors /// propagates errors from multiply -#[cfg(all(test, unit_test))] +#[allow(dead_code)] pub async fn integer_add( ctx: C, record_id: RecordId, @@ -61,7 +55,7 @@ where S: CustomArray + Field, S::Element: Field, { - use crate::ff::Expand; + use crate::{ff::Expand, protocol::basics::if_else}; let mut carry = AdditiveShare::::ZERO; let result = addition_circuit( ctx.narrow(&Step::SaturatedAddition), @@ -92,7 +86,7 @@ where /// for all i: output[i] = x[i] + (c[i-1] + y[i]) /// # Errors /// propagates errors from multiply -#[cfg(all(test, unit_test))] +#[allow(dead_code)] async fn addition_circuit( ctx: C, record_id: RecordId, @@ -131,7 +125,6 @@ where /// update carry to carry = ( x + carry)(y + carry) + carry /// # Errors /// propagates errors from multiply -#[cfg(all(test, unit_test))] async fn bit_adder( ctx: C, record_id: RecordId, diff --git a/ipa-core/src/protocol/ipa_prf/boolean_ops/share_conversion_aby.rs b/ipa-core/src/protocol/ipa_prf/boolean_ops/share_conversion_aby.rs index d17825210..3ce7cd833 100644 --- a/ipa-core/src/protocol/ipa_prf/boolean_ops/share_conversion_aby.rs +++ b/ipa-core/src/protocol/ipa_prf/boolean_ops/share_conversion_aby.rs @@ -1,10 +1,7 @@ -#[cfg(all(test, unit_test))] use std::ops::Neg; -#[cfg(all(test, unit_test))] use ipa_macros::Step; -#[cfg(all(test, unit_test))] use crate::{ error::Error, ff::{ @@ -22,7 +19,6 @@ use crate::{ }, }; -#[cfg(all(test, unit_test))] #[derive(Step)] pub(crate) enum Step { GenerateSecretSharing, @@ -96,7 +92,7 @@ pub(crate) enum Step { /// /// # Errors /// Propagates Errors from Integer Subtraction and Partial Reveal -#[cfg(all(test, unit_test))] +#[allow(dead_code)] async fn convert_to_fp25519( ctx: C, record_id: RecordId,