Skip to content

Commit

Permalink
Issue/4213 fix flaky test ingest v2 happy path (#5026)
Browse files Browse the repository at this point in the history
* Adding detailed error message to unavailable error

* Fixes flaky unit test.

This also adds extra detail to service unavailable ingest errors.
The error was likely cause by the lag between the nodes being detected
as ready and servers being added to the ingester pool.

Ideally we should have a concept of readiness associated to routers:
"I am ready to rest ingest requests".

Closes #4213
  • Loading branch information
fulmicoton authored May 24, 2024
1 parent cf6f6c6 commit a400725
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 22 deletions.
25 changes: 15 additions & 10 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ pub enum IngestServiceError {
IoError(String),
#[error("rate limited")]
RateLimited,
#[error("ingest service is unavailable")]
Unavailable,
#[error("ingest service is unavailable ({0})")]
Unavailable(String),
}

impl From<AskError<IngestServiceError>> for IngestServiceError {
fn from(error: AskError<IngestServiceError>) -> Self {
match error {
AskError::ErrorReply(error) => error,
AskError::MessageNotDelivered => IngestServiceError::Unavailable,
AskError::MessageNotDelivered => {
IngestServiceError::Unavailable("actor not running".to_string())
}
AskError::ProcessMessageError => IngestServiceError::Internal(error.to_string()),
}
}
Expand All @@ -61,7 +63,7 @@ impl From<AskError<IngestServiceError>> for IngestServiceError {
impl From<BufferError> for IngestServiceError {
fn from(error: BufferError) -> Self {
match error {
BufferError::Closed => IngestServiceError::Unavailable,
BufferError::Closed => IngestServiceError::Unavailable(error.to_string()),
BufferError::Unknown => IngestServiceError::Internal(error.to_string()),
}
}
Expand All @@ -76,8 +78,11 @@ impl From<io::Error> for IngestServiceError {
impl From<IngestV2Error> for IngestServiceError {
fn from(error: IngestV2Error) -> Self {
match error {
IngestV2Error::Timeout(_) | IngestV2Error::Unavailable(_) => {
IngestServiceError::Unavailable
IngestV2Error::Timeout(error_msg) => {
IngestServiceError::Unavailable(format!("timeout {error_msg}"))
}
IngestV2Error::Unavailable(error_msg) => {
IngestServiceError::Unavailable(format!("unavailable: {error_msg}"))
}
IngestV2Error::Internal(message) => IngestServiceError::Internal(message),
IngestV2Error::ShardNotFound { .. } => {
Expand All @@ -98,7 +103,7 @@ impl ServiceError for IngestServiceError {
Self::InvalidPosition(_) => ServiceErrorCode::BadRequest,
Self::IoError { .. } => ServiceErrorCode::Internal,
Self::RateLimited => ServiceErrorCode::TooManyRequests,
Self::Unavailable => ServiceErrorCode::Unavailable,
Self::Unavailable(_) => ServiceErrorCode::Unavailable,
}
}
}
Expand All @@ -116,8 +121,8 @@ impl GrpcServiceError for IngestServiceError {
Self::RateLimited
}

fn new_unavailable(_: String) -> Self {
Self::Unavailable
fn new_unavailable(error_msg: String) -> Self {
Self::Unavailable(error_msg)
}
}

Expand All @@ -141,7 +146,7 @@ impl From<IngestServiceError> for tonic::Status {
IngestServiceError::InvalidPosition(_) => tonic::Code::InvalidArgument,
IngestServiceError::IoError { .. } => tonic::Code::Internal,
IngestServiceError::RateLimited => tonic::Code::ResourceExhausted,
IngestServiceError::Unavailable => tonic::Code::Unavailable,
IngestServiceError::Unavailable(_) => tonic::Code::Unavailable,
};
let message = error.to_string();
tonic::Status::new(code, message)
Expand Down
45 changes: 34 additions & 11 deletions quickwit/quickwit-integration-tests/src/tests/index_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use std::collections::HashSet;
use std::time::Duration;

use hyper::StatusCode;
use quickwit_config::service::QuickwitService;
use quickwit_config::ConfigFormat;
use quickwit_metastore::SplitState;
Expand Down Expand Up @@ -299,17 +300,39 @@ async fn test_ingest_v2_happy_path() {
.toggle("_ingest-source", true)
.await
.unwrap();
sandbox
.indexer_rest_client
.ingest(
"test_index",
ingest_json!({"body": "doc1"}),
None,
None,
CommitType::WaitFor,
)
.await
.unwrap();

// The server have been detected as ready. Unfortunately, they may not have been added
// to the ingester pool yet.
//
// If we get an unavailable error, we retry up to 10 times.
// See #4213
const MAX_NUM_RETRIES: usize = 10;
for i in 1..=MAX_NUM_RETRIES {
let ingest_res = sandbox
.indexer_rest_client
.ingest(
"test_index",
ingest_json!({"body": "doc1"}),
None,
None,
CommitType::WaitFor,
)
.await;
let Some(ingest_error) = ingest_res.err() else {
// Success
break;
};
assert_eq!(
ingest_error.status_code(),
Some(StatusCode::SERVICE_UNAVAILABLE)
);
assert!(
i < MAX_NUM_RETRIES,
"service not available after {MAX_NUM_RETRIES} tries"
);
tokio::time::sleep(Duration::from_millis(200)).await;
}

sandbox
.wait_for_splits("test_index", Some(vec![SplitState::Published]), 1)
.await
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ fn convert_ingest_response_v2(
ingest_failure.index_id
)),
IngestFailureReason::Internal => IngestServiceError::Internal("internal error".to_string()),
IngestFailureReason::NoShardsAvailable => IngestServiceError::Unavailable,
IngestFailureReason::NoShardsAvailable => {
IngestServiceError::Unavailable("no shards available".to_string())
}
IngestFailureReason::RateLimited => IngestServiceError::RateLimited,
IngestFailureReason::ResourceExhausted => IngestServiceError::RateLimited,
IngestFailureReason::Timeout => {
Expand Down

0 comments on commit a400725

Please sign in to comment.