Skip to content

Commit

Permalink
Removing invalid docs from persistence.
Browse files Browse the repository at this point in the history
Closes #5205
  • Loading branch information
fulmicoton committed Jul 9, 2024
1 parent fc0c52e commit 773c73d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 67 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,10 @@ impl IngestApiService {

num_docs += batch_num_docs;
INGEST_METRICS
.ingested_num_bytes
.ingested_docs_bytes_valid
.inc_by(batch_num_bytes as u64);
INGEST_METRICS
.ingested_num_docs
.ingested_docs_valid
.inc_by(batch_num_docs as u64);
}
// TODO: we could fsync here and disable autosync to get better I/O performance.
Expand Down
88 changes: 59 additions & 29 deletions quickwit/quickwit-ingest/src/ingest_v2/doc_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Weak};

use once_cell::sync::OnceCell;
Expand All @@ -28,10 +28,12 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_proto::ingest::{
DocBatchV2, IngestV2Error, IngestV2Result, ParseFailure, ParseFailureReason,
};
use quickwit_proto::types::DocMappingUid;
use quickwit_proto::types::{DocMappingUid, DocUid};
use serde_json_borrow::Value as JsonValue;
use tracing::info;

use crate::DocBatchV2Builder;

/// Attempts to get the doc mapper identified by the given doc mapping UID `doc_mapping_uid` from
/// the `doc_mappers` cache. If it is not found, it is built from the specified JSON doc mapping
/// `doc_mapping_json` and inserted into the cache before being returned.
Expand Down Expand Up @@ -72,40 +74,64 @@ pub(super) fn try_build_doc_mapper(doc_mapping_json: &str) -> IngestV2Result<Arc
Ok(doc_mapper)
}

/// Checks a single raw document against the given doc mapper.
///
/// The document is accepted only if `doc_bytes` deserializes as JSON, the top-level JSON value
/// is an object, and `doc_mapper.validate_json_obj` returns no error for that object.
///
/// # Errors
///
/// Returns the failure reason together with a human-readable message:
/// - `ParseFailureReason::InvalidJson` when the bytes cannot be deserialized, or when the
///   top-level value is not an object;
/// - `ParseFailureReason::InvalidSchema` when the doc mapper rejects the object; the message is
///   the doc mapper error rendered with `to_string()`.
fn validate_document(
    doc_mapper: &dyn DocMapper,
    doc_bytes: &[u8],
) -> Result<(), (ParseFailureReason, String)> {
    // The deserialization error itself is dropped; only a fixed message is reported.
    let parsed_doc =
        serde_json::from_slice::<serde_json_borrow::Value>(doc_bytes).map_err(|_| {
            (
                ParseFailureReason::InvalidJson,
                "failed to parse JSON document".to_string(),
            )
        })?;
    let object = match parsed_doc {
        JsonValue::Object(object) => object,
        _ => {
            return Err((
                ParseFailureReason::InvalidJson,
                "JSON document is not an object".to_string(),
            ));
        }
    };
    match doc_mapper.validate_json_obj(&object) {
        Err(schema_error) => Err((ParseFailureReason::InvalidSchema, schema_error.to_string())),
        Ok(_) => Ok(()),
    }
}

/// Validates a batch of docs.
///
/// Returns a batch of valid docs and the list of errors.
fn validate_doc_batch_impl(
doc_batch: DocBatchV2,
doc_mapper: Arc<dyn DocMapper>,
doc_mapper: &dyn DocMapper,
) -> (DocBatchV2, Vec<ParseFailure>) {
let mut parse_failures: Vec<ParseFailure> = Vec::new();
for (doc_uid, doc) in doc_batch.docs() {
let Ok(json_doc) = serde_json::from_slice::<serde_json_borrow::Value>(&doc) else {
let parse_failure = ParseFailure {
doc_uid: Some(doc_uid),
reason: ParseFailureReason::InvalidJson as i32,
message: "failed to parse JSON document".to_string(),
};
parse_failures.push(parse_failure);
continue;
};
let JsonValue::Object(json_obj) = json_doc else {
let mut invalid_doc_ids: HashSet<DocUid> = HashSet::default();
for (doc_uid, doc_bytes) in doc_batch.docs() {
if let Err((reason, message)) = validate_document(doc_mapper, &doc_bytes) {
let parse_failure = ParseFailure {
doc_uid: Some(doc_uid),
reason: ParseFailureReason::InvalidJson as i32,
message: "JSON document is not an object".to_string(),
};
parse_failures.push(parse_failure);
continue;
};
if let Err(error) = doc_mapper.validate_json_obj(&json_obj) {
let parse_failure = ParseFailure {
doc_uid: Some(doc_uid),
reason: ParseFailureReason::InvalidSchema as i32,
message: error.to_string(),
reason: reason as i32,
message,
};
invalid_doc_ids.insert(doc_uid);
parse_failures.push(parse_failure);
}
}
(doc_batch, parse_failures)
if invalid_doc_ids.is_empty() {
// All docs are valid! We don't need to build a valid doc batch.
return (doc_batch, parse_failures);
}
let mut valid_doc_batch_builder = DocBatchV2Builder::default();
for (doc_uid, doc_bytes) in doc_batch.docs() {
if !invalid_doc_ids.contains(&doc_uid) {
valid_doc_batch_builder.add_doc(doc_uid, &doc_bytes);
}
}
let valid_doc_batch: DocBatchV2 = valid_doc_batch_builder.build().unwrap_or_default();
assert_eq!(
valid_doc_batch.num_docs() + parse_failures.len(),
doc_batch.num_docs()
);
(valid_doc_batch, parse_failures)
}

fn is_document_validation_enabled() -> bool {
Expand All @@ -122,7 +148,7 @@ pub(super) async fn validate_doc_batch(
doc_mapper: Arc<dyn DocMapper>,
) -> IngestV2Result<(DocBatchV2, Vec<ParseFailure>)> {
if is_document_validation_enabled() {
run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, doc_mapper))
run_cpu_intensive(move || validate_doc_batch_impl(doc_batch, &*doc_mapper))
.await
.map_err(|error| {
let message = format!("failed to validate documents: {error}");
Expand Down Expand Up @@ -230,12 +256,12 @@ mod tests {
let doc_mapper = try_build_doc_mapper(doc_mapping_json).unwrap();
let doc_batch = DocBatchV2::default();

let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper.clone());
let (_, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper);
assert_eq!(parse_failures.len(), 0);

let doc_batch =
DocBatchV2::for_test(["", "[]", r#"{"foo": "bar"}"#, r#"{"doc": "test-doc-000"}"#]);
let (_, parse_failures) = validate_doc_batch_impl(doc_batch, doc_mapper);
let (doc_batch, parse_failures) = validate_doc_batch_impl(doc_batch, &*doc_mapper);
assert_eq!(parse_failures.len(), 3);

let parse_failure_0 = &parse_failures[0];
Expand All @@ -252,5 +278,9 @@ mod tests {
assert_eq!(parse_failure_2.doc_uid(), DocUid::for_test(2));
assert_eq!(parse_failure_2.reason(), ParseFailureReason::InvalidSchema);
assert!(parse_failure_2.message.contains("not declared"));

assert_eq!(doc_batch.num_docs(), 1);
assert_eq!(doc_batch.doc_uids[0], DocUid::for_test(3));
assert_eq!(doc_batch.docs().next().unwrap(), (DocUid::for_test(3)));
}
}
57 changes: 37 additions & 20 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ use super::IngesterPool;
use crate::ingest_v2::doc_mapper::get_or_try_build_doc_mapper;
use crate::ingest_v2::metrics::report_wal_usage;
use crate::ingest_v2::models::IngesterShardType;
use crate::metrics::INGEST_METRICS;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::{estimate_size, with_lock_metrics, FollowerId};

Expand Down Expand Up @@ -568,24 +567,48 @@ impl Ingester {
persist_failures.push(persist_failure);
continue;
}
let (doc_batch, parse_failures) = validate_doc_batch(doc_batch, doc_mapper).await?;
let num_persisted_docs = (doc_batch.num_docs() - parse_failures.len()) as u32;

if num_persisted_docs == 0 {
// Total number of bytes (valid and invalid documents)
let original_batch_num_bytes = doc_batch.num_bytes() as u64;
let (valid_doc_batch, parse_failures) =
validate_doc_batch(doc_batch, doc_mapper).await?;

if valid_doc_batch.is_empty() {
crate::metrics::INGEST_METRICS
.ingested_docs_invalid
.inc_by(parse_failures.len() as u64);
crate::metrics::INGEST_METRICS
.ingested_docs_bytes_invalid
.inc_by(original_batch_num_bytes);
let persist_success = PersistSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
replication_position_inclusive: Some(from_position_exclusive),
num_persisted_docs,
num_persisted_docs: 0,
parse_failures,
};
persist_successes.push(persist_success);
continue;
};

crate::metrics::INGEST_METRICS
.ingested_docs_valid
.inc_by(valid_doc_batch.num_docs() as u64);
crate::metrics::INGEST_METRICS
.ingested_docs_bytes_valid
.inc_by(valid_doc_batch.num_bytes() as u64);
if !parse_failures.is_empty() {
crate::metrics::INGEST_METRICS
.ingested_docs_invalid
.inc_by(parse_failures.len() as u64);
crate::metrics::INGEST_METRICS
.ingested_docs_bytes_invalid
.inc_by(original_batch_num_bytes - valid_doc_batch.num_bytes() as u64);
}
let batch_num_bytes = doc_batch.num_bytes() as u64;
rate_meter.update(batch_num_bytes);
let valid_batch_num_bytes = valid_doc_batch.num_bytes() as u64;
rate_meter.update(valid_batch_num_bytes);
total_requested_capacity += requested_capacity;

let mut successfully_replicated = true;
Expand All @@ -599,7 +622,7 @@ impl Ingester {
source_id: subrequest.source_id.clone(),
shard_id: subrequest.shard_id.clone(),
from_position_exclusive: Some(from_position_exclusive),
doc_batch: Some(doc_batch.clone()),
doc_batch: Some(valid_doc_batch.clone()),
};
per_follower_replicate_subrequests
.entry(follower_id)
Expand All @@ -612,8 +635,7 @@ impl Ingester {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
doc_batch,
num_persisted_docs,
doc_batch: valid_doc_batch,
parse_failures,
expected_position_inclusive: None,
successfully_replicated,
Expand Down Expand Up @@ -697,7 +719,6 @@ impl Ingester {
}
let queue_id = subrequest.queue_id;

let batch_num_bytes = subrequest.doc_batch.num_bytes() as u64;
let batch_num_docs = subrequest.doc_batch.num_docs() as u64;

let append_result = append_non_empty_doc_batch(
Expand Down Expand Up @@ -754,16 +775,13 @@ impl Ingester {
.expect("primary shard should exist")
.set_replication_position_inclusive(current_position_inclusive.clone(), now);

INGEST_METRICS.ingested_num_bytes.inc_by(batch_num_bytes);
INGEST_METRICS.ingested_num_docs.inc_by(batch_num_docs);

let persist_success = PersistSuccess {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
replication_position_inclusive: Some(current_position_inclusive),
num_persisted_docs: subrequest.num_persisted_docs,
num_persisted_docs: batch_num_docs as u32,
parse_failures: subrequest.parse_failures,
};
persist_successes.push(persist_success);
Expand Down Expand Up @@ -1259,7 +1277,6 @@ struct PendingPersistSubrequest {
source_id: SourceId,
shard_id: Option<ShardId>,
doc_batch: DocBatchV2,
num_persisted_docs: u32,
parse_failures: Vec<ParseFailure>,
expected_position_inclusive: Option<Position>,
successfully_replicated: bool,
Expand Down Expand Up @@ -1937,10 +1954,10 @@ mod tests {
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(0)),
doc_batch: Some(DocBatchV2::for_test([
"",
"[]",
r#"{"foo": "bar"}"#,
r#"{"doc": "test-doc-000"}"#,
"", // invalid
"[]", // invalid
r#"{"foo": "bar"}"#, // invalid
r#"{"doc": "test-doc-000"}"#, // valid
])),
}],
};
Expand Down
49 changes: 33 additions & 16 deletions quickwit/quickwit-ingest/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,48 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use once_cell::sync::Lazy;
use quickwit_common::metrics::{new_counter, new_gauge, IntCounter, IntGauge};
use quickwit_common::metrics::{new_counter, new_counter_vec, new_gauge, IntCounter, IntGauge};

pub struct IngestMetrics {
pub ingested_num_bytes: IntCounter,
pub ingested_num_docs: IntCounter,
pub ingested_docs_bytes_valid: IntCounter,
pub ingested_docs_bytes_invalid: IntCounter,
pub ingested_docs_invalid: IntCounter,
pub ingested_docs_valid: IntCounter,

pub replicated_num_bytes_total: IntCounter,
pub replicated_num_docs_total: IntCounter,
pub queue_count: IntGauge,
}

impl Default for IngestMetrics {
fn default() -> Self {
Self {
ingested_num_bytes: new_counter(
"ingested_num_bytes",
"Total size of the docs ingested in bytes",
"ingest",
&[],
),
ingested_num_docs: new_counter(
"ingested_num_docs",
"Number of docs received to be ingested",
"ingest",
&[],
),
let ingest_docs_bytes_total = new_counter_vec(
"ingested_docs_bytes_total",
"Total size of the docs ingested, measured in ingester's leader, after validation and \
before persistence/replication",
"ingest",
&[],
["validity"],
);
let ingested_docs_bytes_valid = ingest_docs_bytes_total.with_label_values(["valid"]);
let ingested_docs_bytes_invalid = ingest_docs_bytes_total.with_label_values(["invalid"]);

let ingest_docs_total = new_counter_vec(
"ingested_docs_total",
"Total number of the docs ingested, measured in ingester's leader, after validation \
and before persistence/replication",
"ingest",
&[],
["validity"],
);
let ingested_docs_valid = ingest_docs_total.with_label_values(["valid"]);
let ingested_docs_invalid = ingest_docs_total.with_label_values(["invalid"]);

IngestMetrics {
ingested_docs_bytes_valid,
ingested_docs_bytes_invalid,
ingested_docs_valid,
ingested_docs_invalid,
replicated_num_bytes_total: new_counter(
"replicated_num_bytes_total",
"Total size in bytes of the replicated docs.",
Expand Down

0 comments on commit 773c73d

Please sign in to comment.