Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Tokio runtimes metrics #4984

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
rustdocflags = ["--cfg", "tokio_unstable"]

[target.x86_64-unknown-linux-gnu]
# Targetting x86-64-v2 gives a ~2% performance boost while only
# Targeting x86-64-v2 gives a ~2% performance boost while only
# disallowing Intel CPUs older than 2008 and AMD CPUs older than 2011.
# None of those very old CPUs are used in GCP
# (https://cloud.google.com/compute/docs/cpu-platforms). Unfortunately,
# AWS does not seem to disclose the exact CPUs they use.
rustflags = ["-C", "target-cpu=x86-64-v2"]
rustflags = ["-C", "target-cpu=x86-64-v2", "--cfg", "tokio_unstable"]

4 changes: 3 additions & 1 deletion .github/workflows/cbench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ on:
# pull request.
pull_request_target:

env:
RUSTFLAGS: --cfg tokio_unstable

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true


jobs:
tests:
name: Benchmark
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ env:
QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev
RUST_BACKTRACE: 1
RUSTDOCFLAGS: -Dwarnings -Arustdoc::private_intra_doc_links
RUSTFLAGS: -Dwarnings
RUSTFLAGS: -Dwarnings --cfg tokio_unstable

# Ensures that we cancel running jobs for the same PR / same workflow.
concurrency:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ env:
QW_S3_ENDPOINT: "http://localhost:4566" # Services are exposed as localhost because we are not running coverage in a container.
QW_S3_FORCE_PATH_STYLE_ACCESS: 1
QW_TEST_DATABASE_URL: postgres://quickwit-dev:quickwit-dev@localhost:5432/quickwit-metastore-dev
RUSTFLAGS: -Dwarnings --cfg tokio_unstable

jobs:
test:
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ COPY --from=ui-builder /quickwit/quickwit-ui/build /quickwit/quickwit-ui/build
WORKDIR /quickwit

RUN echo "Building workspace with feature(s) '$CARGO_FEATURES' and profile '$CARGO_PROFILE'" \
&& cargo build \
# `tokio_unstable` is passed as a shell environment prefix; `ENV` is a Dockerfile instruction, not a shell command.
&& RUSTFLAGS="--cfg tokio_unstable" \
cargo build \
-p quickwit-cli \
--features $CARGO_FEATURES \
--bin quickwit \
Expand Down
13 changes: 13 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ tikv-jemalloc-ctl = "0.5"
tikv-jemallocator = "0.5"
time = { version = "0.3", features = ["std", "formatting", "macros"] }
tokio = { version = "1.37", features = ["full"] }
tokio-metrics = { version = "0.3.1", features = ["rt"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.7", features = ["full"] }
toml = "0.7.6"
Expand Down
11 changes: 8 additions & 3 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

use std::collections::BTreeMap;

use anyhow::Context;
use colored::Colorize;
use opentelemetry::global;
use quickwit_cli::busy_detector;
Expand All @@ -29,17 +30,21 @@ use quickwit_cli::cli::{build_cli, CliCommand};
#[cfg(feature = "jemalloc")]
use quickwit_cli::jemalloc::start_jemalloc_metrics_loop;
use quickwit_cli::logger::setup_logging_and_tracing;
use quickwit_common::runtimes::scrape_tokio_runtime_metrics;
use quickwit_serve::BuildInfo;
use tracing::error;

fn main() -> anyhow::Result<()> {
tokio::runtime::Builder::new_multi_thread()
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.on_thread_unpark(busy_detector::thread_unpark)
.on_thread_park(busy_detector::thread_park)
.build()
.unwrap()
.block_on(main_impl())
.context("failed to start main Tokio runtime")?;

scrape_tokio_runtime_metrics(rt.handle(), "main");

rt.block_on(main_impl())
}

fn register_build_info_metric() {
Expand Down
8 changes: 0 additions & 8 deletions quickwit/quickwit-cli/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,6 @@ async fn mark_splits_for_deletion_cli(args: MarkForDeletionArgs) -> anyhow::Resu
Ok(())
}

#[derive(Tabled)]
struct FileRow {
#[tabled(rename = "File Name")]
file_name: String,
#[tabled(rename = "Size")]
size: String,
}

async fn describe_split_cli(args: DescribeSplitArgs) -> anyhow::Result<()> {
debug!(args=?args, "describe-split");
let qw_client = args.client_args.client();
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-cluster/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,26 +93,31 @@ impl Default for ClusterMetrics {
"gossip_recv_messages_total",
"Total number of gossip messages received.",
"cluster",
&[],
),
gossip_recv_bytes_total: new_counter(
"gossip_recv_bytes_total",
"Total amount of gossip data received in bytes.",
"cluster",
&[],
),
gossip_sent_messages_total: new_counter(
"gossip_sent_messages_total",
"Total number of gossip messages sent.",
"cluster",
&[],
),
gossip_sent_bytes_total: new_counter(
"gossip_sent_bytes_total",
"Total amount of gossip data sent in bytes.",
"cluster",
&[],
),
grpc_gossip_rounds_total: new_counter(
"grpc_gossip_rounds_total",
"Total number of gRPC gossip rounds performed with peer nodes.",
"cluster",
&[],
),
}
}
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ siphasher = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-metrics = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }
Expand Down
35 changes: 32 additions & 3 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use prometheus::{
IntCounter, IntCounterVec as PrometheusIntCounterVec, IntGauge,
IntGaugeVec as PrometheusIntGaugeVec,
};
use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder};
use prometheus::{Encoder, Gauge, HistogramOpts, Opts, TextEncoder};

#[derive(Clone)]
pub struct HistogramVec<const N: usize> {
Expand Down Expand Up @@ -71,10 +71,20 @@ pub fn register_info(name: &'static str, help: &'static str, kvs: BTreeMap<&'sta
prometheus::register(Box::new(counter)).expect("failed to register counter");
}

pub fn new_counter(name: &str, help: &str, subsystem: &str) -> IntCounter {
pub fn new_counter(
name: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
) -> IntCounter {
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let counter_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem);
.subsystem(subsystem)
.const_labels(owned_const_labels);
let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter");
prometheus::register(Box::new(counter.clone())).expect("failed to register counter");
counter
Expand Down Expand Up @@ -104,6 +114,25 @@ pub fn new_counter_vec<const N: usize>(
IntCounterVec { underlying }
}

/// Creates a floating-point gauge named `quickwit_<subsystem>_<name>` and registers it through
/// `prometheus::register`.
///
/// Each `(name, value)` pair in `const_labels` is attached to the metric as a constant label.
///
/// # Panics
///
/// Panics if the gauge cannot be created from the options or if registration fails.
pub fn new_float_gauge(
    name: &str,
    help: &str,
    subsystem: &str,
    const_labels: &[(&str, &str)],
) -> Gauge {
    // The options builder wants owned strings, so copy the borrowed label pairs.
    let mut labels: HashMap<String, String> = HashMap::with_capacity(const_labels.len());
    for (key, value) in const_labels {
        labels.insert(key.to_string(), value.to_string());
    }

    let opts = Opts::new(name, help)
        .namespace("quickwit")
        .subsystem(subsystem)
        .const_labels(labels);

    let float_gauge = Gauge::with_opts(opts).expect("failed to create float gauge");
    let registered = Box::new(float_gauge.clone());
    prometheus::register(registered).expect("failed to register float gauge");
    float_gauge
}

pub fn new_gauge(
name: &str,
help: &str,
Expand Down
79 changes: 77 additions & 2 deletions quickwit/quickwit-common/src/runtimes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@

use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use once_cell::sync::OnceCell;
use prometheus::{Gauge, IntCounter, IntGauge};
use tokio::runtime::Runtime;
use tokio_metrics::{RuntimeMetrics, RuntimeMonitor};

use crate::metrics::{new_counter, new_float_gauge, new_gauge};

static RUNTIMES: OnceCell<HashMap<RuntimeType, tokio::runtime::Runtime>> = OnceCell::new();

Expand Down Expand Up @@ -63,7 +68,7 @@ impl RuntimesConfig {
}

pub fn with_num_cpus(num_cpus: usize) -> Self {
// Non blocking task are supposed to be io intensive, and not require many threads...
// Non-blocking tasks are supposed to be I/O intensive and should not require many threads...
let num_threads_non_blocking = if num_cpus > 6 { 2 } else { 1 };
// On the other hand the blocking actors are cpu intensive. We allocate
// almost all of the threads to them.
Expand All @@ -83,7 +88,8 @@ impl Default for RuntimesConfig {
}

fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
let mut runtimes = HashMap::default();
let mut runtimes = HashMap::with_capacity(2);

let blocking_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.num_threads_blocking)
.thread_name_fn(|| {
Expand All @@ -94,7 +100,10 @@ fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
.enable_all()
.build()
.unwrap();

scrape_tokio_runtime_metrics(blocking_runtime.handle(), "blocking");
runtimes.insert(RuntimeType::Blocking, blocking_runtime);

let non_blocking_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.num_threads_non_blocking)
.thread_name_fn(|| {
Expand All @@ -105,7 +114,10 @@ fn start_runtimes(config: RuntimesConfig) -> HashMap<RuntimeType, Runtime> {
.enable_all()
.build()
.unwrap();

scrape_tokio_runtime_metrics(non_blocking_runtime.handle(), "non_blocking");
runtimes.insert(RuntimeType::NonBlocking, non_blocking_runtime);

runtimes
}

Expand Down Expand Up @@ -135,6 +147,69 @@ impl RuntimeType {
}
}

/// Spawns a background task on the runtime behind `handle` that exports that runtime's Tokio
/// metrics to Prometheus.
///
/// The task builds a [`RuntimeMonitor`] for `handle` and then loops: it takes the next interval
/// sample from the monitor, waits for the next tick of a one-second interval, and writes the
/// sample into a set of Prometheus metrics created with `label`.
///
/// The returned `JoinHandle` is dropped, so the task is detached.
pub fn scrape_tokio_runtime_metrics(handle: &tokio::runtime::Handle, label: &'static str) {
    let monitor = RuntimeMonitor::new(handle);

    let scrape_loop = async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(1));
        let mut exported_metrics = PrometheusRuntimeMetrics::new(label);

        for sample in monitor.intervals() {
            ticker.tick().await;
            exported_metrics.update(&sample);
        }
    };

    handle.spawn(scrape_loop);
}

/// Prometheus metrics that mirror a subset of the Tokio runtime metrics reported by
/// `tokio_metrics` for a single runtime.
struct PrometheusRuntimeMetrics {
    /// Set from `RuntimeMetrics::total_local_queue_depth`.
    scheduled_tasks: IntGauge,
    /// Incremented by `RuntimeMetrics::total_busy_duration`, in whole milliseconds.
    worker_busy_duration_milliseconds_total: IntCounter,
    /// Set from `RuntimeMetrics::busy_ratio()`.
    worker_busy_ratio: Gauge,
    /// Set from `RuntimeMetrics::workers_count`.
    worker_threads: IntGauge,
}

impl PrometheusRuntimeMetrics {
    /// Creates the Prometheus metrics for one Tokio runtime.
    ///
    /// Every metric is created in the `runtime` subsystem and carries a constant `runtime_type`
    /// label whose value is `label`.
    ///
    /// NOTE(review): the metric constructors register each metric on creation and `expect` on
    /// failure, so calling this twice with the same `label` presumably panics — confirm.
    pub fn new(label: &'static str) -> Self {
        Self {
            scheduled_tasks: new_gauge(
                "tokio_scheduled_tasks",
                "The total number of tasks currently scheduled in workers' local queues.",
                "runtime",
                &[("runtime_type", label)],
            ),
            worker_busy_duration_milliseconds_total: new_counter(
                "tokio_worker_busy_duration_milliseconds_total",
                "The total amount of time worker threads were busy.",
                "runtime",
                &[("runtime_type", label)],
            ),
            worker_busy_ratio: new_float_gauge(
                "tokio_worker_busy_ratio",
                "The ratio of time worker threads were busy since the last time runtime metrics \
                 were collected.",
                "runtime",
                &[("runtime_type", label)],
            ),
            worker_threads: new_gauge(
                "tokio_worker_threads",
                "The number of worker threads used by the runtime.",
                "runtime",
                &[("runtime_type", label)],
            ),
        }
    }

    /// Copies one runtime metrics sample into the Prometheus metrics.
    ///
    /// Gauges are overwritten with the sample's values, while the busy-duration counter is
    /// incremented by the sample's `total_busy_duration`. The caller is therefore expected to
    /// pass per-interval samples (as yielded by `RuntimeMonitor::intervals()`), not cumulative
    /// totals.
    pub fn update(&mut self, runtime_metrics: &RuntimeMetrics) {
        self.scheduled_tasks
            .set(runtime_metrics.total_local_queue_depth as i64);
        // `as_millis` truncates, so the sub-millisecond remainder of each sample is dropped.
        self.worker_busy_duration_milliseconds_total
            .inc_by(runtime_metrics.total_busy_duration.as_millis() as u64);
        self.worker_busy_ratio.set(runtime_metrics.busy_ratio());
        self.worker_threads
            .set(runtime_metrics.workers_count as i64);
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-common/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl ThreadPool {
}
let thread_pool = rayon_pool_builder
.build()
.expect("failed to spawn the spawning pool");
.expect("failed to spawn thread pool");
let ongoing_tasks = THREAD_POOL_METRICS.ongoing_tasks.with_label_values([name]);
let pending_tasks = THREAD_POOL_METRICS.pending_tasks.with_label_values([name]);
ThreadPool {
Expand Down
Loading
Loading