diff --git a/ipa-core/benches/oneshot/ipa.rs b/ipa-core/benches/oneshot/ipa.rs index 1f9eec376..cb2d963d2 100644 --- a/ipa-core/benches/oneshot/ipa.rs +++ b/ipa-core/benches/oneshot/ipa.rs @@ -19,14 +19,6 @@ use ipa_step::StepNarrow; use rand::{random, rngs::StdRng, SeedableRng}; use tokio::runtime::Builder; -#[cfg(jemalloc)] -#[global_allocator] -static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; - -#[cfg(feature = "dhat-heap")] -#[global_allocator] -static ALLOC: dhat::Alloc = dhat::Alloc; - /// A benchmark for the full IPA protocol. #[derive(Parser)] #[command(about, long_about = None)] @@ -165,6 +157,13 @@ async fn run(args: Args) -> Result<(), Error> { } fn main() -> Result<(), Error> { + #[cfg(jemalloc)] + ipa_core::use_jemalloc!(); + + #[cfg(feature = "dhat-heap")] + #[global_allocator] + static ALLOC: dhat::Alloc = dhat::Alloc; + #[cfg(feature = "dhat-heap")] let _profiler = dhat::Profiler::new_heap(); diff --git a/ipa-core/src/bin/helper.rs b/ipa-core/src/bin/helper.rs index 3fa554a6a..3b62653a4 100644 --- a/ipa-core/src/bin/helper.rs +++ b/ipa-core/src/bin/helper.rs @@ -30,14 +30,6 @@ use ipa_core::{ use tokio::runtime::Runtime; use tracing::{error, info}; -#[cfg(jemalloc)] -#[global_allocator] -static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; - -#[cfg(feature = "dhat-heap")] -#[global_allocator] -static ALLOC: dhat::Alloc = dhat::Alloc; - #[derive(Debug, Parser)] #[clap( name = "helper", @@ -369,6 +361,13 @@ fn new_query_runtime(logging_handle: &LoggingHandle) -> Runtime { /// runtimes to use in MPC queries and HTTP. 
#[tokio::main(flavor = "current_thread")] pub async fn main() { + #[cfg(jemalloc)] + ipa_core::use_jemalloc!(); + + #[cfg(feature = "dhat-heap")] + #[global_allocator] + static ALLOC: dhat::Alloc = dhat::Alloc; + let args = Args::parse(); let handle = args.logging.setup_logging(); diff --git a/ipa-core/src/lib.rs b/ipa-core/src/lib.rs index ce98182f3..188f8154b 100644 --- a/ipa-core/src/lib.rs +++ b/ipa-core/src/lib.rs @@ -32,6 +32,7 @@ mod seq_join; mod serde; pub mod sharding; pub mod utils; + pub use app::{AppConfig, HelperApp, Setup as AppSetup}; pub use utils::NonZeroU32PowerOfTwo; @@ -348,6 +349,21 @@ pub(crate) mod test_executor { pub const CRATE_NAME: &str = env!("CARGO_CRATE_NAME"); +/// This macro should be called in a binary that uses `ipa_core`, if that binary wishes +/// to use jemalloc. +/// +/// Besides declaring the `#[global_allocator]`, the macro also activates some memory +/// reporting. +#[macro_export] +macro_rules! use_jemalloc { + () => { + #[global_allocator] + static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + + $crate::telemetry::memory::jemalloc::activate(); + }; +} + #[macro_export] macro_rules! const_assert { ($x:expr $(,)?) 
=> { diff --git a/ipa-core/src/seq_join/local.rs b/ipa-core/src/seq_join/local.rs index 89ba2ca91..951df9701 100644 --- a/ipa-core/src/seq_join/local.rs +++ b/ipa-core/src/seq_join/local.rs @@ -10,6 +10,8 @@ use std::{ use futures::{stream::Fuse, Future, Stream, StreamExt}; use pin_project::pin_project; +use crate::telemetry::memory::periodic_memory_report; + enum ActiveItem { Pending(Pin>), Resolved(F::Output), @@ -56,6 +58,7 @@ where #[pin] source: Fuse, active: VecDeque>, + spawned: usize, _marker: PhantomData &'unused ()>, } @@ -68,6 +71,7 @@ where Self { source: source.fuse(), active: VecDeque::with_capacity(active.get()), + spawned: 0, _marker: PhantomData, } } @@ -88,6 +92,8 @@ where if let Poll::Ready(Some(f)) = this.source.as_mut().poll_next(cx) { this.active .push_back(ActiveItem::Pending(Box::pin(f.into_future()))); + periodic_memory_report(*this.spawned); + *this.spawned += 1; } else { break; } @@ -104,6 +110,7 @@ where Poll::Pending } } else if this.source.is_done() { + periodic_memory_report(*this.spawned); Poll::Ready(None) } else { Poll::Pending diff --git a/ipa-core/src/seq_join/multi_thread.rs b/ipa-core/src/seq_join/multi_thread.rs index 2ac8f458f..d68253417 100644 --- a/ipa-core/src/seq_join/multi_thread.rs +++ b/ipa-core/src/seq_join/multi_thread.rs @@ -9,6 +9,8 @@ use futures::{stream::Fuse, Stream, StreamExt}; use pin_project::pin_project; use tracing::{Instrument, Span}; +use crate::telemetry::memory::periodic_memory_report; + #[cfg(feature = "shuttle")] mod shuttle_spawner { use std::future::Future; @@ -62,6 +64,7 @@ where #[pin] source: Fuse, capacity: usize, + spawned: usize, } impl SequentialFutures<'_, S, F> @@ -75,6 +78,7 @@ where spawner: unsafe { create_spawner() }, source: source.fuse(), capacity: active.get(), + spawned: 0, } } } @@ -103,11 +107,14 @@ where // a dependency between futures, pending one will never complete. // Cancellable futures will be cancelled when spawner is dropped which is // the behavior we want. 
- let task_index = this.spawner.len(); + let task_index = *this.spawned; this.spawner .spawn_cancellable(f.into_future().instrument(Span::current()), move || { panic!("SequentialFutures: spawned task {task_index} cancelled") }); + + periodic_memory_report(*this.spawned); + *this.spawned += 1; } else { break; } @@ -127,6 +134,7 @@ where None => None, }) } else if this.source.is_done() { + periodic_memory_report(*this.spawned); Poll::Ready(None) } else { Poll::Pending diff --git a/ipa-core/src/telemetry/memory.rs b/ipa-core/src/telemetry/memory.rs new file mode 100644 index 000000000..d43e791f1 --- /dev/null +++ b/ipa-core/src/telemetry/memory.rs @@ -0,0 +1,77 @@ +pub fn periodic_memory_report(count: usize) { + #[cfg(not(jemalloc))] + let _ = count; + + #[cfg(jemalloc)] + jemalloc::periodic_memory_report(count); +} + +#[cfg(jemalloc)] +pub mod jemalloc { + use std::sync::RwLock; + + use tikv_jemalloc_ctl::{epoch_mib, stats::allocated_mib}; + + /// Bytes per mebibyte (2^20); divisor that converts jemalloc's byte counts to MiB. + const MB: usize = 1 << 20; + + // In an unfortunate acronym collision, `mib` in the names of the jemalloc + // statistics stands for "Management Information Base", not "mebibytes". + // The reporting unit is bytes. + + struct JemallocControls { + epoch: epoch_mib, + allocated: allocated_mib, + } + + static CONTROLS: RwLock> = RwLock::new(None); + + /// Activates periodic memory usage reporting during `seq_join`. + /// + /// # Panics + /// If `RwLock` is poisoned, or if the `mib()` lookup for the jemalloc epoch or allocated statistic fails. + pub fn activate() { + let mut controls = CONTROLS.write().unwrap(); + + let epoch = tikv_jemalloc_ctl::epoch::mib().unwrap(); + let allocated = tikv_jemalloc_ctl::stats::allocated::mib().unwrap(); + + *controls = Some(JemallocControls { epoch, allocated }); + } + + fn report_memory_usage(controls: &JemallocControls, count: usize) { + // Some of the information jemalloc uses when reporting statistics is cached, and + // refreshed only upon advancing the epoch. 
+ controls.epoch.advance().unwrap(); + let allocated = controls.allocated.read().unwrap() / MB; + tracing::debug!("i={count}: {allocated} MiB allocated"); + } + + fn should_print_report(count: usize) -> bool { + if count == 0 { + return true; + } + + let bits = count.ilog2(); + let report_interval_log2 = std::cmp::max(bits.saturating_sub(1), 8); + let report_interval_mask = (1 << report_interval_log2) - 1; + (count & report_interval_mask) == 0 + } + + /// Print a memory report periodically, based on the value of `count`. + /// + /// As `count` increases, so does the report interval. This results in + /// a tolerable amount of log messages for loops with many iterations, + /// while still providing some reporting for shorter loops. + /// + /// # Panics + /// If `RwLock` is poisoned, or if advancing the jemalloc epoch or reading the allocated statistic fails. + pub fn periodic_memory_report(count: usize) { + let controls_opt = CONTROLS.read().unwrap(); + if let Some(controls) = controls_opt.as_ref() { + if should_print_report(count) { + report_memory_usage(controls, count); + } + } + } +} diff --git a/ipa-core/src/telemetry/mod.rs b/ipa-core/src/telemetry/mod.rs index aae6ea1d4..eb31325d7 100644 --- a/ipa-core/src/telemetry/mod.rs +++ b/ipa-core/src/telemetry/mod.rs @@ -1,3 +1,4 @@ +pub mod memory; pub mod stats; mod step_stats;