diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs
index 39e71b06419..f3c86733823 100644
--- a/quickwit/quickwit-control-plane/src/control_plane.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane.rs
@@ -363,10 +363,17 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
         request: GetOrCreateOpenShardsRequest,
         ctx: &ActorContext<Self>,
     ) -> Result<Self::Reply, ActorExitStatus> {
-        Ok(self
+        let response = match self
             .ingest_controller
             .get_or_create_open_shards(request, &mut self.model, ctx.progress())
-            .await)
+            .await
+        {
+            Ok(response) => response,
+            Err(error) => return Ok(Err(error)),
+        };
+        // TODO: Why do we return an error if the indexing scheduler fails?
+        self.indexing_scheduler.on_index_change(&self.model).await?;
+        Ok(Ok(response))
     }
 }
 
diff --git a/quickwit/quickwit-control-plane/src/control_plane_model.rs b/quickwit/quickwit-control-plane/src/control_plane_model.rs
index 98952293380..774ffb11693 100644
--- a/quickwit/quickwit-control-plane/src/control_plane_model.rs
+++ b/quickwit/quickwit-control-plane/src/control_plane_model.rs
@@ -296,7 +296,7 @@ impl ControlPlaneModel {
         &mut self,
         index_uid: &IndexUid,
         source_id: &SourceId,
-        shards: &[Shard],
+        shards: Vec<Shard>,
         next_shard_id: NextShardId,
     ) {
         self.shard_table
@@ -309,10 +309,10 @@ impl ControlPlaneModel {
         &self,
         index_uid: &IndexUid,
         source_id: &SourceId,
-        unavailable_ingesters: &FnvHashSet<NodeId>,
+        unavailable_leaders: &FnvHashSet<NodeId>,
     ) -> Option<(Vec<Shard>, NextShardId)> {
         self.shard_table
-            .find_open_shards(index_uid, source_id, unavailable_ingesters)
+            .find_open_shards(index_uid, source_id, unavailable_leaders)
     }
 }
 
@@ -375,7 +375,7 @@ impl ShardTable {
         &self,
         index_uid: &IndexUid,
         source_id: &SourceId,
-        unavailable_ingesters: &FnvHashSet<NodeId>,
+        unavailable_leaders: &FnvHashSet<NodeId>,
     ) -> Option<(Vec<Shard>, NextShardId)> {
         let source_uid = SourceUid {
             index_uid: index_uid.clone(),
@@ -387,7 +387,7 @@ impl ShardTable {
             .values()
             .filter(|shard| {
                 shard.is_open()
-                    && !unavailable_ingesters.contains(NodeIdRef::from_str(&shard.leader_id))
+                    && !unavailable_leaders.contains(NodeIdRef::from_str(&shard.leader_id))
             })
             .cloned()
             .collect();
@@ -406,7 +406,7 @@ impl ShardTable {
         &mut self,
         index_uid: &IndexUid,
         source_id: &SourceId,
-        shards: &[Shard],
+        shards: Vec<Shard>,
         next_shard_id: NextShardId,
     ) {
         let source_uid = SourceUid {
@@ -415,19 +415,21 @@ impl ShardTable {
         };
         match self.table_entries.entry(source_uid) {
             Entry::Occupied(mut entry) => {
+                let table_entry = entry.get_mut();
+
                 for shard in shards {
-                    let table_entry = entry.get_mut();
-                    table_entry.shards.insert(shard.shard_id, shard.clone());
-                    table_entry.next_shard_id = next_shard_id;
+                    // We only insert shards that we don't know about: the control plane knows more
+                    // about the state of the shards than the metastore.
+                    table_entry.shards.entry(shard.shard_id).or_insert(shard);
                 }
+                table_entry.next_shard_id = next_shard_id;
             }
             // This should never happen if the control plane view is consistent with the state of
             // the metastore, so should we panic here? Warnings are most likely going to go
             // unnoticed.
             Entry::Vacant(entry) => {
                 let shards: FnvHashMap<ShardId, Shard> = shards
-                    .iter()
-                    .cloned()
+                    .into_iter()
                     .map(|shard| (shard.shard_id, shard))
                     .collect();
                 let table_entry = ShardTableEntry {
@@ -561,7 +563,7 @@ mod tests {
         shard_table.update_shards(
             &index_uid,
             &source_id,
-            &[shard_01, shard_02, shard_03.clone(), shard_04.clone()],
+            vec![shard_01, shard_02, shard_03.clone(), shard_04.clone()],
             5,
         );
         let (open_shards, next_shard_id) = shard_table
@@ -597,7 +599,7 @@ mod tests {
             shard_state: ShardState::Open as i32,
             ..Default::default()
         };
-        shard_table.update_shards(&index_uid_0, &source_id, &[shard_01.clone()], 2);
+        shard_table.update_shards(&index_uid_0, &source_id, vec![shard_01.clone()], 2);
 
         assert_eq!(shard_table.table_entries.len(), 1);
 
@@ -625,7 +627,7 @@ mod tests {
         shard_table.update_shards(
             &index_uid_0,
             &source_id,
-            &[shard_01.clone(), shard_02.clone()],
+            vec![shard_01.clone(), shard_02.clone()],
             3,
         );
 
@@ -675,8 +677,8 @@ mod tests {
             shard_state: ShardState::Open as i32,
             ..Default::default()
         };
-        shard_table.update_shards(&index_uid_0, &source_id, &[shard_01, shard_02], 3);
-        shard_table.update_shards(&index_uid_0, &source_id, &[shard_11], 2);
+        shard_table.update_shards(&index_uid_0, &source_id, vec![shard_01, shard_02], 3);
+        shard_table.update_shards(&index_uid_0, &source_id, vec![shard_11], 2);
 
         let closed_shard_ids = shard_table.close_shards(&index_uid_0, &source_id, &[1, 2, 3]);
         assert_eq!(closed_shard_ids, &[1]);
@@ -722,8 +724,13 @@ mod tests {
             shard_state: ShardState::Open as i32,
             ..Default::default()
         };
-        shard_table.update_shards(&index_uid_0, &source_id, &[shard_01.clone(), shard_02], 3);
-        shard_table.update_shards(&index_uid_1, &source_id, &[shard_11], 2);
+        shard_table.update_shards(
+            &index_uid_0,
+            &source_id,
+            vec![shard_01.clone(), shard_02],
+            3,
+        );
+        shard_table.update_shards(&index_uid_1, &source_id, vec![shard_11], 2);
 
         shard_table.delete_shards(&index_uid_0, &source_id, &[2]);
         shard_table.delete_shards(&index_uid_1, &source_id, &[1]);
diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
index b2a51957427..55e4dfe7a1c 100644
--- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
+++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -302,22 +302,26 @@ impl IngestController {
             let open_shards_response = progress
                 .protect_future(self.metastore.open_shards(open_shards_request))
                 .await?;
-            for open_shards_subresponse in &open_shards_response.subresponses {
+            for open_shards_subresponse in open_shards_response.subresponses {
                 let index_uid: IndexUid = open_shards_subresponse.index_uid.clone().into();
+                let source_id = open_shards_subresponse.source_id;
+
                 model.update_shards(
                     &index_uid,
-                    &open_shards_subresponse.source_id,
-                    &open_shards_subresponse.open_shards,
+                    &source_id,
+                    open_shards_subresponse.open_shards,
                     open_shards_subresponse.next_shard_id,
                 );
-            }
-            for open_shards_subresponse in open_shards_response.subresponses {
-                let get_open_shards_subresponse = GetOpenShardsSubresponse {
-                    index_uid: open_shards_subresponse.index_uid,
-                    source_id: open_shards_subresponse.source_id,
-                    open_shards: open_shards_subresponse.open_shards,
-                };
-                get_open_shards_subresponses.push(get_open_shards_subresponse);
+                if let Some((open_shards, _next_shard_id)) =
+                    model.find_open_shards(&index_uid, &source_id, &unavailable_leaders)
+                {
+                    let get_open_shards_subresponse = GetOpenShardsSubresponse {
+                        index_uid: index_uid.into(),
+                        source_id,
+                        open_shards,
+                    };
+                    get_open_shards_subresponses.push(get_open_shards_subresponse);
+                }
             }
         }
         Ok(GetOrCreateOpenShardsResponse {
@@ -634,7 +638,7 @@ mod tests {
             },
         ];
 
-        model.update_shards(&index_uid_0, &source_id.into(), &shards, 3);
+        model.update_shards(&index_uid_0, &source_id.into(), shards, 3);
 
         let request = GetOrCreateOpenShardsRequest {
             subrequests: Vec::new(),
diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
index e218331580f..7be031bf195 100644
--- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -26,7 +26,9 @@ use anyhow::{bail, Context};
 use async_trait::async_trait;
 use itertools::Itertools;
 use quickwit_actors::{ActorExitStatus, Mailbox};
-use quickwit_ingest::{decoded_mrecords, IngesterPool, MRecord, MultiFetchStream};
+use quickwit_ingest::{
+    decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream,
+};
 use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
 use quickwit_proto::ingest::ingester::{
     FetchResponseV2, IngesterService, TruncateRequest, TruncateSubrequest,
@@ -93,12 +95,21 @@ impl ClientId {
     }
 }
 
+#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
+enum ShardStatus {
+    #[default]
+    Active,
+    Eof,
+    Error,
+}
+
 #[derive(Debug, Eq, PartialEq)]
 struct AssignedShard {
     leader_id: NodeId,
     follower_id_opt: Option<NodeId>,
     partition_id: PartitionId,
     current_position_inclusive: Position,
+    status: ShardStatus,
 }
 
 /// Streams documents from a set of shards.
@@ -175,6 +186,7 @@ impl IngestSource {
                     batch_builder.force_commit();
                 }
                 MRecord::Eof => {
+                    assigned_shard.status = ShardStatus::Eof;
                     break;
                 }
                 MRecord::Unknown => {
@@ -194,6 +206,12 @@ impl IngestSource {
         Ok(())
     }
 
+    fn process_fetch_stream_error(&mut self, fetch_stream_error: FetchStreamError) {
+        if let Some(shard) = self.assigned_shards.get_mut(&fetch_stream_error.shard_id) {
+            shard.status = ShardStatus::Error;
+        }
+    }
+
     async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
         let mut per_ingester_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
             HashMap::new();
@@ -270,10 +288,11 @@ impl Source for IngestSource {
                             break;
                         }
                     }
-                    Ok(Err(error)) => {
-                        error!(error=?error, "failed to fetch payload");
+                    Ok(Err(fetch_stream_error)) => {
+                        self.process_fetch_stream_error(fetch_stream_error);
                     }
                     Err(_) => {
+                        // The deadline has elapsed.
                         break;
                     }
                 }
@@ -373,7 +392,9 @@ impl Source for IngestSource {
             let from_position_exclusive = current_position_inclusive.clone();
             let to_position_inclusive = Position::Eof;
 
-            if let Err(error) = ctx
+            let status = if from_position_exclusive == Position::Eof {
+                ShardStatus::Eof
+            } else if let Err(error) = ctx
                 .protect_future(self.fetch_stream.subscribe(
                     leader_id.clone(),
                     follower_id_opt.clone(),
@@ -386,8 +407,10 @@ impl Source for IngestSource {
                 .await
             {
                 error!(error=%error, "failed to subscribe to shard");
-                continue;
-            }
+                ShardStatus::Error
+            } else {
+                ShardStatus::Active
+            };
             truncation_point.push((shard_id, current_position_inclusive.clone()));
 
             let assigned_shard = AssignedShard {
@@ -395,6 +418,7 @@ impl Source for IngestSource {
                 follower_id_opt,
                 partition_id,
                 current_position_inclusive,
+                status,
             };
             self.assigned_shards.insert(shard_id, assigned_shard);
         }
@@ -584,6 +608,7 @@ mod tests {
             follower_id_opt: None,
             partition_id: 1u64.into(),
            current_position_inclusive: 11u64.into(),
+            status: ShardStatus::Active,
         };
         assert_eq!(assigned_shard, &expected_assigned_shard);
 
@@ -632,6 +657,7 @@ mod tests {
                 follower_id_opt: None,
                 partition_id: 1u64.into(),
                 current_position_inclusive: 11u64.into(),
+                status: ShardStatus::Active,
             },
         );
         source.assigned_shards.insert(
@@ -641,6 +667,7 @@ mod tests {
                 follower_id_opt: None,
                 partition_id: 2u64.into(),
                 current_position_inclusive: 22u64.into(),
+                status: ShardStatus::Active,
             },
         );
         let fetch_response_tx = source.fetch_stream.fetch_response_tx();
@@ -808,6 +835,7 @@ mod tests {
                 follower_id_opt: None,
                 partition_id: 1u64.into(),
                 current_position_inclusive: 11u64.into(),
+                status: ShardStatus::Active,
             },
         );
         source.assigned_shards.insert(
@@ -817,6 +845,7 @@ mod tests {
                 follower_id_opt: Some("test-ingester-1".into()),
                 partition_id: 2u64.into(),
                 current_position_inclusive: 22u64.into(),
+                status: ShardStatus::Active,
             },
         );
         source.assigned_shards.insert(
@@ -826,6 +855,7 @@ mod tests {
                 follower_id_opt: Some("test-ingester-0".into()),
                 partition_id: 3u64.into(),
                 current_position_inclusive: 33u64.into(),
+                status: ShardStatus::Active,
             },
         );
         source.assigned_shards.insert(
@@ -835,6 +865,7 @@ mod tests {
                 follower_id_opt: Some("test-ingester-3".into()),
                 partition_id: 4u64.into(),
                 current_position_inclusive: 44u64.into(),
+                status: ShardStatus::Active,
             },
         );
         source.assigned_shards.insert(
@@ -844,6 +875,7 @@ mod tests {
                 follower_id_opt: Some("test-ingester-3".into()),
                 partition_id: 4u64.into(),
                 current_position_inclusive: Position::Beginning,
+                status: ShardStatus::Active,
             },
         );
         let checkpoint = SourceCheckpoint::from_iter(vec![
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
index 40d0d3ae37e..5bd3fc23997 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -214,6 +214,13 @@ impl FetchTask {
     }
 }
 
+pub struct FetchStreamError {
+    pub index_uid: IndexUid,
+    pub source_id: SourceId,
+    pub shard_id: ShardId,
+    pub ingest_error: IngestV2Error,
+}
+
 /// Combines multiple fetch streams originating from different ingesters into a single stream. It
 /// tolerates the failure of ingesters and automatically fails over to replica shards.
 pub struct MultiFetchStream {
@@ -221,8 +228,8 @@ pub struct MultiFetchStream {
     client_id: ClientId,
     ingester_pool: IngesterPool,
     fetch_task_handles: HashMap<QueueId, JoinHandle<()>>,
-    fetch_response_rx: mpsc::Receiver<IngestV2Result<FetchResponseV2>>,
-    fetch_response_tx: mpsc::Sender<IngestV2Result<FetchResponseV2>>,
+    fetch_response_rx: mpsc::Receiver<Result<FetchResponseV2, FetchStreamError>>,
+    fetch_response_tx: mpsc::Sender<Result<FetchResponseV2, FetchStreamError>>,
 }
 
 impl MultiFetchStream {
@@ -239,7 +246,7 @@ impl MultiFetchStream {
     }
 
     #[cfg(any(test, feature = "testsuite"))]
-    pub fn fetch_response_tx(&self) -> mpsc::Sender<IngestV2Result<FetchResponseV2>> {
+    pub fn fetch_response_tx(&self) -> mpsc::Sender<Result<FetchResponseV2, FetchStreamError>> {
         self.fetch_response_tx.clone()
     }
 
@@ -307,7 +314,7 @@ impl MultiFetchStream {
     /// # Cancel safety
     ///
     /// This method is cancel safe.
-    pub async fn next(&mut self) -> IngestV2Result<FetchResponseV2> {
+    pub async fn next(&mut self) -> Result<FetchResponseV2, FetchStreamError> {
         // Because we always hold a sender and never call `close()` on the receiver, the channel is
         // always open.
         self.fetch_response_rx
@@ -363,7 +370,7 @@ async fn fault_tolerant_fetch_task(
     to_position_inclusive: Position,
     ingester_ids: Vec<NodeId>,
     ingester_pool: IngesterPool,
-    fetch_response_tx: mpsc::Sender<IngestV2Result<FetchResponseV2>>,
+    fetch_response_tx: mpsc::Sender<Result<FetchResponseV2, FetchStreamError>>,
 ) {
     // TODO: We can probably simplify this code by breaking it into smaller functions.
     'outer: for (ingester_idx, ingester_id) in ingester_ids.iter().enumerate() {
@@ -393,7 +400,14 @@ async fn fault_tolerant_fetch_task(
                 };
                 // Attempt to send the error to the consumer in a best-effort manner before
                 // returning.
-                let _ = fetch_response_tx.send(Err(ingest_error)).await;
+                let fetch_stream_error = FetchStreamError {
+                    index_uid,
+                    source_id,
+                    shard_id,
+                    ingest_error,
+                };
+                let _ = fetch_response_tx.send(Err(fetch_stream_error)).await;
+                return;
             }
             continue;
         }
@@ -427,7 +441,14 @@ async fn fault_tolerant_fetch_task(
                     error=%ingest_error,
                     "failed to open fetch stream from ingester `{ingester_id}`: closing fetch stream"
                 );
-                let _ = fetch_response_tx.send(Err(ingest_error)).await;
+                let fetch_stream_error = FetchStreamError {
+                    index_uid,
+                    source_id,
+                    shard_id,
+                    ingest_error,
+                };
+                let _ = fetch_response_tx.send(Err(fetch_stream_error)).await;
+                return;
             }
             continue;
         }
@@ -466,7 +487,14 @@ async fn fault_tolerant_fetch_task(
                         error=%ingest_error,
                         "failed to fetch records from ingester `{ingester_id}`: closing fetch stream"
                     );
-                    let _ = fetch_response_tx.send(Err(ingest_error)).await;
+                    let fetch_stream_error = FetchStreamError {
+                        index_uid,
+                        source_id,
+                        shard_id,
+                        ingest_error,
+                    };
+                    let _ = fetch_response_tx.send(Err(fetch_stream_error)).await;
+                    return;
                 }
                 continue 'outer;
             }
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
index 2940a6c4b4a..16e4317ab70 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mod.rs
@@ -32,7 +32,7 @@ use quickwit_common::tower::Pool;
 use quickwit_proto::ingest::ingester::IngesterServiceClient;
 use quickwit_proto::types::NodeId;
 
-pub use self::fetch::MultiFetchStream;
+pub use self::fetch::{FetchStreamError, MultiFetchStream};
 pub use self::ingester::Ingester;
 pub use self::mrecord::{decoded_mrecords, MRecord};
 pub use self::router::IngestRouter;