Control plane: maintain the `open_shards_total` gauge inside `ShardTable`.

  * `IngestController` no longer sets the gauge itself and delegates marking shards as
    unavailable to `ControlPlaneModel::set_shards_as_unavailable`.
  * The gauge is now labelled with `index_id` only (the `source_id` label is dropped).
  * `ShardTable` refreshes the gauge in `set_shards_as_unavailable` and, through the new
    `update_shard_metrics_for_source_uid`, at the end of the shard-mutating methods touched by
    the last three hunks.
  * `ShardTable` logs a warning when shards are inserted for a source it does not know about.

diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
index 2121f9e44e8..2ce268f617a 100644
--- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
+++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -36,9 +36,7 @@ use quickwit_proto::ingest::ingester::{
     CloseShardsRequest, IngesterService, InitShardsRequest, PingRequest, RetainShardsForSource,
     RetainShardsRequest,
 };
-use quickwit_proto::ingest::{
-    IngestV2Error, Shard, ShardIdPosition, ShardIdPositions, ShardIds, ShardState,
-};
+use quickwit_proto::ingest::{IngestV2Error, Shard, ShardIdPosition, ShardIdPositions, ShardIds};
 use quickwit_proto::metastore;
 use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
 use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
@@ -49,7 +47,6 @@ use tracing::{error, info, warn};
 use ulid::Ulid;
 
 use crate::ingest::wait_handle::WaitHandle;
-use crate::metrics::CONTROL_PLANE_METRICS;
 use crate::model::{ControlPlaneModel, ScalingMode, ShardEntry, ShardStats};
 
 const MAX_SHARD_INGESTION_THROUGHPUT_MIB_PER_SEC: f32 = 5.;
@@ -343,13 +340,7 @@ impl IngestController {
             }
         }
         if !confirmed_unavailable_leaders.is_empty() {
-            for shard_entry in model.all_shards_mut() {
-                if shard_entry.is_open()
-                    && confirmed_unavailable_leaders.contains(&shard_entry.leader_id)
-                {
-                    shard_entry.set_shard_state(ShardState::Unavailable);
-                }
-            }
+            model.set_shards_as_unavailable(&confirmed_unavailable_leaders);
         }
     }
 
@@ -583,14 +574,6 @@ impl IngestController {
                 open_shards_subresponse.opened_shards,
             );
         }
-        let label_values = [
-            source_uid.index_uid.index_id.as_str(),
-            &source_uid.source_id,
-        ];
-        CONTROL_PLANE_METRICS
-            .open_shards_total
-            .with_label_values(label_values)
-            .set(new_num_open_shards as i64);
     }
 
     /// Attempts to decrease the number of shards. This operation is rate limited to avoid closing
@@ -641,15 +624,6 @@ impl IngestController {
             return;
         }
         model.close_shards(&source_uid, &[shard_id]);
-
-        let label_values = [
-            source_uid.index_uid.index_id.as_str(),
-            &source_uid.source_id,
-        ];
-        CONTROL_PLANE_METRICS
-            .open_shards_total
-            .with_label_values(label_values)
-            .set(new_num_open_shards as i64);
     }
 
     pub(crate) fn advise_reset_shards(
diff --git a/quickwit/quickwit-control-plane/src/metrics.rs b/quickwit/quickwit-control-plane/src/metrics.rs
index d4c20c9f4a1..6ebf7db5c21 100644
--- a/quickwit/quickwit-control-plane/src/metrics.rs
+++ b/quickwit/quickwit-control-plane/src/metrics.rs
@@ -28,7 +28,7 @@ pub struct ControlPlaneMetrics {
     pub schedule_total: IntCounter,
     pub metastore_error_aborted: IntCounter,
     pub metastore_error_maybe_executed: IntCounter,
-    pub open_shards_total: IntGaugeVec<2>,
+    pub open_shards_total: IntGaugeVec<1>,
 }
 
 impl Default for ControlPlaneMetrics {
@@ -62,7 +62,7 @@ impl Default for ControlPlaneMetrics {
                 "Number of open shards per source.",
                 "control_plane",
                 &[],
-                ["index_id", "source_id"],
+                ["index_id"],
             ),
         }
     }
diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs
index c6a896943b0..592f09bcccc 100644
--- a/quickwit/quickwit-control-plane/src/model/mod.rs
+++ b/quickwit/quickwit-control-plane/src/model/mod.rs
@@ -252,8 +252,10 @@ impl ControlPlaneModel {
         Ok(has_changed)
     }
 
-    pub(crate) fn all_shards_mut(&mut self) -> impl Iterator<Item = &mut ShardEntry> + '_ {
-        self.shard_table.all_shards_mut()
+    /// Marks every open shard whose leader is in `unavailable_leaders` as unavailable.
+    pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
+        self.shard_table
+            .set_shards_as_unavailable(unavailable_leaders);
     }
 
     #[cfg(test)]
diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs
index 369e22578ac..158efdf2e2d 100644
--- a/quickwit/quickwit-control-plane/src/model/shard_table.rs
+++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs
@@ -102,6 +102,14 @@ impl ShardTableEntry {
     fn is_empty(&self) -> bool {
         self.shard_entries.is_empty()
     }
+
+    /// Returns the number of shards of this entry that are currently open.
+    fn num_open_shards(&self) -> usize {
+        self.shard_entries
+            .values()
+            .filter(|shard_entry| shard_entry.is_open())
+            .count()
+    }
 }
 
 // A table that keeps track of the existing shards for each index and source,
@@ -271,10 +279,25 @@ impl ShardTable {
             .map(|(source, shard_table)| (source, shard_table.shard_entries.values()))
     }
 
-    pub(crate) fn all_shards_mut(&mut self) -> impl Iterator<Item = &mut ShardEntry> + '_ {
-        self.table_entries
-            .values_mut()
-            .flat_map(|table_entry| table_entry.shard_entries.values_mut())
+    /// Marks every open shard whose leader is in `unavailable_leaders` as unavailable, then
+    /// refreshes the open shards gauge of each source that had at least one shard modified.
+    pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
+        for (source_uid, shard_table_entry) in &mut self.table_entries {
+            let mut modified = false;
+            for shard_entry in shard_table_entry.shard_entries.values_mut() {
+                if shard_entry.is_open() && unavailable_leaders.contains(&shard_entry.leader_id) {
+                    shard_entry.set_shard_state(ShardState::Unavailable);
+                    modified = true;
+                }
+            }
+            if modified {
+                let num_open_shards = shard_table_entry.num_open_shards();
+                crate::metrics::CONTROL_PLANE_METRICS
+                    .open_shards_total
+                    .with_label_values([source_uid.index_uid.index_id.as_str()])
+                    .set(num_open_shards as i64);
+            }
+        }
     }
 
     /// Lists the shards of a given source. Returns `None` if the source does not exist.
@@ -322,23 +345,26 @@ impl ShardTable {
                 shard_ids.insert(shard.shard_id().clone());
             }
         }
-        match self.table_entries.entry(source_uid) {
+        match self.table_entries.entry(source_uid.clone()) {
             Entry::Occupied(mut entry) => {
                 let table_entry = entry.get_mut();
-
                 for opened_shard in opened_shards {
                     // We only insert shards that we don't know about because the control plane
                     // knows more about the state of the shards than the metastore.
                     table_entry
                         .shard_entries
                         .entry(opened_shard.shard_id().clone())
-                        .or_insert(opened_shard.into());
+                        .or_insert_with(|| ShardEntry::from(opened_shard));
                 }
             }
             // This should never happen if the control plane view is consistent with the state of
             // the metastore, so should we panic here? Warnings are most likely going to go
             // unnoticed.
             Entry::Vacant(entry) => {
+                warn!(
+                    "control plane inconsistent with metastore: inserting shards for a \
+                     non-existing source (please report)"
+                );
                 let shard_entries: FnvHashMap<ShardId, ShardEntry> = opened_shards
                     .into_iter()
                     .map(|shard| (shard.shard_id().clone(), shard.into()))
@@ -350,6 +376,8 @@ impl ShardTable {
                 entry.insert(table_entry);
             }
         }
+        // Let's now update the open shard metrics for this specific index.
+        self.update_shard_metrics_for_source_uid(&source_uid);
         self.check_invariant();
     }
 
@@ -377,6 +405,24 @@ impl ShardTable {
         Some(open_shards)
     }
 
+    /// Sets the `open_shards_total` gauge labelled with the index ID of `source_uid` to the number
+    /// of open shards of that source, or to 0 if the source is not in the table.
+    ///
+    /// NOTE(review): the gauge is keyed by index ID only, so sources that share an index
+    /// overwrite each other's value.
+    pub fn update_shard_metrics_for_source_uid(&self, source_uid: &SourceUid) {
+        let num_open_shards: usize =
+            if let Some(shard_table_entry) = self.table_entries.get(source_uid) {
+                shard_table_entry.num_open_shards()
+            } else {
+                0
+            };
+        crate::metrics::CONTROL_PLANE_METRICS
+            .open_shards_total
+            .with_label_values([source_uid.index_uid.index_id.as_str()])
+            .set(num_open_shards as i64);
+    }
+
     pub fn update_shards(
         &mut self,
         source_uid: &SourceUid,
@@ -436,6 +482,7 @@ impl ShardTable {
                 }
             }
         }
+        self.update_shard_metrics_for_source_uid(source_uid);
         closed_shard_ids
     }
 
@@ -458,6 +505,7 @@ impl ShardTable {
                 &mut self.ingester_shards,
             );
         }
+        self.update_shard_metrics_for_source_uid(source_uid);
         self.check_invariant();
     }
 