diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index 71937f0eac7..27c0e3bb2ca 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -49,7 +49,7 @@ pub fn setup_logging_and_tracing( } let env_filter = env::var("RUST_LOG") .map(|_| EnvFilter::from_default_env()) - .or_else(|_| EnvFilter::try_new(format!("quickwit={level}"))) + .or_else(|_| EnvFilter::try_new(format!("quickwit={level},tantivy=WARN"))) .context("Failed to set up tracing env filter.")?; global::set_text_map_propagator(TraceContextPropagator::new()); let registry = tracing_subscriber::registry().with(env_filter); diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 308ac1613d4..035c14f2eb4 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -82,6 +82,10 @@ pub struct IndexerCounters { /// This value is used to trigger commit and for observation. pub num_docs_in_workbench: u64, + /// Number of ProcessDocBatch received by the indexer to + /// build this split. + pub num_doc_batches_in_workbench: u64, + /// Metrics describing the load and indexing performance of the /// pipeline. This is only updated for cooperative indexers. pub pipeline_metrics_opt: Option, @@ -292,6 +296,7 @@ impl IndexerState { .extend(batch.checkpoint_delta) .context("batch delta does not follow indexer checkpoint")?; let mut memory_usage_delta: u64 = 0; + counters.num_doc_batches_in_workbench += 1; for doc in batch.docs { let ProcessedDoc { doc, @@ -652,7 +657,14 @@ impl Indexer { } let num_splits = splits.len() as u64; let split_ids = splits.iter().map(|split| split.split_id()).join(","); - info!(commit_trigger=?commit_trigger, split_ids=%split_ids, num_docs=self.counters.num_docs_in_workbench, "send-to-index-serializer"); + info!( + index=self.indexer_state.pipeline_id.index_uid.as_str(), + source=self.indexer_state.pipeline_id.source_id.as_str(), + pipeline_uid=%self.indexer_state.pipeline_id.pipeline_uid, + commit_trigger=?commit_trigger, + num_batches=%self.counters.num_doc_batches_in_workbench, + split_ids=%split_ids, + num_docs=self.counters.num_docs_in_workbench, "send-to-index-serializer"); ctx.send_message( &self.index_serializer_mailbox, IndexedSplitBatchBuilder { @@ -666,6 +678,7 @@ impl Indexer { ) .await?; self.counters.num_docs_in_workbench = 0; + self.counters.num_doc_batches_in_workbench = 0; self.counters.num_splits_emitted += num_splits; self.counters.num_split_batches_emitted += 1; Ok(()) @@ -830,6 +843,7 @@ mod tests { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 1, //< the num docs in split counter has been reset. + num_doc_batches_in_workbench: 1, //< the num docs in split counter has been reset. pipeline_metrics_opt: None, } ); @@ -1075,6 +1089,7 @@ mod tests { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 0, + num_doc_batches_in_workbench: 0, pipeline_metrics_opt: None, } ); @@ -1148,6 +1163,7 @@ mod tests { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 0, + num_doc_batches_in_workbench: 0, pipeline_metrics_opt: None, } ); @@ -1237,6 +1253,7 @@ mod tests { indexer_counters, IndexerCounters { num_docs_in_workbench: 2, + num_doc_batches_in_workbench: 1, num_splits_emitted: 0, num_split_batches_emitted: 0, pipeline_metrics_opt: None, @@ -1249,6 +1266,7 @@ mod tests { indexer_counters, IndexerCounters { num_docs_in_workbench: 0, + num_doc_batches_in_workbench: 0, num_splits_emitted: 2, num_split_batches_emitted: 1, pipeline_metrics_opt: None, @@ -1597,6 +1615,7 @@ mod tests { num_splits_emitted: 0, num_split_batches_emitted: 0, num_docs_in_workbench: 0, //< the num docs in split counter has been reset. + num_doc_batches_in_workbench: 2, //< the num docs in split counter has been reset. pipeline_metrics_opt: None, } ); diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs index bd9cc9dd047..d1e17983ea6 100644 --- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs +++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs @@ -438,7 +438,11 @@ impl IngestSource { return Ok(()); } - warn!("resetting pipeline"); + warn!( + index_uid = self.client_id.source_uid.index_uid.as_str(), + pipeline_uid = self.client_id.pipeline_uid, + "resetting pipeline" + ); self.assigned_shards.clear(); self.fetch_stream.reset(); self.publish_lock.kill().await; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index aa422e3f13e..8158d35fa94 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -266,7 +266,12 @@ impl Ingester { shard: Shard, ) -> IngestV2Result<()> { let queue_id = shard.queue_id(); - + info!( + index_uid = shard.index_uid, + source = shard.source_id, + shard = shard.shard_id, + "init primary shard" + ); let Entry::Vacant(entry) = state.shards.entry(queue_id.clone()) else { return Ok(()); }; diff --git a/quickwit/quickwit-proto/src/types/pipeline_uid.rs b/quickwit/quickwit-proto/src/types/pipeline_uid.rs index 11ae92c3a6d..caafa482dfb 100644 --- a/quickwit/quickwit-proto/src/types/pipeline_uid.rs +++ b/quickwit/quickwit-proto/src/types/pipeline_uid.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::fmt; use std::fmt::{Display, Formatter}; use std::str::FromStr; @@ -30,12 +31,18 @@ const ULID_SIZE: usize = 16; #[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)] pub struct PipelineUid(Ulid); -impl std::fmt::Debug for PipelineUid { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { +impl fmt::Debug for PipelineUid { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!(f, "Pipeline({})", self.0) } } +impl Display for PipelineUid { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + impl PipelineUid { pub fn from_u128(ulid_u128: u128) -> PipelineUid { PipelineUid(Ulid::from_bytes(ulid_u128.to_le_bytes())) @@ -57,12 +64,6 @@ impl FromStr for PipelineUid { } } -impl Display for PipelineUid { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - self.0.fmt(f) - } -} - impl Serialize for PipelineUid { fn serialize(&self, serializer: S) -> Result { serializer.collect_str(&self.0)