Skip to content

Commit

Permalink
Call truncate on leaders AND followers
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Oct 4, 2023
1 parent 1f9ddbb commit 72686e4
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 174 deletions.
80 changes: 57 additions & 23 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl ClientId {
#[derive(Debug, Eq, PartialEq)]
struct AssignedShard {
leader_id: NodeId,
follower_id_opt: Option<NodeId>,
partition_id: PartitionId,
current_position_inclusive: Position,
}
Expand Down Expand Up @@ -180,18 +181,14 @@ impl IngestSource {
}

async fn truncate(&self, truncation_point: &[(ShardId, Position)]) {
let mut per_leader_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
let mut per_ingester_truncate_subrequests: HashMap<&NodeId, Vec<TruncateSubrequest>> =
HashMap::new();

for (shard_id, truncate_position) in truncation_point {
if matches!(truncate_position, Position::Beginning) {
continue;
}
let Some(leader_id) = self
.assigned_shards
.get(shard_id)
.map(|shard| &shard.leader_id)
else {
let Some(shard) = self.assigned_shards.get(shard_id) else {
warn!(
"failed to truncate shard: shard `{}` is no longer assigned",
shard_id
Expand All @@ -208,21 +205,27 @@ impl IngestSource {
shard_id: *shard_id,
to_position_inclusive,
};
per_leader_truncate_subrequests
.entry(leader_id)
if let Some(follower_id) = &shard.follower_id_opt {
per_ingester_truncate_subrequests
.entry(follower_id)
.or_default()
.push(truncate_subrequest.clone());
}
per_ingester_truncate_subrequests
.entry(&shard.leader_id)
.or_default()
.push(truncate_subrequest);
}
for (leader_id, truncate_subrequests) in per_leader_truncate_subrequests {
let Some(mut ingester) = self.ingester_pool.get(leader_id) else {
for (ingester_id, truncate_subrequests) in per_ingester_truncate_subrequests {
let Some(mut ingester) = self.ingester_pool.get(ingester_id) else {
warn!(
"failed to truncate shard: ingester `{}` is unavailable",
leader_id
ingester_id
);
continue;
};
let truncate_request = TruncateRequest {
leader_id: leader_id.clone().into(),
ingester_id: ingester_id.clone().into(),
subrequests: truncate_subrequests,
};
let truncate_future = async move {
Expand Down Expand Up @@ -349,9 +352,7 @@ impl Source for IngestSource {

for acquired_shard in acquire_shards_subresponse.acquired_shards {
let leader_id: NodeId = acquired_shard.leader_id.into();
let follower_id: Option<NodeId> = acquired_shard
.follower_id
.map(|follower_id| follower_id.into());
let follower_id_opt: Option<NodeId> = acquired_shard.follower_id.map(Into::into);
let index_uid: IndexUid = acquired_shard.index_uid.into();
let source_id: SourceId = acquired_shard.source_id;
let shard_id = acquired_shard.shard_id;
Expand All @@ -364,7 +365,7 @@ impl Source for IngestSource {
if let Err(error) = ctx
.protect_future(self.fetch_stream.subscribe(
leader_id.clone(),
follower_id.clone(),
follower_id_opt.clone(),
index_uid,
source_id,
shard_id,
Expand All @@ -380,6 +381,7 @@ impl Source for IngestSource {

let assigned_shard = AssignedShard {
leader_id,
follower_id_opt,
partition_id,
current_position_inclusive,
};
Expand Down Expand Up @@ -510,7 +512,7 @@ mod tests {
.expect_truncate()
.once()
.returning(|request| {
assert_eq!(request.leader_id, "test-ingester-0");
assert_eq!(request.ingester_id, "test-ingester-0");
assert_eq!(request.subrequests.len(), 1);

let subrequest = &request.subrequests[0];
Expand Down Expand Up @@ -581,6 +583,7 @@ mod tests {
let assigned_shard = source.assigned_shards.get(&1).unwrap();
let expected_assigned_shard = AssignedShard {
leader_id: "test-ingester-0".into(),
follower_id_opt: None,
partition_id: 1u64.into(),
current_position_inclusive: Position::from(11u64),
};
Expand Down Expand Up @@ -628,6 +631,7 @@ mod tests {
1,
AssignedShard {
leader_id: "test-ingester-0".into(),
follower_id_opt: None,
partition_id: 1u64.into(),
current_position_inclusive: Position::from(11u64),
},
Expand All @@ -636,6 +640,7 @@ mod tests {
2,
AssignedShard {
leader_id: "test-ingester-1".into(),
follower_id_opt: None,
partition_id: 2u64.into(),
current_position_inclusive: Position::from(22u64),
},
Expand Down Expand Up @@ -718,8 +723,8 @@ mod tests {
.expect_truncate()
.once()
.returning(|request| {
assert_eq!(request.leader_id, "test-ingester-0");
assert_eq!(request.subrequests.len(), 2);
assert_eq!(request.ingester_id, "test-ingester-0");
assert_eq!(request.subrequests.len(), 3);

let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 1);
Expand All @@ -729,6 +734,10 @@ mod tests {
assert_eq!(subrequest_1.shard_id, 2);
assert_eq!(subrequest_1.to_position_inclusive, 22);

let subrequest_2 = &request.subrequests[2];
assert_eq!(subrequest_2.shard_id, 3);
assert_eq!(subrequest_2.to_position_inclusive, 33);

Ok(TruncateResponse {})
});
let ingester_0: IngesterServiceClient = ingester_mock_0.into();
Expand All @@ -739,18 +748,39 @@ mod tests {
.expect_truncate()
.once()
.returning(|request| {
assert_eq!(request.leader_id, "test-ingester-1");
assert_eq!(request.subrequests.len(), 1);
assert_eq!(request.ingester_id, "test-ingester-1");
assert_eq!(request.subrequests.len(), 2);

let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 3);
assert_eq!(subrequest_0.to_position_inclusive, 33);
assert_eq!(subrequest_0.shard_id, 1);
assert_eq!(subrequest_0.to_position_inclusive, 11);

let subrequest_1 = &request.subrequests[1];
assert_eq!(subrequest_1.shard_id, 2);
assert_eq!(subrequest_1.to_position_inclusive, 22);

Ok(TruncateResponse {})
});
let ingester_1: IngesterServiceClient = ingester_mock_1.into();
ingester_pool.insert("test-ingester-1".into(), ingester_1.clone());

let mut ingester_mock_3 = IngesterServiceClient::mock();
ingester_mock_3
.expect_truncate()
.once()
.returning(|request| {
assert_eq!(request.ingester_id, "test-ingester-3");
assert_eq!(request.subrequests.len(), 1);

let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 4);
assert_eq!(subrequest_0.to_position_inclusive, 44);

Ok(TruncateResponse {})
});
let ingester_3: IngesterServiceClient = ingester_mock_3.into();
ingester_pool.insert("test-ingester-3".into(), ingester_3.clone());

let runtime_args = Arc::new(SourceRuntimeArgs {
pipeline_id,
source_config,
Expand All @@ -774,6 +804,7 @@ mod tests {
1,
AssignedShard {
leader_id: "test-ingester-0".into(),
follower_id_opt: None,
partition_id: 1u64.into(),
current_position_inclusive: Position::from(11u64),
},
Expand All @@ -782,6 +813,7 @@ mod tests {
2,
AssignedShard {
leader_id: "test-ingester-0".into(),
follower_id_opt: Some("test-ingester-1".into()),
partition_id: 2u64.into(),
current_position_inclusive: Position::from(22u64),
},
Expand All @@ -790,6 +822,7 @@ mod tests {
3,
AssignedShard {
leader_id: "test-ingester-1".into(),
follower_id_opt: Some("test-ingester-0".into()),
partition_id: 3u64.into(),
current_position_inclusive: Position::from(33u64),
},
Expand All @@ -798,6 +831,7 @@ mod tests {
4,
AssignedShard {
leader_id: "test-ingester-2".into(),
follower_id_opt: Some("test-ingester-3".into()),
partition_id: 4u64.into(),
current_position_inclusive: Position::from(44u64),
},
Expand Down
55 changes: 18 additions & 37 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use quickwit_proto::ingest::ingester::{
OpenReplicationStreamResponse, PersistFailure, PersistFailureKind, PersistRequest,
PersistResponse, PersistSuccess, PingRequest, PingResponse, ReplicateRequest,
ReplicateSubrequest, SynReplicationMessage, TruncateRequest, TruncateResponse,
TruncateSubrequest,
};
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState};
use quickwit_proto::metastore::{
Expand Down Expand Up @@ -211,7 +210,7 @@ impl Ingester {
state.primary_shards.insert(queue_id, primary_shard);
} else {
let replica_shard = ReplicaShard {
leader_id: success.leader_id.into(),
_leader_id: success.leader_id.into(),
shard_state: ShardState::Closed,
publish_position_inclusive,
replica_position_inclusive: current_position,
Expand Down Expand Up @@ -531,7 +530,6 @@ impl IngesterService for Ingester {
let replication_task_handle = ReplicationTask::spawn(
leader_id,
follower_id,
self.metastore.clone(),
self.mrecordlog.clone(),
self.state.clone(),
syn_replication_stream,
Expand Down Expand Up @@ -586,18 +584,16 @@ impl IngesterService for Ingester {
&mut self,
truncate_request: TruncateRequest,
) -> IngestV2Result<TruncateResponse> {
if truncate_request.leader_id != self.self_node_id {
if truncate_request.ingester_id != self.self_node_id {
return Err(IngestV2Error::Internal(format!(
"routing error: expected ingester `{}`, got `{}`",
truncate_request.leader_id, self.self_node_id
self.self_node_id, truncate_request.ingester_id,
)));
}
let mut gc_candidates: Vec<QueueId> = Vec::new();
let mut mrecordlog_guard = self.mrecordlog.write().await;
let mut state_guard = self.state.write().await;

let mut truncate_subrequests: HashMap<NodeId, Vec<TruncateSubrequest>> = HashMap::new();

for subrequest in truncate_request.subrequests {
let queue_id = subrequest.queue_id();

Expand All @@ -613,40 +609,24 @@ impl IngesterService for Ingester {
if primary_shard.is_gc_candidate() {
gc_candidates.push(queue_id.clone());
}
continue;
}
if let Some(replica_shard) = state_guard.replica_shards.get(&queue_id) {
truncate_subrequests
.entry(replica_shard.leader_id.clone())
.or_default()
.push(subrequest);
}
}
let mut truncate_futures = FuturesUnordered::new();
if let Some(replica_shard) = state_guard.replica_shards.get_mut(&queue_id) {
mrecordlog_guard
.truncate(&queue_id, subrequest.to_position_inclusive)
.await
.map_err(|error| {
IngestV2Error::Internal(format!("failed to truncate queue: {error:?}"))
})?;
replica_shard.set_publish_position_inclusive(subrequest.to_position_inclusive);

for (follower_id, subrequests) in truncate_subrequests {
let leader_id = self.self_node_id.clone().into();
let truncate_request = TruncateRequest {
leader_id,
subrequests,
};
let replication_client = state_guard
.replication_clients
.get(&follower_id)
.expect("The replication client should be initialized.")
.clone();
truncate_futures
.push(async move { replication_client.truncate(truncate_request).await });
if replica_shard.is_gc_candidate() {
gc_candidates.push(queue_id.clone());
}
}
}
// Drop the write lock AFTER pushing the replicate request into the replication client
// channel to ensure that sequential writes in mrecordlog turn into sequential replicate
// requests in the same order.
drop(state_guard);

while let Some(truncate_result) = truncate_futures.next().await {
// TODO: Handle errors.
truncate_result?;
}
// TODO: Update publish positions of truncated shards and then delete them when
let truncate_response = TruncateResponse {};
Ok(truncate_response)
}
Expand All @@ -667,6 +647,7 @@ mod tests {

use quickwit_proto::ingest::ingester::{
IngesterServiceGrpcServer, IngesterServiceGrpcServerAdapter, PersistSubrequest,
TruncateSubrequest,
};
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::metastore::{
Expand Down Expand Up @@ -1406,7 +1387,7 @@ mod tests {
drop(mrecordlog_guard);

let truncate_request = TruncateRequest {
leader_id: self_node_id.to_string(),
ingester_id: self_node_id.to_string(),
subrequests: vec![
TruncateSubrequest {
index_uid: "test-index:0".to_string(),
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl PrimaryShard {
/// Records the state of a replica shard managed by a follower. See [`PrimaryShard`] for more
/// details about the fields.
pub(super) struct ReplicaShard {
pub leader_id: NodeId,
pub _leader_id: NodeId,
pub shard_state: ShardState,
pub publish_position_inclusive: Position,
pub replica_position_inclusive: Position,
Expand Down Expand Up @@ -267,7 +267,7 @@ impl ReplicaShard {
let (shard_status_tx, shard_status_rx) = watch::channel(shard_status);

Self {
leader_id: leader_id.into(),
_leader_id: leader_id.into(),
shard_state,
publish_position_inclusive,
replica_position_inclusive,
Expand Down
Loading

0 comments on commit 72686e4

Please sign in to comment.