Skip to content

Commit

Permalink
Add metastore suite test for MetastoreServiceGrpcClientAdapter.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Nov 13, 2023
1 parent 6266316 commit a682e5f
Showing 1 changed file with 71 additions and 3 deletions.
74 changes: 71 additions & 3 deletions quickwit/quickwit-metastore/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ use itertools::Itertools;
use quickwit_common::rand::append_random_suffix;
use quickwit_config::{IndexConfig, SourceConfig, SourceInputFormat, SourceParams};
use quickwit_doc_mapper::tag_pruning::{no_tag, tag, TagFilterAst};
use quickwit_proto::metastore::metastore_service_grpc_client::MetastoreServiceGrpcClient;
use quickwit_proto::metastore::{
AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, DeleteQuery, DeleteSourceRequest,
DeleteSplitsRequest, EntityKind, IndexMetadataRequest, LastDeleteOpstampRequest,
ListDeleteTasksRequest, ListIndexesMetadataRequest, ListSplitsRequest, ListStaleSplitsRequest,
MarkSplitsForDeletionRequest, MetastoreError, MetastoreService, PublishSplitsRequest,
ResetSourceCheckpointRequest, SourceType, StageSplitsRequest, ToggleSourceRequest,
UpdateSplitsDeleteOpstampRequest,
MarkSplitsForDeletionRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
MetastoreServiceGrpcClientAdapter, PublishSplitsRequest, ResetSourceCheckpointRequest,
SourceType, StageSplitsRequest, ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest,
};
use quickwit_proto::tonic::transport::Channel;
use quickwit_proto::types::{IndexUid, Position, SplitId};
use quickwit_query::query_ast::qast_json_helper;
use time::OffsetDateTime;
Expand All @@ -44,6 +46,7 @@ use tracing::{error, info};

pub(crate) mod shard;

use self::shard::DisableShardTestSuite;
use crate::checkpoint::{
IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta,
};
Expand All @@ -58,6 +61,71 @@ pub trait DefaultForTest {
async fn default_for_test() -> Self;
}

// We implement the trait to test the gRPC adapter backed by a file backed metastore.
#[async_trait]
impl DefaultForTest for MetastoreServiceGrpcClientAdapter<MetastoreServiceGrpcClient<Channel>> {
async fn default_for_test() -> Self {
use quickwit_proto::tonic::transport::Server;
use quickwit_storage::RamStorage;

use crate::FileBackedMetastore;
let metastore =
FileBackedMetastore::try_new(std::sync::Arc::new(RamStorage::default()), None)
.await
.unwrap();
let (client, server) = tokio::io::duplex(1024);
tokio::spawn(async move {
Server::builder()
.add_service(MetastoreServiceClient::new(metastore).as_grpc_service())
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
let channel = create_channel(client).await.unwrap();
let (_, connection_keys_watcher) =
tokio::sync::watch::channel(std::collections::HashSet::new());

MetastoreServiceGrpcClientAdapter::new(
MetastoreServiceGrpcClient::new(channel),
connection_keys_watcher,
)
}
}

impl MetastoreServiceExt
for MetastoreServiceGrpcClientAdapter<MetastoreServiceGrpcClient<Channel>>
{
}
impl DisableShardTestSuite
for MetastoreServiceGrpcClientAdapter<MetastoreServiceGrpcClient<Channel>>
{
}

async fn create_channel(client: tokio::io::DuplexStream) -> anyhow::Result<Channel> {
use http::Uri;
use quickwit_proto::tonic::transport::Endpoint;

let mut client = Some(client);
let channel = Endpoint::try_from("http://test.server")?
.connect_with_connector(tower::service_fn(move |_: Uri| {
let client = client.take();
async move {
client.ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::Other, "client already taken")
})
}
}))
.await?;
Ok(channel)
}

crate::metastore_test_suite!(
quickwit_proto::metastore::MetastoreServiceGrpcClientAdapter<
quickwit_proto::metastore::metastore_service_grpc_client::MetastoreServiceGrpcClient<
quickwit_proto::tonic::transport::Channel,
>,
>
);

fn collect_split_ids(splits: &[Split]) -> Vec<&str> {
splits
.iter()
Expand Down

0 comments on commit a682e5f

Please sign in to comment.