Skip to content

Commit

Permalink
Fix pulsar tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jun 26, 2024
1 parent 935e7fa commit 9f6f029
Showing 1 changed file with 11 additions and 75 deletions.
86 changes: 11 additions & 75 deletions quickwit/quickwit-indexing/src/source/pulsar_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,22 +445,15 @@ mod pulsar_broker_tests {
use futures::future::join_all;
use quickwit_actors::{ActorHandle, Inbox, Universe, HEARTBEAT};
use quickwit_common::rand::append_random_suffix;
use quickwit_config::{IndexConfig, SourceConfig, SourceInputFormat, SourceParams};
use quickwit_metastore::checkpoint::{
IndexCheckpointDelta, PartitionId, SourceCheckpointDelta,
};
use quickwit_metastore::{
metastore_for_test, CreateIndexRequestExt, SplitMetadata, StageSplitsRequestExt,
};
use quickwit_proto::metastore::{
CreateIndexRequest, MetastoreService, MetastoreServiceClient, PublishSplitsRequest,
StageSplitsRequest,
};
use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams};
use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpointDelta};
use quickwit_metastore::metastore_for_test;
use quickwit_proto::metastore::MetastoreServiceClient;
use reqwest::StatusCode;

use super::*;
use crate::new_split_id;
use crate::source::pulsar_source::{msg_id_from_position, msg_id_to_position};
use crate::source::test_setup_helper::setup_index;
use crate::source::tests::SourceRuntimeBuilder;
use crate::source::{quickwit_supported_sources, RawDocBatch, SuggestTruncate};

Expand Down Expand Up @@ -492,63 +485,6 @@ mod pulsar_broker_tests {
}};
}

async fn setup_index(
metastore: MetastoreServiceClient,
index_id: &str,
source_id: &str,
partition_deltas: &[(&str, Position, Position)],
) -> IndexUid {
let index_uri = format!("ram:///indexes/{index_id}");
let index_config = IndexConfig::for_test(index_id, &index_uri);
let create_index_request =
CreateIndexRequest::try_from_index_config(&index_config).unwrap();
let index_uid: IndexUid = metastore
.create_index(create_index_request)
.await
.unwrap()
.index_uid()
.clone();

if partition_deltas.is_empty() {
return index_uid;
}
let split_id = new_split_id();
let split_metadata = SplitMetadata::for_test(split_id.clone());
let stage_splits_request =
StageSplitsRequest::try_from_split_metadata(index_uid.clone(), &split_metadata)
.unwrap();
metastore.stage_splits(stage_splits_request).await.unwrap();

let mut source_delta = SourceCheckpointDelta::default();
for (partition_id, from_position, to_position) in partition_deltas {
source_delta
.record_partition_delta(
PartitionId::from(&**partition_id),
from_position.clone(),
to_position.clone(),
)
.unwrap();
}
let checkpoint_delta = IndexCheckpointDelta {
source_id: source_id.to_string(),
source_delta,
};
let publish_splits_request = PublishSplitsRequest {
index_uid: Some(index_uid.clone()),
staged_split_ids: vec![split_id.clone()],
replaced_split_ids: Vec::new(),
index_checkpoint_delta_json_opt: Some(
serde_json::to_string(&checkpoint_delta).unwrap(),
),
publish_token_opt: None,
};
metastore
.publish_splits(publish_splits_request)
.await
.unwrap();
index_uid
}

fn get_source_config<S: AsRef<str>>(
topics: impl IntoIterator<Item = S>,
) -> (String, SourceConfig) {
Expand Down Expand Up @@ -895,7 +831,7 @@ mod pulsar_broker_tests {
let index_id = append_random_suffix("test-pulsar-source--topic-ingestion--index");
let (source_id, source_config) = get_source_config([&topic]);

let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await;
let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await;

let (source_handle, doc_processor_inbox) = create_source(
&universe,
Expand Down Expand Up @@ -952,7 +888,7 @@ mod pulsar_broker_tests {
let index_id = append_random_suffix("test-pulsar-source--topic-ingestion--index");
let (source_id, source_config) = get_source_config([&topic1, &topic2]);

let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await;
let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await;

let (source_handle, doc_processor_inbox) = create_source(
&universe,
Expand Down Expand Up @@ -1020,7 +956,7 @@ mod pulsar_broker_tests {
let (source_id, source_config) = get_source_config([&topic]);

create_partitioned_topic(&topic, 2).await;
let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await;
let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await;

let (source_handle, doc_processor_inbox) = create_source(
&universe,
Expand Down Expand Up @@ -1074,7 +1010,7 @@ mod pulsar_broker_tests {
let (source_id, source_config) = get_source_config([&topic]);

create_partitioned_topic(&topic, 2).await;
let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await;
let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await;

let topic_partition_1 = format!("{topic}-partition-0");
let topic_partition_2 = format!("{topic}-partition-1");
Expand Down Expand Up @@ -1158,10 +1094,10 @@ mod pulsar_broker_tests {

let index_id =
append_random_suffix("test-pulsar-source--partitioned-multi-consumer-failure--index");
let (source_id, source_config) = get_source_config([&topic]);
let (_, source_config) = get_source_config([&topic]);

create_partitioned_topic(&topic, 2).await;
let index_uid = setup_index(metastore.clone(), &index_id, &source_id, &[]).await;
let index_uid = setup_index(metastore.clone(), &index_id, &source_config, &[]).await;

let topic_partition_1 = format!("{topic}-partition-0");
let topic_partition_2 = format!("{topic}-partition-1");
Expand Down

0 comments on commit 9f6f029

Please sign in to comment.