Skip to content

Commit

Permalink
Add Tokio runtimes metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed May 15, 2024
1 parent 1a58b71 commit 00c739d
Show file tree
Hide file tree
Showing 22 changed files with 168 additions and 124 deletions.
6 changes: 5 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
rustdocflags = ["--cfg", "tokio_unstable"]

[target.x86_64-unknown-linux-gnu]
# Targetting x86-64-v2 gives a ~2% performance boost while only
# Targeting x86-64-v2 gives a ~2% performance boost while only
# disallowing Intel CPUs older than 2008 and AMD CPUs older than 2011.
# None of those very old CPUs are used in GCP
# (https://cloud.google.com/compute/docs/cpu-platforms). Unfortunately,
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/cbench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ on:
# pull request.
pull_request_target:

env:
RUSTFLAGS: --cfg tokio_unstable

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true


jobs:
tests:
name: Benchmark
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ env:
QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev
RUST_BACKTRACE: 1
RUSTDOCFLAGS: -Dwarnings -Arustdoc::private_intra_doc_links
RUSTFLAGS: -Dwarnings
RUSTFLAGS: -Dwarnings --cfg tokio_unstable

# Ensures that we cancel running jobs for the same PR / same workflow.
concurrency:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ env:
QW_S3_ENDPOINT: "http://localhost:4566" # Services are exposed as localhost because we are not running coverage in a container.
QW_S3_FORCE_PATH_STYLE_ACCESS: 1
QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev
RUSTFLAGS: -Dwarnings --cfg tokio_unstable

jobs:
test:
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ COPY --from=ui-builder /quickwit/quickwit-ui/build /quickwit/quickwit-ui/build
WORKDIR /quickwit

RUN echo "Building workspace with feature(s) '$CARGO_FEATURES' and profile '$CARGO_PROFILE'" \
&& cargo build \
# `ENV` is a Dockerfile instruction, not a shell command; inside `RUN`, set the variable inline for `cargo build`.
&& RUSTFLAGS="--cfg tokio_unstable" \
cargo build \
-p quickwit-cli \
--features $CARGO_FEATURES \
--bin quickwit \
Expand Down
13 changes: 13 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ tikv-jemalloc-ctl = "0.5"
tikv-jemallocator = "0.5"
time = { version = "0.3", features = ["std", "formatting", "macros"] }
tokio = { version = "1.37", features = ["full"] }
tokio-metrics = { version = "0.3.1", features = ["rt"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.7", features = ["full"] }
toml = "0.7.6"
Expand Down
11 changes: 8 additions & 3 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

use std::collections::BTreeMap;

use anyhow::Context;
use colored::Colorize;
use opentelemetry::global;
use quickwit_cli::busy_detector;
Expand All @@ -29,17 +30,21 @@ use quickwit_cli::cli::{build_cli, CliCommand};
#[cfg(feature = "jemalloc")]
use quickwit_cli::jemalloc::start_jemalloc_metrics_loop;
use quickwit_cli::logger::setup_logging_and_tracing;
use quickwit_common::runtimes::scrape_tokio_runtime_metrics;
use quickwit_serve::BuildInfo;
use tracing::error;

fn main() -> anyhow::Result<()> {
tokio::runtime::Builder::new_multi_thread()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.on_thread_unpark(busy_detector::thread_unpark)
.on_thread_park(busy_detector::thread_park)
.build()
.unwrap()
.block_on(main_impl())
.context("failed to start main Tokio runtime")?;

scrape_tokio_runtime_metrics(rt.handle(), "main");

rt.block_on(main_impl())
}

fn register_build_info_metric() {
Expand Down
8 changes: 0 additions & 8 deletions quickwit/quickwit-cli/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,6 @@ async fn mark_splits_for_deletion_cli(args: MarkForDeletionArgs) -> anyhow::Resu
Ok(())
}

#[derive(Tabled)]
struct FileRow {
#[tabled(rename = "File Name")]
file_name: String,
#[tabled(rename = "Size")]
size: String,
}

async fn describe_split_cli(args: DescribeSplitArgs) -> anyhow::Result<()> {
debug!(args=?args, "describe-split");
let qw_client = args.client_args.client();
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-cluster/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,31 @@ impl Default for ClusterMetrics {
"gossip_recv_messages_total",
"Total number of gossip messages received.",
"cluster",
&[],
),
gossip_recv_bytes_total: new_counter(
"gossip_recv_bytes_total",
"Total amount of gossip data received in bytes.",
"cluster",
&[],
),
gossip_sent_messages_total: new_counter(
"gossip_sent_messages_total",
"Total number of gossip messages sent.",
"cluster",
&[],
),
gossip_sent_bytes_total: new_counter(
"gossip_sent_bytes_total",
"Total amount of gossip data sent in bytes.",
"cluster",
&[],
),
grpc_gossip_rounds_total: new_counter(
"grpc_gossip_rounds_total",
"Total number of gRPC gossip rounds performed with peer nodes.",
"cluster",
&[],
),
}
}
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ siphasher = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-metrics ={ workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }
Expand Down
35 changes: 32 additions & 3 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use prometheus::{
IntCounter, IntCounterVec as PrometheusIntCounterVec, IntGauge,
IntGaugeVec as PrometheusIntGaugeVec,
};
use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder};
use prometheus::{Encoder, Gauge, HistogramOpts, Opts, TextEncoder};

#[derive(Clone)]
pub struct HistogramVec<const N: usize> {
Expand Down Expand Up @@ -71,10 +71,20 @@ pub fn register_info(name: &'static str, help: &'static str, kvs: BTreeMap<&'sta
prometheus::register(Box::new(counter)).expect("failed to register counter");
}

pub fn new_counter(name: &str, help: &str, subsystem: &str) -> IntCounter {
pub fn new_counter(
name: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
) -> IntCounter {
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let counter_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem);
.subsystem(subsystem)
.const_labels(owned_const_labels);
let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter");
prometheus::register(Box::new(counter.clone())).expect("failed to register counter");
counter
Expand Down Expand Up @@ -104,6 +114,25 @@ pub fn new_counter_vec<const N: usize>(
IntCounterVec { underlying }
}

/// Builds a floating-point gauge in the `quickwit` namespace and registers it via
/// `prometheus::register` before handing it back to the caller.
///
/// * `name` / `help` - metric name and help text passed to `Opts::new`.
/// * `subsystem` - subsystem set on the metric options.
/// * `const_labels` - `(name, value)` pairs attached to the gauge as constant labels. If a
///   label name appears more than once, the last value wins.
///
/// # Panics
///
/// Panics if the gauge cannot be created from the options or if registration fails.
pub fn new_float_gauge(
    name: &str,
    help: &str,
    subsystem: &str,
    const_labels: &[(&str, &str)],
) -> Gauge {
    // The options take ownership of the labels, so copy the borrowed pairs into owned strings.
    let mut labels: HashMap<String, String> = HashMap::with_capacity(const_labels.len());
    for &(key, value) in const_labels {
        labels.insert(key.to_string(), value.to_string());
    }
    let opts = Opts::new(name, help)
        .namespace("quickwit")
        .subsystem(subsystem)
        .const_labels(labels);
    let float_gauge = Gauge::with_opts(opts).expect("failed to create float gauge");
    // Register a clone and return the original handle to the caller.
    let registered = Box::new(float_gauge.clone());
    prometheus::register(registered).expect("failed to register float gauge");
    float_gauge
}

pub fn new_gauge(
name: &str,
help: &str,
Expand Down
79 changes: 77 additions & 2 deletions quickwit/quickwit-common/src/runtimes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@

use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use once_cell::sync::OnceCell;
use prometheus::{Gauge, IntCounter, IntGauge};
use tokio::runtime::Runtime;
use tokio_metrics::{RuntimeMetrics, RuntimeMonitor};

use crate::metrics::{new_counter, new_float_gauge, new_gauge};

static RUNTIMES: OnceCell<HashMap<RuntimeType, tokio::runtime::Runtime>> = OnceCell::new();

Expand Down Expand Up @@ -63,7 +68,7 @@ impl RuntimesConfig {
}

pub fn with_num_cpus(num_cpus: usize) -> Self {
// Non blocking task are supposed to be io intensive, and not require many threads...
// Non-blocking tasks are supposed to be I/O intensive and should not require many threads...
let num_threads_non_blocking = if num_cpus > 6 { 2 } else { 1 };
// On the other hand the blocking actors are cpu intensive. We allocate
// almost all of the threads to them.
Expand All @@ -83,7 +88,8 @@ impl Default for RuntimesConfig {
}

fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
let mut runtimes = HashMap::default();
let mut runtimes = HashMap::with_capacity(2);

let blocking_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.num_threads_blocking)
.thread_name_fn(|| {
Expand All @@ -94,7 +100,10 @@ fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
.enable_all()
.build()
.unwrap();

scrape_tokio_runtime_metrics(blocking_runtime.handle(), "blocking");
runtimes.insert(RuntimeType::Blocking, blocking_runtime);

let non_blocking_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.num_threads_non_blocking)
.thread_name_fn(|| {
Expand All @@ -105,7 +114,10 @@ fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
.enable_all()
.build()
.unwrap();

scrape_tokio_runtime_metrics(non_blocking_runtime.handle(), "non_blocking");
runtimes.insert(RuntimeType::NonBlocking, non_blocking_runtime);

runtimes
}

Expand Down Expand Up @@ -135,6 +147,69 @@ impl RuntimeType {
}
}

/// Spawns a background task on `handle` that periodically exports the metrics of that runtime
/// to Prometheus.
///
/// The task builds a [`RuntimeMonitor`] for `handle`, creates the Prometheus metrics tagged
/// with `label` (used as the `runtime_type` constant label), and then, for every sample
/// yielded by `RuntimeMonitor::intervals`, waits for the next tick of a one-second interval
/// before pushing the sample into the Prometheus metrics.
///
/// The `JoinHandle` returned by `spawn` is dropped, so the task is detached and cannot be
/// awaited or aborted through this function.
pub fn scrape_tokio_runtime_metrics(handle: &tokio::runtime::Handle, label: &'static str) {
    let monitor = RuntimeMonitor::new(handle);
    let scrape_loop = async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(1));
        let mut exported_metrics = PrometheusRuntimeMetrics::new(label);

        for sample in monitor.intervals() {
            ticker.tick().await;
            exported_metrics.update(&sample);
        }
    };
    handle.spawn(scrape_loop);
}

/// Prometheus metrics exported for a single Tokio runtime.
///
/// All four metrics are created in `new` under the `runtime` subsystem with a `runtime_type`
/// constant label, and refreshed from a `RuntimeMetrics` sample in `update`.
struct PrometheusRuntimeMetrics {
// Set from `RuntimeMetrics::total_local_queue_depth`.
scheduled_tasks: IntGauge,
// Incremented by `RuntimeMetrics::total_busy_duration`, expressed in milliseconds.
worker_busy_duration_milliseconds_total: IntCounter,
// Set from `RuntimeMetrics::busy_ratio()`.
worker_busy_ratio: Gauge,
// Set from `RuntimeMetrics::workers_count`.
worker_threads: IntGauge,
}

impl PrometheusRuntimeMetrics {
    /// Creates and registers the Prometheus metrics of one runtime.
    ///
    /// Every metric lives in the `runtime` subsystem and carries `label` as the value of the
    /// `runtime_type` constant label, so each runtime must use a distinct `label`.
    ///
    /// NOTE(review): the `new_*` helpers are expected to panic when registration fails (for
    /// instance if the same `label` is registered twice) — confirm against their definitions.
    pub fn new(label: &'static str) -> Self {
        Self {
            scheduled_tasks: new_gauge(
                "tokio_scheduled_tasks",
                "The total number of tasks currently scheduled in workers' local queues.",
                "runtime",
                &[("runtime_type", label)],
            ),
            worker_busy_duration_milliseconds_total: new_counter(
                "tokio_worker_busy_duration_milliseconds_total",
                "The amount of time worker threads were busy.",
                "runtime",
                &[("runtime_type", label)],
            ),
            worker_busy_ratio: new_float_gauge(
                "tokio_worker_busy_ratio",
                "The ratio of time worker threads were busy since the last time runtime metrics \
                 were collected.",
                "runtime",
                &[("runtime_type", label)],
            ),
            worker_threads: new_gauge(
                "tokio_worker_threads",
                "The number of worker threads used by the runtime.",
                "runtime",
                &[("runtime_type", label)],
            ),
        }
    }

    /// Pushes one `RuntimeMetrics` sample into the Prometheus metrics.
    ///
    /// Gauges are overwritten with the sampled values. The busy-duration counter is
    /// *incremented* by the sample's `total_busy_duration`, so the sample is expected to cover
    /// a single sampling interval rather than a running total (the caller in this file feeds
    /// samples yielded by `RuntimeMonitor::intervals`).
    pub fn update(&mut self, runtime_metrics: &RuntimeMetrics) {
        // Saturate instead of silently wrapping if a value does not fit the metric's integer type.
        let scheduled_tasks =
            i64::try_from(runtime_metrics.total_local_queue_depth).unwrap_or(i64::MAX);
        let busy_duration_millis =
            u64::try_from(runtime_metrics.total_busy_duration.as_millis()).unwrap_or(u64::MAX);
        let worker_threads = i64::try_from(runtime_metrics.workers_count).unwrap_or(i64::MAX);

        self.scheduled_tasks.set(scheduled_tasks);
        self.worker_busy_duration_milliseconds_total
            .inc_by(busy_duration_millis);
        self.worker_busy_ratio.set(runtime_metrics.busy_ratio());
        self.worker_threads.set(worker_threads);
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl ThreadPool {
}
let thread_pool = rayon_pool_builder
.build()
.expect("failed to spawn the spawning pool");
.expect("failed to spawn thread pool");
let ongoing_tasks = THREAD_POOL_METRICS.ongoing_tasks.with_label_values([name]);
let pending_tasks = THREAD_POOL_METRICS.pending_tasks.with_label_values([name]);
ThreadPool {
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-control-plane/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,27 @@ impl Default for ControlPlaneMetrics {
"restart_total",
"Number of control plane restart.",
"control_plane",
&[],
),
schedule_total: new_counter(
"schedule_total",
"Number of control plane `schedule` operations.",
"control_plane",
&[],
),
metastore_error_aborted: new_counter(
"metastore_error_aborted",
"Number of aborted metastore transaction (= do not trigger a control plane \
restart)",
"control_plane",
&[],
),
metastore_error_maybe_executed: new_counter(
"metastore_error_maybe_executed",
"Number of metastore transaction with an uncertain outcome (= do trigger a \
control plane restart)",
"control_plane",
&[],
),
open_shards_total: new_gauge_vec(
"open_shards_total",
Expand Down
Loading

0 comments on commit 00c739d

Please sign in to comment.