From 740d0f23d54c598a35ed0621c1ab9ef50d077c98 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Fri, 2 Aug 2024 16:00:14 +0200
Subject: [PATCH 1/2] improve ingest lock

---
 .../quickwit-ingest/src/ingest_v2/fetch.rs    | 11 +++-
 .../quickwit-ingest/src/ingest_v2/idle.rs     |  2 +-
 .../quickwit-ingest/src/ingest_v2/ingester.rs | 21 +++---
 .../quickwit-ingest/src/ingest_v2/metrics.rs  | 13 +++-
 quickwit/quickwit-ingest/src/ingest_v2/mod.rs |  2 +-
 .../src/ingest_v2/replication.rs              |  6 +-
 quickwit/quickwit-ingest/src/lib.rs           | 65 +++++++++++++++++--
 7 files changed, 93 insertions(+), 27 deletions(-)

diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
index 53f6b0aeeea..25d684f7257 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
 
 use bytes::{BufMut, BytesMut};
 use bytesize::ByteSize;
-use futures::StreamExt;
+use futures::{FutureExt, StreamExt};
 use mrecordlog::Record;
 use quickwit_common::metrics::MEMORY_METRICS;
 use quickwit_common::retry::RetryParams;
@@ -133,8 +133,13 @@ impl FetchStreamTask {
             let mut mrecord_buffer = BytesMut::with_capacity(self.batch_num_bytes);
             let mut mrecord_lengths = Vec::new();
 
-            let mrecordlog_guard =
-                with_lock_metrics!(self.mrecordlog.read().await, "fetch", "read");
+            let Some(mrecordlog_guard) =
+                with_lock_metrics!(self.mrecordlog.read().map(Some), "fetch", "read")
+            else {
+                // we always get a `Some`: the layer is only added to satisfy
+                // `with_lock_metrics`'s need for a `Future`
+                unreachable!();
+            };
 
             let Ok(mrecords) = mrecordlog_guard
                 .as_ref()
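
The `map(Some)` above deserves a note: the reworked `with_lock_metrics!` (see the lib.rs hunk later in this patch) awaits the future itself and then calls `.map(...)` on the awaited value to wrap the guard, so the future's output has to be something map-able. An `RwLock` read future resolves to a bare guard, hence the lift into an `Option`. A minimal sketch of the same trick, assuming tokio's `RwLock` (the names here are illustrative, not Quickwit's):

    use futures::FutureExt; // provides the `.map` combinator on futures
    use tokio::sync::RwLock;

    async fn read_len(lock: &RwLock<Vec<u8>>) -> usize {
        // `lock.read()` resolves to a plain guard, so `.map(Some)` turns the
        // future's output into an `Option<RwLockReadGuard<'_, Vec<u8>>>`.
        let maybe_guard = lock.read().map(Some).await;
        // `None` is impossible by construction, hence the `unreachable!()`
        // branch in the patched code.
        let Some(guard) = maybe_guard else { unreachable!() };
        guard.len()
    }
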
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs
index 0263e23d194..5791f7797fa 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs
@@ -64,7 +64,7 @@ impl CloseIdleShardsTask {
                 return;
             };
             let Ok(mut state_guard) =
-                with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write").await
+                with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write")
             else {
                 return;
             };
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 1fae0c3b829..48653ecb213 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -292,7 +292,7 @@ impl Ingester {
         let mut per_source_shard_ids: HashMap<(IndexUid, SourceId), Vec<ShardId>> = HashMap::new();
 
-        let state_guard = with_lock_metrics!(self.state.lock_fully().await, "reset_shards", "read")
+        let state_guard = with_lock_metrics!(self.state.lock_fully(), "reset_shards", "read")
             .expect("ingester should be ready");
 
         for queue_id in state_guard.mrecordlog.list_queues() {
@@ -325,7 +325,7 @@ impl Ingester {
         match advise_reset_shards_result {
             Ok(Ok(advise_reset_shards_response)) => {
                 let mut state_guard =
-                    with_lock_metrics!(self.state.lock_fully().await, "reset_shards", "write")
+                    with_lock_metrics!(self.state.lock_fully(), "reset_shards", "write")
                         .expect("ingester should be ready");
 
                 state_guard
@@ -458,8 +458,7 @@ impl Ingester {
         let force_commit = commit_type == CommitTypeV2::Force;
         let leader_id: NodeId = persist_request.leader_id.into();
 
-        let mut state_guard =
-            with_lock_metrics!(self.state.lock_fully().await, "persist", "write")?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "persist", "write")?;
 
         if state_guard.status() != IngesterStatus::Ready {
             persist_failures.reserve_exact(persist_request.subrequests.len());
@@ -929,8 +928,9 @@ impl Ingester {
         &self,
         init_shards_request: InitShardsRequest,
     ) -> IngestV2Result<InitShardsResponse> {
-        let mut state_guard =
-            with_lock_metrics!(self.state.lock_fully().await, "init_shards", "write")?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "init_shards", "write")?;
+        // we do this to allow simultaneous reborrow of multiple fields.
+        let state_guard = &mut *state_guard;
 
         if state_guard.status() != IngesterStatus::Ready {
             return Err(IngestV2Error::Internal("node decommissioned".to_string()));
@@ -984,7 +984,7 @@ impl Ingester {
             )));
         }
         let mut state_guard =
-            with_lock_metrics!(self.state.lock_fully().await, "truncate_shards", "write")?;
+            with_lock_metrics!(self.state.lock_fully(), "truncate_shards", "write")?;
 
         for subrequest in truncate_shards_request.subrequests {
             let queue_id = subrequest.queue_id();
@@ -1011,7 +1011,7 @@ impl Ingester {
         close_shards_request: CloseShardsRequest,
     ) -> IngestV2Result<CloseShardsResponse> {
         let mut state_guard =
-            with_lock_metrics!(self.state.lock_partially().await, "close_shards", "write")?;
+            with_lock_metrics!(self.state.lock_partially(), "close_shards", "write")?;
 
         let mut successes = Vec::with_capacity(close_shards_request.shard_pkeys.len());
@@ -1166,7 +1166,7 @@ impl IngesterService for Ingester {
             })
             .collect();
         let mut state_guard =
-            with_lock_metrics!(self.state.lock_fully(), "retain_shards", "write").await?;
+            with_lock_metrics!(self.state.lock_fully(), "retain_shards", "write")?;
         let remove_queue_ids: HashSet<QueueId> = state_guard
             .shards
             .keys()
@@ -1210,8 +1210,7 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
             warn!("ingester state update failed");
             return;
         };
-        let Ok(mut state_guard) =
-            with_lock_metrics!(state.lock_fully().await, "gc_shards", "write")
+        let Ok(mut state_guard) = with_lock_metrics!(state.lock_fully(), "gc_shards", "write")
         else {
             error!("failed to lock the ingester state");
             return;
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
index 8fc6a75b9f4..795665f7f2c 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
@@ -76,7 +76,7 @@ impl Default for IngestResultMetrics {
     }
 }
 
-pub(super) struct IngestV2Metrics {
+pub(crate) struct IngestV2Metrics {
     pub reset_shards_operations_total: IntCounterVec<1>,
     pub open_shards: IntGauge,
     pub closed_shards: IntGauge,
@@ -84,6 +84,7 @@ pub(super) struct IngestV2Metrics {
     pub shard_st_throughput_mib: Histogram,
     pub wal_acquire_lock_requests_in_flight: IntGaugeVec<2>,
     pub wal_acquire_lock_request_duration_secs: HistogramVec<2>,
+    pub wal_hold_lock_duration_secs: HistogramVec<2>,
     pub wal_disk_used_bytes: IntGauge,
     pub wal_memory_used_bytes: IntGauge,
     pub ingest_results: IngestResultMetrics,
@@ -139,6 +140,14 @@ impl Default for IngestV2Metrics {
                 ["operation", "type"],
                 exponential_buckets(0.001, 2.0, 12).unwrap(),
             ),
+            wal_hold_lock_duration_secs: new_histogram_vec(
+                "wal_hold_lock_duration_secs",
+                "Duration for which a lock was held, in seconds.",
+                "ingest",
+                &[],
+                ["operation", "type"],
+                exponential_buckets(0.001, 2.0, 12).unwrap(),
+            ),
             wal_disk_used_bytes: new_gauge(
                 "wal_disk_used_bytes",
                 "WAL disk space used in bytes.",
@@ -168,4 +177,4 @@ pub(super) fn report_wal_usage(wal_usage: ResourceUsage) {
         .set(wal_usage.memory_used_bytes as i64);
 }
 
-pub(super) static INGEST_V2_METRICS: Lazy<IngestV2Metrics> = Lazy::new(IngestV2Metrics::default);
+pub(crate) static INGEST_V2_METRICS: Lazy<IngestV2Metrics> = Lazy::new(IngestV2Metrics::default);
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
index d9bc7b1be75..6a51c2c4cfb 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -23,7 +23,7 @@ mod doc_mapper;
 mod fetch;
 mod idle;
 mod ingester;
-mod metrics;
+pub(crate) mod metrics;
 mod models;
 mod mrecord;
 mod mrecordlog_utils;
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
index 43d30d73740..193be2715b4 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -454,8 +454,7 @@ impl ReplicationTask {
         };
         let queue_id = replica_shard.queue_id();
 
-        let mut state_guard =
-            with_lock_metrics!(self.state.lock_fully(), "init_replica", "write").await?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "init_replica", "write")?;
 
         match state_guard.mrecordlog.create_queue(&queue_id).await {
             Ok(_) => {}
@@ -527,8 +526,7 @@ impl ReplicationTask {
         // queue in the WAL and should be deleted.
         let mut shards_to_delete: HashSet<QueueId> = HashSet::new();
 
-        let mut state_guard =
-            with_lock_metrics!(self.state.lock_fully(), "replicate", "write").await?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "replicate", "write")?;
 
         if state_guard.status() != IngesterStatus::Ready {
             replicate_failures.reserve_exact(replicate_request.subrequests.len());
diff --git a/quickwit/quickwit-ingest/src/lib.rs b/quickwit/quickwit-ingest/src/lib.rs
index 12807f637b6..705715eac7c 100644
--- a/quickwit/quickwit-ingest/src/lib.rs
+++ b/quickwit/quickwit-ingest/src/lib.rs
@@ -125,6 +125,49 @@ impl CommitType {
     }
 }
 
+struct TimedMutexGuard<T> {
+    guard: T,
+    acquired_at: std::time::Instant,
+    purpose: [&'static str; 2],
+}
+
+use std::ops::{Deref, DerefMut};
+
+impl<T> Deref for TimedMutexGuard<T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        &self.guard
+    }
+}
+
+impl<T> DerefMut for TimedMutexGuard<T> {
+    fn deref_mut(&mut self) -> &mut T {
+        &mut self.guard
+    }
+}
+
+impl<T> Drop for TimedMutexGuard<T> {
+    fn drop(&mut self) {
+        let elapsed = self.acquired_at.elapsed();
+
+        crate::ingest_v2::metrics::INGEST_V2_METRICS
+            .wal_hold_lock_duration_secs
+            .with_label_values(self.purpose)
+            .observe(elapsed.as_secs_f64());
+
+        if elapsed > std::time::Duration::from_secs(1) {
+            let purpose = self.purpose.join("::");
+            quickwit_common::rate_limited_warn!(
+                limit_per_min = 6,
+                "held mutex for {}ms for {}",
+                elapsed.as_millis(),
+                purpose,
+            );
+        }
+    }
+}
+
 #[macro_export]
 macro_rules! with_lock_metrics {
@@ -134,14 +177,19 @@ macro_rules! with_lock_metrics {
                 .with_label_values([$($label),*])
                 .inc();
 
+
             let now = std::time::Instant::now();
-            let guard = $future;
+            let guard = $future.await;
 
-            let elapsed = now.elapsed();
+            let now_after = std::time::Instant::now();
+            let elapsed = now_after.duration_since(now);
 
             if elapsed > std::time::Duration::from_secs(1) {
+                let text_label = with_lock_metrics!(@concat $($label,)*);
                 quickwit_common::rate_limited_warn!(
                     limit_per_min=6,
-                    "lock acquisition took {}ms", elapsed.as_millis()
+                    "lock acquisition took {}ms for {}",
+                    elapsed.as_millis(),
+                    text_label,
                 );
             }
             $crate::ingest_v2::metrics::INGEST_V2_METRICS
@@ -153,9 +201,16 @@ macro_rules! with_lock_metrics {
                 .with_label_values([$($label),*])
                 .observe(elapsed.as_secs_f64());
 
-            guard
+            guard.map(|guard| $crate::TimedMutexGuard {
+                guard,
+                acquired_at: now_after,
+                purpose: [$($label),*],
+            })
         }
-    }
+    };
+    (@concat $label1:tt, $($label:tt,)*) => {
+        concat!($label1, $("::", $label),*)
+    };
 }
 
 #[cfg(test)]
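
The gist of the patch above, condensed: the `.await` moves inside `with_lock_metrics!` so the macro brackets the actual wait, and the guard comes back wrapped so its `Drop` can report how long the lock was held. A self-contained sketch of that shape, assuming a tokio `Mutex` and with invented names and `eprintln!` standing in for the Prometheus metrics:

    use std::ops::{Deref, DerefMut};
    use std::time::Instant;

    struct TimedGuard<T> {
        guard: T,
        acquired_at: Instant,
    }

    impl<T> Deref for TimedGuard<T> {
        type Target = T;
        fn deref(&self) -> &T {
            &self.guard
        }
    }

    impl<T> DerefMut for TimedGuard<T> {
        fn deref_mut(&mut self) -> &mut T {
            &mut self.guard
        }
    }

    impl<T> Drop for TimedGuard<T> {
        fn drop(&mut self) {
            // In the patch, this observation feeds `wal_hold_lock_duration_secs`.
            eprintln!("lock held for {}ms", self.acquired_at.elapsed().as_millis());
        }
    }

    async fn lock_timed<T>(
        mutex: &tokio::sync::Mutex<T>,
    ) -> TimedGuard<tokio::sync::MutexGuard<'_, T>> {
        let started_at = Instant::now();
        let guard = mutex.lock().await;
        // In the patch, this feeds `wal_acquire_lock_request_duration_secs`.
        eprintln!("lock acquired in {}ms", started_at.elapsed().as_millis());
        TimedGuard { guard, acquired_at: Instant::now() }
    }
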
From 748fbd26fe1595536efefb36a8b577f0e98b0520 Mon Sep 17 00:00:00 2001
From: trinity-1686a
Date: Mon, 30 Sep 2024 10:49:15 +0200
Subject: [PATCH 2/2] make lock metrics not do dynamic lookups

---
 quickwit/quickwit-cli/src/lib.rs              |  1 +
 quickwit/quickwit-common/src/io.rs            |  2 +-
 quickwit/quickwit-common/src/metrics.rs       | 23 +++--
 quickwit/quickwit-common/src/thread_pool.rs   |  2 +-
 quickwit/quickwit-common/src/tower/metrics.rs |  1 +
 .../quickwit-control-plane/src/metrics.rs     |  2 +-
 .../src/model/shard_table.rs                  |  1 +
 .../src/actors/doc_processor.rs               |  2 +-
 .../src/actors/indexing_pipeline.rs           |  1 +
 .../src/actors/merge_pipeline.rs              |  1 +
 .../quickwit-indexing/src/actors/uploader.rs  |  1 +
 .../quickwit-ingest/src/ingest_v2/fetch.rs    |  2 +-
 .../quickwit-ingest/src/ingest_v2/idle.rs     |  2 +-
 .../quickwit-ingest/src/ingest_v2/ingester.rs | 22 ++---
 .../quickwit-ingest/src/ingest_v2/metrics.rs  | 89 ++++++++++++++++---
 .../src/ingest_v2/replication.rs              |  4 +-
 quickwit/quickwit-ingest/src/lib.rs           | 24 ++---
 quickwit/quickwit-ingest/src/metrics.rs       |  4 +-
 quickwit/quickwit-jaeger/src/lib.rs           |  1 +
 .../src/actors/delete_task_planner.rs         |  1 +
 .../quickwit-opentelemetry/src/otlp/logs.rs   |  1 +
 .../quickwit-opentelemetry/src/otlp/traces.rs |  1 +
 .../quickwit-search/src/search_job_placer.rs  |  1 +
 quickwit/quickwit-serve/src/load_shield.rs    |  2 +-
 quickwit/quickwit-serve/src/rest.rs           |  1 +
 25 files changed, 142 insertions(+), 50 deletions(-)

diff --git a/quickwit/quickwit-cli/src/lib.rs b/quickwit/quickwit-cli/src/lib.rs
index 98029541f05..883b911f5a3 100644
--- a/quickwit/quickwit-cli/src/lib.rs
+++ b/quickwit/quickwit-cli/src/lib.rs
@@ -335,6 +335,7 @@ pub mod busy_detector {
     use std::time::Instant;
 
     use once_cell::sync::Lazy;
+    use quickwit_common::metrics::Vector;
     use tracing::debug;
 
     use crate::metrics::CLI_METRICS;
diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs
index f25245e0ece..9e627d491e6 100644
--- a/quickwit/quickwit-common/src/io.rs
+++ b/quickwit/quickwit-common/src/io.rs
@@ -42,7 +42,7 @@ use pin_project::pin_project;
 use prometheus::IntCounter;
 use tokio::io::AsyncWrite;
 
-use crate::metrics::{new_counter_vec, IntCounterVec};
+use crate::metrics::{new_counter_vec, IntCounterVec, Vector};
 use crate::{KillSwitch, Progress, ProtectedZoneGuard};
 
 // Max 1MB at a time.
diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs
index 97341d30c67..c2fe6a0eac7 100644
--- a/quickwit/quickwit-common/src/metrics.rs
+++ b/quickwit/quickwit-common/src/metrics.rs
@@ -28,13 +28,17 @@ pub use prometheus::{
 };
 use prometheus::{Gauge, HistogramOpts, Opts, TextEncoder};
 
+pub trait Vector<const N: usize, T> {
+    fn with_label_values(&self, label_values: [&str; N]) -> T;
+}
+
 #[derive(Clone)]
 pub struct HistogramVec<const N: usize> {
     underlying: PrometheusHistogramVec,
 }
 
-impl<const N: usize> HistogramVec<N> {
-    pub fn with_label_values(&self, label_values: [&str; N]) -> Histogram {
+impl<const N: usize> Vector<N, Histogram> for HistogramVec<N> {
+    fn with_label_values(&self, label_values: [&str; N]) -> Histogram {
         self.underlying.with_label_values(&label_values)
     }
 }
@@ -44,8 +48,8 @@ pub struct IntCounterVec<const N: usize> {
     underlying: PrometheusIntCounterVec,
 }
 
-impl<const N: usize> IntCounterVec<N> {
-    pub fn with_label_values(&self, label_values: [&str; N]) -> IntCounter {
+impl<const N: usize> Vector<N, IntCounter> for IntCounterVec<N> {
+    fn with_label_values(&self, label_values: [&str; N]) -> IntCounter {
         self.underlying.with_label_values(&label_values)
     }
 }
@@ -55,8 +59,8 @@ pub struct IntGaugeVec<const N: usize> {
     underlying: PrometheusIntGaugeVec,
 }
 
-impl<const N: usize> IntGaugeVec<N> {
-    pub fn with_label_values(&self, label_values: [&str; N]) -> IntGauge {
+impl<const N: usize> Vector<N, IntGauge> for IntGaugeVec<N> {
+    fn with_label_values(&self, label_values: [&str; N]) -> IntGauge {
         self.underlying.with_label_values(&label_values)
     }
 }
@@ -465,4 +469,11 @@ pub fn index_label(index_name: &str) -> &str {
     }
 }
 
+// TODO: we want a macro to generate static metric vectors; it could be used
+// to simplify:
+// - quickwit-ingest/src/ingest_v2/metrics.rs (15 labels)
+// - quickwit-ingest/src/ingest_v2/metrics.rs (12 labels)
+// - quickwit-common/src/metrics.rs (10 labels)
+// and would encourage using that pattern in more places.
+
 pub static MEMORY_METRICS: Lazy<MemoryMetrics> = Lazy::new(MemoryMetrics::default);
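
The `Vector` trait abstracts over the concrete metric-vector wrappers so that downstream code can resolve a label combination once and keep the returned concrete metric. A sketch of that usage against the API added above (the struct and helper are invented for illustration, and the `new_counter_vec` call mirrors the vector constructors used elsewhere in this patch):

    use quickwit_common::metrics::{new_counter_vec, IntCounter, IntCounterVec, Vector};

    struct PersistCounters {
        write: IntCounter,
    }

    // Generic over any 2-label metric vector: one dynamic lookup, done once.
    fn resolve<T, V: Vector<2, T>>(vec: &V) -> T {
        vec.with_label_values(["persist", "write"])
    }

    fn build() -> PersistCounters {
        let vec: IntCounterVec<2> = new_counter_vec(
            "example_total",
            "An example counter vector.",
            "example",
            &[],
            ["operation", "type"],
        );
        // After this point, incrementing `write` costs no label hashing.
        PersistCounters { write: resolve(&vec) }
    }
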
diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs
index 229007e20d4..9f86abe8a35 100644
--- a/quickwit/quickwit-common/src/thread_pool.rs
+++ b/quickwit/quickwit-common/src/thread_pool.rs
@@ -26,7 +26,7 @@ use prometheus::IntGauge;
 use tokio::sync::oneshot;
 use tracing::error;
 
-use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec, OwnedGaugeGuard};
+use crate::metrics::{new_gauge_vec, GaugeGuard, IntGaugeVec, OwnedGaugeGuard, Vector};
 
 /// An executor backed by a thread pool to run CPU-intensive tasks.
 ///
diff --git a/quickwit/quickwit-common/src/tower/metrics.rs b/quickwit/quickwit-common/src/tower/metrics.rs
index 2ec2b73f9bd..e74a8b40a77 100644
--- a/quickwit/quickwit-common/src/tower/metrics.rs
+++ b/quickwit/quickwit-common/src/tower/metrics.rs
@@ -28,6 +28,7 @@ use tower::{Layer, Service};
 
 use crate::metrics::{
     new_counter_vec, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounterVec, IntGaugeVec,
+    Vector,
 };
 
 pub trait RpcName {
diff --git a/quickwit/quickwit-control-plane/src/metrics.rs b/quickwit/quickwit-control-plane/src/metrics.rs
index ee1440edfb0..409c5c8b508 100644
--- a/quickwit/quickwit-control-plane/src/metrics.rs
+++ b/quickwit/quickwit-control-plane/src/metrics.rs
@@ -19,7 +19,7 @@
 
 use once_cell::sync::Lazy;
 use quickwit_common::metrics::{
-    new_counter, new_gauge, new_gauge_vec, IntCounter, IntGauge, IntGaugeVec,
+    new_counter, new_gauge, new_gauge_vec, IntCounter, IntGauge, IntGaugeVec, Vector,
 };
 
 #[derive(Debug, Clone, Copy)]
diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs
index 29c579cddcd..92b21e046d3 100644
--- a/quickwit/quickwit-control-plane/src/model/shard_table.rs
+++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs
@@ -23,6 +23,7 @@ use std::ops::{Deref, DerefMut};
 use std::time::Duration;
 
 use fnv::{FnvHashMap, FnvHashSet};
+use quickwit_common::metrics::Vector;
 use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
 use quickwit_common::tower::ConstantRate;
 use quickwit_ingest::{RateMibPerSec, ShardInfo, ShardInfos};
diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs
index a41576a686b..fe5e48cd39d 100644
--- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs
+++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs
@@ -25,7 +25,7 @@ use anyhow::{bail, Context};
 use async_trait::async_trait;
 use bytes::Bytes;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
-use quickwit_common::metrics::IntCounter;
+use quickwit_common::metrics::{IntCounter, Vector};
 use quickwit_common::rate_limited_tracing::rate_limited_warn;
 use quickwit_common::runtimes::RuntimeType;
 use quickwit_config::{SourceInputFormat, TransformConfig};
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
index 4087f2ed230..3d1e10efea2 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -27,6 +27,7 @@ use quickwit_actors::{
     Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Mailbox, QueueCapacity,
     Supervisable, HEARTBEAT,
 };
+use quickwit_common::metrics::Vector;
 use quickwit_common::pubsub::EventBroker;
 use quickwit_common::temp_dir::TempDirectory;
 use quickwit_common::KillSwitch;
diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
index 2e3475f5759..7f4f7cdd442 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -26,6 +26,7 @@ use quickwit_actors::{
     SpawnContext, Supervisable, HEARTBEAT,
 };
 use quickwit_common::io::{IoControls, Limiter};
+use quickwit_common::metrics::Vector;
 use quickwit_common::pubsub::EventBroker;
 use quickwit_common::temp_dir::TempDirectory;
 use quickwit_common::KillSwitch;
diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs
index 3fae76eb01c..0be818f5c82 100644
--- a/quickwit/quickwit-indexing/src/actors/uploader.rs
+++ b/quickwit/quickwit-indexing/src/actors/uploader.rs
@@ -29,6 +29,7 @@ use fail::fail_point;
 use itertools::Itertools;
 use once_cell::sync::OnceCell;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
+use quickwit_common::metrics::Vector;
 use quickwit_common::pubsub::EventBroker;
 use quickwit_common::spawn_named_task;
 use quickwit_metastore::checkpoint::IndexCheckpointDelta;
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
index 25d684f7257..080d5e20b29 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -134,7 +134,7 @@ impl FetchStreamTask {
             let mut mrecord_lengths = Vec::new();
 
             let Some(mrecordlog_guard) =
-                with_lock_metrics!(self.mrecordlog.read().map(Some), "fetch", "read")
+                with_lock_metrics!(self.mrecordlog.read().map(Some), fetch, read)
             else {
                 // we always get a `Some`: the layer is only added to satisfy
                 // `with_lock_metrics`'s need for a `Future`
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs
index 5791f7797fa..7a2282b5bb3 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs
@@ -64,7 +64,7 @@ impl CloseIdleShardsTask {
                 return;
             };
             let Ok(mut state_guard) =
-                with_lock_metrics!(state.lock_partially(), "close_idle_shards", "write")
+                with_lock_metrics!(state.lock_partially(), close_idle_shards, write)
             else {
                 return;
             };
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 48653ecb213..55c41ec9ebc 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -32,7 +32,7 @@ use futures::StreamExt;
 use mrecordlog::error::CreateQueueError;
 use once_cell::sync::OnceCell;
 use quickwit_cluster::Cluster;
-use quickwit_common::metrics::{GaugeGuard, MEMORY_METRICS};
+use quickwit_common::metrics::{GaugeGuard, Vector, MEMORY_METRICS};
 use quickwit_common::pretty::PrettyDisplay;
 use quickwit_common::pubsub::{EventBroker, EventSubscriber};
 use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
@@ -292,7 +292,7 @@ impl Ingester {
         let mut per_source_shard_ids: HashMap<(IndexUid, SourceId), Vec<ShardId>> = HashMap::new();
 
-        let state_guard = with_lock_metrics!(self.state.lock_fully(), "reset_shards", "read")
+        let state_guard = with_lock_metrics!(self.state.lock_fully(), reset_shards, read)
             .expect("ingester should be ready");
 
         for queue_id in state_guard.mrecordlog.list_queues() {
@@ -325,7 +325,7 @@ impl Ingester {
         match advise_reset_shards_result {
             Ok(Ok(advise_reset_shards_response)) => {
                 let mut state_guard =
-                    with_lock_metrics!(self.state.lock_fully(), "reset_shards", "write")
+                    with_lock_metrics!(self.state.lock_fully(), reset_shards, write)
                         .expect("ingester should be ready");
 
                 state_guard
@@ -458,7 +458,7 @@ impl Ingester {
         let force_commit = commit_type == CommitTypeV2::Force;
         let leader_id: NodeId = persist_request.leader_id.into();
 
-        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "persist", "write")?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), persist, write)?;
 
         if state_guard.status() != IngesterStatus::Ready {
             persist_failures.reserve_exact(persist_request.subrequests.len());
@@ -928,7 +928,7 @@ impl Ingester {
         &self,
         init_shards_request: InitShardsRequest,
     ) -> IngestV2Result<InitShardsResponse> {
-        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "init_shards", "write")?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), init_shards, write)?;
         // we do this to allow simultaneous reborrow of multiple fields.
         let state_guard = &mut *state_guard;
 
@@ -983,8 +983,7 @@ impl Ingester {
                 self.self_node_id, truncate_shards_request.ingester_id,
             )));
         }
-        let mut state_guard =
-            with_lock_metrics!(self.state.lock_fully(), "truncate_shards", "write")?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), truncate_shards, write)?;
 
         for subrequest in truncate_shards_request.subrequests {
             let queue_id = subrequest.queue_id();
@@ -1010,8 +1009,7 @@ impl Ingester {
         &self,
         close_shards_request: CloseShardsRequest,
     ) -> IngestV2Result<CloseShardsResponse> {
-        let mut state_guard =
-            with_lock_metrics!(self.state.lock_partially(), "close_shards", "write")?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_partially(), close_shards, write)?;
 
         let mut successes = Vec::with_capacity(close_shards_request.shard_pkeys.len());
@@ -1165,8 +1163,7 @@ impl IngesterService for Ingester {
                 })
             })
             .collect();
-        let mut state_guard =
-            with_lock_metrics!(self.state.lock_fully(), "retain_shards", "write")?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), retain_shards, write)?;
         let remove_queue_ids: HashSet<QueueId> = state_guard
             .shards
             .keys()
@@ -1210,8 +1207,7 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
             warn!("ingester state update failed");
             return;
         };
-        let Ok(mut state_guard) = with_lock_metrics!(state.lock_fully(), "gc_shards", "write")
-        else {
+        let Ok(mut state_guard) = with_lock_metrics!(state.lock_fully(), gc_shards, write) else {
             error!("failed to lock the ingester state");
             return;
         };
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
index 795665f7f2c..afdf9dc8a03 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
@@ -21,7 +21,7 @@ use mrecordlog::ResourceUsage;
 use once_cell::sync::Lazy;
 use quickwit_common::metrics::{
     exponential_buckets, linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram,
-    new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
+    new_histogram_vec, Histogram, IntCounter, IntCounterVec, IntGauge, Vector,
 };
 
 // Counter vec counting the different outcomes of ingest requests as
@@ -82,14 +82,83 @@ pub(crate) struct IngestV2Metrics {
     pub closed_shards: IntGauge,
     pub shard_lt_throughput_mib: Histogram,
     pub shard_st_throughput_mib: Histogram,
-    pub wal_acquire_lock_requests_in_flight: IntGaugeVec<2>,
-    pub wal_acquire_lock_request_duration_secs: HistogramVec<2>,
-    pub wal_hold_lock_duration_secs: HistogramVec<2>,
+    pub wal_acquire_lock_requests_in_flight: LockMetric<IntGauge>,
+    pub wal_acquire_lock_request_duration_secs: LockMetric<Histogram>,
+    pub wal_hold_lock_duration_secs: LockMetric<Histogram>,
     pub wal_disk_used_bytes: IntGauge,
     pub wal_memory_used_bytes: IntGauge,
     pub ingest_results: IngestResultMetrics,
 }
 
+pub(crate) struct ReadMetric<T> {
+    pub read: T,
+}
+
+pub(crate) struct WriteMetric<T> {
+    pub write: T,
+}
+
+pub(crate) struct RWMetric<T> {
+    pub read: T,
+    pub write: T,
+}
+
+pub(crate) struct LockMetric<T> {
+    pub reset_shards: RWMetric<T>,
+    pub fetch: ReadMetric<T>,
+    pub persist: WriteMetric<T>,
+    pub init_shards: WriteMetric<T>,
+    pub truncate_shards: WriteMetric<T>,
+    pub close_shards: WriteMetric<T>,
+    pub retain_shards: WriteMetric<T>,
+    pub gc_shards: WriteMetric<T>,
+    pub init_replica: WriteMetric<T>,
+    pub replicate: WriteMetric<T>,
+    pub close_idle_shards: WriteMetric<T>,
+}
+
+impl<T> LockMetric<T> {
+    fn new<V>(dynamic_vector: V) -> Self
+    where V: Vector<2, T> {
+        LockMetric {
+            reset_shards: RWMetric {
+                read: dynamic_vector.with_label_values(["reset_shards", "read"]),
+                write: dynamic_vector.with_label_values(["reset_shards", "write"]),
+            },
+            fetch: ReadMetric {
+                read: dynamic_vector.with_label_values(["fetch", "read"]),
+            },
+            persist: WriteMetric {
+                write: dynamic_vector.with_label_values(["persist", "write"]),
+            },
+            init_shards: WriteMetric {
+                write: dynamic_vector.with_label_values(["init_shards", "write"]),
+            },
+            truncate_shards: WriteMetric {
+                write: dynamic_vector.with_label_values(["truncate_shards", "write"]),
+            },
+            close_shards: WriteMetric {
+                write: dynamic_vector.with_label_values(["close_shards", "write"]),
+            },
+            retain_shards: WriteMetric {
+                write: dynamic_vector.with_label_values(["retain_shards", "write"]),
+            },
+            gc_shards: WriteMetric {
+                write: dynamic_vector.with_label_values(["gc_shards", "write"]),
+            },
+            init_replica: WriteMetric {
+                write: dynamic_vector.with_label_values(["init_replica", "write"]),
+            },
+            replicate: WriteMetric {
+                write: dynamic_vector.with_label_values(["replicate", "write"]),
+            },
+            close_idle_shards: WriteMetric {
+                write: dynamic_vector.with_label_values(["close_idle_shards", "write"]),
+            },
+        }
+    }
+}
+
 impl Default for IngestV2Metrics {
     fn default() -> Self {
         Self {
@@ -125,29 +194,29 @@ impl Default for IngestV2Metrics {
                 "ingest",
                 linear_buckets(0.0f64, 1.0f64, 15).unwrap(),
             ),
-            wal_acquire_lock_requests_in_flight: new_gauge_vec(
+            wal_acquire_lock_requests_in_flight: LockMetric::new(new_gauge_vec(
                 "wal_acquire_lock_requests_in_flight",
                 "Number of acquire lock requests in-flight.",
                 "ingest",
                 &[],
                 ["operation", "type"],
-            ),
-            wal_acquire_lock_request_duration_secs: new_histogram_vec(
+            )),
+            wal_acquire_lock_request_duration_secs: LockMetric::new(new_histogram_vec(
                 "wal_acquire_lock_request_duration_secs",
                 "Duration of acquire lock requests in seconds.",
                 "ingest",
                 &[],
                 ["operation", "type"],
                 exponential_buckets(0.001, 2.0, 12).unwrap(),
-            ),
-            wal_hold_lock_duration_secs: new_histogram_vec(
+            )),
+            wal_hold_lock_duration_secs: LockMetric::new(new_histogram_vec(
                 "wal_hold_lock_duration_secs",
                 "Duration for which a lock was held, in seconds.",
                 "ingest",
                 &[],
                 ["operation", "type"],
                 exponential_buckets(0.001, 2.0, 12).unwrap(),
-            ),
+            )),
             wal_disk_used_bytes: new_gauge(
                 "wal_disk_used_bytes",
                 "WAL disk space used in bytes.",
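
The `LockMetric` tree trades the per-call label lookup for plain struct fields: every operation/type pair the ingester uses is resolved exactly once at startup, and a misspelled label combination becomes a compile error rather than a silently empty time series. A miniature, self-contained version of the same idea (invented types; the real `LockMetric::new` takes any `Vector<2, T>` instead of a closure):

    struct WriteMetric<T> {
        write: T,
    }

    struct LockMetric<T> {
        persist: WriteMetric<T>,
    }

    impl<T> LockMetric<T> {
        fn new(mut resolve: impl FnMut([&str; 2]) -> T) -> Self {
            LockMetric {
                persist: WriteMetric {
                    // The only dynamic lookup, performed at initialization.
                    write: resolve(["persist", "write"]),
                },
            }
        }
    }

    fn main() {
        let metrics = LockMetric::new(|labels| labels.join("::"));
        // Field access instead of a runtime label lookup:
        assert_eq!(metrics.persist.write, "persist::write");
    }
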
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
index 193be2715b4..db36a8b2fba 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -454,7 +454,7 @@ impl ReplicationTask {
         };
         let queue_id = replica_shard.queue_id();
 
-        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "init_replica", "write")?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), init_replica, write)?;
 
         match state_guard.mrecordlog.create_queue(&queue_id).await {
             Ok(_) => {}
@@ -526,7 +526,7 @@ impl ReplicationTask {
         // queue in the WAL and should be deleted.
         let mut shards_to_delete: HashSet<QueueId> = HashSet::new();
 
-        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), "replicate", "write")?;
+        let mut state_guard = with_lock_metrics!(self.state.lock_fully(), replicate, write)?;
 
         if state_guard.status() != IngesterStatus::Ready {
             replicate_failures.reserve_exact(replicate_request.subrequests.len());
diff --git a/quickwit/quickwit-ingest/src/lib.rs b/quickwit/quickwit-ingest/src/lib.rs
index 705715eac7c..30cf3144673 100644
--- a/quickwit/quickwit-ingest/src/lib.rs
+++ b/quickwit/quickwit-ingest/src/lib.rs
@@ -46,6 +46,7 @@ use once_cell::sync::OnceCell;
 pub use position::Position;
 pub use queue::Queues;
 use quickwit_actors::{Mailbox, Universe};
+use quickwit_common::metrics::Histogram;
 use quickwit_config::IngestApiConfig;
 use tokio::sync::Mutex;
 
@@ -128,7 +129,8 @@ impl CommitType {
 struct TimedMutexGuard<T> {
     guard: T,
     acquired_at: std::time::Instant,
-    purpose: [&'static str; 2],
+    purpose: &'static [&'static str; 2],
+    histogram: &'static Histogram,
 }
 
 use std::ops::{Deref, DerefMut};
@@ -151,10 +153,7 @@ impl<T> Drop for TimedMutexGuard<T> {
     fn drop(&mut self) {
         let elapsed = self.acquired_at.elapsed();
 
-        crate::ingest_v2::metrics::INGEST_V2_METRICS
-            .wal_hold_lock_duration_secs
-            .with_label_values(self.purpose)
-            .observe(elapsed.as_secs_f64());
+        self.histogram.observe(elapsed.as_secs_f64());
 
         if elapsed > std::time::Duration::from_secs(1) {
             let purpose = self.purpose.join("::");
@@ -174,7 +173,7 @@ macro_rules! with_lock_metrics {
         {
             $crate::ingest_v2::metrics::INGEST_V2_METRICS
                 .wal_acquire_lock_requests_in_flight
-                .with_label_values([$($label),*])
+                $(.$label)*
                 .inc();
 
@@ -194,22 +193,25 @@ macro_rules! with_lock_metrics {
             }
             $crate::ingest_v2::metrics::INGEST_V2_METRICS
                 .wal_acquire_lock_requests_in_flight
-                .with_label_values([$($label),*])
+                $(.$label)*
                 .dec();
 
             $crate::ingest_v2::metrics::INGEST_V2_METRICS
                 .wal_acquire_lock_request_duration_secs
-                .with_label_values([$($label),*])
+                $(.$label)*
                 .observe(elapsed.as_secs_f64());
-
+            let histogram = &$crate::ingest_v2::metrics::INGEST_V2_METRICS
+                .wal_hold_lock_duration_secs
+                $(.$label)*;
             guard.map(|guard| $crate::TimedMutexGuard {
                 guard,
                 acquired_at: now_after,
-                purpose: [$($label),*],
+                purpose: &[$(stringify!($label)),*],
+                histogram,
             })
         }
     };
     (@concat $label1:tt, $($label:tt,)*) => {
-        concat!($label1, $("::", $label),*)
+        concat!(stringify!($label1), $("::", stringify!($label)),*)
     };
 }
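
With the labels now bare identifiers, `$(.$label)*` expands into a chain of field accesses that the compiler resolves statically, and `stringify!` recovers the names for the warning text. A toy macro demonstrating both tricks (invented types; the real macro additionally updates the gauges and wraps the guard):

    struct Write { write: u32 }
    struct Metrics { persist: Write }

    macro_rules! field_chain {
        ($root:expr, $($label:ident),*) => {
            // `metrics.persist.write` after expansion: a plain field access.
            $root $(.$label)*
        };
        (@concat $label1:ident, $($label:ident,)*) => {
            concat!(stringify!($label1), $("::", stringify!($label)),*)
        };
    }

    fn main() {
        let metrics = Metrics { persist: Write { write: 7 } };
        // No string hashing at runtime:
        assert_eq!(field_chain!(metrics, persist, write), 7);
        // The names are still available as text for log messages:
        assert_eq!(field_chain!(@concat persist, write,), "persist::write");
    }
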
diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs
index 0f0f9a8f3e5..1467fe22bd3 100644
--- a/quickwit/quickwit-ingest/src/metrics.rs
+++ b/quickwit/quickwit-ingest/src/metrics.rs
@@ -18,7 +18,9 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use once_cell::sync::Lazy;
-use quickwit_common::metrics::{new_counter, new_counter_vec, new_gauge, IntCounter, IntGauge};
+use quickwit_common::metrics::{
+    new_counter, new_counter_vec, new_gauge, IntCounter, IntGauge, Vector,
+};
 
 pub struct IngestMetrics {
     pub ingested_docs_bytes_valid: IntCounter,
diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs
index fe663cd4d8c..6120a5c0751 100644
--- a/quickwit/quickwit-jaeger/src/lib.rs
+++ b/quickwit/quickwit-jaeger/src/lib.rs
@@ -27,6 +27,7 @@ use async_trait::async_trait;
 use itertools::Itertools;
 use prost::Message;
 use prost_types::{Duration as WellKnownDuration, Timestamp as WellKnownTimestamp};
+use quickwit_common::metrics::Vector;
 use quickwit_config::JaegerConfig;
 use quickwit_opentelemetry::otlp::{
     extract_otel_traces_index_id_patterns_from_metadata, Event as QwEvent, Link as QwLink,
diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
index a8adce51b0b..c6f07c5127a 100644
--- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
+++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use itertools::Itertools;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
 use quickwit_common::extract_time_range;
+use quickwit_common::metrics::Vector;
 use quickwit_common::uri::Uri;
 use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
 use quickwit_indexing::actors::{schedule_merge, MergeSchedulerService, MergeSplitDownloader};
diff --git a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs
index e5b527ac0f2..b3a461ff2b7 100644
--- a/quickwit/quickwit-opentelemetry/src/otlp/logs.rs
+++ b/quickwit/quickwit-opentelemetry/src/otlp/logs.rs
@@ -22,6 +22,7 @@ use std::collections::{btree_set, BTreeSet, HashMap};
 
 use async_trait::async_trait;
 use prost::Message;
+use quickwit_common::metrics::Vector;
 use quickwit_common::rate_limited_error;
 use quickwit_common::thread_pool::run_cpu_intensive;
 use quickwit_common::uri::Uri;
diff --git a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs
index c3c4e5e1e72..93fbe68f400 100644
--- a/quickwit/quickwit-opentelemetry/src/otlp/traces.rs
+++ b/quickwit/quickwit-opentelemetry/src/otlp/traces.rs
@@ -23,6 +23,7 @@ use std::str::FromStr;
 
 use async_trait::async_trait;
 use prost::Message;
+use quickwit_common::metrics::Vector;
 use quickwit_common::thread_pool::run_cpu_intensive;
 use quickwit_common::uri::Uri;
 use quickwit_config::{load_index_config_from_user_config, ConfigFormat, IndexConfig};
diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs
index 9b4965eaa89..8f6ad04b693 100644
--- a/quickwit/quickwit-search/src/search_job_placer.rs
+++ b/quickwit/quickwit-search/src/search_job_placer.rs
@@ -25,6 +25,7 @@ use std::net::SocketAddr;
 
 use anyhow::bail;
 use async_trait::async_trait;
+use quickwit_common::metrics::Vector;
 use quickwit_common::pubsub::EventSubscriber;
 use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash};
 use quickwit_proto::search::{ReportSplit, ReportSplitsRequest};
diff --git a/quickwit/quickwit-serve/src/load_shield.rs b/quickwit/quickwit-serve/src/load_shield.rs
index ab143c0328c..e9f57add8ca 100644
--- a/quickwit/quickwit-serve/src/load_shield.rs
+++ b/quickwit/quickwit-serve/src/load_shield.rs
@@ -19,7 +19,7 @@
 
 use std::time::Duration;
 
-use quickwit_common::metrics::{GaugeGuard, IntGauge};
+use quickwit_common::metrics::{GaugeGuard, IntGauge, Vector};
 use tokio::sync::{Semaphore, SemaphorePermit};
 
 use crate::rest::TooManyRequests;
diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs
index 483961a7378..ed328994ebf 100644
--- a/quickwit/quickwit-serve/src/rest.rs
+++ b/quickwit/quickwit-serve/src/rest.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 
 use hyper::http::HeaderValue;
 use hyper::{http, Method, StatusCode};
+use quickwit_common::metrics::Vector;
 use quickwit_common::tower::BoxFutureInfaillible;
 use quickwit_search::SearchService;
 use tower::make::Shared;
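
A final note on `let state_guard = &mut *state_guard;` (the reborrow seen in `init_shards`): borrowing two fields through a `DerefMut` guard calls `deref_mut` once per field and therefore counts as two overlapping mutable borrows of the guard itself, so the guard is first reborrowed as a plain `&mut`, after which the borrow checker tracks the fields independently. A self-contained illustration with invented types:

    use std::ops::{Deref, DerefMut};

    struct State {
        shards: Vec<u32>,
        mrecordlog: Vec<u8>,
    }

    struct Guard(State);

    impl Deref for Guard {
        type Target = State;
        fn deref(&self) -> &State {
            &self.0
        }
    }

    impl DerefMut for Guard {
        fn deref_mut(&mut self) -> &mut State {
            &mut self.0
        }
    }

    fn main() {
        let mut guard = Guard(State { shards: vec![1], mrecordlog: vec![2] });
        // Without this reborrow, `&mut guard.shards` and `&mut guard.mrecordlog`
        // would each call `deref_mut`, mutably borrowing `guard` twice: E0499.
        let state = &mut *guard;
        let shards = &mut state.shards;
        let mrecordlog = &mut state.mrecordlog;
        shards.push(3);
        mrecordlog.push(4);
        assert_eq!((shards.len(), mrecordlog.len()), (2, 2));
    }
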