Skip to content

Commit

Permalink
Merge pull request #873 from akoshelev/parallel-join-truly-parallel
Browse files Browse the repository at this point in the history
Support multithreading in `seq_join`/`parallel_join`
  • Loading branch information
akoshelev authored Jan 29, 2024
2 parents b54f8aa + 28c945e commit 4292569
Show file tree
Hide file tree
Showing 14 changed files with 637 additions and 217 deletions.
4 changes: 4 additions & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@
disallowed-methods = [
{ path = "futures::future::join_all", reason = "We don't have a replacement for this method yet. Consider extending `SeqJoin` trait." },
{ path = "futures::future::try_join_all", reason = "Use Context.try_join instead." },
{ path = "std::boxed::Box::leak", reason = "Not running the destructors on futures created inside seq_join module will cause UB in IPA. Make sure you don't leak any of those." },
{ path = "std::mem::forget", reason = "Not running the destructors on futures created inside seq_join module will cause UB in IPA. Make sure you don't leak any of those." },
{ path = "std::mem::ManuallyDrop::new", reason = "Not running the destructors on futures created inside seq_join module will cause UB in IPA. Make sure you don't leak any of those." },
{ path = "std::vec::Vec::leak", reason = "Not running the destructors on futures created inside seq_join module will cause UB in IPA. Make sure you don't leak any of those." },
]
26 changes: 22 additions & 4 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ jobs:
if: ${{ success() || failure() }}
run: cargo build --tests

- name: Run Tests
- name: Run tests
run: cargo test

- name: Run tests with multithreading feature enabled
run: cargo test --features "multi-threading"

- name: Run Web Tests
run: cargo test -p ipa-core --no-default-features --features "cli web-app real-world-infra test-fixture descriptive-gate"

Expand Down Expand Up @@ -96,10 +99,10 @@ jobs:
run: cargo build --release

- name: Build concurrency tests
run: cargo build --release --features shuttle
run: cargo build --release --features "shuttle multi-threading"

- name: Run concurrency tests
run: cargo test --release --features shuttle
run: cargo test --release --features "shuttle multi-threading"

extra:
name: Additional Builds and Concurrency Tests
Expand Down Expand Up @@ -148,6 +151,7 @@ jobs:
fail-fast: false
matrix:
sanitizer: [address, leak]
features: ['', 'multi-threading']
env:
TARGET: x86_64-unknown-linux-gnu
steps:
Expand All @@ -156,7 +160,21 @@ jobs:
- name: Add Rust sources
run: rustup component add rust-src
- name: Run tests with sanitizer
run: RUSTFLAGS="-Z sanitizer=${{ matrix.sanitizer }} -Z sanitizer-memory-track-origins" cargo test -Z build-std --target $TARGET --no-default-features --features "cli web-app real-world-infra test-fixture descriptive-gate"
run: RUSTFLAGS="-Z sanitizer=${{ matrix.sanitizer }} -Z sanitizer-memory-track-origins" cargo test -Z build-std --target $TARGET --no-default-features --features "cli web-app real-world-infra test-fixture descriptive-gate ${{ matrix.features }}"

miri:
runs-on: ubuntu-latest
env:
TARGET: x86_64-unknown-linux-gnu
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@nightly
- name: Add Miri
run: rustup component add miri
- name: Setup Miri
run: cargo miri setup
- name: Run seq_join tests
run: cargo miri test --target $TARGET --lib seq_join --features "multi-threading"

coverage:
name: Measure coverage
Expand Down
Empty file added .pre-commit.stashsIsbN1
Empty file.
5 changes: 4 additions & 1 deletion ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ step-trace = ["descriptive-gate"]
# of unit tests use it. Compact uses memory-efficient gates and is suitable for production.
descriptive-gate = []
compact-gate = ["ipa-macros/compact-gate"]
# Enable using more than one thread for protocol execution. Most of the parallelism occurs at parallel/seq_join operations
multi-threading = ["async-scoped"]

# Standalone aggregation protocol. We use IPA infra for communication
# but it has nothing to do with IPA.
Expand All @@ -73,6 +75,7 @@ ipa-macros = { version = "*", path = "../ipa-macros" }

aes = "0.8.3"
async-trait = "0.1.68"
async-scoped = { version = "0.9.0", features = ["use-tokio"], optional = true }
axum = { version = "0.5.17", optional = true, features = ["http2"] }
axum-server = { version = "0.5.1", optional = true, features = [
"rustls",
Expand Down Expand Up @@ -132,7 +135,7 @@ sha2 = "0.10"
shuttle-crate = { package = "shuttle", version = "0.6.1", optional = true }
thiserror = "1.0"
time = { version = "0.3", optional = true }
tokio = { version = "1.28", features = ["fs", "rt", "rt-multi-thread", "macros"] }
tokio = { version = "1.35", features = ["fs", "rt", "rt-multi-thread", "macros"] }
# TODO: axum-server holds onto 0.24 and we can't upgrade until they do. Or we move away from axum-server
tokio-rustls = { version = "0.24", optional = true }
tokio-stream = "0.1.14"
Expand Down
11 changes: 7 additions & 4 deletions ipa-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,19 @@ pub(crate) mod test_executor {
run(f);
}

pub fn run<F, Fut>(f: F)
pub fn run<F, Fut, T>(f: F) -> T
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()>,
Fut: Future<Output = T>,
{
tokio::runtime::Builder::new_multi_thread()
.enable_all()
// enable_all() is common to use to build Tokio runtime, but it enables both IO and time drivers.
// IO driver is not compatible with Miri (https://github.com/rust-lang/miri/issues/2057) which we use to
// sanitize our tests, so this runtime only enables time driver.
.enable_time()
.build()
.unwrap()
.block_on(f());
.block_on(f())
}
}

Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/protocol/basics/reshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
/// `to_helper` = (`rand_left`, `rand_right`) = (r0, r1)
/// `to_helper.right` = (`rand_right`, part1 + part2) = (r0, part1 + part2)
#[async_trait]
pub trait Reshare<C: Context, B: RecordBinding>: Sized {
pub trait Reshare<C: Context, B: RecordBinding>: Sized + 'static {
async fn reshare<'fut>(
&self,
ctx: C,
Expand Down
5 changes: 4 additions & 1 deletion ipa-core/src/protocol/ipa/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,10 @@ where
.collect::<Vec<_>>()
}

#[cfg(all(test, any(unit_test, feature = "shuttle")))]
#[cfg(all(
test,
any(unit_test, all(feature = "shuttle", not(feature = "multi-threading")))
))]
pub mod tests {
use std::num::NonZeroU32;

Expand Down
32 changes: 17 additions & 15 deletions ipa-core/src/protocol/modulus_conversion/convert_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,17 @@ where
/// # Panics
/// If the total record count on the context is unspecified.
#[tracing::instrument(name = "modulus_conversion", skip_all, fields(bits = ?bit_range, gate = %ctx.gate().as_ref()))]
pub fn convert_bits<F, V, C, S, VS>(
pub fn convert_bits<'a, F, V, C, S, VS>(
ctx: C,
binary_shares: VS,
bit_range: Range<u32>,
) -> impl Stream<Item = Result<BitDecomposed<S>, Error>>
) -> impl Stream<Item = Result<BitDecomposed<S>, Error>> + 'a
where
F: PrimeField,
V: ToBitConversionTriples<Residual = ()>,
C: UpgradedContext<F, Share = S>,
V: ToBitConversionTriples<Residual = ()> + 'a,
C: UpgradedContext<F, Share = S> + 'a,
S: LinearSecretSharing<F> + SecureMul<C>,
VS: Stream<Item = V> + Unpin + Send,
VS: Stream<Item = V> + Unpin + Send + 'a,
for<'u> UpgradeContext<'u, C, F, RecordId>:
UpgradeToMalicious<'u, BitConversionTriple<Replicated<F>>, BitConversionTriple<C::Share>>,
{
Expand All @@ -320,35 +320,37 @@ where
/// Note that unconverted fields are not upgraded, so they might need to be upgraded either before or
/// after invoking this function.
#[tracing::instrument(name = "modulus_conversion", skip_all, fields(bits = ?bit_range, gate = %ctx.gate().as_ref()))]
pub fn convert_selected_bits<F, V, C, S, VS, R>(
pub fn convert_selected_bits<'a, F, V, C, S, VS, R>(
ctx: C,
binary_shares: VS,
bit_range: Range<u32>,
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>>
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>> + 'a
where
R: Send + 'static,
F: PrimeField,
V: ToBitConversionTriples<Residual = R>,
C: UpgradedContext<F, Share = S>,
V: ToBitConversionTriples<Residual = R> + 'a,
C: UpgradedContext<F, Share = S> + 'a,
S: LinearSecretSharing<F> + SecureMul<C>,
VS: Stream<Item = V> + Unpin + Send,
VS: Stream<Item = V> + Unpin + Send + 'a,
for<'u> UpgradeContext<'u, C, F, RecordId>:
UpgradeToMalicious<'u, BitConversionTriple<Replicated<F>>, BitConversionTriple<C::Share>>,
{
convert_some_bits(ctx, binary_shares, RecordId::FIRST, bit_range)
}

pub(crate) fn convert_some_bits<F, V, C, S, VS, R>(
pub(crate) fn convert_some_bits<'a, F, V, C, S, VS, R>(
ctx: C,
binary_shares: VS,
first_record: RecordId,
bit_range: Range<u32>,
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>>
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>> + 'a
where
R: Send + 'static,
F: PrimeField,
V: ToBitConversionTriples<Residual = R>,
C: UpgradedContext<F, Share = S>,
V: ToBitConversionTriples<Residual = R> + 'a,
C: UpgradedContext<F, Share = S> + 'a,
S: LinearSecretSharing<F> + SecureMul<C>,
VS: Stream<Item = V> + Unpin + Send,
VS: Stream<Item = V> + Unpin + Send + 'a,
for<'u> UpgradeContext<'u, C, F, RecordId>:
UpgradeToMalicious<'u, BitConversionTriple<Replicated<F>>, BitConversionTriple<C::Share>>,
{
Expand Down
4 changes: 3 additions & 1 deletion ipa-core/src/protocol/sort/generate_permutation_opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ mod tests {
}

/// Passing 32 records for Fp31 doesn't work.
#[tokio::test]
///
/// Requires one extra thread to cancel futures running in parallel with the one that panics.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[should_panic = "prime field ipa_core::ff::prime_field::fp31::Fp31 is too small to sort 32 records"]
async fn fp31_overflow() {
const COUNT: usize = 32;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<V: SharedValue + ExtendableField> LinearSecretSharing<V> for AdditiveShare<
/// when the protocol is done. This should not be used directly.
#[async_trait]
pub trait Downgrade: Send {
type Target: Send;
type Target: Send + 'static;
async fn downgrade(self) -> UnauthorizedDowngradeWrapper<Self::Target>;
}

Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/secret_sharing/scheme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::SharedValue;
use crate::ff::{AddSub, AddSubAssign, Field, GaloisField};

/// Secret sharing scheme i.e. Replicated secret sharing
pub trait SecretSharing<V: SharedValue>: Clone + Debug + Sized + Send + Sync {
pub trait SecretSharing<V: SharedValue>: Clone + Debug + Sized + Send + Sync + 'static {
const ZERO: Self;
}

Expand All @@ -21,6 +21,7 @@ pub trait Linear<V: Field>:
+ Mul<V, Output = Self>
+ for<'r> Mul<&'r V, Output = Self>
+ Neg<Output = Self>
+ 'static
{
}

Expand Down
Loading

0 comments on commit 4292569

Please sign in to comment.