diff --git a/quickwit/quickwit-ingest/src/ingest_api_service.rs b/quickwit/quickwit-ingest/src/ingest_api_service.rs index 0a9b3430bb0..be2f8476bb3 100644 --- a/quickwit/quickwit-ingest/src/ingest_api_service.rs +++ b/quickwit/quickwit-ingest/src/ingest_api_service.rs @@ -206,10 +206,10 @@ impl IngestApiService { num_docs += batch_num_docs; INGEST_METRICS - .ingested_num_bytes + .ingested_docs_bytes_valid .inc_by(batch_num_bytes as u64); INGEST_METRICS - .ingested_num_docs + .ingested_docs_valid .inc_by(batch_num_docs as u64); } // TODO we could fsync here and disable autosync to have better i/o perfs. diff --git a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs index a93513a9584..41a16006164 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs @@ -18,7 +18,7 @@ // along with this program. If not, see . use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Weak}; use once_cell::sync::OnceCell; @@ -28,10 +28,12 @@ use quickwit_doc_mapper::DocMapper; use quickwit_proto::ingest::{ DocBatchV2, IngestV2Error, IngestV2Result, ParseFailure, ParseFailureReason, }; -use quickwit_proto::types::DocMappingUid; +use quickwit_proto::types::{DocMappingUid, DocUid}; use serde_json_borrow::Value as JsonValue; use tracing::info; +use crate::DocBatchV2Builder; + /// Attempts to get the doc mapper identified by the given doc mapping UID `doc_mapping_uid` from /// the `doc_mappers` cache. If it is not found, it is built from the specified JSON doc mapping /// `doc_mapping_json` and inserted into the cache before being returned. @@ -72,40 +74,64 @@ pub(super) fn try_build_doc_mapper(doc_mapping_json: &str) -> IngestV2Result Result<(), (ParseFailureReason, String)> { + let Ok(json_doc) = serde_json::from_slice::(doc_bytes) else { + return Err(( + ParseFailureReason::InvalidJson, + "failed to parse JSON document".to_string(), + )); + }; + let JsonValue::Object(json_obj) = json_doc else { + return Err(( + ParseFailureReason::InvalidJson, + "JSON document is not an object".to_string(), + )); + }; + if let Err(error) = doc_mapper.validate_json_obj(&json_obj) { + return Err((ParseFailureReason::InvalidSchema, error.to_string())); + } + Ok(()) +} + +/// Validates a batch of docs. +/// +/// Returns a batch of valid docs and the list of errors. fn validate_doc_batch_impl( doc_batch: DocBatchV2, - doc_mapper: Arc, + doc_mapper: &dyn DocMapper, ) -> (DocBatchV2, Vec) { let mut parse_failures: Vec = Vec::new(); - for (doc_uid, doc) in doc_batch.docs() { - let Ok(json_doc) = serde_json::from_slice::(&doc) else { - let parse_failure = ParseFailure { - doc_uid: Some(doc_uid), - reason: ParseFailureReason::InvalidJson as i32, - message: "failed to parse JSON document".to_string(), - }; - parse_failures.push(parse_failure); - continue; - }; - let JsonValue::Object(json_obj) = json_doc else { + let mut invalid_doc_ids: HashSet = HashSet::default(); + for (doc_uid, doc_bytes) in doc_batch.docs() { + if let Err((reason, message)) = validate_document(doc_mapper, &doc_bytes) { let parse_failure = ParseFailure { doc_uid: Some(doc_uid), - reason: ParseFailureReason::InvalidJson as i32, - message: "JSON document is not an object".to_string(), - }; - parse_failures.push(parse_failure); - continue; - }; - if let Err(error) = doc_mapper.validate_json_obj(&json_obj) { - let parse_failure = ParseFailure { - doc_uid: Some(doc_uid), - reason: ParseFailureReason::InvalidSchema as i32, - message: error.to_string(), + reason: reason as i32, + message, }; + invalid_doc_ids.insert(doc_uid); parse_failures.push(parse_failure); } } - (doc_batch, parse_failures) + if invalid_doc_ids.is_empty() { + // All docs are valid! We don't need to build a valid doc batch. + return (doc_batch, parse_failures); + } + let mut valid_doc_batch_builder = DocBatchV2Builder::default(); + for (doc_uid, doc_bytes) in doc_batch.docs() { + if !invalid_doc_ids.contains(&doc_uid) { + valid_doc_batch_builder.add_doc(doc_uid, &doc_bytes); + } + } + let valid_doc_batch: DocBatchV2 = valid_doc_batch_builder.build().unwrap_or_default(); + assert_eq!( + valid_doc_batch.num_docs() + parse_failures.len(), + doc_batch.num_docs() + ); + (valid_doc_batch, parse_failures) } fn is_document_validation_enabled() -> bool { @@ -122,7 +148,7 @@ pub(super) async fn validate_doc_batch( doc_mapper: Arc, ) -> IngestV2Result<(DocBatchV2, Vec)> { if is_document_validation_enabled() { - run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper)) + run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, &*doc_mapper)) .await .map_err(|error| { let message = format!("failed to validate documents: {error}"); @@ -230,12 +256,12 @@ mod tests { let doc_mapper = try_build_doc_mapper(doc_mapping_json).unwrap(); let doc_batch = DocBatchV2::default(); - let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper.clone()); + let (_, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper); assert_eq!(parse_failures.len(), 0); let doc_batch = DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#, r#"{"doc": "test-doc-000"}"#]); - let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper); + let (doc_batch, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper); assert_eq!(parse_failures.len(), 3); let parse_failure_0 = &parse_failures[0]; @@ -252,5 +278,11 @@ mod tests { assert_eq!(parse_failure_2.doc_uid(), DocUid::for_test(2)); assert_eq!(parse_failure_2.reason(), ParseFailureReason::InvalidSchema); assert!(parse_failure_2.message.contains("not declared")); + + assert_eq!(doc_batch.num_docs(), 1); + assert_eq!(doc_batch.doc_uids[0], DocUid::for_test(3)); + let (valid_doc_uid, valid_doc_bytes) = doc_batch.docs().next().unwrap(); + assert_eq!(valid_doc_uid, DocUid::for_test(3)); + assert_eq!(&valid_doc_bytes, r#"{"doc": "test-doc-000"}"#.as_bytes()); } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 163a9f5cab4..31b968a5c06 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -83,7 +83,6 @@ use super::IngesterPool; use crate::ingest_v2::doc_mapper::get_or_try_build_doc_mapper; use crate::ingest_v2::metrics::report_wal_usage; use crate::ingest_v2::models::IngesterShardType; -use crate::metrics::INGEST_METRICS; use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{estimate_size, with_lock_metrics, FollowerId}; @@ -568,24 +567,48 @@ impl Ingester { persist_failures.push(persist_failure); continue; } - let (doc_batch, parse_failures) = validate_doc_batch(doc_batch, doc_mapper).await?; - let num_persisted_docs = (doc_batch.num_docs() - parse_failures.len()) as u32; - if num_persisted_docs == 0 { + // Total number of bytes (valid and invalid documents) + let original_batch_num_bytes = doc_batch.num_bytes() as u64; + let (valid_doc_batch, parse_failures) = + validate_doc_batch(doc_batch, doc_mapper).await?; + + if valid_doc_batch.is_empty() { + crate::metrics::INGEST_METRICS + .ingested_docs_invalid + .inc_by(parse_failures.len() as u64); + crate::metrics::INGEST_METRICS + .ingested_docs_bytes_invalid + .inc_by(original_batch_num_bytes); let persist_success = PersistSuccess { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: subrequest.shard_id, replication_position_inclusive: Some(from_position_exclusive), - num_persisted_docs, + num_persisted_docs: 0, parse_failures, }; persist_successes.push(persist_success); continue; + }; + + crate::metrics::INGEST_METRICS + .ingested_docs_valid + .inc_by(valid_doc_batch.num_docs() as u64); + crate::metrics::INGEST_METRICS + .ingested_docs_bytes_valid + .inc_by(valid_doc_batch.num_bytes() as u64); + if !parse_failures.is_empty() { + crate::metrics::INGEST_METRICS + .ingested_docs_invalid + .inc_by(parse_failures.len() as u64); + crate::metrics::INGEST_METRICS + .ingested_docs_bytes_invalid + .inc_by(original_batch_num_bytes - valid_doc_batch.num_bytes() as u64); } - let batch_num_bytes = doc_batch.num_bytes() as u64; - rate_meter.update(batch_num_bytes); + let valid_batch_num_bytes = valid_doc_batch.num_bytes() as u64; + rate_meter.update(valid_batch_num_bytes); total_requested_capacity += requested_capacity; let mut successfully_replicated = true; @@ -599,7 +622,7 @@ impl Ingester { source_id: subrequest.source_id.clone(), shard_id: subrequest.shard_id.clone(), from_position_exclusive: Some(from_position_exclusive), - doc_batch: Some(doc_batch.clone()), + doc_batch: Some(valid_doc_batch.clone()), }; per_follower_replicate_subrequests .entry(follower_id) @@ -612,8 +635,7 @@ impl Ingester { index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: subrequest.shard_id, - doc_batch, - num_persisted_docs, + doc_batch: valid_doc_batch, parse_failures, expected_position_inclusive: None, successfully_replicated, @@ -697,7 +719,6 @@ impl Ingester { } let queue_id = subrequest.queue_id; - let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64; let batch_num_docs = subrequest.doc_batch.num_docs() as u64; let append_result = append_non_empty_doc_batch( @@ -754,16 +775,13 @@ impl Ingester { .expect("primary shard should exist") .set_replication_position_inclusive(current_position_inclusive.clone(), now); - INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes); - INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs); - let persist_success = PersistSuccess { subrequest_id: subrequest.subrequest_id, index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: subrequest.shard_id, replication_position_inclusive: Some(current_position_inclusive), - num_persisted_docs: subrequest.num_persisted_docs, + num_persisted_docs: batch_num_docs as u32, parse_failures: subrequest.parse_failures, }; persist_successes.push(persist_success); @@ -1259,7 +1277,6 @@ struct PendingPersistSubrequest { source_id: SourceId, shard_id: Option, doc_batch: DocBatchV2, - num_persisted_docs: u32, parse_failures: Vec, expected_position_inclusive: Option, successfully_replicated: bool, @@ -1937,10 +1954,10 @@ mod tests { source_id: "test-source".to_string(), shard_id: Some(ShardId::from(0)), doc_batch: Some(DocBatchV2::for_test([ - "", - "[]", - r#"{"foo": "bar"}"#, - r#"{"doc": "test-doc-000"}"#, + "", // invalid + "[]", // invalid + r#"{"foo": "bar"}"#, // invalid + r#"{"doc": "test-doc-000"}"#, // valid ])), }], }; diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs index 3fc8fef7863..0f0f9a8f3e5 100644 --- a/quickwit/quickwit-ingest/src/metrics.rs +++ b/quickwit/quickwit-ingest/src/metrics.rs @@ -18,11 +18,14 @@ // along with this program. If not, see . use once_cell::sync::Lazy; -use quickwit_common::metrics::{new_counter, new_gauge, IntCounter, IntGauge}; +use quickwit_common::metrics::{new_counter, new_counter_vec, new_gauge, IntCounter, IntGauge}; pub struct IngestMetrics { - pub ingested_num_bytes: IntCounter, - pub ingested_num_docs: IntCounter, + pub ingested_docs_bytes_valid: IntCounter, + pub ingested_docs_bytes_invalid: IntCounter, + pub ingested_docs_invalid: IntCounter, + pub ingested_docs_valid: IntCounter, + pub replicated_num_bytes_total: IntCounter, pub replicated_num_docs_total: IntCounter, pub queue_count: IntGauge, @@ -30,19 +33,33 @@ pub struct IngestMetrics { impl Default for IngestMetrics { fn default() -> Self { - Self { - ingested_num_bytes: new_counter( - "ingested_num_bytes", - "Total size of the docs ingested in bytes", - "ingest", - &[], - ), - ingested_num_docs: new_counter( - "ingested_num_docs", - "Number of docs received to be ingested", - "ingest", - &[], - ), + let ingest_docs_bytes_total = new_counter_vec( + "docs_bytes_total", + "Total size of the docs ingested, measured in ingester's leader, after validation and \ + before persistence/replication", + "ingest", + &[], + ["validity"], + ); + let ingested_docs_bytes_valid = ingest_docs_bytes_total.with_label_values(["valid"]); + let ingested_docs_bytes_invalid = ingest_docs_bytes_total.with_label_values(["invalid"]); + + let ingest_docs_total = new_counter_vec( + "docs_total", + "Total number of the docs ingested, measured in ingester's leader, after validation \ + and before persistence/replication", + "ingest", + &[], + ["validity"], + ); + let ingested_docs_valid = ingest_docs_total.with_label_values(["valid"]); + let ingested_docs_invalid = ingest_docs_total.with_label_values(["invalid"]); + + IngestMetrics { + ingested_docs_bytes_valid, + ingested_docs_bytes_invalid, + ingested_docs_valid, + ingested_docs_invalid, replicated_num_bytes_total: new_counter( "replicated_num_bytes_total", "Total size in bytes of the replicated docs.",