Skip to content

Commit

Permalink
Removing ambiguity between the different ways to create an IndexUid.
Browse files Browse the repository at this point in the history
The code will now panic if the arguments are wrong.
Ideally we would prefer to have a TryFrom implementation.

Before this PR, some client argument could silently create an IndexUi thatwere breaking a contract (IndexUid wrapping a string without a ":k").
After this PR, some client argument could create a panic.

This partly addresses to #3943.
  • Loading branch information
fulmicoton committed Oct 28, 2023
1 parent ef44ac2 commit 814cb3e
Show file tree
Hide file tree
Showing 34 changed files with 187 additions and 154 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ mod tests {
Ok(list_shards_resp)
},
);
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let index_uid_string = index_uid.to_string();
mock_metastore.expect_create_index().times(1).return_once(
|_create_index_request: CreateIndexRequest| {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/indexing_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ mod tests {
prop_compose! {
fn gen_kafka_source()
(index_idx in 0usize..100usize, desired_num_pipelines in 1usize..51usize, max_num_pipelines_per_indexer in 1usize..5usize) -> (IndexUid, SourceConfig) {
let index_uid = IndexUid::from_parts(format!("index-id-{index_idx}"), "" /* this is the index uid */);
let index_uid = IndexUid::from_parts(&format!("index-id-{index_idx}"), "" /* this is the index uid */);
let source_id = append_random_suffix("kafka-source");
(index_uid, SourceConfig {
source_id,
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ mod tests {
let split_id = "test-run-gc--split";
let split_metadata = SplitMetadata {
split_id: split_id.to_string(),
index_uid: IndexUid::new(index_id),
index_uid: IndexUid::new_with_random_ulid(index_id),
..Default::default()
};
let stage_splits_request =
Expand Down Expand Up @@ -566,7 +566,7 @@ mod tests {
.times(2)
.returning(|_| Ok(ListSplitsResponse::empty()));
run_garbage_collect(
IndexUid::new("index-test-gc-deletes"),
IndexUid::new_with_random_ulid("index-test-gc-deletes"),
storage.clone(),
MetastoreServiceClient::from(metastore),
Duration::from_secs(30),
Expand Down Expand Up @@ -597,7 +597,7 @@ mod tests {
let split_id = "test-delete-splits-happy--split";
let split_metadata = SplitMetadata {
split_id: split_id.to_string(),
index_uid: IndexUid::new(index_id),
index_uid: IndexUid::new_with_random_ulid(index_id),
..Default::default()
};
let stage_splits_request =
Expand Down Expand Up @@ -772,7 +772,7 @@ mod tests {
let storage = Arc::new(mock_storage);

let index_id = "test-delete-splits-storage-error--index";
let index_uid = IndexUid::new(index_id.to_string());
let index_uid = IndexUid::new_with_random_ulid(&index_id);

let mut mock_metastore = MetastoreServiceClient::mock();
mock_metastore.expect_delete_splits().return_once(|_| {
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/failpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul
tantivy_dirs,
};
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new(index_id.to_string()),
index_uid: IndexUid::new_with_random_ulid(index_id.to_string()),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down
22 changes: 11 additions & 11 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ mod tests {

#[tokio::test]
async fn test_indexer_triggers_commit_on_target_num_docs() -> anyhow::Result<()> {
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
Expand Down Expand Up @@ -852,7 +852,7 @@ mod tests {
#[tokio::test]
async fn test_indexer_triggers_commit_on_memory_limit() -> anyhow::Result<()> {
let universe = Universe::new();
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
Expand Down Expand Up @@ -931,7 +931,7 @@ mod tests {
async fn test_indexer_triggers_commit_on_timeout() -> anyhow::Result<()> {
let universe = Universe::new();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -1015,7 +1015,7 @@ mod tests {
async fn test_indexer_triggers_commit_on_drained_mailbox() -> anyhow::Result<()> {
let universe = Universe::new();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -1090,7 +1090,7 @@ mod tests {
async fn test_indexer_triggers_commit_on_quit() -> anyhow::Result<()> {
let universe = Universe::with_accelerated_time();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -1169,7 +1169,7 @@ mod tests {
async fn test_indexer_partitioning() -> anyhow::Result<()> {
let universe = Universe::with_accelerated_time();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -1267,7 +1267,7 @@ mod tests {
async fn test_indexer_exceeding_max_num_partitions() {
let universe = Universe::with_accelerated_time();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -1337,7 +1337,7 @@ mod tests {
async fn test_indexer_propagates_publish_lock() {
let universe = Universe::with_accelerated_time();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -1409,7 +1409,7 @@ mod tests {
async fn test_indexer_ignores_messages_when_publish_lock_is_dead() {
let universe = Universe::with_accelerated_time();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -1474,7 +1474,7 @@ mod tests {
async fn test_indexer_honors_batch_commit_request() {
let universe = Universe::with_accelerated_time();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -1535,7 +1535,7 @@ mod tests {
#[tokio::test]
async fn test_indexer_checkpoint_on_all_failed_docs() -> anyhow::Result<()> {
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ mod tests {
let node_id = "test-node";
let doc_mapper = Arc::new(default_doc_mapper_for_test());
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: node_id.to_string(),
pipeline_ord: 0,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ mod tests {
#[tokio::test]
async fn test_merge_pipeline_simple() -> anyhow::Result<()> {
let mut metastore = MetastoreServiceClient::mock();
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
Expand Down
16 changes: 8 additions & 8 deletions quickwit/quickwit-indexing/src/actors/merge_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ mod tests {
#[tokio::test]
async fn test_merge_planner_with_stable_custom_merge_policy() -> anyhow::Result<()> {
let universe = Universe::with_accelerated_time();
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let (merge_split_downloader_mailbox, merge_split_downloader_inbox) =
universe.create_test_mailbox();
let pipeline_id = IndexingPipelineId {
Expand Down Expand Up @@ -594,7 +594,7 @@ mod tests {
let universe = Universe::with_accelerated_time();
let (merge_split_downloader_mailbox, merge_split_downloader_inbox) =
universe.create_test_mailbox();
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
Expand Down Expand Up @@ -646,7 +646,7 @@ mod tests {
let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe
.spawn_ctx()
.create_mailbox("MergeSplitDownloader", QueueCapacity::Bounded(2));
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
Expand Down Expand Up @@ -745,7 +745,7 @@ mod tests {
let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe
.spawn_ctx()
.create_mailbox("MergeSplitDownloader", QueueCapacity::Bounded(2));
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
Expand Down Expand Up @@ -823,7 +823,7 @@ mod tests {
let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe
.spawn_ctx()
.create_mailbox("MergeSplitDownloader", QueueCapacity::Bounded(2));
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let pipeline_id = IndexingPipelineId {
index_uid,
source_id: "test-source".to_string(),
Expand All @@ -842,7 +842,7 @@ mod tests {
};

// It is different from the index_uid because the index uid has a unique suffix.
let other_index_uid = IndexUid::new("test-index");
let other_index_uid = IndexUid::new_with_random_ulid("test-index");

let pre_existing_splits = vec![
split_metadata_for_test(
Expand Down Expand Up @@ -887,7 +887,7 @@ mod tests {
let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe
.spawn_ctx()
.create_mailbox("MergeSplitDownloader", QueueCapacity::Bounded(2));
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
Expand Down Expand Up @@ -967,7 +967,7 @@ mod tests {
let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe
.spawn_ctx()
.create_mailbox("MergeSplitDownloader", QueueCapacity::Unbounded);
let index_uid = IndexUid::new("test-index");
let index_uid = IndexUid::new_with_random_ulid("test-index");
let pipeline_id = IndexingPipelineId {
index_uid: index_uid.clone(),
source_id: "test-source".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/packager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ mod tests {
}
let index = index_writer.finalize()?;
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ mod tests {

publisher_mailbox
.send_message(SplitsUpdate {
index_uid: IndexUid::new("index"),
index_uid: IndexUid::new_with_random_ulid("index"),
new_splits: vec![SplitMetadata::for_test("test-split".to_string())],
replaced_split_ids: Vec::new(),
checkpoint_delta_opt: None,
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ mod tests {
let event_broker = EventBroker::default();
let universe = Universe::new();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -615,7 +615,7 @@ mod tests {
#[tokio::test]
async fn test_uploader_with_sequencer_emits_replace() -> anyhow::Result<()> {
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -867,7 +867,7 @@ mod tests {
};
uploader_mailbox
.send_message(EmptySplit {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
checkpoint_delta,
publish_lock: PublishLock::default(),
publish_token_opt: None,
Expand Down Expand Up @@ -943,7 +943,7 @@ mod tests {

let universe = Universe::new();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-indexing/src/merge_policy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ pub mod tests {
let merged_split_id = new_split_id();
let tags = merge_tags(splits);
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test_index"),
index_uid: IndexUid::new_with_random_ulid("test_index"),
source_id: "test_source".to_string(),
node_id: "test_node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -363,7 +363,7 @@ pub mod tests {
let (merge_op_mailbox, merge_op_inbox) =
universe.create_test_mailbox::<MergeSplitDownloader>();
let pipeline_id = IndexingPipelineId {
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
pipeline_ord: 0,
Expand Down Expand Up @@ -424,7 +424,7 @@ pub mod tests {
maturity,
tags: BTreeSet::from_iter(vec!["tenant_id:1".to_string(), "tenant_id:2".to_string()]),
footer_offsets: 0..100,
index_uid: IndexUid::new("test-index"),
index_uid: IndexUid::new_with_random_ulid("test-index"),
source_id: "test-source".to_string(),
node_id: "test-node".to_string(),
..Default::default()
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ mod tests {
let metastore = metastore_for_test();
let file_source = FileSourceFactory::typed_create_source(
SourceRuntimeArgs::for_test(
IndexUid::new("test-index"),
IndexUid::new_with_random_ulid("test-index"),
source_config,
metastore,
PathBuf::from("./queues"),
Expand Down Expand Up @@ -291,7 +291,7 @@ mod tests {
let metastore = metastore_for_test();
let source = FileSourceFactory::typed_create_source(
SourceRuntimeArgs::for_test(
IndexUid::new("test-index"),
IndexUid::new_with_random_ulid("test-index"),
source_config,
metastore,
PathBuf::from("./queues"),
Expand Down Expand Up @@ -382,7 +382,7 @@ mod tests {
let metastore = metastore_for_test();
let source = FileSourceFactory::typed_create_source(
SourceRuntimeArgs::for_test(
IndexUid::new("test-index"),
IndexUid::new_with_random_ulid("test-index"),
source_config,
metastore,
PathBuf::from("./queues"),
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ mod gcp_pubsub_emulator_tests {
let source_config = get_source_config(&subscription);

let index_id = append_random_suffix("test-gcp-pubsub-source--invalid-subscription--index");
let index_uid = IndexUid::new(&index_id);
let index_uid = IndexUid::new_with_random_ulid(&index_id);
let metastore = metastore_for_test();
let SourceParams::GcpPubSub(params) = source_config.clone().source_params else {
panic!(
Expand Down Expand Up @@ -385,7 +385,7 @@ mod gcp_pubsub_emulator_tests {
let source_loader = quickwit_supported_sources();
let metastore = metastore_for_test();
let index_id: String = append_random_suffix("test-gcp-pubsub-source--index");
let index_uid = IndexUid::new(&index_id);
let index_uid = IndexUid::new_with_random_ulid(&index_id);

let mut pubsub_messages = Vec::with_capacity(6);
for i in 0..6 {
Expand Down
Loading

0 comments on commit 814cb3e

Please sign in to comment.