Skip to content

Commit

Permalink
Truncate shards via Chitchat (#4240)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Dec 6, 2023
1 parent 8e775e3 commit 972e447
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 150 deletions.
69 changes: 48 additions & 21 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,15 @@ impl IngestSource {
}
}

async fn truncate(&mut self, truncate_positions: Vec<(ShardId, Position)>) {
async fn truncate(&mut self, truncate_up_to_positions: Vec<(ShardId, Position)>) {
let shard_positions_update = LocalShardPositionsUpdate::new(
self.client_id.source_uid.clone(),
truncate_positions.clone(),
truncate_up_to_positions.clone(),
);

// Let's record all shards that have reached Eof as complete.
for (shard, truncate_position) in &truncate_positions {
if truncate_position.is_eof() {
for (shard, truncate_up_to_position_inclusive) in &truncate_up_to_positions {
if truncate_up_to_position_inclusive.is_eof() {
if let Some(assigned_shard) = self.assigned_shards.get_mut(shard) {
assigned_shard.status = IndexingStatus::Complete;
}
Expand All @@ -305,8 +305,8 @@ impl IngestSource {
Vec<TruncateShardsSubrequest>,
> = FnvHashMap::default();

for (shard_id, to_position_exclusive) in truncate_positions {
if matches!(to_position_exclusive, Position::Beginning) {
for (shard_id, truncate_up_to_position_inclusive) in truncate_up_to_positions {
if matches!(truncate_up_to_position_inclusive, Position::Beginning) {
continue;
}
let Some(shard) = self.assigned_shards.get(&shard_id) else {
Expand All @@ -317,7 +317,7 @@ impl IngestSource {
index_uid: self.client_id.source_uid.index_uid.clone().into(),
source_id: self.client_id.source_uid.source_id.clone(),
shard_id,
to_position_inclusive: Some(to_position_exclusive.clone()),
truncate_up_to_position_inclusive: Some(truncate_up_to_position_inclusive),
};
if let Some(follower_id) = &shard.follower_id_opt {
per_ingester_truncate_subrequests
Expand Down Expand Up @@ -861,7 +861,10 @@ mod tests {
let subrequest = &request.subrequests[0];
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(subrequest.to_position_inclusive(), Position::offset(10u64));
assert_eq!(
subrequest.truncate_up_to_position_inclusive(),
Position::offset(10u64)
);

let response = TruncateShardsResponse {};
Ok(response)
Expand All @@ -878,7 +881,10 @@ mod tests {
let subrequest = &request.subrequests[0];
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(subrequest.to_position_inclusive(), Position::offset(11u64));
assert_eq!(
subrequest.truncate_up_to_position_inclusive(),
Position::offset(11u64)
);

Ok(TruncateShardsResponse {})
});
Expand All @@ -896,12 +902,18 @@ mod tests {
let subrequest = &request.subrequests[0];
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(subrequest.to_position_inclusive(), Position::offset(11u64));
assert_eq!(
subrequest.truncate_up_to_position_inclusive(),
Position::offset(11u64)
);

let subrequest = &request.subrequests[1];
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(subrequest.to_position_inclusive(), Position::offset(12u64));
assert_eq!(
subrequest.truncate_up_to_position_inclusive(),
Position::offset(12u64)
);

let response = TruncateShardsResponse {};
Ok(response)
Expand Down Expand Up @@ -1091,14 +1103,17 @@ mod tests {
assert_eq!(subrequest_0.index_uid, "test-index:0");
assert_eq!(subrequest_0.source_id, "test-source");
assert_eq!(subrequest_0.shard_id, 1);
assert_eq!(subrequest_0.to_position_inclusive(), Position::eof(11u64));
assert_eq!(
subrequest_0.truncate_up_to_position_inclusive(),
Position::eof(11u64)
);

let subrequest_1 = &request.subrequests[1];
assert_eq!(subrequest_1.index_uid, "test-index:0");
assert_eq!(subrequest_1.source_id, "test-source");
assert_eq!(subrequest_1.shard_id, 2);
assert_eq!(
subrequest_1.to_position_inclusive(),
subrequest_1.truncate_up_to_position_inclusive(),
Position::Beginning.as_eof()
);

Expand Down Expand Up @@ -1250,13 +1265,19 @@ mod tests {
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(subrequest.shard_id, 1);
assert_eq!(subrequest.to_position_inclusive(), Position::offset(11u64));
assert_eq!(
subrequest.truncate_up_to_position_inclusive(),
Position::offset(11u64)
);

let subrequest = &request.subrequests[1];
assert_eq!(subrequest.index_uid, "test-index:0");
assert_eq!(subrequest.source_id, "test-source");
assert_eq!(subrequest.shard_id, 2);
assert_eq!(subrequest.to_position_inclusive(), Position::eof(22u64));
assert_eq!(
subrequest.truncate_up_to_position_inclusive(),
Position::eof(22u64)
);

let response = TruncateShardsResponse {};
Ok(response)
Expand Down Expand Up @@ -1512,20 +1533,23 @@ mod tests {
let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 1);
assert_eq!(
subrequest_0.to_position_inclusive(),
subrequest_0.truncate_up_to_position_inclusive(),
Position::offset(11u64)
);

let subrequest_1 = &request.subrequests[1];
assert_eq!(subrequest_1.shard_id, 2);
assert_eq!(
subrequest_1.to_position_inclusive(),
subrequest_1.truncate_up_to_position_inclusive(),
Position::offset(22u64)
);

let subrequest_2 = &request.subrequests[2];
assert_eq!(subrequest_2.shard_id, 3);
assert_eq!(subrequest_2.to_position_inclusive(), Position::eof(33u64));
assert_eq!(
subrequest_2.truncate_up_to_position_inclusive(),
Position::eof(33u64)
);

Ok(TruncateShardsResponse {})
});
Expand All @@ -1543,13 +1567,16 @@ mod tests {
let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 2);
assert_eq!(
subrequest_0.to_position_inclusive(),
subrequest_0.truncate_up_to_position_inclusive(),
Position::offset(22u64)
);

let subrequest_1 = &request.subrequests[1];
assert_eq!(subrequest_1.shard_id, 3);
assert_eq!(subrequest_1.to_position_inclusive(), Position::eof(33u64));
assert_eq!(
subrequest_1.truncate_up_to_position_inclusive(),
Position::eof(33u64)
);

Ok(TruncateShardsResponse {})
});
Expand All @@ -1567,7 +1594,7 @@ mod tests {
let subrequest_0 = &request.subrequests[0];
assert_eq!(subrequest_0.shard_id, 4);
assert_eq!(
subrequest_0.to_position_inclusive(),
subrequest_0.truncate_up_to_position_inclusive(),
Position::offset(44u64)
);

Expand Down
Loading

0 comments on commit 972e447

Please sign in to comment.