From 9f6f029885638855ae226e3f430ef5e5a24c8775 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 26 Jun 2024 09:38:29 +0200 Subject: [PATCH] Fix pulsar tests --- .../src/source/pulsar_source.rs | 86 +++---------------- 1 file changed, 11 insertions(+), 75 deletions(-) diff --git a/quickwit/quickwit-indexing/src/source/pulsar_source.rs b/quickwit/quickwit-indexing/src/source/pulsar_source.rs index 6dec087a454..6dcc91abc71 100644 --- a/quickwit/quickwit-indexing/src/source/pulsar_source.rs +++ b/quickwit/quickwit-indexing/src/source/pulsar_source.rs @@ -445,22 +445,15 @@ mod pulsar_broker_tests { use futures::future::join_all; use quickwit_actors::{ActorHandle, Inbox, Universe, HEARTBEAT}; use quickwit_common::rand::append_random_suffix; - use quickwit_config::{IndexConfig, SourceConfig, SourceInputFormat, SourceParams}; - use quickwit_metastore::checkpoint::{ - IndexCheckpointDelta, PartitionId, SourceCheckpointDelta, - }; - use quickwit_metastore::{ - metastore_for_test, CreateIndexRequestExt, SplitMetadata, StageSplitsRequestExt, - }; - use quickwit_proto::metastore::{ - CreateIndexRequest, MetastoreService, MetastoreServiceClient, PublishSplitsRequest, - StageSplitsRequest, - }; + use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams}; + use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpointDelta}; + use quickwit_metastore::metastore_for_test; + use quickwit_proto::metastore::MetastoreServiceClient; use reqwest::StatusCode; use super::*; - use crate::new_split_id; use crate::source::pulsar_source::{msg_id_from_position, msg_id_to_position}; + use crate::source::test_setup_helper::setup_index; use crate::source::tests::SourceRuntimeBuilder; use crate::source::{quickwit_supported_sources, RawDocBatch, SuggestTruncate}; @@ -492,63 +485,6 @@ mod pulsar_broker_tests { }}; } - async fn setup_index( - metastore: MetastoreServiceClient, - index_id: &str, - source_id: &str, - partition_deltas: &[(&str, Position, Position)], - ) -> IndexUid { - let index_uri = format!("ram:///indexes/{index_id}"); - let index_config = IndexConfig::for_test(index_id, &index_uri); - let create_index_request = - CreateIndexRequest::try_from_index_config(&index_config).unwrap(); - let index_uid: IndexUid = metastore - .create_index(create_index_request) - .await - .unwrap() - .index_uid() - .clone(); - - if partition_deltas.is_empty() { - return index_uid; - } - let split_id = new_split_id(); - let split_metadata = SplitMetadata::for_test(split_id.clone()); - let stage_splits_request = - StageSplitsRequest::try_from_split_metadata(index_uid.clone(), &split_metadata) - .unwrap(); - metastore.stage_splits(stage_splits_request).await.unwrap(); - - let mut source_delta = SourceCheckpointDelta::default(); - for (partition_id, from_position, to_position) in partition_deltas { - source_delta - .record_partition_delta( - PartitionId::from(&**partition_id), - from_position.clone(), - to_position.clone(), - ) - .unwrap(); - } - let checkpoint_delta = IndexCheckpointDelta { - source_id: source_id.to_string(), - source_delta, - }; - let publish_splits_request = PublishSplitsRequest { - index_uid: Some(index_uid.clone()), - staged_split_ids: vec![split_id.clone()], - replaced_split_ids: Vec::new(), - index_checkpoint_delta_json_opt: Some( - serde_json::to_string(&checkpoint_delta).unwrap(), - ), - publish_token_opt: None, - }; - metastore - .publish_splits(publish_splits_request) - .await - .unwrap(); - index_uid - } - fn get_source_config>( topics: impl IntoIterator, ) -> (String, SourceConfig) { @@ -895,7 +831,7 @@ mod pulsar_broker_tests { let index_id = append_random_suffix("test-pulsar-source--topic-ingestion--index"); let (source_id, source_config) = get_source_config([&topic]); - let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; let (source_handle, doc_processor_inbox) = create_source( &universe, @@ -952,7 +888,7 @@ mod pulsar_broker_tests { let index_id = append_random_suffix("test-pulsar-source--topic-ingestion--index"); let (source_id, source_config) = get_source_config([&topic1, &topic2]); - let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; let (source_handle, doc_processor_inbox) = create_source( &universe, @@ -1020,7 +956,7 @@ mod pulsar_broker_tests { let (source_id, source_config) = get_source_config([&topic]); create_partitioned_topic(&topic, 2).await; - let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; let (source_handle, doc_processor_inbox) = create_source( &universe, @@ -1074,7 +1010,7 @@ mod pulsar_broker_tests { let (source_id, source_config) = get_source_config([&topic]); create_partitioned_topic(&topic, 2).await; - let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; let topic_partition_1 = format!("{topic}-partition-0"); let topic_partition_2 = format!("{topic}-partition-1"); @@ -1158,10 +1094,10 @@ mod pulsar_broker_tests { let index_id = append_random_suffix("test-pulsar-source--partitioned-multi-consumer-failure--index"); - let (source_id, source_config) = get_source_config([&topic]); + let (_, source_config) = get_source_config([&topic]); create_partitioned_topic(&topic, 2).await; - let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await; + let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await; let topic_partition_1 = format!("{topic}-partition-0"); let topic_partition_2 = format!("{topic}-partition-1");