From f27beaa197e1113f44dc3046612bef859e581a8e Mon Sep 17 00:00:00 2001 From: Andy Leiserson Date: Mon, 16 Dec 2024 17:30:33 -0800 Subject: [PATCH 1/3] Periodically report memory usage --- ipa-core/src/seq_join/multi_thread.rs | 9 ++++- ipa-core/src/telemetry/memory.rs | 56 +++++++++++++++++++++++++++ ipa-core/src/telemetry/mod.rs | 1 + 3 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 ipa-core/src/telemetry/memory.rs diff --git a/ipa-core/src/seq_join/multi_thread.rs b/ipa-core/src/seq_join/multi_thread.rs index 2ac8f458f..370d7630f 100644 --- a/ipa-core/src/seq_join/multi_thread.rs +++ b/ipa-core/src/seq_join/multi_thread.rs @@ -9,6 +9,8 @@ use futures::{stream::Fuse, Stream, StreamExt}; use pin_project::pin_project; use tracing::{Instrument, Span}; +use crate::telemetry::memory::periodic_memory_report; + #[cfg(feature = "shuttle")] mod shuttle_spawner { use std::future::Future; @@ -62,6 +64,7 @@ where #[pin] source: Fuse, capacity: usize, + spawned: usize, } impl SequentialFutures<'_, S, F> @@ -75,6 +78,7 @@ where spawner: unsafe { create_spawner() }, source: source.fuse(), capacity: active.get(), + spawned: 0, } } } @@ -103,11 +107,14 @@ where // a dependency between futures, pending one will never complete. // Cancellable futures will be cancelled when spawner is dropped which is // the behavior we want. - let task_index = this.spawner.len(); + let task_index = *this.spawned; this.spawner .spawn_cancellable(f.into_future().instrument(Span::current()), move || { panic!("SequentialFutures: spawned task {task_index} cancelled") }); + + periodic_memory_report(*this.spawned); + *this.spawned += 1; } else { break; } diff --git a/ipa-core/src/telemetry/memory.rs b/ipa-core/src/telemetry/memory.rs new file mode 100644 index 000000000..9d40bb45f --- /dev/null +++ b/ipa-core/src/telemetry/memory.rs @@ -0,0 +1,56 @@ +#[cfg(not(jemalloc))] +pub fn periodic_memory_report() { } + +#[cfg(jemalloc)] +pub use jemalloc::periodic_memory_report; + +#[cfg(jemalloc)] +mod jemalloc { + use std::sync::LazyLock; + + use tikv_jemalloc_ctl::{epoch_mib, stats::allocated_mib}; + + const MB: usize = 2 << 20; + + // In an unfortunate acronym collision, `mib` in the names of the jemalloc + // statistics stands for "Management Information Base", not "mebibytes". + // The reporting unit is bytes. + + static EPOCH: LazyLock = LazyLock::new(|| { + tikv_jemalloc_ctl::epoch::mib().unwrap() + }); + + static ALLOCATED: LazyLock = LazyLock::new(|| { + tikv_jemalloc_ctl::stats::allocated::mib().unwrap() + }); + + fn report_memory_usage(count: usize) { + // Some of the information jemalloc uses when reporting statistics is cached, and + // refreshed only upon advancing the epoch. + EPOCH.advance().unwrap(); + let allocated = ALLOCATED.read().unwrap() / MB; + tracing::debug!("i={count}: {allocated} MiB allocated"); + } + + fn should_print_report(count: usize) -> bool { + if count == 0 { + return true; + } + + let bits = count.ilog2(); + let report_interval_log2 = std::cmp::max(bits.saturating_sub(2), 8); + let report_interval_mask = (1 << report_interval_log2) - 1; + (count & report_interval_mask) == 0 + } + + /// Print a memory report periodically, based on the value of `count`. + /// + /// As `count` increases, so does the report interval. This results in + /// a tolerable amount of log messages for loops with many iterations, + /// while still providing some reporting for shorter loops. + pub fn periodic_memory_report(count: usize) { + if should_print_report(count) { + report_memory_usage(count); + } + } +} diff --git a/ipa-core/src/telemetry/mod.rs b/ipa-core/src/telemetry/mod.rs index aae6ea1d4..eb31325d7 100644 --- a/ipa-core/src/telemetry/mod.rs +++ b/ipa-core/src/telemetry/mod.rs @@ -1,3 +1,4 @@ +pub mod memory; pub mod stats; mod step_stats; From 71f9d79b143c076f560c2721829b42303b992446 Mon Sep 17 00:00:00 2001 From: Andy Leiserson Date: Mon, 16 Dec 2024 18:34:39 -0800 Subject: [PATCH 2/3] Better jemalloc controls. Reduce reporting frequency. --- ipa-core/benches/oneshot/ipa.rs | 16 ++++----- ipa-core/src/bin/helper.rs | 17 +++++---- ipa-core/src/lib.rs | 16 +++++++++ ipa-core/src/seq_join/local.rs | 7 ++++ ipa-core/src/seq_join/multi_thread.rs | 1 + ipa-core/src/telemetry/memory.rs | 50 +++++++++++++++++---------- 6 files changed, 72 insertions(+), 35 deletions(-) diff --git a/ipa-core/benches/oneshot/ipa.rs b/ipa-core/benches/oneshot/ipa.rs index 1f9eec376..5fd4ba52c 100644 --- a/ipa-core/benches/oneshot/ipa.rs +++ b/ipa-core/benches/oneshot/ipa.rs @@ -14,19 +14,12 @@ use ipa_core::{ ipa::{ipa_in_the_clear, test_oprf_ipa, CappingOrder, IpaSecurityModel}, EventGenerator, EventGeneratorConfig, TestWorld, TestWorldConfig, }, + use_jemalloc, }; use ipa_step::StepNarrow; use rand::{random, rngs::StdRng, SeedableRng}; use tokio::runtime::Builder; -#[cfg(jemalloc)] -#[global_allocator] -static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; - -#[cfg(feature = "dhat-heap")] -#[global_allocator] -static ALLOC: dhat::Alloc = dhat::Alloc; - /// A benchmark for the full IPA protocol. #[derive(Parser)] #[command(about, long_about = None)] @@ -165,6 +158,13 @@ async fn run(args: Args) -> Result<(), Error> { } fn main() -> Result<(), Error> { + #[cfg(jemalloc)] + use_jemalloc!(); + + #[cfg(feature = "dhat-heap")] + #[global_allocator] + static ALLOC: dhat::Alloc = dhat::Alloc; + #[cfg(feature = "dhat-heap")] let _profiler = dhat::Profiler::new_heap(); diff --git a/ipa-core/src/bin/helper.rs b/ipa-core/src/bin/helper.rs index 3fa554a6a..28ad19d1b 100644 --- a/ipa-core/src/bin/helper.rs +++ b/ipa-core/src/bin/helper.rs @@ -25,19 +25,11 @@ use ipa_core::{ ShardHttpTransport, }, sharding::ShardIndex, - AppConfig, AppSetup, NonZeroU32PowerOfTwo, + use_jemalloc, AppConfig, AppSetup, NonZeroU32PowerOfTwo, }; use tokio::runtime::Runtime; use tracing::{error, info}; -#[cfg(jemalloc)] -#[global_allocator] -static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; - -#[cfg(feature = "dhat-heap")] -#[global_allocator] -static ALLOC: dhat::Alloc = dhat::Alloc; - #[derive(Debug, Parser)] #[clap( name = "helper", @@ -369,6 +361,13 @@ fn new_query_runtime(logging_handle: &LoggingHandle) -> Runtime { /// runtimes to use in MPC queries and HTTP. #[tokio::main(flavor = "current_thread")] pub async fn main() { + #[cfg(jemalloc)] + use_jemalloc!(); + + #[cfg(feature = "dhat-heap")] + #[global_allocator] + static ALLOC: dhat::Alloc = dhat::Alloc; + let args = Args::parse(); let handle = args.logging.setup_logging(); diff --git a/ipa-core/src/lib.rs b/ipa-core/src/lib.rs index ce98182f3..188f8154b 100644 --- a/ipa-core/src/lib.rs +++ b/ipa-core/src/lib.rs @@ -32,6 +32,7 @@ mod seq_join; mod serde; pub mod sharding; pub mod utils; + pub use app::{AppConfig, HelperApp, Setup as AppSetup}; pub use utils::NonZeroU32PowerOfTwo; @@ -348,6 +349,21 @@ pub(crate) mod test_executor { pub const CRATE_NAME: &str = env!("CARGO_CRATE_NAME"); +/// This macro should be called in a binary that uses `ipa_core`, if that binary wishes +/// to use jemalloc. +/// +/// Besides declaring the `#[global_allocator]`, the macro also activates some memory +/// reporting. +#[macro_export] +macro_rules! use_jemalloc { + () => { + #[global_allocator] + static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + + $crate::telemetry::memory::jemalloc::activate(); + }; +} + #[macro_export] macro_rules! const_assert { ($x:expr $(,)?) => { diff --git a/ipa-core/src/seq_join/local.rs b/ipa-core/src/seq_join/local.rs index 89ba2ca91..951df9701 100644 --- a/ipa-core/src/seq_join/local.rs +++ b/ipa-core/src/seq_join/local.rs @@ -10,6 +10,8 @@ use std::{ use futures::{stream::Fuse, Future, Stream, StreamExt}; use pin_project::pin_project; +use crate::telemetry::memory::periodic_memory_report; + enum ActiveItem { Pending(Pin>), Resolved(F::Output), @@ -56,6 +58,7 @@ where #[pin] source: Fuse, active: VecDeque>, + spawned: usize, _marker: PhantomData &'unused ()>, } @@ -68,6 +71,7 @@ where Self { source: source.fuse(), active: VecDeque::with_capacity(active.get()), + spawned: 0, _marker: PhantomData, } } @@ -88,6 +92,8 @@ where if let Poll::Ready(Some(f)) = this.source.as_mut().poll_next(cx) { this.active .push_back(ActiveItem::Pending(Box::pin(f.into_future()))); + periodic_memory_report(*this.spawned); + *this.spawned += 1; } else { break; } @@ -104,6 +110,7 @@ where Poll::Pending } } else if this.source.is_done() { + periodic_memory_report(*this.spawned); Poll::Ready(None) } else { Poll::Pending diff --git a/ipa-core/src/seq_join/multi_thread.rs b/ipa-core/src/seq_join/multi_thread.rs index 370d7630f..d68253417 100644 --- a/ipa-core/src/seq_join/multi_thread.rs +++ b/ipa-core/src/seq_join/multi_thread.rs @@ -134,6 +134,7 @@ where None => None, }) } else if this.source.is_done() { + periodic_memory_report(*this.spawned); Poll::Ready(None) } else { Poll::Pending diff --git a/ipa-core/src/telemetry/memory.rs b/ipa-core/src/telemetry/memory.rs index 9d40bb45f..8db372275 100644 --- a/ipa-core/src/telemetry/memory.rs +++ b/ipa-core/src/telemetry/memory.rs @@ -1,12 +1,14 @@ -#[cfg(not(jemalloc))] -pub fn periodic_memory_report() { } +pub fn periodic_memory_report(count: usize) { + #[cfg(not(jemalloc))] + let _ = count; -#[cfg(jemalloc)] -pub use jemalloc::periodic_memory_report; + #[cfg(jemalloc)] + jemalloc::periodic_memory_report(count); +} #[cfg(jemalloc)] -mod jemalloc { - use std::sync::LazyLock; +pub mod jemalloc { + use std::sync::RwLock; use tikv_jemalloc_ctl::{epoch_mib, stats::allocated_mib}; @@ -16,19 +18,28 @@ mod jemalloc { // statistics stands for "Management Information Base", not "mebibytes". // The reporting unit is bytes. - static EPOCH: LazyLock = LazyLock::new(|| { - tikv_jemalloc_ctl::epoch::mib().unwrap() - }); + struct JemallocControls { + epoch: epoch_mib, + allocated: allocated_mib, + } + + static CONTROLS: RwLock> = RwLock::new(None); - static ALLOCATED: LazyLock = LazyLock::new(|| { - tikv_jemalloc_ctl::stats::allocated::mib().unwrap() - }); + /// Activates periodic memory usage reporting during `seq_join`. + pub fn activate() { + let mut controls = CONTROLS.write().unwrap(); + + let epoch = tikv_jemalloc_ctl::epoch::mib().unwrap(); + let allocated = tikv_jemalloc_ctl::stats::allocated::mib().unwrap(); + + *controls = Some(JemallocControls { epoch, allocated }); + } - fn report_memory_usage(count: usize) { + fn report_memory_usage(controls: &JemallocControls, count: usize) { // Some of the information jemalloc uses when reporting statistics is cached, and // refreshed only upon advancing the epoch. - EPOCH.advance().unwrap(); - let allocated = ALLOCATED.read().unwrap() / MB; + controls.epoch.advance().unwrap(); + let allocated = controls.allocated.read().unwrap() / MB; tracing::debug!("i={count}: {allocated} MiB allocated"); } @@ -38,7 +49,7 @@ mod jemalloc { } let bits = count.ilog2(); - let report_interval_log2 = std::cmp::max(bits.saturating_sub(2), 8); + let report_interval_log2 = std::cmp::max(bits.saturating_sub(1), 8); let report_interval_mask = (1 << report_interval_log2) - 1; (count & report_interval_mask) == 0 } @@ -49,8 +60,11 @@ mod jemalloc { /// a tolerable amount of log messages for loops with many iterations, /// while still providing some reporting for shorter loops. pub fn periodic_memory_report(count: usize) { - if should_print_report(count) { - report_memory_usage(count); + let controls_opt = CONTROLS.read().unwrap(); + if let Some(controls) = controls_opt.as_ref() { + if should_print_report(count) { + report_memory_usage(controls, count); + } } } } From 007630e32ec282fbabcec17e02a4e6f3d05613b9 Mon Sep 17 00:00:00 2001 From: Andy Leiserson Date: Thu, 19 Dec 2024 15:08:38 -0800 Subject: [PATCH 3/3] Fix conditional compilation issues --- ipa-core/benches/oneshot/ipa.rs | 3 +-- ipa-core/src/bin/helper.rs | 4 ++-- ipa-core/src/telemetry/memory.rs | 6 ++++++ 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/ipa-core/benches/oneshot/ipa.rs b/ipa-core/benches/oneshot/ipa.rs index 5fd4ba52c..cb2d963d2 100644 --- a/ipa-core/benches/oneshot/ipa.rs +++ b/ipa-core/benches/oneshot/ipa.rs @@ -14,7 +14,6 @@ use ipa_core::{ ipa::{ipa_in_the_clear, test_oprf_ipa, CappingOrder, IpaSecurityModel}, EventGenerator, EventGeneratorConfig, TestWorld, TestWorldConfig, }, - use_jemalloc, }; use ipa_step::StepNarrow; use rand::{random, rngs::StdRng, SeedableRng}; @@ -159,7 +158,7 @@ async fn run(args: Args) -> Result<(), Error> { fn main() -> Result<(), Error> { #[cfg(jemalloc)] - use_jemalloc!(); + ipa_core::use_jemalloc!(); #[cfg(feature = "dhat-heap")] #[global_allocator] diff --git a/ipa-core/src/bin/helper.rs b/ipa-core/src/bin/helper.rs index 28ad19d1b..3b62653a4 100644 --- a/ipa-core/src/bin/helper.rs +++ b/ipa-core/src/bin/helper.rs @@ -25,7 +25,7 @@ use ipa_core::{ ShardHttpTransport, }, sharding::ShardIndex, - use_jemalloc, AppConfig, AppSetup, NonZeroU32PowerOfTwo, + AppConfig, AppSetup, NonZeroU32PowerOfTwo, }; use tokio::runtime::Runtime; use tracing::{error, info}; @@ -362,7 +362,7 @@ fn new_query_runtime(logging_handle: &LoggingHandle) -> Runtime { #[tokio::main(flavor = "current_thread")] pub async fn main() { #[cfg(jemalloc)] - use_jemalloc!(); + ipa_core::use_jemalloc!(); #[cfg(feature = "dhat-heap")] #[global_allocator] diff --git a/ipa-core/src/telemetry/memory.rs b/ipa-core/src/telemetry/memory.rs index 8db372275..d43e791f1 100644 --- a/ipa-core/src/telemetry/memory.rs +++ b/ipa-core/src/telemetry/memory.rs @@ -26,6 +26,9 @@ pub mod jemalloc { static CONTROLS: RwLock> = RwLock::new(None); /// Activates periodic memory usage reporting during `seq_join`. + /// + /// # Panics + /// If `RwLock` is poisoned. pub fn activate() { let mut controls = CONTROLS.write().unwrap(); @@ -59,6 +62,9 @@ pub mod jemalloc { /// As `count` increases, so does the report interval. This results in /// a tolerable amount of log messages for loops with many iterations, /// while still providing some reporting for shorter loops. + /// + /// # Panics + /// If `RwLock` is poisoned. pub fn periodic_memory_report(count: usize) { let controls_opt = CONTROLS.read().unwrap(); if let Some(controls) = controls_opt.as_ref() {