Skip to content

Commit

Permalink
Merge branch 'main' into fmassot/update-docs-0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot authored Jan 17, 2024
2 parents 20b6afa + eec9776 commit 7748dff
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 22 deletions.
18 changes: 9 additions & 9 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ quickwit-serve = { version = "0.7.0", path = "./quickwit-serve" }
quickwit-storage = { version = "0.7.0", path = "./quickwit-storage" }
quickwit-telemetry = { version = "0.7.0", path = "./quickwit-telemetry" }

tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "014328e", default-features = false, features = [
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "108f30b", default-features = false, features = [
"lz4-compression",
"mmap",
"quickwit",
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub fn setup_logging_and_tracing(
}
let env_filter = env::var("RUST_LOG")
.map(|_| EnvFilter::from_default_env())
.or_else(|_| EnvFilter::try_new(format!("quickwit={level}")))
.or_else(|_| EnvFilter::try_new(format!("quickwit={level},tantivy=WARN")))
.context("Failed to set up tracing env filter.")?;
global::set_text_map_propagator(TraceContextPropagator::new());
let registry = tracing_subscriber::registry().with(env_filter);
Expand Down
21 changes: 20 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ pub struct IndexerCounters {
/// This value is used to trigger commit and for observation.
pub num_docs_in_workbench: u64,

/// Number of doc batches (`ProcessDocBatch` messages) received by the
/// indexer while building the current split.
pub num_doc_batches_in_workbench: u64,

/// Metrics describing the load and indexing performance of the
/// pipeline. This is only updated for cooperative indexers.
pub pipeline_metrics_opt: Option<PipelineMetrics>,
Expand Down Expand Up @@ -292,6 +296,7 @@ impl IndexerState {
.extend(batch.checkpoint_delta)
.context("batch delta does not follow indexer checkpoint")?;
let mut memory_usage_delta: u64 = 0;
counters.num_doc_batches_in_workbench += 1;
for doc in batch.docs {
let ProcessedDoc {
doc,
Expand Down Expand Up @@ -652,7 +657,14 @@ impl Indexer {
}
let num_splits = splits.len() as u64;
let split_ids = splits.iter().map(|split| split.split_id()).join(",");
info!(commit_trigger=?commit_trigger, split_ids=%split_ids, num_docs=self.counters.num_docs_in_workbench, "send-to-index-serializer");
info!(
index=self.indexer_state.pipeline_id.index_uid.as_str(),
source=self.indexer_state.pipeline_id.source_id.as_str(),
pipeline_uid=%self.indexer_state.pipeline_id.pipeline_uid,
commit_trigger=?commit_trigger,
num_batches=%self.counters.num_doc_batches_in_workbench,
split_ids=%split_ids,
num_docs=self.counters.num_docs_in_workbench, "send-to-index-serializer");
ctx.send_message(
&self.index_serializer_mailbox,
IndexedSplitBatchBuilder {
Expand All @@ -666,6 +678,7 @@ impl Indexer {
)
.await?;
self.counters.num_docs_in_workbench = 0;
self.counters.num_doc_batches_in_workbench = 0;
self.counters.num_splits_emitted += num_splits;
self.counters.num_split_batches_emitted += 1;
Ok(())
Expand Down Expand Up @@ -830,6 +843,7 @@ mod tests {
num_splits_emitted: 1,
num_split_batches_emitted: 1,
num_docs_in_workbench: 1, //< the num docs in split counter has been reset.
num_doc_batches_in_workbench: 1, //< the doc-batch counter has been reset as well.
pipeline_metrics_opt: None,
}
);
Expand Down Expand Up @@ -1075,6 +1089,7 @@ mod tests {
num_splits_emitted: 1,
num_split_batches_emitted: 1,
num_docs_in_workbench: 0,
num_doc_batches_in_workbench: 0,
pipeline_metrics_opt: None,
}
);
Expand Down Expand Up @@ -1148,6 +1163,7 @@ mod tests {
num_splits_emitted: 1,
num_split_batches_emitted: 1,
num_docs_in_workbench: 0,
num_doc_batches_in_workbench: 0,
pipeline_metrics_opt: None,
}
);
Expand Down Expand Up @@ -1237,6 +1253,7 @@ mod tests {
indexer_counters,
IndexerCounters {
num_docs_in_workbench: 2,
num_doc_batches_in_workbench: 1,
num_splits_emitted: 0,
num_split_batches_emitted: 0,
pipeline_metrics_opt: None,
Expand All @@ -1249,6 +1266,7 @@ mod tests {
indexer_counters,
IndexerCounters {
num_docs_in_workbench: 0,
num_doc_batches_in_workbench: 0,
num_splits_emitted: 2,
num_split_batches_emitted: 1,
pipeline_metrics_opt: None,
Expand Down Expand Up @@ -1597,6 +1615,7 @@ mod tests {
num_splits_emitted: 0,
num_split_batches_emitted: 0,
num_docs_in_workbench: 0, //< the num docs in split counter has been reset.
num_doc_batches_in_workbench: 2, //< two doc batches received for the current workbench.
pipeline_metrics_opt: None,
}
);
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,11 @@ impl IngestSource {
return Ok(());
}

warn!("resetting pipeline");
warn!(
index_uid = self.client_id.source_uid.index_uid.as_str(),
pipeline_uid = self.client_id.pipeline_uid,
"resetting pipeline"
);
self.assigned_shards.clear();
self.fetch_stream.reset();
self.publish_lock.kill().await;
Expand Down
7 changes: 6 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,12 @@ impl Ingester {
shard: Shard,
) -> IngestV2Result<()> {
let queue_id = shard.queue_id();

info!(
index_uid = shard.index_uid,
source = shard.source_id,
shard = %shard.shard_id(),
"init primary shard"
);
let Entry::Vacant(entry) = state.shards.entry(queue_id.clone()) else {
return Ok(());
};
Expand Down
17 changes: 9 additions & 8 deletions quickwit/quickwit-proto/src/types/pipeline_uid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::fmt;
use std::fmt::{Display, Formatter};
use std::str::FromStr;

Expand All @@ -30,12 +31,18 @@ const ULID_SIZE: usize = 16;
#[derive(Clone, Copy, Default, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct PipelineUid(Ulid);

impl std::fmt::Debug for PipelineUid {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
impl fmt::Debug for PipelineUid {
    /// Formats the uid as `Pipeline(<ulid>)` for diagnostics, so debug
    /// output is distinguishable from the bare ULID `Display` form.
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        formatter.write_fmt(format_args!("Pipeline({})", self.0))
    }
}

impl Display for PipelineUid {
    /// Renders the plain inner ULID (no `Pipeline(..)` wrapper, unlike
    /// the `Debug` impl) by delegating to the ULID's own `Display`.
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        Display::fmt(&self.0, formatter)
    }
}

impl PipelineUid {
pub fn from_u128(ulid_u128: u128) -> PipelineUid {
PipelineUid(Ulid::from_bytes(ulid_u128.to_le_bytes()))
Expand All @@ -57,12 +64,6 @@ impl FromStr for PipelineUid {
}
}

impl Display for PipelineUid {
    // Delegates to the inner ULID's `Display`, so the uid renders as its
    // plain ULID string (no `Pipeline(..)` wrapper, unlike `Debug`).
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

impl Serialize for PipelineUid {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.collect_str(&self.0)
Expand Down

0 comments on commit 7748dff

Please sign in to comment.