diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index c25809dedf4..54d75e3d5c9 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -4463,7 +4463,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "ownedbytes" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=014328e#014328e378ff313a0b7f00f124e02d9230aa2039" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=108f30#108f30ba235cf4c4154c62d115b3efc487fb0a8b" dependencies = [ "stable_deref_trait", ] @@ -7696,7 +7696,7 @@ dependencies = [ [[package]] name = "tantivy" version = "0.22.0-dev" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=014328e#014328e378ff313a0b7f00f124e02d9230aa2039" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=108f30#108f30ba235cf4c4154c62d115b3efc487fb0a8b" dependencies = [ "aho-corasick", "arc-swap", @@ -7749,7 +7749,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.5.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=014328e#014328e378ff313a0b7f00f124e02d9230aa2039" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=108f30#108f30ba235cf4c4154c62d115b3efc487fb0a8b" dependencies = [ "bitpacking", ] @@ -7757,7 +7757,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.2.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=014328e#014328e378ff313a0b7f00f124e02d9230aa2039" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=108f30#108f30ba235cf4c4154c62d115b3efc487fb0a8b" dependencies = [ "fastdivide", "itertools 0.12.0", @@ -7771,7 +7771,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=014328e#014328e378ff313a0b7f00f124e02d9230aa2039" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=108f30#108f30ba235cf4c4154c62d115b3efc487fb0a8b" dependencies = [ "async-trait", "byteorder", @@ -7794,7 +7794,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.21.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=014328e#014328e378ff313a0b7f00f124e02d9230aa2039" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=108f30#108f30ba235cf4c4154c62d115b3efc487fb0a8b" dependencies = [ "nom", ] @@ -7802,7 +7802,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.2.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=014328e#014328e378ff313a0b7f00f124e02d9230aa2039" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=108f30#108f30ba235cf4c4154c62d115b3efc487fb0a8b" dependencies = [ "tantivy-bitpacker", "tantivy-common", @@ -7813,7 +7813,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.2.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=014328e#014328e378ff313a0b7f00f124e02d9230aa2039" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=108f30#108f30ba235cf4c4154c62d115b3efc487fb0a8b" dependencies = [ "murmurhash32", "tantivy-common", @@ -7822,7 +7822,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.2.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=014328e#014328e378ff313a0b7f00f124e02d9230aa2039" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=108f30#108f30ba235cf4c4154c62d115b3efc487fb0a8b" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 661f86bfb5e..60b5f7a5760 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -279,7 +279,7 @@ quickwit-serve = { version = "0.7.0", path = "./quickwit-serve" } quickwit-storage = { version = "0.7.0", path = "./quickwit-storage" } quickwit-telemetry = { version = "0.7.0", path = "./quickwit-telemetry" } -tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "014328e", default-features = false, features = [ +tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "108f30b", default-features = false, features = [ "lz4-compression", "mmap", "quickwit", diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index 71937f0eac7..27c0e3bb2ca 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -49,7 +49,7 @@ pub fn setup_logging_and_tracing( } let env_filter = env::var("RUST_LOG") .map(|_| EnvFilter::from_default_env()) - .or_else(|_| EnvFilter::try_new(format!("quickwit={level}"))) + .or_else(|_| EnvFilter::try_new(format!("quickwit={level},tantivy=WARN"))) .context("Failed to set up tracing env filter.")?; global::set_text_map_propagator(TraceContextPropagator::new()); let registry = tracing_subscriber::registry().with(env_filter); diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 308ac1613d4..035c14f2eb4 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -82,6 +82,10 @@ pub struct IndexerCounters { /// This value is used to trigger commit and for observation. pub num_docs_in_workbench: u64, + /// Number of ProcessDocBatch received by the indexer to + /// build this split. + pub num_doc_batches_in_workbench: u64, + /// Metrics describing the load and indexing performance of the /// pipeline. This is only updated for cooperative indexers. pub pipeline_metrics_opt: Option, @@ -292,6 +296,7 @@ impl IndexerState { .extend(batch.checkpoint_delta) .context("batch delta does not follow indexer checkpoint")?; let mut memory_usage_delta: u64 = 0; + counters.num_doc_batches_in_workbench += 1; for doc in batch.docs { let ProcessedDoc { doc, @@ -652,7 +657,14 @@ impl Indexer { } let num_splits = splits.len() as u64; let split_ids = splits.iter().map(|split| split.split_id()).join(","); - info!(commit_trigger=?commit_trigger, split_ids=%split_ids, num_docs=self.counters.num_docs_in_workbench, "send-to-index-serializer"); + info!( + index=self.indexer_state.pipeline_id.index_uid.as_str(), + source=self.indexer_state.pipeline_id.source_id.as_str(), + pipeline_uid=%self.indexer_state.pipeline_id.pipeline_uid, + commit_trigger=?commit_trigger, + num_batches=%self.counters.num_doc_batches_in_workbench, + split_ids=%split_ids, + num_docs=self.counters.num_docs_in_workbench, "send-to-index-serializer"); ctx.send_message( &self.index_serializer_mailbox, IndexedSplitBatchBuilder { @@ -666,6 +678,7 @@ impl Indexer { ) .await?; self.counters.num_docs_in_workbench = 0; + self.counters.num_doc_batches_in_workbench = 0; self.counters.num_splits_emitted += num_splits; self.counters.num_split_batches_emitted += 1; Ok(()) @@ -830,6 +843,7 @@ mod tests { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 1, //< the num docs in split counter has been reset. + num_doc_batches_in_workbench: 1, //< the num docs in split counter has been reset. pipeline_metrics_opt: None, } ); @@ -1075,6 +1089,7 @@ mod tests { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 0, + num_doc_batches_in_workbench: 0, pipeline_metrics_opt: None, } ); @@ -1148,6 +1163,7 @@ mod tests { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 0, + num_doc_batches_in_workbench: 0, pipeline_metrics_opt: None, } ); @@ -1237,6 +1253,7 @@ mod tests { indexer_counters, IndexerCounters { num_docs_in_workbench: 2, + num_doc_batches_in_workbench: 1, num_splits_emitted: 0, num_split_batches_emitted: 0, pipeline_metrics_opt: None, @@ -1249,6 +1266,7 @@ mod tests { indexer_counters, IndexerCounters { num_docs_in_workbench: 0, + num_doc_batches_in_workbench: 0, num_splits_emitted: 2, num_split_batches_emitted: 1, pipeline_metrics_opt: None, @@ -1597,6 +1615,7 @@ mod tests { num_splits_emitted: 0, num_split_batches_emitted: 0, num_docs_in_workbench: 0, //< the num docs in split counter has been reset. + num_doc_batches_in_workbench: 2, //< the num docs in split counter has been reset. pipeline_metrics_opt: None, } ); diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index bd9cc9dd047..d1e17983ea6 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -438,7 +438,11 @@ impl IngestSource { return Ok(()); } - warn!("resetting pipeline"); + warn!( + index_uid = self.client_id.source_uid.index_uid.as_str(), + pipeline_uid = self.client_id.pipeline_uid, + "resetting pipeline" + ); self.assigned_shards.clear(); self.fetch_stream.reset(); self.publish_lock.kill().await; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index aa422e3f13e..2e4a1ad5072 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -266,7 +266,12 @@ impl Ingester { shard: Shard, ) -> IngestV2Result<()> { let queue_id = shard.queue_id(); - + info!( + index_uid = shard.index_uid, + source = shard.source_id, + shard = %shard.shard_id(), + "init primary shard" + ); let Entry::Vacant(entry) = state.shards.entry(queue_id.clone()) else { return Ok(()); }; diff --git a/quickwit/quickwit-proto/src/types/pipeline_uid.rs b/quickwit/quickwit-proto/src/types/pipeline_uid.rs index 11ae92c3a6d..caafa482dfb 100644 --- a/quickwit/quickwit-proto/src/types/pipeline_uid.rs +++ b/quickwit/quickwit-proto/src/types/pipeline_uid.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::fmt; use std::fmt::{Display, Formatter}; use std::str::FromStr; @@ -30,12 +31,18 @@ const ULID_SIZE: usize = 16; #[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)] pub struct PipelineUid(Ulid); -impl std::fmt::Debug for PipelineUid { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { +impl fmt::Debug for PipelineUid { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "Pipeline({})", self.0) } } +impl Display for PipelineUid { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + impl PipelineUid { pub fn from_u128(ulid_u128: u128) -> PipelineUid { PipelineUid(Ulid::from_bytes(ulid_u128.to_le_bytes())) @@ -57,12 +64,6 @@ impl FromStr for PipelineUid { } } -impl Display for PipelineUid { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - self.0.fmt(f) - } -} - impl Serialize for PipelineUid { fn serialize(&self, serializer: S) -> Result { serializer.collect_str(&self.0)