Skip to content

Commit

Permalink
Set up separate runtimes for HTTP and MPC queries (#1338)
Browse files Browse the repository at this point in the history
* Use separate runtimes for HTTP and MPC queries

* Start MPC server inside HTTP runtime

* Run HTTP client tasks in separate runtime

* Fix compilation errors

* Fix more compilation errors

* Fix more compilation errors
  • Loading branch information
akoshelev authored Oct 11, 2024
1 parent 9c28dde commit 726a549
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 69 deletions.
69 changes: 64 additions & 5 deletions ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ use ipa_core::{
},
config::{hpke_registry, HpkeServerConfig, NetworkConfig, ServerConfig, TlsConfig},
error::BoxError,
executor::IpaRuntime,
helpers::HelperIdentity,
net::{ClientIdentity, HttpShardTransport, HttpTransport, MpcHelperClient},
AppConfig, AppSetup, NonZeroU32PowerOfTwo,
};
use tokio::runtime::Runtime;
use tracing::{error, info};

#[cfg(all(not(target_env = "msvc"), not(target_os = "macos")))]
Expand Down Expand Up @@ -133,9 +135,12 @@ async fn server(args: ServerArgs) -> Result<(), BoxError> {
private_key_file: sk_path,
});

let query_runtime = new_query_runtime();
let app_config = AppConfig::default()
.with_key_registry(hpke_registry(mk_encryption.as_ref()).await?)
.with_active_work(args.active_work);
.with_active_work(args.active_work)
.with_runtime(IpaRuntime::from_tokio_runtime(&query_runtime));

let (setup, handler) = AppSetup::new(app_config);

let server_config = ServerConfig {
Expand All @@ -153,9 +158,14 @@ async fn server(args: ServerArgs) -> Result<(), BoxError> {
let network_config_path = args.network.as_deref().unwrap();
let network_config = NetworkConfig::from_toml_str(&fs::read_to_string(network_config_path)?)?
.override_scheme(&scheme);
let clients = MpcHelperClient::from_conf(&network_config, &identity);

let http_runtime = new_http_runtime();
let clients = MpcHelperClient::from_conf(
&IpaRuntime::from_tokio_runtime(&http_runtime),
&network_config,
&identity,
);
let (transport, server) = HttpTransport::new(
IpaRuntime::from_tokio_runtime(&http_runtime),
my_identity,
server_config,
network_config,
Expand Down Expand Up @@ -183,18 +193,67 @@ async fn server(args: ServerArgs) -> Result<(), BoxError> {

let (_addr, server_handle) = server
.start_on(
&IpaRuntime::from_tokio_runtime(&http_runtime),
listener,
// TODO, trace based on the content of the query.
None as Option<()>,
)
.await;

server_handle.await?;
server_handle.await;
[query_runtime, http_runtime].map(Runtime::shutdown_background);

Ok(())
}

#[tokio::main]
/// Creates a new runtime for HTTP stack. It is useful to provide a dedicated
/// scheduler to HTTP tasks, to make sure IPA server can respond to requests,
/// if for some reason query runtime becomes overloaded.
/// When multi-threading feature is enabled it creates a runtime with thread-per-core,
/// otherwise a single-threaded runtime is created.
fn new_http_runtime() -> Runtime {
if cfg!(feature = "multi-threading") {
tokio::runtime::Builder::new_multi_thread()
.thread_name("http-worker")
.enable_all()
.build()
.unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("http-worker")
.enable_all()
.build()
.unwrap()
}
}

/// This function creates a runtime suitable for executing MPC queries.
/// When multi-threading feature is enabled it creates a runtime with thread-per-core,
/// otherwise a single-threaded runtime is created.
fn new_query_runtime() -> Runtime {
// it is intentional that IO driver is not enabled here (enable_time() call only).
// query runtime is supposed to use CPU/memory only, no writes to disk and all
// network communication is handled by HTTP runtime.
if cfg!(feature = "multi-threading") {
tokio::runtime::Builder::new_multi_thread()
.thread_name("query-executor")
.enable_time()
.build()
.unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("query-executor")
.enable_time()
.build()
.unwrap()
}
}

/// A single thread is enough here, because server spawns additional
/// runtimes to use in MPC queries and HTTP.
#[tokio::main(flavor = "current_thread")]
pub async fn main() {
let args = Args::parse();
let _handle = args.logging.setup_logging();
Expand Down
4 changes: 3 additions & 1 deletion ipa-core/src/cli/playbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tokio::time::sleep;
pub use self::ipa::{playbook_oprf_ipa, run_query_and_validate};
use crate::{
config::{ClientConfig, NetworkConfig, PeerConfig},
executor::IpaRuntime,
ff::boolean_array::{BA20, BA3, BA8},
helpers::query::DpMechanism,
net::{ClientIdentity, MpcHelperClient},
Expand Down Expand Up @@ -211,7 +212,8 @@ pub async fn make_clients(

// Note: This closure is only called when the selected action uses clients.

let clients = MpcHelperClient::from_conf(&network, &ClientIdentity::None);
let clients =
MpcHelperClient::from_conf(&IpaRuntime::current(), &network, &ClientIdentity::None);
while wait > 0 && !clients_ready(&clients).await {
tracing::debug!("waiting for servers to come up");
sleep(Duration::from_secs(1)).await;
Expand Down
132 changes: 123 additions & 9 deletions ipa-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ pub(crate) mod rand {

#[cfg(all(feature = "shuttle", test))]
pub(crate) mod task {
pub use shuttle::future::{JoinError, JoinHandle};
pub use shuttle::future::JoinError;
}

#[cfg(all(feature = "multi-threading", feature = "shuttle"))]
#[cfg(feature = "shuttle")]
pub(crate) mod shim {
use std::any::Any;

Expand All @@ -100,9 +100,16 @@ pub(crate) mod task {

#[cfg(not(feature = "shuttle"))]
pub mod executor {
use std::future::Future;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use tokio::{runtime::Handle, task::JoinHandle};
use tokio::{
runtime::{Handle, Runtime},
task::JoinHandle,
};

/// In prod we use Tokio scheduler, so this struct just wraps
/// its runtime handle and mimics the standard executor API.
Expand All @@ -112,7 +119,8 @@ pub mod executor {
pub struct IpaRuntime(Handle);

/// Wrapper around Tokio's [`JoinHandle`]
pub struct IpaJoinHandle<T>(JoinHandle<T>);
#[pin_project::pin_project]
pub struct IpaJoinHandle<T>(#[pin] JoinHandle<T>);

impl Default for IpaRuntime {
fn default() -> Self {
Expand All @@ -134,26 +142,82 @@ pub mod executor {
{
IpaJoinHandle(self.0.spawn(future))
}

/// This is a convenience method to convert a Tokio runtime into
/// an IPA runtime. It does not assume ownership of the Tokio runtime.
/// The caller is responsible for ensuring the Tokio runtime is properly
/// shut down.
#[must_use]
pub fn from_tokio_runtime(rt: &Runtime) -> Self {
Self(rt.handle().clone())
}
}

/// allow using [`IpaRuntime`] as Hyper executor
#[cfg(feature = "web-app")]
impl<Fut> hyper::rt::Executor<Fut> for IpaRuntime
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
fn execute(&self, fut: Fut) {
// Dropping the handle does not terminate the task
// Clippy wants us to be explicit here.
drop(self.spawn(fut));
}
}

impl<T> IpaJoinHandle<T> {
pub fn abort(self) {
pub fn abort(&self) {
self.0.abort();
}
}

impl<T: Send + 'static> Future for IpaJoinHandle<T> {
type Output = T;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().0.poll(cx) {
Poll::Ready(Ok(v)) => Poll::Ready(v),
Poll::Ready(Err(e)) => match e.try_into_panic() {
Ok(p) => std::panic::resume_unwind(p),
Err(e) => panic!("Task is cancelled: {e:?}"),
},
Poll::Pending => Poll::Pending,
}
}
}
}

#[cfg(feature = "shuttle")]
pub(crate) mod executor {
use std::future::Future;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

use shuttle_crate::future::{spawn, JoinHandle};

use crate::shim::Tokio;

/// Shuttle does not support more than one runtime
/// so we always use its default
#[derive(Clone, Default)]
pub struct IpaRuntime;
pub struct IpaJoinHandle<T>(JoinHandle<T>);
#[pin_project::pin_project]
pub struct IpaJoinHandle<T>(#[pin] JoinHandle<T>);

#[cfg(feature = "web-app")]
impl<Fut> hyper::rt::Executor<Fut> for IpaRuntime
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
fn execute(&self, fut: Fut) {
drop(self.spawn(fut));
}
}

impl IpaRuntime {
#[must_use]
Expand All @@ -173,10 +237,25 @@ pub(crate) mod executor {
}

impl<T> IpaJoinHandle<T> {
pub fn abort(self) {
pub fn abort(&self) {
self.0.abort();
}
}

impl<T: Send + 'static> Future for IpaJoinHandle<T> {
type Output = T;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().0.poll(cx) {
Poll::Ready(Ok(v)) => Poll::Ready(v),
Poll::Ready(Err(e)) => match e.try_into_panic() {
Ok(p) => std::panic::resume_unwind(p),
Err(e) => panic!("Task is cancelled: {e:?}"),
},
Poll::Pending => Poll::Pending,
}
}
}
}

#[cfg(all(feature = "shuttle", test))]
Expand Down Expand Up @@ -265,3 +344,38 @@ macro_rules! mutually_incompatible {
}

mutually_incompatible!("in-memory-infra", "real-world-infra");

#[cfg(test)]
mod tests {
/// Tests in this module ensure both Shuttle and Tokio runtimes conform to the same API
mod executor {
use crate::{executor::IpaRuntime, test_executor::run};

#[test]
#[should_panic(expected = "task panicked")]
fn handle_join_panicked() {
run(|| async move {
let rt = IpaRuntime::current();
rt.spawn(async { panic!("task panicked") }).await;
});
}

#[test]
/// It is nearly impossible to intentionally hang a Shuttle task. Its executor
/// detects that immediately and panics with a deadlock error. We only want to test
/// the API, so it is not that important to panic with cancellation error
#[cfg_attr(not(feature = "shuttle"), should_panic(expected = "Task is cancelled"))]
fn handle_abort() {
run(|| async move {
let rt = IpaRuntime::current();
let handle = rt.spawn(async {
#[cfg(not(feature = "shuttle"))]
futures::future::pending::<()>().await;
});

handle.abort();
handle.await;
});
}
}
}
Loading

0 comments on commit 726a549

Please sign in to comment.