Skip to content

Commit

Permalink
Fix never-used warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jun 24, 2024
1 parent ac32fd6 commit 056f785
Showing 1 changed file with 77 additions and 66 deletions.
143 changes: 77 additions & 66 deletions quickwit/quickwit-indexing/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ mod kafka_source;
mod kinesis;
#[cfg(feature = "pulsar")]
mod pulsar_source;
#[cfg(feature = "sqs")]
mod queue_sources;
mod source_factory;
mod vec_source;
Expand Down Expand Up @@ -560,19 +561,13 @@ mod tests {

use std::num::NonZeroUsize;

use quickwit_config::{IndexConfig, SourceInputFormat, VecSourceParams};
use quickwit_metastore::checkpoint::{IndexCheckpointDelta, PartitionId};
use quickwit_metastore::{
CreateIndexRequestExt, IndexMetadata, SplitMetadata, StageSplitsRequestExt,
};
use quickwit_proto::metastore::{
CreateIndexRequest, IndexMetadataResponse, MockMetastoreService, PublishSplitsRequest,
StageSplitsRequest,
};
use quickwit_proto::types::{NodeId, Position};
use quickwit_config::{SourceInputFormat, VecSourceParams};
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_metastore::IndexMetadata;
use quickwit_proto::metastore::{IndexMetadataResponse, MockMetastoreService};
use quickwit_proto::types::NodeId;

use super::*;
use crate::new_split_id;

pub struct SourceRuntimeBuilder {
index_uid: IndexUid,
Expand Down Expand Up @@ -670,61 +665,6 @@ mod tests {
}
}

pub(crate) async fn setup_index(
mut metastore: MetastoreServiceClient,
index_id: &str,
source_config: &SourceConfig,
partition_deltas: &[(PartitionId, Position, Position)],
) -> IndexUid {
let index_uri = format!("ram:///indexes/{index_id}");
let index_config = IndexConfig::for_test(index_id, &index_uri);
let create_index_request = CreateIndexRequest::try_from_index_and_source_configs(
&index_config,
&[source_config.clone()],
)
.unwrap();
let index_uid: IndexUid = metastore
.create_index(create_index_request)
.await
.unwrap()
.index_uid()
.clone();

if partition_deltas.is_empty() {
return index_uid;
}
let split_id = new_split_id();
let split_metadata = SplitMetadata::for_test(split_id.clone());
let stage_splits_request =
StageSplitsRequest::try_from_split_metadata(index_uid.clone(), &split_metadata)
.unwrap();
metastore.stage_splits(stage_splits_request).await.unwrap();

let mut source_delta = SourceCheckpointDelta::default();
for (partition_id, from_position, to_position) in partition_deltas.iter().cloned() {
source_delta
.record_partition_delta(partition_id, from_position, to_position)
.unwrap();
}
let checkpoint_delta = IndexCheckpointDelta {
source_id: source_config.source_id.to_string(),
source_delta,
};
let checkpoint_delta_json = serde_json::to_string(&checkpoint_delta).unwrap();
let publish_splits_request = PublishSplitsRequest {
index_uid: Some(index_uid.clone()),
index_checkpoint_delta_json_opt: Some(checkpoint_delta_json),
staged_split_ids: vec![split_id.clone()],
replaced_split_ids: Vec::new(),
publish_token_opt: None,
};
metastore
.publish_splits(publish_splits_request)
.await
.unwrap();
index_uid
}

#[tokio::test]
async fn test_check_source_connectivity() -> anyhow::Result<()> {
{
Expand Down Expand Up @@ -782,3 +722,74 @@ mod tests {
Ok(())
}
}

#[cfg(all(
test,
any(feature = "sqs-localstack-tests", feature = "kafka-broker-tests")
))]
mod test_setup_helper {

use quickwit_config::IndexConfig;
use quickwit_metastore::checkpoint::{IndexCheckpointDelta, PartitionId};
use quickwit_metastore::{CreateIndexRequestExt, SplitMetadata, StageSplitsRequestExt};
use quickwit_proto::metastore::{CreateIndexRequest, PublishSplitsRequest, StageSplitsRequest};
use quickwit_proto::types::Position;

use super::*;
use crate::new_split_id;

pub async fn setup_index(
mut metastore: MetastoreServiceClient,
index_id: &str,
source_config: &SourceConfig,
partition_deltas: &[(PartitionId, Position, Position)],
) -> IndexUid {
let index_uri = format!("ram:///indexes/{index_id}");
let index_config = IndexConfig::for_test(index_id, &index_uri);
let create_index_request = CreateIndexRequest::try_from_index_and_source_configs(
&index_config,
&[source_config.clone()],
)
.unwrap();
let index_uid: IndexUid = metastore
.create_index(create_index_request)
.await
.unwrap()
.index_uid()
.clone();

if partition_deltas.is_empty() {
return index_uid;
}
let split_id = new_split_id();
let split_metadata = SplitMetadata::for_test(split_id.clone());
let stage_splits_request =
StageSplitsRequest::try_from_split_metadata(index_uid.clone(), &split_metadata)
.unwrap();
metastore.stage_splits(stage_splits_request).await.unwrap();

let mut source_delta = SourceCheckpointDelta::default();
for (partition_id, from_position, to_position) in partition_deltas.iter().cloned() {
source_delta
.record_partition_delta(partition_id, from_position, to_position)
.unwrap();
}
let checkpoint_delta = IndexCheckpointDelta {
source_id: source_config.source_id.to_string(),
source_delta,
};
let checkpoint_delta_json = serde_json::to_string(&checkpoint_delta).unwrap();
let publish_splits_request = PublishSplitsRequest {
index_uid: Some(index_uid.clone()),
index_checkpoint_delta_json_opt: Some(checkpoint_delta_json),
staged_split_ids: vec![split_id.clone()],
replaced_split_ids: Vec::new(),
publish_token_opt: None,
};
metastore
.publish_splits(publish_splits_request)
.await
.unwrap();
index_uid
}
}

0 comments on commit 056f785

Please sign in to comment.