Skip to content

Commit

Permalink
Merge branch 'main' into workspace
Browse files Browse the repository at this point in the history
  • Loading branch information
taikiy committed Nov 24, 2023
2 parents fd30724 + 24a2018 commit f88b14c
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 26 deletions.
4 changes: 2 additions & 2 deletions ipa-core/src/ff/boolean_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ macro_rules! store_impl {

/// iterator for Boolean arrays
pub struct BAIterator<'a> {
iterator: Iter<'a, u8, Lsb0>,
iterator: std::iter::Take<Iter<'a, u8, Lsb0>>,
}

///impl Iterator for all Boolean arrays
Expand Down Expand Up @@ -228,7 +228,7 @@ macro_rules! boolean_array_impl {

fn into_iter(self) -> Self::IntoIter {
BAIterator {
iterator: self.0.iter(),
iterator: self.0.iter().take(usize::try_from(<$name>::BITS).unwrap()),
}
}
}
Expand Down
60 changes: 56 additions & 4 deletions ipa-core/src/helpers/buffers/ordering_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,13 @@ impl State {
M::Size::USIZE,
self.spare.get()
);
let open = self.accept_writes();
let b = &mut self.buf[self.written..];
if M::Size::USIZE <= b.len() {
if open && M::Size::USIZE <= b.len() {
self.written += M::Size::USIZE;
m.serialize(GenericArray::from_mut_slice(&mut b[..M::Size::USIZE]));

if self.written + self.spare.get() >= self.buf.len() {
if !self.accept_writes() {
Self::wake(&mut self.stream_ready);
}
Poll::Ready(())
Expand Down Expand Up @@ -111,6 +112,15 @@ impl State {
self.closed = true;
Self::wake(&mut self.stream_ready);
}

/// Returns `true` if more writes can be accepted by this sender.
/// If message size exceeds the remaining capacity, [`write`] may
/// still return `Poll::Pending` even if sender is open for writes.
///
/// [`write`]: Self::write
fn accept_writes(&self) -> bool {
self.written + self.spare.get() < self.buf.len()
}
}

/// An saved waker for a given index.
Expand Down Expand Up @@ -259,6 +269,11 @@ impl Waiting {
/// Data less than the `write_size` threshold only becomes available to
/// the stream when the sender is closed (with [`close`]).
///
/// Once `write_size` threshold has been reached, no subsequent writes
/// are allowed, until stream is polled. `OrderingSender` guarantees equal
/// size chunks will be sent to the stream when it is used to buffer
/// same-sized messages.
///
/// The `spare` capacity determines the size of messages that can be sent;
/// see [`send`] for details.
///
Expand Down Expand Up @@ -465,14 +480,14 @@ impl<B: Borrow<OrderingSender> + Unpin> Stream for OrderedStream<B> {

#[cfg(all(test, any(unit_test, feature = "shuttle")))]
mod test {
use std::{iter::zip, num::NonZeroUsize};
use std::{future::poll_fn, iter::zip, num::NonZeroUsize, pin::pin};

use futures::{
future::{join, join3, join_all},
stream::StreamExt,
FutureExt,
};
use futures_util::future::try_join_all;
use futures_util::future::{poll_immediate, try_join_all};
use generic_array::GenericArray;
use rand::Rng;
#[cfg(feature = "shuttle")]
Expand Down Expand Up @@ -677,4 +692,41 @@ mod test {
.unwrap();
});
}

/// If sender is at capacity, but still have some bytes inside spare, we block the sends
/// until the stream is flushed. That ensures `OrderingSender` yields the equal-sized
/// chunks.
///
/// This behavior is important for channels working in parallel `[parallel_join]` and wrapped
/// inside a windowed execution [`seq_join`]. Not enforcing this leads to some channels moving
/// forward faster and eventually getting outside of active work window. See [`issue`] for
/// more details.
///
/// [`seq_join`]: crate::seq_join::SeqJoin::try_join
/// [`parallel_join`]: crate::seq_join::SeqJoin::parallel_join
/// [`issue`]: https://github.com/private-attribution/ipa/issues/843
#[test]
fn reader_blocks_writers() {
const SZ: usize = <<Fp32BitPrime as Serializable>::Size as Unsigned>::USIZE;
run(|| async {
const CAPACITY: usize = SZ + 1;
const SPARE: usize = 2 * SZ;
let sender =
OrderingSender::new(CAPACITY.try_into().unwrap(), SPARE.try_into().unwrap());

// enough bytes in the buffer to hold 2 items
for i in 0..2 {
sender
.send(i, Fp32BitPrime::truncate_from(u128::try_from(i).unwrap()))
.await;
}

// spare has enough capacity, but buffer is considered full.
let mut f = pin!(sender.send(2, Fp32BitPrime::truncate_from(2_u128)));
assert_eq!(None, poll_immediate(&mut f).await);

drop(poll_fn(|ctx| sender.take_next(ctx)).await);
assert_eq!(Some(()), poll_immediate(f).await);
});
}
}
9 changes: 5 additions & 4 deletions ipa-core/src/helpers/gateway/stall_detection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ impl<T: ObserveState> Observed<T> {
}

mod gateway {

use delegate::delegate;

use super::*;
use super::{receive, send, AtomicUsize, Debug, Formatter, ObserveState, Observed, Weak};
use crate::{
helpers::{
gateway::{Gateway, State},
Expand Down Expand Up @@ -126,7 +127,7 @@ mod gateway {
let mut last_sn_seen = 0;
loop {
::tokio::time::sleep(config.progress_check_interval).await;
let now = gateway.get_sn().upgrade().map(|v| v.load(Ordering::Relaxed));
let now = gateway.get_sn().upgrade().map(|v| v.load(core::sync::atomic::Ordering::Relaxed));
if let Some(now) = now {
if now == last_sn_seen {
if let Some(state) = gateway.get_state() {
Expand Down Expand Up @@ -215,7 +216,7 @@ mod receive {
fmt::{Debug, Formatter},
};

use super::*;
use super::{ObserveState, Observed};
use crate::{
helpers::{
error::Error,
Expand Down Expand Up @@ -273,7 +274,7 @@ mod send {
fmt::{Debug, Formatter},
};

use super::*;
use super::{ObserveState, Observed};
use crate::{
helpers::{
error::Error,
Expand Down
15 changes: 4 additions & 11 deletions ipa-core/src/protocol/ipa_prf/boolean_ops/addition_sequential.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
#[cfg(all(test, unit_test))]
use ipa_macros::Step;

#[cfg(all(test, unit_test))]
use crate::{
error::Error,
ff::{ArrayAccess, CustomArray, Field},
protocol::{
basics::{if_else, SecureMul},
context::Context,
step::BitOpStep,
RecordId,
},
protocol::{basics::SecureMul, context::Context, step::BitOpStep, RecordId},
secret_sharing::{replicated::semi_honest::AdditiveShare, WeakSharedValue},
};

Expand All @@ -25,7 +19,7 @@ pub(crate) enum Step {
/// adds y to x, Output has same length as x (carries and indices of y too large for x are ignored)
/// # Errors
/// propagates errors from multiply
#[cfg(all(test, unit_test))]
#[allow(dead_code)]
pub async fn integer_add<C, XS, YS>(
ctx: C,
record_id: RecordId,
Expand Down Expand Up @@ -61,7 +55,7 @@ where
S: CustomArray + Field,
S::Element: Field,
{
use crate::ff::Expand;
use crate::{ff::Expand, protocol::basics::if_else};
let mut carry = AdditiveShare::<S::Element>::ZERO;
let result = addition_circuit(
ctx.narrow(&Step::SaturatedAddition),
Expand Down Expand Up @@ -92,7 +86,7 @@ where
/// for all i: output[i] = x[i] + (c[i-1] + y[i])
/// # Errors
/// propagates errors from multiply
#[cfg(all(test, unit_test))]
#[allow(dead_code)]
async fn addition_circuit<C, XS, YS>(
ctx: C,
record_id: RecordId,
Expand Down Expand Up @@ -131,7 +125,6 @@ where
/// update carry to carry = ( x + carry)(y + carry) + carry
/// # Errors
/// propagates errors from multiply
#[cfg(all(test, unit_test))]
async fn bit_adder<C, S>(
ctx: C,
record_id: RecordId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#[cfg(all(test, unit_test))]
use std::ops::Neg;

#[cfg(all(test, unit_test))]
use ipa_macros::Step;

#[cfg(all(test, unit_test))]
use crate::{
error::Error,
ff::{
Expand All @@ -22,7 +19,6 @@ use crate::{
},
};

#[cfg(all(test, unit_test))]
#[derive(Step)]
pub(crate) enum Step {
GenerateSecretSharing,
Expand Down Expand Up @@ -96,7 +92,7 @@ pub(crate) enum Step {
///
/// # Errors
/// Propagates Errors from Integer Subtraction and Partial Reveal
#[cfg(all(test, unit_test))]
#[allow(dead_code)]
async fn convert_to_fp25519<C, B>(
ctx: C,
record_id: RecordId,
Expand Down

0 comments on commit f88b14c

Please sign in to comment.