Skip to content

Commit

Permalink
Parallelize decryption of reports
Browse files Browse the repository at this point in the history
  • Loading branch information
andyleiserson committed Dec 20, 2024
1 parent fc61e53 commit 6a65171
Showing 1 changed file with 16 additions and 17 deletions.
33 changes: 16 additions & 17 deletions ipa-core/src/query/runner/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
sync::Arc,
};

use futures::{stream::iter, StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use generic_array::ArrayLength;

use super::QueryResult;
Expand All @@ -20,7 +20,9 @@ use crate::{
},
helpers::{
query::{DpMechanism, HybridQueryParams, QueryConfig, QuerySize},
setup_cross_shard_prss, BodyStream, Gateway, LengthDelimitedStream,
setup_cross_shard_prss,
stream::TryFlattenItersExt,
BodyStream, Gateway, LengthDelimitedStream,
},
hpke::PrivateKeyRegistry,
protocol::{
Expand Down Expand Up @@ -105,7 +107,7 @@ where
config,
key_registry,
phantom_data: _,
} = self;
} = &self;

tracing::info!("New hybrid query: {config:?}");
let ctx = ctx.narrow(&Hybrid);
Expand All @@ -118,21 +120,18 @@ where
}

let stream = LengthDelimitedStream::<EncryptedHybridReport<BA8, BA3>, _>::new(input_stream)
.map_err(Into::<Error>::into)
.map_ok(|enc_reports| {
iter(enc_reports.into_iter().map({
|enc_report| {
let dec_report = enc_report
.decrypt(key_registry.as_ref())
.map_err(Into::<Error>::into);
let unique_tag = UniqueTag::from_unique_bytes(&enc_report);
dec_report.map(|dec_report1| (dec_report1, unique_tag))
}
}))
.map_err(Into::into)
.try_flatten_iters()
.map(|enc_report_res| async move {
enc_report_res.and_then(|enc_report| {
let dec_report = enc_report
.decrypt(key_registry.as_ref())
.map_err(Into::<Error>::into);
let unique_tag = UniqueTag::from_unique_bytes(&enc_report);
dec_report.map(|dec_report1| (dec_report1, unique_tag))
})
})
.try_flatten()
.take(sz)
.map(|v| async move { v });
.take(sz);

let (decrypted_reports, resharded_tags) = reshard_aad(
ctx.narrow(&HybridStep::ReshardByTag),
Expand Down

0 comments on commit 6a65171

Please sign in to comment.