Keeping details of ingest error longer. (#5258)
This PR:
- logs an error on router timeout (there is already an error on the
  ingester side)
- adds metrics on the persist subrequests, as measured at the end of the
  router work
- advertises rate limiting as a rate limiting error, now carrying its
  `RateLimitingCause`
- stops returning a global rate limiting error when all of the
  subrequests were rate limited: we now just return a 200, and the
  individual subrequests each return a rate limiting error, as sketched
  below
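
A minimal sketch of that last point. The types and the `build_response` helper below are hypothetical, illustrative only, and not Quickwit's actual API:

```rust
// Hypothetical sketch: the router no longer lifts "every subrequest was
// rate limited" into a global rate limiting error. It returns a 200 and
// reports rate limiting per subrequest instead.
enum SubrequestOutcome {
    Success,
    RateLimited, // surfaced in the response body, not as the request status
}

struct IngestResponse {
    subrequests: Vec<SubrequestOutcome>,
}

fn build_response(outcomes: Vec<SubrequestOutcome>) -> IngestResponse {
    // Before this PR: if every outcome was RateLimited, the whole request
    // failed with a rate limiting error. Now the outcomes pass through.
    IngestResponse { subrequests: outcomes }
}
```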
fulmicoton authored Jul 27, 2024
1 parent ded426c commit 6e2e630
Showing 19 changed files with 366 additions and 119 deletions.
16 changes: 9 additions & 7 deletions quickwit/quickwit-ingest/src/error.rs
@@ -24,7 +24,7 @@ use quickwit_actors::AskError;
use quickwit_common::rate_limited_error;
use quickwit_common::tower::BufferError;
pub(crate) use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error};
use quickwit_proto::ingest::IngestV2Error;
use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause};
use quickwit_proto::types::IndexId;
use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode};
use serde::{Deserialize, Serialize};
@@ -43,8 +43,8 @@ pub enum IngestServiceError {
InvalidPosition(String),
#[error("io error {0}")]
IoError(String),
#[error("rate limited")]
RateLimited,
#[error("rate limited {0}")]
RateLimited(RateLimitingCause),
#[error("ingest service is unavailable ({0})")]
Unavailable(String),
}
@@ -89,7 +89,9 @@ impl From<IngestV2Error> for IngestServiceError {
IngestV2Error::ShardNotFound { .. } => {
IngestServiceError::Internal("shard not found".to_string())
}
IngestV2Error::TooManyRequests => IngestServiceError::RateLimited,
IngestV2Error::TooManyRequests(rate_limiting_cause) => {
IngestServiceError::RateLimited(rate_limiting_cause)
}
}
}
}
@@ -115,7 +117,7 @@ impl ServiceError for IngestServiceError {
rate_limited_error!(limit_per_min = 6, "ingest/io internal error: {io_err}");
ServiceErrorCode::Internal
}
Self::RateLimited => ServiceErrorCode::TooManyRequests,
Self::RateLimited(_) => ServiceErrorCode::TooManyRequests,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
}
}
@@ -131,7 +133,7 @@ impl GrpcServiceError for IngestServiceError {
}

fn new_too_many_requests() -> Self {
Self::RateLimited
Self::RateLimited(RateLimitingCause::Unknown)
}

fn new_unavailable(error_msg: String) -> Self {
@@ -158,7 +160,7 @@ impl From<IngestServiceError> for tonic::Status {
IngestServiceError::Internal(_) => tonic::Code::Internal,
IngestServiceError::InvalidPosition(_) => tonic::Code::InvalidArgument,
IngestServiceError::IoError { .. } => tonic::Code::Internal,
IngestServiceError::RateLimited => tonic::Code::ResourceExhausted,
IngestServiceError::RateLimited(_) => tonic::Code::ResourceExhausted,
IngestServiceError::Unavailable(_) => tonic::Code::Unavailable,
};
let message = error.to_string();
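
Usage note: `RateLimited` now carries a `RateLimitingCause`, so callers can distinguish a full WAL from other causes. A minimal sketch, assuming only the `WalFull` and `Unknown` variants visible in this diff (the `describe` helper is hypothetical):

```rust
use quickwit_proto::ingest::RateLimitingCause;

// Sketch only: branch on the cause carried by the error. Variants other
// than WalFull and Unknown are not assumed here.
fn describe(error: &IngestServiceError) -> &'static str {
    match error {
        IngestServiceError::RateLimited(RateLimitingCause::WalFull) => {
            "rate limited: write-ahead log is full"
        }
        IngestServiceError::RateLimited(_) => "rate limited",
        _ => "other ingest error",
    }
}
```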
5 changes: 3 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_api_service.rs
@@ -27,6 +27,7 @@ use quickwit_actors::{
};
use quickwit_common::runtimes::RuntimeType;
use quickwit_common::tower::Cost;
use quickwit_proto::ingest::RateLimitingCause;
use tracing::{error, info};
use ulid::Ulid;

@@ -166,7 +167,7 @@ impl IngestApiService {

if disk_used > self.disk_limit {
info!("ingestion rejected due to disk limit");
return Err(IngestServiceError::RateLimited);
return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull));
}

if self
@@ -175,7 +176,7 @@
.is_err()
{
info!("ingest request rejected due to memory limit");
return Err(IngestServiceError::RateLimited);
return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull));
}
let mut num_docs = 0usize;
let mut notifications = Vec::new();
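
Both capacity checks above now reject with the explicit `WalFull` cause. A simplified sketch of that logic (the `check_wal_capacity` helper is hypothetical):

```rust
// Simplified sketch of the checks above: disk and memory exhaustion of
// the WAL both map to RateLimited(RateLimitingCause::WalFull).
fn check_wal_capacity(
    disk_used: usize,
    disk_limit: usize,
    memory_reserved: bool,
) -> Result<(), IngestServiceError> {
    if disk_used > disk_limit {
        return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull));
    }
    if !memory_reserved {
        return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull));
    }
    Ok(())
}
```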
43 changes: 20 additions & 23 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -37,7 +37,7 @@ use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{EventBroker, EventSubscriber};
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
use quickwit_common::tower::Pool;
use quickwit_common::{rate_limited_warn, ServiceStream};
use quickwit_common::{rate_limited_error, rate_limited_warn, ServiceStream};
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient,
};
@@ -544,7 +544,7 @@ impl Ingester {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ResourceExhausted as i32,
reason: PersistFailureReason::WalFull as i32,
};
persist_failures.push(persist_failure);
continue;
@@ -562,7 +562,7 @@
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::RateLimited as i32,
reason: PersistFailureReason::ShardRateLimited as i32,
};
persist_failures.push(persist_failure);
continue;
@@ -695,9 +695,7 @@ impl Ingester {
PersistFailureReason::ShardNotFound
}
ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
ReplicateFailureReason::ResourceExhausted => {
PersistFailureReason::ResourceExhausted
}
ReplicateFailureReason::WalFull => PersistFailureReason::WalFull,
};
let persist_failure = PersistFailure {
subrequest_id: replicate_failure.subrequest_id,
@@ -889,13 +887,12 @@ impl Ingester {

let mut state_guard = self.state.lock_partially().await?;

let shard =
state_guard
.shards
.get_mut(&queue_id)
.ok_or_else(|| IngestV2Error::ShardNotFound {
shard_id: open_fetch_stream_request.shard_id().clone(),
})?;
let shard = state_guard.shards.get_mut(&queue_id).ok_or_else(|| {
rate_limited_error!(limit_per_min=6, queue_id=%queue_id, "shard not found");
IngestV2Error::ShardNotFound {
shard_id: open_fetch_stream_request.shard_id().clone(),
}
})?;
// An indexer can only know about a newly opened shard if it has been scheduled by the
// control plane, which confirms that the shard was correctly opened in the
// metastore.
@@ -2039,10 +2036,7 @@ mod tests {
assert_eq!(persist_response.failures.len(), 1);

let persist_failure = &persist_response.failures[0];
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ResourceExhausted
);
assert_eq!(persist_failure.reason(), PersistFailureReason::WalFull);
}

#[tokio::test]
@@ -2102,7 +2096,10 @@ mod tests {
assert_eq!(persist_response.failures.len(), 1);

let persist_failure = &persist_response.failures[0];
assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited);
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ShardRateLimited
);
}

// This test should be run manually and independently of other tests with the `failpoints`
@@ -2725,7 +2722,10 @@
assert_eq!(persist_failure.index_uid(), &index_uid);
assert_eq!(persist_failure.source_id, "test-source");
assert_eq!(persist_failure.shard_id(), ShardId::from(1));
assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited);
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ShardRateLimited
);

let state_guard = ingester.state.lock_fully().await.unwrap();
assert_eq!(state_guard.shards.len(), 1);
@@ -2802,10 +2802,7 @@
assert_eq!(persist_failure.index_uid(), &index_uid);
assert_eq!(persist_failure.source_id, "test-source");
assert_eq!(persist_failure.shard_id(), ShardId::from(1));
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ResourceExhausted
);
assert_eq!(persist_failure.reason(), PersistFailureReason::WalFull);

let state_guard = ingester.state.lock_fully().await.unwrap();
assert_eq!(state_guard.shards.len(), 1);
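
The renamed `PersistFailureReason` variants (`WalFull`, `ShardRateLimited`) make the cause of each persist failure explicit. A hypothetical mapping from a failure reason to the rate limiting cause it represents; the actual conversion in Quickwit may live elsewhere and differ:

```rust
// Hypothetical helper: translate the persist failure reasons introduced
// in this PR into a rate limiting cause. ShardRateLimited is assumed to
// exist as a RateLimitingCause variant.
fn rate_limiting_cause(reason: PersistFailureReason) -> Option<RateLimitingCause> {
    match reason {
        PersistFailureReason::WalFull => Some(RateLimitingCause::WalFull),
        PersistFailureReason::ShardRateLimited => Some(RateLimitingCause::ShardRateLimited),
        _ => None, // not a rate limiting failure
    }
}
```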
56 changes: 55 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
@@ -21,9 +21,61 @@ use mrecordlog::ResourceUsage;
use once_cell::sync::Lazy;
use quickwit_common::metrics::{
exponential_buckets, linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram,
new_histogram_vec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
};

// Counter vec counting the different outcomes of ingest requests, as
// measured at the end of the router work.
//
// The counters count persist subrequests.
pub(crate) struct IngestResultMetrics {
pub success: IntCounter,
pub circuit_breaker: IntCounter,
pub unspecified: IntCounter,
pub index_not_found: IntCounter,
pub source_not_found: IntCounter,
pub internal: IntCounter,
pub no_shards_available: IntCounter,
pub shard_rate_limited: IntCounter,
pub wal_full: IntCounter,
pub timeout: IntCounter,
pub router_timeout: IntCounter,
pub router_load_shedding: IntCounter,
pub load_shedding: IntCounter,
pub shard_not_found: IntCounter,
pub unavailable: IntCounter,
}

impl Default for IngestResultMetrics {
fn default() -> Self {
let ingest_result_total_vec = new_counter_vec::<1>(
"ingest_result_total",
"Number of ingest requests by result",
"ingest",
&[],
["result"],
);
Self {
success: ingest_result_total_vec.with_label_values(["success"]),
circuit_breaker: ingest_result_total_vec.with_label_values(["circuit_breaker"]),
unspecified: ingest_result_total_vec.with_label_values(["unspecified"]),
index_not_found: ingest_result_total_vec.with_label_values(["index_not_found"]),
source_not_found: ingest_result_total_vec.with_label_values(["source_not_found"]),
internal: ingest_result_total_vec.with_label_values(["internal"]),
no_shards_available: ingest_result_total_vec.with_label_values(["no_shards_available"]),
shard_rate_limited: ingest_result_total_vec.with_label_values(["shard_rate_limited"]),
wal_full: ingest_result_total_vec.with_label_values(["wal_full"]),
timeout: ingest_result_total_vec.with_label_values(["timeout"]),
router_timeout: ingest_result_total_vec.with_label_values(["router_timeout"]),
router_load_shedding: ingest_result_total_vec
.with_label_values(["router_load_shedding"]),
load_shedding: ingest_result_total_vec.with_label_values(["load_shedding"]),
unavailable: ingest_result_total_vec.with_label_values(["unavailable"]),
shard_not_found: ingest_result_total_vec.with_label_values(["shard_not_found"]),
}
}
}

pub(super) struct IngestV2Metrics {
pub reset_shards_operations_total: IntCounterVec<1>,
pub open_shards: IntGauge,
@@ -34,11 +86,13 @@
pub wal_acquire_lock_request_duration_secs: HistogramVec<2>,
pub wal_disk_used_bytes: IntGauge,
pub wal_memory_used_bytes: IntGauge,
pub ingest_results: IngestResultMetrics,
}

impl Default for IngestV2Metrics {
fn default() -> Self {
Self {
ingest_results: IngestResultMetrics::default(),
reset_shards_operations_total: new_counter_vec(
"reset_shards_operations_total",
"Total number of reset shards operations performed.",
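
One way the router might bump these counters at the end of its work; the `record_persist_outcome` helper is hypothetical, but the fields match the `IngestResultMetrics` struct added above:

```rust
// Hypothetical dispatch: one counter increment per persist subrequest
// outcome, using the field names of the IngestResultMetrics struct above.
fn record_persist_outcome(
    metrics: &IngestResultMetrics,
    failure: Option<PersistFailureReason>,
) {
    match failure {
        None => metrics.success.inc(),
        Some(PersistFailureReason::WalFull) => metrics.wal_full.inc(),
        Some(PersistFailureReason::ShardRateLimited) => metrics.shard_rate_limited.inc(),
        Some(PersistFailureReason::ShardNotFound) => metrics.shard_not_found.inc(),
        Some(_) => metrics.unspecified.inc(),
    }
}
```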
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -624,7 +624,7 @@ impl ReplicationTask {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: ReplicateFailureReason::ResourceExhausted as i32,
reason: ReplicateFailureReason::WalFull as i32,
};
replicate_failures.push(replicate_failure);
continue;
@@ -1626,7 +1626,7 @@ mod tests {
assert_eq!(replicate_failure_0.shard_id(), ShardId::from(1));
assert_eq!(
replicate_failure_0.reason(),
ReplicateFailureReason::ResourceExhausted
ReplicateFailureReason::WalFull
);
}
}