Bugfix #4336: ingester leaving pool (#4340)
* Avoid evicting an ingester node from the IngesterPool when facing a timeout or a transport error.

Before, when facing such an error, we were removing the faulty node from
the pool, and no code path would re-add it to the ingester pool. The
ingester pool is also used by the ingest source, resulting in bug #4336.

After this patch:

When facing a transport error, we assume the targeted node is
unreachable and that chitchat simply has not detected it yet.

In an ideal world, we would inform chitchat about this, but it is a bit
difficult to do code-wise.

Instead, we register the leader as unavailable for the span of the
workbench. Subsequent retries then behave as if the leader were out of
the pool.
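
To illustrate, here is a minimal sketch of the state involved (NodeId is
simplified to a string alias and is_leader_usable is a hypothetical
helper; only the unavailable_leaders field mirrors the router.rs diff
below):

    use std::collections::HashSet;

    // Stand-in for quickwit_proto::types::NodeId, to keep the sketch small.
    type NodeId = String;

    struct IngestWorkbench {
        // Leaders that answered a persist request with a transport error
        // during this ingest attempt. The set only lives for the span of the
        // workbench, so nothing is evicted from the shared ingester pool.
        unavailable_leaders: HashSet<NodeId>,
    }

    impl IngestWorkbench {
        // Subsequent retries treat a leader recorded here as if it were
        // absent from the ingester pool.
        fn is_leader_usable(&self, leader_id: &NodeId) -> bool {
            !self.unavailable_leaders.contains(leader_id)
        }
    }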

A GetOrCreateOpenShards request will carry the information that the
node was unavailable, and the control plane will attempt to create a
shard on a different node.
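
The leader IDs travel to the control plane as plain strings on that
request, roughly as in the router.rs hunk below (sketch reusing the
IngestWorkbench type from the previous snippet):

    // The unavailable leaders recorded in the workbench become the
    // `unavailable_leaders` field of the GetOrCreateOpenShards request, so
    // the control plane can pick a different node for new shards.
    fn unavailable_leaders_for_request(workbench: &IngestWorkbench) -> Vec<String> {
        workbench
            .unavailable_leaders
            .iter()
            .map(|node_id| node_id.to_string())
            .collect()
    }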

A timeout, on the other hand, is treated as a normal retryable error.
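
Putting it together, the router's handling of a failed persist RPC now
looks roughly like this (simplified sketch: the error variants and the
unavailable_leaders/record_* names follow the router.rs hunk below,
while handle_persist_error and the method bodies are illustrative, and
the types build on the previous snippets):

    type SubrequestId = u32; // stand-in for quickwit_proto::types::SubrequestId

    #[derive(Debug)]
    enum IngestV2Error {
        Transport(String),
        Timeout,
        Internal(String),
        // ... other variants elided ...
    }

    impl IngestWorkbench {
        fn record_connection_error(&mut self, _subrequest_id: SubrequestId) {
            // Marks the subrequest as failed with a transport error; it stays
            // eligible for a retry on a different leader.
        }

        fn record_internal_error(&mut self, _subrequest_id: SubrequestId, _message: String) {
            // Marks the subrequest as failed with a plain retryable error.
        }
    }

    fn handle_persist_error(
        workbench: &mut IngestWorkbench,
        leader_id: NodeId,
        subrequest_ids: Vec<SubrequestId>,
        persist_error: IngestV2Error,
    ) {
        match persist_error {
            // The node did not answer at all: flag it as unavailable for the
            // rest of the workbench, but do NOT remove it from the ingester
            // pool.
            IngestV2Error::Transport(_) => {
                workbench.unavailable_leaders.insert(leader_id);
                for subrequest_id in subrequest_ids {
                    workbench.record_connection_error(subrequest_id);
                }
            }
            // Timeouts and all other errors are ordinary retryable failures;
            // the leader stays usable.
            other => {
                for subrequest_id in subrequest_ids {
                    workbench
                        .record_internal_error(subrequest_id, format!("persist error: {other:?}"));
                }
            }
        }
    }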

Closes #4336

* Update quickwit/quickwit-ingest/src/ingest_v2/workbench.rs

Co-authored-by: Adrien Guillo <[email protected]>

* Rename ConnectionError->TransportError

---------

Co-authored-by: Adrien Guillo <[email protected]>
fulmicoton and guilload authored Jan 11, 2024
1 parent f78ffdf commit b556868
Showing 8 changed files with 207 additions and 131 deletions.
quickwit/quickwit-common/src/tower/pool.rs (2 changes: 1 addition & 1 deletion)
@@ -178,7 +178,7 @@ where
}

/// Removes a value from the pool.
pub fn remove(&self, key: &K) {
fn remove(&self, key: &K) {
self.pool
.write()
.expect("lock should not be poisoned")
quickwit/quickwit-ingest/src/ingest_v2/ingester.rs (4 changes: 1 addition & 3 deletions)
@@ -273,9 +273,7 @@ impl Ingester {
"failed to create mrecordlog queue `{}`: {}",
queue_id, io_error
);
return Err(IngestV2Error::IngesterUnavailable {
ingester_id: shard.leader_id.into(),
});
return Err(IngestV2Error::Internal(format!("Io Error: {io_error}")));
}
};
let rate_limiter = RateLimiter::from_settings(self.rate_limiter_settings);
quickwit/quickwit-ingest/src/ingest_v2/router.rs (139 changes: 85 additions & 54 deletions)
@@ -34,9 +34,7 @@ use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::ingest::ingester::{
IngesterService, PersistFailureReason, PersistRequest, PersistResponse, PersistSubrequest,
};
use quickwit_proto::ingest::router::{
IngestRequestV2, IngestResponseV2, IngestRouterService, IngestSubrequest,
};
use quickwit_proto::ingest::router::{IngestRequestV2, IngestResponseV2, IngestRouterService};
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardIds, ShardState};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId};
use tokio::sync::RwLock;
@@ -109,7 +107,6 @@ impl IngestRouter {

pub fn subscribe(&self, event_broker: &EventBroker) {
let weak_router_state = WeakRouterState(Arc::downgrade(&self.state));

event_broker
.subscribe::<LocalShardsUpdate>(weak_router_state.clone())
.forever();
@@ -122,25 +119,31 @@ impl IngestRouter {
/// [`GetOrCreateOpenShardsRequest`] request if open shards do not exist for all of them.
async fn make_get_or_create_open_shard_request(
&self,
subrequests: impl Iterator<Item = &IngestSubrequest>,
workbench: &mut IngestWorkbench,
ingester_pool: &IngesterPool,
) -> GetOrCreateOpenShardsRequest {
let mut get_open_shards_subrequests = Vec::new();

// `closed_shards` and `unavailable_leaders` are populated by calls to `has_open_shards`
// as we're looking for open shards to route the subrequests to.
let mut closed_shards: Vec<ShardIds> = Vec::new();
let mut unavailable_leaders: HashSet<NodeId> = HashSet::new();
let unavailable_leaders: &mut HashSet<NodeId> = &mut workbench.unavailable_leaders;

let state_guard = self.state.read().await;

for subrequest in subrequests {
for subrequest in workbench.subworkbenches.values().filter_map(|subworbench| {
if subworbench.is_pending() {
Some(&subworbench.subrequest)
} else {
None
}
}) {
if !state_guard.routing_table.has_open_shards(
&subrequest.index_id,
&subrequest.source_id,
&mut closed_shards,
ingester_pool,
&mut unavailable_leaders,
&mut closed_shards,
unavailable_leaders,
) {
let subrequest = GetOrCreateOpenShardsSubrequest {
subrequest_id: subrequest.subrequest_id,
@@ -153,21 +156,18 @@ impl IngestRouter {
drop(state_guard);

if !closed_shards.is_empty() {
info!(
"reporting {} closed shard(s) to control plane",
closed_shards.len()
)
info!(closed_shards=?closed_shards, "reporting closed shard(s) to control plane");
}
if !unavailable_leaders.is_empty() {
info!(
"reporting {} unavailable leader(s) to control plane",
unavailable_leaders.len()
);
info!(unvailable_leaders=?unavailable_leaders, "reporting unavailable leader(s) to control plane");
}
GetOrCreateOpenShardsRequest {
subrequests: get_open_shards_subrequests,
closed_shards,
unavailable_leaders: unavailable_leaders.into_iter().map(Into::into).collect(),
unavailable_leaders: unavailable_leaders
.iter()
.map(|node_id| node_id.to_string())
.collect(),
}
}

@@ -230,7 +230,6 @@ impl IngestRouter {
}
for persist_failure in persist_response.failures {
workbench.record_persist_failure(&persist_failure);

if persist_failure.reason() == PersistFailureReason::ShardClosed {
let index_uid: IndexUid = persist_failure.index_uid.into();
let source_id: SourceId = persist_failure.source_id;
@@ -261,15 +260,19 @@ impl IngestRouter {
);
}
match persist_error {
IngestV2Error::Timeout
| IngestV2Error::Transport { .. }
| IngestV2Error::IngesterUnavailable { .. } => {
IngestV2Error::Transport(_) => {
workbench
.unavailable_leaders
.insert(persist_summary.leader_id);
for subrequest_id in persist_summary.subrequest_ids {
workbench.record_no_shards_available(subrequest_id);
workbench.record_connection_error(subrequest_id);
}
self.ingester_pool.remove(&persist_summary.leader_id);
}
_ => {
IngestV2Error::TooManyRequests
| IngestV2Error::Internal(_)
| IngestV2Error::ShardNotFound { .. }
| IngestV2Error::IngesterUnavailable { .. }
| IngestV2Error::Timeout => {
for subrequest_id in persist_summary.subrequest_ids {
workbench.record_internal_error(
subrequest_id,
@@ -299,10 +302,7 @@ impl IngestRouter {

async fn batch_persist(&mut self, workbench: &mut IngestWorkbench, commit_type: CommitTypeV2) {
let get_or_create_open_shards_request = self
.make_get_or_create_open_shard_request(
workbench.pending_subrequests(),
&self.ingester_pool,
)
.make_get_or_create_open_shard_request(workbench, &self.ingester_pool)
.await;

self.populate_routing_table(workbench, get_or_create_open_shards_request)
@@ -394,7 +394,6 @@ impl IngestRouter {
) -> IngestV2Result<IngestResponseV2> {
let commit_type = ingest_request.commit_type();
let mut workbench = IngestWorkbench::new(ingest_request.subrequests, max_num_attempts);

while !workbench.is_complete() {
workbench.new_attempt();
self.batch_persist(&mut workbench, commit_type).await;
@@ -510,7 +509,6 @@ struct PersistRequestSummary {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::iter;
use std::sync::atomic::AtomicUsize;

use quickwit_proto::control_plane::{
@@ -543,8 +541,9 @@ mod tests {
ingester_pool.clone(),
replication_factor,
);
let mut workbench = IngestWorkbench::default();
let get_or_create_open_shard_request = router
.make_get_or_create_open_shard_request(iter::empty(), &ingester_pool)
.make_get_or_create_open_shard_request(&mut workbench, &ingester_pool)
.await;
assert!(get_or_create_open_shard_request.subrequests.is_empty());

@@ -578,7 +577,7 @@ mod tests {
);
drop(state_guard);

let ingest_subrequests = [
let ingest_subrequests: Vec<IngestSubrequest> = vec![
IngestSubrequest {
subrequest_id: 0,
index_id: "test-index-0".to_string(),
@@ -592,8 +591,9 @@ mod tests {
..Default::default()
},
];
let mut workbench = IngestWorkbench::new(ingest_subrequests.clone(), 3);
let get_or_create_open_shard_request = router
.make_get_or_create_open_shard_request(ingest_subrequests.iter(), &ingester_pool)
.make_get_or_create_open_shard_request(&mut workbench, &ingester_pool)
.await;

assert_eq!(get_or_create_open_shard_request.subrequests.len(), 2);
@@ -623,25 +623,48 @@ mod tests {
get_or_create_open_shard_request.unavailable_leaders[0],
"test-ingester-0"
);
assert_eq!(workbench.unavailable_leaders.len(), 1);

ingester_pool.insert(
"test-ingester-0".into(),
IngesterServiceClient::mock().into(),
);
{
// Ingester-0 has been marked as unavailable due to the previous requests.
let get_or_create_open_shard_request = router
.make_get_or_create_open_shard_request(&mut workbench, &ingester_pool)
.await;
assert_eq!(get_or_create_open_shard_request.subrequests.len(), 2);
assert_eq!(workbench.unavailable_leaders.len(), 1);
assert_eq!(
workbench
.unavailable_leaders
.iter()
.next()
.unwrap()
.to_string(),
"test-ingester-0"
);
}

let get_or_create_open_shard_request = router
.make_get_or_create_open_shard_request(ingest_subrequests.iter(), &ingester_pool)
.await;

let subrequest = &get_or_create_open_shard_request.subrequests[0];
assert_eq!(subrequest.index_id, "test-index-1");
assert_eq!(subrequest.source_id, "test-source");

assert_eq!(get_or_create_open_shard_request.subrequests.len(), 1);
assert_eq!(
get_or_create_open_shard_request.unavailable_leaders.len(),
0
);
{
// With a fresh workbench, the ingester is not marked as unavailable, and present in the
// pool.
let mut workbench = IngestWorkbench::new(ingest_subrequests, 3);
let get_or_create_open_shard_request = router
.make_get_or_create_open_shard_request(&mut workbench, &ingester_pool)
.await;
assert_eq!(get_or_create_open_shard_request.subrequests.len(), 1);

let subrequest = &get_or_create_open_shard_request.subrequests[0];
assert_eq!(subrequest.index_id, "test-index-1");
assert_eq!(subrequest.source_id, "test-source");

assert_eq!(
get_or_create_open_shard_request.unavailable_leaders.len(),
0
);
}
}

#[tokio::test]
@@ -1015,7 +1038,7 @@ mod tests {
}

#[tokio::test]
async fn test_router_process_persist_results_removes_unavailable_leaders() {
async fn test_router_process_persist_results_does_not_removes_unavailable_leaders() {
let self_node_id = "test-router".into();
let control_plane = ControlPlaneServiceClient::mock().into();

@@ -1067,12 +1090,14 @@ mod tests {

let subworkbench = workbench.subworkbenches.get(&0).unwrap();
assert!(matches!(
subworkbench.last_failure_opt,
Some(SubworkbenchFailure::NoShardsAvailable { .. })
&subworkbench.last_failure_opt,
&Some(SubworkbenchFailure::Internal(ref msg)) if msg.contains("timed out")
));

assert!(!workbench
.unavailable_leaders
.contains(&NodeId::from("test-ingester-1")));
let persist_futures = FuturesUnordered::new();

persist_futures.push(async {
let persist_summary = PersistRequestSummary {
leader_id: "test-ingester-1".into(),
@@ -1085,13 +1110,19 @@ mod tests {
router
.process_persist_results(&mut workbench, persist_futures)
.await;

// We do not remove the leader from the pool.
assert!(!ingester_pool.is_empty());
// ... but we mark it as unavailable.
assert!(workbench
.unavailable_leaders
.contains(&NodeId::from("test-ingester-1")));

let subworkbench = workbench.subworkbenches.get(&1).unwrap();
assert!(matches!(
subworkbench.last_failure_opt,
Some(SubworkbenchFailure::NoShardsAvailable { .. })
Some(SubworkbenchFailure::TransportError)
));

assert!(ingester_pool.is_empty());
}

#[tokio::test]