Skip to content

Commit

Permalink
Detailed ingest response
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jan 15, 2025
1 parent dee631b commit 7fe75f6
Show file tree
Hide file tree
Showing 12 changed files with 514 additions and 95 deletions.
1 change: 1 addition & 0 deletions docs/internals/ingest-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ See [full configuration example](https://github.com/quickwit-oss/quickwit/blob/m
- but ingest V2 can also be configured with:
- `ingest_api.replication_factor`, not working yet
- ingest V1 always writes to the WAL of the node receiving the request, V2 potentially forwards it to another node, dynamically assigned by the control plane to distribute the indexing work more evenly.
- ingest V2 parses and validates input documents synchronously. Schema and JSON formatting errors are returned in the ingest response (for ingest V1 those errors were available in the server logs only).
19 changes: 14 additions & 5 deletions docs/reference/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ POST api/v1/<index id>/ingest?commit=wait_for -d \
```

:::info
The payload size is limited to 10MB as this endpoint is intended to receive documents in batch.
The payload size is limited to 10MB [by default](../configuration/node-config.md#ingest-api-configuration) as this endpoint is intended to receive documents in batch.
:::

#### Path variable
Expand All @@ -204,17 +204,26 @@ The payload size is limited to 10MB as this endpoint is intended to receive docu

#### Query parameters

| Variable | Type | Description | Default value |
|---------------------|------------|----------------------------------------------------|---------------|
| `commit` | `String` | The commit behavior: `auto`, `wait_for` or `force` | `auto` |
| Variable | Type | Description | Default value |
|---------------------------|------------|----------------------------------------------------|---------------|
| `commit` | `String` | The commit behavior: `auto`, `wait_for` or `force` | `auto` |
| `detailed_parse_failures` | `bool` | Enable `parse_failures` in the response. Setting it to `true` might negatively impact performance. | `false` |

#### Response

The response is a JSON object, and the content type is `application/json; charset=UTF-8`.

| Field | Description | Type |
|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------:|
| `num_docs_for_processing` | Total number of documents ingested for processing. The documents may not have been processed. The API will not return indexing errors, check the server logs for errors. | `number` |
| `num_docs_for_processing` | Total number of documents submitted for processing. The documents may not have been processed. | `number` |
| `num_ingested_docs` | Number of documents successfully persisted in the write ahead log | `number` |
| `num_rejected_docs` | Number of documents that couldn't be parsed (invalid json, bad schema...) | `number` |
| `parse_failures` | List detailing parsing failures. Only available if `detailed_parse_failures` is set to `true`. | `list(object)` |

The parse failure objects contain the following fields:
- `message`: a detailed message explaining the error
- `reason`: one of `invalid_json`, `invalid_schema` or `unspecified`
- `document`: the UTF-8 decoded string of the document byte chunk that generated the error


## Index API
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub enum IngestServiceError {
RateLimited(RateLimitingCause),
#[error("ingest service is unavailable ({0})")]
Unavailable(String),
#[error("bad request ({0})")]
Unsupported(String),
}

impl From<AskError<IngestServiceError>> for IngestServiceError {
Expand Down Expand Up @@ -161,6 +163,7 @@ impl ServiceError for IngestServiceError {
}
Self::RateLimited(_) => ServiceErrorCode::TooManyRequests,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
Self::Unsupported(_) => ServiceErrorCode::BadRequest,
}
}
}
Expand Down Expand Up @@ -204,6 +207,7 @@ impl From<IngestServiceError> for tonic::Status {
IngestServiceError::IoError { .. } => tonic::Code::Internal,
IngestServiceError::RateLimited(_) => tonic::Code::ResourceExhausted,
IngestServiceError::Unavailable(_) => tonic::Code::Unavailable,
IngestServiceError::Unsupported(_) => tonic::Code::InvalidArgument,
};
let message = error.to_string();
tonic::Status::new(code, message)
Expand Down
10 changes: 0 additions & 10 deletions quickwit/quickwit-ingest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,6 @@ pub async fn start_ingest_api_service(
init_ingest_api(universe, &queues_dir_path, config).await
}

impl CommitType {
pub fn to_query_parameter(&self) -> Option<&'static [(&'static str, &'static str)]> {
match self {
CommitType::Auto => None,
CommitType::WaitFor => Some(&[("commit", "wait_for")]),
CommitType::Force => Some(&[("commit", "force")]),
}
}
}

#[macro_export]
macro_rules! with_lock_metrics {
($future:expr, $($label:tt),*) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ use quickwit_rest_client::rest_client::{
CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL,
};
use quickwit_serve::tcp_listener::for_tests::TestTcpListenerResolver;
use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString};
use quickwit_serve::{
serve_quickwit, ListSplitsQueryParams, RestIngestResponse, SearchRequestQueryString,
};
use quickwit_storage::StorageResolver;
use reqwest::Url;
use serde_json::Value;
Expand Down Expand Up @@ -246,11 +248,11 @@ pub(crate) async fn ingest(
index_id: &str,
ingest_source: IngestSource,
commit_type: CommitType,
) -> anyhow::Result<()> {
client
) -> anyhow::Result<RestIngestResponse> {
let resp = client
.ingest(index_id, ingest_source, None, None, commit_type)
.await?;
Ok(())
Ok(resp)
}

/// A test environment where you can start a Quickwit cluster and use the gRPC
Expand Down Expand Up @@ -286,6 +288,15 @@ impl ClusterSandbox {
QuickwitClientBuilder::new(transport_url(node_config.rest_config.listen_addr)).build()
}

/// Builds a REST client, pointed at the node found for the indexer service,
/// with detailed parse failures enabled on the client builder.
pub fn detailed_ingest_client(&self) -> QuickwitClient {
    let indexer_config = self.find_node_for_service(QuickwitService::Indexer);
    let base_url = transport_url(indexer_config.rest_config.listen_addr);
    let client_builder = QuickwitClientBuilder::new(base_url);

    client_builder.detailed_parse_failures(true).build()
}

// TODO(#5604)
pub fn rest_client_legacy_indexer(&self) -> QuickwitClient {
let node_config = self.find_node_for_service(QuickwitService::Indexer);
Expand Down
111 changes: 104 additions & 7 deletions quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ use quickwit_config::service::QuickwitService;
use quickwit_config::ConfigFormat;
use quickwit_indexing::actors::INDEXING_DIR_NAME;
use quickwit_metastore::SplitState;
use quickwit_proto::ingest::ParseFailureReason;
use quickwit_rest_client::error::{ApiError, Error};
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::rest_client::CommitType;
use quickwit_serve::ListSplitsQueryParams;
use quickwit_serve::{ListSplitsQueryParams, RestIngestResponse, RestParseFailure};
use serde_json::json;

use crate::ingest_json;
Expand Down Expand Up @@ -299,14 +301,23 @@ async fn test_ingest_v2_happy_path() {
.await
.unwrap();

ingest(
let ingest_resp = ingest(
&sandbox.rest_client(QuickwitService::Indexer),
index_id,
ingest_json!({"body": "doc1"}),
CommitType::Auto,
)
.await
.unwrap();
assert_eq!(
ingest_resp,
RestIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
},
);

sandbox
.wait_for_splits(index_id, Some(vec![SplitState::Published]), 1)
Expand Down Expand Up @@ -352,7 +363,7 @@ async fn test_commit_force() {

// commit_timeout_secs is set to a large value, so this would timeout if
// the commit isn't forced
tokio::time::timeout(
let ingest_resp = tokio::time::timeout(
Duration::from_secs(20),
ingest(
&sandbox.rest_client(QuickwitService::Indexer),
Expand All @@ -364,6 +375,15 @@ async fn test_commit_force() {
.await
.unwrap()
.unwrap();
assert_eq!(
ingest_resp,
RestIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
},
);

sandbox.assert_hit_count(index_id, "body:force", 1).await;

Expand Down Expand Up @@ -415,8 +435,9 @@ async fn test_commit_wait_for() {
CommitType::WaitFor,
)
.then(|res| async {
res.unwrap();
let ingest_resp = res.unwrap();
sandbox.assert_hit_count(index_id, "body:for", 1).await;
ingest_resp
});

let ingest_2_fut = client
Expand All @@ -428,11 +449,30 @@ async fn test_commit_wait_for() {
CommitType::WaitFor,
)
.then(|res| async {
res.unwrap();
let ingest_resp = res.unwrap();
sandbox.assert_hit_count(index_id, "body:again", 1).await;
ingest_resp
});

tokio::join!(ingest_1_fut, ingest_2_fut);
let (ingest_resp_1, ingest_resp_2) = tokio::join!(ingest_1_fut, ingest_2_fut);
assert_eq!(
ingest_resp_1,
RestIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
},
);
assert_eq!(
ingest_resp_2,
RestIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
},
);

sandbox.assert_hit_count(index_id, "body:wait", 2).await;

Expand Down Expand Up @@ -475,7 +515,7 @@ async fn test_commit_auto() {
.await
.unwrap();

sandbox
let ingest_resp = sandbox
.rest_client(QuickwitService::Indexer)
.ingest(
index_id,
Expand All @@ -486,6 +526,15 @@ async fn test_commit_auto() {
)
.await
.unwrap();
assert_eq!(
ingest_resp,
RestIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
},
);

sandbox.assert_hit_count(index_id, "body:auto", 0).await;

Expand All @@ -499,6 +548,54 @@ async fn test_commit_auto() {
sandbox.shutdown().await.unwrap();
}

/// Ingests one valid JSON document and one malformed line through the client
/// returned by `detailed_ingest_client`, then checks that the ingest response
/// counts both lines and lists the malformed one in `parse_failures`.
#[tokio::test]
async fn test_detailed_ingest_response() {
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
let index_id = "test_detailed_ingest_response";
// Index configuration declaring a single `body` text field.
let index_config = format!(
r#"
version: 0.8
index_id: {index_id}
doc_mapping:
field_mappings:
- name: body
type: text
indexing_settings:
commit_timeout_secs: 1
"#
);
sandbox
.rest_client(QuickwitService::Indexer)
.indexes()
.create(index_config, ConfigFormat::Yaml, false)
.await
.unwrap();

// The payload holds two lines: a valid JSON document followed by `aouch!`,
// which is not valid JSON.
let ingest_resp = ingest(
&sandbox.detailed_ingest_client(),
index_id,
IngestSource::Str("{\"body\":\"hello\"}\naouch!".to_string()),
CommitType::Auto,
)
.await
.unwrap();

// Both lines are counted for processing; one is ingested and the other is
// rejected, with the offending line echoed back as an `InvalidJson` failure.
assert_eq!(
ingest_resp,
RestIngestResponse {
num_docs_for_processing: 2,
num_ingested_docs: Some(1),
num_rejected_docs: Some(1),
parse_failures: Some(vec![RestParseFailure {
document: "aouch!".to_string(),
message: "failed to parse JSON document".to_string(),
reason: ParseFailureReason::InvalidJson,
}]),
},
);
sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_very_large_index_name() {
let sandbox = ClusterSandboxBuilder::default()
Expand Down
Loading

0 comments on commit 7fe75f6

Please sign in to comment.