Don't ignore errors on query status API #1489

Merged
61 changes: 31 additions & 30 deletions ipa-core/src/query/processor.rs
@@ -369,13 +369,14 @@
     /// [`QueryStatusError::DifferentStatus`] and retrieve its internal state. Returns [`None`]
     /// if not possible.
     #[cfg(feature = "in-memory-infra")]
-    fn downcast_state_error(box_error: crate::error::BoxError) -> Option<QueryStatus> {
+    fn downcast_state_error(box_error: &crate::error::BoxError) -> Option<QueryStatus> {
         use crate::helpers::ApiError;
-        let api_error = box_error.downcast::<ApiError>().ok()?;
-        if let ApiError::QueryStatus(QueryStatusError::DifferentStatus { my_status, .. }) =
-            *api_error
+        let api_error = box_error.downcast_ref::<ApiError>();
+        if let Some(ApiError::QueryStatus(QueryStatusError::DifferentStatus {
+            my_status, ..
+        })) = api_error
         {
-            return Some(my_status);
+            return Some(*my_status);
         }
         None
     }
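
A note on the change above: `downcast` consumes the boxed error, whereas `downcast_ref` only borrows it, so the caller can still log or return the original error afterwards. The sketch below illustrates this with stand-in types. `Status`, `ApiError`, `status_from`, and the `BoxError` alias are hypothetical simplifications for illustration, not the definitions used in ipa-core.

```rust
use std::error::Error;
use std::fmt;

// Assumed shape of a boxed error alias; the real `crate::error::BoxError` may differ.
type BoxError = Box<dyn Error + Send + Sync + 'static>;

// Hypothetical stand-in for `QueryStatus`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Preparing,
    Running,
}

// Hypothetical stand-in for the API error type.
#[derive(Debug)]
enum ApiError {
    DifferentStatus { my_status: Status },
    NoSuchQuery,
}

impl fmt::Display for ApiError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl Error for ApiError {}

// Borrow the boxed error and inspect it without taking ownership.
// Returns `None` for any error that is not a status mismatch.
fn status_from(err: &BoxError) -> Option<Status> {
    match err.downcast_ref::<ApiError>() {
        Some(ApiError::DifferentStatus { my_status }) => Some(*my_status),
        _ => None,
    }
}
```

Because `status_from` takes a reference, a caller that gets `None` back still owns the error and can propagate it, which is what the stricter loop later in this diff relies on.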
@@ -386,7 +387,7 @@
     /// of relying on errors.
     #[cfg(feature = "in-memory-infra")]
     fn get_state_from_error(
-        error: crate::helpers::InMemoryTransportError<ShardIndex>,
+        error: &crate::helpers::InMemoryTransportError<ShardIndex>,
     ) -> Option<QueryStatus> {
         if let crate::helpers::InMemoryTransportError::Rejected { inner, .. } = error {
             return Self::downcast_state_error(inner);
@@ -399,8 +400,8 @@
     /// TODO: Ideally broadcast should return a value, that we could use to parse the state instead
     /// of relying on errors.
     #[cfg(feature = "real-world-infra")]
-    fn get_state_from_error(shard_error: crate::net::ShardError) -> Option<QueryStatus> {
-        if let crate::net::Error::ShardQueryStatusMismatch { error, .. } = shard_error.source {
+    fn get_state_from_error(shard_error: &crate::net::ShardError) -> Option<QueryStatus> {
+        if let crate::net::Error::ShardQueryStatusMismatch { error, .. } = &shard_error.source {

Check warning on line 404 in ipa-core/src/query/processor.rs (Codecov / codecov/patch): added lines #L403 - L404 were not covered by tests.

             return Some(error.actual);
         }
         None
@@ -431,17 +432,14 @@

         let shard_responses = shard_transport.broadcast(shard_query_status_req).await;
         if let Err(e) = shard_responses {
-            // The following silently ignores the cases where the query isn't found.
-            // TODO: this code is a ticking bomb - it ignores all errors, not just when
-            // query is not found. If there is no handler, handler responded with an error, etc.
-            // Moreover, any error may result in client mistakenly assuming that the status
-            // is completed.
-            let states: Vec<_> = e
-                .failures
-                .into_iter()
-                .filter_map(|(_si, e)| Self::get_state_from_error(e))
-                .collect();
-            status = states.into_iter().fold(status, min_status);

Collaborator:
For my education, why does this cause errors to be ignored? What is a minimum status, and what would folding an iterator of failures even do?

Collaborator (Author):
Yeah, it is a bit subtle here. filter_map filters out all errors that are not status-mismatch errors: the closure maps them to None, and filter_map excludes them.

+            for (shard, failure) in &e.failures {
+                if let Some(other) = Self::get_state_from_error(failure) {
+                    status = min_status(status, other);
+                } else {
+                    tracing::error!("failed to get status from shard {shard}: {failure:?}");
+                    return Err(e.into());
+                }
+            }
         }

         Ok(status)
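
To make the review thread above concrete, here is a minimal sketch of both versions using stand-in types. It assumes that `min_status` returns the earlier of two states and that states are ordered by their position in the query lifecycle. `Status`, `ShardFailure`, `state_from_error`, `lenient`, and `strict` are hypothetical names chosen for illustration, not ipa-core's.

```rust
// Hypothetical stand-in for `QueryStatus`. The derived `Ord` follows declaration
// order, so "minimum" means the earliest lifecycle state (an assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Status {
    Preparing,
    AwaitingInputs,
    Running,
    Completed,
}

// Hypothetical stand-in for a per-shard failure.
#[derive(Debug, Clone, PartialEq)]
enum ShardFailure {
    DifferentStatus(Status),
    NoSuchQuery,
}

// Only a status mismatch carries a state; every other failure maps to `None`.
fn state_from_error(e: &ShardFailure) -> Option<Status> {
    match e {
        ShardFailure::DifferentStatus(s) => Some(*s),
        ShardFailure::NoSuchQuery => None,
    }
}

fn min_status(a: Status, b: Status) -> Status {
    a.min(b)
}

// Old behavior: `filter_map` drops every `None`, so non-mismatch failures vanish
// before `fold` combines the remaining states with the leader's status.
fn lenient(leader: Status, failures: &[ShardFailure]) -> Status {
    failures
        .iter()
        .filter_map(state_from_error)
        .fold(leader, min_status)
}

// New behavior: the first failure that carries no state aborts with an error.
fn strict(leader: Status, failures: &[ShardFailure]) -> Result<Status, ShardFailure> {
    let mut status = leader;
    for failure in failures {
        match state_from_error(failure) {
            Some(other) => status = min_status(status, other),
            None => return Err(failure.clone()),
        }
    }
    Ok(status)
}
```

With a leader in `AwaitingInputs`, one shard reporting `Running`, and another reporting `NoSuchQuery`, `lenient` returns `AwaitingInputs` and hides the missing query, while `strict` returns the `NoSuchQuery` failure to the caller.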
@@ -1205,16 +1203,25 @@
     /// * From the standpoint of leader shard in Helper 1
     /// * On query_status
     ///
-    /// If one of my shards hasn't received the query yet (NoSuchQuery) the leader shouldn't
-    /// return an error but instead with the min state.
+    /// If one of my shards hasn't received the query yet (NoSuchQuery) the leader should
+    /// return an error despite other shards returning their status
     #[tokio::test]
+    #[should_panic(
+        expected = "(ShardIndex(3), Rejected { dest: ShardIndex(3), inner: QueryStatus(NoSuchQuery(QueryId)) })"
+    )]
     async fn status_query_doesnt_exist() {
         fn shard_handle(si: ShardIndex) -> Arc<dyn RequestHandler<ShardIndex>> {
             create_handler(move |_| async move {
                 if si == ShardIndex::from(3) {
                     Err(ApiError::QueryStatus(QueryStatusError::NoSuchQuery(
                         QueryId,
                     )))
+                } else if si == ShardIndex::from(2) {
+                    Err(ApiError::QueryStatus(QueryStatusError::DifferentStatus {
+                        query_id: QueryId,
+                        my_status: QueryStatus::Running,
+                        other_status: QueryStatus::Preparing,
+                    }))
                 } else {
                     Ok(HelperResponse::ok())
                 }
@@ -1237,16 +1244,10 @@
                 req,
             )
             .unwrap();
-        let r = t
-            .processor
+        t.processor
             .query_status(t.shard_transport.clone_ref(), QueryId)
-            .await;
-        if let Err(e) = r {
-            panic!("Unexpected error {e}");
-        }
-        if let Ok(st) = r {
-            assert_eq!(QueryStatus::AwaitingInputs, st);
-        }
+            .await
+            .unwrap();
     }

     /// Context: