diff --git a/quickwit/quickwit-ingest/src/ingest_api_service.rs b/quickwit/quickwit-ingest/src/ingest_api_service.rs
index 0a9b3430bb0..be2f8476bb3 100644
--- a/quickwit/quickwit-ingest/src/ingest_api_service.rs
+++ b/quickwit/quickwit-ingest/src/ingest_api_service.rs
@@ -206,10 +206,10 @@ impl IngestApiService {
num_docs += batch_num_docs;
INGEST_METRICS
- .ingested_num_bytes
+ .ingested_docs_bytes_valid
.inc_by(batch_num_bytes as u64);
INGEST_METRICS
- .ingested_num_docs
+ .ingested_docs_valid
.inc_by(batch_num_docs as u64);
}
// TODO we could fsync here and disable autosync to have better i/o perfs.
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
index a93513a9584..ba73b012e4f 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
@@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::collections::hash_map::Entry;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Weak};
use once_cell::sync::OnceCell;
@@ -28,10 +28,12 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_proto::ingest::{
DocBatchV2, IngestV2Error, IngestV2Result, ParseFailure, ParseFailureReason,
};
-use quickwit_proto::types::DocMappingUid;
+use quickwit_proto::types::{DocMappingUid, DocUid};
use serde_json_borrow::Value as JsonValue;
use tracing::info;
+use crate::DocBatchV2Builder;
+
/// Attempts to get the doc mapper identified by the given doc mapping UID `doc_mapping_uid` from
/// the `doc_mappers` cache. If it is not found, it is built from the specified JSON doc mapping
/// `doc_mapping_json` and inserted into the cache before being returned.
@@ -72,40 +74,64 @@ pub(super) fn try_build_doc_mapper(doc_mapping_json: &str) -> IngestV2Result<Arc<dyn DocMapper>> {
+fn validate_document(
+ doc_mapper: &dyn DocMapper,
+ doc_bytes: &[u8],
+) -> Result<(), (ParseFailureReason, String)> {
+ let Ok(json_doc) = serde_json::from_slice::<JsonValue>(doc_bytes) else {
+ return Err((
+ ParseFailureReason::InvalidJson,
+ "failed to parse JSON document".to_string(),
+ ));
+ };
+ let JsonValue::Object(json_obj) = json_doc else {
+ return Err((
+ ParseFailureReason::InvalidJson,
+ "JSON document is not an object".to_string(),
+ ));
+ };
+ if let Err(error) = doc_mapper.validate_json_obj(&json_obj) {
+ return Err((ParseFailureReason::InvalidSchema, error.to_string()));
+ }
+ Ok(())
+}
+
+/// Validates a batch of docs.
+///
+/// Returns the batch of valid docs along with the parse failures for the invalid ones.
fn validate_doc_batch_impl(
doc_batch: DocBatchV2,
- doc_mapper: Arc<dyn DocMapper>,
+ doc_mapper: &dyn DocMapper,
) -> (DocBatchV2, Vec<ParseFailure>) {
let mut parse_failures: Vec<ParseFailure> = Vec::new();
- for (doc_uid, doc) in doc_batch.docs() {
- let Ok(json_doc) = serde_json::from_slice::<JsonValue>(&doc) else {
- let parse_failure = ParseFailure {
- doc_uid: Some(doc_uid),
- reason: ParseFailureReason::InvalidJson as i32,
- message: "failed to parse JSON document".to_string(),
- };
- parse_failures.push(parse_failure);
- continue;
- };
- let JsonValue::Object(json_obj) = json_doc else {
- let parse_failure = ParseFailure {
- doc_uid: Some(doc_uid),
- reason: ParseFailureReason::InvalidJson as i32,
- message: "JSON document is not an object".to_string(),
- };
- parse_failures.push(parse_failure);
- continue;
- };
- if let Err(error) = doc_mapper.validate_json_obj(&json_obj) {
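+ // Track the UIDs of the invalid docs so they can be filtered out in a second pass.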
+ let mut invalid_doc_ids: HashSet<DocUid> = HashSet::default();
+ for (doc_uid, doc_bytes) in doc_batch.docs() {
+ if let Err((reason, message)) = validate_document(doc_mapper, &doc_bytes) {
let parse_failure = ParseFailure {
doc_uid: Some(doc_uid),
- reason: ParseFailureReason::InvalidSchema as i32,
- message: error.to_string(),
+ reason: reason as i32,
+ message,
};
+ invalid_doc_ids.insert(doc_uid);
parse_failures.push(parse_failure);
}
}
- (doc_batch, parse_failures)
+ if invalid_doc_ids.is_empty() {
+ // All docs are valid! We don't need to build a valid doc batch.
+ return (doc_batch, parse_failures);
+ }
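+ // At least one doc is invalid: rebuild a batch that keeps only the valid docs.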
+ let mut valid_doc_batch_builder = DocBatchV2Builder::default();
+ for (doc_uid, doc_bytes) in doc_batch.docs() {
+ if !invalid_doc_ids.contains(&doc_uid) {
+ valid_doc_batch_builder.add_doc(doc_uid, &doc_bytes);
+ }
+ }
+ let valid_doc_batch: DocBatchV2 = valid_doc_batch_builder.build().unwrap_or_default();
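+ // Invariant: every doc is either kept in the valid batch or reported as a parse failure.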
+ assert_eq!(
+ valid_doc_batch.num_docs() + parse_failures.len(),
+ doc_batch.num_docs()
+ );
+ (valid_doc_batch, parse_failures)
}
fn is_document_validation_enabled() -> bool {
@@ -122,7 +148,7 @@ pub(super) async fn validate_doc_batch(
doc_mapper: Arc<dyn DocMapper>,
) -> IngestV2Result<(DocBatchV2, Vec<ParseFailure>)> {
if is_document_validation_enabled() {
- run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
+ run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, &*doc_mapper))
.await
.map_err(|error| {
let message = format!("failed to validate documents: {error}");
@@ -230,12 +256,12 @@ mod tests {
let doc_mapper = try_build_doc_mapper(doc_mapping_json).unwrap();
let doc_batch = DocBatchV2::default();
- let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper.clone());
+ let (_, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper);
assert_eq!(parse_failures.len(), 0);
let doc_batch =
DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#, r#"{"doc": "test-doc-000"}"#]);
- let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper);
+ let (_, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper);
assert_eq!(parse_failures.len(), 3);
let parse_failure_0 = &parse_failures[0];
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 163a9f5cab4..1453e5dd3f7 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -83,7 +83,6 @@ use super::IngesterPool;
use crate::ingest_v2::doc_mapper::get_or_try_build_doc_mapper;
use crate::ingest_v2::metrics::report_wal_usage;
use crate::ingest_v2::models::IngesterShardType;
-use crate::metrics::INGEST_METRICS;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::{estimate_size, with_lock_metrics, FollowerId};
@@ -568,24 +567,52 @@ impl Ingester {
persist_failures.push(persist_failure);
continue;
}
- let (doc_batch, parse_failures) = validate_doc_batch(doc_batch, doc_mapper).await?;
- let num_persisted_docs = (doc_batch.num_docs() - parse_failures.len()) as u32;
- if num_persisted_docs == 0 {
+ // Total size in bytes of the original batch (valid and invalid docs), captured before validation consumes it.
+ let original_batch_num_bytes = doc_batch.num_bytes() as u64;
+ let (valid_doc_batch, parse_failures) =
+ validate_doc_batch(doc_batch, doc_mapper).await?;
+
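+ // Every doc in the subrequest failed validation: count the whole batch as invalid and report zero persisted docs.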
+ if valid_doc_batch.is_empty() {
+ crate::metrics::INGEST_METRICS
+ .ingested_docs_invalid
+ .inc_by(parse_failures.len() as u64);
+ crate::metrics::INGEST_METRICS
+ .ingested_docs_bytes_invalid
+ .inc_by(original_batch_num_bytes);
let persist_success = PersistSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
replication_position_inclusive: Some(from_position_exclusive),
- num_persisted_docs,
+ num_persisted_docs: 0,
parse_failures,
};
persist_successes.push(persist_success);
continue;
+ };
+
+ crate::metrics::INGEST_METRICS
+ .ingested_docs_valid
+ .inc_by(valid_doc_batch.num_docs() as u64);
+ crate::metrics::INGEST_METRICS
+ .ingested_docs_bytes_valid
+ .inc_by(valid_doc_batch.num_bytes() as u64);
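+ // Attribute the remaining bytes of the original batch to the invalid docs.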
+ if !parse_failures.is_empty() {
+ crate::metrics::INGEST_METRICS
+ .ingested_docs_invalid
+ .inc_by(parse_failures.len() as u64);
+ crate::metrics::INGEST_METRICS
+ .ingested_docs_bytes_invalid
+ .inc_by(original_batch_num_bytes - valid_doc_batch.num_bytes() as u64);
}
- let batch_num_bytes = doc_batch.num_bytes() as u64;
- rate_meter.update(batch_num_bytes);
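+ // The rate meter only accounts for the bytes of the valid docs.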
+ let valid_batch_num_bytes = valid_doc_batch.num_bytes() as u64;
+ rate_meter.update(valid_batch_num_bytes);
total_requested_capacity += requested_capacity;
let mut successfully_replicated = true;
@@ -599,7 +626,7 @@ impl Ingester {
source_id: subrequest.source_id.clone(),
shard_id: subrequest.shard_id.clone(),
from_position_exclusive: Some(from_position_exclusive),
- doc_batch: Some(doc_batch.clone()),
+ doc_batch: Some(valid_doc_batch.clone()),
};
per_follower_replicate_subrequests
.entry(follower_id)
@@ -612,8 +639,7 @@ impl Ingester {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
- doc_batch,
- num_persisted_docs,
+ doc_batch: valid_doc_batch,
parse_failures,
expected_position_inclusive: None,
successfully_replicated,
@@ -697,7 +723,6 @@ impl Ingester {
}
let queue_id = subrequest.queue_id;
- let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64;
let batch_num_docs = subrequest.doc_batch.num_docs() as u64;
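+ // At this point, the doc batch only contains valid docs, so its size is the number of persisted docs.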
let append_result = append_non_empty_doc_batch(
@@ -754,16 +779,13 @@ impl Ingester {
.expect("primary shard should exist")
.set_replication_position_inclusive(current_position_inclusive.clone(), now);
- INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes);
- INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs);
-
let persist_success = PersistSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
replication_position_inclusive: Some(current_position_inclusive),
- num_persisted_docs: subrequest.num_persisted_docs,
+ num_persisted_docs: batch_num_docs as u32,
parse_failures: subrequest.parse_failures,
};
persist_successes.push(persist_success);
@@ -1259,7 +1281,6 @@ struct PendingPersistSubrequest {
source_id: SourceId,
shard_id: Option<ShardId>,
doc_batch: DocBatchV2,
- num_persisted_docs: u32,
parse_failures: Vec<ParseFailure>,
expected_position_inclusive: Option<Position>,
successfully_replicated: bool,
@@ -1937,10 +1958,10 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(0)),
doc_batch: Some(DocBatchV2::for_test([
- "",
- "[]",
- r#"{"foo": "bar"}"#,
- r#"{"doc": "test-doc-000"}"#,
+ "", // invalid
+ "[]", // invalid
+ r#"{"foo": "bar"}"#, // invalid
+ r#"{"doc": "test-doc-000"}"#, // valid
])),
}],
};
diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs
index 3fc8fef7863..d38d7173a8a 100644
--- a/quickwit/quickwit-ingest/src/metrics.rs
+++ b/quickwit/quickwit-ingest/src/metrics.rs
@@ -18,11 +18,14 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use once_cell::sync::Lazy;
-use quickwit_common::metrics::{new_counter, new_gauge, IntCounter, IntGauge};
+use quickwit_common::metrics::{new_counter, new_counter_vec, new_gauge, IntCounter, IntGauge};
pub struct IngestMetrics {
- pub ingested_num_bytes: IntCounter,
- pub ingested_num_docs: IntCounter,
+ pub ingested_docs_bytes_valid: IntCounter,
+ pub ingested_docs_bytes_invalid: IntCounter,
+ pub ingested_docs_invalid: IntCounter,
+ pub ingested_docs_valid: IntCounter,
+
pub replicated_num_bytes_total: IntCounter,
pub replicated_num_docs_total: IntCounter,
pub queue_count: IntGauge,
@@ -30,19 +33,33 @@ pub struct IngestMetrics {
impl Default for IngestMetrics {
fn default() -> Self {
- Self {
- ingested_num_bytes: new_counter(
- "ingested_num_bytes",
- "Total size of the docs ingested in bytes",
- "ingest",
- &[],
- ),
- ingested_num_docs: new_counter(
- "ingested_num_docs",
- "Number of docs received to be ingested",
- "ingest",
- &[],
- ),
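+ // The flat counters are replaced with counter vecs labeled by validity ("valid" / "invalid").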
+ let ingest_docs_bytes_total = new_counter_vec(
+ "ingested_docs_bytes_total",
+ "Total size of the docs ingested, measured in ingester's leader, after validation and \
+ before persistence/replication",
+ "ingest",
+ &[],
+ ["validity"],
+ );
+ let ingested_docs_bytes_valid = ingest_docs_bytes_total.with_label_values(["valid"]);
+ let ingested_docs_bytes_invalid = ingest_docs_bytes_total.with_label_values(["invalid"]);
+
+ let ingest_docs_total = new_counter_vec(
+ "ingested_docs_total",
+ "Total number of the docs ingested, measured in ingester's leader, after validation \
+ and before persistence/replication",
+ "ingest",
+ &[],
+ ["validity"],
+ );
+ let ingested_docs_valid = ingest_docs_total.with_label_values(["valid"]);
+ let ingested_docs_invalid = ingest_docs_total.with_label_values(["invalid"]);
+
+ IngestMetrics {
+ ingested_docs_bytes_valid,
+ ingested_docs_bytes_invalid,
+ ingested_docs_valid,
+ ingested_docs_invalid,
replicated_num_bytes_total: new_counter(
"replicated_num_bytes_total",
"Total size in bytes of the replicated docs.",