diff --git a/src/bin/report_collector.rs b/src/bin/report_collector.rs index a8ca7ba82..f8ba75d7e 100644 --- a/src/bin/report_collector.rs +++ b/src/bin/report_collector.rs @@ -15,7 +15,7 @@ use hyper::http::uri::Scheme; use ipa::{ cli::{ noise::{apply, ApplyDpArgs}, - playbook::{make_clients, playbook_ipa, validate, InputSource}, + playbook::{make_clients, playbook_ipa, playbook_oprf_ipa, validate, InputSource}, CsvSerializer, IpaQueryResult, Verbosity, }, config::NetworkConfig, @@ -26,7 +26,7 @@ use ipa::{ protocol::{BreakdownKey, MatchKey}, report::{KeyIdentifier, DEFAULT_KEY_ID}, test_fixture::{ - ipa::{ipa_in_the_clear, CappingOrder, IpaSecurityModel, TestRawDataRecord}, + ipa::{ipa_in_the_clear, CappingOrder, IpaQueryStyle, IpaSecurityModel, TestRawDataRecord}, EventGenerator, EventGeneratorConfig, }, }; @@ -103,6 +103,8 @@ enum ReportCollectorCommand { }, /// Apply differential privacy noise to IPA inputs ApplyDpNoise(ApplyDpArgs), + /// Execute OPRF IPA in a semi-honest majority setting + OprfIpa(IpaQueryConfig), } #[derive(Debug, clap::Args)] @@ -134,6 +136,7 @@ async fn main() -> Result<(), Box> { IpaSecurityModel::SemiHonest, config, &clients, + IpaQueryStyle::SortInMpc, ) .await? } @@ -144,6 +147,7 @@ async fn main() -> Result<(), Box> { IpaSecurityModel::Malicious, config, &clients, + IpaQueryStyle::SortInMpc, ) .await? } @@ -153,6 +157,17 @@ async fn main() -> Result<(), Box> { gen_args, } => gen_inputs(count, seed, args.output_file, gen_args)?, ReportCollectorCommand::ApplyDpNoise(ref dp_args) => apply_dp_noise(&args, dp_args)?, + ReportCollectorCommand::OprfIpa(config) => { + ipa( + &args, + &network, + IpaSecurityModel::SemiHonest, + config, + &clients, + IpaQueryStyle::Oprf, + ) + .await? + } }; Ok(()) @@ -221,16 +236,23 @@ async fn ipa( security_model: IpaSecurityModel, ipa_query_config: IpaQueryConfig, helper_clients: &[MpcHelperClient; 3], + query_style: IpaQueryStyle, ) -> Result<(), Box> { let input = InputSource::from(&args.input); let query_type: QueryType; - match security_model { - IpaSecurityModel::SemiHonest => { + match (security_model, &query_style) { + (IpaSecurityModel::SemiHonest, IpaQueryStyle::SortInMpc) => { query_type = QueryType::SemiHonestIpa(ipa_query_config.clone()); } - IpaSecurityModel::Malicious => { + (IpaSecurityModel::Malicious, IpaQueryStyle::SortInMpc) => { query_type = QueryType::MaliciousIpa(ipa_query_config.clone()) } + (IpaSecurityModel::SemiHonest, IpaQueryStyle::Oprf) => { + query_type = QueryType::OprfIpa(ipa_query_config.clone()); + } + (IpaSecurityModel::Malicious, IpaQueryStyle::Oprf) => { + panic!("OPRF for malicious is not implemented as yet") + } }; let input_rows = input.iter::().collect::>(); @@ -247,7 +269,10 @@ async fn ipa( ipa_query_config.per_user_credit_cap, ipa_query_config.attribution_window_seconds, ipa_query_config.max_breakdown_key, - &CappingOrder::CapOldestFirst, + &(match query_style { + IpaQueryStyle::Oprf => CappingOrder::CapMostRecentFirst, + IpaQueryStyle::SortInMpc => CappingOrder::CapOldestFirst, + }), ); // pad the output vector to the max breakdown key, to make sure it is aligned with the MPC results @@ -260,18 +285,27 @@ async fn ipa( }; let mut key_registries = KeyRegistries::default(); - let actual = playbook_ipa::( - &input_rows, - &helper_clients, - query_id, - ipa_query_config, - key_registries.init_from(network), - ) - .await; - - tracing::info!("{m:?}", m = ipa_query_config); - - validate(&expected, &actual.breakdowns); + let actual = match query_style { + IpaQueryStyle::Oprf => { + playbook_oprf_ipa::( + input_rows, + &helper_clients, + query_id, + ipa_query_config, + ) + .await + } + IpaQueryStyle::SortInMpc => { + playbook_ipa::( + &input_rows, + &helper_clients, + query_id, + ipa_query_config, + key_registries.init_from(network), + ) + .await + } + }; if let Some(ref path) = args.output_file { // it will be sad to lose the results if file already exists. @@ -308,6 +342,10 @@ async fn ipa( write!(file, "{}", serde_json::to_string_pretty(&actual)?)?; } + tracing::info!("{m:?}", m = ipa_query_config); + + validate(&expected, &actual.breakdowns); + Ok(()) } diff --git a/src/cli/playbook/ipa.rs b/src/cli/playbook/ipa.rs index 9047baeb3..6c258f858 100644 --- a/src/cli/playbook/ipa.rs +++ b/src/cli/playbook/ipa.rs @@ -22,9 +22,9 @@ use crate::{ hpke::PublicKeyRegistry, ipa_test_input, net::MpcHelperClient, - protocol::{ipa::IPAInputRow, BreakdownKey, MatchKey, QueryId}, + protocol::{ipa::IPAInputRow, BreakdownKey, MatchKey, QueryId, Timestamp, TriggerValue}, query::QueryStatus, - report::{KeyIdentifier, Report}, + report::{KeyIdentifier, OprfReport, Report}, secret_sharing::{replicated::semi_honest::AdditiveShare, IntoShares}, test_fixture::{input::GenericReportTestInput, ipa::TestRawDataRecord, Reconstruct}, }; @@ -99,6 +99,57 @@ where let inputs = buffers.map(BodyStream::from); tracing::info!("Starting query after finishing encryption"); + + run_query_and_validate::(inputs, query_size, clients, query_id, query_config).await +} + +pub async fn playbook_oprf_ipa( + mut records: Vec, + clients: &[MpcHelperClient; 3], + query_id: QueryId, + query_config: IpaQueryConfig, +) -> IpaQueryResult +where + F: PrimeField, + AdditiveShare: Serializable, +{ + let mut buffers: [_; 3] = std::array::from_fn(|_| Vec::new()); + let query_size = records.len(); + + let sz = as Serializable>::Size::USIZE; + for buffer in &mut buffers { + buffer.resize(query_size * sz, 0u8); + } + + //TODO(richaj) This manual sorting will be removed once we have the PRF sharding in place. + //This does a stable sort. It also expects the inputs to be sorted by timestamp + records.sort_by(|a, b| b.user_id.cmp(&a.user_id)); + + let shares: [Vec>; 3] = + records.iter().cloned().share(); + zip(&mut buffers, shares).for_each(|(buf, shares)| { + for (share, chunk) in zip(shares, buf.chunks_mut(sz)) { + share.serialize(GenericArray::from_mut_slice(chunk)); + } + }); + + let inputs = buffers.map(BodyStream::from); + tracing::info!("Starting query for OPRF"); + + run_query_and_validate::(inputs, query_size, clients, query_id, query_config).await +} + +pub async fn run_query_and_validate( + inputs: [BodyStream; 3], + query_size: usize, + clients: &[MpcHelperClient; 3], + query_id: QueryId, + query_config: IpaQueryConfig, +) -> IpaQueryResult +where + F: PrimeField, + AdditiveShare: Serializable, +{ let mpc_time = Instant::now(); try_join_all( inputs @@ -143,12 +194,20 @@ where .reconstruct(); let lat = mpc_time.elapsed(); + tracing::info!("Running IPA for {query_size:?} records took {t:?}", t = lat); let mut breakdowns = vec![0; usize::try_from(query_config.max_breakdown_key).unwrap()]; for (breakdown_key, trigger_value) in results.into_iter().enumerate() { // TODO: make the data type used consistent with `ipa_in_the_clear` // I think using u32 is wrong, we should move to u128 - breakdowns[breakdown_key] += u32::try_from(trigger_value.as_u128()).unwrap(); + assert!( + breakdown_key < query_config.max_breakdown_key.try_into().unwrap() + || trigger_value == F::ZERO, + "trigger values were attributed to buckets more than max breakdown key" + ); + if breakdown_key < query_config.max_breakdown_key.try_into().unwrap() { + breakdowns[breakdown_key] += u32::try_from(trigger_value.as_u128()).unwrap(); + } } IpaQueryResult { diff --git a/src/cli/playbook/mod.rs b/src/cli/playbook/mod.rs index cdc47be00..4fe0ac6ca 100644 --- a/src/cli/playbook/mod.rs +++ b/src/cli/playbook/mod.rs @@ -11,7 +11,7 @@ pub use input::InputSource; pub use multiply::secure_mul; use tokio::time::sleep; -pub use self::ipa::playbook_ipa; +pub use self::ipa::{playbook_ipa, playbook_oprf_ipa}; use crate::{ config::{ClientConfig, NetworkConfig, PeerConfig}, net::{ClientIdentity, MpcHelperClient}, diff --git a/src/helpers/transport/query.rs b/src/helpers/transport/query.rs index ab4e0761b..6bb7a9ed9 100644 --- a/src/helpers/transport/query.rs +++ b/src/helpers/transport/query.rs @@ -206,6 +206,7 @@ pub enum QueryType { MaliciousIpa(IpaQueryConfig), SemiHonestSparseAggregate(SparseAggregateQueryConfig), MaliciousSparseAggregate(SparseAggregateQueryConfig), + OprfIpa(IpaQueryConfig), } impl QueryType { @@ -214,6 +215,7 @@ impl QueryType { pub const MALICIOUS_IPA_STR: &'static str = "malicious-ipa"; pub const SEMIHONEST_AGGREGATE_STR: &'static str = "semihonest-sparse-aggregate"; pub const MALICIOUS_AGGREGATE_STR: &'static str = "malicious-sparse-aggregate"; + pub const OPRF_IPA_STR: &'static str = "oprf_ipa"; } /// TODO: should this `AsRef` impl (used for `Substep`) take into account config of IPA? @@ -226,6 +228,7 @@ impl AsRef for QueryType { QueryType::MaliciousIpa(_) => Self::MALICIOUS_IPA_STR, QueryType::SemiHonestSparseAggregate(_) => Self::SEMIHONEST_AGGREGATE_STR, QueryType::MaliciousSparseAggregate(_) => Self::MALICIOUS_AGGREGATE_STR, + QueryType::OprfIpa(_) => Self::OPRF_IPA_STR, } } } diff --git a/src/net/http_serde.rs b/src/net/http_serde.rs index faa2ec13d..406ec9a48 100644 --- a/src/net/http_serde.rs +++ b/src/net/http_serde.rs @@ -139,6 +139,10 @@ pub mod query { let Query(q) = req.extract().await?; Ok(QueryType::MaliciousSparseAggregate(q)) } + QueryType::OPRF_IPA_STR => { + let Query(q) = req.extract().await?; + Ok(QueryType::OprfIpa(q)) + } other => Err(Error::bad_query_value("query_type", other)), }?; Ok(QueryConfigQueryParams(QueryConfig { @@ -161,7 +165,9 @@ pub mod query { match self.query_type { #[cfg(any(test, feature = "test-fixture", feature = "cli"))] QueryType::TestMultiply => Ok(()), - QueryType::SemiHonestIpa(config) | QueryType::MaliciousIpa(config) => { + QueryType::SemiHonestIpa(config) + | QueryType::MaliciousIpa(config) + | QueryType::OprfIpa(config) => { write!( f, "&per_user_credit_cap={}&max_breakdown_key={}&num_multi_bits={}", diff --git a/src/query/executor.rs b/src/query/executor.rs index d99e6df4f..4da5f7a6c 100644 --- a/src/query/executor.rs +++ b/src/query/executor.rs @@ -14,6 +14,7 @@ use rand_core::SeedableRng; use shuttle::future as tokio; use typenum::Unsigned; +use super::runner::OprfIpaQuery; #[cfg(any(test, feature = "cli", feature = "test-fixture"))] use crate::query::runner::execute_test_multiply; use crate::{ @@ -202,6 +203,33 @@ pub fn execute( }, ) } + (QueryType::OprfIpa(ipa_config), FieldType::Fp32BitPrime) => do_query( + config, + gateway, + input, + move |prss, gateway, config, input| { + let ctx = SemiHonestContext::new(prss, gateway); + Box::pin( + OprfIpaQuery::<_, Fp32BitPrime>::new(ipa_config) + .execute(ctx, config.size, input) + .then(|res| ready(res.map(|out| Box::new(out) as Box))), + ) + }, + ), + #[cfg(any(test, feature = "weak-field"))] + (QueryType::OprfIpa(ipa_config), FieldType::Fp31) => do_query( + config, + gateway, + input, + move |prss, gateway, config, input| { + let ctx = SemiHonestContext::new(prss, gateway); + Box::pin( + OprfIpaQuery::<_, Fp32BitPrime>::new(ipa_config) + .execute(ctx, config.size, input) + .then(|res| ready(res.map(|out| Box::new(out) as Box))), + ) + }, + ), } } diff --git a/src/query/runner/aggregate.rs b/src/query/runner/aggregate.rs index e0dee3550..23cd6e7ba 100644 --- a/src/query/runner/aggregate.rs +++ b/src/query/runner/aggregate.rs @@ -2,7 +2,6 @@ use std::marker::PhantomData; use futures_util::TryStreamExt; -use super::ipa::assert_stream_send; use crate::{ error::Error, ff::{Gf2, Gf8Bit, PrimeField, Serializable}, @@ -83,10 +82,9 @@ where let input = { //TODO: Replace `Gf8Bit` with an appropriate type specified by the config `contribution_bits` - let mut v = assert_stream_send(RecordsStream::< - SparseAggregateInputRow, - _, - >::new(input_stream)) + let mut v = RecordsStream::, _>::new( + input_stream, + ) .try_concat() .await?; v.truncate(sz); diff --git a/src/query/runner/ipa.rs b/src/query/runner/ipa.rs index 2b19ead23..ed4a3da43 100644 --- a/src/query/runner/ipa.rs +++ b/src/query/runner/ipa.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use futures::{ stream::{iter, repeat}, - Stream, StreamExt, TryStreamExt, + StreamExt, TryStreamExt, }; use crate::{ @@ -89,19 +89,16 @@ where let sz = usize::from(query_size); let input = if config.plaintext_match_keys { - let mut v = assert_stream_send(RecordsStream::< - IPAInputRow, - _, - >::new(input_stream)) - .try_concat() - .await?; + let mut v = + RecordsStream::, _>::new(input_stream) + .try_concat() + .await?; v.truncate(sz); v } else { - assert_stream_send(LengthDelimitedStream::< - EncryptedReport, - _, - >::new(input_stream)) + LengthDelimitedStream::, _>::new( + input_stream, + ) .map_err(Into::::into) .map_ok(|enc_reports| { iter(enc_reports.into_iter().map(|enc_report| { @@ -147,16 +144,6 @@ where } } -/// Helps to convince the compiler that things are `Send`. Like `seq_join::assert_send`, but for -/// streams. -/// -/// -pub fn assert_stream_send<'a, T>( - st: impl Stream + Send + 'a, -) -> impl Stream + Send + 'a { - st -} - /// no dependency on `weak-field` feature because it is enabled in tests by default #[cfg(all(test, unit_test))] mod tests { diff --git a/src/query/runner/mod.rs b/src/query/runner/mod.rs index bcba34275..d9eb28f8f 100644 --- a/src/query/runner/mod.rs +++ b/src/query/runner/mod.rs @@ -1,12 +1,14 @@ mod aggregate; mod ipa; +mod oprf_ipa; + #[cfg(any(test, feature = "cli", feature = "test-fixture"))] mod test_multiply; #[cfg(any(test, feature = "cli", feature = "test-fixture"))] pub(super) use test_multiply::execute_test_multiply; -pub(super) use self::{aggregate::SparseAggregateQuery, ipa::IpaQuery}; +pub(super) use self::{aggregate::SparseAggregateQuery, ipa::IpaQuery, oprf_ipa::OprfIpaQuery}; use crate::{error::Error, query::ProtocolResult}; pub(super) type QueryResult = Result, Error>; diff --git a/src/query/runner/oprf_ipa.rs b/src/query/runner/oprf_ipa.rs new file mode 100644 index 000000000..e77a0300e --- /dev/null +++ b/src/query/runner/oprf_ipa.rs @@ -0,0 +1,116 @@ +use std::marker::PhantomData; + +use futures::TryStreamExt; + +use crate::{ + error::Error, + ff::{Field, Gf2, PrimeField, Serializable}, + helpers::{ + query::{IpaQueryConfig, QuerySize}, + BodyStream, RecordsStream, + }, + protocol::{ + basics::ShareKnownValue, + context::{UpgradableContext, UpgradedContext}, + prf_sharding::{attribution_and_capping_and_aggregation, PrfShardedIpaInputRow}, + BreakdownKey, Timestamp, TriggerValue, + }, + report::{EventType, OprfReport}, + secret_sharing::{ + replicated::{malicious::ExtendableField, semi_honest::AdditiveShare as Replicated}, + SharedValue, + }, +}; + +pub struct OprfIpaQuery { + config: IpaQueryConfig, + phantom_data: PhantomData<(C, F)>, +} + +impl OprfIpaQuery { + pub fn new(config: IpaQueryConfig) -> Self { + Self { + config, + phantom_data: PhantomData, + } + } +} + +impl OprfIpaQuery +where + C: UpgradableContext, + C::UpgradedContext: UpgradedContext>, + C::UpgradedContext: UpgradedContext>, + F: PrimeField + ExtendableField, + Replicated: Serializable + ShareKnownValue, + Replicated: Serializable + ShareKnownValue, +{ + #[tracing::instrument("oprf_ipa_query", skip_all, fields(sz=%query_size))] + pub async fn execute<'a>( + self, + ctx: C, + query_size: QuerySize, + input_stream: BodyStream, + ) -> Result>, Error> { + let Self { + config, + phantom_data: _, + } = self; + tracing::info!("New query: {config:?}"); + let sz = usize::from(query_size); + + let input = if config.plaintext_match_keys { + let mut v = RecordsStream::, _>::new( + input_stream, + ) + .try_concat() + .await?; + v.truncate(sz); + v + } else { + panic!("Encrypted match key handling is not handled for OPRF flow as yet"); + }; + + // TODO: Compute OPRFs and shuffle and add dummies and stuff (Daniel's code will be called here) + let sharded_input = input + .into_iter() + .map(|single_row| { + let is_trigger_bit_share = if single_row.event_type == EventType::Trigger { + Replicated::share_known_value(&ctx, Gf2::ONE) + } else { + Replicated::share_known_value(&ctx, Gf2::ZERO) + }; + PrfShardedIpaInputRow { + prf_of_match_key: single_row.mk_oprf, + is_trigger_bit: is_trigger_bit_share, + breakdown_key: single_row.breakdown_key, + trigger_value: single_row.trigger_value, + timestamp: single_row.timestamp, + } + }) + .collect::>(); + // Until then, we convert the output to something next function is happy about. + + let user_cap: i32 = config.per_user_credit_cap.try_into().unwrap(); + assert!( + user_cap & (user_cap - 1) == 0, + "This code only works for a user cap which is a power of 2" + ); + + attribution_and_capping_and_aggregation::< + C, + BreakdownKey, + TriggerValue, + Timestamp, + F, + _, + Replicated, + >( + ctx, + sharded_input, + user_cap.ilog2().try_into().unwrap(), + config.attribution_window_seconds, + ) + .await + } +} diff --git a/src/report.rs b/src/report.rs index 2b9cf1039..5c8ada0ae 100644 --- a/src/report.rs +++ b/src/report.rs @@ -1,14 +1,15 @@ use std::{ fmt::{Display, Formatter}, marker::PhantomData, - ops::Deref, + mem::size_of, + ops::{Add, Deref}, }; use bytes::{BufMut, Bytes}; -use generic_array::GenericArray; +use generic_array::{ArrayLength, GenericArray}; use hpke::Serializable as _; use rand_core::{CryptoRng, RngCore}; -use typenum::Unsigned; +use typenum::{Unsigned, U1, U8, U9}; use crate::{ ff::{GaloisField, Gf40Bit, Gf8Bit, PrimeField, Serializable}, @@ -47,6 +48,29 @@ pub enum EventType { Source, } +impl Serializable for EventType { + type Size = U1; + + fn serialize(&self, buf: &mut GenericArray) { + let raw: &[u8] = match self { + EventType::Trigger => &[0], + EventType::Source => &[1], + }; + buf.copy_from_slice(raw); + } + + fn deserialize(buf: &GenericArray) -> Self { + let mut buf_to = [0u8; 1]; + buf_to[..buf.len()].copy_from_slice(buf); + + match buf[0] { + 0 => EventType::Trigger, + 1 => EventType::Source, + 2_u8..=u8::MAX => panic!("Unreachable code"), + } + } +} + #[derive(Debug, PartialEq, Eq)] pub struct ParseEventTypeError(u8); @@ -400,6 +424,106 @@ where pub trigger_value: Replicated, } +impl Serializable for u64 { + type Size = U8; + + fn serialize(&self, buf: &mut GenericArray) { + let raw = &self.to_le_bytes()[..buf.len()]; + buf.copy_from_slice(raw); + } + + fn deserialize(buf: &GenericArray) -> Self { + let mut buf_to = [0u8; 8]; + buf_to[..buf.len()].copy_from_slice(buf); + u64::from_le_bytes(buf_to) + } +} + +impl Serializable for OprfReport +where + Replicated: Serializable, + Replicated: Serializable, + Replicated: Serializable, + as Serializable>::Size: Add, + as Serializable>::Size: + Add<< as Serializable>::Size as Add>::Output>, + as Serializable>::Size: Add< + < as Serializable>::Size as Add< + < as Serializable>::Size as Add>::Output, + >>::Output, + >, + < as Serializable>::Size as Add< + < as Serializable>::Size as Add< + < as Serializable>::Size as Add>::Output, + >>::Output, + >>::Output: ArrayLength, +{ + type Size = < as Serializable>::Size as Add< + < as Serializable>::Size as Add< + < as Serializable>::Size as Add>::Output, + >>::Output, + >>::Output; + + fn serialize(&self, buf: &mut GenericArray) { + let sizeof_u64 = size_of::(); + let sizeof_eventtype = size_of::(); + let ts_sz = as Serializable>::Size::USIZE; + let bk_sz = as Serializable>::Size::USIZE; + let tv_sz = as Serializable>::Size::USIZE; + + self.mk_oprf + .serialize(GenericArray::from_mut_slice(&mut buf[..sizeof_u64])); + + self.timestamp.serialize(GenericArray::from_mut_slice( + &mut buf[sizeof_u64..sizeof_u64 + ts_sz], + )); + + self.breakdown_key.serialize(GenericArray::from_mut_slice( + &mut buf[sizeof_u64 + ts_sz..sizeof_u64 + ts_sz + bk_sz], + )); + + self.trigger_value.serialize(GenericArray::from_mut_slice( + &mut buf[sizeof_u64 + ts_sz + bk_sz..sizeof_u64 + ts_sz + bk_sz + tv_sz], + )); + + self.event_type.serialize(GenericArray::from_mut_slice( + &mut buf[sizeof_u64 + ts_sz + bk_sz + tv_sz + ..sizeof_u64 + ts_sz + bk_sz + tv_sz + sizeof_eventtype], + )); + } + + fn deserialize(buf: &GenericArray) -> Self { + let sizeof_u64 = size_of::(); + let sizeof_eventtype = size_of::(); + + let ts_sz = as Serializable>::Size::USIZE; + let bk_sz = as Serializable>::Size::USIZE; + let tv_sz = as Serializable>::Size::USIZE; + + let mk_oprf = u64::deserialize(GenericArray::from_slice(&buf[..sizeof_u64])); + let timestamp = Replicated::::deserialize(GenericArray::from_slice( + &buf[sizeof_u64..sizeof_u64 + ts_sz], + )); + let breakdown_key = Replicated::::deserialize(GenericArray::from_slice( + &buf[sizeof_u64 + ts_sz..sizeof_u64 + ts_sz + bk_sz], + )); + let trigger_value = Replicated::::deserialize(GenericArray::from_slice( + &buf[sizeof_u64 + ts_sz + bk_sz..sizeof_u64 + ts_sz + bk_sz + tv_sz], + )); + let event_type = EventType::deserialize(GenericArray::from_slice( + &buf[sizeof_u64 + ts_sz + bk_sz + tv_sz + ..sizeof_u64 + ts_sz + bk_sz + tv_sz + sizeof_eventtype], + )); + Self { + timestamp, + mk_oprf, + event_type, + breakdown_key, + trigger_value, + } + } +} + #[cfg(all(test, unit_test))] mod test { use rand::{distributions::Alphanumeric, rngs::StdRng, Rng}; diff --git a/src/test_fixture/ipa.rs b/src/test_fixture/ipa.rs index a33b92e5b..e6f899b79 100644 --- a/src/test_fixture/ipa.rs +++ b/src/test_fixture/ipa.rs @@ -24,6 +24,11 @@ pub enum IpaSecurityModel { Malicious, } +pub enum IpaQueryStyle { + SortInMpc, + Oprf, +} + #[derive(Debug, Clone)] pub struct TestRawDataRecord { pub timestamp: u64,