From 13d34186c8a36d44ebfde4aa146f0affb751e3e1 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Wed, 6 Dec 2023 15:25:37 +0800 Subject: [PATCH] add list_fields api --- .../protos/quickwit/search.proto | 55 ++ .../src/codegen/quickwit/quickwit.search.rs | 229 +++++++ quickwit/quickwit-search/Cargo.toml | 1 + quickwit/quickwit-search/src/client.rs | 18 + .../quickwit-search/src/cluster_client.rs | 15 +- quickwit/quickwit-search/src/leaf.rs | 31 +- quickwit/quickwit-search/src/lib.rs | 3 +- quickwit/quickwit-search/src/list_fields.rs | 623 ++++++++++++++++++ quickwit/quickwit-search/src/root.rs | 6 +- quickwit/quickwit-search/src/service.rs | 46 +- .../model/field_capability.rs | 35 + .../src/elastic_search_api/model/mod.rs | 1 + .../src/search_api/grpc_adapter.rs | 24 +- .../quickwit-storage/src/bundle_storage.rs | 1 + 14 files changed, 1068 insertions(+), 20 deletions(-) create mode 100644 quickwit/quickwit-search/src/list_fields.rs create mode 100644 quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index c95eb716eaf..c8671dc2455 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -72,6 +72,10 @@ service SearchService { rpc GetKV(GetKVRequest) returns (GetKVResponse); rpc ReportSplits(ReportSplitsRequest) returns (ReportSplitsResponse); + + rpc ListFields(ListFieldsRequest) returns (ListFieldsResponse); + + rpc LeafListFields(LeafListFieldsRequest) returns (ListFieldsResponse); } /// Scroll Request @@ -111,6 +115,57 @@ message ReportSplitsRequest { message ReportSplitsResponse {} +// -- ListFields ------------------- + +message ListFieldsRequest{ + // Optional limit query to a set of indexes. + repeated string index_id = 1; + // Optional limit query to a set of splits. + repeated string split_id = 2; + // Optional limit query to a list of fields + // Wildcard expressions are supported. + repeated string fields = 3; + + // Control if the the request will fail if split_ids contains a split that does not exist. + // optional bool fail_on_missing_index = 3; +} + +message LeafListFieldsRequest{ + // The index id + string index_id = 1; + // The index uri + string index_uri = 2; + // Index split ids to apply the query on. + // This ids are resolved from the index_uri defined in the search_request. + repeated SplitIdAndFooterOffsets split_offsets = 3; + + // Optional limit query to a list of fields + // Wildcard expressions are supported. + repeated string fields = 4; +} + +message ListFieldsResponse{ + repeated ListFieldsEntryResponse fields = 1; +} + +message ListFieldsEntryResponse{ + // The field name + string field_name = 1; + // The tantivy field type + int32 field_type = 2; + // The index ids the field exists + repeated string index_ids = 3; + // True means the field is searchable (indexed) in at least some indices. + // False means the field is not searchable in any indices. + bool searchable = 4; + // True means the field is aggregatable (fast) in at least some indices. + // False means the field is not aggregatable in any indices. + bool aggregatable = 5; + // The index ids the field exists, but is not searchable. + repeated string non_searchable_index_ids = 6; + // The index ids the field exists, but is not aggregatable + repeated string non_aggregatable_index_ids = 7; +} // -- Search ------------------- diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index dec8d61ce6d..b2088d91d77 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -61,6 +61,79 @@ pub struct ReportSplitsRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct ReportSplitsResponse {} #[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListFieldsRequest { + /// Optional limit query to a set of indexes. + #[prost(string, repeated, tag = "1")] + pub index_id: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// Optional limit query to a set of splits. + #[prost(string, repeated, tag = "2")] + pub split_id: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// Optional limit query to a list of fields + /// Wildcard expressions are supported. + #[prost(string, repeated, tag = "3")] + pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LeafListFieldsRequest { + /// The index id + #[prost(string, tag = "1")] + pub index_id: ::prost::alloc::string::String, + /// The index uri + #[prost(string, tag = "2")] + pub index_uri: ::prost::alloc::string::String, + /// Index split ids to apply the query on. + /// This ids are resolved from the index_uri defined in the search_request. + #[prost(message, repeated, tag = "3")] + pub split_offsets: ::prost::alloc::vec::Vec, + /// Optional limit query to a list of fields + /// Wildcard expressions are supported. + #[prost(string, repeated, tag = "4")] + pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListFieldsResponse { + #[prost(message, repeated, tag = "1")] + pub fields: ::prost::alloc::vec::Vec, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListFieldsEntryResponse { + /// The field name + #[prost(string, tag = "1")] + pub field_name: ::prost::alloc::string::String, + /// The tantivy field type + #[prost(int32, tag = "2")] + pub field_type: i32, + /// The index ids the field exists + #[prost(string, repeated, tag = "3")] + pub index_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// True means the field is searchable (indexed) in at least some indices. + /// False means the field is not searchable in any indices. + #[prost(bool, tag = "4")] + pub searchable: bool, + /// True means the field is aggregatable (fast) in at least some indices. + /// False means the field is not aggregatable in any indices. + #[prost(bool, tag = "5")] + pub aggregatable: bool, + /// The index ids the field exists, but is not searchable. + #[prost(string, repeated, tag = "6")] + pub non_searchable_index_ids: ::prost::alloc::vec::Vec< + ::prost::alloc::string::String, + >, + /// The index ids the field exists, but is not aggregatable + #[prost(string, repeated, tag = "7")] + pub non_aggregatable_index_ids: ::prost::alloc::vec::Vec< + ::prost::alloc::string::String, + >, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] #[derive(Eq, Hash)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -958,6 +1031,58 @@ pub mod search_service_client { ); self.inner.unary(req, path, codec).await } + pub async fn list_fields( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/ListFields", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("quickwit.search.SearchService", "ListFields")); + self.inner.unary(req, path, codec).await + } + pub async fn leaf_list_fields( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/LeafListFields", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("quickwit.search.SearchService", "LeafListFields"), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -1065,6 +1190,20 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; + async fn list_fields( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn leaf_list_fields( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct SearchServiceServer { @@ -1593,6 +1732,96 @@ pub mod search_service_server { }; Box::pin(fut) } + "/quickwit.search.SearchService/ListFields" => { + #[allow(non_camel_case_types)] + struct ListFieldsSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService + for ListFieldsSvc { + type Response = super::ListFieldsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).list_fields(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ListFieldsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/quickwit.search.SearchService/LeafListFields" => { + #[allow(non_camel_case_types)] + struct LeafListFieldsSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService + for LeafListFieldsSvc { + type Response = super::ListFieldsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).leaf_list_fields(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = LeafListFieldsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml index f75c49d73fd..f7c40ad256f 100644 --- a/quickwit/quickwit-search/Cargo.toml +++ b/quickwit/quickwit-search/Cargo.toml @@ -50,6 +50,7 @@ quickwit-opentelemetry = { workspace = true } quickwit-proto = { workspace = true } quickwit-query = { workspace = true } quickwit-storage = { workspace = true } +quickwit-indexing = { workspace = true } [dev-dependencies] assert-json-diff = { workspace = true } diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 0ed5c214e83..0cd1c1eb1b1 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -140,6 +140,24 @@ impl SearchServiceClient { } } + /// Perform leaf search. + pub async fn leaf_list_fields( + &mut self, + request: quickwit_proto::search::LeafListFieldsRequest, + ) -> crate::Result { + match &mut self.client_impl { + SearchServiceClientImpl::Grpc(grpc_client) => { + let tonic_request = Request::new(request); + let tonic_response = grpc_client + .leaf_list_fields(tonic_request) + .await + .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; + Ok(tonic_response.into_inner()) + } + SearchServiceClientImpl::Local(service) => service.leaf_list_fields(request).await, + } + } + /// Perform leaf stream. pub async fn leaf_search_stream( &mut self, diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index b6ee4d14103..b1c0d63d07f 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -23,9 +23,9 @@ use base64::Engine; use futures::future::ready; use futures::{Future, StreamExt}; use quickwit_proto::search::{ - FetchDocsRequest, FetchDocsResponse, GetKvRequest, LeafListTermsRequest, LeafListTermsResponse, - LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, LeafSearchStreamResponse, - PutKvRequest, + FetchDocsRequest, FetchDocsResponse, GetKvRequest, LeafListFieldsRequest, LeafListTermsRequest, + LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, + LeafSearchStreamResponse, ListFieldsResponse, PutKvRequest, }; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tokio::sync::mpsc::error::SendError; @@ -112,6 +112,15 @@ impl ClusterClient { response_res } + /// Leaf search with retry on another node client. + pub async fn leaf_list_fields( + &self, + request: LeafListFieldsRequest, + mut client: SearchServiceClient, + ) -> crate::Result { + client.leaf_list_fields(request.clone()).await + } + /// Leaf search stream with retry on another node client. pub async fn leaf_search_stream( &self, diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index e4093dbff8e..247e0dd3f36 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -83,18 +83,14 @@ async fn get_split_footer_from_cache_or_fetch( Ok(footer_data_opt) } -/// Opens a `tantivy::Index` for the given split with several cache layers: +/// Returns hotcache_bytes and the split directory (`BundleStorage`) with cache layer: /// - A split footer cache given by `SearcherContext.split_footer_cache`. -/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. -/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`. #[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] -pub(crate) async fn open_index_with_caches( +pub(crate) async fn open_split_bundle( searcher_context: &SearcherContext, index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, - tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: bool, -) -> anyhow::Result { +) -> anyhow::Result<(FileSlice, BundleStorage)> { let split_file = PathBuf::from(format!("{}.split", split_and_footer_offsets.split_id)); let footer_data = get_split_footer_from_cache_or_fetch( index_storage.clone(), @@ -117,17 +113,38 @@ pub(crate) async fn open_index_with_caches( split_file, FileSlice::new(Arc::new(footer_data)), )?; + + Ok((hotcache_bytes, bundle_storage)) +} + +/// Opens a `tantivy::Index` for the given split with several cache layers: +/// - A split footer cache given by `SearcherContext.split_footer_cache`. +/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. +/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`. +#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] +pub(crate) async fn open_index_with_caches( + searcher_context: &SearcherContext, + index_storage: Arc, + split_and_footer_offsets: &SplitIdAndFooterOffsets, + tokenizer_manager: Option<&TokenizerManager>, + ephemeral_unbounded_cache: bool, +) -> anyhow::Result { + let (hotcache_bytes, bundle_storage) = + open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?; + let bundle_storage_with_cache = wrap_storage_with_cache( searcher_context.fast_fields_cache.clone(), Arc::new(bundle_storage), ); let directory = StorageDirectory::new(bundle_storage_with_cache); + let hot_directory = if ephemeral_unbounded_cache { let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory)); HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)? } else { HotDirectory::open(directory, hotcache_bytes.read_bytes()?)? }; + let mut index = Index::open(hot_directory)?; if let Some(tokenizer_manager) = tokenizer_manager { index.set_tokenizers(tokenizer_manager.tantivy_manager().clone()); diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 6162b37d758..8f41cbefd00 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -31,6 +31,7 @@ mod filters; mod find_trace_ids_collector; mod leaf; mod leaf_cache; +mod list_fields; mod retry; mod root; mod scroll_context; @@ -165,7 +166,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn } } -/// Extract the list of relevant splits for a given search request. +/// Extract the list of relevant splits for a given query. async fn list_relevant_splits( index_uids: Vec, start_timestamp: Option, diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs new file mode 100644 index 00000000000..92b9570e4cb --- /dev/null +++ b/quickwit/quickwit-search/src/list_fields.rs @@ -0,0 +1,623 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::{HashMap, HashSet}; +use std::io; +use std::path::Path; +use std::sync::Arc; + +use futures::future::try_join_all; +use itertools::Itertools; +use quickwit_common::shared_consts::SPLIT_FIELDS_FILE_NAME; +use quickwit_common::uri::Uri; +use quickwit_indexing::models::read_split_fields; +use quickwit_metastore::{ListIndexesMetadataResponseExt, SplitMetadata}; +use quickwit_proto::metastore::{ + ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, +}; +use quickwit_proto::search::{ + LeafListFieldsRequest, ListFieldsEntryResponse, ListFieldsRequest, ListFieldsResponse, + SplitIdAndFooterOffsets, +}; +use quickwit_proto::types::IndexUid; +use quickwit_storage::Storage; +use tantivy::FieldMetadata; + +use crate::leaf::open_split_bundle; +use crate::service::SearcherContext; +use crate::{list_relevant_splits, ClusterClient, SearchError, SearchJob}; + +/// Get the list of splits for the request which we need to scan. +pub async fn get_fields_from_split<'a>( + searcher_context: &SearcherContext, + index_id: String, + split_and_footer_offsets: &'a SplitIdAndFooterOffsets, + index_storage: Arc, +) -> anyhow::Result>> { + // TODO: Add fancy caching + let (_, split_bundle) = + open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?; + + let serialized_split_fields = split_bundle + .get_all(Path::new(SPLIT_FIELDS_FILE_NAME)) + .await?; + let serialized_split_fields_len = serialized_split_fields.len(); + let iter = read_split_fields(serialized_split_fields).map_err(|err| { + anyhow::anyhow!( + "could not read split fields (serialized len: {}): {:?}", + serialized_split_fields_len, + err + ) + })?; + + Ok(iter.map(move |metadata| { + metadata.map(|metadata| ListFieldsEntryResponse { + field_name: metadata.field_name, + field_type: metadata.typ.to_code() as i32, + index_ids: vec![index_id.to_string()], + searchable: metadata.indexed, + aggregatable: metadata.fast, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + }) + })) +} + +/// Since we want to kmerge the results, we simplify by always using `FieldMetadata`, to enforce +/// the same ordering +fn field_metadata_from_list_field_response(resp: &ListFieldsEntryResponse) -> FieldMetadata { + FieldMetadata { + field_name: resp.field_name.to_string(), + typ: tantivy::schema::Type::from_code(resp.field_type as u8).expect("invalid field type"), + indexed: resp.aggregatable, + fast: resp.searchable, + stored: true, + } +} + +/// `current_group` needs to contain at least one element. +/// The group needs to be of the same field name and type. +fn merge_same_field_group( + current_group: &mut Vec, +) -> ListFieldsEntryResponse { + // Make sure all fields have the same name and type in current_group + assert!(!current_group.is_empty()); + assert!(current_group + .windows(2) + .all(|window| window[0].field_name == window[1].field_name + && window[0].field_type == window[1].field_type)); + + if current_group.len() == 1 { + return current_group.pop().unwrap(); + } + let metadata = ¤t_group.last().unwrap(); + let searchable = current_group.iter().any(|entry| entry.searchable); + let aggregatable = current_group.iter().any(|entry| entry.aggregatable); + let field_name = metadata.field_name.to_string(); + let field_type = metadata.field_type; + let mut non_searchable_index_ids = if searchable { + // We need to combine the non_searchable_index_ids + index_ids where searchable is set to + // false (as they are all non_searchable) + current_group + .iter() + .flat_map(|entry| { + if !entry.searchable { + entry.index_ids.iter().map(Clone::clone) + } else { + entry.non_searchable_index_ids.iter().map(Clone::clone) + } + }) + .collect() + } else { + // Not searchable => no need to list all the indices + Vec::new() + }; + non_searchable_index_ids.sort(); + non_searchable_index_ids.dedup(); + + let mut non_aggregatable_index_ids = if aggregatable { + // We need to combine the non_aggregatable_index_ids + index_ids where aggregatable is set + // to false (as they are all non_aggregatable) + current_group + .iter() + .flat_map(|entry| { + if !entry.aggregatable { + entry.index_ids.iter().map(Clone::clone) + } else { + entry.non_aggregatable_index_ids.iter().map(Clone::clone) + } + }) + .collect() + } else { + // Not aggregatable => no need to list all the indices + Vec::new() + }; + non_aggregatable_index_ids.sort(); + non_aggregatable_index_ids.dedup(); + let mut index_ids: Vec = current_group + .drain(..) + .flat_map(|entry| entry.index_ids.into_iter()) + .collect(); + index_ids.sort(); + index_ids.dedup(); + ListFieldsEntryResponse { + field_name, + field_type, + searchable, + aggregatable, + non_searchable_index_ids, + non_aggregatable_index_ids, + index_ids, + } +} + +/// Merge iterators of sorted (FieldMetadata, index_id) into a Vec. +fn merge_leaf_list_fields( + iterators: Vec>>, +) -> crate::Result> { + let merged = iterators.into_iter().kmerge_by(|a, b| { + match (a, b) { + (Ok(ref a_field), Ok(ref b_field)) => { + field_metadata_from_list_field_response(a_field) + <= field_metadata_from_list_field_response(b_field) + } + _ => true, // Prioritize error results to halt early on errors + } + }); + let mut responses = Vec::new(); + + let mut current_group: Vec = Vec::new(); + // Build ListFieldsEntryResponse from current group + let flush_group = |responses: &mut Vec<_>, current_group: &mut Vec| { + let entry = merge_same_field_group(current_group); + responses.push(entry); + current_group.clear(); + }; + + for entry in merged { + let entry = + entry.map_err(|err| crate::error::SearchError::Internal(format!("{:?}", err)))?; // TODO: No early return on error + + if let Some(last) = current_group.last() { + if last.field_name != entry.field_name || last.field_type != entry.field_type { + flush_group(&mut responses, &mut current_group); + } + } + current_group.push(entry); + } + if !current_group.is_empty() { + flush_group(&mut responses, &mut current_group); + } + + Ok(responses) +} + +/// +pub async fn leaf_list_fields( + index_id: String, + index_storage: Arc, + searcher_context: &SearcherContext, + split_ids: &[SplitIdAndFooterOffsets], +) -> crate::Result { + let mut iter_per_split = Vec::new(); + for split_id in split_ids.iter() { + let fields = get_fields_from_split( + searcher_context, + index_id.to_string(), + split_id, + index_storage.clone(), + ) + .await?; + iter_per_split.push(fields); + } + let fields = merge_leaf_list_fields(iter_per_split)?; + Ok(ListFieldsResponse { fields }) +} + +/// Performs a distributed list fields request. +/// 1. Sends leaf request over gRPC to multiple leaf nodes. +/// 2. Merges the search results. +/// 3. Builds the response and returns. +pub async fn root_list_fields( + list_fields_req: ListFieldsRequest, + cluster_client: &ClusterClient, + mut metastore: MetastoreServiceClient, +) -> crate::Result { + let list_indexes_metadata_request = if list_fields_req.index_id.is_empty() { + ListIndexesMetadataRequest::all() + } else { + ListIndexesMetadataRequest { + // TODO: Check index id pattern + index_id_patterns: list_fields_req.index_id.clone(), + } + }; + + // Get the index ids from the request + let indexes_metadatas = metastore + .clone() + .list_indexes_metadata(list_indexes_metadata_request) + .await? + .deserialize_indexes_metadata()?; + let index_uid_to_index_id: HashMap = indexes_metadatas + .iter() + .map(|index_metadata| { + ( + index_metadata.index_uid.clone(), + ( + index_metadata.index_config.index_uri.clone(), + index_metadata.index_config.index_id.to_string(), + ), + ) + }) + .collect(); + let index_uids: Vec = indexes_metadatas + .into_iter() + .map(|index_metadata| index_metadata.index_uid) + .collect(); + + // TODO if search after is set, we sort by timestamp and we don't want to count all results, + // we can refine more here. Same if we sort by _shard_doc + let split_metadatas: Vec = + list_relevant_splits(index_uids, None, None, None, &mut metastore).await?; + + // Build requests for each index id + let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); + let assigned_leaf_search_jobs = cluster_client + .search_job_placer + .assign_jobs(jobs, &HashSet::default()) + .await?; + let mut leaf_request_tasks = Vec::new(); + for (client, client_jobs) in assigned_leaf_search_jobs { + let leaf_requests = + jobs_to_leaf_requests(&list_fields_req, &index_uid_to_index_id, client_jobs)?; + for leaf_request in leaf_requests { + leaf_request_tasks.push(cluster_client.leaf_list_fields(leaf_request, client.clone())); + } + } + let leaf_search_responses: Vec = try_join_all(leaf_request_tasks).await?; + let fields = merge_leaf_list_fields( + leaf_search_responses + .into_iter() + .map(|resp| resp.fields.into_iter().map(Result::Ok)) + .collect_vec(), + )?; + Ok(ListFieldsResponse { fields }) + + // Extract the list of index ids from the splits. + // For each node, forward to a node with an affinity for that index id. +} +/// Builds a list of [`LeafListFieldsRequest`], one per index, from a list of [`SearchJob`]. +pub fn jobs_to_leaf_requests( + request: &ListFieldsRequest, + index_uid_to_id: &HashMap, + jobs: Vec, +) -> crate::Result> { + let search_request_for_leaf = request.clone(); + let mut leaf_search_requests = Vec::new(); + // Group jobs by index uid. + for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) { + let (index_uri, index_id) = index_uid_to_id.get(&index_uid).ok_or_else(|| { + SearchError::Internal(format!( + "received list fields job for an unknown index {index_uid}. it should never happen" + )) + })?; + let leaf_search_request = LeafListFieldsRequest { + index_id: index_id.to_string(), + index_uri: index_uri.to_string(), + fields: search_request_for_leaf.fields.clone(), + split_offsets: job_group.into_iter().map(|job| job.offsets).collect(), + }; + leaf_search_requests.push(leaf_search_request); + } + Ok(leaf_search_requests) +} + +#[cfg(test)] +mod tests { + use quickwit_proto::search::ListFieldsEntryResponse; + use tantivy::schema::Type; + + use super::*; + + #[test] + fn merge_leaf_list_fields_identical_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter().map(Result::Ok), + vec![entry2.clone()].into_iter().map(Result::Ok), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1]); + } + #[test] + fn merge_leaf_list_fields_different_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field2".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter().map(Result::Ok), + vec![entry2.clone()].into_iter().map(Result::Ok), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1, entry2]); + } + #[test] + fn merge_leaf_list_fields_non_searchable_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: false, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index2".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter().map(Result::Ok), + vec![entry2.clone()].into_iter().map(Result::Ok), + ]) + .unwrap(); + let expected = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: vec!["index2".to_string()], + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string(), "index2".to_string()], + }; + assert_eq!(resp, vec![expected]); + } + #[test] + fn merge_leaf_list_fields_non_aggregatable_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: false, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index2".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter().map(Result::Ok), + vec![entry2.clone()].into_iter().map(Result::Ok), + ]) + .unwrap(); + let expected = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: vec!["index2".to_string()], + index_ids: vec!["index1".to_string(), "index2".to_string()], + }; + assert_eq!(resp, vec![expected]); + } + #[test] + fn merge_leaf_list_fields_mixed_types1() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry3 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::U64.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone(), entry2.clone()] + .into_iter() + .map(Result::Ok), + vec![entry3.clone()].into_iter().map(Result::Ok), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); + } + #[test] + fn merge_leaf_list_fields_mixed_types2() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry3 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::U64.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone(), entry3.clone()] + .into_iter() + .map(Result::Ok), + vec![entry2.clone()].into_iter().map(Result::Ok), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); + } + #[test] + fn merge_leaf_list_fields_multiple_field_names() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry3 = ListFieldsEntryResponse { + field_name: "field2".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone(), entry3.clone()] + .into_iter() + .map(Result::Ok), + vec![entry2.clone()].into_iter().map(Result::Ok), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); + } + #[test] + fn merge_leaf_list_fields_non_aggregatable_list_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: vec!["index1".to_string()], + non_aggregatable_index_ids: Vec::new(), + index_ids: vec![ + "index1".to_string(), + "index2".to_string(), + "index3".to_string(), + ], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: false, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index4".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter().map(Result::Ok), + vec![entry2.clone()].into_iter().map(Result::Ok), + ]) + .unwrap(); + let expected = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: Type::Str.to_code() as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: vec!["index1".to_string(), "index4".to_string()], + non_aggregatable_index_ids: Vec::new(), + index_ids: vec![ + "index1".to_string(), + "index2".to_string(), + "index3".to_string(), + "index4".to_string(), + ], + }; + assert_eq!(resp, vec![expected]); + } +} diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 07489720df9..25f21b16cc6 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -71,9 +71,11 @@ const MAX_SCROLL_TTL: Duration = Duration::from_secs(DELETION_GRACE_PERIOD.as_se /// SearchJob to be assigned to search clients by the [`SearchJobPlacer`]. #[derive(Debug, Clone, PartialEq)] pub struct SearchJob { - index_uid: IndexUid, + /// The index UID. + pub index_uid: IndexUid, cost: usize, - offsets: SplitIdAndFooterOffsets, + /// The split ID and footer offsets of the split. + pub offsets: SplitIdAndFooterOffsets, } impl SearchJob { diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 48d0d6e2e05..2a3ee4ac134 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -29,11 +29,11 @@ use quickwit_config::SearcherConfig; use quickwit_doc_mapper::DocMapper; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::search::{ - FetchDocsRequest, FetchDocsResponse, GetKvRequest, Hit, LeafListTermsRequest, - LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, - LeafSearchStreamResponse, ListTermsRequest, ListTermsResponse, PutKvRequest, - ReportSplitsRequest, ReportSplitsResponse, ScrollRequest, SearchRequest, SearchResponse, - SearchStreamRequest, SnippetRequest, + FetchDocsRequest, FetchDocsResponse, GetKvRequest, Hit, LeafListFieldsRequest, + LeafListTermsRequest, LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, + LeafSearchStreamRequest, LeafSearchStreamResponse, ListFieldsRequest, ListFieldsResponse, + ListTermsRequest, ListTermsResponse, PutKvRequest, ReportSplitsRequest, ReportSplitsResponse, + ScrollRequest, SearchRequest, SearchResponse, SearchStreamRequest, SnippetRequest, }; use quickwit_storage::{ MemorySizedCache, QuickwitCache, SplitCache, StorageCache, StorageResolver, @@ -43,6 +43,7 @@ use tokio::sync::Semaphore; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::leaf_cache::LeafSearchCache; +use crate::list_fields::{leaf_list_fields, root_list_fields}; use crate::root::{fetch_docs_phase, get_snippet_request}; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; use crate::search_stream::{leaf_search_stream, root_search_stream}; @@ -136,6 +137,18 @@ pub trait SearchService: 'static + Send + Sync { /// Indexers call report_splits to inform searchers node about the presence of a split, which /// would then be considered as a candidate for the searcher split cache. async fn report_splits(&self, report_splits: ReportSplitsRequest) -> ReportSplitsResponse; + + /// Return the list of fields for a given or multiple indices. + async fn root_list_fields( + &self, + list_fields: ListFieldsRequest, + ) -> crate::Result; + + /// Return the list of fields for one index. + async fn leaf_list_fields( + &self, + list_fields: LeafListFieldsRequest, + ) -> crate::Result; } impl SearchServiceImpl { @@ -314,6 +327,29 @@ impl SearchService for SearchServiceImpl { } ReportSplitsResponse {} } + + async fn root_list_fields( + &self, + list_fields_req: ListFieldsRequest, + ) -> crate::Result { + root_list_fields( + list_fields_req, + &self.cluster_client, + self.metastore.clone(), + ) + .await + } + + async fn leaf_list_fields( + &self, + list_fields_req: LeafListFieldsRequest, + ) -> crate::Result { + let index_uri = Uri::from_str(&list_fields_req.index_uri)?; + let storage = self.storage_resolver.resolve(&index_uri).await?; + let index_id = list_fields_req.index_id; + let split_ids = list_fields_req.split_offsets; + leaf_list_fields(index_id, storage, &self.searcher_context, &split_ids[..]).await + } } pub(crate) async fn scroll( diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs new file mode 100644 index 00000000000..5ef8c49c013 --- /dev/null +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs @@ -0,0 +1,35 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +struct FieldCapabilityResponse { + indices: Vec, + fields: HashMap>, +} +#[derive(Serialize, Deserialize, Debug)] +struct FieldCapabilityFieldTypesResponse { + long: Option, + keyword: Option, + text: Option, + date_nanos: Option, + double: Option, // Duplicate to the float ? + boolean: Option, + ip: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct FieldCapabilityEntryResponse { + metadata_field: bool, // Always false + searchable: bool, + aggregatable: bool, + indices: Vec, // [ "index1", "index2" ], + non_aggregatable_indices: Vec, // [ "index1" ] + non_searchable_indices: Vec, // [ "index1" ] +} + +#[derive(Serialize, Deserialize, Debug)] +struct FieldCapabilityEntry { + searchable: bool, + aggregatable: bool, +} diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs index 0e7fe8514e4..16ab0f8b552 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs @@ -20,6 +20,7 @@ mod bulk_body; mod bulk_query_params; mod error; +mod field_capability; mod multi_search; mod scroll; mod search_body; diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index 1bb2607f567..45227a5c55c 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -23,8 +23,9 @@ use async_trait::async_trait; use futures::TryStreamExt; use quickwit_proto::error::convert_to_grpc_result; use quickwit_proto::search::{ - search_service_server as grpc, GetKvRequest, GetKvResponse, LeafSearchStreamRequest, - LeafSearchStreamResponse, ReportSplitsRequest, ReportSplitsResponse, + search_service_server as grpc, GetKvRequest, GetKvResponse, LeafListFieldsRequest, + LeafSearchStreamRequest, LeafSearchStreamResponse, ListFieldsRequest, ListFieldsResponse, + ReportSplitsRequest, ReportSplitsResponse, }; use quickwit_proto::{set_parent_span_from_request_metadata, tonic, ServiceError}; use quickwit_search::SearchService; @@ -163,4 +164,23 @@ impl grpc::SearchService for GrpcSearchAdapter { self.0.report_splits(get_search_after_context_request).await; Ok(tonic::Response::new(ReportSplitsResponse {})) } + + #[instrument(skip(self, request))] + async fn list_fields( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let resp = self.0.root_list_fields(request.into_inner()).await; + convert_to_grpc_result(resp) + } + #[instrument(skip(self, request))] + async fn leaf_list_fields( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let resp = self.0.leaf_list_fields(request.into_inner()).await; + convert_to_grpc_result(resp) + } } diff --git a/quickwit/quickwit-storage/src/bundle_storage.rs b/quickwit/quickwit-storage/src/bundle_storage.rs index 13dfbd9a32d..4b991cf1bcf 100644 --- a/quickwit/quickwit-storage/src/bundle_storage.rs +++ b/quickwit/quickwit-storage/src/bundle_storage.rs @@ -45,6 +45,7 @@ use crate::{ /// with some metadata pub struct BundleStorage { storage: Arc, + /// The file path of the bundle in the storage. bundle_filepath: PathBuf, metadata: BundleStorageFileOffsets, }