Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multithreading in seq_join/parallel_join #873

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
362dd56
Support multithreading in `seq_join`/`parallel_join`
akoshelev Nov 8, 2023
2a93f76
Add a test that demonstrates the unsafety of parallel_join
akoshelev Jan 11, 2024
27cf49d
Merge from main
akoshelev Jan 11, 2024
e1bbdc9
Fix compile errors
akoshelev Jan 11, 2024
69655b7
Remove the false safety claim from parallel_join
akoshelev Jan 11, 2024
a39903f
Make `parallel_join_forget_is_not_safe` safe for sanitizers
akoshelev Jan 11, 2024
9c22e49
Import async_scoped from git
akoshelev Jan 18, 2024
7319f00
Merge from main
akoshelev Jan 18, 2024
98c59d2
Update CI to run multi-threading tests
akoshelev Jan 18, 2024
61d8560
Fix a typo in checks.yml
akoshelev Jan 18, 2024
105c76a
Prohibit methods that can leak data
akoshelev Jan 19, 2024
a2c2f6f
Disable Shuttle tests for IPA with multi-threading enabled
akoshelev Jan 19, 2024
1279278
Fix a bug in ipa tests conditional compilation gate
akoshelev Jan 19, 2024
a19c716
Upgrade tokio to allow Miri
akoshelev Jan 19, 2024
e355c39
Disable Tokio IO driver
akoshelev Jan 19, 2024
e47b15a
Make seq_join tests Miri compatible
akoshelev Jan 19, 2024
2fcd3dd
Run Miri in CI
akoshelev Jan 19, 2024
11f756a
Merge from main
akoshelev Jan 19, 2024
e95bc42
Fix Miri action
akoshelev Jan 19, 2024
83334bc
More fixes
akoshelev Jan 19, 2024
7e964fc
More seq_join tests compatible with Miri
akoshelev Jan 19, 2024
44e3e3e
Improve code coverage
akoshelev Jan 19, 2024
49e14ea
Improve documentation and lift the unsafe blocks
akoshelev Jan 19, 2024
a40d8cd
Fiddle with boxed futures
akoshelev Jan 19, 2024
e1c9d17
Final cleanup
akoshelev Jan 19, 2024
2c5aa17
Apply suggestions from code review
akoshelev Jan 22, 2024
66d9ed5
More feedback
akoshelev Jan 23, 2024
8dc3386
Minor feedback
akoshelev Jan 23, 2024
95b2637
Split the `seq_join` file into separate modules
akoshelev Jan 23, 2024
28c945e
Import async-scoped from crates.io
akoshelev Jan 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,8 @@
disallowed-methods = [
{ path = "futures::future::join_all", reason = "We don't have a replacement for this method yet. Consider extending `SeqJoin` trait." },
{ path = "futures::future::try_join_all", reason = "Use Context.try_join instead." },
{ path = "std::boxed::Box::leak", reason = "Not running the destructors on futures created inside seq_join module will cause UB in IPA. Make sure you don't leak any of those." },
{ path = "std::mem::forget", reason = "Not running the destructors on futures created inside seq_join module will cause UB in IPA. Make sure you don't leak any of those." },
{ path = "std::mem::ManuallyDrop::new", reason = "Not running the destructors on futures created inside seq_join module will cause UB in IPA. Make sure you don't leak any of those." },
{ path = "std::vec::Vec::leak", reason = "Not running the destructors on futures created inside seq_join module will cause UB in IPA. Make sure you don't leak any of those." },
]
26 changes: 22 additions & 4 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ jobs:
if: ${{ success() || failure() }}
run: cargo build --tests

- name: Run Tests
- name: Run tests
run: cargo test

- name: Run tests with multithreading feature enabled
run: cargo test --features "multi-threading"

- name: Run Web Tests
run: cargo test -p ipa-core --no-default-features --features "cli web-app real-world-infra test-fixture descriptive-gate"

Expand Down Expand Up @@ -96,10 +99,10 @@ jobs:
run: cargo build --release

- name: Build concurrency tests
run: cargo build --release --features shuttle
run: cargo build --release --features "shuttle multi-threading"

- name: Run concurrency tests
run: cargo test --release --features shuttle
run: cargo test --release --features "shuttle multi-threading"

extra:
name: Additional Builds and Concurrency Tests
Expand Down Expand Up @@ -148,6 +151,7 @@ jobs:
fail-fast: false
matrix:
sanitizer: [address, leak]
features: ['', 'multi-threading']
env:
TARGET: x86_64-unknown-linux-gnu
steps:
Expand All @@ -156,7 +160,21 @@ jobs:
- name: Add Rust sources
run: rustup component add rust-src
- name: Run tests with sanitizer
run: RUSTFLAGS="-Z sanitizer=${{ matrix.sanitizer }} -Z sanitizer-memory-track-origins" cargo test -Z build-std --target $TARGET --no-default-features --features "cli web-app real-world-infra test-fixture descriptive-gate"
run: RUSTFLAGS="-Z sanitizer=${{ matrix.sanitizer }} -Z sanitizer-memory-track-origins" cargo test -Z build-std --target $TARGET --no-default-features --features "cli web-app real-world-infra test-fixture descriptive-gate ${{ matrix.features }}"
akoshelev marked this conversation as resolved.
Show resolved Hide resolved

miri:
runs-on: ubuntu-latest
env:
TARGET: x86_64-unknown-linux-gnu
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@nightly
- name: Add Miri
run: rustup component add miri
- name: Setup Miri
run: cargo miri setup
- name: Run seq_join tests
run: cargo miri test --target $TARGET --lib seq_join --features "multi-threading"

coverage:
name: Measure coverage
Expand Down
Empty file added .pre-commit.stashsIsbN1
Empty file.
5 changes: 4 additions & 1 deletion ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ step-trace = ["descriptive-gate"]
# of unit tests use it. Compact uses memory-efficient gates and is suitable for production.
descriptive-gate = []
compact-gate = ["ipa-macros/compact-gate"]
# Enable using more than one thread for protocol execution. Most of the parallelism occurs at parallel/seq_join operations
multi-threading = ["async-scoped"]

# Standalone aggregation protocol. We use IPA infra for communication
# but it has nothing to do with IPA.
Expand All @@ -73,6 +75,7 @@ ipa-macros = { version = "*", path = "../ipa-macros" }

aes = "0.8.3"
async-trait = "0.1.68"
async-scoped = { version = "0.9.0", features = ["use-tokio"], optional = true }
axum = { version = "0.5.17", optional = true, features = ["http2"] }
axum-server = { version = "0.5.1", optional = true, features = [
"rustls",
Expand Down Expand Up @@ -132,7 +135,7 @@ sha2 = "0.10"
shuttle-crate = { package = "shuttle", version = "0.6.1", optional = true }
thiserror = "1.0"
time = { version = "0.3", optional = true }
tokio = { version = "1.28", features = ["fs", "rt", "rt-multi-thread", "macros"] }
tokio = { version = "1.35", features = ["fs", "rt", "rt-multi-thread", "macros"] }
# TODO: axum-server holds onto 0.24 and we can't upgrade until they do. Or we move away from axum-server
tokio-rustls = { version = "0.24", optional = true }
tokio-stream = "0.1.14"
Expand Down
11 changes: 7 additions & 4 deletions ipa-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,19 @@ pub(crate) mod test_executor {
run(f);
}

pub fn run<F, Fut>(f: F)
pub fn run<F, Fut, T>(f: F) -> T
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()>,
Fut: Future<Output = T>,
{
tokio::runtime::Builder::new_multi_thread()
.enable_all()
// enable_all() is common to use to build Tokio runtime, but it enables both IO and time drivers.
// IO driver is not compatible with Miri (https://github.com/rust-lang/miri/issues/2057) which we use to
// sanitize our tests, so this runtime only enables time driver.
.enable_time()
.build()
.unwrap()
.block_on(f());
.block_on(f())
}
}

Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/protocol/basics/reshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
/// `to_helper` = (`rand_left`, `rand_right`) = (r0, r1)
/// `to_helper.right` = (`rand_right`, part1 + part2) = (r0, part1 + part2)
#[async_trait]
pub trait Reshare<C: Context, B: RecordBinding>: Sized {
pub trait Reshare<C: Context, B: RecordBinding>: Sized + 'static {
async fn reshare<'fut>(
&self,
ctx: C,
Expand Down
5 changes: 4 additions & 1 deletion ipa-core/src/protocol/ipa/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,10 @@ where
.collect::<Vec<_>>()
}

#[cfg(all(test, any(unit_test, feature = "shuttle")))]
#[cfg(all(
test,
any(unit_test, all(feature = "shuttle", not(feature = "multi-threading")))
))]
pub mod tests {
use std::num::NonZeroU32;

Expand Down
32 changes: 17 additions & 15 deletions ipa-core/src/protocol/modulus_conversion/convert_shares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,17 @@ where
/// # Panics
/// If the total record count on the context is unspecified.
#[tracing::instrument(name = "modulus_conversion", skip_all, fields(bits = ?bit_range, gate = %ctx.gate().as_ref()))]
pub fn convert_bits<F, V, C, S, VS>(
pub fn convert_bits<'a, F, V, C, S, VS>(
ctx: C,
binary_shares: VS,
bit_range: Range<u32>,
) -> impl Stream<Item = Result<BitDecomposed<S>, Error>>
) -> impl Stream<Item = Result<BitDecomposed<S>, Error>> + 'a
where
F: PrimeField,
V: ToBitConversionTriples<Residual = ()>,
C: UpgradedContext<F, Share = S>,
V: ToBitConversionTriples<Residual = ()> + 'a,
C: UpgradedContext<F, Share = S> + 'a,
S: LinearSecretSharing<F> + SecureMul<C>,
VS: Stream<Item = V> + Unpin + Send,
VS: Stream<Item = V> + Unpin + Send + 'a,
for<'u> UpgradeContext<'u, C, F, RecordId>:
UpgradeToMalicious<'u, BitConversionTriple<Replicated<F>>, BitConversionTriple<C::Share>>,
{
Expand All @@ -320,35 +320,37 @@ where
/// Note that unconverted fields are not upgraded, so they might need to be upgraded either before or
/// after invoking this function.
#[tracing::instrument(name = "modulus_conversion", skip_all, fields(bits = ?bit_range, gate = %ctx.gate().as_ref()))]
pub fn convert_selected_bits<F, V, C, S, VS, R>(
pub fn convert_selected_bits<'a, F, V, C, S, VS, R>(
ctx: C,
binary_shares: VS,
bit_range: Range<u32>,
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>>
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>> + 'a
where
R: Send + 'static,
F: PrimeField,
V: ToBitConversionTriples<Residual = R>,
C: UpgradedContext<F, Share = S>,
V: ToBitConversionTriples<Residual = R> + 'a,
C: UpgradedContext<F, Share = S> + 'a,
S: LinearSecretSharing<F> + SecureMul<C>,
VS: Stream<Item = V> + Unpin + Send,
VS: Stream<Item = V> + Unpin + Send + 'a,
for<'u> UpgradeContext<'u, C, F, RecordId>:
UpgradeToMalicious<'u, BitConversionTriple<Replicated<F>>, BitConversionTriple<C::Share>>,
{
convert_some_bits(ctx, binary_shares, RecordId::FIRST, bit_range)
}

pub(crate) fn convert_some_bits<F, V, C, S, VS, R>(
pub(crate) fn convert_some_bits<'a, F, V, C, S, VS, R>(
ctx: C,
binary_shares: VS,
first_record: RecordId,
bit_range: Range<u32>,
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>>
) -> impl Stream<Item = Result<(BitDecomposed<S>, R), Error>> + 'a
where
R: Send + 'static,
F: PrimeField,
V: ToBitConversionTriples<Residual = R>,
C: UpgradedContext<F, Share = S>,
V: ToBitConversionTriples<Residual = R> + 'a,
C: UpgradedContext<F, Share = S> + 'a,
S: LinearSecretSharing<F> + SecureMul<C>,
VS: Stream<Item = V> + Unpin + Send,
VS: Stream<Item = V> + Unpin + Send + 'a,
for<'u> UpgradeContext<'u, C, F, RecordId>:
UpgradeToMalicious<'u, BitConversionTriple<Replicated<F>>, BitConversionTriple<C::Share>>,
{
Expand Down
4 changes: 3 additions & 1 deletion ipa-core/src/protocol/sort/generate_permutation_opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ mod tests {
}

/// Passing 32 records for Fp31 doesn't work.
#[tokio::test]
///
/// Requires one extra thread to cancel futures running in parallel with the one that panics.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[should_panic = "prime field ipa_core::ff::prime_field::fp31::Fp31 is too small to sort 32 records"]
async fn fp31_overflow() {
const COUNT: usize = 32;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<V: SharedValue + ExtendableField> LinearSecretSharing<V> for AdditiveShare<
/// when the protocol is done. This should not be used directly.
#[async_trait]
pub trait Downgrade: Send {
type Target: Send;
type Target: Send + 'static;
async fn downgrade(self) -> UnauthorizedDowngradeWrapper<Self::Target>;
}

Expand Down
3 changes: 2 additions & 1 deletion ipa-core/src/secret_sharing/scheme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::SharedValue;
use crate::ff::{AddSub, AddSubAssign, Field, GaloisField};

/// Secret sharing scheme i.e. Replicated secret sharing
pub trait SecretSharing<V: SharedValue>: Clone + Debug + Sized + Send + Sync {
pub trait SecretSharing<V: SharedValue>: Clone + Debug + Sized + Send + Sync + 'static {
const ZERO: Self;
}

Expand All @@ -21,6 +21,7 @@ pub trait Linear<V: Field>:
+ Mul<V, Output = Self>
+ for<'r> Mul<&'r V, Output = Self>
+ Neg<Output = Self>
+ 'static
{
}

Expand Down
Loading
Loading