Skip to content

Commit

Permalink
Stop flamegraph only when the target duration is elapsed. (#5131)
Browse files Browse the repository at this point in the history
Before this PR, fetching the flamegraph stopped the profiling even if
the duration had not elapsed yet.

After this PR, the flamegraph is only generated once the duration has elapsed (still defaulting to 30s).
  • Loading branch information
fulmicoton authored Jun 17, 2024
1 parent 74ac88e commit 89ce762
Showing 1 changed file with 19 additions and 33 deletions.
52 changes: 19 additions & 33 deletions quickwit/quickwit-serve/src/developer_api/pprof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,24 @@ use warp::Filter;
#[cfg(not(feature = "pprof"))]
pub fn pprof_handlers() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone
{
let start_profiler = {
warp::path!("pprof" / "start").map(move || {
warp::reply::with_status(
"Quickwit was compiled without the `pprof` feature",
warp::http::StatusCode::NOT_IMPLEMENTED,
)
})
};
let stop_profiler = {
warp::path!("pprof" / "flamegraph").map(move || {
warp::reply::with_status(
"Quickwit was compiled without the `pprof` feature",
warp::http::StatusCode::NOT_IMPLEMENTED,
)
})
let not_implemented_handler = || {
warp::reply::with_status(
"Quickwit was compiled without the `pprof` feature",
warp::http::StatusCode::NOT_IMPLEMENTED,
)
};
let start_profiler = { warp::path!("pprof" / "start").map(not_implemented_handler) };
let stop_profiler = { warp::path!("pprof" / "flamegraph").map(not_implemented_handler) };
start_profiler.or(stop_profiler)
}

/// pprof/start to start cpu profiling.
/// pprof/start?max_duration=5&sampling=1000 to start a short high frequency cpu profiling
/// pprof/start?duration=5&sampling=1000 to start a short high frequency cpu profiling
/// pprof/flamegraph to return the flamegraph of the last completed cpu profiling; a profiling
/// that is still running is not stopped
///
/// Query parameters:
/// - max_duration: max duration of the profiling in seconds, default is 30 seconds. max value is
/// 300
/// - duration: duration of the profiling in seconds, default is 30 seconds. max value is 300
/// - sampling: the sampling rate, default is 100, max value is 1000
#[cfg(feature = "pprof")]
pub fn pprof_handlers() -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone
Expand All @@ -59,7 +50,6 @@ pub fn pprof_handlers() -> impl Filter<Extract = impl warp::Reply, Error = warp:

use pprof::ProfilerGuard;
use serde::Deserialize;
use tokio::task::spawn_blocking;
use tokio::time::{self, Duration};
use warp::reply::Reply;

Expand All @@ -77,8 +67,8 @@ pub fn pprof_handlers() -> impl Filter<Extract = impl warp::Reply, Error = warp:

#[derive(Deserialize)]
struct ProfilerQueryParams {
max_duration: Option<u64>, // max allowed value is 300 seconds, default is 30 seconds
sampling: Option<i32>, // max value is 1000, default is 100
duration: Option<u64>, // max allowed value is 300 seconds, default is 30 seconds
sampling: Option<i32>, // max value is 1000, default is 100
}

let start_profiler = {
Expand All @@ -93,7 +83,7 @@ pub fn pprof_handlers() -> impl Filter<Extract = impl warp::Reply, Error = warp:
let stop_profiler = {
let profiler_state = Arc::clone(&profiler_state);
warp::path!("pprof" / "flamegraph")
.and_then(move || stop_profiler_handler(Arc::clone(&profiler_state)))
.and_then(move || get_flamegraph_handler(Arc::clone(&profiler_state)))
};

async fn start_profiler_handler(
Expand All @@ -103,12 +93,12 @@ pub fn pprof_handlers() -> impl Filter<Extract = impl warp::Reply, Error = warp:
let mut state = profiler_state.lock().unwrap();

if state.profiler_guard.is_none() {
let max_duration = params.max_duration.unwrap_or(30).min(300);
let duration = params.duration.unwrap_or(30).min(300);
let sampling = params.sampling.unwrap_or(100).min(1000);
state.profiler_guard = Some(pprof::ProfilerGuard::new(sampling).unwrap());
let profiler_state = Arc::clone(&profiler_state);
tokio::spawn(async move {
time::sleep(Duration::from_secs(max_duration)).await;
time::sleep(Duration::from_secs(duration)).await;
save_flamegraph(profiler_state).await;
});
Ok(warp::reply::with_status(
Expand All @@ -123,11 +113,9 @@ pub fn pprof_handlers() -> impl Filter<Extract = impl warp::Reply, Error = warp:
}
}

async fn stop_profiler_handler(
async fn get_flamegraph_handler(
profiler_state: Arc<Mutex<ProfilerState>>,
) -> Result<impl warp::Reply, warp::Rejection> {
save_flamegraph(profiler_state.clone()).await;

let state = profiler_state.lock().unwrap();

if let Some(data) = state.flamegraph_data.clone() {
Expand All @@ -141,12 +129,9 @@ pub fn pprof_handlers() -> impl Filter<Extract = impl warp::Reply, Error = warp:
}
}

async fn save_flamegraph(
profiler_state: Arc<Mutex<ProfilerState>>,
) -> tokio::task::JoinHandle<()> {
spawn_blocking(move || {
async fn save_flamegraph(profiler_state: Arc<Mutex<ProfilerState>>) {
let handle = quickwit_common::thread_pool::run_cpu_intensive(move || {
let mut state = profiler_state.lock().unwrap();

if let Some(profiler) = state.profiler_guard.take() {
if let Ok(report) = profiler.report().build() {
let mut buffer = Vec::new();
Expand All @@ -155,7 +140,8 @@ pub fn pprof_handlers() -> impl Filter<Extract = impl warp::Reply, Error = warp:
}
}
}
})
});
let _ = handle.await;
}

start_profiler.or(stop_profiler)
Expand Down

0 comments on commit 89ce762

Please sign in to comment.