From 31ff50a79ee1ab9eabf83c7f133edf39710a10e8 Mon Sep 17 00:00:00 2001
From: Alex Koshelev
Date: Thu, 19 Dec 2024 00:24:41 -0800
Subject: [PATCH] e2e integration test for upload

---
 ipa-core/src/bin/report_collector.rs |   6 +-
 ipa-core/tests/hybrid.rs             | 234 ++++++++++++++++++++++++++-
 2 files changed, 235 insertions(+), 5 deletions(-)

diff --git a/ipa-core/src/bin/report_collector.rs b/ipa-core/src/bin/report_collector.rs
index 773bad220..fcab4e158 100644
--- a/ipa-core/src/bin/report_collector.rs
+++ b/ipa-core/src/bin/report_collector.rs
@@ -146,7 +146,7 @@ enum ReportCollectorCommand {
     },
     MaliciousHybrid {
         #[clap(flatten)]
-        encrypted_inputs: EncryptedInputs,
+        encrypted_inputs: Option<EncryptedInputs>,
 
         #[arg(
             long,
@@ -289,12 +289,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
                 |query_id| {
                     if let Some(ref url_file_list) = url_file_list {
                         inputs_from_url_file(&url_file_list, query_id, args.shard_count)
-                    } else {
+                    } else if let Some(ref encrypted_inputs) = encrypted_inputs {
                         Ok(inputs_from_encrypted_inputs(
                             encrypted_inputs,
                             query_id,
                             args.shard_count,
                         ))
+                    } else {
+                        panic!("Either --url-file-list or --enc-input-file1, --enc-input-file2, and --enc-input-file3 must be provided");
                     }
                 },
                 // encrypted_inputs,
diff --git a/ipa-core/tests/hybrid.rs b/ipa-core/tests/hybrid.rs
index b40f524f0..fc48690ac 100644
--- a/ipa-core/tests/hybrid.rs
+++ b/ipa-core/tests/hybrid.rs
@@ -4,22 +4,34 @@ mod common;
 
 use std::{
     fs::File,
+    io::{BufReader, Read, Write},
+    iter::once,
+    net::TcpListener,
+    os::fd::AsRawFd,
+    path::{Path, PathBuf},
     process::{Command, Stdio},
 };
 
+use bytes::Bytes;
+use command_fds::CommandFdExt;
 use common::{
     spawn_shards, tempdir::TempDir, test_sharded_setup, CommandExt, TerminateOnDropExt,
     UnwrapStatusExt, CRYPTO_UTIL_BIN, TEST_RC_BIN,
 };
-use ipa_core::{cli::playbook::HybridQueryResult, helpers::query::HybridQueryParams};
+use futures_util::{StreamExt, TryStreamExt};
+use ipa_core::{
+    cli::playbook::HybridQueryResult,
+    error::BoxError,
+    helpers::{query::HybridQueryParams, LengthDelimitedStream},
+};
 use rand::thread_rng;
 use rand_core::RngCore;
 use serde_json::from_reader;
 
+use crate::common::TEST_MPC_BIN;
+
 pub const IN_THE_CLEAR_BIN: &str = env!("CARGO_BIN_EXE_in_the_clear");
 
-// this currently only generates data and runs in the clear
-// eventaully we'll want to add the MPC as well
 #[test]
 fn test_hybrid() {
     const INPUT_SIZE: usize = 100;
@@ -134,3 +146,219 @@ fn test_hybrid() {
         .zip(expected_result.iter())
         .all(|(a, b)| a == b));
 }
+
+#[test]
+fn test_hybrid_poll() {
+    const INPUT_SIZE: usize = 100;
+    const SHARDS: usize = 5;
+    const MAX_CONVERSION_VALUE: usize = 5;
+
+    let config = HybridQueryParams {
+        max_breakdown_key: 5,
+        with_dp: 0,
+        epsilon: 0.0,
+        // only encrypted inputs are supported
+        plaintext_match_keys: false,
+    };
+
+    let dir = TempDir::new_delete_on_drop();
+
+    // Gen inputs
+    let input_file = dir.path().join("ipa_inputs.txt");
+    let in_the_clear_output_file = dir.path().join("ipa_output_in_the_clear.json");
+    let output_file = dir.path().join("ipa_output.json");
+
+    let mut command = Command::new(TEST_RC_BIN);
+    command
+        .args(["--output-file".as_ref(), input_file.as_os_str()])
+        .arg("gen-hybrid-inputs")
+        .args(["--count", &INPUT_SIZE.to_string()])
+        .args(["--max-conversion-value", &MAX_CONVERSION_VALUE.to_string()])
+        .args(["--max-breakdown-key", &config.max_breakdown_key.to_string()])
+        .args(["--seed", &thread_rng().next_u64().to_string()])
+        .silent()
+        .stdin(Stdio::piped());
+    command.status().unwrap_status();
+
+    let mut command = Command::new(IN_THE_CLEAR_BIN);
+    command
+        .args(["--input-file".as_ref(), input_file.as_os_str()])
+        .args([
+            "--output-file".as_ref(),
+            in_the_clear_output_file.as_os_str(),
+        ])
+        .silent()
+        .stdin(Stdio::piped());
+    command.status().unwrap_status();
+
+    let config_path = dir.path().join("config");
+    let sockets = test_sharded_setup::<SHARDS>(&config_path);
+    let _helpers = spawn_shards(&config_path, &sockets, true);
+
+    // encrypt input
+    let mut command = Command::new(CRYPTO_UTIL_BIN);
+    command
+        .arg("hybrid-encrypt")
+        .args(["--input-file".as_ref(), input_file.as_os_str()])
+        .args(["--output-dir".as_ref(), dir.path().as_os_str()])
+        .args(["--length-delimited"])
+        .args(["--network".into(), config_path.join("network.toml")])
+        .stdin(Stdio::piped());
+    command.status().unwrap_status();
+    let enc1 = dir.path().join("helper1.enc");
+    let enc2 = dir.path().join("helper2.enc");
+    let enc3 = dir.path().join("helper3.enc");
+
+    let poll_port = TcpListener::bind("127.0.0.1:0").unwrap();
+
+    // split encryption into N shards and create a metadata file that contains
+    // all files
+    let upload_metadata = create_upload_files::<SHARDS>(
+        &enc1,
+        &enc2,
+        &enc3,
+        poll_port.local_addr().unwrap().port(),
+        dir.path(),
+    )
+    .unwrap();
+
+    // spawn HTTP server to serve the uploaded files
+    let mut command = Command::new(TEST_MPC_BIN);
+    command
+        .arg("serve-input")
+        .preserved_fds(vec![poll_port.as_raw_fd()])
+        .args(["--fd", &poll_port.as_raw_fd().to_string()])
+        .args([
+            "--dir".as_ref(),
+            upload_metadata.parent().unwrap().as_os_str(),
+        ])
+        .silent();
+
+    let _server_handle = command.spawn().unwrap().terminate_on_drop();
+
+    // Run Hybrid
+    let mut command = Command::new(TEST_RC_BIN);
+    command
+        .args(["--network".into(), config_path.join("network.toml")])
+        .args(["--output-file".as_ref(), output_file.as_os_str()])
+        .args(["--shard-count", SHARDS.to_string().as_str()])
+        .args(["--wait", "2"])
+        .arg("malicious-hybrid")
+        .silent()
+        .args(["--count", INPUT_SIZE.to_string().as_str()])
+        .args(["--url-file-list".into(), upload_metadata])
+        .args(["--max-breakdown-key", &config.max_breakdown_key.to_string()]);
+
+    match config.with_dp {
+        0 => {
+            command.args(["--with-dp", &config.with_dp.to_string()]);
+        }
+        _ => {
+            command
+                .args(["--with-dp", &config.with_dp.to_string()])
+                .args(["--epsilon", &config.epsilon.to_string()]);
+        }
+    }
+    command.stdin(Stdio::piped());
+
+    let test_mpc = command.spawn().unwrap().terminate_on_drop();
+    test_mpc.wait().unwrap_status();
+
+    // basic output checks - output should have the exact size as number of breakdowns
+    let output = serde_json::from_str::<HybridQueryResult>(
+        &std::fs::read_to_string(&output_file).expect("IPA results file should exist"),
+    )
+    .expect("IPA results file is valid JSON");
+
+    assert_eq!(
+        usize::try_from(config.max_breakdown_key).unwrap(),
+        output.breakdowns.len(),
+        "Number of breakdowns does not match the expected",
+    );
+    assert_eq!(INPUT_SIZE, usize::from(output.input_size));
+
+    let expected_result: Vec<u32> = from_reader(
+        File::open(in_the_clear_output_file)
+            .expect("file should exist as it's created above in the test"),
+    )
+    .expect("should match hard coded format from in_the_clear");
+    assert!(output
+        .breakdowns
+        .iter()
+        .zip(expected_result.iter())
+        .all(|(a, b)| a == b));
+}
+
+/// Splits each of the three encrypted input files into `SHARDS` shard files inside `dest` and
+/// writes `manifest.txt` there, listing one `http://localhost:{port}/<file>` URL per shard file.
+/// Returns the path to the manifest.
+fn create_upload_files<const SHARDS: usize>(
+    enc_file1: &Path,
+    enc_file2: &Path,
+    enc_file3: &Path,
+    port: u16,
+    dest: &Path,
+) -> Result<PathBuf, BoxError> {
+    let manifest_path = dest.join("manifest.txt");
+    let mut manifest_file = File::create_new(&manifest_path)?;
+    create_upload_file::<SHARDS>("h1", enc_file1, port, dest, &mut manifest_file)?;
+    create_upload_file::<SHARDS>("h2", enc_file2, port, dest, &mut manifest_file)?;
+    create_upload_file::<SHARDS>("h3", enc_file3, port, dest, &mut manifest_file)?;
+
+    manifest_file.flush()?;
+
+    Ok(manifest_path)
+}
+
+/// Spreads the length-delimited records of `enc_file` round-robin over `SHARDS` new files named
+/// `{prefix}_shard_{i}.enc` in `dest_dir`, each record prefixed with its length as a
+/// little-endian `u16`, then appends one URL per shard file to `metadata_file`.
+fn create_upload_file<const SHARDS: usize>(
+    prefix: &str,
+    enc_file: &Path,
+    port: u16,
+    dest_dir: &Path,
+    metadata_file: &mut File,
+) -> Result<(), BoxError> {
+    let mut files = (0..SHARDS)
+        .map(|i| {
+            let path = dest_dir.join(format!("{prefix}_shard_{i}.enc"));
+            let file = File::create_new(&path)?;
+            Ok((path, file))
+        })
+        .collect::<std::io::Result<Vec<_>>>()?;
+
+    // we assume files are tiny for the integration tests
+    let mut input = BufReader::new(File::open(enc_file)?);
+    let mut buf = Vec::new();
+    if input.read_to_end(&mut buf)? == 0 {
+        panic!("{:?} file is empty", enc_file);
+    }
+
+    // read length delimited data and write it to each file
+    let stream =
+        LengthDelimitedStream::<Bytes, _>::new(futures::stream::iter(once(Ok::<_, BoxError>(
+            buf.into(),
+        ))))
+        .map_ok(|v| futures::stream::iter(v).map(Ok::<_, BoxError>))
+        .try_flatten();
+
+    for (i, next_bytes) in futures::executor::block_on_stream(stream).enumerate() {
+        let next_bytes = next_bytes?;
+        let file = &mut files[i % SHARDS].1;
+        let len = u16::try_from(next_bytes.len())
+            .map_err(|_| format!("record is too big: {} > 65535", next_bytes.len()))?;
+        file.write_all(&len.to_le_bytes())?;
+        file.write_all(&next_bytes)?;
+    }
+
+    // update manifest file
+    for (path, mut file) in files {
+        // URLs reference shard files by bare file name, without any directory component
+        file.flush()?;
+        let path = path.file_name().and_then(|p| p.to_str()).unwrap();
+        writeln!(metadata_file, "http://localhost:{port}/{path}")?;
+    }
+
+    Ok(())
+}