Skip to content

Commit

Permalink
Return TooManyRequests if all subrequests are rated-limited or time…
Browse files Browse the repository at this point in the history
…d out (#4792)
  • Loading branch information
guilload authored Mar 26, 2024
1 parent bcb4d06 commit 698555a
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 18 deletions.
10 changes: 5 additions & 5 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,14 @@ impl IngestRouter {
&mut self,
ingest_request: IngestRequestV2,
max_num_attempts: usize,
) -> IngestResponseV2 {
) -> IngestV2Result<IngestResponseV2> {
let commit_type = ingest_request.commit_type();
let mut workbench = IngestWorkbench::new(ingest_request.subrequests, max_num_attempts);
while !workbench.is_complete() {
workbench.new_attempt();
self.batch_persist(&mut workbench, commit_type).await;
}
workbench.into_ingest_response()
workbench.into_ingest_result()
}

async fn ingest_timeout(
Expand All @@ -430,7 +430,7 @@ impl IngestRouter {
INGEST_REQUEST_TIMEOUT.as_secs()
);
IngestV2Error::Timeout(message)
})
})?
}
}

Expand Down Expand Up @@ -1217,7 +1217,7 @@ mod tests {
subrequest_ids: vec![0],
};
let persist_result =
Err::<_, IngestV2Error>(IngestV2Error::Timeout("timeout error".to_string()));
Err::<_, IngestV2Error>(IngestV2Error::Internal("internal error".to_string()));
(persist_summary, persist_result)
});
router
Expand All @@ -1227,7 +1227,7 @@ mod tests {
let subworkbench = workbench.subworkbenches.get(&0).unwrap();
assert!(matches!(
&subworkbench.last_failure_opt,
&Some(SubworkbenchFailure::Internal(ref msg)) if msg.contains("timed out")
Some(SubworkbenchFailure::Internal(msg)) if msg.contains("internal")
));

assert!(!workbench
Expand Down
117 changes: 106 additions & 11 deletions quickwit/quickwit-ingest/src/ingest_v2/workbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use quickwit_proto::ingest::ingester::{PersistFailure, PersistFailureReason, Per
use quickwit_proto::ingest::router::{
IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess,
};
use quickwit_proto::ingest::IngestV2Error;
use quickwit_proto::ingest::{IngestV2Error, IngestV2Result};
use quickwit_proto::types::{NodeId, SubrequestId};
use tracing::warn;

Expand Down Expand Up @@ -144,10 +144,15 @@ impl IngestWorkbench {
persist_error: IngestV2Error,
persist_summary: PersistRequestSummary,
) {
// Persist responses use dedicated failure reasons for `IngesterUnavailable`,
// `NotFound`, and `TooManyRequests`: in reality, we should never have to handle these cases
// here.
// Persist responses use dedicated failure reasons for `ShardNotFound` and
// `TooManyRequests`: in reality, we should never have to handle these cases here.
match persist_error {
IngestV2Error::Timeout(_) => {
for subrequest_id in persist_summary.subrequest_ids {
let failure = SubworkbenchFailure::Persist(PersistFailureReason::Timeout);
self.record_failure(subrequest_id, failure);
}
}
IngestV2Error::Unavailable(_) => {
self.unavailable_leaders.insert(persist_summary.leader_id);

Expand All @@ -157,7 +162,6 @@ impl IngestWorkbench {
}
IngestV2Error::Internal(_)
| IngestV2Error::ShardNotFound { .. }
| IngestV2Error::Timeout(_)
| IngestV2Error::TooManyRequests => {
for subrequest_id in persist_summary.subrequest_ids {
self.record_internal_error(subrequest_id, persist_error.to_string());
Expand Down Expand Up @@ -195,7 +199,7 @@ impl IngestWorkbench {
self.record_failure(subrequest_id, SubworkbenchFailure::Internal(error_message));
}

pub fn into_ingest_response(self) -> IngestResponseV2 {
pub fn into_ingest_result(self) -> IngestV2Result<IngestResponseV2> {
let num_subworkbenches = self.subworkbenches.len();
let mut successes = Vec::with_capacity(self.num_successes);
let mut failures = Vec::with_capacity(num_subworkbenches - self.num_successes);
Expand All @@ -220,12 +224,28 @@ impl IngestWorkbench {
failures.push(failure);
}
}
assert_eq!(successes.len() + failures.len(), num_subworkbenches);

IngestResponseV2 {
let num_successes = successes.len();
let num_failures = failures.len();
assert_eq!(num_successes + num_failures, num_subworkbenches);

if self.num_successes == 0
&& num_failures > 0
&& failures.iter().all(|failure| {
matches!(
failure.reason(),
IngestFailureReason::RateLimited
| IngestFailureReason::ResourceExhausted
| IngestFailureReason::Timeout
)
})
{
return Err(IngestV2Error::TooManyRequests);
}
let response = IngestResponseV2 {
successes,
failures,
}
};
Ok(response)
}

#[cfg(test)]
Expand Down Expand Up @@ -337,7 +357,7 @@ mod tests {
assert!(subworkbench.last_failure_is_transient());

subworkbench.last_failure_opt =
Some(SubworkbenchFailure::Internal("timed out".to_string()));
Some(SubworkbenchFailure::Internal("internal error".to_string()));
assert!(subworkbench.is_pending());
assert!(subworkbench.last_failure_is_transient());

Expand Down Expand Up @@ -510,6 +530,32 @@ mod tests {
assert_eq!(subworkbench.num_attempts, 1);
}

#[test]
fn test_ingest_workbench_record_persist_error_timeout() {
let ingest_subrequests = vec![IngestSubrequest {
subrequest_id: 0,
..Default::default()
}];
let mut workbench = IngestWorkbench::new(ingest_subrequests, 1);

let persist_error = IngestV2Error::Timeout("request timed out".to_string());
let leader_id = NodeId::from("test-leader");
let persist_summary = PersistRequestSummary {
leader_id: leader_id.clone(),
subrequest_ids: vec![0],
};
workbench.record_persist_error(persist_error, persist_summary);

let subworkbench = workbench.subworkbenches.get(&0).unwrap();
assert_eq!(subworkbench.num_attempts, 1);

assert!(matches!(
subworkbench.last_failure_opt,
Some(SubworkbenchFailure::Persist(PersistFailureReason::Timeout))
));
assert!(subworkbench.persist_success_opt.is_none());
}

#[test]
fn test_ingest_workbench_record_persist_error_unavailable() {
let ingest_subrequests = vec![IngestSubrequest {
Expand Down Expand Up @@ -616,4 +662,53 @@ mod tests {
));
assert_eq!(subworkbench.num_attempts, 1);
}

#[test]
fn test_ingest_workbench_into_ingest_result() {
let workbench = IngestWorkbench::new(Vec::new(), 0);
let response = workbench.into_ingest_result().unwrap();
assert!(response.successes.is_empty());
assert!(response.failures.is_empty());

let ingest_subrequests = vec![
IngestSubrequest {
subrequest_id: 0,
..Default::default()
},
IngestSubrequest {
subrequest_id: 1,
..Default::default()
},
];
let mut workbench = IngestWorkbench::new(ingest_subrequests, 1);
let persist_success = PersistSuccess {
..Default::default()
};
let subworkbench = workbench.subworkbenches.get_mut(&0).unwrap();
subworkbench.persist_success_opt = Some(persist_success);

workbench.record_no_shards_available(1);

let response = workbench.into_ingest_result().unwrap();
assert_eq!(response.successes.len(), 1);
assert_eq!(response.successes[0].subrequest_id, 0);

assert_eq!(response.failures.len(), 1);
assert_eq!(response.failures[0].subrequest_id, 1);
assert_eq!(
response.failures[0].reason(),
IngestFailureReason::NoShardsAvailable
);

let ingest_subrequests = vec![IngestSubrequest {
subrequest_id: 0,
..Default::default()
}];
let mut workbench = IngestWorkbench::new(ingest_subrequests, 1);
let failure = SubworkbenchFailure::Persist(PersistFailureReason::Timeout);
workbench.record_failure(0, failure);

let error = workbench.into_ingest_result().unwrap_err();
assert_eq!(error, IngestV2Error::TooManyRequests);
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-proto/protos/quickwit/ingester.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ enum PersistFailureReason {
PERSIST_FAILURE_REASON_SHARD_CLOSED = 2;
PERSIST_FAILURE_REASON_RATE_LIMITED = 3;
PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED = 4;
PERSIST_FAILURE_REASON_TIMEOUT = 5;
}

message PersistFailure {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-proto/protos/quickwit/router.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ enum IngestFailureReason {
INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE = 4;
INGEST_FAILURE_REASON_RATE_LIMITED = 5;
INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED = 6;
INGEST_FAILURE_REASON_TIMEOUT = 7;
}

message IngestFailure {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-proto/src/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ impl From<PersistFailureReason> for IngestFailureReason {
PersistFailureReason::ShardClosed => IngestFailureReason::NoShardsAvailable,
PersistFailureReason::ResourceExhausted => IngestFailureReason::ResourceExhausted,
PersistFailureReason::RateLimited => IngestFailureReason::RateLimited,
PersistFailureReason::Timeout => IngestFailureReason::Timeout,
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ fn convert_ingest_response_v2(
let ingest_failure = response.failures.pop().unwrap();
Err(match ingest_failure.reason() {
IngestFailureReason::Unspecified => {
IngestServiceError::Internal("Unknown reason".to_string())
IngestServiceError::Internal("unknown error".to_string())
}
IngestFailureReason::IndexNotFound => IngestServiceError::IndexNotFound {
index_id: ingest_failure.index_id,
Expand All @@ -183,10 +183,13 @@ fn convert_ingest_response_v2(
"Ingest v2 source not found for index {}",
ingest_failure.index_id
)),
IngestFailureReason::Internal => IngestServiceError::Internal("Internal error".to_string()),
IngestFailureReason::Internal => IngestServiceError::Internal("internal error".to_string()),
IngestFailureReason::NoShardsAvailable => IngestServiceError::Unavailable,
IngestFailureReason::RateLimited => IngestServiceError::RateLimited,
IngestFailureReason::ResourceExhausted => IngestServiceError::RateLimited,
IngestFailureReason::Timeout => {
IngestServiceError::Internal("request timed out".to_string())
}
})
}

Expand Down

0 comments on commit 698555a

Please sign in to comment.