From 5c1469d5cc4f24ad9f1b5c2bed8ec7e641aced85 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Wed, 22 May 2024 08:34:46 -0400 Subject: [PATCH] Populate ES bulk response items on success too (#5019) --- .../src/elasticsearch_api/bulk.rs | 2 +- .../src/elasticsearch_api/bulk_v2.rs | 88 +++++++++++++++++-- .../src/elasticsearch_api/model/error.rs | 3 + .../bulk/0001-happy-path.yaml | 16 ++++ ...action.yaml => 0002-malformed-action.yaml} | 0 ...0003-validation-failed-index-missing.yaml} | 0 ...-validation-failed-no-requests-added.yaml} | 0 .../bulk/_setup.elasticsearch.yaml | 1 - .../bulk/_setup.quickwit.yaml | 16 +++- 9 files changed, 114 insertions(+), 12 deletions(-) create mode 100644 quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0001-happy-path.yaml rename quickwit/rest-api-tests/scenarii/es_compatibility/bulk/{0001-malformed-action.yaml => 0002-malformed-action.yaml} (100%) rename quickwit/rest-api-tests/scenarii/es_compatibility/bulk/{0002-validation-failed-index-missing.yaml => 0003-validation-failed-index-missing.yaml} (100%) rename quickwit/rest-api-tests/scenarii/es_compatibility/bulk/{0003-validation-failed-no-requests-added.yaml => 0004-validation-failed-no-requests-added.yaml} (100%) diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs index 24b03021e7e..ab3047324f6 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs @@ -146,7 +146,7 @@ async fn elastic_ingest_bulk( let bulk_response = ElasticBulkResponse { took_millis, errors, - items: Vec::new(), + actions: Vec::new(), }; Ok(bulk_response) } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs index e7e48617059..c0c7ecf735a 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs @@ -41,18 +41,19 @@ pub(crate) struct ElasticBulkResponse { #[serde(rename = "took")] pub took_millis: u64, pub errors: bool, - pub items: Vec, + #[serde(rename = "items")] + pub actions: Vec, } -#[derive(Debug, Serialize, Deserialize)] -pub(crate) enum ElasticBulkItemAction { +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) enum ElasticBulkAction { #[serde(rename = "create")] Create(ElasticBulkItem), #[serde(rename = "index")] Index(ElasticBulkItem), } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct ElasticBulkItem { #[serde(rename = "_index")] pub index_id: IndexId, @@ -63,7 +64,7 @@ pub(crate) struct ElasticBulkItem { pub error: Option, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct ElasticBulkError { #[serde(rename = "index")] pub index_id: Option, @@ -132,8 +133,32 @@ pub(crate) async fn elastic_bulk_ingest_v2( }; let ingest_response_v2 = ingest_router.ingest(ingest_request).await?; let errors = !ingest_response_v2.failures.is_empty(); - let mut items = Vec::new(); + let mut actions: Vec = Vec::new(); + for success in ingest_response_v2.successes { + let es_doc_ids = per_subrequest_id_es_doc_ids + .remove(&success.subrequest_id) + .ok_or_else(|| { + ElasticsearchError::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!( + "could not find subrequest `{}` in bulk request", + success.subrequest_id + ), + None, + ) + })?; + for es_doc_id in es_doc_ids { + let item = ElasticBulkItem { + index_id: success.index_uid().index_id.clone(), + es_doc_id, + status: StatusCode::CREATED, + error: None, + }; + let action = ElasticBulkAction::Index(item); + actions.push(action); + } + } for failure in ingest_response_v2.failures { let es_doc_ids = per_subrequest_id_es_doc_ids .remove(&failure.subrequest_id) @@ -161,7 +186,25 @@ pub(crate) async fn elastic_bulk_ingest_v2( status: StatusCode::NOT_FOUND, error: Some(error), }; - items.push(ElasticBulkItemAction::Index(item)); + let action = ElasticBulkAction::Index(item); + actions.push(action); + } + } + IngestFailureReason::Timeout => { + for es_doc_id in es_doc_ids { + let error = ElasticBulkError { + index_id: Some(failure.index_id.clone()), + exception: ErrorCauseException::Timeout, + reason: format!("timeout [{}]", failure.index_id), + }; + let item = ElasticBulkItem { + index_id: failure.index_id.clone(), + es_doc_id, + status: StatusCode::REQUEST_TIMEOUT, + error: Some(error), + }; + let action = ElasticBulkAction::Index(item); + actions.push(action); } } _ => { @@ -174,7 +217,7 @@ pub(crate) async fn elastic_bulk_ingest_v2( let bulk_response = ElasticBulkResponse { took_millis, errors, - items, + actions, }; Ok(bulk_response) } @@ -274,6 +317,33 @@ mod tests { let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap(); assert!(!bulk_response.errors); + + let mut items = bulk_response + .actions + .into_iter() + .map(|action| match action { + ElasticBulkAction::Create(item) => item, + ElasticBulkAction::Index(item) => item, + }) + .collect::>(); + assert_eq!(items.len(), 3); + + items.sort_by(|left, right| { + left.index_id + .cmp(&right.index_id) + .then(left.es_doc_id.cmp(&right.es_doc_id)) + }); + assert_eq!(items[0].index_id, "my-index-1"); + assert!(items[0].es_doc_id.is_none()); + assert_eq!(items[0].status, StatusCode::CREATED); + + assert_eq!(items[1].index_id, "my-index-1"); + assert_eq!(items[1].es_doc_id.as_ref().unwrap(), "1"); + assert_eq!(items[1].status, StatusCode::CREATED); + + assert_eq!(items[2].index_id, "my-index-2"); + assert_eq!(items[2].es_doc_id.as_ref().unwrap(), "1"); + assert_eq!(items[2].status, StatusCode::CREATED); } #[tokio::test] @@ -466,6 +536,6 @@ mod tests { let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap(); assert!(bulk_response.errors); - assert_eq!(bulk_response.items.len(), 3); + assert_eq!(bulk_response.actions.len(), 3); } } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs index c648eefc89f..b6ebcb1c353 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs @@ -142,6 +142,8 @@ pub enum ErrorCauseException { IllegalArgument, #[serde(rename = "index_not_found_exception")] IndexNotFound, + #[serde(rename = "timeout_exception")] + Timeout, } impl ErrorCauseException { @@ -150,6 +152,7 @@ impl ErrorCauseException { Self::ActionRequestValidation => "action_request_validation_exception", Self::IllegalArgument => "illegal_argument_exception", Self::IndexNotFound => "index_not_found_exception", + Self::Timeout => "timeout_exception", } } } diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0001-happy-path.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0001-happy-path.yaml new file mode 100644 index 00000000000..589288f8c56 --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0001-happy-path.yaml @@ -0,0 +1,16 @@ +ndjson: + - index: { "_index": "test-index-happy-path", "_id": "1" } + - message: Hello, World! + - index: { "_index": "test-index-happy-path" } + - message: Hola, Mundo! +status_code: 200 +expected: + errors: false + items: + - index: + _index: test-index-happy-path + _id: "1" + status: 201 + - index: + _index: test-index-happy-path + status: 201 diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0001-malformed-action.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0002-malformed-action.yaml similarity index 100% rename from quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0001-malformed-action.yaml rename to quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0002-malformed-action.yaml diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0002-validation-failed-index-missing.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0003-validation-failed-index-missing.yaml similarity index 100% rename from quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0002-validation-failed-index-missing.yaml rename to quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0003-validation-failed-index-missing.yaml diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0003-validation-failed-no-requests-added.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0004-validation-failed-no-requests-added.yaml similarity index 100% rename from quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0003-validation-failed-no-requests-added.yaml rename to quickwit/rest-api-tests/scenarii/es_compatibility/bulk/0004-validation-failed-no-requests-added.yaml diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.elasticsearch.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.elasticsearch.yaml index d75a7b895c7..15bdb58fee6 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.elasticsearch.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.elasticsearch.yaml @@ -15,4 +15,3 @@ json: { } } } - diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.quickwit.yaml index 24c8a3f6d59..3d3997dc417 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/bulk/_setup.quickwit.yaml @@ -8,10 +8,24 @@ method: POST api_root: http://localhost:7280/api/v1/ endpoint: indexes/ json: - version: "0.7" + version: "0.8" index_id: test-index doc_mapping: field_mappings: - name: message type: text sleep_after: 3 +--- +# Create index template +method: POST +api_root: http://localhost:7280/api/v1/ +endpoint: templates +json: + version: "0.8" + template_id: test-index-template + index_id_patterns: + - test-index-happy-path* + doc_mapping: + mode: dynamic + indexing_settings: + commit_timeout_secs: 1