From 0a57d53b42612ad81ba0c1faf844e09935829b99 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Tue, 9 Jul 2024 16:02:21 +0900
Subject: [PATCH] Removing invalid docs from persistence.

This PR filters invalid docs out of batches before persisting them.
That way, invalid docs are never written to the mrecordlog, and the
indexing pipeline does not have to parse them again (and log an error).

This PR also slightly improves the metrics on ingested docs. The doc
count and byte count metrics now carry a `validity` label, and the
point at which they are measured is documented in the metric
descriptions.

Closes #5205
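Note on the new metrics: each counter vec exposes one time series per
`validity` label value. Assuming the usual `quickwit_` prefix and the
`ingest` subsystem applied by `new_counter_vec` (hypothetical example
values), a Prometheus scrape would report series along these lines:

    quickwit_ingest_docs_total{validity="valid"} 1042
    quickwit_ingest_docs_total{validity="invalid"} 3
    quickwit_ingest_docs_bytes_total{validity="valid"} 5242880
    quickwit_ingest_docs_bytes_total{validity="invalid"} 1337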
---
 .../quickwit-ingest/src/ingest_api_service.rs |  4 +-
 .../src/ingest_v2/doc_mapper.rs               | 90 +++++++++++++------
 .../quickwit-ingest/src/ingest_v2/ingester.rs | 57 +++++++-----
 quickwit/quickwit-ingest/src/metrics.rs       | 49 ++++++----
 4 files changed, 133 insertions(+), 67 deletions(-)

diff --git a/quickwit/quickwit-ingest/src/ingest_api_service.rs b/quickwit/quickwit-ingest/src/ingest_api_service.rs
index 0a9b3430bb0..be2f8476bb3 100644
--- a/quickwit/quickwit-ingest/src/ingest_api_service.rs
+++ b/quickwit/quickwit-ingest/src/ingest_api_service.rs
@@ -206,10 +206,10 @@ impl IngestApiService {
             num_docs += batch_num_docs;
             INGEST_METRICS
-                .ingested_num_bytes
+                .ingested_docs_bytes_valid
                 .inc_by(batch_num_bytes as u64);
             INGEST_METRICS
-                .ingested_num_docs
+                .ingested_docs_valid
                 .inc_by(batch_num_docs as u64);
         }
         // TODO we could fsync here and disable autosync to have better i/o perfs.
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
index a93513a9584..41a16006164 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
@@ -18,7 +18,7 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::collections::hash_map::Entry;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, Weak};
 
 use once_cell::sync::OnceCell;
@@ -28,10 +28,12 @@ use quickwit_doc_mapper::DocMapper;
 use quickwit_proto::ingest::{
     DocBatchV2, IngestV2Error, IngestV2Result, ParseFailure, ParseFailureReason,
 };
-use quickwit_proto::types::DocMappingUid;
+use quickwit_proto::types::{DocMappingUid, DocUid};
 use serde_json_borrow::Value as JsonValue;
 use tracing::info;
 
+use crate::DocBatchV2Builder;
+
 /// Attempts to get the doc mapper identified by the given doc mapping UID `doc_mapping_uid` from
 /// the `doc_mappers` cache. If it is not found, it is built from the specified JSON doc mapping
 /// `doc_mapping_json` and inserted into the cache before being returned.
@@ -72,40 +74,64 @@ pub(super) fn try_build_doc_mapper(doc_mapping_json: &str) -> IngestV2Result<Arc<dyn DocMapper>> {
 }
 
+fn validate_document(
+    doc_mapper: &dyn DocMapper,
+    doc_bytes: &[u8],
+) -> Result<(), (ParseFailureReason, String)> {
+    let Ok(json_doc) = serde_json::from_slice::<JsonValue>(doc_bytes) else {
+        return Err((
+            ParseFailureReason::InvalidJson,
+            "failed to parse JSON document".to_string(),
+        ));
+    };
+    let JsonValue::Object(json_obj) = json_doc else {
+        return Err((
+            ParseFailureReason::InvalidJson,
+            "JSON document is not an object".to_string(),
+        ));
+    };
+    if let Err(error) = doc_mapper.validate_json_obj(&json_obj) {
+        return Err((ParseFailureReason::InvalidSchema, error.to_string()));
+    }
+    Ok(())
+}
+
+/// Validates a batch of docs.
+///
+/// Returns a batch of valid docs and the list of errors.
 fn validate_doc_batch_impl(
     doc_batch: DocBatchV2,
-    doc_mapper: Arc<dyn DocMapper>,
+    doc_mapper: &dyn DocMapper,
 ) -> (DocBatchV2, Vec<ParseFailure>) {
     let mut parse_failures: Vec<ParseFailure> = Vec::new();
-    for (doc_uid, doc) in doc_batch.docs() {
-        let Ok(json_doc) = serde_json::from_slice::<JsonValue>(&doc) else {
-            let parse_failure = ParseFailure {
-                doc_uid: Some(doc_uid),
-                reason: ParseFailureReason::InvalidJson as i32,
-                message: "failed to parse JSON document".to_string(),
-            };
-            parse_failures.push(parse_failure);
-            continue;
-        };
-        let JsonValue::Object(json_obj) = json_doc else {
+    let mut invalid_doc_ids: HashSet<DocUid> = HashSet::default();
+    for (doc_uid, doc_bytes) in doc_batch.docs() {
+        if let Err((reason, message)) = validate_document(doc_mapper, &doc_bytes) {
             let parse_failure = ParseFailure {
                 doc_uid: Some(doc_uid),
-                reason: ParseFailureReason::InvalidJson as i32,
-                message: "JSON document is not an object".to_string(),
-            };
-            parse_failures.push(parse_failure);
-            continue;
-        };
-        if let Err(error) = doc_mapper.validate_json_obj(&json_obj) {
-            let parse_failure = ParseFailure {
-                doc_uid: Some(doc_uid),
-                reason: ParseFailureReason::InvalidSchema as i32,
-                message: error.to_string(),
+                reason: reason as i32,
+                message,
             };
+            invalid_doc_ids.insert(doc_uid);
             parse_failures.push(parse_failure);
         }
     }
-    (doc_batch, parse_failures)
+    if invalid_doc_ids.is_empty() {
+        // All docs are valid! We don't need to build a valid doc batch.
+        return (doc_batch, parse_failures);
+    }
+    let mut valid_doc_batch_builder = DocBatchV2Builder::default();
+    for (doc_uid, doc_bytes) in doc_batch.docs() {
+        if !invalid_doc_ids.contains(&doc_uid) {
+            valid_doc_batch_builder.add_doc(doc_uid, &doc_bytes);
+        }
+    }
+    let valid_doc_batch: DocBatchV2 = valid_doc_batch_builder.build().unwrap_or_default();
+    assert_eq!(
+        valid_doc_batch.num_docs() + parse_failures.len(),
+        doc_batch.num_docs()
+    );
+    (valid_doc_batch, parse_failures)
 }
 
 fn is_document_validation_enabled() -> bool {
@@ -122,7 +148,7 @@ pub(super) async fn validate_doc_batch(
     doc_mapper: Arc<dyn DocMapper>,
 ) -> IngestV2Result<(DocBatchV2, Vec<ParseFailure>)> {
     if is_document_validation_enabled() {
-        run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
+        run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, &*doc_mapper))
             .await
             .map_err(|error| {
                 let message = format!("failed to validate documents: {error}");
@@ -230,12 +256,12 @@ mod tests {
         let doc_mapper = try_build_doc_mapper(doc_mapping_json).unwrap();
 
         let doc_batch = DocBatchV2::default();
-        let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper.clone());
+        let (_, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper);
         assert_eq!(parse_failures.len(), 0);
 
         let doc_batch =
             DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#, r#"{"doc": "test-doc-000"}"#]);
-        let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper);
+        let (doc_batch, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper);
         assert_eq!(parse_failures.len(), 3);
 
         let parse_failure_0 = &parse_failures[0];
@@ -252,5 +278,11 @@ mod tests {
         assert_eq!(parse_failure_2.doc_uid(), DocUid::for_test(2));
         assert_eq!(parse_failure_2.reason(), ParseFailureReason::InvalidSchema);
         assert!(parse_failure_2.message.contains("not declared"));
+
+        assert_eq!(doc_batch.num_docs(), 1);
+        assert_eq!(doc_batch.doc_uids[0], DocUid::for_test(3));
+        let (valid_doc_uid, valid_doc_bytes) = doc_batch.docs().next().unwrap();
+        assert_eq!(valid_doc_uid, DocUid::for_test(3));
+        assert_eq!(&valid_doc_bytes, r#"{"doc": "test-doc-000"}"#.as_bytes());
     }
 }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 163a9f5cab4..31b968a5c06 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -83,7 +83,6 @@ use super::IngesterPool;
 use crate::ingest_v2::doc_mapper::get_or_try_build_doc_mapper;
 use crate::ingest_v2::metrics::report_wal_usage;
 use crate::ingest_v2::models::IngesterShardType;
-use crate::metrics::INGEST_METRICS;
 use crate::mrecordlog_async::MultiRecordLogAsync;
 use crate::{estimate_size, with_lock_metrics, FollowerId};
 
@@ -568,24 +567,48 @@ impl Ingester {
                 persist_failures.push(persist_failure);
                 continue;
             }
-            let (doc_batch, parse_failures) = validate_doc_batch(doc_batch, doc_mapper).await?;
-            let num_persisted_docs = (doc_batch.num_docs() - parse_failures.len()) as u32;
-            if num_persisted_docs == 0 {
+            // Total number of bytes (valid and invalid documents)
+            let original_batch_num_bytes = doc_batch.num_bytes() as u64;
+            let (valid_doc_batch, parse_failures) =
+                validate_doc_batch(doc_batch, doc_mapper).await?;
+
+            if valid_doc_batch.is_empty() {
+                crate::metrics::INGEST_METRICS
+                    .ingested_docs_invalid
+                    .inc_by(parse_failures.len() as u64);
+                crate::metrics::INGEST_METRICS
+                    .ingested_docs_bytes_invalid
+                    .inc_by(original_batch_num_bytes);
                 let persist_success = PersistSuccess {
                     subrequest_id: subrequest.subrequest_id,
                     index_uid: subrequest.index_uid,
                     source_id: subrequest.source_id,
                     shard_id: subrequest.shard_id,
                     replication_position_inclusive: Some(from_position_exclusive),
-                    num_persisted_docs,
+                    num_persisted_docs: 0,
                     parse_failures,
                 };
                 persist_successes.push(persist_success);
                 continue;
+            };
+
+            crate::metrics::INGEST_METRICS
+                .ingested_docs_valid
+                .inc_by(valid_doc_batch.num_docs() as u64);
+            crate::metrics::INGEST_METRICS
+                .ingested_docs_bytes_valid
+                .inc_by(valid_doc_batch.num_bytes() as u64);
+            if !parse_failures.is_empty() {
+                crate::metrics::INGEST_METRICS
+                    .ingested_docs_invalid
+                    .inc_by(parse_failures.len() as u64);
+                crate::metrics::INGEST_METRICS
+                    .ingested_docs_bytes_invalid
+                    .inc_by(original_batch_num_bytes - valid_doc_batch.num_bytes() as u64);
             }
-            let batch_num_bytes = doc_batch.num_bytes() as u64;
-            rate_meter.update(batch_num_bytes);
+            let valid_batch_num_bytes = valid_doc_batch.num_bytes() as u64;
+            rate_meter.update(valid_batch_num_bytes);
             total_requested_capacity += requested_capacity;
 
             let mut successfully_replicated = true;
@@ -599,7 +622,7 @@ impl Ingester {
                     source_id: subrequest.source_id.clone(),
                     shard_id: subrequest.shard_id.clone(),
                     from_position_exclusive: Some(from_position_exclusive),
-                    doc_batch: Some(doc_batch.clone()),
+                    doc_batch: Some(valid_doc_batch.clone()),
                 };
                 per_follower_replicate_subrequests
                     .entry(follower_id)
@@ -612,8 +635,7 @@ impl Ingester {
                 index_uid: subrequest.index_uid,
                 source_id: subrequest.source_id,
                 shard_id: subrequest.shard_id,
-                doc_batch,
-                num_persisted_docs,
+                doc_batch: valid_doc_batch,
                 parse_failures,
                 expected_position_inclusive: None,
                 successfully_replicated,
@@ -697,7 +719,6 @@ impl Ingester {
             }
             let queue_id = subrequest.queue_id;
-            let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64;
            let batch_num_docs = subrequest.doc_batch.num_docs() as u64;
 
             let append_result = append_non_empty_doc_batch(
@@ -754,16 +775,13 @@ impl Ingester {
                 .expect("primary shard should exist")
                 .set_replication_position_inclusive(current_position_inclusive.clone(), now);
 
-            INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes);
-            INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs);
-
             let persist_success = PersistSuccess {
                 subrequest_id: subrequest.subrequest_id,
                 index_uid: subrequest.index_uid,
                 source_id: subrequest.source_id,
                 shard_id: subrequest.shard_id,
                 replication_position_inclusive: Some(current_position_inclusive),
-                num_persisted_docs: subrequest.num_persisted_docs,
+                num_persisted_docs: batch_num_docs as u32,
                 parse_failures: subrequest.parse_failures,
             };
             persist_successes.push(persist_success);
@@ -1259,7 +1277,6 @@ struct PendingPersistSubrequest {
     source_id: SourceId,
     shard_id: Option<ShardId>,
     doc_batch: DocBatchV2,
-    num_persisted_docs: u32,
     parse_failures: Vec<ParseFailure>,
     expected_position_inclusive: Option<Position>,
     successfully_replicated: bool,
@@ -1937,10 +1954,10 @@ mod tests {
                 source_id: "test-source".to_string(),
                 shard_id: Some(ShardId::from(0)),
                 doc_batch: Some(DocBatchV2::for_test([
-                    "",
-                    "[]",
-                    r#"{"foo": "bar"}"#,
-                    r#"{"doc": "test-doc-000"}"#,
+                    "",                           // invalid
+                    "[]",                         // invalid
+                    r#"{"foo": "bar"}"#,          // invalid
+                    r#"{"doc": "test-doc-000"}"#, // valid
                 ])),
             }],
         };
diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs
index 3fc8fef7863..0f0f9a8f3e5 100644
--- a/quickwit/quickwit-ingest/src/metrics.rs
+++ b/quickwit/quickwit-ingest/src/metrics.rs
@@ -18,11 +18,14 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use once_cell::sync::Lazy;
-use quickwit_common::metrics::{new_counter, new_gauge, IntCounter, IntGauge};
+use quickwit_common::metrics::{new_counter, new_counter_vec, new_gauge, IntCounter, IntGauge};
 
 pub struct IngestMetrics {
-    pub ingested_num_bytes: IntCounter,
-    pub ingested_num_docs: IntCounter,
+    pub ingested_docs_bytes_valid: IntCounter,
+    pub ingested_docs_bytes_invalid: IntCounter,
+    pub ingested_docs_invalid: IntCounter,
+    pub ingested_docs_valid: IntCounter,
+
     pub replicated_num_bytes_total: IntCounter,
     pub replicated_num_docs_total: IntCounter,
     pub queue_count: IntGauge,
@@ -30,19 +33,33 @@ pub struct IngestMetrics {
 
 impl Default for IngestMetrics {
     fn default() -> Self {
-        Self {
-            ingested_num_bytes: new_counter(
-                "ingested_num_bytes",
-                "Total size of the docs ingested in bytes",
-                "ingest",
-                &[],
-            ),
-            ingested_num_docs: new_counter(
-                "ingested_num_docs",
-                "Number of docs received to be ingested",
-                "ingest",
-                &[],
-            ),
+        let ingest_docs_bytes_total = new_counter_vec(
+            "docs_bytes_total",
+            "Total size of the docs ingested, measured on the ingester leader, after validation \
+             and before persistence/replication",
+            "ingest",
+            &[],
+            ["validity"],
+        );
+        let ingested_docs_bytes_valid = ingest_docs_bytes_total.with_label_values(["valid"]);
+        let ingested_docs_bytes_invalid = ingest_docs_bytes_total.with_label_values(["invalid"]);
+
+        let ingest_docs_total = new_counter_vec(
+            "docs_total",
+            "Total number of docs ingested, measured on the ingester leader, after validation \
+             and before persistence/replication",
+            "ingest",
+            &[],
+            ["validity"],
+        );
+        let ingested_docs_valid = ingest_docs_total.with_label_values(["valid"]);
+        let ingested_docs_invalid = ingest_docs_total.with_label_values(["invalid"]);
+
+        IngestMetrics {
+            ingested_docs_bytes_valid,
+            ingested_docs_bytes_invalid,
+            ingested_docs_valid,
+            ingested_docs_invalid,
             replicated_num_bytes_total: new_counter(
                 "replicated_num_bytes_total",
                 "Total size in bytes of the replicated docs.",