Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically report memory usage #1503

Merged
merged 3 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions ipa-core/benches/oneshot/ipa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ use ipa_step::StepNarrow;
use rand::{random, rngs::StdRng, SeedableRng};
use tokio::runtime::Builder;

#[cfg(jemalloc)]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

/// A benchmark for the full IPA protocol.
#[derive(Parser)]
#[command(about, long_about = None)]
Expand Down Expand Up @@ -165,6 +157,13 @@ async fn run(args: Args) -> Result<(), Error> {
}

fn main() -> Result<(), Error> {
#[cfg(jemalloc)]
ipa_core::use_jemalloc!();

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

Expand Down
15 changes: 7 additions & 8 deletions ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@ use ipa_core::{
use tokio::runtime::Runtime;
use tracing::{error, info};

#[cfg(jemalloc)]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

#[derive(Debug, Parser)]
#[clap(
name = "helper",
Expand Down Expand Up @@ -369,6 +361,13 @@ fn new_query_runtime(logging_handle: &LoggingHandle) -> Runtime {
/// runtimes to use in MPC queries and HTTP.
#[tokio::main(flavor = "current_thread")]
pub async fn main() {
#[cfg(jemalloc)]
ipa_core::use_jemalloc!();

#[cfg(feature = "dhat-heap")]
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;

let args = Args::parse();
let handle = args.logging.setup_logging();

Expand Down
16 changes: 16 additions & 0 deletions ipa-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod seq_join;
mod serde;
pub mod sharding;
pub mod utils;

pub use app::{AppConfig, HelperApp, Setup as AppSetup};
pub use utils::NonZeroU32PowerOfTwo;

Expand Down Expand Up @@ -348,6 +349,21 @@ pub(crate) mod test_executor {

pub const CRATE_NAME: &str = env!("CARGO_CRATE_NAME");

/// This macro should be called in a binary that uses `ipa_core`, if that binary wishes
/// to use jemalloc.
///
/// Besides declaring the `#[global_allocator]`, the macro also activates some memory
/// reporting.
#[macro_export]
macro_rules! use_jemalloc {
() => {
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

$crate::telemetry::memory::jemalloc::activate();
};
}

#[macro_export]
macro_rules! const_assert {
($x:expr $(,)?) => {
Expand Down
7 changes: 7 additions & 0 deletions ipa-core/src/seq_join/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::{
use futures::{stream::Fuse, Future, Stream, StreamExt};
use pin_project::pin_project;

use crate::telemetry::memory::periodic_memory_report;

enum ActiveItem<F: IntoFuture> {
Pending(Pin<Box<F::IntoFuture>>),
Resolved(F::Output),
Expand Down Expand Up @@ -56,6 +58,7 @@ where
#[pin]
source: Fuse<S>,
active: VecDeque<ActiveItem<F>>,
spawned: usize,
_marker: PhantomData<fn(&'unused ()) -> &'unused ()>,
}

Expand All @@ -68,6 +71,7 @@ where
Self {
source: source.fuse(),
active: VecDeque::with_capacity(active.get()),
spawned: 0,
_marker: PhantomData,
}
}
Expand All @@ -88,6 +92,8 @@ where
if let Poll::Ready(Some(f)) = this.source.as_mut().poll_next(cx) {
this.active
.push_back(ActiveItem::Pending(Box::pin(f.into_future())));
periodic_memory_report(*this.spawned);
*this.spawned += 1;
} else {
break;
}
Expand All @@ -104,6 +110,7 @@ where
Poll::Pending
}
} else if this.source.is_done() {
periodic_memory_report(*this.spawned);
Poll::Ready(None)
} else {
Poll::Pending
Expand Down
10 changes: 9 additions & 1 deletion ipa-core/src/seq_join/multi_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use futures::{stream::Fuse, Stream, StreamExt};
use pin_project::pin_project;
use tracing::{Instrument, Span};

use crate::telemetry::memory::periodic_memory_report;

#[cfg(feature = "shuttle")]
mod shuttle_spawner {
use std::future::Future;
Expand Down Expand Up @@ -62,6 +64,7 @@ where
#[pin]
source: Fuse<S>,
capacity: usize,
spawned: usize,
}

impl<S, F> SequentialFutures<'_, S, F>
Expand All @@ -75,6 +78,7 @@ where
spawner: unsafe { create_spawner() },
source: source.fuse(),
capacity: active.get(),
spawned: 0,
}
}
}
Expand Down Expand Up @@ -103,11 +107,14 @@ where
// a dependency between futures, pending one will never complete.
// Cancellable futures will be cancelled when spawner is dropped which is
// the behavior we want.
let task_index = this.spawner.len();
let task_index = *this.spawned;
this.spawner
.spawn_cancellable(f.into_future().instrument(Span::current()), move || {
panic!("SequentialFutures: spawned task {task_index} cancelled")
});

periodic_memory_report(*this.spawned);
*this.spawned += 1;
} else {
break;
}
Expand All @@ -127,6 +134,7 @@ where
None => None,
})
} else if this.source.is_done() {
periodic_memory_report(*this.spawned);
Poll::Ready(None)
} else {
Poll::Pending
Expand Down
76 changes: 76 additions & 0 deletions ipa-core/src/telemetry/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
pub fn periodic_memory_report(count: usize) {
#[cfg(not(jemalloc))]
let _ = count;

#[cfg(jemalloc)]
jemalloc::periodic_memory_report(count);
}

#[cfg(jemalloc)]
pub mod jemalloc {
use std::sync::RwLock;

use tikv_jemalloc_ctl::{epoch_mib, stats::allocated_mib};

const MB: usize = 2 << 20;

// In an unfortunate acronym collision, `mib` in the names of the jemalloc
// statistics stands for "Management Information Base", not "mebibytes".
// The reporting unit is bytes.

struct JemallocControls {
epoch: epoch_mib,
allocated: allocated_mib,
}

static CONTROLS: RwLock<Option<JemallocControls>> = RwLock::new(None);

/// Activates periodic memory usage reporting during `seq_join`.
///
/// # Panics
/// If `RwLock` is poisoned.
pub fn activate() {
let mut controls = CONTROLS.write().unwrap();

let epoch = tikv_jemalloc_ctl::epoch::mib().unwrap();
let allocated = tikv_jemalloc_ctl::stats::allocated::mib().unwrap();

*controls = Some(JemallocControls { epoch, allocated });
}

fn report_memory_usage(controls: &JemallocControls, count: usize) {
// Some of the information jemalloc uses when reporting statistics is cached, and
// refreshed only upon advancing the epoch.
controls.epoch.advance().unwrap();
let allocated = controls.allocated.read().unwrap() / MB;
tracing::debug!("i={count}: {allocated} MiB allocated");
}

fn should_print_report(count: usize) -> bool {
if count == 0 {
return true;
}

let bits = count.ilog2();
let report_interval_log2 = std::cmp::max(bits.saturating_sub(1), 8);
let report_interval_mask = (1 << report_interval_log2) - 1;
(count & report_interval_mask) == 0
}

/// Print a memory report periodically, based on the value of `count`.
///
/// As `count` increases, so does the report interval. This results in
/// a tolerable amount of log messages for loops with many iterations,
/// while still providing some reporting for shorter loops.
///
/// # Panics
/// If `RwLock` is poisoned.
pub fn periodic_memory_report(count: usize) {
let controls_opt = CONTROLS.read().unwrap();
if let Some(controls) = controls_opt.as_ref() {
if should_print_report(count) {
report_memory_usage(controls, count);
}
}
}
}
1 change: 1 addition & 0 deletions ipa-core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod memory;
pub mod stats;
mod step_stats;

Expand Down
Loading