Skip to content

Commit

Permalink
Populate ES bulk response items on success too (#5019)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored May 22, 2024
1 parent ec502f0 commit 5c1469d
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 12 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn elastic_ingest_bulk(
let bulk_response = ElasticBulkResponse {
took_millis,
errors,
items: Vec::new(),
actions: Vec::new(),
};
Ok(bulk_response)
}
Expand Down
88 changes: 79 additions & 9 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,19 @@ pub(crate) struct ElasticBulkResponse {
#[serde(rename = "took")]
pub took_millis: u64,
pub errors: bool,
pub items: Vec<ElasticBulkItemAction>,
#[serde(rename = "items")]
pub actions: Vec<ElasticBulkAction>,
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum ElasticBulkItemAction {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ElasticBulkAction {
#[serde(rename = "create")]
Create(ElasticBulkItem),
#[serde(rename = "index")]
Index(ElasticBulkItem),
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ElasticBulkItem {
#[serde(rename = "_index")]
pub index_id: IndexId,
Expand All @@ -63,7 +64,7 @@ pub(crate) struct ElasticBulkItem {
pub error: Option<ElasticBulkError>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ElasticBulkError {
#[serde(rename = "index")]
pub index_id: Option<IndexId>,
Expand Down Expand Up @@ -132,8 +133,32 @@ pub(crate) async fn elastic_bulk_ingest_v2(
};
let ingest_response_v2 = ingest_router.ingest(ingest_request).await?;
let errors = !ingest_response_v2.failures.is_empty();
let mut items = Vec::new();
let mut actions: Vec<ElasticBulkAction> = Vec::new();

for success in ingest_response_v2.successes {
let es_doc_ids = per_subrequest_id_es_doc_ids
.remove(&success.subrequest_id)
.ok_or_else(|| {
ElasticsearchError::new(
StatusCode::INTERNAL_SERVER_ERROR,
format!(
"could not find subrequest `{}` in bulk request",
success.subrequest_id
),
None,
)
})?;
for es_doc_id in es_doc_ids {
let item = ElasticBulkItem {
index_id: success.index_uid().index_id.clone(),
es_doc_id,
status: StatusCode::CREATED,
error: None,
};
let action = ElasticBulkAction::Index(item);
actions.push(action);
}
}
for failure in ingest_response_v2.failures {
let es_doc_ids = per_subrequest_id_es_doc_ids
.remove(&failure.subrequest_id)
Expand Down Expand Up @@ -161,7 +186,25 @@ pub(crate) async fn elastic_bulk_ingest_v2(
status: StatusCode::NOT_FOUND,
error: Some(error),
};
items.push(ElasticBulkItemAction::Index(item));
let action = ElasticBulkAction::Index(item);
actions.push(action);
}
}
IngestFailureReason::Timeout => {
for es_doc_id in es_doc_ids {
let error = ElasticBulkError {
index_id: Some(failure.index_id.clone()),
exception: ErrorCauseException::Timeout,
reason: format!("timeout [{}]", failure.index_id),
};
let item = ElasticBulkItem {
index_id: failure.index_id.clone(),
es_doc_id,
status: StatusCode::REQUEST_TIMEOUT,
error: Some(error),
};
let action = ElasticBulkAction::Index(item);
actions.push(action);
}
}
_ => {
Expand All @@ -174,7 +217,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
let bulk_response = ElasticBulkResponse {
took_millis,
errors,
items,
actions,
};
Ok(bulk_response)
}
Expand Down Expand Up @@ -274,6 +317,33 @@ mod tests {

let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap();
assert!(!bulk_response.errors);

let mut items = bulk_response
.actions
.into_iter()
.map(|action| match action {
ElasticBulkAction::Create(item) => item,
ElasticBulkAction::Index(item) => item,
})
.collect::<Vec<_>>();
assert_eq!(items.len(), 3);

items.sort_by(|left, right| {
left.index_id
.cmp(&right.index_id)
.then(left.es_doc_id.cmp(&right.es_doc_id))
});
assert_eq!(items[0].index_id, "my-index-1");
assert!(items[0].es_doc_id.is_none());
assert_eq!(items[0].status, StatusCode::CREATED);

assert_eq!(items[1].index_id, "my-index-1");
assert_eq!(items[1].es_doc_id.as_ref().unwrap(), "1");
assert_eq!(items[1].status, StatusCode::CREATED);

assert_eq!(items[2].index_id, "my-index-2");
assert_eq!(items[2].es_doc_id.as_ref().unwrap(), "1");
assert_eq!(items[2].status, StatusCode::CREATED);
}

#[tokio::test]
Expand Down Expand Up @@ -466,6 +536,6 @@ mod tests {

let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap();
assert!(bulk_response.errors);
assert_eq!(bulk_response.items.len(), 3);
assert_eq!(bulk_response.actions.len(), 3);
}
}
3 changes: 3 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ pub enum ErrorCauseException {
IllegalArgument,
#[serde(rename = "index_not_found_exception")]
IndexNotFound,
#[serde(rename = "timeout_exception")]
Timeout,
}

impl ErrorCauseException {
Expand All @@ -150,6 +152,7 @@ impl ErrorCauseException {
Self::ActionRequestValidation => "action_request_validation_exception",
Self::IllegalArgument => "illegal_argument_exception",
Self::IndexNotFound => "index_not_found_exception",
Self::Timeout => "timeout_exception",
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
ndjson:
- index: { "_index": "test-index-happy-path", "_id": "1" }
- message: Hello, World!
- index: { "_index": "test-index-happy-path" }
- message: Hola, Mundo!
status_code: 200
expected:
errors: false
items:
- index:
_index: test-index-happy-path
_id: "1"
status: 201
- index:
_index: test-index-happy-path
status: 201
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,3 @@ json: {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,24 @@ method: POST
api_root: http://localhost:7280/api/v1/
endpoint: indexes/
json:
version: "0.7"
version: "0.8"
index_id: test-index
doc_mapping:
field_mappings:
- name: message
type: text
sleep_after: 3
---
# Create index template
method: POST
api_root: http://localhost:7280/api/v1/
endpoint: templates
json:
version: "0.8"
template_id: test-index-template
index_id_patterns:
- test-index-happy-path*
doc_mapping:
mode: dynamic
indexing_settings:
commit_timeout_secs: 1

0 comments on commit 5c1469d

Please sign in to comment.