Keeping details of ingest error longer.
fulmicoton committed Jul 25, 2024
1 parent f9ccc31 commit 972a496
Showing 19 changed files with 327 additions and 102 deletions.
16 changes: 9 additions & 7 deletions quickwit/quickwit-ingest/src/error.rs
@@ -23,7 +23,7 @@ use mrecordlog::error::*;
use quickwit_actors::AskError;
use quickwit_common::tower::BufferError;
pub(crate) use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error};
use quickwit_proto::ingest::IngestV2Error;
use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause};
use quickwit_proto::types::IndexId;
use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode};
use serde::{Deserialize, Serialize};
@@ -42,8 +42,8 @@ pub enum IngestServiceError {
InvalidPosition(String),
#[error("io error {0}")]
IoError(String),
#[error("rate limited")]
RateLimited,
#[error("rate limited {0}")]
RateLimited(RateLimitingCause),
#[error("ingest service is unavailable ({0})")]
Unavailable(String),
}
@@ -88,7 +88,9 @@ impl From<IngestV2Error> for IngestServiceError {
IngestV2Error::ShardNotFound { .. } => {
IngestServiceError::Internal("shard not found".to_string())
}
IngestV2Error::TooManyRequests => IngestServiceError::RateLimited,
IngestV2Error::TooManyRequests(rate_limiting_cause) => {
IngestServiceError::RateLimited(rate_limiting_cause)
}
}
}
}
@@ -102,7 +104,7 @@ impl ServiceError for IngestServiceError {
Self::Internal(_) => ServiceErrorCode::Internal,
Self::InvalidPosition(_) => ServiceErrorCode::BadRequest,
Self::IoError { .. } => ServiceErrorCode::Internal,
Self::RateLimited => ServiceErrorCode::TooManyRequests,
Self::RateLimited(_) => ServiceErrorCode::TooManyRequests,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
}
}
@@ -118,7 +120,7 @@ impl GrpcServiceError for IngestServiceError {
}

fn new_too_many_requests() -> Self {
Self::RateLimited
Self::RateLimited(RateLimitingCause::Unknown)
}

fn new_unavailable(error_msg: String) -> Self {
@@ -145,7 +147,7 @@ impl From<IngestServiceError> for tonic::Status {
IngestServiceError::Internal(_) => tonic::Code::Internal,
IngestServiceError::InvalidPosition(_) => tonic::Code::InvalidArgument,
IngestServiceError::IoError { .. } => tonic::Code::Internal,
IngestServiceError::RateLimited => tonic::Code::ResourceExhausted,
IngestServiceError::RateLimited(_) => tonic::Code::ResourceExhausted,
IngestServiceError::Unavailable(_) => tonic::Code::Unavailable,
};
let message = error.to_string();
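Taken together, these hunks thread the rate-limiting cause through the error chain instead of collapsing it into a bare `RateLimited`. Below is a minimal, self-contained sketch of the pattern with stand-in enums; the real `RateLimitingCause` is prost-generated in `quickwit_proto::ingest`, and any variant name beyond `Unknown` and `WalFull` is an assumption here.

```rust
// Stand-in types for illustration only; the real ones live in
// quickwit_proto and quickwit-ingest.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum RateLimitingCause {
    Unknown,
    ShardRateLimiting, // assumed variant name
    WalFull,
}

#[derive(Debug, PartialEq)]
enum IngestV2Error {
    TooManyRequests(RateLimitingCause),
}

#[derive(Debug, PartialEq)]
enum IngestServiceError {
    RateLimited(RateLimitingCause),
}

impl From<IngestV2Error> for IngestServiceError {
    fn from(error: IngestV2Error) -> Self {
        match error {
            // Before this commit the cause was dropped at this boundary; now
            // it rides along, so a client can tell a full WAL apart from
            // per-shard rate limiting.
            IngestV2Error::TooManyRequests(cause) => IngestServiceError::RateLimited(cause),
        }
    }
}

fn main() {
    let error: IngestServiceError =
        IngestV2Error::TooManyRequests(RateLimitingCause::WalFull).into();
    assert_eq!(
        error,
        IngestServiceError::RateLimited(RateLimitingCause::WalFull)
    );
}
```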
5 changes: 3 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_api_service.rs
@@ -27,6 +27,7 @@ use quickwit_actors::{
};
use quickwit_common::runtimes::RuntimeType;
use quickwit_common::tower::Cost;
use quickwit_proto::ingest::RateLimitingCause;
use tracing::{error, info};
use ulid::Ulid;

@@ -166,7 +167,7 @@ impl IngestApiService {

if disk_used > self.disk_limit {
info!("ingestion rejected due to disk limit");
return Err(IngestServiceError::RateLimited);
return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull));
}

if self
@@ -175,7 +176,7 @@
.is_err()
{
info!("ingest request rejected due to memory limit");
return Err(IngestServiceError::RateLimited);
return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull));
}
let mut num_docs = 0usize;
let mut notifications = Vec::new();
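Both capacity checks above now report `WalFull`, distinguishing a saturated write-ahead log from per-shard rate limiting. A small sketch of that gating logic, with hypothetical limits and a stand-in error type (Quickwit derives the real limits from its node configuration):

```rust
#[derive(Debug, PartialEq)]
enum WalRejection {
    WalFull,
}

// Hypothetical limits for illustration.
const DISK_LIMIT_BYTES: u64 = 512 << 20; // 512 MiB
const MEMORY_LIMIT_BYTES: u64 = 256 << 20; // 256 MiB

fn check_wal_capacity(disk_used: u64, memory_used: u64) -> Result<(), WalRejection> {
    // Disk and memory exhaustion are reported identically: the WAL is full.
    // This is a different condition from a shard exceeding its ingest rate.
    if disk_used > DISK_LIMIT_BYTES || memory_used > MEMORY_LIMIT_BYTES {
        return Err(WalRejection::WalFull);
    }
    Ok(())
}

fn main() {
    assert_eq!(check_wal_capacity(600 << 20, 0), Err(WalRejection::WalFull));
    assert!(check_wal_capacity(100 << 20, 100 << 20).is_ok());
}
```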
28 changes: 14 additions & 14 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -37,7 +37,7 @@ use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::pubsub::{EventBroker, EventSubscriber};
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
use quickwit_common::tower::Pool;
use quickwit_common::{rate_limited_warn, ServiceStream};
use quickwit_common::{rate_limited_error, rate_limited_warn, ServiceStream};
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient,
};
@@ -544,7 +544,7 @@ impl Ingester {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ResourceExhausted as i32,
reason: PersistFailureReason::WalFull as i32,
};
persist_failures.push(persist_failure);
continue;
@@ -562,7 +562,7 @@
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::RateLimited as i32,
reason: PersistFailureReason::ShardRateLimited as i32,
};
persist_failures.push(persist_failure);
continue;
@@ -695,9 +695,7 @@ impl Ingester {
PersistFailureReason::ShardNotFound
}
ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
ReplicateFailureReason::ResourceExhausted => {
PersistFailureReason::ResourceExhausted
}
ReplicateFailureReason::WalFull => PersistFailureReason::WalFull,
};
let persist_failure = PersistFailure {
subrequest_id: replicate_failure.subrequest_id,
@@ -889,13 +887,12 @@ impl Ingester {

let mut state_guard = self.state.lock_partially().await?;

let shard =
state_guard
.shards
.get_mut(&queue_id)
.ok_or_else(|| IngestV2Error::ShardNotFound {
shard_id: open_fetch_stream_request.shard_id().clone(),
})?;
let shard = state_guard.shards.get_mut(&queue_id).ok_or_else(|| {
rate_limited_error!(limit_per_min=6, queue_id=%queue_id, "shard not found");
IngestV2Error::ShardNotFound {
shard_id: open_fetch_stream_request.shard_id().clone(),
}
})?;
// An indexer can only know about a newly opened shard if it has been scheduled by the
// control plane, which confirms that the shard was correctly opened in the
// metastore.
@@ -2102,7 +2099,10 @@ mod tests {
assert_eq!(persist_response.failures.len(), 1);

let persist_failure = &persist_response.failures[0];
assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited);
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ShardRateLimited
);
}

// This test should be run manually and independently of other tests with the `failpoints`
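The coarse `ResourceExhausted`/`RateLimited` reasons are split into `WalFull` and `ShardRateLimited`, which call for different reactions from a caller. The sketch below shows one plausible client policy; the enum is a stand-in for the prost-generated `PersistFailureReason`, and the retry strategy is an assumption, not Quickwit's actual logic.

```rust
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
enum PersistFailureReason {
    ShardNotFound,
    ShardClosed,
    WalFull,          // formerly ResourceExhausted: the ingester's WAL is saturated
    ShardRateLimited, // formerly RateLimited: this one shard is over its rate
}

// One plausible way a client could react to each failure reason.
fn retry_hint(reason: PersistFailureReason) -> &'static str {
    match reason {
        PersistFailureReason::WalFull => "back off: the whole ingester is saturated",
        PersistFailureReason::ShardRateLimited => "retry on another shard of the source",
        PersistFailureReason::ShardNotFound | PersistFailureReason::ShardClosed => {
            "refresh the shard table and persist to a fresh shard"
        }
    }
}

fn main() {
    println!("{}", retry_hint(PersistFailureReason::ShardRateLimited));
}
```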
57 changes: 55 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/metrics.rs
@@ -20,10 +20,61 @@
use mrecordlog::ResourceUsage;
use once_cell::sync::Lazy;
use quickwit_common::metrics::{
exponential_buckets, linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram,
new_histogram_vec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
exponential_buckets, linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram,
new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
};

// Counter vec counting the different outcomes of ingest requests, as
// measured at the end of the router's work.
//
// The counters count persist subrequests.
pub(crate) struct IngestResultMetrics {
pub success: IntCounter,
pub circuit_breaker: IntCounter,
pub unspecified: IntCounter,
pub index_not_found: IntCounter,
pub source_not_found: IntCounter,
pub internal: IntCounter,
pub no_shards_available: IntCounter,
pub shard_rate_limited: IntCounter,
pub wal_full: IntCounter,
pub timeout: IntCounter,
pub router_timeout: IntCounter,
pub router_load_shedding: IntCounter,
pub load_shedding: IntCounter,
pub shard_not_found: IntCounter,
pub unavailable: IntCounter,
}

impl Default for IngestResultMetrics {
fn default() -> Self {
let ingest_result_total_vec = new_counter_vec::<1>(
"ingest_result_total",
"Number of ingest requests by result",
"ingest",
&[],
["result"],
);
Self {
success: ingest_result_total_vec.with_label_values(["success"]),
circuit_breaker: ingest_result_total_vec.with_label_values(["circuit_breaker"]),
unspecified: ingest_result_total_vec.with_label_values(["unspecified"]),
index_not_found: ingest_result_total_vec.with_label_values(["index_not_found"]),
source_not_found: ingest_result_total_vec.with_label_values(["source_not_found"]),
internal: ingest_result_total_vec.with_label_values(["internal"]),
no_shards_available: ingest_result_total_vec.with_label_values(["no_shards_available"]),
shard_rate_limited: ingest_result_total_vec.with_label_values(["shard_rate_limited"]),
wal_full: ingest_result_total_vec.with_label_values(["wal_full"]),
timeout: ingest_result_total_vec.with_label_values(["timeout"]),
router_timeout: ingest_result_total_vec.with_label_values(["router_timeout"]),
router_load_shedding: ingest_result_total_vec.with_label_values(["router_load_shedding"]),
load_shedding: ingest_result_total_vec.with_label_values(["load_shedding"]),
unavailable: ingest_result_total_vec.with_label_values(["unavailable"]),
shard_not_found: ingest_result_total_vec.with_label_values(["shard_not_found"]),
}
}
}

pub(super) struct IngestV2Metrics {
pub reset_shards_operations_total: IntCounterVec<1>,
pub open_shards: IntGauge,
@@ -34,11 +85,13 @@ pub(super) struct IngestV2Metrics {
pub wal_acquire_lock_request_duration_secs: HistogramVec<2>,
pub wal_disk_used_bytes: IntGauge,
pub wal_memory_used_bytes: IntGauge,
pub ingest_results: IngestResultMetrics,
}

impl Default for IngestV2Metrics {
fn default() -> Self {
Self {
ingest_results: IngestResultMetrics::default(),
reset_shards_operations_total: new_counter_vec(
"reset_shards_operations_total",
"Total number of reset shards operations performed.",
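`IngestResultMetrics` materializes one child counter per `result` label value up front, so the hot path increments a pre-resolved counter instead of doing a label lookup per request. Here is a sketch of the same pattern written against the `prometheus` crate directly; `quickwit_common::metrics::new_counter_vec` wraps it, and the registration details below are simplified assumptions (registry registration omitted).

```rust
use prometheus::{IntCounter, IntCounterVec, Opts};

struct IngestResultMetrics {
    success: IntCounter,
    wal_full: IntCounter,
    shard_rate_limited: IntCounter,
}

impl Default for IngestResultMetrics {
    fn default() -> Self {
        let ingest_result_total = IntCounterVec::new(
            Opts::new("ingest_result_total", "Number of ingest requests by result"),
            &["result"],
        )
        .expect("valid metric options");
        Self {
            // Each child is resolved once here; incrementing it later is a
            // cheap atomic add with no label hashing.
            success: ingest_result_total.with_label_values(&["success"]),
            wal_full: ingest_result_total.with_label_values(&["wal_full"]),
            shard_rate_limited: ingest_result_total.with_label_values(&["shard_rate_limited"]),
        }
    }
}

fn main() {
    let metrics = IngestResultMetrics::default();
    metrics.success.inc();
    metrics.wal_full.inc();
    assert_eq!(metrics.wal_full.get(), 1);
}
```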
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -624,7 +624,7 @@ impl ReplicationTask {
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: ReplicateFailureReason::ResourceExhausted as i32,
reason: ReplicateFailureReason::WalFull as i32,
};
replicate_failures.push(replicate_failure);
continue;
(Diff truncated: the remaining 14 changed files are not shown.)
