Skip to content

Commit

Permalink
Merge pull request #1489 from akoshelev/query-status-no-swallow
Browse files Browse the repository at this point in the history
Don't ignore errors on query status API
  • Loading branch information
akoshelev authored Dec 12, 2024
2 parents ff37b8a + c464883 commit cdb618d
Showing 1 changed file with 31 additions and 30 deletions.
61 changes: 31 additions & 30 deletions ipa-core/src/query/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,14 @@ impl Processor {
/// [`QueryStatusError::DifferentStatus`] and retrieve its internal state. Returns [`None`]
/// if not possible.
#[cfg(feature = "in-memory-infra")]
fn downcast_state_error(box_error: crate::error::BoxError) -> Option<QueryStatus> {
fn downcast_state_error(box_error: &crate::error::BoxError) -> Option<QueryStatus> {
use crate::helpers::ApiError;
let api_error = box_error.downcast::<ApiError>().ok()?;
if let ApiError::QueryStatus(QueryStatusError::DifferentStatus { my_status, .. }) =
*api_error
let api_error = box_error.downcast_ref::<ApiError>();
if let Some(ApiError::QueryStatus(QueryStatusError::DifferentStatus {
my_status, ..
})) = api_error
{
return Some(my_status);
return Some(*my_status);
}
None
}
Expand All @@ -386,7 +387,7 @@ impl Processor {
/// of relying on errors.
#[cfg(feature = "in-memory-infra")]
fn get_state_from_error(
error: crate::helpers::InMemoryTransportError<ShardIndex>,
error: &crate::helpers::InMemoryTransportError<ShardIndex>,
) -> Option<QueryStatus> {
if let crate::helpers::InMemoryTransportError::Rejected { inner, .. } = error {
return Self::downcast_state_error(inner);
Expand All @@ -399,8 +400,8 @@ impl Processor {
/// TODO: Ideally broadcast should return a value that we could use to parse the state instead
/// of relying on errors.
#[cfg(feature = "real-world-infra")]
fn get_state_from_error(shard_error: crate::net::ShardError) -> Option<QueryStatus> {
if let crate::net::Error::ShardQueryStatusMismatch { error, .. } = shard_error.source {
fn get_state_from_error(shard_error: &crate::net::ShardError) -> Option<QueryStatus> {
if let crate::net::Error::ShardQueryStatusMismatch { error, .. } = &shard_error.source {
return Some(error.actual);
}
None
Expand Down Expand Up @@ -431,17 +432,14 @@ impl Processor {

let shard_responses = shard_transport.broadcast(shard_query_status_req).await;
if let Err(e) = shard_responses {
// The following silently ignores the cases where the query isn't found.
// TODO: this code is a ticking bomb - it ignores all errors, not just when
// query is not found. If there is no handler, handler responded with an error, etc.
// Moreover, any error may result in client mistakenly assuming that the status
// is completed.
let states: Vec<_> = e
.failures
.into_iter()
.filter_map(|(_si, e)| Self::get_state_from_error(e))
.collect();
status = states.into_iter().fold(status, min_status);
for (shard, failure) in &e.failures {
if let Some(other) = Self::get_state_from_error(failure) {
status = min_status(status, other);
} else {
tracing::error!("failed to get status from shard {shard}: {failure:?}");
return Err(e.into());
}
}
}

Ok(status)
Expand Down Expand Up @@ -1205,16 +1203,25 @@ mod tests {
/// * From the standpoint of leader shard in Helper 1
/// * On query_status
///
/// If one of my shards hasn't received the query yet (NoSuchQuery) the leader shouldn't
/// return an error but instead with the min state.
/// If one of my shards hasn't received the query yet (NoSuchQuery), the leader should
/// return an error even though other shards returned their status.
#[tokio::test]
#[should_panic(
expected = "(ShardIndex(3), Rejected { dest: ShardIndex(3), inner: QueryStatus(NoSuchQuery(QueryId)) })"
)]
async fn status_query_doesnt_exist() {
fn shard_handle(si: ShardIndex) -> Arc<dyn RequestHandler<ShardIndex>> {
create_handler(move |_| async move {
if si == ShardIndex::from(3) {
Err(ApiError::QueryStatus(QueryStatusError::NoSuchQuery(
QueryId,
)))
} else if si == ShardIndex::from(2) {
Err(ApiError::QueryStatus(QueryStatusError::DifferentStatus {
query_id: QueryId,
my_status: QueryStatus::Running,
other_status: QueryStatus::Preparing,
}))
} else {
Ok(HelperResponse::ok())
}
Expand All @@ -1237,16 +1244,10 @@ mod tests {
req,
)
.unwrap();
let r = t
.processor
t.processor
.query_status(t.shard_transport.clone_ref(), QueryId)
.await;
if let Err(e) = r {
panic!("Unexpected error {e}");
}
if let Ok(st) = r {
assert_eq!(QueryStatus::AwaitingInputs, st);
}
.await
.unwrap();
}

/// Context:
Expand Down

0 comments on commit cdb618d

Please sign in to comment.