From 698555a99dc0c41f9726e32e6a403112d6ed6882 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Tue, 26 Mar 2024 04:26:29 -0700 Subject: [PATCH] Return `TooManyRequests` if all subrequests are rated-limited or timed out (#4792) --- .../quickwit-ingest/src/ingest_v2/router.rs | 10 +- .../src/ingest_v2/workbench.rs | 117 ++++++++++++++++-- .../protos/quickwit/ingester.proto | 1 + .../protos/quickwit/router.proto | 1 + .../quickwit/quickwit.ingest.ingester.rs | 3 + .../quickwit/quickwit.ingest.router.rs | 3 + quickwit/quickwit-proto/src/ingest/mod.rs | 1 + .../src/ingest_api/rest_handler.rs | 7 +- 8 files changed, 125 insertions(+), 18 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index f65d9400638..13035e51278 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -404,14 +404,14 @@ impl IngestRouter { &mut self, ingest_request: IngestRequestV2, max_num_attempts: usize, - ) -> IngestResponseV2 { + ) -> IngestV2Result { let commit_type = ingest_request.commit_type(); let mut workbench = IngestWorkbench::new(ingest_request.subrequests, max_num_attempts); while !workbench.is_complete() { workbench.new_attempt(); self.batch_persist(&mut workbench, commit_type).await; } - workbench.into_ingest_response() + workbench.into_ingest_result() } async fn ingest_timeout( @@ -430,7 +430,7 @@ impl IngestRouter { INGEST_REQUEST_TIMEOUT.as_secs() ); IngestV2Error::Timeout(message) - }) + })? } } @@ -1217,7 +1217,7 @@ mod tests { subrequest_ids: vec![0], }; let persist_result = - Err::<_, IngestV2Error>(IngestV2Error::Timeout("timeout error".to_string())); + Err::<_, IngestV2Error>(IngestV2Error::Internal("internal error".to_string())); (persist_summary, persist_result) }); router @@ -1227,7 +1227,7 @@ mod tests { let subworkbench = workbench.subworkbenches.get(&0).unwrap(); assert!(matches!( &subworkbench.last_failure_opt, - &Some(SubworkbenchFailure::Internal(ref msg)) if msg.contains("timed out") + Some(SubworkbenchFailure::Internal(msg)) if msg.contains("internal") )); assert!(!workbench diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs index 29dadcb7222..18464042f43 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs @@ -26,7 +26,7 @@ use quickwit_proto::ingest::ingester::{PersistFailure, PersistFailureReason, Per use quickwit_proto::ingest::router::{ IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess, }; -use quickwit_proto::ingest::IngestV2Error; +use quickwit_proto::ingest::{IngestV2Error, IngestV2Result}; use quickwit_proto::types::{NodeId, SubrequestId}; use tracing::warn; @@ -144,10 +144,15 @@ impl IngestWorkbench { persist_error: IngestV2Error, persist_summary: PersistRequestSummary, ) { - // Persist responses use dedicated failure reasons for `IngesterUnavailable`, - // `NotFound`, and `TooManyRequests`: in reality, we should never have to handle these cases - // here. + // Persist responses use dedicated failure reasons for `ShardNotFound` and + // `TooManyRequests`: in reality, we should never have to handle these cases here. match persist_error { + IngestV2Error::Timeout(_) => { + for subrequest_id in persist_summary.subrequest_ids { + let failure = SubworkbenchFailure::Persist(PersistFailureReason::Timeout); + self.record_failure(subrequest_id, failure); + } + } IngestV2Error::Unavailable(_) => { self.unavailable_leaders.insert(persist_summary.leader_id); @@ -157,7 +162,6 @@ impl IngestWorkbench { } IngestV2Error::Internal(_) | IngestV2Error::ShardNotFound { .. } - | IngestV2Error::Timeout(_) | IngestV2Error::TooManyRequests => { for subrequest_id in persist_summary.subrequest_ids { self.record_internal_error(subrequest_id, persist_error.to_string()); @@ -195,7 +199,7 @@ impl IngestWorkbench { self.record_failure(subrequest_id, SubworkbenchFailure::Internal(error_message)); } - pub fn into_ingest_response(self) -> IngestResponseV2 { + pub fn into_ingest_result(self) -> IngestV2Result { let num_subworkbenches = self.subworkbenches.len(); let mut successes = Vec::with_capacity(self.num_successes); let mut failures = Vec::with_capacity(num_subworkbenches - self.num_successes); @@ -220,12 +224,28 @@ impl IngestWorkbench { failures.push(failure); } } - assert_eq!(successes.len() + failures.len(), num_subworkbenches); - - IngestResponseV2 { + let num_successes = successes.len(); + let num_failures = failures.len(); + assert_eq!(num_successes + num_failures, num_subworkbenches); + + if self.num_successes == 0 + && num_failures > 0 + && failures.iter().all(|failure| { + matches!( + failure.reason(), + IngestFailureReason::RateLimited + | IngestFailureReason::ResourceExhausted + | IngestFailureReason::Timeout + ) + }) + { + return Err(IngestV2Error::TooManyRequests); + } + let response = IngestResponseV2 { successes, failures, - } + }; + Ok(response) } #[cfg(test)] @@ -337,7 +357,7 @@ mod tests { assert!(subworkbench.last_failure_is_transient()); subworkbench.last_failure_opt = - Some(SubworkbenchFailure::Internal("timed out".to_string())); + Some(SubworkbenchFailure::Internal("internal error".to_string())); assert!(subworkbench.is_pending()); assert!(subworkbench.last_failure_is_transient()); @@ -510,6 +530,32 @@ mod tests { assert_eq!(subworkbench.num_attempts, 1); } + #[test] + fn test_ingest_workbench_record_persist_error_timeout() { + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 1); + + let persist_error = IngestV2Error::Timeout("request timed out".to_string()); + let leader_id = NodeId::from("test-leader"); + let persist_summary = PersistRequestSummary { + leader_id: leader_id.clone(), + subrequest_ids: vec![0], + }; + workbench.record_persist_error(persist_error, persist_summary); + + let subworkbench = workbench.subworkbenches.get(&0).unwrap(); + assert_eq!(subworkbench.num_attempts, 1); + + assert!(matches!( + subworkbench.last_failure_opt, + Some(SubworkbenchFailure::Persist(PersistFailureReason::Timeout)) + )); + assert!(subworkbench.persist_success_opt.is_none()); + } + #[test] fn test_ingest_workbench_record_persist_error_unavailable() { let ingest_subrequests = vec![IngestSubrequest { @@ -616,4 +662,53 @@ mod tests { )); assert_eq!(subworkbench.num_attempts, 1); } + + #[test] + fn test_ingest_workbench_into_ingest_result() { + let workbench = IngestWorkbench::new(Vec::new(), 0); + let response = workbench.into_ingest_result().unwrap(); + assert!(response.successes.is_empty()); + assert!(response.failures.is_empty()); + + let ingest_subrequests = vec![ + IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }, + IngestSubrequest { + subrequest_id: 1, + ..Default::default() + }, + ]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 1); + let persist_success = PersistSuccess { + ..Default::default() + }; + let subworkbench = workbench.subworkbenches.get_mut(&0).unwrap(); + subworkbench.persist_success_opt = Some(persist_success); + + workbench.record_no_shards_available(1); + + let response = workbench.into_ingest_result().unwrap(); + assert_eq!(response.successes.len(), 1); + assert_eq!(response.successes[0].subrequest_id, 0); + + assert_eq!(response.failures.len(), 1); + assert_eq!(response.failures[0].subrequest_id, 1); + assert_eq!( + response.failures[0].reason(), + IngestFailureReason::NoShardsAvailable + ); + + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 1); + let failure = SubworkbenchFailure::Persist(PersistFailureReason::Timeout); + workbench.record_failure(0, failure); + + let error = workbench.into_ingest_result().unwrap_err(); + assert_eq!(error, IngestV2Error::TooManyRequests); + } } diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index 21a407e3f5a..1ae46efa41d 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -102,6 +102,7 @@ enum PersistFailureReason { PERSIST_FAILURE_REASON_SHARD_CLOSED = 2; PERSIST_FAILURE_REASON_RATE_LIMITED = 3; PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED = 4; + PERSIST_FAILURE_REASON_TIMEOUT = 5; } message PersistFailure { diff --git a/quickwit/quickwit-proto/protos/quickwit/router.proto b/quickwit/quickwit-proto/protos/quickwit/router.proto index 98523a7e31e..2aed3550026 100644 --- a/quickwit/quickwit-proto/protos/quickwit/router.proto +++ b/quickwit/quickwit-proto/protos/quickwit/router.proto @@ -67,6 +67,7 @@ enum IngestFailureReason { INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE = 4; INGEST_FAILURE_REASON_RATE_LIMITED = 5; INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED = 6; + INGEST_FAILURE_REASON_TIMEOUT = 7; } message IngestFailure { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 84f31e56367..097a1c58e71 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -392,6 +392,7 @@ pub enum PersistFailureReason { ShardClosed = 2, RateLimited = 3, ResourceExhausted = 4, + Timeout = 5, } impl PersistFailureReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -409,6 +410,7 @@ impl PersistFailureReason { PersistFailureReason::ResourceExhausted => { "PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED" } + PersistFailureReason::Timeout => "PERSIST_FAILURE_REASON_TIMEOUT", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -419,6 +421,7 @@ impl PersistFailureReason { "PERSIST_FAILURE_REASON_SHARD_CLOSED" => Some(Self::ShardClosed), "PERSIST_FAILURE_REASON_RATE_LIMITED" => Some(Self::RateLimited), "PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED" => Some(Self::ResourceExhausted), + "PERSIST_FAILURE_REASON_TIMEOUT" => Some(Self::Timeout), _ => None, } } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs index 6e9da46aa54..fe0607d49c1 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs @@ -73,6 +73,7 @@ pub enum IngestFailureReason { NoShardsAvailable = 4, RateLimited = 5, ResourceExhausted = 6, + Timeout = 7, } impl IngestFailureReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -94,6 +95,7 @@ impl IngestFailureReason { IngestFailureReason::ResourceExhausted => { "INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED" } + IngestFailureReason::Timeout => "INGEST_FAILURE_REASON_TIMEOUT", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -106,6 +108,7 @@ impl IngestFailureReason { "INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE" => Some(Self::NoShardsAvailable), "INGEST_FAILURE_REASON_RATE_LIMITED" => Some(Self::RateLimited), "INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED" => Some(Self::ResourceExhausted), + "INGEST_FAILURE_REASON_TIMEOUT" => Some(Self::Timeout), _ => None, } } diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index 5c9829467b7..4bbed7b1561 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -277,6 +277,7 @@ impl From for IngestFailureReason { PersistFailureReason::ShardClosed => IngestFailureReason::NoShardsAvailable, PersistFailureReason::ResourceExhausted => IngestFailureReason::ResourceExhausted, PersistFailureReason::RateLimited => IngestFailureReason::RateLimited, + PersistFailureReason::Timeout => IngestFailureReason::Timeout, } } } diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index dab54901702..b55dcd47401 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -174,7 +174,7 @@ fn convert_ingest_response_v2( let ingest_failure = response.failures.pop().unwrap(); Err(match ingest_failure.reason() { IngestFailureReason::Unspecified => { - IngestServiceError::Internal("Unknown reason".to_string()) + IngestServiceError::Internal("unknown error".to_string()) } IngestFailureReason::IndexNotFound => IngestServiceError::IndexNotFound { index_id: ingest_failure.index_id, @@ -183,10 +183,13 @@ fn convert_ingest_response_v2( "Ingest v2 source not found for index {}", ingest_failure.index_id )), - IngestFailureReason::Internal => IngestServiceError::Internal("Internal error".to_string()), + IngestFailureReason::Internal => IngestServiceError::Internal("internal error".to_string()), IngestFailureReason::NoShardsAvailable => IngestServiceError::Unavailable, IngestFailureReason::RateLimited => IngestServiceError::RateLimited, IngestFailureReason::ResourceExhausted => IngestServiceError::RateLimited, + IngestFailureReason::Timeout => { + IngestServiceError::Internal("request timed out".to_string()) + } }) }