From 3731e99362aefa6ec8b0d89a0933ea6f372ca43d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Fri, 27 Oct 2023 20:00:08 +0200 Subject: [PATCH] Add stream splits gRPC. --- quickwit/Cargo.lock | 1 + quickwit/Cargo.toml | 1 + quickwit/quickwit-cli/tests/cli.rs | 19 +-- quickwit/quickwit-common/src/stream_utils.rs | 32 ++++ .../src/garbage_collection.rs | 41 ++--- .../quickwit-index-management/src/index.rs | 25 +-- .../src/actors/indexing_pipeline.rs | 10 +- .../src/actors/indexing_service.rs | 9 +- .../src/actors/merge_executor.rs | 11 +- .../src/actors/merge_pipeline.rs | 19 ++- quickwit/quickwit-indexing/src/test_utils.rs | 10 +- .../src/actors/delete_task_pipeline.rs | 4 +- .../src/actors/delete_task_planner.rs | 11 +- .../src/actors/garbage_collector.rs | 26 +-- .../src/actors/retention_policy_executor.rs | 10 +- .../src/retention_policy_execution.rs | 3 +- quickwit/quickwit-metastore/Cargo.toml | 1 + .../src/metastore/control_plane_metastore.rs | 13 +- .../metastore/file_backed_metastore/mod.rs | 79 +++++---- .../quickwit-metastore/src/metastore/mod.rs | 18 ++ .../src/metastore/postgresql_metastore.rs | 119 ++++++++------ quickwit/quickwit-metastore/src/tests/mod.rs | 154 +++++------------- .../protos/quickwit/metastore.proto | 8 +- .../codegen/quickwit/quickwit.metastore.rs | 144 +++++++++------- quickwit/quickwit-search/src/lib.rs | 5 +- quickwit/quickwit-search/src/root.rs | 89 ++++++---- .../quickwit-search/src/search_stream/leaf.rs | 16 +- .../quickwit-search/src/search_stream/root.rs | 21 ++- quickwit/quickwit-search/src/tests.rs | 5 +- .../src/index_api/rest_handler.rs | 40 ++--- quickwit/quickwit-serve/src/search_api/mod.rs | 6 +- 31 files changed, 489 insertions(+), 461 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index a0a3bea8598..1064729929f 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5720,6 +5720,7 @@ dependencies = [ "md5", "mockall", "once_cell", + "ouroboros", "quickwit-common", "quickwit-config", "quickwit-doc-mapper", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 13f78cc72dd..97eb228db93 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -111,6 +111,7 @@ openssl = { version = "0.10.55", default-features = false } openssl-probe = "0.1.5" opentelemetry = { version = "0.19", features = ["rt-tokio"] } opentelemetry-otlp = "0.12.0" +ouroboros = "0.18.0" pin-project = "1.1.0" pnet = { version = "0.33.0", features = ["std"] } postcard = { version = "1.0.4", features = ["use-std"], default-features = false} diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 4445b2cc0de..25dafe96022 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -42,8 +42,7 @@ use quickwit_common::rand::append_random_suffix; use quickwit_common::uri::Uri; use quickwit_config::{SourceInputFormat, CLI_INGEST_SOURCE_ID}; use quickwit_metastore::{ - ListSplitsRequestExt, ListSplitsResponseExt, MetastoreResolver, MetastoreServiceExt, - SplitState, StageSplitsRequestExt, + ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, SplitState, StageSplitsRequestExt, }; use quickwit_proto::metastore::{ DeleteSplitsRequest, EntityKind, IndexMetadataRequest, ListSplitsRequest, @@ -254,8 +253,6 @@ async fn test_ingest_docs_cli() { .await .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits.len(), 1); @@ -668,8 +665,6 @@ async fn test_garbage_collect_cli_no_grace() { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits.len(), 1); @@ -718,8 +713,6 @@ async fn test_garbage_collect_cli_no_grace() { .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await .unwrap() - .deserialize_splits() - .unwrap() .len(), 0 ); @@ -779,8 +772,6 @@ async fn test_garbage_collect_index_cli() { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits.len(), 1); @@ -798,8 +789,6 @@ async fn test_garbage_collect_index_cli() { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits.len(), 1); @@ -839,8 +828,6 @@ async fn test_garbage_collect_index_cli() { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits[0].split_state, SplitState::Staged); @@ -854,8 +841,6 @@ async fn test_garbage_collect_index_cli() { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits.len(), 1); assert_eq!(splits[0].split_state, SplitState::Staged); @@ -872,8 +857,6 @@ async fn test_garbage_collect_index_cli() { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); // Splits should be deleted from both metastore and file system. assert_eq!(splits.len(), 0); diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index aa89b483621..66ac81d4ad5 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -33,6 +33,30 @@ pub struct ServiceStream { inner: BoxStream, } +impl ServiceStream +where T: Send + 'static +{ + pub fn new(inner: BoxStream) -> Self { + Self { inner } + } + + pub fn empty() -> Self { + Self { + inner: Box::pin(stream::empty()), + } + } +} + +impl From> for ServiceStream +where T: Send + 'static +{ + fn from(values: Vec) -> Self { + Self { + inner: Box::pin(stream::iter(values)), + } + } +} + impl fmt::Debug for ServiceStream where T: 'static { @@ -104,6 +128,14 @@ where T: Send + 'static } } +impl From> for ServiceStream +where T: 'static +{ + fn from(stream: BoxStream) -> Self { + Self { inner: stream } + } +} + /// Adapts a server-side tonic::Streaming into a ServiceStream of `Result`. Once /// an error is encountered, the stream will be closed and subsequent calls to `poll_next` will /// return `None`. diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index f0062ff8921..035ad7339a7 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -25,7 +25,7 @@ use std::time::Duration; use futures::Future; use quickwit_common::{PrettySample, Progress}; use quickwit_metastore::{ - ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, SplitInfo, SplitMetadata, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitInfo, SplitMetadata, SplitState, }; use quickwit_proto::metastore::{ @@ -106,7 +106,9 @@ pub async fn run_garbage_collect( metastore.list_splits(list_deletable_staged_request), ) .await? - .deserialize_splits_metadata()?; + .into_iter() + .map(|split| split.split_metadata) + .collect(); if dry_run { let marked_for_deletion_query = ListSplitsQuery::for_index(index_uid.clone()) @@ -118,7 +120,9 @@ pub async fn run_garbage_collect( metastore.list_splits(marked_for_deletion_request), ) .await? - .deserialize_splits_metadata()?; + .into_iter() + .map(|split| split.split_metadata) + .collect(); splits_marked_for_deletion.extend(deletable_staged_splits); let candidate_entries: Vec = splits_marked_for_deletion @@ -192,9 +196,7 @@ async fn delete_splits_marked_for_deletion( } }; let list_splits_result = - protect_future(progress_opt, metastore.list_splits(list_splits_request)) - .await - .and_then(|list_splits_response| list_splits_response.deserialize_splits()); + protect_future(progress_opt, metastore.list_splits(list_splits_request)).await; let splits_to_delete: Vec = match list_splits_result { Ok(splits) => splits @@ -345,14 +347,13 @@ mod tests { use std::time::Duration; use itertools::Itertools; + use quickwit_common::ServiceStream; use quickwit_config::IndexConfig; use quickwit_metastore::{ metastore_for_test, CreateIndexRequestExt, ListSplitsQuery, SplitMetadata, SplitState, StageSplitsRequestExt, }; - use quickwit_proto::metastore::{ - CreateIndexRequest, EntityKind, ListSplitsResponse, StageSplitsRequest, - }; + use quickwit_proto::metastore::{CreateIndexRequest, EntityKind, StageSplitsRequest}; use quickwit_proto::types::IndexUid; use quickwit_storage::{ storage_for_test, BulkDeleteError, DeleteFailure, MockStorage, PutPayload, @@ -395,8 +396,6 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() - .deserialize_splits() - .unwrap() .len(), 1 ); @@ -422,8 +421,6 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() - .deserialize_splits() - .unwrap() .len(), 1 ); @@ -449,8 +446,6 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() - .deserialize_splits() - .unwrap() .len(), 1 ); @@ -496,8 +491,6 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() - .deserialize_splits() - .unwrap() .len(), 1 ); @@ -523,8 +516,6 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() - .deserialize_splits() - .unwrap() .len(), 1 ); @@ -549,8 +540,6 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() - .deserialize_splits() - .unwrap() .len(), 0 ); @@ -562,9 +551,9 @@ mod tests { let storage = storage_for_test(); let mut metastore = MetastoreServiceClient::mock(); metastore - .expect_list_splits() + .expect_stream_splits() .times(2) - .returning(|_| Ok(ListSplitsResponse::empty())); + .returning(|_| Ok(ServiceStream::empty())); run_garbage_collect( IndexUid::new_with_random_ulid("index-test-gc-deletes"), storage.clone(), @@ -620,8 +609,6 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits.len(), 1); @@ -646,8 +633,6 @@ mod tests { .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await .unwrap() - .deserialize_splits() - .unwrap() .is_empty()); } @@ -738,8 +723,6 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits.len(), 1); assert_eq!(splits[0].split_id(), split_id_1); diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 5a5fbec3b9b..8f38edc99da 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -25,7 +25,7 @@ use quickwit_config::{validate_identifier, IndexConfig, SourceConfig}; use quickwit_indexing::check_source_connectivity; use quickwit_metastore::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, - ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, SplitInfo, SplitMetadata, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitInfo, SplitMetadata, SplitState, }; use quickwit_proto::metastore::{ @@ -174,25 +174,26 @@ impl IndexService { if dry_run { let list_splits_request = ListSplitsRequest::try_from_index_uid(index_uid)?; - let splits_to_delete = self + let splits_to_delete: Vec = self .metastore .list_splits(list_splits_request) .await? - .deserialize_splits()? .into_iter() .map(|split| split.split_metadata.as_split_info()) - .collect::>(); + .collect(); return Ok(splits_to_delete); } // Schedule staged and published splits for deletion. let query = ListSplitsQuery::for_index(index_uid.clone()) .with_split_states([SplitState::Staged, SplitState::Published]); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; - let split_ids = self + let split_ids: Vec = self .metastore .list_splits(list_splits_request) .await? - .deserialize_split_ids()?; + .into_iter() + .map(|split| split.split_metadata.split_id) + .collect(); let mark_splits_for_deletion_request = MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids); self.metastore @@ -203,11 +204,13 @@ impl IndexService { let query = ListSplitsQuery::for_index(index_uid.clone()) .with_split_state(SplitState::MarkedForDeletion); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; - let splits_to_delete = self + let splits_to_delete: Vec = self .metastore .list_splits(list_splits_request) .await? - .deserialize_splits_metadata()?; + .into_iter() + .map(|split| split.split_metadata) + .collect(); let deleted_splits = delete_splits_from_storage_and_metastore( index_uid.clone(), @@ -291,7 +294,9 @@ impl IndexService { .metastore .list_splits(list_splits_request) .await? - .deserialize_splits_metadata()?; + .into_iter() + .map(|split| split.split_metadata) + .collect(); let split_ids: Vec = splits_metadata .iter() .map(|split| split.split_id.to_string()) @@ -496,8 +501,6 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits.len(), 1); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index ad7bb4eedca..f5c3f89e295 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -589,13 +589,13 @@ mod tests { use std::sync::Arc; use quickwit_actors::{Command, Universe}; + use quickwit_common::ServiceStream; use quickwit_config::{IndexingSettings, SourceInputFormat, SourceParams, VoidSourceParams}; use quickwit_doc_mapper::{default_doc_mapper_for_test, DefaultDocMapper}; use quickwit_metastore::checkpoint::IndexCheckpointDelta; - use quickwit_metastore::{IndexMetadata, ListSplitsResponseExt, PublishSplitsRequestExt}; + use quickwit_metastore::{IndexMetadata, PublishSplitsRequestExt}; use quickwit_proto::metastore::{ - EmptyResponse, IndexMetadataResponse, LastDeleteOpstampResponse, ListSplitsResponse, - MetastoreError, + EmptyResponse, IndexMetadataResponse, LastDeleteOpstampResponse, MetastoreError, }; use quickwit_proto::types::IndexUid; use quickwit_storage::RamStorage; @@ -827,8 +827,8 @@ mod tests { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata).unwrap()) }); mock_metastore - .expect_list_splits() - .returning(|_| Ok(ListSplitsResponse::empty())); + .expect_stream_splits() + .returning(|_| Ok(ServiceStream::empty())); let universe = Universe::with_accelerated_time(); let node_id = "test-node"; let doc_mapper = Arc::new(default_doc_mapper_for_test()); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index a0da3d8868f..4c7c21457e8 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -830,6 +830,7 @@ mod tests { use quickwit_actors::{Health, ObservationType, Supervisable, Universe, HEARTBEAT}; use quickwit_cluster::create_cluster_for_test; use quickwit_common::rand::append_random_suffix; + use quickwit_common::ServiceStream; use quickwit_config::{ IngestApiConfig, KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams, VecSourceParams, @@ -837,12 +838,12 @@ mod tests { use quickwit_ingest::{init_ingest_api, CreateQueueIfNotExistsRequest}; use quickwit_metastore::{ metastore_for_test, AddSourceRequestExt, CreateIndexRequestExt, - ListIndexesMetadataResponseExt, ListSplitsResponseExt, + ListIndexesMetadataResponseExt, }; use quickwit_proto::indexing::IndexingTask; use quickwit_proto::metastore::{ AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, IndexMetadataResponse, - ListIndexesMetadataResponse, ListSplitsResponse, + ListIndexesMetadataResponse, }; use super::*; @@ -1427,8 +1428,8 @@ mod tests { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); metastore - .expect_list_splits() - .returning(|_| Ok(ListSplitsResponse::try_from_splits(Vec::new()).unwrap())); + .expect_stream_splits() + .returning(|_| Ok(ServiceStream::empty())); let universe = Universe::new(); let temp_dir = tempfile::tempdir().unwrap(); let (indexing_service, indexing_service_handle) = spawn_indexing_service_for_test( diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index 9444a05c22c..f078da6909a 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -559,7 +559,7 @@ mod tests { use quickwit_actors::Universe; use quickwit_common::split_file; use quickwit_metastore::{ - ListSplitsRequestExt, ListSplitsResponseExt, SplitMetadata, StageSplitsRequestExt, + ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, }; use quickwit_proto::metastore::{ DeleteQuery, ListSplitsRequest, PublishSplitsRequest, StageSplitsRequest, @@ -605,8 +605,9 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() - .deserialize_splits_metadata() - .unwrap(); + .into_iter() + .map(|split| split.split_metadata) + .collect(); assert_eq!(split_metas.len(), 4); let merge_scratch_directory = TempDirectory::for_test(); let downloaded_splits_directory = @@ -732,8 +733,6 @@ mod tests { .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await .unwrap() - .deserialize_splits() - .unwrap() .into_iter() .next() .unwrap(); @@ -845,8 +844,6 @@ mod tests { .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await .unwrap() - .deserialize_splits() - .unwrap() .into_iter() .all( |split| split.split_state == quickwit_metastore::SplitState::MarkedForDeletion diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 0ca0b00de78..ec4e2b9bf3e 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -32,12 +32,10 @@ use quickwit_common::temp_dir::TempDirectory; use quickwit_common::KillSwitch; use quickwit_doc_mapper::DocMapper; use quickwit_metastore::{ - ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, SplitMetadata, SplitState, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, SplitState, }; use quickwit_proto::indexing::IndexingPipelineId; -use quickwit_proto::metastore::{ - ListSplitsRequest, MetastoreError, MetastoreService, MetastoreServiceClient, -}; +use quickwit_proto::metastore::{ListSplitsRequest, MetastoreError, MetastoreServiceClient}; use time::OffsetDateTime; use tracing::{debug, error, info, instrument}; @@ -226,7 +224,9 @@ impl MergePipeline { let published_splits_metadata: Vec = ctx .protect_future(self.params.metastore.list_splits(list_splits_request)) .await? - .deserialize_splits_metadata()?; + .into_iter() + .map(|split| split.split_metadata) + .collect(); info!( num_splits = published_splits_metadata.len(), @@ -485,10 +485,11 @@ mod tests { use quickwit_actors::{ActorExitStatus, Universe}; use quickwit_common::temp_dir::TempDirectory; + use quickwit_common::ServiceStream; use quickwit_doc_mapper::default_doc_mapper_for_test; - use quickwit_metastore::{ListSplitsRequestExt, ListSplitsResponseExt}; + use quickwit_metastore::ListSplitsRequestExt; use quickwit_proto::indexing::IndexingPipelineId; - use quickwit_proto::metastore::{ListSplitsResponse, MetastoreServiceClient}; + use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::types::IndexUid; use quickwit_storage::RamStorage; @@ -507,7 +508,7 @@ mod tests { pipeline_ord: 0, }; metastore - .expect_list_splits() + .expect_stream_splits() .times(1) .withf(move |list_splits_request| { let list_split_query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -521,7 +522,7 @@ mod tests { }; true }) - .returning(|_| Ok(ListSplitsResponse::try_from_splits(Vec::new()).unwrap())); + .returning(|_| Ok(ServiceStream::empty())); let universe = Universe::with_accelerated_time(); let storage = Arc::new(RamStorage::default()); let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 784cc1b60be..0d76813d683 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -282,8 +282,8 @@ pub fn mock_split_meta(split_id: &str, index_uid: &IndexUid) -> SplitMetadata { #[cfg(test)] mod tests { - use quickwit_metastore::{ListSplitsRequestExt, ListSplitsResponseExt}; - use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService}; + use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceExt}; + use quickwit_proto::metastore::ListSplitsRequest; use super::TestSandbox; @@ -312,8 +312,7 @@ mod tests { .list_splits( ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap(), ) - .await? - .deserialize_splits()?; + .await?; assert_eq!(splits.len(), 1); test_sandbox.add_documents(vec![ serde_json::json!({"title": "Byzantine-Ottoman wars", "body": "...", "url": "http://biz-ottoman"}), @@ -324,8 +323,7 @@ mod tests { .list_splits( ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap(), ) - .await? - .deserialize_splits()?; + .await?; assert_eq!(splits.len(), 2); } test_sandbox.assert_quit().await; diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 78196e48bce..851c85b1b1a 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -291,7 +291,7 @@ mod tests { use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_indexing::TestSandbox; - use quickwit_metastore::{ListSplitsRequestExt, ListSplitsResponseExt, SplitState}; + use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceExt, SplitState}; use quickwit_proto::metastore::{DeleteQuery, ListSplitsRequest, MetastoreService}; use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse}; use quickwit_search::{ @@ -419,8 +419,6 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(splits.len(), 2); let published_split = splits diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index d51e8274859..5669df35074 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -425,7 +425,9 @@ mod tests { use quickwit_config::build_doc_mapper; use quickwit_indexing::merge_policy::MergeOperation; use quickwit_indexing::TestSandbox; - use quickwit_metastore::{IndexMetadataResponseExt, ListSplitsRequestExt, SplitMetadata}; + use quickwit_metastore::{ + IndexMetadataResponseExt, ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, + }; use quickwit_proto::metastore::{DeleteQuery, IndexMetadataRequest, ListSplitsRequest}; use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse}; use quickwit_search::{searcher_pool_for_test, MockSearchService}; @@ -479,8 +481,9 @@ mod tests { .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await .unwrap() - .deserialize_splits_metadata() - .unwrap(); + .into_iter() + .map(|split| split.split_metadata) + .collect(); assert_eq!(split_metas.len(), 3); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)?; @@ -594,8 +597,6 @@ mod tests { let all_splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); assert_eq!(all_splits[0].split_metadata.delete_opstamp, 2); assert_eq!(all_splits[1].split_metadata.delete_opstamp, 2); diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index 5e80b11d179..bc93dcd620c 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -212,6 +212,7 @@ mod tests { use quickwit_actors::Universe; use quickwit_common::shared_consts::DELETION_GRACE_PERIOD; + use quickwit_common::ServiceStream; use quickwit_metastore::{ IndexMetadata, ListSplitsRequestExt, ListSplitsResponseExt, Split, SplitMetadata, SplitState, @@ -262,7 +263,7 @@ mod tests { let mut mock_metastore = MetastoreServiceClient::mock(); mock_metastore - .expect_list_splits() + .expect_stream_splits() .times(2) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -292,7 +293,8 @@ mod tests { } _ => panic!("only Staged and MarkedForDeletion expected."), }; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); mock_metastore .expect_mark_splits_for_deletion() @@ -356,7 +358,7 @@ mod tests { ) }); mock_metastore - .expect_list_splits() + .expect_stream_splits() .times(2) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -368,7 +370,8 @@ mod tests { } _ => panic!("only Staged and MarkedForDeletion expected."), }; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); mock_metastore .expect_mark_splits_for_deletion() @@ -431,7 +434,7 @@ mod tests { ) }); mock_metastore - .expect_list_splits() + .expect_stream_splits() .times(6) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -443,7 +446,8 @@ mod tests { } _ => panic!("only Staged and MarkedForDeletion expected."), }; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); mock_metastore .expect_mark_splits_for_deletion() @@ -600,7 +604,7 @@ mod tests { ) }); mock_metastore - .expect_list_splits() + .expect_stream_splits() .times(3) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -618,7 +622,8 @@ mod tests { } _ => panic!("only Staged and MarkedForDeletion expected."), }; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); mock_metastore .expect_mark_splits_for_deletion() @@ -681,7 +686,7 @@ mod tests { ) }); mock_metastore - .expect_list_splits() + .expect_stream_splits() .times(4) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -693,7 +698,8 @@ mod tests { } _ => panic!("only Staged and MarkedForDeletion expected."), }; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); mock_metastore .expect_mark_splits_for_deletion() diff --git a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs index f8122a0b030..aabb755f7af 100644 --- a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs +++ b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs @@ -254,6 +254,7 @@ mod tests { use mockall::Sequence; use quickwit_actors::Universe; + use quickwit_common::ServiceStream; use quickwit_config::RetentionPolicy; use quickwit_metastore::{ IndexMetadata, ListSplitsRequestExt, ListSplitsResponseExt, Split, SplitMetadata, @@ -347,9 +348,9 @@ mod tests { let mut sequence = Sequence::new(); mock_metastore - .expect_list_splits() + .expect_stream_splits() .times(..) - .returning(|_| Ok(ListSplitsResponse::empty())); + .returning(|_| Ok(ServiceStream::empty())); mock_metastore .expect_list_indexes_metadata() .times(1) @@ -456,7 +457,7 @@ mod tests { }); mock_metastore - .expect_list_splits() + .expect_stream_splits() .times(2..=4) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -472,7 +473,8 @@ mod tests { "index-2" => Vec::new(), unknown => panic!("Unknown index: `{unknown}`."), }; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); mock_metastore diff --git a/quickwit/quickwit-janitor/src/retention_policy_execution.rs b/quickwit/quickwit-janitor/src/retention_policy_execution.rs index 6a1449c8e03..255e5f05d0f 100644 --- a/quickwit/quickwit-janitor/src/retention_policy_execution.rs +++ b/quickwit/quickwit-janitor/src/retention_policy_execution.rs @@ -21,7 +21,7 @@ use quickwit_actors::ActorContext; use quickwit_common::PrettySample; use quickwit_config::RetentionPolicy; use quickwit_metastore::{ - ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, SplitMetadata, SplitState, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, SplitState, }; use quickwit_proto::metastore::{ ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreService, MetastoreServiceClient, @@ -58,7 +58,6 @@ pub async fn run_execute_retention_policy( let (expired_splits, ignored_splits): (Vec, Vec) = ctx .protect_future(metastore.list_splits(list_splits_request)) .await? - .deserialize_splits()? .into_iter() .map(|split| split.split_metadata) .partition(|split_metadata| split_metadata.time_range.is_some()); diff --git a/quickwit/quickwit-metastore/Cargo.toml b/quickwit/quickwit-metastore/Cargo.toml index 74e989b928a..c58021bf16f 100644 --- a/quickwit/quickwit-metastore/Cargo.toml +++ b/quickwit/quickwit-metastore/Cargo.toml @@ -18,6 +18,7 @@ http = { workspace = true } itertools = { workspace = true } mockall = { workspace = true, optional = true } once_cell = { workspace = true } +ouroboros = { workspace = true } rand = { workspace = true } regex = { workspace = true } sea-query = { workspace = true, optional = true } diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index 13778862823..a54ac5ef995 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -30,9 +30,10 @@ use quickwit_proto::metastore::{ LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, - MetastoreResult, MetastoreService, MetastoreServiceClient, OpenShardsRequest, - OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, - ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, + OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, + StageSplitsRequest, ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, + UpdateSplitsDeleteOpstampResponse, }; /// A [`MetastoreService`] implementation that proxies some requests to the control plane so it can @@ -141,11 +142,11 @@ impl MetastoreService for ControlPlaneMetastore { self.metastore.publish_splits(request).await } - async fn list_splits( + async fn stream_splits( &mut self, request: ListSplitsRequest, - ) -> MetastoreResult { - self.metastore.list_splits(request).await + ) -> MetastoreResult> { + self.metastore.stream_splits(request).await } async fn list_stale_splits( diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs index 068ac608c4e..f34a9deb6f0 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs @@ -34,6 +34,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::future::try_join_all; use itertools::Itertools; +use quickwit_common::ServiceStream; use quickwit_config::validate_index_id_pattern; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, AcquireShardsSubrequest, AddSourceRequest, @@ -44,9 +45,9 @@ use quickwit_proto::metastore::{ ListDeleteTasksResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult, - MetastoreService, OpenShardsRequest, OpenShardsResponse, OpenShardsSubrequest, - PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, - UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + MetastoreService, MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse, + OpenShardsSubrequest, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, + ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::IndexUid; use quickwit_storage::Storage; @@ -64,10 +65,10 @@ use self::store_operations::{ use super::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, - PublishSplitsRequestExt, StageSplitsRequestExt, + PublishSplitsRequestExt, StageSplitsRequestExt, STREAM_SPLITS_CHUNK_SIZE, }; use crate::checkpoint::IndexCheckpointDelta; -use crate::{IndexMetadata, ListSplitsQuery, MetastoreServiceExt, SplitState}; +use crate::{IndexMetadata, ListSplitsQuery, MetastoreServiceExt, Split, SplitState}; /// State of an index tracked by the metastore. pub(crate) enum IndexState { @@ -318,6 +319,20 @@ impl FileBackedMetastore { Ok(index_mutex) } + async fn inner_list_splits(&self, request: ListSplitsRequest) -> MetastoreResult> { + let list_splits_query = request.deserialize_list_splits_query()?; + let mut all_splits = Vec::new(); + for index_uid in &list_splits_query.index_uids { + let splits = self + .read(index_uid.clone(), |index| { + index.list_splits(&list_splits_query) + }) + .await?; + all_splits.extend(splits); + } + Ok(all_splits) + } + /// Helper used for testing to obtain the data associated with the given index. #[cfg(test)] async fn get_index(&self, index_uid: IndexUid) -> MetastoreResult { @@ -610,23 +625,16 @@ impl MetastoreService for FileBackedMetastore { /// ------------------------------------------------------------------------------- /// Read-only accessors - async fn list_splits( + async fn stream_splits( &mut self, request: ListSplitsRequest, - ) -> MetastoreResult { - let list_splits_query = request.deserialize_list_splits_query()?; - let mut all_splits = Vec::new(); - - for index_uid in &list_splits_query.index_uids { - let splits = self - .read(index_uid.clone(), |index| { - index.list_splits(&list_splits_query) - }) - .await?; - all_splits.extend(splits); - } - let response = ListSplitsResponse::try_from_splits(all_splits)?; - Ok(response) + ) -> MetastoreResult> { + let splits = self.inner_list_splits(request).await?; + let chunks = splits + .chunks(STREAM_SPLITS_CHUNK_SIZE) + .map(|chunk| ListSplitsResponse::try_from_splits(chunk.to_vec())) + .collect_vec(); + Ok(ServiceStream::from(chunks)) } async fn list_stale_splits( @@ -640,7 +648,8 @@ impl MetastoreService for FileBackedMetastore { .sort_by_staleness() .with_limit(request.num_splits as usize); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(list_splits_query)?; - self.list_splits(list_splits_request).await + let splits = self.inner_list_splits(list_splits_request).await?; + ListSplitsResponse::try_from_splits(splits) } async fn index_metadata( @@ -1135,16 +1144,14 @@ mod tests { let query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Published); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query).unwrap(); - let list_splits_response = metastore.list_splits(list_splits_request).await.unwrap(); - let splits = list_splits_response.deserialize_splits().unwrap(); + let splits = metastore.list_splits(list_splits_request).await.unwrap(); assert!(splits.is_empty()); let list_splits_query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Staged); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(list_splits_query).unwrap(); - let list_splits_response = metastore.list_splits(list_splits_request).await.unwrap(); - let splits = list_splits_response.deserialize_splits().unwrap(); + let splits = metastore.list_splits(list_splits_request).await.unwrap(); assert!(!splits.is_empty()); } @@ -1196,11 +1203,10 @@ mod tests { let create_index_response = metastore.create_index(create_index_request).await.unwrap(); let index_uid: IndexUid = create_index_response.index_uid.into(); - let list_splits_response = metastore + let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await .unwrap(); - let splits = list_splits_response.deserialize_splits().unwrap(); assert!(splits.is_empty()); let split_metadata = SplitMetadata { @@ -1215,11 +1221,10 @@ mod tests { StageSplitsRequest::try_from_split_metadata(index_uid.clone(), split_metadata).unwrap(); metastore.stage_splits(stage_splits_request).await?; - let list_splits_response = metastore + let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await .unwrap(); - let splits = list_splits_response.deserialize_splits().unwrap(); assert_eq!(splits.len(), 1); Ok(()) } @@ -1244,18 +1249,16 @@ mod tests { .unwrap(); let index_uid: IndexUid = create_index_response.index_uid.into(); - let list_splits_response = metastore_write + let splits = metastore_write .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await .unwrap(); - let splits = list_splits_response.deserialize_splits().unwrap(); assert!(splits.is_empty()); - let list_splits_response = metastore_read + let splits = metastore_read .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await .unwrap(); - let splits = list_splits_response.deserialize_splits().unwrap(); assert!(splits.is_empty()); let split_metadata = SplitMetadata { @@ -1270,22 +1273,19 @@ mod tests { StageSplitsRequest::try_from_split_metadata(index_uid.clone(), split_metadata).unwrap(); metastore_write.stage_splits(stage_splits_request).await?; - let list_splits_response = metastore_read + let splits = metastore_read .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await .unwrap(); - let splits = list_splits_response.deserialize_splits().unwrap(); assert!(splits.is_empty()); for _ in 0..10 { tokio::time::sleep(polling_interval).await; - let list_splits_response = metastore_read + let splits = metastore_read .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await .unwrap(); - let splits = list_splits_response.deserialize_splits().unwrap(); - if !splits.is_empty() { return Ok(()); } @@ -1353,8 +1353,7 @@ mod tests { ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Published); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(list_splits_query).unwrap(); - let list_splits_response = metastore.list_splits(list_splits_request).await.unwrap(); - let splits = list_splits_response.deserialize_splits().unwrap(); + let splits = metastore.list_splits(list_splits_request).await.unwrap(); // Make sure that all 20 splits are in `Published` state. assert_eq!(splits.len(), 20); diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 5b5eee3ca87..6d428c2a536 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -29,6 +29,7 @@ pub mod control_plane_metastore; use std::ops::{Bound, RangeInclusive}; use async_trait::async_trait; +use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; use once_cell::sync::Lazy; @@ -47,6 +48,9 @@ use time::OffsetDateTime; use crate::checkpoint::IndexCheckpointDelta; use crate::{Split, SplitMetadata, SplitState}; +/// Splits batch size returned by the stream splits API +const STREAM_SPLITS_CHUNK_SIZE: usize = 1000; + static METASTORE_METRICS_LAYER: Lazy> = Lazy::new(|| PrometheusMetricsLayer::new("metastore", ["request"])); @@ -70,6 +74,20 @@ pub trait MetastoreServiceExt: MetastoreService { Err(error) => Err(error), } } + + /// Lists all splits matching the given query. + /// It calls `stream_splits` and collects the splits into a single vector. + /// This method is added for convenience and may be removed in the future to avoid + /// loading all splits in memory. + async fn list_splits(&mut self, query: ListSplitsRequest) -> MetastoreResult> { + let mut stream = self.stream_splits(query).await?; + let mut all_splits = Vec::new(); + while let Some(list_splits_response) = stream.try_next().await? { + let splits = list_splits_response.deserialize_splits()?; + all_splits.extend(splits); + } + Ok(all_splits) + } } impl MetastoreServiceExt for MetastoreServiceClient {} diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs index e6dee95f359..5d22515c116 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs @@ -17,7 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt::{self, Display, Write}; use std::ops::Bound; use std::str::FromStr; @@ -25,8 +25,11 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use futures::stream::BoxStream; +use futures::StreamExt; +use ouroboros::self_referencing; use quickwit_common::uri::Uri; -use quickwit_common::PrettySample; +use quickwit_common::{PrettySample, ServiceStream}; use quickwit_config::{ validate_index_id_pattern, MetastoreBackend, MetastoreConfig, PostgresMetastoreConfig, }; @@ -39,9 +42,10 @@ use quickwit_proto::metastore::{ LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, - MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, OpenShardsRequest, - OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest, - ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, + MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, + MetastoreServiceStream, OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, + ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, + UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::IndexUid; use sea_query::{ @@ -52,9 +56,11 @@ use sqlx::migrate::Migrator; use sqlx::postgres::{PgConnectOptions, PgDatabaseError, PgPoolOptions}; use sqlx::{ConnectOptions, Pool, Postgres, Transaction}; use tokio::sync::Mutex; +use tokio_stream::Stream; use tracing::log::LevelFilter; use tracing::{debug, error, info, instrument, warn}; +use super::STREAM_SPLITS_CHUNK_SIZE; use crate::checkpoint::IndexCheckpointDelta; use crate::metastore::postgresql_model::{PgDeleteTask, PgIndex, PgSplit, Splits, ToTimestampFunc}; use crate::metastore::{instrument_metastore, FilterRange, PublishSplitsRequestExt}; @@ -799,57 +805,54 @@ impl MetastoreService for PostgresqlMetastore { } #[instrument(skip(self))] - async fn list_splits( + async fn stream_splits( &mut self, request: ListSplitsRequest, - ) -> MetastoreResult { + ) -> MetastoreResult> { let query = request.deserialize_list_splits_query()?; let mut sql = Query::select(); sql.column(Asterisk).from(Splits::Table); append_query_filters(&mut sql, &query); let (sql, values) = sql.build_sqlx(PostgresQueryBuilder); + let split_stream = SplitStream::new( + self.connection_pool.clone(), + sql, + |connection_pool: &Pool, sql: &String| { + sqlx::query_as_with::<_, PgSplit, _>(sql, values).fetch(connection_pool) + }, + ); - let pg_splits = sqlx::query_as_with::<_, PgSplit, _>(&sql, values) - .fetch_all(&self.connection_pool) - .await?; - - // If no splits were returned, maybe some indexes do not exist in the first place? - // TODO: the file-backed metastore is more accurate as it checks for index existence before - // returning splits. We could do the same here or remove index existence check `list_splits` - // for all metastore implementations. - if pg_splits.is_empty() { - let index_ids_str: Vec = query - .index_uids - .iter() - .map(|index_uid| index_uid.index_id().to_string()) - .collect(); - let list_indexes_metadata_request = ListIndexesMetadataRequest { - index_id_patterns: index_ids_str.clone(), - }; - let found_index_ids: HashSet = self - .list_indexes_metadata(list_indexes_metadata_request) - .await? - .deserialize_indexes_metadata()? - .into_iter() - .map(|index_metadata| index_metadata.index_id().to_string()) - .collect(); - let not_found_index_ids: Vec = index_ids_str - .into_iter() - .filter(|index_id| !found_index_ids.contains(index_id)) - .collect(); - if !not_found_index_ids.is_empty() { - return Err(MetastoreError::NotFound(EntityKind::Indexes { - index_ids: not_found_index_ids, - })); - } - } - let splits = pg_splits - .into_iter() - .map(|pg_split| pg_split.try_into()) - .collect::>>()?; - let response = ListSplitsResponse::try_from_splits(splits)?; - Ok(response) + let mapped_split_stream = + split_stream + .chunks(STREAM_SPLITS_CHUNK_SIZE) + .map(|pg_splits_res| { + let mut splits = Vec::new(); + for pg_split_res in pg_splits_res { + let pg_split = match pg_split_res { + Ok(pg_split) => pg_split, + Err(error) => { + return Err(MetastoreError::Internal { + message: "failed to fetch splits".to_string(), + cause: error.to_string(), + }) + } + }; + let split: Split = match pg_split.try_into() { + Ok(split) => split, + Err(error) => { + return Err(MetastoreError::Internal { + message: "failed to convert `PgSplit` into `Split`".to_string(), + cause: error.to_string(), + }) + } + }; + splits.push(split); + } + ListSplitsResponse::try_from_splits(splits) + }); + let service_stream = ServiceStream::new(Box::pin(mapped_split_stream)); + Ok(service_stream) } #[instrument(skip(self))] @@ -1320,6 +1323,28 @@ impl MetastoreService for PostgresqlMetastore { impl MetastoreServiceExt for PostgresqlMetastore {} +#[self_referencing] +struct SplitStream { + connection_pool: Pool, + sql: String, + #[borrows(connection_pool, sql)] + #[covariant] + inner: BoxStream<'this, Result>, +} + +impl Stream for SplitStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + SplitStream::with_inner_mut(&mut self, |this| { + std::pin::Pin::new(&mut this.as_mut()).poll_next(cx) + }) + } +} + // We use dollar-quoted strings in Postgresql. // // In order to ensure that we do not risk SQL injection, diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index f97122cb962..b9b665b5b6a 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -69,13 +69,11 @@ fn to_btree_set(tags: &[&str]) -> BTreeSet { tags.iter().map(|tag| tag.to_string()).collect() } -async fn cleanup_index(metastore: &mut dyn MetastoreService, index_uid: IndexUid) { +async fn cleanup_index(metastore: &mut dyn MetastoreServiceExt, index_uid: IndexUid) { // List all splits. let all_splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); if !all_splits.is_empty() { @@ -208,7 +206,9 @@ pub async fn test_metastore_index_exists< cleanup_index(&mut metastore, index_uid).await; } -pub async fn test_metastore_index_metadata() { +pub async fn test_metastore_index_metadata< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-index-metadata"); @@ -246,7 +246,9 @@ pub async fn test_metastore_index_metadata() { +pub async fn test_metastore_list_all_indexes< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id_prefix = append_random_suffix("test-list-all-indexes"); @@ -296,7 +298,7 @@ pub async fn test_metastore_list_all_indexes() { +pub async fn test_metastore_list_indexes() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id_fragment = append_random_suffix("test-list-indexes"); @@ -449,7 +451,7 @@ pub async fn test_metastore_delete_index< cleanup_index(&mut metastore, index_uid).await; } -pub async fn test_metastore_add_source() { +pub async fn test_metastore_add_source() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-add-source"); @@ -552,7 +554,7 @@ pub async fn test_metastore_add_source() { +pub async fn test_metastore_toggle_source() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-toggle-source"); @@ -629,7 +631,7 @@ pub async fn test_metastore_toggle_source() { +pub async fn test_metastore_delete_source() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-delete-source"); @@ -740,7 +742,9 @@ pub async fn test_metastore_delete_source() { +pub async fn test_metastore_reset_checkpoint< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-reset-checkpoint"); @@ -871,7 +875,7 @@ pub async fn test_metastore_reset_checkpoint() { let mut metastore = MetastoreToTest::default_for_test().await; @@ -949,7 +953,9 @@ pub async fn test_metastore_publish_splits_empty_splits_array_is_allowed< } } -pub async fn test_metastore_publish_splits() { +pub async fn test_metastore_publish_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { let mut metastore = MetastoreToTest::default_for_test().await; let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); @@ -1518,7 +1524,7 @@ pub async fn test_metastore_publish_splits() { let mut metastore = MetastoreToTest::default_for_test().await; @@ -1606,7 +1612,9 @@ pub async fn test_metastore_publish_splits_concurrency< cleanup_index(&mut metastore, index_uid).await } -pub async fn test_metastore_replace_splits() { +pub async fn test_metastore_replace_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { let mut metastore = MetastoreToTest::default_for_test().await; let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); @@ -1959,7 +1967,7 @@ pub async fn test_metastore_replace_splits() { let mut metastore = MetastoreToTest::default_for_test().await; @@ -2065,12 +2073,7 @@ pub async fn test_metastore_mark_splits_for_deletion< .with_split_state(SplitState::MarkedForDeletion), ) .unwrap(); - let marked_splits = metastore - .list_splits(list_splits_request) - .await - .unwrap() - .deserialize_splits() - .unwrap(); + let marked_splits = metastore.list_splits(list_splits_request).await.unwrap(); assert_eq!(marked_splits.len(), 1); assert_eq!(marked_splits[0].split_id(), split_id_3); @@ -2100,12 +2103,7 @@ pub async fn test_metastore_mark_splits_for_deletion< .with_split_state(SplitState::MarkedForDeletion), ) .unwrap(); - let mut marked_splits = metastore - .list_splits(list_splits_request) - .await - .unwrap() - .deserialize_splits() - .unwrap(); + let mut marked_splits = metastore.list_splits(list_splits_request).await.unwrap(); marked_splits.sort_by_key(|split| split.split_id().to_string()); @@ -2123,7 +2121,7 @@ pub async fn test_metastore_mark_splits_for_deletion< cleanup_index(&mut metastore, index_uid).await; } -pub async fn test_metastore_delete_splits() { +pub async fn test_metastore_delete_splits() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-delete-splits"); @@ -2231,8 +2229,6 @@ pub async fn test_metastore_delete_splits() { +pub async fn test_metastore_list_all_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-list-all-splits"); @@ -2377,8 +2373,6 @@ pub async fn test_metastore_list_all_splits() { +pub async fn test_metastore_list_splits() { let mut metastore = MetastoreToTest::default_for_test().await; let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); @@ -2509,8 +2503,6 @@ pub async fn test_metastore_list_splits = splits .iter() @@ -2556,8 +2544,6 @@ pub async fn test_metastore_list_splits = splits .iter() @@ -2966,8 +2904,6 @@ pub async fn test_metastore_list_splits() { let mut metastore = MetastoreToTest::default_for_test().await; @@ -3071,8 +3001,6 @@ pub async fn test_metastore_split_update_timestamp< let split_meta = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap()[0] .clone(); assert!(split_meta.update_timestamp > current_timestamp); @@ -3099,8 +3027,6 @@ pub async fn test_metastore_split_update_timestamp< let split_meta = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap()[0] .clone(); assert!(split_meta.update_timestamp > current_timestamp); @@ -3121,8 +3047,6 @@ pub async fn test_metastore_split_update_timestamp< let split_meta = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap()[0] .clone(); assert!(split_meta.update_timestamp > current_timestamp); @@ -3132,7 +3056,7 @@ pub async fn test_metastore_split_update_timestamp< } pub async fn test_metastore_create_delete_task< - MetastoreToTest: MetastoreService + DefaultForTest, + MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("add-delete-task"); @@ -3199,7 +3123,7 @@ pub async fn test_metastore_create_delete_task< } pub async fn test_metastore_last_delete_opstamp< - MetastoreToTest: MetastoreService + DefaultForTest, + MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id_1 = append_random_suffix("test-last-delete-opstamp-1"); @@ -3278,7 +3202,7 @@ pub async fn test_metastore_last_delete_opstamp< } pub async fn test_metastore_delete_index_with_tasks< - MetastoreToTest: MetastoreService + DefaultForTest, + MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("delete-delete-tasks"); @@ -3316,7 +3240,7 @@ pub async fn test_metastore_delete_index_with_tasks< } pub async fn test_metastore_list_delete_tasks< - MetastoreToTest: MetastoreService + DefaultForTest, + MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { let mut metastore = MetastoreToTest::default_for_test().await; let index_id_1 = append_random_suffix("test-list-delete-tasks-1"); @@ -3389,7 +3313,7 @@ pub async fn test_metastore_list_delete_tasks< } pub async fn test_metastore_list_stale_splits< - MetastoreToTest: MetastoreService + DefaultForTest, + MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { let mut metastore = MetastoreToTest::default_for_test().await; let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); @@ -3580,7 +3504,7 @@ pub async fn test_metastore_list_stale_splits< } pub async fn test_metastore_update_splits_delete_opstamp< - MetastoreToTest: MetastoreService + DefaultForTest, + MetastoreToTest: MetastoreServiceExt + DefaultForTest, >() { let mut metastore = MetastoreToTest::default_for_test().await; let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); @@ -3718,7 +3642,7 @@ pub async fn test_metastore_update_splits_delete_opstamp< } } -pub async fn test_metastore_stage_splits() { +pub async fn test_metastore_stage_splits() { let mut metastore = MetastoreToTest::default_for_test().await; let current_timestamp = OffsetDateTime::now_utc().unix_timestamp(); let index_id = append_random_suffix("test-stage-splits"); @@ -3779,8 +3703,6 @@ pub async fn test_metastore_stage_splits for ListShardsRequest { OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("list_shards")]) } } +pub type MetastoreServiceStream = quickwit_common::ServiceStream< + crate::metastore::MetastoreResult, +>; #[cfg_attr(any(test, feature = "testsuite"), mockall::automock)] #[async_trait::async_trait] pub trait MetastoreService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static { @@ -586,11 +596,11 @@ pub trait MetastoreService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync &mut self, request: DeleteIndexRequest, ) -> crate::metastore::MetastoreResult; - /// Gets splits from index. - async fn list_splits( + /// Streams splits from index. + async fn stream_splits( &mut self, request: ListSplitsRequest, - ) -> crate::metastore::MetastoreResult; + ) -> crate::metastore::MetastoreResult>; /// Stages several splits. async fn stage_splits( &mut self, @@ -781,11 +791,11 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.delete_index(request).await } - async fn list_splits( + async fn stream_splits( &mut self, request: ListSplitsRequest, - ) -> crate::metastore::MetastoreResult { - self.inner.list_splits(request).await + ) -> crate::metastore::MetastoreResult> { + self.inner.stream_splits(request).await } async fn stage_splits( &mut self, @@ -929,11 +939,13 @@ pub mod metastore_service_mock { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.delete_index(request).await } - async fn list_splits( + async fn stream_splits( &mut self, request: super::ListSplitsRequest, - ) -> crate::metastore::MetastoreResult { - self.inner.lock().await.list_splits(request).await + ) -> crate::metastore::MetastoreResult< + MetastoreServiceStream, + > { + self.inner.lock().await.stream_splits(request).await } async fn stage_splits( &mut self, @@ -1123,7 +1135,7 @@ impl tower::Service for Box { } } impl tower::Service for Box { - type Response = ListSplitsResponse; + type Response = MetastoreServiceStream; type Error = crate::metastore::MetastoreError; type Future = BoxFuture; fn poll_ready( @@ -1134,7 +1146,7 @@ impl tower::Service for Box { } fn call(&mut self, request: ListSplitsRequest) -> Self::Future { let mut svc = self.clone(); - let fut = async move { svc.list_splits(request).await }; + let fut = async move { svc.stream_splits(request).await }; Box::pin(fut) } } @@ -1434,9 +1446,9 @@ struct MetastoreServiceTowerBlock { EmptyResponse, crate::metastore::MetastoreError, >, - list_splits_svc: quickwit_common::tower::BoxService< + stream_splits_svc: quickwit_common::tower::BoxService< ListSplitsRequest, - ListSplitsResponse, + MetastoreServiceStream, crate::metastore::MetastoreError, >, stage_splits_svc: quickwit_common::tower::BoxService< @@ -1533,7 +1545,7 @@ impl Clone for MetastoreServiceTowerBlock { index_metadata_svc: self.index_metadata_svc.clone(), list_indexes_metadata_svc: self.list_indexes_metadata_svc.clone(), delete_index_svc: self.delete_index_svc.clone(), - list_splits_svc: self.list_splits_svc.clone(), + stream_splits_svc: self.stream_splits_svc.clone(), stage_splits_svc: self.stage_splits_svc.clone(), publish_splits_svc: self.publish_splits_svc.clone(), mark_splits_for_deletion_svc: self.mark_splits_for_deletion_svc.clone(), @@ -1582,11 +1594,11 @@ impl MetastoreService for MetastoreServiceTowerBlock { ) -> crate::metastore::MetastoreResult { self.delete_index_svc.ready().await?.call(request).await } - async fn list_splits( + async fn stream_splits( &mut self, request: ListSplitsRequest, - ) -> crate::metastore::MetastoreResult { - self.list_splits_svc.ready().await?.call(request).await + ) -> crate::metastore::MetastoreResult> { + self.stream_splits_svc.ready().await?.call(request).await } async fn stage_splits( &mut self, @@ -1736,11 +1748,11 @@ pub struct MetastoreServiceTowerBlockBuilder { >, >, #[allow(clippy::type_complexity)] - list_splits_layer: Option< + stream_splits_layer: Option< quickwit_common::tower::BoxLayer< Box, ListSplitsRequest, - ListSplitsResponse, + MetastoreServiceStream, crate::metastore::MetastoreError, >, >, @@ -1930,7 +1942,7 @@ impl MetastoreServiceTowerBlockBuilder { >::Future: Send + 'static, L::Service: tower::Service< ListSplitsRequest, - Response = ListSplitsResponse, + Response = MetastoreServiceStream, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, >::Future: Send + 'static, @@ -2060,7 +2072,7 @@ impl MetastoreServiceTowerBlockBuilder { quickwit_common::tower::BoxLayer::new(layer.clone()), ); self - .list_splits_layer = Some( + .stream_splits_layer = Some( quickwit_common::tower::BoxLayer::new(layer.clone()), ); self @@ -2187,17 +2199,17 @@ impl MetastoreServiceTowerBlockBuilder { self.delete_index_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn list_splits_layer(mut self, layer: L) -> Self + pub fn stream_splits_layer(mut self, layer: L) -> Self where L: tower::Layer> + Send + Sync + 'static, L::Service: tower::Service< ListSplitsRequest, - Response = ListSplitsResponse, + Response = MetastoreServiceStream, Error = crate::metastore::MetastoreError, > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.list_splits_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.stream_splits_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn stage_splits_layer(mut self, layer: L) -> Self @@ -2507,7 +2519,7 @@ impl MetastoreServiceTowerBlockBuilder { } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) }; - let list_splits_svc = if let Some(layer) = self.list_splits_layer { + let stream_splits_svc = if let Some(layer) = self.stream_splits_layer { layer.layer(boxed_instance.clone()) } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) @@ -2610,7 +2622,7 @@ impl MetastoreServiceTowerBlockBuilder { index_metadata_svc, list_indexes_metadata_svc, delete_index_svc, - list_splits_svc, + stream_splits_svc, stage_splits_svc, publish_splits_svc, mark_splits_for_deletion_svc, @@ -2733,9 +2745,12 @@ where > + tower::Service< ListSplitsRequest, - Response = ListSplitsResponse, + Response = MetastoreServiceStream, Error = crate::metastore::MetastoreError, - Future = BoxFuture, + Future = BoxFuture< + MetastoreServiceStream, + crate::metastore::MetastoreError, + >, > + tower::Service< StageSplitsRequest, @@ -2870,10 +2885,10 @@ where ) -> crate::metastore::MetastoreResult { self.call(request).await } - async fn list_splits( + async fn stream_splits( &mut self, request: ListSplitsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult> { self.call(request).await } async fn stage_splits( @@ -3065,14 +3080,18 @@ where .map(|response| response.into_inner()) .map_err(|error| error.into()) } - async fn list_splits( + async fn stream_splits( &mut self, request: ListSplitsRequest, - ) -> crate::metastore::MetastoreResult { + ) -> crate::metastore::MetastoreResult> { self.inner - .list_splits(request) + .stream_splits(request) .await - .map(|response| response.into_inner()) + .map(|response| { + let streaming: tonic::Streaming<_> = response.into_inner(); + let stream = quickwit_common::ServiceStream::from(streaming); + stream.map_err(|error| error.into()) + }) .map_err(|error| error.into()) } async fn stage_splits( @@ -3320,15 +3339,18 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(|error| error.into()) } - async fn list_splits( + type StreamSplitsStream = quickwit_common::ServiceStream< + tonic::Result, + >; + async fn stream_splits( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> Result, tonic::Status> { self.inner .clone() - .list_splits(request.into_inner()) + .stream_splits(request.into_inner()) .await - .map(tonic::Response::new) + .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) .map_err(|error| error.into()) } async fn stage_splits( @@ -3719,12 +3741,12 @@ pub mod metastore_service_grpc_client { ); self.inner.unary(req, path, codec).await } - /// Gets splits from index. - pub async fn list_splits( + /// Streams splits from index. + pub async fn stream_splits( &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response, + tonic::Response>, tonic::Status, > { self.inner @@ -3738,14 +3760,17 @@ pub mod metastore_service_grpc_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/quickwit.metastore.MetastoreService/ListSplits", + "/quickwit.metastore.MetastoreService/StreamSplits", ); let mut req = request.into_request(); req.extensions_mut() .insert( - GrpcMethod::new("quickwit.metastore.MetastoreService", "ListSplits"), + GrpcMethod::new( + "quickwit.metastore.MetastoreService", + "StreamSplits", + ), ); - self.inner.unary(req, path, codec).await + self.inner.server_streaming(req, path, codec).await } /// Stages several splits. pub async fn stage_splits( @@ -4272,12 +4297,18 @@ pub mod metastore_service_grpc_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; - /// Gets splits from index. - async fn list_splits( + /// Server streaming response type for the StreamSplits method. + type StreamSplitsStream: futures_core::Stream< + Item = std::result::Result, + > + + Send + + 'static; + /// Streams splits from index. + async fn stream_splits( &self, request: tonic::Request, ) -> std::result::Result< - tonic::Response, + tonic::Response, tonic::Status, >; /// Stages several splits. @@ -4653,16 +4684,17 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } - "/quickwit.metastore.MetastoreService/ListSplits" => { + "/quickwit.metastore.MetastoreService/StreamSplits" => { #[allow(non_camel_case_types)] - struct ListSplitsSvc(pub Arc); + struct StreamSplitsSvc(pub Arc); impl< T: MetastoreServiceGrpc, - > tonic::server::UnaryService - for ListSplitsSvc { + > tonic::server::ServerStreamingService + for StreamSplitsSvc { type Response = super::ListSplitsResponse; + type ResponseStream = T::StreamSplitsStream; type Future = BoxFuture< - tonic::Response, + tonic::Response, tonic::Status, >; fn call( @@ -4670,7 +4702,9 @@ pub mod metastore_service_grpc_server { request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).list_splits(request).await }; + let fut = async move { + (*inner).stream_splits(request).await + }; Box::pin(fut) } } @@ -4681,7 +4715,7 @@ pub mod metastore_service_grpc_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = ListSplitsSvc(inner); + let method = StreamSplitsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( @@ -4692,7 +4726,7 @@ pub mod metastore_service_grpc_server { max_decoding_message_size, max_encoding_message_size, ); - let res = grpc.unary(method, req).await; + let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 6162b37d758..86e1fab35af 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -49,7 +49,7 @@ pub use collector::QuickwitAggregations; use metrics::SEARCH_METRICS; use quickwit_common::tower::Pool; use quickwit_doc_mapper::DocMapper; -use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient}; +use quickwit_proto::metastore::{ListSplitsRequest, MetastoreServiceClient}; use tantivy::schema::NamedFieldDocument; /// Refer to this as `crate::Result`. @@ -62,7 +62,7 @@ pub use find_trace_ids_collector::FindTraceIdsCollector; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_metastore::{ - ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, SplitMetadata, SplitState, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, SplitState, }; use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets}; use quickwit_proto::types::IndexUid; @@ -189,7 +189,6 @@ async fn list_relevant_splits( let splits_metadata: Vec = metastore .list_splits(list_splits_request) .await? - .deserialize_splits()? .into_iter() .map(|split| split.split_metadata) .collect(); diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 4d0f459e65a..b750052aebf 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -31,7 +31,7 @@ use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_doc_mapper::{DocMapper, DYNAMIC_FIELD_NAME}; use quickwit_metastore::{ IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, - ListSplitsResponseExt, SplitMetadata, + MetastoreServiceExt, SplitMetadata, }; use quickwit_proto::metastore::{ IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, @@ -1016,7 +1016,9 @@ pub async fn root_list_terms( .clone() .list_splits(list_splits_request) .await? - .deserialize_splits_metadata()?; + .into_iter() + .map(|split| split.split_metadata) + .collect(); let index_uri = &index_config.index_uri; @@ -1212,9 +1214,10 @@ mod tests { use std::sync::{Arc, RwLock}; use quickwit_common::shared_consts::SCROLL_BATCH_LEN; + use quickwit_common::ServiceStream; use quickwit_config::{DocMapping, IndexingSettings, SearchSettings}; use quickwit_indexing::MockSplitBuilder; - use quickwit_metastore::IndexMetadata; + use quickwit_metastore::{IndexMetadata, ListSplitsResponseExt}; use quickwit_proto::metastore::{ListIndexesMetadataResponse, ListSplitsResponse}; use quickwit_proto::search::{ScrollRequest, SortOrder, SortValue, SplitSearchError}; use quickwit_query::query_ast::{qast_helper, qast_json_helper, query_ast_from_user_text}; @@ -1464,7 +1467,7 @@ mod tests { .unwrap()) }); mock_metastore - .expect_list_splits() + .expect_stream_splits() .returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") @@ -1474,7 +1477,8 @@ mod tests { .with_index_uid(&index_uid) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service_2 = MockSearchService::new(); mock_search_service_2.expect_leaf_search().returning( @@ -1561,12 +1565,13 @@ mod tests { .unwrap()) }); metastore - .expect_list_splits() + .expect_stream_splits() .returning(move |_list_splits_request| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service = MockSearchService::new(); mock_search_service.expect_leaf_search().returning( @@ -1628,7 +1633,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -1637,7 +1642,8 @@ mod tests { .with_index_uid(&index_uid) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service_1 = MockSearchService::new(); mock_search_service_1.expect_leaf_search().returning( @@ -1723,7 +1729,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -1732,7 +1738,8 @@ mod tests { .with_index_uid(&index_uid) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service_1 = MockSearchService::new(); mock_search_service_1.expect_leaf_search().returning( @@ -1902,7 +1909,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -1911,7 +1918,8 @@ mod tests { .with_index_uid(&index_uid) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service_1 = MockSearchService::new(); mock_search_service_1.expect_leaf_search().returning( @@ -2076,7 +2084,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -2085,7 +2093,8 @@ mod tests { .with_index_uid(&index_uid) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service_1 = MockSearchService::new(); @@ -2196,7 +2205,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -2205,7 +2214,8 @@ mod tests { .with_index_uid(&index_uid) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service_1 = MockSearchService::new(); mock_search_service_1 @@ -2327,12 +2337,13 @@ mod tests { .unwrap()) }); metastore - .expect_list_splits() + .expect_stream_splits() .returning(move |_list_splits_request| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut first_call = true; let mut mock_search_service = MockSearchService::new(); @@ -2405,11 +2416,12 @@ mod tests { ]) .unwrap()) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service = MockSearchService::new(); @@ -2467,11 +2479,12 @@ mod tests { ]) .unwrap()) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); // Service1 - broken node. let mut mock_search_service_1 = MockSearchService::new(); @@ -2555,11 +2568,12 @@ mod tests { ]) .unwrap()) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); // Service1 - working node. @@ -2627,12 +2641,13 @@ mod tests { .unwrap()) }); mock_metastore - .expect_list_splits() + .expect_stream_splits() .returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", MockSearchService::new())]); @@ -2711,11 +2726,12 @@ mod tests { ]) .unwrap()) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", MockSearchService::new())]); let search_job_placer = SearchJobPlacer::new(searcher_pool); @@ -2758,12 +2774,13 @@ mod tests { .unwrap()) }); mock_metastore - .expect_list_splits() + .expect_stream_splits() .returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", MockSearchService::new())]); let search_job_placer = SearchJobPlacer::new(searcher_pool); @@ -2958,7 +2975,7 @@ mod tests { .unwrap(), ) }); - metastore.expect_list_splits().returning(move |_filter| { + metastore.expect_stream_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -2967,7 +2984,8 @@ mod tests { .with_index_uid(&index_uid_2) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service = MockSearchService::new(); mock_search_service.expect_leaf_search().times(2).returning( @@ -3149,7 +3167,7 @@ mod tests { }, ); metastore - .expect_list_splits() + .expect_stream_splits() .return_once(move |list_splits_request| { let list_splits_query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -3172,7 +3190,8 @@ mod tests { .with_index_uid(&index_uid_2) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) }); let mut mock_search_service_1 = MockSearchService::new(); mock_search_service_1 diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index 40bce90f37d..f6377d079fc 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -451,8 +451,8 @@ mod tests { use itertools::Itertools; use quickwit_indexing::TestSandbox; - use quickwit_metastore::{ListSplitsRequestExt, ListSplitsResponseExt}; - use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService}; + use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceExt}; + use quickwit_proto::metastore::ListSplitsRequest; use quickwit_query::query_ast::qast_json_helper; use serde_json::json; use tantivy::time::{Duration, OffsetDateTime}; @@ -501,8 +501,7 @@ mod tests { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) - .await? - .deserialize_splits()?; + .await?; let splits_offsets = splits .into_iter() .map(|split| extract_split_and_footer_offsets(&split.split_metadata)) @@ -578,8 +577,7 @@ mod tests { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) - .await? - .deserialize_splits()?; + .await?; let splits_offsets = splits .into_iter() .map(|split| extract_split_and_footer_offsets(&split.split_metadata)) @@ -634,8 +632,7 @@ mod tests { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) - .await? - .deserialize_splits()?; + .await?; let splits_offsets = splits .into_iter() .map(|split| extract_split_and_footer_offsets(&split.split_metadata)) @@ -723,8 +720,7 @@ mod tests { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) - .await? - .deserialize_splits()?; + .await?; let splits_offsets = splits .into_iter() .map(|split| extract_split_and_footer_offsets(&split.split_metadata)) diff --git a/quickwit/quickwit-search/src/search_stream/root.rs b/quickwit/quickwit-search/src/search_stream/root.rs index 41daf548408..767481c646f 100644 --- a/quickwit/quickwit-search/src/search_stream/root.rs +++ b/quickwit/quickwit-search/src/search_stream/root.rs @@ -132,6 +132,7 @@ fn jobs_to_leaf_request( #[cfg(test)] mod tests { + use quickwit_common::ServiceStream; use quickwit_indexing::MockSplitBuilder; use quickwit_metastore::{IndexMetadata, ListSplitsResponseExt}; use quickwit_proto::metastore::{IndexMetadataResponse, ListSplitsResponse}; @@ -157,11 +158,12 @@ mod tests { mock_metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - mock_metastore.expect_list_splits().returning(move |_| { + mock_metastore.expect_stream_splits().returning(move |_| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); let mut mock_search_service = MockSearchService::new(); let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -214,11 +216,12 @@ mod tests { mock_metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - mock_metastore.expect_list_splits().returning(move |_| { + mock_metastore.expect_stream_splits().returning(move |_| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); let mut mock_search_service = MockSearchService::new(); let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -269,7 +272,7 @@ mod tests { mock_metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - mock_metastore.expect_list_splits().returning(move |_| { + mock_metastore.expect_stream_splits().returning(move |_| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -278,7 +281,8 @@ mod tests { .with_index_uid(&index_uid) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); let mut mock_search_service = MockSearchService::new(); let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -329,11 +333,12 @@ mod tests { mock_metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - mock_metastore.expect_list_splits().returning(move |_| { + mock_metastore.expect_stream_splits().returning(move |_| { let splits = vec![MockSplitBuilder::new("split") .with_index_uid(&index_uid) .build()]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", MockSearchService::new())]); diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 6f84651a62a..21c9a2eef9d 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -1028,8 +1028,6 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec { .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) .await - .unwrap() - .deserialize_splits() .unwrap(); let splits_offsets: Vec<_> = splits .into_iter() @@ -1666,8 +1664,7 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) - .await? - .deserialize_splits()?; + .await?; let splits_offsets: Vec<_> = splits .into_iter() .map(|split| extract_split_and_footer_offsets(&split.split_metadata)) diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index fb2a65230a0..85902529c77 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -30,7 +30,7 @@ use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_index_management::{IndexService, IndexServiceError}; use quickwit_metastore::{ IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery, - ListSplitsRequestExt, ListSplitsResponseExt, Split, SplitInfo, SplitState, + ListSplitsRequestExt, MetastoreServiceExt, Split, SplitInfo, SplitState, }; use quickwit_proto::metastore::{ DeleteSourceRequest, EntityKind, IndexMetadataRequest, ListIndexesMetadataRequest, @@ -198,10 +198,7 @@ async fn describe_index( .deserialize_index_metadata()?; let query = ListSplitsQuery::for_index(index_metadata.index_uid.clone()); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; - let splits = metastore - .list_splits(list_splits_request) - .await? - .deserialize_splits()?; + let splits = metastore.list_splits(list_splits_request).await?; let published_splits: Vec = splits .into_iter() .filter(|split| split.split_state == SplitState::Published) @@ -347,10 +344,7 @@ async fn list_splits( query = query.with_create_timestamp_lt(end_created_timestamp); } let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; - let splits = metastore - .list_splits(list_splits_request) - .await? - .deserialize_splits()?; + let splits = metastore.list_splits(list_splits_request).await?; Ok(ListSplitsResponse { offset, size: splits.len(), @@ -870,9 +864,10 @@ mod tests { use assert_json_diff::assert_json_include; use quickwit_common::uri::Uri; + use quickwit_common::ServiceStream; use quickwit_config::{SourceParams, VecSourceParams}; use quickwit_indexing::{mock_split, MockSplitBuilder}; - use quickwit_metastore::{metastore_for_test, IndexMetadata}; + use quickwit_metastore::{metastore_for_test, IndexMetadata, ListSplitsResponseExt}; use quickwit_proto::metastore::{ EmptyResponse, IndexMetadataResponse, ListIndexesMetadataResponse, ListSplitsResponse, MetastoreServiceClient, SourceType, @@ -946,7 +941,7 @@ mod tests { }) .times(2); metastore - .expect_list_splits() + .expect_stream_splits() .returning(move |list_splits_request: ListSplitsRequest| { let list_split_query = list_splits_request.deserialize_list_splits_query().unwrap(); if list_split_query.index_uids.contains(&index_uid) @@ -959,7 +954,8 @@ mod tests { let splits = vec![MockSplitBuilder::new("split_1") .with_index_uid(&index_uid) .build()]; - return Ok(ListSplitsResponse::try_from_splits(splits).unwrap()); + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + return Ok(ServiceStream::from(vec![Ok(splits)])); } Err(MetastoreError::Internal { message: "".to_string(), @@ -1032,14 +1028,15 @@ mod tests { split_1_time_range.end() + 10, )); mock_metastore - .expect_list_splits() + .expect_stream_splits() .withf(move |list_split_request| -> bool { let list_split_query = list_split_request.deserialize_list_splits_query().unwrap(); list_split_query.index_uids.contains(&index_uid) }) .return_once(move |_| { let splits = vec![split_1, split_2]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); let index_service = IndexService::new( @@ -1083,7 +1080,7 @@ mod tests { .return_once(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata).unwrap()) }); - mock_metastore.expect_list_splits().return_once( + mock_metastore.expect_stream_splits().return_once( move |list_split_request: ListSplitsRequest| { let list_split_query = list_split_request.deserialize_list_splits_query().unwrap(); if list_split_query.index_uids.contains(&index_uid) @@ -1091,7 +1088,7 @@ mod tests { && list_split_query.time_range.is_unbounded() && list_split_query.create_timestamp.is_unbounded() { - return Ok(ListSplitsResponse::empty()); + return Ok(ServiceStream::empty()); } Err(MetastoreError::Internal { message: "".to_string(), @@ -1224,8 +1221,9 @@ mod tests { .unwrap(), ) }); - mock_metastore.expect_list_splits().return_once(|_| { - Ok(ListSplitsResponse::try_from_splits(vec![mock_split("split_1")]).unwrap()) + mock_metastore.expect_stream_splits().return_once(|_| { + let splits = ListSplitsResponse::try_from_splits(vec![mock_split("split_1")]).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); mock_metastore .expect_mark_splits_for_deletion() @@ -1268,9 +1266,11 @@ mod tests { }) .times(2); mock_metastore - .expect_list_splits() + .expect_stream_splits() .returning(|_| { - Ok(ListSplitsResponse::try_from_splits(vec![mock_split("split_1")]).unwrap()) + let splits = + ListSplitsResponse::try_from_splits(vec![mock_split("split_1")]).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }) .times(3); mock_metastore diff --git a/quickwit/quickwit-serve/src/search_api/mod.rs b/quickwit/quickwit-serve/src/search_api/mod.rs index b77cc5d919c..dba69e5e183 100644 --- a/quickwit/quickwit-serve/src/search_api/mod.rs +++ b/quickwit/quickwit-serve/src/search_api/mod.rs @@ -33,6 +33,7 @@ mod tests { use std::sync::Arc; use futures::TryStreamExt; + use quickwit_common::ServiceStream; use quickwit_indexing::MockSplitBuilder; use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, ListSplitsResponseExt}; use quickwit_proto::metastore::{ @@ -85,7 +86,7 @@ mod tests { metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - metastore.expect_list_splits().returning(move |_| { + metastore.expect_stream_splits().returning(move |_| { let splits = vec![ MockSplitBuilder::new("split_1") .with_index_uid(&index_uid) @@ -94,7 +95,8 @@ mod tests { .with_index_uid(&index_uid) .build(), ]; - Ok(ListSplitsResponse::try_from_splits(splits).unwrap()) + let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits)])) }); let mut mock_search_service = MockSearchService::new(); let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel();