From 677fb3062e03d9011ead0a3e4348517062a7d529 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 11 Sep 2024 14:58:14 +0200 Subject: [PATCH 1/6] add search after to listing splits --- .../file_backed/file_backed_index/mod.rs | 24 ++++++++++++++++++- .../quickwit-metastore/src/metastore/mod.rs | 11 +++++++++ .../src/metastore/postgres/metastore.rs | 22 ++++++++++++++++- .../src/metastore/postgres/utils.rs | 13 +++++++++- 4 files changed, 67 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 426a91530b5..4a8dc28ac61 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -452,7 +452,18 @@ impl FileBackedIndex { .splits .values() .filter(|split| split_query_predicate(split, query)) - .sorted_unstable_by_key(|split| &split.split_metadata.index_uid) + .sorted_unstable_by(|left_split, right_split| { + left_split + .split_metadata + .index_uid + .cmp(&right_split.split_metadata.index_uid) + .then_with(|| { + left_split + .split_metadata + .split_id + .cmp(&right_split.split_metadata.split_id) + }) + }) .skip(offset) .take(limit) .cloned() @@ -763,6 +774,17 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool { } } + if let Some((index_uid, split_id)) = &query.after_split { + if *index_uid > split.split_metadata.index_uid { + return false; + } + if *index_uid == split.split_metadata.index_uid + && *split_id >= split.split_metadata.split_id + { + return false; + } + } + true } diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 06211e1f63a..3b224413e64 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -632,6 +632,9 @@ pub struct ListSplitsQuery { /// Sorts the splits by staleness, i.e. by delete opstamp and publish timestamp in ascending /// order. pub sort_by: SortBy, + + /// Only return splits whose (index_uid, split_id) are lexicographically after this split + pub after_split: Option<(IndexUid, SplitId)>, } #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] @@ -658,6 +661,7 @@ impl ListSplitsQuery { create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, } } @@ -680,6 +684,7 @@ impl ListSplitsQuery { create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, }) } @@ -855,6 +860,12 @@ impl ListSplitsQuery { self.sort_by = SortBy::IndexUid; self } + + /// Only return splits whose (index_uid, split_id) are lexicographically after this split + pub fn after_split(mut self, split_meta: &SplitMetadata) -> Self { + self.after_split = Some((split_meta.index_uid.clone(), split_meta.split_id.clone())); + self + } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index c9f554875ba..016771f33b9 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2080,7 +2080,27 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT "splits".* FROM "splits" JOIN "indexes" ON "splits"."index_uid" = "indexes"."index_uid" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"# + r#"SELECT "splits".* FROM "splits" JOIN "indexes" ON "splits"."index_uid" = "indexes"."index_uid" WHERE "splits"."index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC, "split_id" ASC"# + ) + ); + + let mut select_statement = Query::select(); + let sql = select_statement + .column((Splits::Table, Asterisk)) + .from(Splits::Table); + + let query = + ListSplitsQuery::for_index(index_uid.clone()).after_split(&crate::SplitMetadata { + index_uid: index_uid.clone(), + split_id: "my_split".to_string(), + ..Default::default() + }); + append_query_filters(sql, &query); + + assert_eq!( + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT "splits".* FROM "splits" WHERE "splits"."index_uid" IN ('{index_uid}') AND ("splits"."index_uid", "splits"."split_id") > ('{index_uid}', 'my_split')"# ) ); } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index c76a0cac673..903d2a2bb25 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -179,6 +179,16 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits Expr::expr(val) }); + if let Some((index_uid, split_id)) = &query.after_split { + sql.cond_where( + Expr::tuple([ + Expr::col((Splits::Table, Splits::IndexUid)).into(), + Expr::col((Splits::Table, Splits::SplitId)).into(), + ]) + .gt(Expr::tuple([Expr::value(index_uid), Expr::value(split_id)])), + ); + } + match query.sort_by { SortBy::Staleness => { sql.order_by( @@ -207,7 +217,8 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits Expr::col((Splits::Table, Splits::IndexUid)) .equals((Indexes::Table, Indexes::IndexUid)), ) - .order_by(Splits::IndexUid, Order::Asc); + .order_by(Splits::IndexUid, Order::Asc) + .order_by(Splits::SplitId, Order::Asc); } SortBy::None => (), } From 47e6f64d6b828bdbbd6e7cccdb1ce2fc9e99afd7 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 11 Sep 2024 17:10:52 +0200 Subject: [PATCH 2/6] don't stop gcing after first error --- .../src/garbage_collection.rs | 15 +++-- .../src/actors/garbage_collector.rs | 56 +++++++++++++++---- 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 4a24aedc012..3bd7cb0952a 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -280,7 +280,7 @@ async fn delete_splits_marked_for_deletion_several_indexes( return split_removal_info; }; - let list_splits_query = list_splits_query + let mut list_splits_query = list_splits_query .with_split_state(SplitState::MarkedForDeletion) .with_update_timestamp_lte(updated_before_timestamp) .with_limit(DELETE_SPLITS_BATCH_SIZE) @@ -306,13 +306,18 @@ async fn delete_splits_marked_for_deletion_several_indexes( break; } + // set split after which to search for the next loop + list_splits_query = + list_splits_query.after_split(splits_metadata_to_delete.last().unwrap()); + let splits_metadata_to_delete_per_index: HashMap> = splits_metadata_to_delete .into_iter() .map(|meta| (meta.index_uid.clone(), meta)) .into_group_map(); - let delete_split_res = delete_splits( + // ignore return we continue either way + let _: Result<(), ()> = delete_splits( splits_metadata_to_delete_per_index, &storages, metastore.clone(), @@ -321,9 +326,9 @@ async fn delete_splits_marked_for_deletion_several_indexes( ) .await; - if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() { - // stop the gc if this was the last batch or we encountered an error - // (otherwise we might try deleting the same splits in an endless loop) + if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE { + // stop the gc if this was the last batch + // we are guaranteed to make progress due to .after_split() break; } } diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index fbfdeb2b1e1..8815c75a271 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -608,7 +608,7 @@ mod tests { }); mock_metastore .expect_list_splits() - .times(2) + .times(3) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); assert_eq!(query.index_uids.len(), 2); @@ -616,24 +616,40 @@ mod tests { .contains(&query.index_uids[0].index_id.as_ref())); assert!(["test-index-1", "test-index-2"] .contains(&query.index_uids[1].index_id.as_ref())); - let splits = match query.split_states[0] { + let splits_ids_string: Vec = + (0..8000).map(|seq| format!("split-{seq:04}")).collect(); + let splits_ids: Vec<&str> = splits_ids_string + .iter() + .map(|string| string.as_str()) + .collect(); + let mut splits = match query.split_states[0] { SplitState::Staged => { let mut splits = make_splits("test-index-1", &["a"], SplitState::Staged); splits.append(&mut make_splits("test-index-2", &["a"], SplitState::Staged)); splits } SplitState::MarkedForDeletion => { + assert_eq!(query.limit, Some(10_000)); let mut splits = - make_splits("test-index-1", &["a", "b"], SplitState::MarkedForDeletion); + make_splits("test-index-1", &splits_ids, SplitState::MarkedForDeletion); splits.append(&mut make_splits( "test-index-2", - &["a", "b"], + &splits_ids, SplitState::MarkedForDeletion, )); splits } _ => panic!("only Staged and MarkedForDeletion expected."), }; + if let Some((index_uid, split_id)) = query.after_split { + splits = splits.retain(|split| { + ( + &split.split_metadata.index_uid, + &split.split_metadata.split_id, + ) > (&index_uid, &split_id) + }); + } + splits.truncate(10_000); let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); Ok(ServiceStream::from(vec![Ok(splits)])) }); @@ -648,7 +664,7 @@ mod tests { }); mock_metastore .expect_delete_splits() - .times(2) + .times(3) .returning(|delete_splits_request| { let index_uid: IndexUid = delete_splits_request.index_uid().clone(); let split_ids = HashSet::<&str>::from_iter( @@ -657,14 +673,30 @@ mod tests { .iter() .map(|split_id| split_id.as_str()), ); - let expected_split_ids = HashSet::<&str>::from_iter(["a", "b"]); - - assert_eq!(split_ids, expected_split_ids); + if index_uid.index_id == "test-index-1" { + assert_eq!(split_ids.len(), 8000); + for seq in 0..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 2000 { + for seq in 0..2000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 6000 { + for seq in 2000..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else { + panic!(); + } // This should not cause the whole run to fail and return an error, // instead this should simply get logged and return the list of splits // which have successfully been deleted. - if index_uid.index_id == "test-index-2" { + if index_uid.index_id == "test-index-2" && split_ids.len() == 2000 { Err(MetastoreError::Db { message: "fail to delete".to_string(), }) @@ -682,12 +714,12 @@ mod tests { let counters = handle.process_pending_and_observe().await.state; assert_eq!(counters.num_passes, 1); - assert_eq!(counters.num_deleted_files, 2); - assert_eq!(counters.num_deleted_bytes, 40); + assert_eq!(counters.num_deleted_files, 14000); + assert_eq!(counters.num_deleted_bytes, 20 * 14000); assert_eq!(counters.num_successful_gc_run_on_index, 1); assert_eq!(counters.num_failed_storage_resolution, 0); assert_eq!(counters.num_failed_gc_run_on_index, 0); - assert_eq!(counters.num_failed_splits, 2); + assert_eq!(counters.num_failed_splits, 2000); universe.assert_quit().await; } } From 68581e58d4eaa53ac16bf87157e2935448a95694 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 11 Sep 2024 21:12:22 +0200 Subject: [PATCH 3/6] add metrics to garbage collector --- .../src/garbage_collection.rs | 42 ++++++++++- .../quickwit-index-management/src/index.rs | 1 + quickwit/quickwit-index-management/src/lib.rs | 2 +- .../src/actors/garbage_collector.rs | 70 ++++++++++++------- quickwit/quickwit-janitor/src/metrics.rs | 42 ++++++++++- 5 files changed, 130 insertions(+), 27 deletions(-) diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 3bd7cb0952a..ee604cff9ad 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -25,6 +25,7 @@ use std::time::Duration; use anyhow::Context; use futures::{Future, StreamExt}; use itertools::Itertools; +use quickwit_common::metrics::IntCounter; use quickwit_common::pretty::PrettySample; use quickwit_common::Progress; use quickwit_metastore::{ @@ -44,6 +45,12 @@ use tracing::{error, instrument}; /// The maximum number of splits that the GC should delete per attempt. const DELETE_SPLITS_BATCH_SIZE: usize = 10_000; +pub struct GcMetrics { + pub deleted_splits: IntCounter, + pub deleted_bytes: IntCounter, + pub failed_splits: IntCounter, +} + /// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from /// storage and metastore. #[derive(Error, Debug)] @@ -94,6 +101,7 @@ pub async fn run_garbage_collect( deletion_grace_period: Duration, dry_run: bool, progress_opt: Option<&Progress>, + metrics: Option, ) -> anyhow::Result { let grace_period_timestamp = OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64; @@ -170,6 +178,7 @@ pub async fn run_garbage_collect( metastore, indexes, progress_opt, + metrics, ) .await) } @@ -179,6 +188,7 @@ async fn delete_splits( storages: &HashMap>, metastore: MetastoreServiceClient, progress_opt: Option<&Progress>, + metrics: Option<&GcMetrics>, split_removal_info: &mut SplitRemovalInfo, ) -> Result<(), ()> { let mut delete_split_from_index_res_stream = @@ -219,9 +229,32 @@ async fn delete_splits( while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await { match delete_split_result { Ok(entries) => { + if let Some(metrics) = metrics { + let deleted_bytes = entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::(); + metrics.deleted_splits.inc_by(entries.len() as u64); + metrics.deleted_bytes.inc_by(deleted_bytes); + } split_removal_info.removed_split_entries.extend(entries); } Err(delete_split_error) => { + if let Some(metrics) = metrics { + let deleted_bytes = delete_split_error + .successes + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::(); + metrics + .deleted_splits + .inc_by(delete_split_error.successes.len() as u64); + metrics.deleted_bytes.inc_by(deleted_bytes); + metrics.failed_splits.inc_by( + delete_split_error.storage_failures.len() as u64 + + delete_split_error.metastore_failures.len() as u64, + ); + } split_removal_info .removed_split_entries .extend(delete_split_error.successes); @@ -265,13 +298,14 @@ async fn list_splits_metadata( /// /// The aim of this is to spread the load out across a longer period /// rather than short, heavy bursts on the metastore and storage system itself. -#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))] +#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))] async fn delete_splits_marked_for_deletion_several_indexes( index_uids: Vec, updated_before_timestamp: i64, metastore: MetastoreServiceClient, storages: HashMap>, progress_opt: Option<&Progress>, + metrics: Option, ) -> SplitRemovalInfo { let mut split_removal_info = SplitRemovalInfo::default(); @@ -322,6 +356,7 @@ async fn delete_splits_marked_for_deletion_several_indexes( &storages, metastore.clone(), progress_opt, + metrics.as_ref(), &mut split_removal_info, ) .await; @@ -516,6 +551,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); @@ -543,6 +579,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); @@ -620,6 +657,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); @@ -647,6 +685,7 @@ mod tests { Duration::from_secs(0), false, None, + None, ) .await .unwrap(); @@ -685,6 +724,7 @@ mod tests { Duration::from_secs(30), false, None, + None, ) .await .unwrap(); diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 5d4dc5ec149..0fe5c77cc2b 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -373,6 +373,7 @@ impl IndexService { Duration::ZERO, dry_run, None, + None, ) .await?; diff --git a/quickwit/quickwit-index-management/src/lib.rs b/quickwit/quickwit-index-management/src/lib.rs index 93b6ee6d1c3..65a7ef861ce 100644 --- a/quickwit/quickwit-index-management/src/lib.rs +++ b/quickwit/quickwit-index-management/src/lib.rs @@ -20,5 +20,5 @@ mod garbage_collection; mod index; -pub use garbage_collection::run_garbage_collect; +pub use garbage_collection::{run_garbage_collect, GcMetrics}; pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError}; diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index 8815c75a271..96660ca247b 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -20,13 +20,13 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::{stream, StreamExt}; use quickwit_actors::{Actor, ActorContext, Handler}; use quickwit_common::shared_consts::split_deletion_grace_period; -use quickwit_index_management::run_garbage_collect; +use quickwit_index_management::{run_garbage_collect, GcMetrics}; use quickwit_metastore::ListIndexesMetadataResponseExt; use quickwit_proto::metastore::{ ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, @@ -36,6 +36,8 @@ use quickwit_storage::{Storage, StorageResolver}; use serde::Serialize; use tracing::{debug, error, info}; +use crate::metrics::JANITOR_METRICS; + const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes /// Staged files needs to be deleted if there was a failure. @@ -51,10 +53,10 @@ pub struct GarbageCollectorCounters { pub num_deleted_files: usize, /// The number of bytes deleted. pub num_deleted_bytes: usize, - /// The number of failed garbage collection run on an index. - pub num_failed_gc_run_on_index: usize, - /// The number of successful garbage collection run on an index. - pub num_successful_gc_run_on_index: usize, + /// The number of failed garbage collection run. + pub num_failed_gc_run: usize, + /// The number of successful garbage collection run. + pub num_successful_gc_run: usize, /// The number or failed storage resolution. pub num_failed_storage_resolution: usize, /// The number of splits that were unable to be removed. @@ -86,6 +88,8 @@ impl GarbageCollector { debug!("loading indexes from the metastore"); self.counters.num_passes += 1; + let start = Instant::now(); + let response = match self .metastore .list_indexes_metadata(ListIndexesMetadataRequest::all()) @@ -137,23 +141,43 @@ impl GarbageCollector { split_deletion_grace_period(), false, Some(ctx.progress()), + Some(GcMetrics { + deleted_splits: JANITOR_METRICS.gc_deleted_splits.clone(), + deleted_bytes: JANITOR_METRICS.gc_deleted_bytes.clone(), + failed_splits: JANITOR_METRICS.gc_failed_deleted_splits.clone(), + }), ) .await; + let run_duration = start.elapsed().as_secs(); + JANITOR_METRICS.gc_duration_seconds_sum.inc_by(run_duration); + let deleted_file_entries = match gc_res { Ok(removal_info) => { - self.counters.num_successful_gc_run_on_index += 1; + self.counters.num_successful_gc_run += 1; + JANITOR_METRICS + .gc_run_count + .with_label_values(["success"]) + .inc(); self.counters.num_failed_splits += removal_info.failed_splits.len(); removal_info.removed_split_entries } Err(error) => { - self.counters.num_failed_gc_run_on_index += 1; + self.counters.num_failed_gc_run += 1; + JANITOR_METRICS + .gc_run_count + .with_label_values(["error"]) + .inc(); error!(error=?error, "failed to run garbage collection"); return; } }; if !deleted_file_entries.is_empty() { let num_deleted_splits = deleted_file_entries.len(); + let num_deleted_bytes = deleted_file_entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64() as usize) + .sum::(); let deleted_files: HashSet<&Path> = deleted_file_entries .iter() .map(|deleted_entry| deleted_entry.file_name.as_path()) @@ -163,11 +187,8 @@ impl GarbageCollector { num_deleted_splits = num_deleted_splits, "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits, ); - self.counters.num_deleted_files += deleted_file_entries.len(); - self.counters.num_deleted_bytes += deleted_file_entries - .iter() - .map(|entry| entry.file_size_bytes.as_u64() as usize) - .sum::(); + self.counters.num_deleted_files += num_deleted_splits; + self.counters.num_deleted_bytes += num_deleted_bytes; } } } @@ -348,6 +369,7 @@ mod tests { split_deletion_grace_period(), false, None, + None, ) .await; assert!(result.is_ok()); @@ -497,9 +519,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 30 secs later @@ -508,9 +530,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 60 secs later @@ -519,9 +541,9 @@ mod tests { assert_eq!(counters.num_passes, 2); assert_eq!(counters.num_deleted_files, 4); assert_eq!(counters.num_deleted_bytes, 80); - assert_eq!(counters.num_successful_gc_run_on_index, 2); + assert_eq!(counters.num_successful_gc_run, 2); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -585,9 +607,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 0); assert_eq!(counters.num_deleted_bytes, 0); - assert_eq!(counters.num_successful_gc_run_on_index, 0); + assert_eq!(counters.num_successful_gc_run, 0); assert_eq!(counters.num_failed_storage_resolution, 1); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -642,7 +664,7 @@ mod tests { _ => panic!("only Staged and MarkedForDeletion expected."), }; if let Some((index_uid, split_id)) = query.after_split { - splits = splits.retain(|split| { + splits.retain(|split| { ( &split.split_metadata.index_uid, &split.split_metadata.split_id, @@ -716,9 +738,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 14000); assert_eq!(counters.num_deleted_bytes, 20 * 14000); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 2000); universe.assert_quit().await; } diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index d3392af7b3f..45111af8f93 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -18,10 +18,19 @@ // along with this program. If not, see . use once_cell::sync::Lazy; -use quickwit_common::metrics::{new_gauge_vec, IntGaugeVec}; +use quickwit_common::metrics::{ + new_counter, new_counter_vec, new_gauge_vec, IntCounter, IntCounterVec, IntGaugeVec, +}; pub struct JanitorMetrics { pub ongoing_num_delete_operations_total: IntGaugeVec<1>, + pub gc_deleted_splits: IntCounter, + pub gc_deleted_bytes: IntCounter, + pub gc_failed_deleted_splits: IntCounter, + pub gc_run_count: IntCounterVec<1>, + pub gc_duration_seconds_sum: IntCounter, + // TODO having a current run duration which is 0|undefined out of run, and returns `now - + // start_time` during a run would be nice } impl Default for JanitorMetrics { @@ -34,6 +43,37 @@ impl Default for JanitorMetrics { &[], ["index"], ), + gc_deleted_splits: new_counter( + "gc_deleted_splits_count", + "Total number of splits deleted by the garbage collector.", + "quickwit_janitor", + &[], + ), + gc_deleted_bytes: new_counter( + "gc_deleted_bytes_count", + "Total number of bytes deleted by the garbage collector.", + "quickwit_janitor", + &[], + ), + gc_failed_deleted_splits: new_counter( + "gc_deleted_splits_error_count", + "Total number of splits that failed to be delete.", + "quickwit_janitor", + &[], + ), + gc_run_count: new_counter_vec( + "gc_run_count", + "Total number of garbage collector execition.", + "quickwit_janitor", + &[], + ["result"], + ), + gc_duration_seconds_sum: new_counter( + "gc_duration_seconds_sum", + "Total time spent running the garbage collector", + "quickwit_janitor", + &[], + ), } } } From cb21e82af299f3d456edcaa3736612e922ba197f Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 16 Sep 2024 11:05:24 +0200 Subject: [PATCH 4/6] refactor gc metrics --- .../src/garbage_collection.rs | 72 ++++++++++--------- .../src/actors/garbage_collector.rs | 12 +++- quickwit/quickwit-janitor/src/metrics.rs | 22 +++--- 3 files changed, 56 insertions(+), 50 deletions(-) diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index ee604cff9ad..74971077b3b 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -51,6 +51,20 @@ pub struct GcMetrics { pub failed_splits: IntCounter, } +trait RecordGcMetrics { + fn record(&self, num_delete_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize); +} + +impl RecordGcMetrics for Option { + fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize) { + if let Some(metrics) = self { + metrics.deleted_splits.inc_by(num_deleted_splits as u64); + metrics.deleted_bytes.inc_by(num_deleted_bytes); + metrics.failed_splits.inc_by(num_failed_splits as u64); + } + } +} + /// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from /// storage and metastore. #[derive(Error, Debug)] @@ -188,7 +202,7 @@ async fn delete_splits( storages: &HashMap>, metastore: MetastoreServiceClient, progress_opt: Option<&Progress>, - metrics: Option<&GcMetrics>, + metrics: &Option, split_removal_info: &mut SplitRemovalInfo, ) -> Result<(), ()> { let mut delete_split_from_index_res_stream = @@ -229,32 +243,26 @@ async fn delete_splits( while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await { match delete_split_result { Ok(entries) => { - if let Some(metrics) = metrics { - let deleted_bytes = entries - .iter() - .map(|entry| entry.file_size_bytes.as_u64()) - .sum::(); - metrics.deleted_splits.inc_by(entries.len() as u64); - metrics.deleted_bytes.inc_by(deleted_bytes); - } + let deleted_bytes = entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::(); + let deleted_splits_count = entries.len(); + + metrics.record(deleted_splits_count, deleted_bytes, 0); split_removal_info.removed_split_entries.extend(entries); } Err(delete_split_error) => { - if let Some(metrics) = metrics { - let deleted_bytes = delete_split_error - .successes - .iter() - .map(|entry| entry.file_size_bytes.as_u64()) - .sum::(); - metrics - .deleted_splits - .inc_by(delete_split_error.successes.len() as u64); - metrics.deleted_bytes.inc_by(deleted_bytes); - metrics.failed_splits.inc_by( - delete_split_error.storage_failures.len() as u64 - + delete_split_error.metastore_failures.len() as u64, - ); - } + let deleted_bytes = delete_split_error + .successes + .iter() + .map(|entry| entry.file_size_bytes.as_u64()) + .sum::(); + let deleted_splits_count = delete_split_error.successes.len(); + let failed_splits_count = delete_split_error.storage_failures.len() + + delete_split_error.metastore_failures.len(); + + metrics.record(deleted_splits_count, deleted_bytes, failed_splits_count); split_removal_info .removed_split_entries .extend(delete_split_error.successes); @@ -334,15 +342,13 @@ async fn delete_splits_marked_for_deletion_several_indexes( } }; - let num_splits_to_delete = splits_metadata_to_delete.len(); - - if num_splits_to_delete == 0 { + // set split after which to search for the next loop + let Some(last_split_metadata) = splits_metadata_to_delete.last() else { break; - } + }; + list_splits_query = list_splits_query.after_split(last_split_metadata); - // set split after which to search for the next loop - list_splits_query = - list_splits_query.after_split(splits_metadata_to_delete.last().unwrap()); + let num_splits_to_delete = splits_metadata_to_delete.len(); let splits_metadata_to_delete_per_index: HashMap> = splits_metadata_to_delete @@ -356,7 +362,7 @@ async fn delete_splits_marked_for_deletion_several_indexes( &storages, metastore.clone(), progress_opt, - metrics.as_ref(), + &metrics, &mut split_removal_info, ) .await; @@ -385,7 +391,7 @@ pub async fn delete_splits_from_storage_and_metastore( metastore: MetastoreServiceClient, splits: Vec, progress_opt: Option<&Progress>, -) -> anyhow::Result, DeleteSplitsError> { +) -> Result, DeleteSplitsError> { let mut split_infos: HashMap = HashMap::with_capacity(splits.len()); for split in splits { diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index 96660ca247b..6d02f29b39a 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -142,15 +142,21 @@ impl GarbageCollector { false, Some(ctx.progress()), Some(GcMetrics { - deleted_splits: JANITOR_METRICS.gc_deleted_splits.clone(), + deleted_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["success"]) + .clone(), deleted_bytes: JANITOR_METRICS.gc_deleted_bytes.clone(), - failed_splits: JANITOR_METRICS.gc_failed_deleted_splits.clone(), + failed_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["error"]) + .clone(), }), ) .await; let run_duration = start.elapsed().as_secs(); - JANITOR_METRICS.gc_duration_seconds_sum.inc_by(run_duration); + JANITOR_METRICS.gc_seconds_total.inc_by(run_duration); let deleted_file_entries = match gc_res { Ok(removal_info) => { diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index 45111af8f93..5cd2ac36583 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -24,11 +24,10 @@ use quickwit_common::metrics::{ pub struct JanitorMetrics { pub ongoing_num_delete_operations_total: IntGaugeVec<1>, - pub gc_deleted_splits: IntCounter, + pub gc_deleted_splits: IntCounterVec<1>, pub gc_deleted_bytes: IntCounter, - pub gc_failed_deleted_splits: IntCounter, pub gc_run_count: IntCounterVec<1>, - pub gc_duration_seconds_sum: IntCounter, + pub gc_seconds_total: IntCounter, // TODO having a current run duration which is 0|undefined out of run, and returns `now - // start_time` during a run would be nice } @@ -43,33 +42,28 @@ impl Default for JanitorMetrics { &[], ["index"], ), - gc_deleted_splits: new_counter( + gc_deleted_splits: new_counter_vec( "gc_deleted_splits_count", "Total number of splits deleted by the garbage collector.", "quickwit_janitor", &[], + ["result"], ), gc_deleted_bytes: new_counter( - "gc_deleted_bytes_count", + "gc_deleted_bytes_total", "Total number of bytes deleted by the garbage collector.", "quickwit_janitor", &[], ), - gc_failed_deleted_splits: new_counter( - "gc_deleted_splits_error_count", - "Total number of splits that failed to be delete.", - "quickwit_janitor", - &[], - ), gc_run_count: new_counter_vec( - "gc_run_count", + "gc_run_total", "Total number of garbage collector execition.", "quickwit_janitor", &[], ["result"], ), - gc_duration_seconds_sum: new_counter( - "gc_duration_seconds_sum", + gc_seconds_total: new_counter( + "gc_seconds_total", "Total time spent running the garbage collector", "quickwit_janitor", &[], From 63521a3eaffdec057673fa19f975fe43e8c01a65 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Mon, 16 Sep 2024 11:11:56 +0200 Subject: [PATCH 5/6] update doc on after_split --- quickwit/quickwit-metastore/src/metastore/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 3b224413e64..2b0bb86d9ca 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -855,13 +855,14 @@ impl ListSplitsQuery { self } - /// Sorts the splits by index_uid. + /// Sorts the splits by index_uid and split_id. pub fn sort_by_index_uid(mut self) -> Self { self.sort_by = SortBy::IndexUid; self } - /// Only return splits whose (index_uid, split_id) are lexicographically after this split + /// Only return splits whose (index_uid, split_id) are lexicographically after this split. + /// This is only useful if results are sorted by index_uid and split_id. pub fn after_split(mut self, split_meta: &SplitMetadata) -> Self { self.after_split = Some((split_meta.index_uid.clone(), split_meta.split_id.clone())); self From a076c6311b7a801b94b163c1c0c36c5d2a6c25b3 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 17 Sep 2024 15:10:54 +0900 Subject: [PATCH 6/6] gc metrics --- .../src/garbage_collection.rs | 35 +++------ .../quickwit-index-management/src/index.rs | 3 +- quickwit/quickwit-index-management/src/lib.rs | 5 +- .../src/actors/garbage_collector.rs | 14 +--- quickwit/quickwit-janitor/src/metrics.rs | 77 +++++++++++++------ 5 files changed, 72 insertions(+), 62 deletions(-) diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 74971077b3b..9440872da87 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -25,7 +25,6 @@ use std::time::Duration; use anyhow::Context; use futures::{Future, StreamExt}; use itertools::Itertools; -use quickwit_common::metrics::IntCounter; use quickwit_common::pretty::PrettySample; use quickwit_common::Progress; use quickwit_metastore::{ @@ -45,24 +44,14 @@ use tracing::{error, instrument}; /// The maximum number of splits that the GC should delete per attempt. const DELETE_SPLITS_BATCH_SIZE: usize = 10_000; -pub struct GcMetrics { - pub deleted_splits: IntCounter, - pub deleted_bytes: IntCounter, - pub failed_splits: IntCounter, -} - -trait RecordGcMetrics { +pub trait RecordGcMetrics: Sync { fn record(&self, num_delete_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize); } -impl RecordGcMetrics for Option { - fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize) { - if let Some(metrics) = self { - metrics.deleted_splits.inc_by(num_deleted_splits as u64); - metrics.deleted_bytes.inc_by(num_deleted_bytes); - metrics.failed_splits.inc_by(num_failed_splits as u64); - } - } +pub(crate) struct DoNotRecordGcMetrics; + +impl RecordGcMetrics for DoNotRecordGcMetrics { + fn record(&self, _num_deleted_splits: usize, _num_deleted_bytes: u64, _num_failed_splits: usize) {} } /// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from @@ -115,7 +104,7 @@ pub async fn run_garbage_collect( deletion_grace_period: Duration, dry_run: bool, progress_opt: Option<&Progress>, - metrics: Option, + metrics: &dyn RecordGcMetrics, ) -> anyhow::Result { let grace_period_timestamp = OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64; @@ -202,7 +191,7 @@ async fn delete_splits( storages: &HashMap>, metastore: MetastoreServiceClient, progress_opt: Option<&Progress>, - metrics: &Option, + metrics: &dyn RecordGcMetrics, split_removal_info: &mut SplitRemovalInfo, ) -> Result<(), ()> { let mut delete_split_from_index_res_stream = @@ -313,7 +302,7 @@ async fn delete_splits_marked_for_deletion_several_indexes( metastore: MetastoreServiceClient, storages: HashMap>, progress_opt: Option<&Progress>, - metrics: Option, + metrics: &dyn RecordGcMetrics, ) -> SplitRemovalInfo { let mut split_removal_info = SplitRemovalInfo::default(); @@ -362,7 +351,7 @@ async fn delete_splits_marked_for_deletion_several_indexes( &storages, metastore.clone(), progress_opt, - &metrics, + metrics, &mut split_removal_info, ) .await; @@ -557,7 +546,7 @@ mod tests { Duration::from_secs(30), false, None, - None, + &DoNotRecordGcMetrics, ) .await .unwrap(); @@ -585,7 +574,7 @@ mod tests { Duration::from_secs(30), false, None, - None, + &DoNotRecordGcMetrics, ) .await .unwrap(); @@ -663,7 +652,7 @@ mod tests { Duration::from_secs(30), false, None, - None, + &DoNotRecordGcMetrics, ) .await .unwrap(); diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 0fe5c77cc2b..86f03844358 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -49,6 +49,7 @@ use crate::garbage_collection::{ delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError, SplitRemovalInfo, }; +use crate::DoNotRecordGcMetrics; #[derive(Error, Debug)] pub enum IndexServiceError { @@ -373,7 +374,7 @@ impl IndexService { Duration::ZERO, dry_run, None, - None, + &DoNotRecordGcMetrics, ) .await?; diff --git a/quickwit/quickwit-index-management/src/lib.rs b/quickwit/quickwit-index-management/src/lib.rs index 65a7ef861ce..c5b6b1934b9 100644 --- a/quickwit/quickwit-index-management/src/lib.rs +++ b/quickwit/quickwit-index-management/src/lib.rs @@ -20,5 +20,8 @@ mod garbage_collection; mod index; -pub use garbage_collection::{run_garbage_collect, GcMetrics}; +pub use garbage_collection::run_garbage_collect; +pub use garbage_collection::RecordGcMetrics; pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError}; + +use garbage_collection::DoNotRecordGcMetrics; diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index 6d02f29b39a..fd23f5824cd 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -26,7 +26,7 @@ use async_trait::async_trait; use futures::{stream, StreamExt}; use quickwit_actors::{Actor, ActorContext, Handler}; use quickwit_common::shared_consts::split_deletion_grace_period; -use quickwit_index_management::{run_garbage_collect, GcMetrics}; +use quickwit_index_management::run_garbage_collect; use quickwit_metastore::ListIndexesMetadataResponseExt; use quickwit_proto::metastore::{ ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, @@ -141,17 +141,7 @@ impl GarbageCollector { split_deletion_grace_period(), false, Some(ctx.progress()), - Some(GcMetrics { - deleted_splits: JANITOR_METRICS - .gc_deleted_splits - .with_label_values(["success"]) - .clone(), - deleted_bytes: JANITOR_METRICS.gc_deleted_bytes.clone(), - failed_splits: JANITOR_METRICS - .gc_deleted_splits - .with_label_values(["error"]) - .clone(), - }), + &JANITOR_METRICS.gc_metrics, ) .await; diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index 5cd2ac36583..560ae08c26a 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -19,14 +19,53 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - new_counter, new_counter_vec, new_gauge_vec, IntCounter, IntCounterVec, IntGaugeVec, + new_counter, new_counter_vec, new_gauge_vec, IntCounter, IntCounterVec, IntGaugeVec }; +use quickwit_index_management::RecordGcMetrics; + + +pub struct GcMetrics { + pub deleted_splits: IntCounter, + pub deleted_bytes: IntCounter, + pub failed_splits: IntCounter, +} + +impl RecordGcMetrics for GcMetrics { + fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize) { + self.deleted_splits.inc_by(num_deleted_splits as u64); + self.deleted_bytes.inc_by(num_deleted_bytes); + self.failed_splits.inc_by(num_failed_splits as u64); + } +} + +impl Default for GcMetrics { + fn default() -> GcMetrics { + let deleted_splits = new_counter_vec( + "gc_deleted_splits_count", + "Total number of splits deleted by the garbage collector.", + "quickwit_janitor", + &[], + ["result"], + ); + let deleted_bytes = new_counter( + "gc_deleted_bytes_total", + "Total number of bytes deleted by the garbage collector.", + "quickwit_janitor", + &[], + ); + GcMetrics { + deleted_splits: deleted_splits.with_label_values(["success"]), + deleted_bytes, + failed_splits: deleted_splits.with_label_values(["failure"]) + } + } + +} pub struct JanitorMetrics { pub ongoing_num_delete_operations_total: IntGaugeVec<1>, - pub gc_deleted_splits: IntCounterVec<1>, - pub gc_deleted_bytes: IntCounter, pub gc_run_count: IntCounterVec<1>, + pub gc_metrics: GcMetrics, pub gc_seconds_total: IntCounter, // TODO having a current run duration which is 0|undefined out of run, and returns `now - // start_time` during a run would be nice @@ -42,32 +81,20 @@ impl Default for JanitorMetrics { &[], ["index"], ), - gc_deleted_splits: new_counter_vec( - "gc_deleted_splits_count", - "Total number of splits deleted by the garbage collector.", - "quickwit_janitor", - &[], - ["result"], - ), - gc_deleted_bytes: new_counter( - "gc_deleted_bytes_total", - "Total number of bytes deleted by the garbage collector.", - "quickwit_janitor", - &[], - ), gc_run_count: new_counter_vec( - "gc_run_total", - "Total number of garbage collector execition.", - "quickwit_janitor", - &[], - ["result"], - ), + "gc_run_total", + "Total number of garbage collector execition.", + "quickwit_janitor", + &[], + ["result"], + ), gc_seconds_total: new_counter( "gc_seconds_total", - "Total time spent running the garbage collector", - "quickwit_janitor", - &[], + "Total time spent running the garbage collector", + "quickwit_janitor", + &[], ), + gc_metrics: GcMetrics::default(), } } }