Skip to content

Commit

Permalink
Removing invalid docs from persistence.
Browse files Browse the repository at this point in the history
This PR filters out invalid docs from batches before persisting the
batch.
That way, they won't be persisted in the mrecordlog and the
indexing pipeline will not have to parse them again (and log an error).

This PR also improves a little bit on the metric for ingested docs.
The num_docs and bytes metric now have a validity `label`, and the
point at which they are measured is documented in the metric description.

Closes #5205
  • Loading branch information
fulmicoton committed Jul 9, 2024
1 parent fc0c52e commit 0a57d53
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 67 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,10 @@ impl IngestApiService {

num_docs += batch_num_docs;
INGEST_METRICS
.ingested_num_bytes
.ingested_docs_bytes_valid
.inc_by(batch_num_bytes as u64);
INGEST_METRICS
.ingested_num_docs
.ingested_docs_valid
.inc_by(batch_num_docs as u64);
}
// TODO we could fsync here and disable autosync to have better i/o perfs.
Expand Down
90 changes: 61 additions & 29 deletions quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Weak};

use once_cell::sync::OnceCell;
Expand All @@ -28,10 +28,12 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_proto::ingest::{
DocBatchV2, IngestV2Error, IngestV2Result, ParseFailure, ParseFailureReason,
};
use quickwit_proto::types::DocMappingUid;
use quickwit_proto::types::{DocMappingUid, DocUid};
use serde_json_borrow::Value as JsonValue;
use tracing::info;

use crate::DocBatchV2Builder;

/// Attempts to get the doc mapper identified by the given doc mapping UID `doc_mapping_uid` from
/// the `doc_mappers` cache. If it is not found, it is built from the specified JSON doc mapping
/// `doc_mapping_json` and inserted into the cache before being returned.
Expand Down Expand Up @@ -72,40 +74,64 @@ pub(super) fn try_build_doc_mapper(doc_mapping_json: &str) -> IngestV2Result<Arc
Ok(doc_mapper)
}

fn validate_document(
doc_mapper: &dyn DocMapper,
doc_bytes: &[u8],
) -> Result<(), (ParseFailureReason, String)> {
let Ok(json_doc) = serde_json::from_slice::<serde_json_borrow::Value>(doc_bytes) else {
return Err((
ParseFailureReason::InvalidJson,
"failed to parse JSON document".to_string(),
));
};
let JsonValue::Object(json_obj) = json_doc else {
return Err((
ParseFailureReason::InvalidJson,
"JSON document is not an object".to_string(),
));
};
if let Err(error) = doc_mapper.validate_json_obj(&json_obj) {
return Err((ParseFailureReason::InvalidSchema, error.to_string()));
}
Ok(())
}

/// Validates a batch of docs.
///
/// Returns a batch of valid docs and the list of errors.
fn validate_doc_batch_impl(
doc_batch: DocBatchV2,
doc_mapper: Arc<dyn DocMapper>,
doc_mapper: &dyn DocMapper,
) -> (DocBatchV2, Vec<ParseFailure>) {
let mut parse_failures: Vec<ParseFailure> = Vec::new();
for (doc_uid, doc) in doc_batch.docs() {
let Ok(json_doc) = serde_json::from_slice::<serde_json_borrow::Value>(&doc) else {
let parse_failure = ParseFailure {
doc_uid: Some(doc_uid),
reason: ParseFailureReason::InvalidJson as i32,
message: "failed to parse JSON document".to_string(),
};
parse_failures.push(parse_failure);
continue;
};
let JsonValue::Object(json_obj) = json_doc else {
let mut invalid_doc_ids: HashSet<DocUid> = HashSet::default();
for (doc_uid, doc_bytes) in doc_batch.docs() {
if let Err((reason, message)) = validate_document(doc_mapper, &doc_bytes) {
let parse_failure = ParseFailure {
doc_uid: Some(doc_uid),
reason: ParseFailureReason::InvalidJson as i32,
message: "JSON document is not an object".to_string(),
};
parse_failures.push(parse_failure);
continue;
};
if let Err(error) = doc_mapper.validate_json_obj(&json_obj) {
let parse_failure = ParseFailure {
doc_uid: Some(doc_uid),
reason: ParseFailureReason::InvalidSchema as i32,
message: error.to_string(),
reason: reason as i32,
message,
};
invalid_doc_ids.insert(doc_uid);
parse_failures.push(parse_failure);
}
}
(doc_batch, parse_failures)
if invalid_doc_ids.is_empty() {
// All docs are valid! We don't need to build a valid doc batch.
return (doc_batch, parse_failures);
}
let mut valid_doc_batch_builder = DocBatchV2Builder::default();
for (doc_uid, doc_bytes) in doc_batch.docs() {
if !invalid_doc_ids.contains(&doc_uid) {
valid_doc_batch_builder.add_doc(doc_uid, &doc_bytes);
}
}
let valid_doc_batch: DocBatchV2 = valid_doc_batch_builder.build().unwrap_or_default();
assert_eq!(
valid_doc_batch.num_docs() + parse_failures.len(),
doc_batch.num_docs()
);
(valid_doc_batch, parse_failures)
}

fn is_document_validation_enabled() -> bool {
Expand All @@ -122,7 +148,7 @@ pub(super) async fn validate_doc_batch(
doc_mapper: Arc<dyn DocMapper>,
) -> IngestV2Result<(DocBatchV2, Vec<ParseFailure>)> {
if is_document_validation_enabled() {
run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, &*doc_mapper))
.await
.map_err(|error| {
let message = format!("failed to validate documents: {error}");
Expand Down Expand Up @@ -230,12 +256,12 @@ mod tests {
let doc_mapper = try_build_doc_mapper(doc_mapping_json).unwrap();
let doc_batch = DocBatchV2::default();

let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper.clone());
let (_, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper);
assert_eq!(parse_failures.len(), 0);

let doc_batch =
DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#, r#"{"doc": "test-doc-000"}"#]);
let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper);
let (doc_batch, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper);
assert_eq!(parse_failures.len(), 3);

let parse_failure_0 = &parse_failures[0];
Expand All @@ -252,5 +278,11 @@ mod tests {
assert_eq!(parse_failure_2.doc_uid(), DocUid::for_test(2));
assert_eq!(parse_failure_2.reason(), ParseFailureReason::InvalidSchema);
assert!(parse_failure_2.message.contains("not declared"));

assert_eq!(doc_batch.num_docs(), 1);
assert_eq!(doc_batch.doc_uids[0], DocUid::for_test(3));
let (valid_doc_uid, valid_doc_bytes) = doc_batch.docs().next().unwrap();
assert_eq!(valid_doc_uid, DocUid::for_test(3));
assert_eq!(&valid_doc_bytes, r#"{"doc": "test-doc-000"}"#.as_bytes());
}
}
57 changes: 37 additions & 20 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ use super::IngesterPool;
use crate::ingest_v2::doc_mapper::get_or_try_build_doc_mapper;
use crate::ingest_v2::metrics::report_wal_usage;
use crate::ingest_v2::models::IngesterShardType;
use crate::metrics::INGEST_METRICS;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::{estimate_size, with_lock_metrics, FollowerId};

Expand Down Expand Up @@ -568,24 +567,48 @@ impl Ingester {
persist_failures.push(persist_failure);
continue;
}
let (doc_batch, parse_failures) = validate_doc_batch(doc_batch, doc_mapper).await?;
let num_persisted_docs = (doc_batch.num_docs() - parse_failures.len()) as u32;

if num_persisted_docs == 0 {
// Total number of bytes (valid and invalid documents)
let original_batch_num_bytes = doc_batch.num_bytes() as u64;
let (valid_doc_batch, parse_failures) =
validate_doc_batch(doc_batch, doc_mapper).await?;

if valid_doc_batch.is_empty() {
crate::metrics::INGEST_METRICS
.ingested_docs_invalid
.inc_by(parse_failures.len() as u64);
crate::metrics::INGEST_METRICS
.ingested_docs_bytes_invalid
.inc_by(original_batch_num_bytes);
let persist_success = PersistSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
replication_position_inclusive: Some(from_position_exclusive),
num_persisted_docs,
num_persisted_docs: 0,
parse_failures,
};
persist_successes.push(persist_success);
continue;
};

crate::metrics::INGEST_METRICS
.ingested_docs_valid
.inc_by(valid_doc_batch.num_docs() as u64);
crate::metrics::INGEST_METRICS
.ingested_docs_bytes_valid
.inc_by(valid_doc_batch.num_bytes() as u64);
if !parse_failures.is_empty() {
crate::metrics::INGEST_METRICS
.ingested_docs_invalid
.inc_by(parse_failures.len() as u64);
crate::metrics::INGEST_METRICS
.ingested_docs_bytes_invalid
.inc_by(original_batch_num_bytes - valid_doc_batch.num_bytes() as u64);
}
let batch_num_bytes = doc_batch.num_bytes() as u64;
rate_meter.update(batch_num_bytes);
let valid_batch_num_bytes = valid_doc_batch.num_bytes() as u64;
rate_meter.update(valid_batch_num_bytes);
total_requested_capacity += requested_capacity;

let mut successfully_replicated = true;
Expand All @@ -599,7 +622,7 @@ impl Ingester {
source_id: subrequest.source_id.clone(),
shard_id: subrequest.shard_id.clone(),
from_position_exclusive: Some(from_position_exclusive),
doc_batch: Some(doc_batch.clone()),
doc_batch: Some(valid_doc_batch.clone()),
};
per_follower_replicate_subrequests
.entry(follower_id)
Expand All @@ -612,8 +635,7 @@ impl Ingester {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
doc_batch,
num_persisted_docs,
doc_batch: valid_doc_batch,
parse_failures,
expected_position_inclusive: None,
successfully_replicated,
Expand Down Expand Up @@ -697,7 +719,6 @@ impl Ingester {
}
let queue_id = subrequest.queue_id;

let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64;
let batch_num_docs = subrequest.doc_batch.num_docs() as u64;

let append_result = append_non_empty_doc_batch(
Expand Down Expand Up @@ -754,16 +775,13 @@ impl Ingester {
.expect("primary shard should exist")
.set_replication_position_inclusive(current_position_inclusive.clone(), now);

INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes);
INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs);

let persist_success = PersistSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
replication_position_inclusive: Some(current_position_inclusive),
num_persisted_docs: subrequest.num_persisted_docs,
num_persisted_docs: batch_num_docs as u32,
parse_failures: subrequest.parse_failures,
};
persist_successes.push(persist_success);
Expand Down Expand Up @@ -1259,7 +1277,6 @@ struct PendingPersistSubrequest {
source_id: SourceId,
shard_id: Option<ShardId>,
doc_batch: DocBatchV2,
num_persisted_docs: u32,
parse_failures: Vec<ParseFailure>,
expected_position_inclusive: Option<Position>,
successfully_replicated: bool,
Expand Down Expand Up @@ -1937,10 +1954,10 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(0)),
doc_batch: Some(DocBatchV2::for_test([
"",
"[]",
r#"{"foo": "bar"}"#,
r#"{"doc": "test-doc-000"}"#,
"", // invalid
"[]", // invalid
r#"{"foo": "bar"}"#, // invalid
r#"{"doc": "test-doc-000"}"#, // valid
])),
}],
};
Expand Down
49 changes: 33 additions & 16 deletions quickwit/quickwit-ingest/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,48 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use once_cell::sync::Lazy;
use quickwit_common::metrics::{new_counter, new_gauge, IntCounter, IntGauge};
use quickwit_common::metrics::{new_counter, new_counter_vec, new_gauge, IntCounter, IntGauge};

pub struct IngestMetrics {
pub ingested_num_bytes: IntCounter,
pub ingested_num_docs: IntCounter,
pub ingested_docs_bytes_valid: IntCounter,
pub ingested_docs_bytes_invalid: IntCounter,
pub ingested_docs_invalid: IntCounter,
pub ingested_docs_valid: IntCounter,

pub replicated_num_bytes_total: IntCounter,
pub replicated_num_docs_total: IntCounter,
pub queue_count: IntGauge,
}

impl Default for IngestMetrics {
fn default() -> Self {
Self {
ingested_num_bytes: new_counter(
"ingested_num_bytes",
"Total size of the docs ingested in bytes",
"ingest",
&[],
),
ingested_num_docs: new_counter(
"ingested_num_docs",
"Number of docs received to be ingested",
"ingest",
&[],
),
let ingest_docs_bytes_total = new_counter_vec(
"docs_bytes_total",
"Total size of the docs ingested, measured in ingester's leader, after validation and \
before persistence/replication",
"ingest",
&[],
["validity"],
);
let ingested_docs_bytes_valid = ingest_docs_bytes_total.with_label_values(["valid"]);
let ingested_docs_bytes_invalid = ingest_docs_bytes_total.with_label_values(["invalid"]);

let ingest_docs_total = new_counter_vec(
"docs_total",
"Total number of the docs ingested, measured in ingester's leader, after validation \
and before persistence/replication",
"ingest",
&[],
["validity"],
);
let ingested_docs_valid = ingest_docs_total.with_label_values(["valid"]);
let ingested_docs_invalid = ingest_docs_total.with_label_values(["invalid"]);

IngestMetrics {
ingested_docs_bytes_valid,
ingested_docs_bytes_invalid,
ingested_docs_valid,
ingested_docs_invalid,
replicated_num_bytes_total: new_counter(
"replicated_num_bytes_total",
"Total size in bytes of the replicated docs.",
Expand Down

0 comments on commit 0a57d53

Please sign in to comment.