Skip to content

Commit

Permalink
Retry on NoShardsAvailable
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Mar 6, 2024
1 parent 8c53029 commit 5b4e103
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 52 deletions.
144 changes: 112 additions & 32 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,28 +291,7 @@ impl IngestRouter {
persist_summary.leader_id
);
}
match persist_error {
IngestV2Error::Unavailable(_) => {
workbench
.unavailable_leaders
.insert(persist_summary.leader_id);
for subrequest_id in persist_summary.subrequest_ids {
workbench.record_transport_error(subrequest_id);
}
}
IngestV2Error::IngesterUnavailable { .. }
| IngestV2Error::Internal(_)
| IngestV2Error::NotFound { .. }
| IngestV2Error::Timeout(_)
| IngestV2Error::TooManyRequests => {
for subrequest_id in persist_summary.subrequest_ids {
workbench.record_internal_error(
subrequest_id,
persist_error.to_string(),
);
}
}
}
workbench.record_persist_error(persist_error, persist_summary);
}
};
}
Expand Down Expand Up @@ -340,8 +319,8 @@ impl IngestRouter {
self.populate_routing_table_debounced(workbench, debounced_request)
.await;

// List of subrequest IDs for which no shards were available to route the subrequests to.
let mut unavailable_subrequest_ids = Vec::new();
// List of subrequest IDs for which no shards are available to route the subrequests to.
let mut no_shards_available_subrequest_ids = Vec::new();

let mut per_leader_persist_subrequests: HashMap<&LeaderId, Vec<PersistSubrequest>> =
HashMap::new();
Expand All @@ -358,7 +337,7 @@ impl IngestRouter {
.find_entry(&subrequest.index_id, &subrequest.source_id)
.and_then(|entry| entry.next_open_shard_round_robin(&self.ingester_pool))
else {
unavailable_subrequest_ids.push(subrequest.subrequest_id);
no_shards_available_subrequest_ids.push(subrequest.subrequest_id);
continue;
};
let persist_subrequest = PersistSubrequest {
Expand All @@ -382,7 +361,7 @@ impl IngestRouter {
.map(|subrequest| subrequest.subrequest_id)
.collect();
let Some(mut ingester) = self.ingester_pool.get(&leader_id) else {
unavailable_subrequest_ids.extend(subrequest_ids);
no_shards_available_subrequest_ids.extend(subrequest_ids);
continue;
};
let persist_summary = PersistRequestSummary {
Expand Down Expand Up @@ -413,7 +392,7 @@ impl IngestRouter {
}
drop(state_guard);

for subrequest_id in unavailable_subrequest_ids {
for subrequest_id in no_shards_available_subrequest_ids {
workbench.record_no_shards_available(subrequest_id);
}
self.process_persist_results(workbench, persist_futures)
Expand Down Expand Up @@ -536,9 +515,9 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakRouterState {
}
}

struct PersistRequestSummary {
leader_id: NodeId,
subrequest_ids: Vec<SubrequestId>,
pub(super) struct PersistRequestSummary {
pub leader_id: NodeId,
pub subrequest_ids: Vec<SubrequestId>,
}

#[cfg(test)]
Expand Down Expand Up @@ -902,6 +881,107 @@ mod tests {
));
}

#[tokio::test]
async fn test_router_batch_persist_records_no_shards_available_empty_routing_table() {
let self_node_id = "test-router".into();
let mut control_plane_mock = ControlPlaneServiceClient::mock();
control_plane_mock
.expect_get_or_create_open_shards()
.once()
.returning(move |request| {
assert_eq!(request.subrequests.len(), 1);

let subrequest = &request.subrequests[0];
assert_eq!(subrequest.index_id, "test-index");
assert_eq!(subrequest.source_id, "test-source");

let response = GetOrCreateOpenShardsResponse::default();
Ok(response)
});
let control_plane: ControlPlaneServiceClient = control_plane_mock.into();
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
let mut router = IngestRouter::new(
self_node_id,
control_plane,
ingester_pool.clone(),
replication_factor,
);
let ingest_subrequests = vec![IngestSubrequest {
subrequest_id: 0,
index_id: "test-index".to_string(),
source_id: "test-source".to_string(),
..Default::default()
}];
let mut workbench = IngestWorkbench::new(ingest_subrequests, 2);
let commit_type = CommitTypeV2::Auto;
router.batch_persist(&mut workbench, commit_type).await;

let subworkbench = workbench.subworkbenches.get(&0).unwrap();
assert!(matches!(
subworkbench.last_failure_opt,
Some(SubworkbenchFailure::NoShardsAvailable)
));
}

#[tokio::test]
async fn test_router_batch_persist_records_no_shards_available_unavailable_ingester() {
let self_node_id = "test-router".into();
let mut control_plane_mock = ControlPlaneServiceClient::mock();
control_plane_mock
.expect_get_or_create_open_shards()
.once()
.returning(move |request| {
assert_eq!(request.subrequests.len(), 1);

let subrequest = &request.subrequests[0];
assert_eq!(subrequest.index_id, "test-index");
assert_eq!(subrequest.source_id, "test-source");

let response = GetOrCreateOpenShardsResponse {
successes: vec![GetOrCreateOpenShardsSuccess {
subrequest_id: 0,
index_uid: Some(IndexUid::for_test("test-index", 0)),
source_id: "test-source".to_string(),
open_shards: vec![Shard {
index_uid: Some(IndexUid::for_test("test-index", 0)),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1)),
shard_state: ShardState::Open as i32,
leader_id: "test-ingester".into(),
..Default::default()
}],
}],
..Default::default()
};
Ok(response)
});
let control_plane: ControlPlaneServiceClient = control_plane_mock.into();
let ingester_pool = IngesterPool::default();
let replication_factor = 1;
let mut router = IngestRouter::new(
self_node_id,
control_plane,
ingester_pool.clone(),
replication_factor,
);
let ingest_subrequests = vec![IngestSubrequest {
subrequest_id: 0,
index_id: "test-index".to_string(),
source_id: "test-source".to_string(),
..Default::default()
}];
let mut workbench = IngestWorkbench::new(ingest_subrequests, 2);
let commit_type = CommitTypeV2::Auto;
router.batch_persist(&mut workbench, commit_type).await;

let subworkbench = workbench.subworkbenches.get(&0).unwrap();
assert!(matches!(
subworkbench.last_failure_opt,
Some(SubworkbenchFailure::NoShardsAvailable)
));
}

#[tokio::test]
async fn test_router_process_persist_results_record_persist_successes() {
let self_node_id = "test-router".into();
Expand Down Expand Up @@ -1087,7 +1167,7 @@ mod tests {
}

#[tokio::test]
async fn test_router_process_persist_results_does_not_removes_unavailable_leaders() {
async fn test_router_process_persist_results_does_not_remove_unavailable_leaders() {
let self_node_id = "test-router".into();
let control_plane = ControlPlaneServiceClient::mock().into();

Expand Down Expand Up @@ -1171,7 +1251,7 @@ mod tests {
let subworkbench = workbench.subworkbenches.get(&1).unwrap();
assert!(matches!(
subworkbench.last_failure_opt,
Some(SubworkbenchFailure::Transport)
Some(SubworkbenchFailure::Unavailable)
));
}

Expand Down
Loading

0 comments on commit 5b4e103

Please sign in to comment.