From afee9de3bae12e0b4a3a53028161dbc896b50143 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 12 Jul 2024 10:29:11 +0200 Subject: [PATCH 1/2] shorten apply-plan log (#5214) --- quickwit/quickwit-common/src/pretty.rs | 30 ++-- .../src/indexing_scheduler/mod.rs | 141 ++++++++++++++++-- 2 files changed, 148 insertions(+), 23 deletions(-) diff --git a/quickwit/quickwit-common/src/pretty.rs b/quickwit/quickwit-common/src/pretty.rs index 04f75bb7265..dcc5e3b1536 100644 --- a/quickwit/quickwit-common/src/pretty.rs +++ b/quickwit/quickwit-common/src/pretty.rs @@ -20,28 +20,36 @@ use std::fmt; use std::time::Duration; -pub struct PrettySample<'a, T>(&'a [T], usize); +pub struct PrettySample(I, usize); -impl<'a, T> PrettySample<'a, T> { - pub fn new(slice: &'a [T], sample_size: usize) -> Self { +impl PrettySample { + pub fn new(slice: I, sample_size: usize) -> Self { Self(slice, sample_size) } } -impl fmt::Debug for PrettySample<'_, T> -where T: fmt::Debug +impl fmt::Debug for PrettySample +where + I: IntoIterator + Clone, + T: fmt::Debug, { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { write!(formatter, "[")?; - for (i, item) in self.0.iter().enumerate() { - if i == self.1 { - write!(formatter, ", and {} more", self.0.len() - i)?; - break; - } + // in general we will get passed a reference (&[...], &HashMap...) or a Map<_> of them. + // So we either perform a Copy, or a cheap Clone of a simple struct + let mut iter = self.0.clone().into_iter().enumerate(); + for (i, item) in &mut iter { if i > 0 { write!(formatter, ", ")?; } write!(formatter, "{item:?}")?; + if i == self.1 - 1 { + break; + } + } + let left = iter.count(); + if left > 0 { + write!(formatter, ", and {left} more")?; } write!(formatter, "]")?; Ok(()) @@ -83,7 +91,7 @@ mod tests { #[test] fn test_pretty_sample() { - let pretty_sample = PrettySample::<'_, usize>::new(&[], 2); + let pretty_sample = PrettySample::<&[usize]>::new(&[], 2); assert_eq!(format!("{pretty_sample:?}"), "[]"); let pretty_sample = PrettySample::new(&[1], 2); diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 110e08a5139..a94d5de92a3 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -29,6 +29,7 @@ use std::time::{Duration, Instant}; use fnv::{FnvHashMap, FnvHashSet}; use itertools::Itertools; use once_cell::sync::OnceCell; +use quickwit_common::pretty::PrettySample; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY, PIPELINE_THROUGHTPUT, @@ -448,38 +449,112 @@ fn get_shard_locality_metrics( impl<'a> fmt::Debug for IndexingPlansDiff<'a> { fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { if self.has_same_nodes() && self.has_same_tasks() { - return write!(formatter, "EmptyIndexingPlanDiff"); + return write!(formatter, "EmptyIndexingPlansDiff"); } - write!(formatter, "IndexingPlanDiff(")?; + write!(formatter, "IndexingPlansDiff(")?; let mut separator = ""; if !self.missing_node_ids.is_empty() { - write!(formatter, "missing_node_ids={:?}, ", self.missing_node_ids)?; + write!( + formatter, + "missing_node_ids={:?}", + PrettySample::new(&self.missing_node_ids, 10) + )?; separator = ", " } if !self.unplanned_node_ids.is_empty() { write!( formatter, "{separator}unplanned_node_ids={:?}", - self.unplanned_node_ids + PrettySample::new(&self.unplanned_node_ids, 10) )?; separator = ", " } if !self.missing_tasks_by_node_id.is_empty() { - write!( - formatter, - "{separator}missing_tasks_by_node_id={:?}, ", - self.missing_tasks_by_node_id - )?; + write!(formatter, "{separator}missing_tasks_by_node_id=",)?; + format_indexing_task_map(formatter, &self.missing_tasks_by_node_id)?; separator = ", " } if !self.unplanned_tasks_by_node_id.is_empty() { + write!(formatter, "{separator}unplanned_tasks_by_node_id=",)?; + format_indexing_task_map(formatter, &self.unplanned_tasks_by_node_id)?; + } + write!(formatter, ")") + } +} + +fn format_indexing_task_map( + formatter: &mut std::fmt::Formatter, + indexing_tasks: &FnvHashMap<&str, Vec<&IndexingTask>>, +) -> std::fmt::Result { + // we show at most 5 nodes, and aggregate the results for the other. + // we show at most 10 indexes, but aggregate results after. + // we always aggregate shard ids + // we hide pipeline id and incarnation id, they are not very useful in most case, but take a + // lot of place + const MAX_NODE: usize = 5; + const MAX_INDEXES: usize = 10; + let mut index_displayed = 0; + write!(formatter, "{{")?; + let mut indexer_iter = indexing_tasks.iter().enumerate(); + for (i, (index_name, tasks)) in &mut indexer_iter { + if i != 0 { + write!(formatter, ", ")?; + } + if index_displayed != MAX_INDEXES - 1 { + write!(formatter, "{index_name:?}: [")?; + let mut tasks_iter = tasks.iter().enumerate(); + for (i, task) in &mut tasks_iter { + if i != 0 { + write!(formatter, ", ")?; + } + write!( + formatter, + r#"(index_id: "{}", source_id: "{}", shard_count: {})"#, + task.index_uid.as_ref().unwrap().index_id, + task.source_id, + task.shard_ids.len() + )?; + index_displayed += 1; + if index_displayed == MAX_INDEXES - 1 { + let (task_count, shard_count) = tasks_iter.fold((0, 0), |(t, s), (_, task)| { + (t + 1, s + task.shard_ids.len()) + }); + if task_count > 0 { + write!( + formatter, + " and {task_count} tasks and {shard_count} shards" + )?; + } + break; + } + } + write!(formatter, "]")?; + } else { write!( formatter, - "{separator}unplanned_tasks_by_node_id={:?}", - self.unplanned_tasks_by_node_id + "{index_name:?}: [with {} tasks and {} shards]", + tasks.len(), + tasks.iter().map(|task| task.shard_ids.len()).sum::() )?; } - write!(formatter, ")") + if i == MAX_NODE - 1 { + break; + } + } + let (indexer, tasks, shards) = indexer_iter.fold((0, 0, 0), |(i, t, s), (_, (_, task))| { + ( + i + 1, + t + task.len(), + s + task.iter().map(|task| task.shard_ids.len()).sum::(), + ) + }); + if indexer > 0 { + write!( + formatter, + " and {indexer} more indexers, handling {tasks} tasks and {shards} shards}}" + ) + } else { + write!(formatter, "}}") } } @@ -884,6 +959,48 @@ mod tests { assert_eq!(indexer_2_tasks.len(), 3); } + #[test] + fn test_debug_indexing_task_map() { + let mut map = FnvHashMap::default(); + let task1 = IndexingTask { + index_uid: Some(IndexUid::for_test("index1", 123)), + source_id: "my-source".to_string(), + pipeline_uid: Some(PipelineUid::random()), + shard_ids: vec!["shard1".into()], + }; + let task2 = IndexingTask { + index_uid: Some(IndexUid::for_test("index2", 123)), + source_id: "my-source".to_string(), + pipeline_uid: Some(PipelineUid::random()), + shard_ids: vec!["shard2".into(), "shard3".into()], + }; + let task3 = IndexingTask { + index_uid: Some(IndexUid::for_test("index3", 123)), + source_id: "my-source".to_string(), + pipeline_uid: Some(PipelineUid::random()), + shard_ids: vec!["shard6".into()], + }; + // order made to map with the debug for lisibility + map.insert("indexer5", vec![&task2]); + map.insert("indexer4", vec![&task1]); + map.insert("indexer3", vec![&task1, &task3]); + map.insert("indexer2", vec![&task2, &task3, &task1, &task2]); + map.insert("indexer1", vec![&task1, &task2, &task3, &task1]); + map.insert("indexer6", vec![&task1, &task2, &task3]); + let plan = IndexingPlansDiff { + missing_node_ids: FnvHashSet::default(), + unplanned_node_ids: FnvHashSet::default(), + missing_tasks_by_node_id: map, + unplanned_tasks_by_node_id: FnvHashMap::default(), + }; + + let debug = format!("{plan:?}"); + assert_eq!( + debug, + r#"IndexingPlansDiff(missing_tasks_by_node_id={"indexer5": [(index_id: "index2", source_id: "my-source", shard_count: 2)], "indexer4": [(index_id: "index1", source_id: "my-source", shard_count: 1)], "indexer3": [(index_id: "index1", source_id: "my-source", shard_count: 1), (index_id: "index3", source_id: "my-source", shard_count: 1)], "indexer2": [(index_id: "index2", source_id: "my-source", shard_count: 2), (index_id: "index3", source_id: "my-source", shard_count: 1), (index_id: "index1", source_id: "my-source", shard_count: 1), (index_id: "index2", source_id: "my-source", shard_count: 2)], "indexer1": [(index_id: "index1", source_id: "my-source", shard_count: 1) and 3 tasks and 4 shards] and 1 more indexers, handling 3 tasks and 4 shards})"# + ); + } + proptest! { #[test] fn test_building_indexing_tasks_and_physical_plan(num_indexers in 1usize..50usize, index_id_sources in proptest::collection::vec(gen_kafka_source(), 1..20)) { From 549e129659585a41644970fb5d3d023787ae4f1b Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Fri, 12 Jul 2024 11:40:14 +0200 Subject: [PATCH 2/2] add raw_lowercase tokenizer (#5216) --- docs/configuration/index-config.md | 7 +++--- quickwit/quickwit-query/src/tokenizers/mod.rs | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/docs/configuration/index-config.md b/docs/configuration/index-config.md index 27d1befb5f2..01eb922fa98 100644 --- a/docs/configuration/index-config.md +++ b/docs/configuration/index-config.md @@ -142,11 +142,12 @@ fast: | Tokenizer | Description | | ------------- | ------------- | | `raw` | Does not process nor tokenize text. Filters out tokens larger than 255 bytes. | +| `raw_lowercase` | Does not tokenize text, but lowercase it. Filters out tokens larger than 255 bytes. | | `default` | Chops the text on according to whitespace and punctuation, removes tokens that are too long, and converts to lowercase. Filters out tokens larger than 255 bytes. | -| `en_stem` | Like `default`, but also applies stemming on the resulting tokens. Filters out tokens larger than 255 bytes. | -| `whitespace` | Chops the text on according to whitespace only. Doesn't remove long tokens or converts to lowercase. | +| `en_stem` | Like `default`, but also applies stemming on the resulting tokens. Filters out tokens larger than 255 bytes. | +| `whitespace` | Chops the text on according to whitespace only. Doesn't remove long tokens or converts to lowercase. | | `chinese_compatible` | Chop between each CJK character in addition to what `default` does. Should be used with `record: position` to be able to properly search | -| `lowercase` | Applies a lowercase transformation on the text. It does not tokenize the text. | +| `lowercase` | Applies a lowercase transformation on the text. It does not tokenize the text. | ##### Description of available normalizers diff --git a/quickwit/quickwit-query/src/tokenizers/mod.rs b/quickwit/quickwit-query/src/tokenizers/mod.rs index 3e68a55cb50..420252c52a7 100644 --- a/quickwit/quickwit-query/src/tokenizers/mod.rs +++ b/quickwit/quickwit-query/src/tokenizers/mod.rs @@ -46,6 +46,12 @@ pub fn create_default_quickwit_tokenizer_manager() -> TokenizerManager { .build(); tokenizer_manager.register("raw", raw_tokenizer, false); + let raw_tokenizer = TextAnalyzer::builder(RawTokenizer::default()) + .filter(LowerCaser) + .filter(RemoveLongFilter::limit(DEFAULT_REMOVE_TOKEN_LENGTH)) + .build(); + tokenizer_manager.register("raw_lowercase", raw_tokenizer, false); + let lower_case_tokenizer = TextAnalyzer::builder(RawTokenizer::default()) .filter(LowerCaser) .filter(RemoveLongFilter::limit(DEFAULT_REMOVE_TOKEN_LENGTH)) @@ -160,4 +166,20 @@ mod tests { } assert_eq!(tokens, vec!["pig", "cafe", "factory", "2"]) } + + #[test] + fn test_raw_lowercase_tokenizer() { + let tokenizer_manager = super::create_default_quickwit_tokenizer_manager(); + let my_long_text = "a text, that is just too long, no one will type it, no one will like \ + it, no one shall find it. I just need some more chars, now you may \ + not pass."; + + let mut tokenizer = tokenizer_manager.get_tokenizer("raw_lowercase").unwrap(); + let mut stream = tokenizer.token_stream(my_long_text); + assert!(stream.advance()); + assert_eq!(stream.token().text.len(), my_long_text.len()); + // there are non letter, so we can't check for all lowercase directly + assert!(stream.token().text.chars().all(|c| !c.is_uppercase())); + assert!(!stream.advance()); + } }