From 6e2e630502c7a31ae8c4a25a471e8033b3580425 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Sat, 27 Jul 2024 09:55:32 +0900
Subject: [PATCH] Keeping details of ingest error longer. (#5258)

This PR:
- logs an error on router timeout (in addition to the error already logged on the ingester side)
- adds metrics on the persist subrequests, as measured at the end of the router work
- advertises rate limiting as a rate limiting error, and keeps track of its cause
- stops lifting a request in which all of the subrequests were rate limited into a rate limited request: we now just return a 200, and the individual subrequests return a rate limiting error

---
 quickwit/quickwit-ingest/src/error.rs | 16 ++-
 .../quickwit-ingest/src/ingest_api_service.rs | 5 +-
 .../quickwit-ingest/src/ingest_v2/ingester.rs | 43 +++---
 .../quickwit-ingest/src/ingest_v2/metrics.rs | 56 +++++++-
 .../src/ingest_v2/replication.rs | 4 +-
 .../quickwit-ingest/src/ingest_v2/router.rs | 133 ++++++++++++++++--
 .../src/ingest_v2/workbench.rs | 67 +++++----
 quickwit/quickwit-ingest/src/lib.rs | 12 +-
 .../protos/quickwit/ingester.proto | 7 +-
 .../protos/quickwit/router.proto | 7 +-
 .../quickwit/quickwit.ingest.ingester.rs | 24 ++--
 .../quickwit/quickwit.ingest.router.rs | 29 +++-
 quickwit/quickwit-proto/src/ingest/mod.rs | 30 +++-
 .../src/elasticsearch_api/bulk_v2.rs | 5 +
 .../src/elasticsearch_api/mod.rs | 10 +-
 .../src/elasticsearch_api/model/error.rs | 4 +
 .../src/ingest_api/rest_handler.rs | 19 ++-
 quickwit/quickwit-serve/src/lib.rs | 6 +-
 quickwit/quickwit-serve/src/metrics.rs | 8 +-
 19 files changed, 366 insertions(+), 119 deletions(-)

diff --git a/quickwit/quickwit-ingest/src/error.rs b/quickwit/quickwit-ingest/src/error.rs
index 189a94d8186..cf926223c4b 100644
--- a/quickwit/quickwit-ingest/src/error.rs
+++ b/quickwit/quickwit-ingest/src/error.rs
@@ -24,7 +24,7 @@ use quickwit_actors::AskError;
 use quickwit_common::rate_limited_error;
 use quickwit_common::tower::BufferError;
 pub(crate) use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error};
-use quickwit_proto::ingest::IngestV2Error;
+use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause};
 use quickwit_proto::types::IndexId;
 use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode};
 use serde::{Deserialize, Serialize};
@@ -43,8 +43,8 @@
     InvalidPosition(String),
     #[error("io error {0}")]
     IoError(String),
-    #[error("rate limited")]
-    RateLimited,
+    #[error("rate limited {0}")]
+    RateLimited(RateLimitingCause),
     #[error("ingest service is unavailable ({0})")]
     Unavailable(String),
 }
@@ -89,7 +89,9 @@ impl From<IngestV2Error> for IngestServiceError {
             IngestV2Error::ShardNotFound { ..
} => { IngestServiceError::Internal("shard not found".to_string()) } - IngestV2Error::TooManyRequests => IngestServiceError::RateLimited, + IngestV2Error::TooManyRequests(rate_limiting_cause) => { + IngestServiceError::RateLimited(rate_limiting_cause) + } } } } @@ -115,7 +117,7 @@ impl ServiceError for IngestServiceError { rate_limited_error!(limit_per_min = 6, "ingest/io internal error: {io_err}"); ServiceErrorCode::Internal } - Self::RateLimited => ServiceErrorCode::TooManyRequests, + Self::RateLimited(_) => ServiceErrorCode::TooManyRequests, Self::Unavailable(_) => ServiceErrorCode::Unavailable, } } @@ -131,7 +133,7 @@ impl GrpcServiceError for IngestServiceError { } fn new_too_many_requests() -> Self { - Self::RateLimited + Self::RateLimited(RateLimitingCause::Unknown) } fn new_unavailable(error_msg: String) -> Self { @@ -158,7 +160,7 @@ impl From for tonic::Status { IngestServiceError::Internal(_) => tonic::Code::Internal, IngestServiceError::InvalidPosition(_) => tonic::Code::InvalidArgument, IngestServiceError::IoError { .. } => tonic::Code::Internal, - IngestServiceError::RateLimited => tonic::Code::ResourceExhausted, + IngestServiceError::RateLimited(_) => tonic::Code::ResourceExhausted, IngestServiceError::Unavailable(_) => tonic::Code::Unavailable, }; let message = error.to_string(); diff --git a/quickwit/quickwit-ingest/src/ingest_api_service.rs b/quickwit/quickwit-ingest/src/ingest_api_service.rs index be2f8476bb3..28b094d5fc7 100644 --- a/quickwit/quickwit-ingest/src/ingest_api_service.rs +++ b/quickwit/quickwit-ingest/src/ingest_api_service.rs @@ -27,6 +27,7 @@ use quickwit_actors::{ }; use quickwit_common::runtimes::RuntimeType; use quickwit_common::tower::Cost; +use quickwit_proto::ingest::RateLimitingCause; use tracing::{error, info}; use ulid::Ulid; @@ -166,7 +167,7 @@ impl IngestApiService { if disk_used > self.disk_limit { info!("ingestion rejected due to disk limit"); - return Err(IngestServiceError::RateLimited); + return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull)); } if self @@ -175,7 +176,7 @@ impl IngestApiService { .is_err() { info!("ingest request rejected due to memory limit"); - return Err(IngestServiceError::RateLimited); + return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull)); } let mut num_docs = 0usize; let mut notifications = Vec::new(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 0abf8f586fb..d59b5bdafe6 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -37,7 +37,7 @@ use quickwit_common::pretty::PrettyDisplay; use quickwit_common::pubsub::{EventBroker, EventSubscriber}; use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings}; use quickwit_common::tower::Pool; -use quickwit_common::{rate_limited_warn, ServiceStream}; +use quickwit_common::{rate_limited_error, rate_limited_warn, ServiceStream}; use quickwit_proto::control_plane::{ AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient, }; @@ -544,7 +544,7 @@ impl Ingester { index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: subrequest.shard_id, - reason: PersistFailureReason::ResourceExhausted as i32, + reason: PersistFailureReason::WalFull as i32, }; persist_failures.push(persist_failure); continue; @@ -562,7 +562,7 @@ impl Ingester { index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: subrequest.shard_id, - reason: 
PersistFailureReason::RateLimited as i32, + reason: PersistFailureReason::ShardRateLimited as i32, }; persist_failures.push(persist_failure); continue; @@ -695,9 +695,7 @@ impl Ingester { PersistFailureReason::ShardNotFound } ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed, - ReplicateFailureReason::ResourceExhausted => { - PersistFailureReason::ResourceExhausted - } + ReplicateFailureReason::WalFull => PersistFailureReason::WalFull, }; let persist_failure = PersistFailure { subrequest_id: replicate_failure.subrequest_id, @@ -889,13 +887,12 @@ impl Ingester { let mut state_guard = self.state.lock_partially().await?; - let shard = - state_guard - .shards - .get_mut(&queue_id) - .ok_or_else(|| IngestV2Error::ShardNotFound { - shard_id: open_fetch_stream_request.shard_id().clone(), - })?; + let shard = state_guard.shards.get_mut(&queue_id).ok_or_else(|| { + rate_limited_error!(limit_per_min=6, queue_id=%queue_id, "shard not found"); + IngestV2Error::ShardNotFound { + shard_id: open_fetch_stream_request.shard_id().clone(), + } + })?; // An indexer can only know about a newly opened shard if it has been scheduled by the // control plane, which confirms that the shard was correctly opened in the // metastore. @@ -2039,10 +2036,7 @@ mod tests { assert_eq!(persist_response.failures.len(), 1); let persist_failure = &persist_response.failures[0]; - assert_eq!( - persist_failure.reason(), - PersistFailureReason::ResourceExhausted - ); + assert_eq!(persist_failure.reason(), PersistFailureReason::WalFull); } #[tokio::test] @@ -2102,7 +2096,10 @@ mod tests { assert_eq!(persist_response.failures.len(), 1); let persist_failure = &persist_response.failures[0]; - assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited); + assert_eq!( + persist_failure.reason(), + PersistFailureReason::ShardRateLimited + ); } // This test should be run manually and independently of other tests with the `failpoints` @@ -2725,7 +2722,10 @@ mod tests { assert_eq!(persist_failure.index_uid(), &index_uid); assert_eq!(persist_failure.source_id, "test-source"); assert_eq!(persist_failure.shard_id(), ShardId::from(1)); - assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited); + assert_eq!( + persist_failure.reason(), + PersistFailureReason::ShardRateLimited + ); let state_guard = ingester.state.lock_fully().await.unwrap(); assert_eq!(state_guard.shards.len(), 1); @@ -2802,10 +2802,7 @@ mod tests { assert_eq!(persist_failure.index_uid(), &index_uid); assert_eq!(persist_failure.source_id, "test-source"); assert_eq!(persist_failure.shard_id(), ShardId::from(1)); - assert_eq!( - persist_failure.reason(), - PersistFailureReason::ResourceExhausted - ); + assert_eq!(persist_failure.reason(), PersistFailureReason::WalFull); let state_guard = ingester.state.lock_fully().await.unwrap(); assert_eq!(state_guard.shards.len(), 1); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index f0c0ce649dc..8fc6a75b9f4 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -21,9 +21,61 @@ use mrecordlog::ResourceUsage; use once_cell::sync::Lazy; use quickwit_common::metrics::{ exponential_buckets, linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram, - new_histogram_vec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, + new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, }; +// Counter vec 
counting the different outcomes of ingest requests as +// measure at the end of the router work. +// +// The counter are counting persist subrequests. +pub(crate) struct IngestResultMetrics { + pub success: IntCounter, + pub circuit_breaker: IntCounter, + pub unspecified: IntCounter, + pub index_not_found: IntCounter, + pub source_not_found: IntCounter, + pub internal: IntCounter, + pub no_shards_available: IntCounter, + pub shard_rate_limited: IntCounter, + pub wal_full: IntCounter, + pub timeout: IntCounter, + pub router_timeout: IntCounter, + pub router_load_shedding: IntCounter, + pub load_shedding: IntCounter, + pub shard_not_found: IntCounter, + pub unavailable: IntCounter, +} + +impl Default for IngestResultMetrics { + fn default() -> Self { + let ingest_result_total_vec = new_counter_vec::<1>( + "ingest_result_total", + "Number of ingest requests by result", + "ingest", + &[], + ["result"], + ); + Self { + success: ingest_result_total_vec.with_label_values(["success"]), + circuit_breaker: ingest_result_total_vec.with_label_values(["circuit_breaker"]), + unspecified: ingest_result_total_vec.with_label_values(["unspecified"]), + index_not_found: ingest_result_total_vec.with_label_values(["index_not_found"]), + source_not_found: ingest_result_total_vec.with_label_values(["source_not_found"]), + internal: ingest_result_total_vec.with_label_values(["internal"]), + no_shards_available: ingest_result_total_vec.with_label_values(["no_shards_available"]), + shard_rate_limited: ingest_result_total_vec.with_label_values(["shard_rate_limited"]), + wal_full: ingest_result_total_vec.with_label_values(["wal_full"]), + timeout: ingest_result_total_vec.with_label_values(["timeout"]), + router_timeout: ingest_result_total_vec.with_label_values(["router_timeout"]), + router_load_shedding: ingest_result_total_vec + .with_label_values(["router_load_shedding"]), + load_shedding: ingest_result_total_vec.with_label_values(["load_shedding"]), + unavailable: ingest_result_total_vec.with_label_values(["unavailable"]), + shard_not_found: ingest_result_total_vec.with_label_values(["shard_not_found"]), + } + } +} + pub(super) struct IngestV2Metrics { pub reset_shards_operations_total: IntCounterVec<1>, pub open_shards: IntGauge, @@ -34,11 +86,13 @@ pub(super) struct IngestV2Metrics { pub wal_acquire_lock_request_duration_secs: HistogramVec<2>, pub wal_disk_used_bytes: IntGauge, pub wal_memory_used_bytes: IntGauge, + pub ingest_results: IngestResultMetrics, } impl Default for IngestV2Metrics { fn default() -> Self { Self { + ingest_results: IngestResultMetrics::default(), reset_shards_operations_total: new_counter_vec( "reset_shards_operations_total", "Total number of reset shards operations performed.", diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 8bbe5f732f3..43d30d73740 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -624,7 +624,7 @@ impl ReplicationTask { index_uid: subrequest.index_uid, source_id: subrequest.source_id, shard_id: subrequest.shard_id, - reason: ReplicateFailureReason::ResourceExhausted as i32, + reason: ReplicateFailureReason::WalFull as i32, }; replicate_failures.push(replicate_failure); continue; @@ -1626,7 +1626,7 @@ mod tests { assert_eq!(replicate_failure_0.shard_id(), ShardId::from(1)); assert_eq!( replicate_failure_0.reason(), - ReplicateFailureReason::ResourceExhausted + ReplicateFailureReason::WalFull ); } } diff --git 
a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 4249e04be67..7c786aa7c82 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -36,18 +36,24 @@ use quickwit_proto::indexing::ShardPositionsUpdate; use quickwit_proto::ingest::ingester::{ IngesterService, PersistFailureReason, PersistRequest, PersistResponse, PersistSubrequest, }; -use quickwit_proto::ingest::router::{IngestRequestV2, IngestResponseV2, IngestRouterService}; -use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState}; +use quickwit_proto::ingest::router::{ + IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService, +}; +use quickwit_proto::ingest::{ + CommitTypeV2, IngestV2Error, IngestV2Result, RateLimitingCause, ShardState, +}; use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId}; use serde_json::{json, Value as JsonValue}; use tokio::sync::{Mutex, Semaphore}; -use tracing::info; +use tokio::time::error::Elapsed; +use tracing::{error, info}; use super::broadcast::LocalShardsUpdate; use super::debouncing::{ DebouncedGetOrCreateOpenShardsRequest, GetOrCreateOpenShardsRequestDebouncer, }; use super::ingester::PERSIST_REQUEST_TIMEOUT; +use super::metrics::IngestResultMetrics; use super::routing_table::RoutingTable; use super::workbench::IngestWorkbench; use super::IngesterPool; @@ -66,7 +72,19 @@ fn ingest_request_timeout() -> Duration { "QW_INGEST_REQUEST_TIMEOUT_MS", DEFAULT_INGEST_REQUEST_TIMEOUT.as_millis() as u64, ); - Duration::from_millis(duration_ms) + let minimum_ingest_request_timeout: Duration = + PERSIST_REQUEST_TIMEOUT * (MAX_PERSIST_ATTEMPTS as u32) + Duration::from_secs(5); + let requested_ingest_request_timeout = Duration::from_millis(duration_ms); + if requested_ingest_request_timeout < minimum_ingest_request_timeout { + error!( + "ingest request timeout too short {}ms, setting to {}ms", + requested_ingest_request_timeout.as_millis(), + minimum_ingest_request_timeout.as_millis() + ); + minimum_ingest_request_timeout + } else { + requested_ingest_request_timeout + } }) } @@ -442,11 +460,16 @@ impl IngestRouter { self.retry_batch_persist(ingest_request, MAX_PERSIST_ATTEMPTS), ) .await - .map_err(|_| { + .map_err(|_elapsed: Elapsed| { let message = format!( "ingest request timed out after {} millis", timeout_duration.as_millis() ); + error!( + "ingest request should not timeout as there is a timeout on independent ingest \ + requests too. timeout after {}", + timeout_duration.as_millis() + ); IngestV2Error::Timeout(message) })? 
} @@ -461,6 +484,90 @@ impl IngestRouter { } } +fn update_ingest_metrics(ingest_result: &IngestV2Result, num_subrequests: usize) { + let num_subrequests = num_subrequests as u64; + let ingest_results_metrics: &IngestResultMetrics = + &crate::ingest_v2::metrics::INGEST_V2_METRICS.ingest_results; + match ingest_result { + Ok(ingest_response) => { + ingest_results_metrics + .success + .inc_by(ingest_response.successes.len() as u64); + for ingest_failure in &ingest_response.failures { + match ingest_failure.reason() { + IngestFailureReason::CircuitBreaker => { + ingest_results_metrics.circuit_breaker.inc(); + } + IngestFailureReason::Unspecified => ingest_results_metrics.unspecified.inc(), + IngestFailureReason::IndexNotFound => { + ingest_results_metrics.index_not_found.inc() + } + IngestFailureReason::SourceNotFound => { + ingest_results_metrics.source_not_found.inc() + } + IngestFailureReason::Internal => ingest_results_metrics.internal.inc(), + IngestFailureReason::NoShardsAvailable => { + ingest_results_metrics.no_shards_available.inc() + } + IngestFailureReason::ShardRateLimited => { + ingest_results_metrics.shard_rate_limited.inc() + } + IngestFailureReason::WalFull => ingest_results_metrics.wal_full.inc(), + IngestFailureReason::Timeout => ingest_results_metrics.timeout.inc(), + IngestFailureReason::RouterLoadShedding => { + ingest_results_metrics.router_load_shedding.inc() + } + IngestFailureReason::LoadShedding => ingest_results_metrics.load_shedding.inc(), + } + } + } + Err(ingest_error) => match ingest_error { + IngestV2Error::TooManyRequests(rate_limiting_cause) => match rate_limiting_cause { + RateLimitingCause::RouterLoadShedding => { + ingest_results_metrics + .router_load_shedding + .inc_by(num_subrequests); + } + RateLimitingCause::LoadShedding => { + ingest_results_metrics.load_shedding.inc_by(num_subrequests) + } + RateLimitingCause::WalFull => { + ingest_results_metrics.wal_full.inc_by(num_subrequests); + } + RateLimitingCause::CircuitBreaker => { + ingest_results_metrics + .circuit_breaker + .inc_by(num_subrequests); + } + RateLimitingCause::ShardRateLimiting => { + ingest_results_metrics + .shard_rate_limited + .inc_by(num_subrequests); + } + RateLimitingCause::Unknown => { + ingest_results_metrics.unspecified.inc_by(num_subrequests); + } + }, + IngestV2Error::Timeout(_) => { + ingest_results_metrics + .router_timeout + .inc_by(num_subrequests); + } + IngestV2Error::ShardNotFound { .. 
} => { + ingest_results_metrics + .shard_not_found + .inc_by(num_subrequests); + } + IngestV2Error::Unavailable(_) => { + ingest_results_metrics.unavailable.inc_by(num_subrequests); + } + IngestV2Error::Internal(_) => { + ingest_results_metrics.internal.inc_by(num_subrequests); + } + }, + } +} + #[async_trait] impl IngestRouterService for IngestRouter { async fn ingest(&self, ingest_request: IngestRequestV2) -> IngestV2Result { @@ -468,15 +575,21 @@ impl IngestRouterService for IngestRouter { let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingest_router); gauge_guard.add(request_size_bytes as i64); + let num_subrequests = ingest_request.subrequests.len(); let _permit = self .ingest_semaphore .clone() .try_acquire_many_owned(request_size_bytes as u32) - .map_err(|_| IngestV2Error::TooManyRequests)?; + .map_err(|_| IngestV2Error::TooManyRequests(RateLimitingCause::RouterLoadShedding))?; - self.ingest_timeout(ingest_request, ingest_request_timeout()) - .await + let ingest_res = self + .ingest_timeout(ingest_request, ingest_request_timeout()) + .await; + + update_ingest_metrics(&ingest_res, num_subrequests); + + ingest_res } } @@ -1098,7 +1211,7 @@ mod tests { index_uid: Some(index_uid.clone()), source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), - reason: PersistFailureReason::RateLimited as i32, + reason: PersistFailureReason::ShardRateLimited as i32, }], }); (persist_summary, persist_result) @@ -1572,7 +1685,7 @@ mod tests { index_uid: Some(index_uid_clone.clone()), source_id: "test-source".to_string(), shard_id: Some(ShardId::from(1)), - reason: PersistFailureReason::RateLimited as i32, + reason: PersistFailureReason::ShardRateLimited as i32, }], }; Ok(response) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs index 501dedf99dd..eeca716186a 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/workbench.rs @@ -26,7 +26,7 @@ use quickwit_proto::ingest::ingester::{PersistFailure, PersistFailureReason, Per use quickwit_proto::ingest::router::{ IngestFailure, IngestFailureReason, IngestResponseV2, IngestSubrequest, IngestSuccess, }; -use quickwit_proto::ingest::{IngestV2Error, IngestV2Result}; +use quickwit_proto::ingest::{IngestV2Error, IngestV2Result, RateLimitingCause}; use quickwit_proto::types::{NodeId, SubrequestId}; use tracing::warn; @@ -160,13 +160,16 @@ impl IngestWorkbench { self.record_ingester_unavailable(subrequest_id); } } - IngestV2Error::Internal(_) - | IngestV2Error::ShardNotFound { .. } - | IngestV2Error::TooManyRequests => { + IngestV2Error::Internal(_) | IngestV2Error::ShardNotFound { .. 
} => { for subrequest_id in persist_summary.subrequest_ids { self.record_internal_error(subrequest_id); } } + IngestV2Error::TooManyRequests(rate_limiting_cause) => { + for subrequest_id in persist_summary.subrequest_ids { + self.record_too_many_requests(subrequest_id, rate_limiting_cause); + } + } } } @@ -199,11 +202,23 @@ impl IngestWorkbench { self.record_failure(subrequest_id, SubworkbenchFailure::Internal); } + fn record_too_many_requests( + &mut self, + subrequest_id: SubrequestId, + rate_limiting_cause: RateLimitingCause, + ) { + self.record_failure( + subrequest_id, + SubworkbenchFailure::RateLimited(rate_limiting_cause), + ); + } + pub fn into_ingest_result(self) -> IngestV2Result { let num_subworkbenches = self.subworkbenches.len(); let mut successes = Vec::with_capacity(self.num_successes); let mut failures = Vec::with_capacity(num_subworkbenches - self.num_successes); + // We consider the last retry outcome as the actual outcome. for subworkbench in self.subworkbenches.into_values() { if let Some(persist_success) = subworkbench.persist_success_opt { let success = IngestSuccess { @@ -230,6 +245,7 @@ impl IngestWorkbench { let num_failures = failures.len(); assert_eq!(num_successes + num_failures, num_subworkbenches); + // For tests, we sort the successes and failures by subrequest_id #[cfg(test)] { for success in &mut successes { @@ -240,19 +256,7 @@ impl IngestWorkbench { successes.sort_by_key(|success| success.subrequest_id); failures.sort_by_key(|failure| failure.subrequest_id); } - if self.num_successes == 0 - && num_failures > 0 - && failures.iter().all(|failure| { - matches!( - failure.reason(), - IngestFailureReason::RateLimited - | IngestFailureReason::ResourceExhausted - | IngestFailureReason::Timeout - ) - }) - { - return Err(IngestV2Error::TooManyRequests); - } + let response = IngestResponseV2 { successes, failures, @@ -292,6 +296,8 @@ pub(super) enum SubworkbenchFailure { Internal, // The ingester is no longer in the pool or a transport error occurred. Unavailable, + // The ingester is rate limited. + RateLimited(RateLimitingCause), } impl SubworkbenchFailure { @@ -305,6 +311,14 @@ impl SubworkbenchFailure { // In our last attempt, we did not manage to reach the ingester. // We can consider that as a no shards available. Self::Unavailable => IngestFailureReason::NoShardsAvailable, + Self::RateLimited(rate_limiting_cause) => match rate_limiting_cause { + RateLimitingCause::RouterLoadShedding => IngestFailureReason::RouterLoadShedding, + RateLimitingCause::LoadShedding => IngestFailureReason::RouterLoadShedding, + RateLimitingCause::WalFull => IngestFailureReason::WalFull, + RateLimitingCause::CircuitBreaker => IngestFailureReason::CircuitBreaker, + RateLimitingCause::ShardRateLimiting => IngestFailureReason::ShardRateLimited, + RateLimitingCause::Unknown => IngestFailureReason::Unspecified, + }, Self::Persist(persist_failure_reason) => (*persist_failure_reason).into(), } } @@ -331,7 +345,7 @@ impl IngestSubworkbench { self.persist_success_opt.is_none() && self.last_failure_is_transient() } - /// Returns `false` if and only if the last attempt suggests retrying will fail. + /// Returns `false` if and only if the last attempt suggests retrying (on any node) will fail. /// e.g.: /// - the index does not exist /// - the source does not exist. 
@@ -343,6 +357,7 @@ impl IngestSubworkbench { Some(SubworkbenchFailure::NoShardsAvailable) => true, Some(SubworkbenchFailure::Persist(_)) => true, Some(SubworkbenchFailure::Unavailable) => true, + Some(SubworkbenchFailure::RateLimited(_)) => true, None => true, } } @@ -384,7 +399,7 @@ mod tests { assert!(!subworkbench.last_failure_is_transient()); subworkbench.last_failure_opt = Some(SubworkbenchFailure::Persist( - PersistFailureReason::RateLimited, + PersistFailureReason::ShardRateLimited, )); assert!(subworkbench.is_pending()); assert!(subworkbench.last_failure_is_transient()); @@ -630,7 +645,7 @@ mod tests { let persist_failure = PersistFailure { subrequest_id: 42, - reason: PersistFailureReason::RateLimited as i32, + reason: PersistFailureReason::ShardRateLimited as i32, ..Default::default() }; workbench.record_persist_failure(&persist_failure); @@ -638,7 +653,7 @@ mod tests { let persist_failure = PersistFailure { subrequest_id: 0, shard_id: Some(ShardId::from(1)), - reason: PersistFailureReason::ResourceExhausted as i32, + reason: PersistFailureReason::WalFull as i32, ..Default::default() }; workbench.record_persist_failure(&persist_failure); @@ -648,7 +663,7 @@ mod tests { let subworkbench = workbench.subworkbenches.get(&0).unwrap(); assert!(matches!( subworkbench.last_failure_opt, - Some(SubworkbenchFailure::Persist(reason)) if reason == PersistFailureReason::ResourceExhausted + Some(SubworkbenchFailure::Persist(reason)) if reason == PersistFailureReason::WalFull )); assert_eq!(subworkbench.num_attempts, 1); } @@ -719,7 +734,11 @@ mod tests { let failure = SubworkbenchFailure::Persist(PersistFailureReason::Timeout); workbench.record_failure(0, failure); - let error = workbench.into_ingest_result().unwrap_err(); - assert_eq!(error, IngestV2Error::TooManyRequests); + let ingest_response = workbench.into_ingest_result().unwrap(); + assert_eq!(ingest_response.successes.len(), 0); + assert_eq!( + ingest_response.failures[0].reason(), + IngestFailureReason::Timeout + ); } } diff --git a/quickwit/quickwit-ingest/src/lib.rs b/quickwit/quickwit-ingest/src/lib.rs index 099126fa6d6..12807f637b6 100644 --- a/quickwit/quickwit-ingest/src/lib.rs +++ b/quickwit/quickwit-ingest/src/lib.rs @@ -137,6 +137,13 @@ macro_rules! with_lock_metrics { let now = std::time::Instant::now(); let guard = $future; + let elapsed = now.elapsed(); + if elapsed > std::time::Duration::from_secs(1) { + quickwit_common::rate_limited_warn!( + limit_per_min=6, + "lock acquisition took {}ms", elapsed.as_millis() + ); + } $crate::ingest_v2::metrics::INGEST_V2_METRICS .wal_acquire_lock_requests_in_flight .with_label_values([$($label),*]) @@ -144,7 +151,7 @@ macro_rules! with_lock_metrics { $crate::ingest_v2::metrics::INGEST_V2_METRICS .wal_acquire_lock_request_duration_secs .with_label_values([$($label),*]) - .observe(now.elapsed().as_secs_f64()); + .observe(elapsed.as_secs_f64()); guard } @@ -155,6 +162,7 @@ macro_rules! 
with_lock_metrics { mod tests { use quickwit_actors::AskError; + use quickwit_proto::ingest::RateLimitingCause; use super::*; use crate::{CreateQueueRequest, IngestRequest, SuggestTruncateRequest}; @@ -287,7 +295,7 @@ mod tests { .ask_for_res(ingest_request.clone()) .await .unwrap_err(), - AskError::ErrorReply(IngestServiceError::RateLimited) + AskError::ErrorReply(IngestServiceError::RateLimited(RateLimitingCause::WalFull)) )); // delete the first batch diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index 45bc1d6c664..f0fbe9e804a 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -98,12 +98,13 @@ message PersistSuccess { repeated quickwit.ingest.ParseFailure parse_failures = 7; } + enum PersistFailureReason { PERSIST_FAILURE_REASON_UNSPECIFIED = 0; PERSIST_FAILURE_REASON_SHARD_NOT_FOUND = 1; PERSIST_FAILURE_REASON_SHARD_CLOSED = 2; - PERSIST_FAILURE_REASON_RATE_LIMITED = 3; - PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED = 4; + PERSIST_FAILURE_REASON_SHARD_RATE_LIMITED = 3; + PERSIST_FAILURE_REASON_WAL_FULL = 4; PERSIST_FAILURE_REASON_TIMEOUT = 5; } @@ -191,7 +192,7 @@ enum ReplicateFailureReason { REPLICATE_FAILURE_REASON_SHARD_NOT_FOUND = 1; REPLICATE_FAILURE_REASON_SHARD_CLOSED = 2; reserved 3; // REPLICATE_FAILURE_REASON_RATE_LIMITED = 3; - REPLICATE_FAILURE_REASON_RESOURCE_EXHAUSTED = 4; + REPLICATE_FAILURE_REASON_WAL_FULL = 4; } message ReplicateFailure { diff --git a/quickwit/quickwit-proto/protos/quickwit/router.proto b/quickwit/quickwit-proto/protos/quickwit/router.proto index 51677cfccb3..8db31d7bf15 100644 --- a/quickwit/quickwit-proto/protos/quickwit/router.proto +++ b/quickwit/quickwit-proto/protos/quickwit/router.proto @@ -67,9 +67,12 @@ enum IngestFailureReason { INGEST_FAILURE_REASON_SOURCE_NOT_FOUND = 2; INGEST_FAILURE_REASON_INTERNAL = 3; INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE = 4; - INGEST_FAILURE_REASON_RATE_LIMITED = 5; - INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED = 6; + INGEST_FAILURE_REASON_SHARD_RATE_LIMITED = 5; + INGEST_FAILURE_REASON_WAL_FULL = 6; INGEST_FAILURE_REASON_TIMEOUT = 7; + INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING = 8; + INGEST_FAILURE_REASON_LOAD_SHEDDING = 9; + INGEST_FAILURE_REASON_CIRCUIT_BREAKER = 10; } message IngestFailure { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index d23968d407e..d3daaec9bd1 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -436,8 +436,8 @@ pub enum PersistFailureReason { Unspecified = 0, ShardNotFound = 1, ShardClosed = 2, - RateLimited = 3, - ResourceExhausted = 4, + ShardRateLimited = 3, + WalFull = 4, Timeout = 5, } impl PersistFailureReason { @@ -452,10 +452,10 @@ impl PersistFailureReason { "PERSIST_FAILURE_REASON_SHARD_NOT_FOUND" } PersistFailureReason::ShardClosed => "PERSIST_FAILURE_REASON_SHARD_CLOSED", - PersistFailureReason::RateLimited => "PERSIST_FAILURE_REASON_RATE_LIMITED", - PersistFailureReason::ResourceExhausted => { - "PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED" + PersistFailureReason::ShardRateLimited => { + "PERSIST_FAILURE_REASON_SHARD_RATE_LIMITED" } + PersistFailureReason::WalFull => "PERSIST_FAILURE_REASON_WAL_FULL", PersistFailureReason::Timeout => "PERSIST_FAILURE_REASON_TIMEOUT", } } @@ -465,8 +465,8 @@ impl 
PersistFailureReason { "PERSIST_FAILURE_REASON_UNSPECIFIED" => Some(Self::Unspecified), "PERSIST_FAILURE_REASON_SHARD_NOT_FOUND" => Some(Self::ShardNotFound), "PERSIST_FAILURE_REASON_SHARD_CLOSED" => Some(Self::ShardClosed), - "PERSIST_FAILURE_REASON_RATE_LIMITED" => Some(Self::RateLimited), - "PERSIST_FAILURE_REASON_RESOURCE_EXHAUSTED" => Some(Self::ResourceExhausted), + "PERSIST_FAILURE_REASON_SHARD_RATE_LIMITED" => Some(Self::ShardRateLimited), + "PERSIST_FAILURE_REASON_WAL_FULL" => Some(Self::WalFull), "PERSIST_FAILURE_REASON_TIMEOUT" => Some(Self::Timeout), _ => None, } @@ -480,7 +480,7 @@ pub enum ReplicateFailureReason { Unspecified = 0, ShardNotFound = 1, ShardClosed = 2, - ResourceExhausted = 4, + WalFull = 4, } impl ReplicateFailureReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -496,9 +496,7 @@ impl ReplicateFailureReason { ReplicateFailureReason::ShardClosed => { "REPLICATE_FAILURE_REASON_SHARD_CLOSED" } - ReplicateFailureReason::ResourceExhausted => { - "REPLICATE_FAILURE_REASON_RESOURCE_EXHAUSTED" - } + ReplicateFailureReason::WalFull => "REPLICATE_FAILURE_REASON_WAL_FULL", } } /// Creates an enum from field names used in the ProtoBuf definition. @@ -507,9 +505,7 @@ impl ReplicateFailureReason { "REPLICATE_FAILURE_REASON_UNSPECIFIED" => Some(Self::Unspecified), "REPLICATE_FAILURE_REASON_SHARD_NOT_FOUND" => Some(Self::ShardNotFound), "REPLICATE_FAILURE_REASON_SHARD_CLOSED" => Some(Self::ShardClosed), - "REPLICATE_FAILURE_REASON_RESOURCE_EXHAUSTED" => { - Some(Self::ResourceExhausted) - } + "REPLICATE_FAILURE_REASON_WAL_FULL" => Some(Self::WalFull), _ => None, } } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs index dabfc978b63..1f43bd342ca 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs @@ -75,9 +75,12 @@ pub enum IngestFailureReason { SourceNotFound = 2, Internal = 3, NoShardsAvailable = 4, - RateLimited = 5, - ResourceExhausted = 6, + ShardRateLimited = 5, + WalFull = 6, Timeout = 7, + RouterLoadShedding = 8, + LoadShedding = 9, + CircuitBreaker = 10, } impl IngestFailureReason { /// String value of the enum field names used in the ProtoBuf definition. @@ -95,11 +98,18 @@ impl IngestFailureReason { IngestFailureReason::NoShardsAvailable => { "INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE" } - IngestFailureReason::RateLimited => "INGEST_FAILURE_REASON_RATE_LIMITED", - IngestFailureReason::ResourceExhausted => { - "INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED" + IngestFailureReason::ShardRateLimited => { + "INGEST_FAILURE_REASON_SHARD_RATE_LIMITED" } + IngestFailureReason::WalFull => "INGEST_FAILURE_REASON_WAL_FULL", IngestFailureReason::Timeout => "INGEST_FAILURE_REASON_TIMEOUT", + IngestFailureReason::RouterLoadShedding => { + "INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING" + } + IngestFailureReason::LoadShedding => "INGEST_FAILURE_REASON_LOAD_SHEDDING", + IngestFailureReason::CircuitBreaker => { + "INGEST_FAILURE_REASON_CIRCUIT_BREAKER" + } } } /// Creates an enum from field names used in the ProtoBuf definition. 
@@ -110,9 +120,14 @@ impl IngestFailureReason { "INGEST_FAILURE_REASON_SOURCE_NOT_FOUND" => Some(Self::SourceNotFound), "INGEST_FAILURE_REASON_INTERNAL" => Some(Self::Internal), "INGEST_FAILURE_REASON_NO_SHARDS_AVAILABLE" => Some(Self::NoShardsAvailable), - "INGEST_FAILURE_REASON_RATE_LIMITED" => Some(Self::RateLimited), - "INGEST_FAILURE_REASON_RESOURCE_EXHAUSTED" => Some(Self::ResourceExhausted), + "INGEST_FAILURE_REASON_SHARD_RATE_LIMITED" => Some(Self::ShardRateLimited), + "INGEST_FAILURE_REASON_WAL_FULL" => Some(Self::WalFull), "INGEST_FAILURE_REASON_TIMEOUT" => Some(Self::Timeout), + "INGEST_FAILURE_REASON_ROUTER_LOAD_SHEDDING" => { + Some(Self::RouterLoadShedding) + } + "INGEST_FAILURE_REASON_LOAD_SHEDDING" => Some(Self::LoadShedding), + "INGEST_FAILURE_REASON_CIRCUIT_BREAKER" => Some(Self::CircuitBreaker), _ => None, } } diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index 773d2236fc6..d55c808fbcb 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -36,6 +36,22 @@ pub mod router; include!("../codegen/quickwit/quickwit.ingest.rs"); pub type IngestV2Result = std::result::Result; +#[derive(Debug, Copy, Clone, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] +pub enum RateLimitingCause { + #[error("router load shedding")] + RouterLoadShedding, + #[error("load shadding")] + LoadShedding, + #[error("wal full (memory or disk)")] + WalFull, + #[error("circuit breaker")] + CircuitBreaker, + #[error("shard rate limiting")] + ShardRateLimiting, + #[error("unknown")] + Unknown, +} + #[derive(Debug, thiserror::Error, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum IngestV2Error { @@ -46,7 +62,7 @@ pub enum IngestV2Error { #[error("request timed out: {0}")] Timeout(String), #[error("too many requests")] - TooManyRequests, + TooManyRequests(RateLimitingCause), #[error("service unavailable: {0}")] Unavailable(String), } @@ -66,7 +82,7 @@ impl ServiceError for IngestV2Error { } Self::ShardNotFound { .. 
} => ServiceErrorCode::NotFound, Self::Timeout(_) => ServiceErrorCode::Timeout, - Self::TooManyRequests => ServiceErrorCode::TooManyRequests, + Self::TooManyRequests(_) => ServiceErrorCode::TooManyRequests, Self::Unavailable(_) => ServiceErrorCode::Unavailable, } } @@ -82,7 +98,7 @@ impl GrpcServiceError for IngestV2Error { } fn new_too_many_requests() -> Self { - Self::TooManyRequests + Self::TooManyRequests(RateLimitingCause::Unknown) } fn new_unavailable(message: String) -> Self { @@ -92,7 +108,7 @@ impl GrpcServiceError for IngestV2Error { impl MakeLoadShedError for IngestV2Error { fn make_load_shed_error() -> Self { - IngestV2Error::TooManyRequests + IngestV2Error::TooManyRequests(RateLimitingCause::LoadShedding) } } @@ -293,8 +309,8 @@ impl From for IngestFailureReason { PersistFailureReason::Unspecified => IngestFailureReason::Unspecified, PersistFailureReason::ShardNotFound => IngestFailureReason::NoShardsAvailable, PersistFailureReason::ShardClosed => IngestFailureReason::NoShardsAvailable, - PersistFailureReason::ResourceExhausted => IngestFailureReason::ResourceExhausted, - PersistFailureReason::RateLimited => IngestFailureReason::RateLimited, + PersistFailureReason::WalFull => IngestFailureReason::WalFull, + PersistFailureReason::ShardRateLimited => IngestFailureReason::ShardRateLimited, PersistFailureReason::Timeout => IngestFailureReason::Timeout, } } @@ -306,7 +322,7 @@ impl From for PersistFailureReason { ReplicateFailureReason::Unspecified => PersistFailureReason::Unspecified, ReplicateFailureReason::ShardNotFound => PersistFailureReason::ShardNotFound, ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed, - ReplicateFailureReason::ResourceExhausted => PersistFailureReason::ResourceExhausted, + ReplicateFailureReason::WalFull => PersistFailureReason::WalFull, } } } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs index 09b01acd1c6..e068483730a 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs @@ -268,6 +268,11 @@ fn make_elastic_bulk_response_v2( format!("timeout [{}]", failure.index_id), StatusCode::REQUEST_TIMEOUT, ), + IngestFailureReason::ShardRateLimited => ( + ElasticException::RateLimited, + format!("shard rate limiting [{}]", failure.index_id), + StatusCode::TOO_MANY_REQUESTS, + ), reason => { let pretty_reason = reason .as_str_name() diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs index 8a53e9d2975..a3b156dedee 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/mod.rs @@ -63,6 +63,11 @@ pub fn elastic_api_handlers( ) -> impl Filter + Clone { es_compat_cluster_info_handler(node_config, BuildInfo::get()) .or(es_compat_search_handler(search_service.clone())) + .or(es_compat_bulk_handler( + ingest_service.clone(), + ingest_router.clone(), + )) + .or(es_compat_index_bulk_handler(ingest_service, ingest_router)) .or(es_compat_index_search_handler(search_service.clone())) .or(es_compat_index_count_handler(search_service.clone())) .or(es_compat_scroll_handler(search_service.clone())) @@ -70,11 +75,6 @@ pub fn elastic_api_handlers( .or(es_compat_index_field_capabilities_handler( search_service.clone(), )) - .or(es_compat_bulk_handler( - ingest_service.clone(), - ingest_router.clone(), - )) - .or(es_compat_index_bulk_handler(ingest_service, ingest_router)) 
.or(es_compat_index_stats_handler(metastore.clone())) .or(es_compat_delete_index_handler(index_service)) .or(es_compat_stats_handler(metastore.clone())) diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs index 2c1964968e5..a8e09c871ba 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/error.rs @@ -154,6 +154,9 @@ pub enum ElasticException { #[serde(rename = "index_not_found_exception")] IndexNotFound, // This is an exception proper to Quickwit. + #[serde(rename = "rate_limited_exception")] + RateLimited, + // This is an exception proper to Quickwit. #[serde(rename = "source_not_found_exception")] SourceNotFound, #[serde(rename = "timeout_exception")] @@ -166,6 +169,7 @@ impl ElasticException { Self::ActionRequestValidation => "action_request_validation_exception", Self::DocumentParsing => "document_parsing_exception", Self::Internal => "internal_exception", + Self::RateLimited => "rate_limited_exception", Self::IllegalArgument => "illegal_argument_exception", Self::IndexNotFound => "index_not_found_exception", Self::SourceNotFound => "source_not_found_exception", diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index 9acadd080a8..266bc2e6eb8 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -27,6 +27,7 @@ use quickwit_proto::ingest::router::{ IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService, IngestRouterServiceClient, IngestSubrequest, }; +use quickwit_proto::ingest::RateLimitingCause; use quickwit_proto::types::{DocUidGenerator, IndexId}; use serde::Deserialize; use warp::{Filter, Rejection}; @@ -166,7 +167,8 @@ fn convert_ingest_response_v2( }); } let ingest_failure = response.failures.pop().unwrap(); - Err(match ingest_failure.reason() { + let reason = ingest_failure.reason(); + Err(match reason { IngestFailureReason::Unspecified => { IngestServiceError::Internal("unknown error".to_string()) } @@ -181,11 +183,22 @@ fn convert_ingest_response_v2( IngestFailureReason::NoShardsAvailable => { IngestServiceError::Unavailable("no shards available".to_string()) } - IngestFailureReason::RateLimited => IngestServiceError::RateLimited, - IngestFailureReason::ResourceExhausted => IngestServiceError::RateLimited, + IngestFailureReason::ShardRateLimited => { + IngestServiceError::RateLimited(RateLimitingCause::ShardRateLimiting) + } + IngestFailureReason::WalFull => IngestServiceError::RateLimited(RateLimitingCause::WalFull), IngestFailureReason::Timeout => { IngestServiceError::Internal("request timed out".to_string()) } + IngestFailureReason::RouterLoadShedding => { + IngestServiceError::RateLimited(RateLimitingCause::RouterLoadShedding) + } + IngestFailureReason::LoadShedding => { + IngestServiceError::RateLimited(RateLimitingCause::LoadShedding) + } + IngestFailureReason::CircuitBreaker => { + IngestServiceError::RateLimited(RateLimitingCause::CircuitBreaker) + } }) } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 5fb5f10b26e..a00c012956e 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -103,7 +103,7 @@ use quickwit_proto::ingest::ingester::{ PersistFailureReason, PersistResponse, }; use quickwit_proto::ingest::router::IngestRouterServiceClient; -use 
quickwit_proto::ingest::IngestV2Error; +use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause}; use quickwit_proto::metastore::{ EntityKind, ListIndexesMetadataRequest, MetastoreError, MetastoreService, MetastoreServiceClient, @@ -797,7 +797,7 @@ impl CircuitBreakerEvaluator for PersistCircuitBreakerEvaluator { }; for persist_failure in &persist_response.failures { // This is the error we return when the WAL is full. - if persist_failure.reason() == PersistFailureReason::ResourceExhausted { + if persist_failure.reason() == PersistFailureReason::WalFull { return true; } } @@ -805,7 +805,7 @@ impl CircuitBreakerEvaluator for PersistCircuitBreakerEvaluator { } fn make_circuit_breaker_output(&self) -> IngestV2Error { - IngestV2Error::TooManyRequests + IngestV2Error::TooManyRequests(RateLimitingCause::CircuitBreaker) } } diff --git a/quickwit/quickwit-serve/src/metrics.rs b/quickwit/quickwit-serve/src/metrics.rs index 7df17b09fb1..da34ef8e61e 100644 --- a/quickwit/quickwit-serve/src/metrics.rs +++ b/quickwit/quickwit-serve/src/metrics.rs @@ -56,15 +56,15 @@ impl Default for ServeMetrics { quickwit_common::metrics::exponential_buckets(0.02, 2.0, 8).unwrap(), ), ongoing_requests: new_gauge_vec( - "ingest_ongoing_requests", - "Number of ongoing ingest requests.", + "ongoing_requests", + "Number of ongoing requests.", "", &[], ["endpoint_group"], ), pending_requests: new_gauge_vec( - "ingest_pending_requests", - "Number of pending ingest requests.", + "pending_requests", + "Number of pending requests.", "", &[], ["endpoint_group"],