diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index fd099fa9c2e..05a8c554232 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -785,7 +785,7 @@ mod tests { Ok(list_shards_resp) }, ); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let index_uid_string = index_uid.to_string(); mock_metastore.expect_create_index().times(1).return_once( |_create_index_request: CreateIndexRequest| { diff --git a/quickwit/quickwit-control-plane/src/indexing_plan.rs b/quickwit/quickwit-control-plane/src/indexing_plan.rs index c3f1d14e8e5..f987420f30c 100644 --- a/quickwit/quickwit-control-plane/src/indexing_plan.rs +++ b/quickwit/quickwit-control-plane/src/indexing_plan.rs @@ -720,7 +720,7 @@ mod tests { prop_compose! { fn gen_kafka_source() (index_idx in 0usize..100usize, desired_num_pipelines in 1usize..51usize, max_num_pipelines_per_indexer in 1usize..5usize) -> (IndexUid, SourceConfig) { - let index_uid = IndexUid::from_parts(format!("index-id-{index_idx}"), "" /* this is the index uid */); + let index_uid = IndexUid::from_parts(&format!("index-id-{index_idx}"), "" /* this is the index uid */); let source_id = append_random_suffix("kafka-source"); (index_uid, SourceConfig { source_id, diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index b8c0633403e..d0c6d61a097 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -475,7 +475,7 @@ mod tests { let split_id = "test-run-gc--split"; let split_metadata = SplitMetadata { split_id: split_id.to_string(), - index_uid: IndexUid::new(index_id), + index_uid: IndexUid::new_with_random_ulid(index_id), ..Default::default() }; let stage_splits_request = @@ -566,7 +566,7 @@ mod tests { .times(2) .returning(|_| Ok(ListSplitsResponse::empty())); run_garbage_collect( - IndexUid::new("index-test-gc-deletes"), + IndexUid::new_with_random_ulid("index-test-gc-deletes"), storage.clone(), MetastoreServiceClient::from(metastore), Duration::from_secs(30), @@ -597,7 +597,7 @@ mod tests { let split_id = "test-delete-splits-happy--split"; let split_metadata = SplitMetadata { split_id: split_id.to_string(), - index_uid: IndexUid::new(index_id), + index_uid: IndexUid::new_with_random_ulid(index_id), ..Default::default() }; let stage_splits_request = @@ -772,7 +772,7 @@ mod tests { let storage = Arc::new(mock_storage); let index_id = "test-delete-splits-storage-error--index"; - let index_uid = IndexUid::new(index_id.to_string()); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let mut mock_metastore = MetastoreServiceClient::mock(); mock_metastore.expect_delete_splits().return_once(|_| { diff --git a/quickwit/quickwit-indexing/failpoints/mod.rs b/quickwit/quickwit-indexing/failpoints/mod.rs index c36958dc128..23e211b413e 100644 --- a/quickwit/quickwit-indexing/failpoints/mod.rs +++ b/quickwit/quickwit-indexing/failpoints/mod.rs @@ -297,7 +297,7 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul tantivy_dirs, }; let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new(index_id.to_string()), + index_uid: IndexUid::new_with_random_ulid(index_id.to_string()), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index cd19c98e938..85bc7907428 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -715,7 +715,7 @@ mod tests { #[tokio::test] async fn test_indexer_triggers_commit_on_target_num_docs() -> anyhow::Result<()> { - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), source_id: "test-source".to_string(), @@ -852,7 +852,7 @@ mod tests { #[tokio::test] async fn test_indexer_triggers_commit_on_memory_limit() -> anyhow::Result<()> { let universe = Universe::new(); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), source_id: "test-source".to_string(), @@ -931,7 +931,7 @@ mod tests { async fn test_indexer_triggers_commit_on_timeout() -> anyhow::Result<()> { let universe = Universe::new(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -1015,7 +1015,7 @@ mod tests { async fn test_indexer_triggers_commit_on_drained_mailbox() -> anyhow::Result<()> { let universe = Universe::new(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -1090,7 +1090,7 @@ mod tests { async fn test_indexer_triggers_commit_on_quit() -> anyhow::Result<()> { let universe = Universe::with_accelerated_time(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -1169,7 +1169,7 @@ mod tests { async fn test_indexer_partitioning() -> anyhow::Result<()> { let universe = Universe::with_accelerated_time(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -1267,7 +1267,7 @@ mod tests { async fn test_indexer_exceeding_max_num_partitions() { let universe = Universe::with_accelerated_time(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -1337,7 +1337,7 @@ mod tests { async fn test_indexer_propagates_publish_lock() { let universe = Universe::with_accelerated_time(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -1409,7 +1409,7 @@ mod tests { async fn test_indexer_ignores_messages_when_publish_lock_is_dead() { let universe = Universe::with_accelerated_time(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -1474,7 +1474,7 @@ mod tests { async fn test_indexer_honors_batch_commit_request() { let universe = Universe::with_accelerated_time(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -1535,7 +1535,7 @@ mod tests { #[tokio::test] async fn test_indexer_checkpoint_on_all_failed_docs() -> anyhow::Result<()> { let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 5e9b9864888..ad7bb4eedca 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -833,7 +833,7 @@ mod tests { let node_id = "test-node"; let doc_mapper = Arc::new(default_doc_mapper_for_test()); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: node_id.to_string(), pipeline_ord: 0, diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 7c1314bd199..1a094c579cb 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -499,7 +499,7 @@ mod tests { #[tokio::test] async fn test_merge_pipeline_simple() -> anyhow::Result<()> { let mut metastore = MetastoreServiceClient::mock(); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), source_id: "test-source".to_string(), diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 99a2f18ec98..50157d9fc36 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -506,7 +506,7 @@ mod tests { #[tokio::test] async fn test_merge_planner_with_stable_custom_merge_policy() -> anyhow::Result<()> { let universe = Universe::with_accelerated_time(); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe.create_test_mailbox(); let pipeline_id = IndexingPipelineId { @@ -594,7 +594,7 @@ mod tests { let universe = Universe::with_accelerated_time(); let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe.create_test_mailbox(); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), source_id: "test-source".to_string(), @@ -646,7 +646,7 @@ mod tests { let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe .spawn_ctx() .create_mailbox("MergeSplitDownloader", QueueCapacity::Bounded(2)); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), source_id: "test-source".to_string(), @@ -745,7 +745,7 @@ mod tests { let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe .spawn_ctx() .create_mailbox("MergeSplitDownloader", QueueCapacity::Bounded(2)); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), source_id: "test-source".to_string(), @@ -823,7 +823,7 @@ mod tests { let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe .spawn_ctx() .create_mailbox("MergeSplitDownloader", QueueCapacity::Bounded(2)); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let pipeline_id = IndexingPipelineId { index_uid, source_id: "test-source".to_string(), @@ -842,7 +842,7 @@ mod tests { }; // It is different from the index_uid because the index uid has a unique suffix. - let other_index_uid = IndexUid::new("test-index"); + let other_index_uid = IndexUid::new_with_random_ulid("test-index"); let pre_existing_splits = vec![ split_metadata_for_test( @@ -887,7 +887,7 @@ mod tests { let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe .spawn_ctx() .create_mailbox("MergeSplitDownloader", QueueCapacity::Bounded(2)); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), source_id: "test-source".to_string(), @@ -967,7 +967,7 @@ mod tests { let (merge_split_downloader_mailbox, merge_split_downloader_inbox) = universe .spawn_ctx() .create_mailbox("MergeSplitDownloader", QueueCapacity::Unbounded); - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let pipeline_id = IndexingPipelineId { index_uid: index_uid.clone(), source_id: "test-source".to_string(), diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 200aad9253b..7ba54b82b08 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -404,7 +404,7 @@ mod tests { } let index = index_writer.finalize()?; let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs index 4c5a769dfcd..aad20a9b665 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -419,7 +419,7 @@ mod tests { publisher_mailbox .send_message(SplitsUpdate { - index_uid: IndexUid::new("index"), + index_uid: IndexUid::new_with_random_ulid("index"), new_splits: vec![SplitMetadata::for_test("test-split".to_string())], replaced_split_ids: Vec::new(), checkpoint_delta_opt: None, diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index ecd78847d94..cc4588757dd 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -504,7 +504,7 @@ mod tests { let event_broker = EventBroker::default(); let universe = Universe::new(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -615,7 +615,7 @@ mod tests { #[tokio::test] async fn test_uploader_with_sequencer_emits_replace() -> anyhow::Result<()> { let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -867,7 +867,7 @@ mod tests { }; uploader_mailbox .send_message(EmptySplit { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), checkpoint_delta, publish_lock: PublishLock::default(), publish_token_opt: None, @@ -943,7 +943,7 @@ mod tests { let universe = Universe::new(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 9b1cfe34c29..d264f8a9f20 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -332,7 +332,7 @@ pub mod tests { let merged_split_id = new_split_id(); let tags = merge_tags(splits); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test_index"), + index_uid: IndexUid::new_with_random_ulid("test_index"), source_id: "test_source".to_string(), node_id: "test_node".to_string(), pipeline_ord: 0, @@ -363,7 +363,7 @@ pub mod tests { let (merge_op_mailbox, merge_op_inbox) = universe.create_test_mailbox::(); let pipeline_id = IndexingPipelineId { - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), pipeline_ord: 0, @@ -424,7 +424,7 @@ pub mod tests { maturity, tags: BTreeSet::from_iter(vec!["tenant_id:1".to_string(), "tenant_id:2".to_string()]), footer_offsets: 0..100, - index_uid: IndexUid::new("test-index"), + index_uid: IndexUid::new_with_random_ulid("test-index"), source_id: "test-source".to_string(), node_id: "test-node".to_string(), ..Default::default() diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index 265e2f0e3d3..40e57f163c3 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -224,7 +224,7 @@ mod tests { let metastore = metastore_for_test(); let file_source = FileSourceFactory::typed_create_source( SourceRuntimeArgs::for_test( - IndexUid::new("test-index"), + IndexUid::new_with_random_ulid("test-index"), source_config, metastore, PathBuf::from("./queues"), @@ -291,7 +291,7 @@ mod tests { let metastore = metastore_for_test(); let source = FileSourceFactory::typed_create_source( SourceRuntimeArgs::for_test( - IndexUid::new("test-index"), + IndexUid::new_with_random_ulid("test-index"), source_config, metastore, PathBuf::from("./queues"), @@ -382,7 +382,7 @@ mod tests { let metastore = metastore_for_test(); let source = FileSourceFactory::typed_create_source( SourceRuntimeArgs::for_test( - IndexUid::new("test-index"), + IndexUid::new_with_random_ulid("test-index"), source_config, metastore, PathBuf::from("./queues"), diff --git a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs index 4dc332d4e6b..389ec96d366 100644 --- a/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs +++ b/quickwit/quickwit-indexing/src/source/gcp_pubsub_source.rs @@ -354,7 +354,7 @@ mod gcp_pubsub_emulator_tests { let source_config = get_source_config(&subscription); let index_id = append_random_suffix("test-gcp-pubsub-source--invalid-subscription--index"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let metastore = metastore_for_test(); let SourceParams::GcpPubSub(params) = source_config.clone().source_params else { panic!( @@ -385,7 +385,7 @@ mod gcp_pubsub_emulator_tests { let source_loader = quickwit_supported_sources(); let metastore = metastore_for_test(); let index_id: String = append_random_suffix("test-gcp-pubsub-source--index"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let mut pubsub_messages = Vec::with_capacity(6); for i in 0..6 { diff --git a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs index 54ca2c4ceee..54d7ce7813b 100644 --- a/quickwit/quickwit-indexing/src/source/ingest_api_source.rs +++ b/quickwit/quickwit-indexing/src/source/ingest_api_source.rs @@ -272,7 +272,7 @@ mod tests { let universe = Universe::with_accelerated_time(); let metastore = metastore_for_test(); let index_id = append_random_suffix("test-ingest-api-source"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let temp_dir = tempfile::tempdir()?; let queues_dir_path = temp_dir.path(); @@ -361,7 +361,7 @@ mod tests { let universe = Universe::with_accelerated_time(); let metastore = metastore_for_test(); let index_id = append_random_suffix("test-ingest-api-source"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let temp_dir = tempfile::tempdir()?; let queues_dir_path = temp_dir.path(); let ingest_api_service = @@ -431,7 +431,7 @@ mod tests { let universe = Universe::with_accelerated_time(); let metastore = metastore_for_test(); let index_id = append_random_suffix("test-ingest-api-source"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let temp_dir = tempfile::tempdir()?; let queues_dir_path = temp_dir.path(); let ingest_api_service = @@ -485,7 +485,7 @@ mod tests { let universe = Universe::with_accelerated_time(); let metastore = metastore_for_test(); let index_id = append_random_suffix("test-ingest-api-source"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let temp_dir = tempfile::tempdir()?; let queues_dir_path = temp_dir.path(); @@ -552,7 +552,7 @@ mod tests { let universe = Universe::with_accelerated_time(); let metastore = metastore_for_test(); let index_id = append_random_suffix("test-ingest-api-source"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let temp_dir = tempfile::tempdir()?; let queues_dir_path = temp_dir.path(); diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs index 908a488cba8..d2db3477425 100644 --- a/quickwit/quickwit-indexing/src/source/kafka_source.rs +++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs @@ -1002,7 +1002,7 @@ mod kafka_broker_tests { let metastore = metastore_for_test(); let index_id = append_random_suffix("test-kafka-source--process-message--index"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let (_source_id, source_config) = get_source_config(&topic); let SourceParams::Kafka(params) = source_config.clone().source_params else { panic!( @@ -1191,7 +1191,7 @@ mod kafka_broker_tests { let metastore = metastore_for_test(); let index_id = append_random_suffix("test-kafka-source--process-revoke--partitions--index"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let (_source_id, source_config) = get_source_config(&topic); let SourceParams::Kafka(params) = source_config.clone().source_params else { panic!( @@ -1249,7 +1249,7 @@ mod kafka_broker_tests { let metastore = metastore_for_test(); let index_id = append_random_suffix("test-kafka-source--process-partition-eof--index"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let (_source_id, source_config) = get_source_config(&topic); let SourceParams::Kafka(params) = source_config.clone().source_params else { panic!( diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 50b91b12928..691609464ce 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -815,7 +815,7 @@ mod pulsar_broker_tests { let topic = append_random_suffix("test-pulsar-source-topic"); let index_id = append_random_suffix("test-pulsar-source-index"); - let index_uid = IndexUid::new(&index_id); + let index_uid = IndexUid::new_with_random_ulid(&index_id); let (_source_id, source_config) = get_source_config([&topic]); let params = if let SourceParams::Pulsar(params) = source_config.clone().source_params { params diff --git a/quickwit/quickwit-indexing/src/source/source_factory.rs b/quickwit/quickwit-indexing/src/source/source_factory.rs index 72aa013df9c..28bff181ca3 100644 --- a/quickwit/quickwit-indexing/src/source/source_factory.rs +++ b/quickwit/quickwit-indexing/src/source/source_factory.rs @@ -144,7 +144,7 @@ mod tests { source_loader .load_source( SourceRuntimeArgs::for_test( - IndexUid::new("test-index"), + IndexUid::new_with_random_ulid("test-index"), source_config, metastore, PathBuf::from("./queues"), diff --git a/quickwit/quickwit-indexing/src/source/vec_source.rs b/quickwit/quickwit-indexing/src/source/vec_source.rs index f23fe8a0bbe..79271a53712 100644 --- a/quickwit/quickwit-indexing/src/source/vec_source.rs +++ b/quickwit/quickwit-indexing/src/source/vec_source.rs @@ -168,7 +168,7 @@ mod tests { let metastore = metastore_for_test(); let vec_source = VecSourceFactory::typed_create_source( SourceRuntimeArgs::for_test( - IndexUid::new("test-index"), + IndexUid::new_with_random_ulid("test-index"), source_config, metastore, PathBuf::from("./queues"), @@ -229,7 +229,7 @@ mod tests { let metastore = metastore_for_test(); let vec_source = VecSourceFactory::typed_create_source( SourceRuntimeArgs::for_test( - IndexUid::new("test-index"), + IndexUid::new_with_random_ulid("test-index"), source_config, metastore, PathBuf::from("./queues"), diff --git a/quickwit/quickwit-indexing/src/source/void_source.rs b/quickwit/quickwit-indexing/src/source/void_source.rs index c4d204ea3d5..113133e09a9 100644 --- a/quickwit/quickwit-indexing/src/source/void_source.rs +++ b/quickwit/quickwit-indexing/src/source/void_source.rs @@ -97,7 +97,7 @@ mod tests { }; let metastore = metastore_for_test(); let ctx = SourceRuntimeArgs::for_test( - IndexUid::new("test-index"), + IndexUid::new_with_random_ulid("test-index"), source_config, metastore, PathBuf::from("./queues"), @@ -124,7 +124,7 @@ mod tests { let metastore = metastore_for_test(); let void_source = VoidSourceFactory::typed_create_source( SourceRuntimeArgs::for_test( - IndexUid::new("test-index"), + IndexUid::new_with_random_ulid("test-index"), source_config, metastore, PathBuf::from("./queues"), diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 8327588cfd7..050a062cc29 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -237,7 +237,10 @@ pub struct MockSplitBuilder { impl MockSplitBuilder { pub fn new(split_id: &str) -> Self { Self { - split_metadata: mock_split_meta(split_id, &IndexUid::new("test-index")), + split_metadata: mock_split_meta( + split_id, + &IndexUid::from_parts("test-index", "000000"), + ), } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs index 0d83315aca9..29edd6f1ecd 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/file_backed_index/mod.rs @@ -783,55 +783,54 @@ mod tests { fn test_single_filter_behaviour() { let [split_1, split_2, split_3] = make_splits(); - let query = ListSplitsQuery::for_index(IndexUid::new("test-index")) + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) .with_split_state(SplitState::Staged); assert!(split_query_predicate(&&split_1, &query)); - let query = ListSplitsQuery::for_index(IndexUid::new("test-index")) + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) .with_split_state(SplitState::Published); assert!(!split_query_predicate(&&split_2, &query)); - let query = ListSplitsQuery::for_index(IndexUid::new("test-index")) + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) .with_split_states([SplitState::Published, SplitState::MarkedForDeletion]); assert!(!split_query_predicate(&&split_1, &query)); assert!(split_query_predicate(&&split_3, &query)); - let query = - ListSplitsQuery::for_index(IndexUid::new("test-index")).with_update_timestamp_lt(51); + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) + .with_update_timestamp_lt(51); assert!(!split_query_predicate(&&split_1, &query)); assert!(split_query_predicate(&&split_2, &query)); assert!(split_query_predicate(&&split_3, &query)); - let query = - ListSplitsQuery::for_index(IndexUid::new("test-index")).with_create_timestamp_gte(51); + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) + .with_create_timestamp_gte(51); assert!(!split_query_predicate(&&split_1, &query)); assert!(!split_query_predicate(&&split_2, &query)); assert!(split_query_predicate(&&split_3, &query)); - let query = - ListSplitsQuery::for_index(IndexUid::new("test-index")).with_delete_opstamp_gte(4); + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) + .with_delete_opstamp_gte(4); assert!(split_query_predicate(&&split_1, &query)); assert!(split_query_predicate(&&split_2, &query)); assert!(!split_query_predicate(&&split_3, &query)); - let query = - ListSplitsQuery::for_index(IndexUid::new("test-index")).with_time_range_start_gt(45); + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) + .with_time_range_start_gt(45); assert!(!split_query_predicate(&&split_1, &query)); assert!(split_query_predicate(&&split_2, &query)); assert!(split_query_predicate(&&split_3, &query)); - let query = - ListSplitsQuery::for_index(IndexUid::new("test-index")).with_time_range_end_lt(45); + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) + .with_time_range_end_lt(45); assert!(split_query_predicate(&&split_1, &query)); assert!(split_query_predicate(&&split_2, &query)); assert!(split_query_predicate(&&split_3, &query)); - let query = ListSplitsQuery::for_index(IndexUid::new("test-index")).with_tags_filter( - TagFilterAst::Tag { + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) + .with_tags_filter(TagFilterAst::Tag { is_present: false, tag: "tag-2".to_string(), - }, - ); + }); assert!(split_query_predicate(&&split_1, &query)); assert!(!split_query_predicate(&&split_2, &query)); assert!(!split_query_predicate(&&split_3, &query)); @@ -841,35 +840,35 @@ mod tests { fn test_combination_filter() { let [split_1, split_2, split_3] = make_splits(); - let query = ListSplitsQuery::for_index(IndexUid::new("test-index")) + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) .with_time_range_start_gt(0) .with_time_range_end_lt(40); assert!(split_query_predicate(&&split_1, &query)); assert!(split_query_predicate(&&split_2, &query)); assert!(split_query_predicate(&&split_3, &query)); - let query = ListSplitsQuery::for_index(IndexUid::new("test-index")) + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) .with_time_range_start_gt(45) .with_delete_opstamp_gt(0); assert!(!split_query_predicate(&&split_1, &query)); assert!(split_query_predicate(&&split_2, &query)); assert!(!split_query_predicate(&&split_3, &query)); - let query = ListSplitsQuery::for_index(IndexUid::new("test-index")) + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) .with_update_timestamp_lt(51) .with_split_states([SplitState::Published, SplitState::MarkedForDeletion]); assert!(!split_query_predicate(&&split_1, &query)); assert!(split_query_predicate(&&split_2, &query)); assert!(split_query_predicate(&&split_3, &query)); - let query = ListSplitsQuery::for_index(IndexUid::new("test-index")) + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) .with_update_timestamp_lt(51) .with_create_timestamp_lte(63); assert!(!split_query_predicate(&&split_1, &query)); assert!(split_query_predicate(&&split_2, &query)); assert!(!split_query_predicate(&&split_3, &query)); - let query = ListSplitsQuery::for_index(IndexUid::new("test-index")) + let query = ListSplitsQuery::for_index(IndexUid::new_with_random_ulid("test-index")) .with_time_range_start_gt(90) .with_tags_filter(TagFilterAst::Tag { is_present: true, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs index 78d9b7b9cb5..b4aa4b446ab 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs @@ -1057,14 +1057,14 @@ mod tests { // Open a non-existent index. let metastore_error = metastore - .get_index(IndexUid::new("index-does-not-exist")) + .get_index(IndexUid::new_with_random_ulid("index-does-not-exist")) .await .unwrap_err(); assert!(matches!(metastore_error, MetastoreError::NotFound { .. })); // Open a index with a different incarnation_id. let metastore_error = metastore - .get_index(IndexUid::new(index_id)) + .get_index(IndexUid::new_with_random_ulid(index_id)) .await .unwrap_err(); assert!(matches!(metastore_error, MetastoreError::NotFound { .. })); @@ -1183,7 +1183,7 @@ mod tests { // Getting index with inconsistent index ID should raise an error. let metastore_error = metastore - .get_index(IndexUid::new(index_id)) + .get_index(IndexUid::new_with_random_ulid(index_id)) .await .unwrap_err(); assert!(matches!(metastore_error, MetastoreError::Internal { .. })); @@ -1369,7 +1369,7 @@ mod tests { let mut metastore = FileBackedMetastore::default_for_test().await; let mut index_uids = Vec::new(); for idx in 0..10 { - let index_uid = IndexUid::new(format!("test-index-{idx}")); + let index_uid = IndexUid::new_with_random_ulid(&format!("test-index-{idx}")); let index_config = IndexConfig::for_test(index_uid.index_id(), "ram:///indexes/test-index"); let create_index_request = @@ -1446,7 +1446,7 @@ mod tests { assert!(matches!(metastore_error, MetastoreError::Internal { .. })); // Try fetch the not created index. let created_index_error = metastore - .get_index(IndexUid::new(index_id)) + .get_index(IndexUid::new_with_random_ulid(index_id)) .await .unwrap_err(); assert!(matches!( @@ -1462,7 +1462,7 @@ mod tests { let ram_storage_clone = ram_storage.clone(); let ram_storage_clone_2 = ram_storage.clone(); let index_id = "test-index"; - let index_uid = IndexUid::new(index_id); + let index_uid = IndexUid::new_with_random_ulid(index_id); mock_storage // remove this if we end up changing the semantics of create. .expect_exists() @@ -1571,7 +1571,7 @@ mod tests { // Let's fetch the index, we expect an internal error as the index state is in `Creating` // state. let created_index_error = metastore - .get_index(IndexUid::new(index_id)) + .get_index(IndexUid::new_with_random_ulid(index_id)) .await .unwrap_err(); assert!(matches!( @@ -1586,7 +1586,7 @@ mod tests { let ram_storage = RamStorage::default(); let ram_storage_clone = ram_storage.clone(); let index_id = "test-index"; - let index_uid = IndexUid::new(index_id); + let index_uid = IndexUid::new_with_random_ulid(index_id); let index_metadata = IndexMetadata::for_test(index_uid.index_id(), "ram:///indexes/test-index"); let index = FileBackedIndex::from(index_metadata); @@ -1633,7 +1633,7 @@ mod tests { let ram_storage = RamStorage::default(); let ram_storage_clone = ram_storage.clone(); let index_id = "test-index"; - let index_uid = IndexUid::new(index_id); + let index_uid = IndexUid::new_with_random_ulid(index_id); let index_metadata = IndexMetadata::for_test(index_uid.index_id(), "ram:///indexes/test-index"); let index = FileBackedIndex::from(index_metadata); diff --git a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs index 050a9317893..24129bec968 100644 --- a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs @@ -53,7 +53,7 @@ pub struct IndexMetadata { impl IndexMetadata { /// Panics if `index_config` is missing `index_uri`. pub fn new(index_config: IndexConfig) -> Self { - let index_uid = IndexUid::new(index_config.index_id.clone()); + let index_uid = IndexUid::new_with_random_ulid(&index_config.index_id); IndexMetadata::new_with_index_uid(index_uid, index_config) } @@ -154,7 +154,7 @@ impl TestableForRegression for IndexMetadata { let checkpoint = IndexCheckpoint::from(per_source_checkpoint); let index_config = IndexConfig::sample_for_regression(); let mut index_metadata = IndexMetadata { - index_uid: IndexUid::from_parts(index_config.index_id.clone(), Ulid::nil()), + index_uid: IndexUid::from_parts(&index_config.index_id, Ulid::nil()), index_config, checkpoint, create_timestamp: 1789, diff --git a/quickwit/quickwit-metastore/src/metastore/index_metadata/serialize.rs b/quickwit/quickwit-metastore/src/metastore/index_metadata/serialize.rs index 9dd810a99da..00adaede029 100644 --- a/quickwit/quickwit-metastore/src/metastore/index_metadata/serialize.rs +++ b/quickwit/quickwit-metastore/src/metastore/index_metadata/serialize.rs @@ -97,7 +97,7 @@ impl TryFrom for IndexMetadata { } Ok(Self { index_uid: if v0_6.index_uid.is_empty() { - v0_6.index_config.index_id.clone().into() + IndexUid::from_parts(&v0_6.index_config.index_id, "") } else { v0_6.index_uid }, diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs index 5f9648296e1..c1fba530a80 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs @@ -1596,7 +1596,7 @@ mod tests { #[test] fn test_single_sql_query_builder() { - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Staged); let sql = build_query_filter(String::new(), &query); @@ -1699,7 +1699,7 @@ mod tests { #[test] fn test_combination_sql_query_builder() { - let index_uid = IndexUid::new("test-index"); + let index_uid = IndexUid::new_with_random_ulid("test-index"); let query = ListSplitsQuery::for_index(index_uid.clone()) .with_time_range_start_gt(0) .with_time_range_end_lt(40); @@ -1751,7 +1751,7 @@ mod tests { ) ); - let index_uid_2 = IndexUid::new("test-index-2"); + let index_uid_2 = IndexUid::new_with_random_ulid("test-index-2"); let query = ListSplitsQuery::try_from_index_uids(vec![index_uid.clone(), index_uid_2.clone()]) .unwrap(); diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 21a456014f2..88101809739 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -387,9 +387,10 @@ pub async fn test_metastore_delete_index< let index_uri = format!("ram:///indexes/{index_id}"); let index_config = IndexConfig::for_test(&index_id, &index_uri); + let index_uid_not_existing = IndexUid::new_with_random_ulid("index-not-found"); let error = metastore .delete_index(DeleteIndexRequest { - index_uid: "index-not-found".to_string(), + index_uid: index_uid_not_existing.to_string(), }) .await .unwrap_err(); @@ -400,7 +401,7 @@ pub async fn test_metastore_delete_index< let error = metastore .delete_index(DeleteIndexRequest { - index_uid: "test-delete-index".to_string(), + index_uid: index_uid_not_existing.to_string(), }) .await .unwrap_err(); @@ -534,7 +535,7 @@ pub async fn test_metastore_add_source Option<(IndexUid, SourceId, ShardId)> { /// Index identifiers that uniquely identify not only the index, but also /// its incarnation allowing to distinguish between deleted and recreated indexes. -/// It is represented as a stiring in index_id:incarnation_id format. -#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd, Hash)] +/// It is represented as a string in index_id:incarnation_id format. +#[derive(Clone, Debug, Default, Serialize, PartialEq, Eq, Ord, PartialOrd, Hash)] pub struct IndexUid(String); +// It is super lame, but for backward compatibility reasons we accept having a missing ulid part. +// TODO DEPRECATED ME and remove +impl<'de> Deserialize<'de> for IndexUid { + fn deserialize(deserializer: D) -> Result + where D: Deserializer<'de> { + let index_uid_str: String = String::deserialize(deserializer)?; + if !index_uid_str.contains(':') { + return Ok(IndexUid::from_parts(&index_uid_str, "")); + } + let index_uid = IndexUid::from(index_uid_str); + Ok(index_uid) + } +} + impl fmt::Display for IndexUid { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.0) @@ -73,8 +87,8 @@ impl fmt::Display for IndexUid { impl IndexUid { /// Creates a new index uid from index_id. - /// A random UUID will be used as incarnation - pub fn new(index_id: impl Into) -> Self { + /// A random ULID will be used as incarnation + pub fn new_with_random_ulid(index_id: &str) -> Self { Self::from_parts(index_id, Ulid::new().to_string()) } @@ -87,14 +101,9 @@ impl IndexUid { &self.0 } - pub fn from_parts(index_id: impl Into, incarnation_id: impl Into) -> Self { - let incarnation_id = incarnation_id.into(); - let index_id = index_id.into(); - if incarnation_id.is_empty() { - Self(index_id) - } else { - Self(format!("{index_id}:{incarnation_id}")) - } + pub fn from_parts(index_id: &str, incarnation_id: impl Display) -> Self { + assert!(!index_id.contains(':'), "Index id may not contain `:`"); + Self(format!("{index_id}:{incarnation_id}")) } pub fn index_id(&self) -> &str { @@ -127,8 +136,15 @@ impl From<&str> for IndexUid { } impl From for IndexUid { - fn from(index_uid: String) -> Self { - Self(index_uid) + fn from(index_uid: String) -> IndexUid { + let count_colon = index_uid + .as_bytes() + .iter() + .copied() + .filter(|c| *c == b':') + .count(); + assert_eq!(count_colon, 1, "Invalid index_uid: {}", index_uid); + IndexUid(index_uid) } }