From 3fb1f35209327577a8db6cda8eeb93de8350dbe4 Mon Sep 17 00:00:00 2001
From: Andy Leiserson
Date: Mon, 16 Dec 2024 17:30:33 -0800
Subject: [PATCH] Periodically report memory usage

---
 ipa-core/src/seq_join/multi_thread.rs |  9 +++-
 ipa-core/src/telemetry/memory.rs      | 70 +++++++++++++++++++++++++++
 ipa-core/src/telemetry/mod.rs         |  1 +
 3 files changed, 79 insertions(+), 1 deletion(-)
 create mode 100644 ipa-core/src/telemetry/memory.rs

diff --git a/ipa-core/src/seq_join/multi_thread.rs b/ipa-core/src/seq_join/multi_thread.rs
index 2ac8f458f..370d7630f 100644
--- a/ipa-core/src/seq_join/multi_thread.rs
+++ b/ipa-core/src/seq_join/multi_thread.rs
@@ -9,6 +9,8 @@ use futures::{stream::Fuse, Stream, StreamExt};
 use pin_project::pin_project;
 use tracing::{Instrument, Span};
 
+use crate::telemetry::memory::periodic_memory_report;
+
 #[cfg(feature = "shuttle")]
 mod shuttle_spawner {
     use std::future::Future;
@@ -62,6 +64,7 @@ where
     #[pin]
     source: Fuse<S>,
     capacity: usize,
+    spawned: usize,
 }
 
 impl<S, F> SequentialFutures<'_, S, F>
@@ -75,6 +78,7 @@ where
             spawner: unsafe { create_spawner() },
             source: source.fuse(),
             capacity: active.get(),
+            spawned: 0,
         }
     }
 }
@@ -103,11 +107,14 @@ where
                 // a dependency between futures, pending one will never complete.
                 // Cancellable futures will be cancelled when spawner is dropped which is
                 // the behavior we want.
-                let task_index = this.spawner.len();
+                let task_index = *this.spawned;
                 this.spawner
                     .spawn_cancellable(f.into_future().instrument(Span::current()), move || {
                         panic!("SequentialFutures: spawned task {task_index} cancelled")
                     });
+
+                periodic_memory_report(*this.spawned);
+                *this.spawned += 1;
             } else {
                 break;
             }
diff --git a/ipa-core/src/telemetry/memory.rs b/ipa-core/src/telemetry/memory.rs
new file mode 100644
index 000000000..9d40bb45f
--- /dev/null
+++ b/ipa-core/src/telemetry/memory.rs
@@ -0,0 +1,70 @@
+//! Periodic reporting of allocated memory, based on jemalloc statistics.
+//!
+//! When the `jemalloc` cfg is not set, `periodic_memory_report` is a no-op.
+
+/// No-op stand-in used when the `jemalloc` cfg is not set.
+///
+/// Takes the same `count` argument as the jemalloc-backed version so that call
+/// sites compile under either configuration.
+#[cfg(not(jemalloc))]
+pub fn periodic_memory_report(_count: usize) {}
+
+#[cfg(jemalloc)]
+pub use jemalloc::periodic_memory_report;
+
+#[cfg(jemalloc)]
+mod jemalloc {
+    use std::sync::LazyLock;
+
+    use tikv_jemalloc_ctl::{epoch_mib, stats::allocated_mib};
+
+    /// Number of bytes in one mebibyte (2^20).
+    const MB: usize = 1 << 20;
+
+    // In an unfortunate acronym collision, `mib` in the names of the jemalloc
+    // statistics stands for "Management Information Base", not "mebibytes".
+    // The reporting unit is bytes.
+
+    static EPOCH: LazyLock<epoch_mib> = LazyLock::new(|| {
+        tikv_jemalloc_ctl::epoch::mib().unwrap()
+    });
+
+    static ALLOCATED: LazyLock<allocated_mib> = LazyLock::new(|| {
+        tikv_jemalloc_ctl::stats::allocated::mib().unwrap()
+    });
+
+    /// Log the amount of memory currently allocated, in MiB, tagged with `count`.
+    fn report_memory_usage(count: usize) {
+        // Some of the information jemalloc uses when reporting statistics is cached, and
+        // refreshed only upon advancing the epoch.
+        EPOCH.advance().unwrap();
+        let allocated = ALLOCATED.read().unwrap() / MB;
+        tracing::debug!("i={count}: {allocated} MiB allocated");
+    }
+
+    /// Decide whether iteration `count` should produce a report.
+    ///
+    /// Iteration zero always reports. After that, a report is due whenever `count`
+    /// is a multiple of `2^max(ilog2(count) - 2, 8)`, i.e. at least every 256 iterations.
+    fn should_print_report(count: usize) -> bool {
+        if count == 0 {
+            return true;
+        }
+
+        let bits = count.ilog2();
+        let report_interval_log2 = std::cmp::max(bits.saturating_sub(2), 8);
+        let report_interval_mask = (1 << report_interval_log2) - 1;
+        (count & report_interval_mask) == 0
+    }
+
+    /// Print a memory report periodically, based on the value of `count`.
+    ///
+    /// As `count` increases, so does the report interval. This results in
+    /// a tolerable amount of log messages for loops with many iterations,
+    /// while still providing some reporting for shorter loops.
+    pub fn periodic_memory_report(count: usize) {
+        if should_print_report(count) {
+            report_memory_usage(count);
+        }
+    }
+}
diff --git a/ipa-core/src/telemetry/mod.rs b/ipa-core/src/telemetry/mod.rs
index aae6ea1d4..eb31325d7 100644
--- a/ipa-core/src/telemetry/mod.rs
+++ b/ipa-core/src/telemetry/mod.rs
@@ -1,3 +1,4 @@
+pub mod memory;
 pub mod stats;
 mod step_stats;
 