Skip to content

Commit

Permalink
Delete shards on truncate EOF (#4238)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Dec 5, 2023
1 parent f08dc8b commit 8e775e3
Show file tree
Hide file tree
Showing 31 changed files with 1,474 additions and 1,253 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ services:
# This is not an official Docker image.
# If we prefer, we can build an image from the official Docker image (gcloud CLI)
# and install the Pub/Sub emulator: https://cloud.google.com/pubsub/docs/emulator
image: thekevjames/gcloud-pubsub-emulator:${GCLOUD_EMULATOR:-7555256f2c}
image: thekevjames/gcloud-pubsub-emulator:${GCLOUD_EMULATOR:-455.0.0}
container_name: gcp-pubsub-emulator
ports:
- "${MAP_HOST_GCLOUD_EMULATOR:-127.0.0.1}:8681:8681"
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ where
.iter()
.map(|(partition_id, position)| CheckpointRow {
partition_id: partition_id.0.to_string(),
offset: position.as_str().to_string(),
offset: position.to_string(),
})
.sorted_by(|left, right| left.partition_id.cmp(&right.partition_id));
let checkpoint_table = make_table("Checkpoint", checkpoint_rows, false);
Expand Down Expand Up @@ -734,7 +734,9 @@ mod tests {

let checkpoint: SourceCheckpoint = vec![("shard-000", ""), ("shard-001", "1234567890")]
.into_iter()
.map(|(partition_id, offset)| (PartitionId::from(partition_id), Position::from(offset)))
.map(|(partition_id, offset)| {
(PartitionId::from(partition_id), Position::offset(offset))
})
.collect();
let sources = vec![SourceConfig {
source_id: "foo-source".to_string(),
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-common/src/tower/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ mod tests {
.call(Request { cost: 1 })
.await
.unwrap();
assert!(now.elapsed() < Duration::from_millis(1));
// The request should go through immediately, but in rare instances the test is slow to
// run and the call to `call` takes more than 1 ms.
assert!(now.elapsed() < Duration::from_millis(5));

let now = Instant::now();
// The first request goes through, but the second one is rate limited.
Expand Down
13 changes: 6 additions & 7 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use quickwit_proto::metastore::{
DeleteSourceRequest, EmptyResponse, MetastoreError, MetastoreService, MetastoreServiceClient,
ToggleSourceRequest,
};
use quickwit_proto::types::{IndexUid, NodeId, Position, ShardId, SourceUid};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
use serde::Serialize;
use tracing::error;

Expand Down Expand Up @@ -195,9 +195,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
let shard_ids_to_close: Vec<ShardId> = shard_positions_update
.shard_positions
.into_iter()
.filter(|(shard_id, position)| {
(position == &Position::Eof) && known_shard_ids.contains(shard_id)
})
.filter(|(shard_id, position)| position.is_eof() && known_shard_ids.contains(shard_id))
.map(|(shard_id, _position)| shard_id)
.collect();
if shard_ids_to_close.is_empty() {
Expand Down Expand Up @@ -538,6 +536,7 @@ mod tests {
DeleteShardsResponse, EntityKind, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
ListShardsRequest, ListShardsResponse, ListShardsSubresponse, MetastoreError, SourceType,
};
use quickwit_proto::types::Position;

use super::*;
use crate::IndexerNodeInfo;
Expand Down Expand Up @@ -1114,7 +1113,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid: source_uid.clone(),
shard_positions: vec![(17, 1000u64.into())],
shard_positions: vec![(17, Position::offset(1_000u64))],
})
.await
.unwrap();
Expand All @@ -1137,7 +1136,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid,
shard_positions: vec![(17, Position::Eof)],
shard_positions: vec![(17, Position::eof(1_000u64))],
})
.await
.unwrap();
Expand Down Expand Up @@ -1240,7 +1239,7 @@ mod tests {
control_plane_mailbox
.ask(ShardPositionsUpdate {
source_uid: source_uid.clone(),
shard_positions: vec![(17, Position::Eof)],
shard_positions: vec![(17, Position::eof(1_000u64))],
})
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ mod tests {
suggest_truncate_checkpoints[0]
.position_for_partition(&PartitionId::default())
.unwrap(),
&Position::from(2u64)
&Position::offset(2u64)
);

let merger_msgs: Vec<NewSplits> = merge_planner_inbox.drain_for_test_typed::<NewSplits>();
Expand Down Expand Up @@ -344,7 +344,7 @@ mod tests {
suggest_truncate_checkpoints[0]
.position_for_partition(&PartitionId::default())
.unwrap(),
&Position::from(2u64)
&Position::offset(2u64)
);

let merger_msgs: Vec<NewSplits> = merge_planner_inbox.drain_for_test_typed::<NewSplits>();
Expand Down
40 changes: 23 additions & 17 deletions quickwit/quickwit-indexing/src/models/shard_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,23 +356,23 @@ mod tests {
));
event_broker1.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(2, 10u64.into())],
vec![(2, Position::offset(10u64))],
));
event_broker1.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(1, 10u64.into())],
vec![(1, Position::offset(10u64))],
));
event_broker2.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(2, 10u64.into())],
vec![(2, Position::offset(10u64))],
));
event_broker2.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(2, 12u64.into())],
vec![(2, Position::offset(12u64))],
));
event_broker2.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(1, Position::Beginning), (2, 12u64.into())],
vec![(1, Position::Beginning), (2, Position::offset(12u64))],
));

let mut updates1: Vec<Vec<(ShardId, Position)>> = Vec::new();
Expand All @@ -387,9 +387,9 @@ mod tests {
updates1,
vec![
vec![(1, Position::Beginning)],
vec![(1, Position::Beginning), (2, 10u64.into())],
vec![(1, 10u64.into()), (2, 10u64.into()),],
vec![(1, 10u64.into()), (2, 12u64.into()),],
vec![(1, Position::Beginning), (2, Position::offset(10u64))],
vec![(1, Position::offset(10u64)), (2, Position::offset(10u64)),],
vec![(1, Position::offset(10u64)), (2, Position::offset(12u64)),],
]
);

Expand All @@ -403,10 +403,10 @@ mod tests {
assert_eq!(
updates2,
vec![
vec![(2, 10u64.into())],
vec![(2, 12u64.into())],
vec![(1, Position::Beginning), (2, 12u64.into())],
vec![(1, 10u64.into()), (2, 12u64.into())],
vec![(2, Position::offset(10u64))],
vec![(2, Position::offset(12u64))],
vec![(1, Position::Beginning), (2, Position::offset(12u64))],
vec![(1, Position::offset(10u64)), (2, Position::offset(12u64))],
]
);

Expand Down Expand Up @@ -446,16 +446,19 @@ mod tests {
source_uid.clone(),
vec![(1, Position::Beginning)],
));
tokio::time::sleep(Duration::from_millis(1000)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let value = cluster.get_self_key_value(&key).await.unwrap();
assert_eq!(&value, r#"{"1":""}"#);
}
{
event_broker.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(1, 1_000u64.into()), (2, 2000u64.into())],
vec![
(1, Position::offset(1_000u64)),
(2, Position::offset(2_000u64)),
],
));
tokio::time::sleep(Duration::from_millis(1000)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let value = cluster.get_self_key_value(&key).await.unwrap();
assert_eq!(
&value,
Expand All @@ -465,9 +468,12 @@ mod tests {
{
event_broker.publish(LocalShardPositionsUpdate::new(
source_uid.clone(),
vec![(1, 999u64.into()), (3, 3000u64.into())],
vec![
(1, Position::offset(999u64)),
(3, Position::offset(3_000u64)),
],
));
tokio::time::sleep(Duration::from_millis(1000)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
let value = cluster.get_self_key_value(&key).await.unwrap();
// We do not update the position that got lower, nor the position that disappeared
assert_eq!(
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ impl Source for FileSource {
.checkpoint_delta
.record_partition_delta(
partition_id,
Position::from(self.counters.previous_offset),
Position::from(self.counters.current_offset),
Position::offset(self.counters.previous_offset),
Position::offset(self.counters.current_offset),
)
.unwrap();
}
Expand Down Expand Up @@ -364,8 +364,8 @@ mod tests {
let partition_id = PartitionId::from(temp_file_path.to_string_lossy().to_string());
let checkpoint_delta = SourceCheckpointDelta::from_partition_delta(
partition_id,
Position::from(0u64),
Position::from(4u64),
Position::offset(0u64),
Position::offset(4u64),
)
.unwrap();
checkpoint.try_apply_delta(checkpoint_delta).unwrap();
Expand Down
Loading

0 comments on commit 8e775e3

Please sign in to comment.