Control plane fixes
guilload committed Oct 26, 2023
1 parent 34eb593 commit e9f538d
Showing 6 changed files with 125 additions and 47 deletions.
11 changes: 9 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -363,10 +363,17 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
request: GetOrCreateOpenShardsRequest,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
Ok(self
let response = match self
.ingest_controller
.get_or_create_open_shards(request, &mut self.model, ctx.progress())
.await)
.await
{
Ok(response) => response,
Err(error) => return Ok(Err(error)),
};
// TODO: Why do we return an error if the indexing scheduler fails?
self.indexing_scheduler.on_index_change(&self.model).await?;
Ok(Ok(response))
}
}

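To illustrate the new control flow in the handler above: ingest errors are now returned to the caller as the inner `Err` of a nested `Result` (so the actor keeps running), while a scheduler failure still exits the actor via `?`. Below is a minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real `quickwit_actors` / control-plane types.

```rust
// Simplified stand-ins, not the real Quickwit types.
#[derive(Debug)]
struct ActorExitStatus(String);

#[derive(Debug)]
struct ControlPlaneError(String);

#[derive(Debug)]
struct OpenShardsResponse {
    num_open_shards: usize,
}

fn get_or_create_open_shards(fail: bool) -> Result<OpenShardsResponse, ControlPlaneError> {
    if fail {
        Err(ControlPlaneError("metastore unavailable".to_string()))
    } else {
        Ok(OpenShardsResponse { num_open_shards: 2 })
    }
}

fn schedule_indexing() -> Result<(), ActorExitStatus> {
    Ok(())
}

/// Mirrors the handler's control flow: an ingest error is wrapped in `Ok(Err(..))`
/// so the actor survives, while a scheduler error propagates with `?` and exits it.
fn handle(
    fail_ingest: bool,
) -> Result<Result<OpenShardsResponse, ControlPlaneError>, ActorExitStatus> {
    let response = match get_or_create_open_shards(fail_ingest) {
        Ok(response) => response,
        Err(error) => return Ok(Err(error)),
    };
    schedule_indexing()?;
    Ok(Ok(response))
}

fn main() {
    println!("{:?}", handle(false)); // Ok(Ok(OpenShardsResponse { num_open_shards: 2 }))
    println!("{:?}", handle(true));  // Ok(Err(ControlPlaneError("metastore unavailable")))
}
```
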
43 changes: 25 additions & 18 deletions quickwit/quickwit-control-plane/src/control_plane_model.rs
@@ -296,7 +296,7 @@ impl ControlPlaneModel {
&mut self,
index_uid: &IndexUid,
source_id: &SourceId,
shards: &[Shard],
shards: Vec<Shard>,
next_shard_id: NextShardId,
) {
self.shard_table
@@ -309,10 +309,10 @@
&self,
index_uid: &IndexUid,
source_id: &SourceId,
unavailable_ingesters: &FnvHashSet<NodeId>,
unavailable_leaders: &FnvHashSet<NodeId>,
) -> Option<(Vec<Shard>, NextShardId)> {
self.shard_table
.find_open_shards(index_uid, source_id, unavailable_ingesters)
.find_open_shards(index_uid, source_id, unavailable_leaders)
}
}

@@ -375,7 +375,7 @@ impl ShardTable {
&self,
index_uid: &IndexUid,
source_id: &SourceId,
unavailable_ingesters: &FnvHashSet<NodeId>,
unavailable_leaders: &FnvHashSet<NodeId>,
) -> Option<(Vec<Shard>, NextShardId)> {
let source_uid = SourceUid {
index_uid: index_uid.clone(),
@@ -387,7 +387,7 @@
.values()
.filter(|shard| {
shard.is_open()
&& !unavailable_ingesters.contains(NodeIdRef::from_str(&shard.leader_id))
&& !unavailable_leaders.contains(NodeIdRef::from_str(&shard.leader_id))
})
.cloned()
.collect();
@@ -406,7 +406,7 @@
&mut self,
index_uid: &IndexUid,
source_id: &SourceId,
shards: &[Shard],
shards: Vec<Shard>,
next_shard_id: NextShardId,
) {
let source_uid = SourceUid {
@@ -415,19 +415,21 @@
};
match self.table_entries.entry(source_uid) {
Entry::Occupied(mut entry) => {
let table_entry = entry.get_mut();

for shard in shards {
let table_entry = entry.get_mut();
table_entry.shards.insert(shard.shard_id, shard.clone());
table_entry.next_shard_id = next_shard_id;
// We only insert shards that we don't know about; the control plane knows more
// about the state of the shards than the metastore does.
table_entry.shards.entry(shard.shard_id).or_insert(shard);
}
table_entry.next_shard_id = next_shard_id;
}
// This should never happen if the control plane view is consistent with the state of
// the metastore, so should we panic here? Warnings are most likely going to go
// unnoticed.
Entry::Vacant(entry) => {
let shards: FnvHashMap<ShardId, Shard> = shards
.iter()
.cloned()
.into_iter()
.map(|shard| (shard.shard_id, shard))
.collect();
let table_entry = ShardTableEntry {
@@ -561,7 +563,7 @@ mod tests {
shard_table.update_shards(
&index_uid,
&source_id,
&[shard_01, shard_02, shard_03.clone(), shard_04.clone()],
vec![shard_01, shard_02, shard_03.clone(), shard_04.clone()],
5,
);
let (open_shards, next_shard_id) = shard_table
@@ -597,7 +599,7 @@
shard_state: ShardState::Open as i32,
..Default::default()
};
shard_table.update_shards(&index_uid_0, &source_id, &[shard_01.clone()], 2);
shard_table.update_shards(&index_uid_0, &source_id, vec![shard_01.clone()], 2);

assert_eq!(shard_table.table_entries.len(), 1);

@@ -625,7 +627,7 @@
shard_table.update_shards(
&index_uid_0,
&source_id,
&[shard_01.clone(), shard_02.clone()],
vec![shard_01.clone(), shard_02.clone()],
3,
);

@@ -675,8 +677,8 @@
shard_state: ShardState::Open as i32,
..Default::default()
};
shard_table.update_shards(&index_uid_0, &source_id, &[shard_01, shard_02], 3);
shard_table.update_shards(&index_uid_0, &source_id, &[shard_11], 2);
shard_table.update_shards(&index_uid_0, &source_id, vec![shard_01, shard_02], 3);
shard_table.update_shards(&index_uid_0, &source_id, vec![shard_11], 2);

let closed_shard_ids = shard_table.close_shards(&index_uid_0, &source_id, &[1, 2, 3]);
assert_eq!(closed_shard_ids, &[1]);
@@ -722,8 +724,13 @@
shard_state: ShardState::Open as i32,
..Default::default()
};
shard_table.update_shards(&index_uid_0, &source_id, &[shard_01.clone(), shard_02], 3);
shard_table.update_shards(&index_uid_1, &source_id, &[shard_11], 2);
shard_table.update_shards(
&index_uid_0,
&source_id,
vec![shard_01.clone(), shard_02],
3,
);
shard_table.update_shards(&index_uid_1, &source_id, vec![shard_11], 2);
shard_table.delete_shards(&index_uid_0, &source_id, &[2]);
shard_table.delete_shards(&index_uid_1, &source_id, &[1]);

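The behavioral change in `update_shards` above is that shards already present in the table are left untouched: `entry(shard.shard_id).or_insert(shard)` only fills in shards the control plane does not know about, on the assumption that its in-memory view is fresher than the metastore's. Here is a small sketch of that behavior, with a plain `HashMap` and a simplified `Shard` standing in for the real `ShardTableEntry` and proto types.

```rust
use std::collections::HashMap;

// Simplified stand-in for the real Shard proto; only the fields needed here.
#[derive(Debug, Clone, PartialEq)]
struct Shard {
    shard_id: u64,
    shard_state: &'static str,
}

/// Insert only the shards the table does not know about yet, mirroring
/// `table_entry.shards.entry(shard.shard_id).or_insert(shard)` above.
fn update_shards(table: &mut HashMap<u64, Shard>, shards: Vec<Shard>) {
    for shard in shards {
        table.entry(shard.shard_id).or_insert(shard);
    }
}

fn main() {
    let mut table = HashMap::new();
    // The control plane already closed shard 1 locally.
    table.insert(1, Shard { shard_id: 1, shard_state: "closed" });

    // The metastore still reports shard 1 as open and returns a new shard 2.
    update_shards(
        &mut table,
        vec![
            Shard { shard_id: 1, shard_state: "open" },
            Shard { shard_id: 2, shard_state: "open" },
        ],
    );

    // Shard 1 keeps the control plane's view; shard 2 is added.
    assert_eq!(table[&1].shard_state, "closed");
    assert_eq!(table[&2].shard_state, "open");
    println!("{table:?}");
}
```

Taking `shards: Vec<Shard>` by value also lets the table consume the shards directly instead of cloning them, which is why the callers in the tests switch from `&[...]` slices to `vec![...]`.
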
28 changes: 16 additions & 12 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -302,22 +302,26 @@ impl IngestController {
let open_shards_response = progress
.protect_future(self.metastore.open_shards(open_shards_request))
.await?;
for open_shards_subresponse in &open_shards_response.subresponses {
for open_shards_subresponse in open_shards_response.subresponses {
let index_uid: IndexUid = open_shards_subresponse.index_uid.clone().into();
let source_id = open_shards_subresponse.source_id;

model.update_shards(
&index_uid,
&open_shards_subresponse.source_id,
&open_shards_subresponse.open_shards,
&source_id,
open_shards_subresponse.open_shards,
open_shards_subresponse.next_shard_id,
);
}
for open_shards_subresponse in open_shards_response.subresponses {
let get_open_shards_subresponse = GetOpenShardsSubresponse {
index_uid: open_shards_subresponse.index_uid,
source_id: open_shards_subresponse.source_id,
open_shards: open_shards_subresponse.open_shards,
};
get_open_shards_subresponses.push(get_open_shards_subresponse);
if let Some((open_shards, _next_shard_id)) =
model.find_open_shards(&index_uid, &source_id, &unavailable_leaders)
{
let get_open_shards_subresponse = GetOpenShardsSubresponse {
index_uid: index_uid.into(),
source_id,
open_shards,
};
get_open_shards_subresponses.push(get_open_shards_subresponse);
}
}
}
Ok(GetOrCreateOpenShardsResponse {
@@ -634,7 +638,7 @@ mod tests {
},
];

model.update_shards(&index_uid_0, &source_id.into(), &shards, 3);
model.update_shards(&index_uid_0, &source_id.into(), shards, 3);

let request = GetOrCreateOpenShardsRequest {
subrequests: Vec::new(),
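In `get_or_create_open_shards` above, the subresponses are no longer echoed straight from the metastore reply: after updating the model, the controller reads the open shards back with `find_open_shards`, which skips shards whose leader is unavailable. A rough sketch of that lookup, with simplified stand-in types:

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-ins for the real shard-table types.
#[derive(Debug, Clone)]
struct Shard {
    shard_id: u64,
    leader_id: String,
    open: bool,
}

/// Mirrors `find_open_shards`: return only shards that are open and whose
/// leader is not in the unavailable set.
fn find_open_shards(
    shards: &HashMap<u64, Shard>,
    unavailable_leaders: &HashSet<String>,
) -> Vec<Shard> {
    shards
        .values()
        .filter(|shard| shard.open && !unavailable_leaders.contains(&shard.leader_id))
        .cloned()
        .collect()
}

fn main() {
    let mut shards = HashMap::new();
    shards.insert(1, Shard { shard_id: 1, leader_id: "ingester-0".into(), open: true });
    shards.insert(2, Shard { shard_id: 2, leader_id: "ingester-1".into(), open: true });
    shards.insert(3, Shard { shard_id: 3, leader_id: "ingester-0".into(), open: false });

    let unavailable_leaders: HashSet<String> = ["ingester-1".to_string()].into();
    let open_shards = find_open_shards(&shards, &unavailable_leaders);

    // Only shard 1 survives: shard 2's leader is unavailable, shard 3 is closed.
    assert_eq!(open_shards.len(), 1);
    assert_eq!(open_shards[0].shard_id, 1);
    println!("{open_shards:?}");
}
```

Going through the model rather than the raw metastore response keeps the reply consistent with the filtering already applied on the fast path (the existing `find_open_shards` call for known shards).
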
44 changes: 38 additions & 6 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -26,7 +26,9 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_ingest::{decoded_mrecords, IngesterPool, MRecord, MultiFetchStream};
use quickwit_ingest::{
decoded_mrecords, FetchStreamError, IngesterPool, MRecord, MultiFetchStream,
};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpoint};
use quickwit_proto::ingest::ingester::{
FetchResponseV2, IngesterService, TruncateRequest, TruncateSubrequest,
@@ -93,12 +95,21 @@ impl ClientId {
}
}

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
enum ShardStatus {
#[default]
Active,
Eof,
Error,
}

#[derive(Debug, Eq, PartialEq)]
struct AssignedShard {
leader_id: NodeId,
follower_id_opt: Option<NodeId>,
partition_id: PartitionId,
current_position_inclusive: Position,
status: ShardStatus,
}

/// Streams documents from a set of shards.
@@ -175,6 +186,7 @@ impl IngestSource {
batch_builder.force_commit();
}
MRecord::Eof => {
assigned_shard.status = ShardStatus::Eof;
break;
}
MRecord::Unknown => {
@@ -194,6 +206,12 @@
Ok(())
}

fn process_fetch_stream_error(&mut self, fetch_stream_error: FetchStreamError) {
if let Some(shard) = self.assigned_shards.get_mut(&fetch_stream_error.shard_id) {
shard.status = ShardStatus::Error;
}
}

async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
let mut per_ingester_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
HashMap::new();
@@ -270,10 +288,11 @@ impl Source for IngestSource {
break;
}
}
Ok(Err(error)) => {
error!(error=?error, "failed to fetch payload");
Ok(Err(fetch_stream_error)) => {
self.process_fetch_stream_error(fetch_stream_error);
}
Err(_) => {
// The deadline has elapsed.
break;
}
}
@@ -373,7 +392,9 @@ impl Source for IngestSource {
let from_position_exclusive = current_position_inclusive.clone();
let to_position_inclusive = Position::Eof;

if let Err(error) = ctx
let status = if from_position_exclusive == Position::Eof {
ShardStatus::Eof
} else if let Err(error) = ctx
.protect_future(self.fetch_stream.subscribe(
leader_id.clone(),
follower_id_opt.clone(),
@@ -386,15 +407,18 @@
.await
{
error!(error=%error, "failed to subscribe to shard");
continue;
}
ShardStatus::Error
} else {
ShardStatus::Active
};
truncation_point.push((shard_id, current_position_inclusive.clone()));

let assigned_shard = AssignedShard {
leader_id,
follower_id_opt,
partition_id,
current_position_inclusive,
status,
};
self.assigned_shards.insert(shard_id, assigned_shard);
}
@@ -584,6 +608,7 @@ mod tests {
follower_id_opt: None,
partition_id: 1u64.into(),
current_position_inclusive: 11u64.into(),
status: ShardStatus::Active,
};
assert_eq!(assigned_shard, &expected_assigned_shard);

@@ -632,6 +657,7 @@ mod tests {
follower_id_opt: None,
partition_id: 1u64.into(),
current_position_inclusive: 11u64.into(),
status: ShardStatus::Active,
},
);
source.assigned_shards.insert(
@@ -641,6 +667,7 @@
follower_id_opt: None,
partition_id: 2u64.into(),
current_position_inclusive: 22u64.into(),
status: ShardStatus::Active,
},
);
let fetch_response_tx = source.fetch_stream.fetch_response_tx();
@@ -808,6 +835,7 @@ mod tests {
follower_id_opt: None,
partition_id: 1u64.into(),
current_position_inclusive: 11u64.into(),
status: ShardStatus::Active,
},
);
source.assigned_shards.insert(
@@ -817,6 +845,7 @@
follower_id_opt: Some("test-ingester-1".into()),
partition_id: 2u64.into(),
current_position_inclusive: 22u64.into(),
status: ShardStatus::Active,
},
);
source.assigned_shards.insert(
@@ -826,6 +855,7 @@
follower_id_opt: Some("test-ingester-0".into()),
partition_id: 3u64.into(),
current_position_inclusive: 33u64.into(),
status: ShardStatus::Active,
},
);
source.assigned_shards.insert(
@@ -835,6 +865,7 @@
follower_id_opt: Some("test-ingester-3".into()),
partition_id: 4u64.into(),
current_position_inclusive: 44u64.into(),
status: ShardStatus::Active,
},
);
source.assigned_shards.insert(
@@ -844,6 +875,7 @@
follower_id_opt: Some("test-ingester-3".into()),
partition_id: 4u64.into(),
current_position_inclusive: Position::Beginning,
status: ShardStatus::Active,
},
);
let checkpoint = SourceCheckpoint::from_iter(vec![
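The main addition in the ingest source above is a per-shard `ShardStatus` (`Active`, `Eof`, `Error`): hitting an `Eof` record or failing to subscribe flags the shard, and a `FetchStreamError` now marks the shard as `Error` via `process_fetch_stream_error` instead of only logging it. A condensed sketch of the error-handling part, with simplified stand-ins for the real `ShardId` and `AssignedShard` types:

```rust
use std::collections::HashMap;

// Simplified stand-ins; the real source keys shards by ShardId and stores
// richer per-shard state (leader, partition, position, ...).
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
enum ShardStatus {
    #[default]
    Active,
    Eof,
    Error,
}

#[derive(Debug, Default)]
struct AssignedShard {
    status: ShardStatus,
}

struct FetchStreamError {
    shard_id: u64,
}

struct IngestSource {
    assigned_shards: HashMap<u64, AssignedShard>,
}

impl IngestSource {
    /// Mirrors `process_fetch_stream_error`: flag the failing shard so later
    /// logic can act on it, rather than just logging and moving on.
    fn process_fetch_stream_error(&mut self, error: FetchStreamError) {
        if let Some(shard) = self.assigned_shards.get_mut(&error.shard_id) {
            shard.status = ShardStatus::Error;
        }
    }
}

fn main() {
    let mut source = IngestSource {
        assigned_shards: HashMap::from([(1, AssignedShard::default())]),
    };
    source.process_fetch_stream_error(FetchStreamError { shard_id: 1 });
    assert_eq!(source.assigned_shards[&1].status, ShardStatus::Error);
    println!("{:?}", source.assigned_shards[&1]);
}
```
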
(Diffs for the remaining 2 changed files were not loaded.)
