Skip to content

Commit

Permalink
Open shards metrics was only updated on scale up/down.
Browse files Browse the repository at this point in the history
This PR also removes the source id label (as it will always be ingest
v2). This is not much of a win since the overall metric cardinality does
not change.

It also removes `.all_shards_mut()` which was error-prone
as a client could break all kinds of invariants.
  • Loading branch information
fulmicoton committed Mar 13, 2024
1 parent fd5014a commit 339d9c4
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 33 deletions.
25 changes: 3 additions & 22 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ use quickwit_proto::ingest::ingester::{
CloseShardsRequest, IngesterService, InitShardsRequest, PingRequest, RetainShardsForSource,
RetainShardsRequest,
};
use quickwit_proto::ingest::{
IngestV2Error, Shard, ShardIdPosition, ShardIdPositions, ShardIds, ShardState,
};
use quickwit_proto::ingest::{IngestV2Error, Shard, ShardIdPosition, ShardIdPositions, ShardIds};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
Expand Down Expand Up @@ -343,13 +341,7 @@ impl IngestController {
}
}
if !confirmed_unavailable_leaders.is_empty() {
for shard_entry in model.all_shards_mut() {
if shard_entry.is_open()
&& confirmed_unavailable_leaders.contains(&shard_entry.leader_id)
{
shard_entry.set_shard_state(ShardState::Unavailable);
}
}
model.set_shards_as_unavailable(&confirmed_unavailable_leaders);
}
}

Expand Down Expand Up @@ -583,14 +575,6 @@ impl IngestController {
open_shards_subresponse.opened_shards,
);
}
let label_values = [
source_uid.index_uid.index_id.as_str(),
&source_uid.source_id,
];
CONTROL_PLANE_METRICS
.open_shards_total
.with_label_values(label_values)
.set(new_num_open_shards as i64);
}

/// Attempts to decrease the number of shards. This operation is rate limited to avoid closing
Expand Down Expand Up @@ -642,10 +626,7 @@ impl IngestController {
}
model.close_shards(&source_uid, &[shard_id]);

let label_values = [
source_uid.index_uid.index_id.as_str(),
&source_uid.source_id,
];
let label_values = [source_uid.index_uid.index_id.as_str()];
CONTROL_PLANE_METRICS
.open_shards_total
.with_label_values(label_values)
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-control-plane/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct ControlPlaneMetrics {
pub schedule_total: IntCounter,
pub metastore_error_aborted: IntCounter,
pub metastore_error_maybe_executed: IntCounter,
pub open_shards_total: IntGaugeVec<2>,
pub open_shards_total: IntGaugeVec<1>,
}

impl Default for ControlPlaneMetrics {
Expand Down Expand Up @@ -62,7 +62,7 @@ impl Default for ControlPlaneMetrics {
"Number of open shards per source.",
"control_plane",
&[],
["index_id", "source_id"],
["index_id"],
),
}
}
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,9 @@ impl ControlPlaneModel {
Ok(has_changed)
}

pub(crate) fn all_shards_mut(&mut self) -> impl Iterator<Item = &mut ShardEntry> + '_ {
self.shard_table.all_shards_mut()
pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
self.shard_table
.set_shards_as_unavailable(unavailable_leaders);
}

#[cfg(test)]
Expand Down
54 changes: 47 additions & 7 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ impl ShardTableEntry {
fn is_empty(&self) -> bool {
self.shard_entries.is_empty()
}

fn num_open_shards(&self) -> usize {
self.shard_entries
.values()
.filter(|shard_entry| shard_entry.is_open())
.count()
}
}

// A table that keeps track of the existing shards for each index and source,
Expand Down Expand Up @@ -271,10 +278,23 @@ impl ShardTable {
.map(|(source, shard_table)| (source, shard_table.shard_entries.values()))
}

pub(crate) fn all_shards_mut(&mut self) -> impl Iterator<Item = &mut ShardEntry> + '_ {
self.table_entries
.values_mut()
.flat_map(|table_entry| table_entry.shard_entries.values_mut())
pub(crate) fn set_shards_as_unavailable(&mut self, unavailable_leaders: &FnvHashSet<NodeId>) {
for (source_uid, shard_table_entry) in &mut self.table_entries {
let mut modified = false;
for shard_entry in shard_table_entry.shard_entries.values_mut() {
if shard_entry.is_open() && unavailable_leaders.contains(&shard_entry.leader_id) {
shard_entry.set_shard_state(ShardState::Unavailable);
modified = true;
}
}
if modified {
let num_open_shards = shard_table_entry.num_open_shards();
crate::metrics::CONTROL_PLANE_METRICS
.open_shards_total
.with_label_values([source_uid.index_uid.index_id.as_str()])
.set(num_open_shards as i64);
};
}
}

/// Lists the shards of a given source. Returns `None` if the source does not exist.
Expand Down Expand Up @@ -322,23 +342,26 @@ impl ShardTable {
shard_ids.insert(shard.shard_id().clone());
}
}
match self.table_entries.entry(source_uid) {
match self.table_entries.entry(source_uid.clone()) {
Entry::Occupied(mut entry) => {
let table_entry = entry.get_mut();

for opened_shard in opened_shards {
// We only insert shards that we don't know about because the control plane
// knows more about the state of the shards than the metastore.
table_entry
.shard_entries
.entry(opened_shard.shard_id().clone())
.or_insert(opened_shard.into());
.or_insert_with(|| ShardEntry::from(opened_shard));
}
}
// This should never happen if the control plane view is consistent with the state of
// the metastore, so should we panic here? Warnings are most likely going to go
// unnoticed.
Entry::Vacant(entry) => {
warn!(
"control plane inconsistent with metastore: inserting shards for a \
non-existing source (please report)"
);
let shard_entries: FnvHashMap<ShardId, ShardEntry> = opened_shards
.into_iter()
.map(|shard| (shard.shard_id().clone(), shard.into()))
Expand All @@ -350,6 +373,8 @@ impl ShardTable {
entry.insert(table_entry);
}
}
// Let's now update the open shard metrics for this specific index.
self.update_shard_metrics_for_source_uid(&source_uid);
self.check_invariant();
}

Expand Down Expand Up @@ -377,6 +402,19 @@ impl ShardTable {
Some(open_shards)
}

pub fn update_shard_metrics_for_source_uid(&self, source_uid: &SourceUid) {
let num_open_shards: usize =
if let Some(shard_table_entry) = self.table_entries.get(&source_uid) {
shard_table_entry.num_open_shards()
} else {
0
};
crate::metrics::CONTROL_PLANE_METRICS
.open_shards_total
.with_label_values([source_uid.index_uid.index_id.as_str()])
.set(num_open_shards as i64);
}

pub fn update_shards(
&mut self,
source_uid: &SourceUid,
Expand Down Expand Up @@ -436,6 +474,7 @@ impl ShardTable {
}
}
}
self.update_shard_metrics_for_source_uid(&source_uid);
closed_shard_ids
}

Expand All @@ -458,6 +497,7 @@ impl ShardTable {
&mut self.ingester_shards,
);
}
self.update_shard_metrics_for_source_uid(&source_uid);
self.check_invariant();
}

Expand Down

0 comments on commit 339d9c4

Please sign in to comment.