From 13f34b5194a3cbe4292c4bbab5332ef0c9b7e18b Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 18 Dec 2023 11:34:41 +0100 Subject: [PATCH] add list_fields api (#4242) * add list_fields api * address comments * add and connect elastic search api * add schema fallback for old indices * add rest API test * add support for fields parameter handle wildcards handle _dynamic special case * rename index_id to index_ids * use protobuf * use ListFieldEntryResponse, remove _dynamic from API * parallel fetch * cache results, improve error message * move prost dependency * remove schema fallback * add elastic search compatibility test * add ip type to es comparison * add date_nanos type to es comparison --- quickwit/Cargo.lock | 1 + quickwit/quickwit-indexing/Cargo.toml | 2 +- .../quickwit-indexing/src/actors/packager.rs | 105 ++- quickwit/quickwit-indexing/src/models/mod.rs | 2 - .../src/models/split_fields.rs | 268 ------- quickwit/quickwit-proto/Cargo.toml | 1 + quickwit/quickwit-proto/build.rs | 1 + .../protos/quickwit/search.proto | 67 ++ .../src/codegen/quickwit/quickwit.search.rs | 283 +++++++ quickwit/quickwit-proto/src/search/mod.rs | 47 ++ quickwit/quickwit-search/src/client.rs | 18 + .../quickwit-search/src/cluster_client.rs | 15 +- quickwit/quickwit-search/src/leaf.rs | 31 +- quickwit/quickwit-search/src/lib.rs | 4 +- quickwit/quickwit-search/src/list_fields.rs | 700 ++++++++++++++++++ .../quickwit-search/src/list_fields_cache.rs | 119 +++ quickwit/quickwit-search/src/root.rs | 6 +- quickwit/quickwit-search/src/service.rs | 59 +- .../src/elastic_search_api/filter.rs | 20 +- .../src/elastic_search_api/mod.rs | 4 + .../model/field_capability.rs | 163 ++++ .../src/elastic_search_api/model/mod.rs | 5 + .../src/elastic_search_api/rest_handler.rs | 38 +- .../src/search_api/grpc_adapter.rs | 24 +- .../quickwit-storage/src/bundle_storage.rs | 1 + quickwit/rest-api-tests/run_tests.py | 2 +- .../0001-field-capabilities.yaml | 267 +++++++ .../_ctx.elasticsearch.yaml | 2 + 
.../es_field_capabilities/_ctx.quickwit.yaml | 1 + .../scenarii/es_field_capabilities/_ctx.yaml | 3 + .../_setup.elasticsearch.yaml | 34 + .../_setup.quickwit.yaml | 59 ++ .../_teardown.elasticsearch.yaml | 3 + .../_teardown.quickwit.yaml | 4 + 34 files changed, 2056 insertions(+), 303 deletions(-) delete mode 100644 quickwit/quickwit-indexing/src/models/split_fields.rs create mode 100644 quickwit/quickwit-search/src/list_fields.rs create mode 100644 quickwit/quickwit-search/src/list_fields_cache.rs create mode 100644 quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs create mode 100644 quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml create mode 100644 quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.elasticsearch.yaml create mode 100644 quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.quickwit.yaml create mode 100644 quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.yaml create mode 100644 quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml create mode 100644 quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml create mode 100644 quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml create mode 100644 quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index b1dac1332df..f9ee3b2324b 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5840,6 +5840,7 @@ dependencies = [ "tracing-opentelemetry", "ulid", "utoipa", + "zstd 0.13.0", ] [[package]] diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml index cf728268da4..126ba2bc03e 100644 --- a/quickwit/quickwit-indexing/Cargo.toml +++ b/quickwit/quickwit-indexing/Cargo.toml @@ -87,10 +87,10 @@ bytes = { workspace = true } criterion = { workspace = true, features = ["async_tokio"] } mockall = { workspace = true } proptest = { 
workspace = true } -prost = { workspace = true } rand = { workspace = true } reqwest = { workspace = true } tempfile = { workspace = true } +prost = { workspace = true } quickwit-actors = { workspace = true, features = ["testsuite"] } quickwit-cluster = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index e0cc80cf519..69e46d52949 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -32,8 +32,11 @@ use quickwit_common::temp_dir::TempDirectory; use quickwit_directories::write_hotcache; use quickwit_doc_mapper::tag_pruning::append_to_tag_set; use quickwit_doc_mapper::NamedField; -use tantivy::schema::FieldType; -use tantivy::{InvertedIndexReader, ReloadPolicy, SegmentMeta}; +use quickwit_proto::search::{ + serialize_split_fields, ListFieldType, ListFields, ListFieldsEntryResponse, +}; +use tantivy::schema::{FieldType, Type}; +use tantivy::{FieldMetadata, InvertedIndexReader, ReloadPolicy, SegmentMeta}; use tokio::runtime::Handle; use tracing::{debug, info, instrument, warn}; @@ -46,8 +49,7 @@ const MAX_VALUES_PER_TAG_FIELD: usize = if cfg!(any(test, feature = "testsuite") use crate::actors::Uploader; use crate::models::{ - serialize_split_fields, EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit, - PackagedSplitBatch, + EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit, PackagedSplitBatch, }; /// The role of the packager is to get an index writer and @@ -315,7 +317,8 @@ fn create_packaged_split( let mut hotcache_bytes = Vec::new(); build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?; ctx.record_progress(); - let serialized_split_fields = serialize_split_fields(&fields_metadata); + + let serialized_split_fields = serialize_field_metadata(&fields_metadata); let packaged_split = PackagedSplit { serialized_split_fields, @@ -328,6 +331,47 @@ fn 
create_packaged_split( Ok(packaged_split) } +/// Serializes the Split fields. +/// +/// `fields_metadata` has to be sorted. +fn serialize_field_metadata(fields_metadata: &[FieldMetadata]) -> Vec { + let fields = fields_metadata + .iter() + .map(field_metadata_to_list_field_serialized) + .collect::>(); + + serialize_split_fields(ListFields { fields }) +} + +fn tantivy_type_to_list_field_type(typ: Type) -> ListFieldType { + match typ { + Type::Str => ListFieldType::Str, + Type::U64 => ListFieldType::U64, + Type::I64 => ListFieldType::I64, + Type::F64 => ListFieldType::F64, + Type::Bool => ListFieldType::Bool, + Type::Date => ListFieldType::Date, + Type::Facet => ListFieldType::Facet, + Type::Bytes => ListFieldType::Bytes, + Type::Json => ListFieldType::Json, + Type::IpAddr => ListFieldType::IpAddr, + } +} + +fn field_metadata_to_list_field_serialized( + field_metadata: &FieldMetadata, +) -> ListFieldsEntryResponse { + ListFieldsEntryResponse { + field_name: field_metadata.field_name.to_string(), + field_type: tantivy_type_to_list_field_type(field_metadata.typ) as i32, + searchable: field_metadata.indexed, + aggregatable: field_metadata.fast, + index_ids: vec![], + non_searchable_index_ids: vec![], + non_aggregatable_index_ids: vec![], + } +} + /// Reads u64 from stored term data. 
fn u64_from_term_data(data: &[u8]) -> anyhow::Result { let u64_bytes: [u8; 8] = data[0..8] @@ -343,15 +387,64 @@ mod tests { use quickwit_actors::{ObservationType, Universe}; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_proto::indexing::IndexingPipelineId; + use quickwit_proto::search::{deserialize_split_fields, ListFieldsEntryResponse}; use quickwit_proto::types::{IndexUid, PipelineUid}; use tantivy::directory::MmapDirectory; - use tantivy::schema::{NumericOptions, Schema, FAST, STRING, TEXT}; + use tantivy::schema::{NumericOptions, Schema, Type, FAST, STRING, TEXT}; use tantivy::{doc, DateTime, IndexBuilder, IndexSettings}; use tracing::Span; use super::*; use crate::models::{PublishLock, SplitAttrs}; + #[test] + fn serialize_field_metadata_test() { + let fields_metadata = vec![ + FieldMetadata { + field_name: "test".to_string(), + typ: Type::Str, + indexed: true, + stored: true, + fast: true, + }, + FieldMetadata { + field_name: "test2".to_string(), + typ: Type::Str, + indexed: true, + stored: false, + fast: false, + }, + FieldMetadata { + field_name: "test3".to_string(), + typ: Type::U64, + indexed: true, + stored: false, + fast: true, + }, + ]; + + let out = serialize_field_metadata(&fields_metadata); + + let deserialized: Vec = + deserialize_split_fields(&mut &out[..]).unwrap().fields; + + assert_eq!(fields_metadata.len(), deserialized.len()); + assert_eq!(deserialized[0].field_name, "test"); + assert_eq!(deserialized[0].field_type, ListFieldType::Str as i32); + assert!(deserialized[0].searchable); + assert!(deserialized[0].aggregatable); + + assert_eq!(deserialized[1].field_name, "test2"); + assert_eq!(deserialized[1].field_type, ListFieldType::Str as i32); + assert!(deserialized[1].searchable); + assert!(!deserialized[1].aggregatable); + + assert_eq!(deserialized[2].field_name, "test3"); + assert_eq!(deserialized[2].field_type, ListFieldType::U64 as i32); + assert!(deserialized[2].searchable); + 
assert!(deserialized[2].aggregatable); + } + fn make_indexed_split_for_test( segment_timestamps: &[DateTime], ) -> anyhow::Result { diff --git a/quickwit/quickwit-indexing/src/models/mod.rs b/quickwit/quickwit-indexing/src/models/mod.rs index 439b8171ee4..807a540da2c 100644 --- a/quickwit/quickwit-indexing/src/models/mod.rs +++ b/quickwit/quickwit-indexing/src/models/mod.rs @@ -32,7 +32,6 @@ mod publisher_message; mod raw_doc_batch; mod shard_positions; mod split_attrs; -mod split_fields; pub use indexed_split::{ CommitTrigger, EmptySplit, IndexedSplit, IndexedSplitBatch, IndexedSplitBatchBuilder, @@ -54,7 +53,6 @@ pub use raw_doc_batch::RawDocBatch; pub(crate) use shard_positions::LocalShardPositionsUpdate; pub use shard_positions::ShardPositionsService; pub use split_attrs::{create_split_metadata, SplitAttrs}; -pub use split_fields::{read_split_fields, serialize_split_fields, FieldConfig}; #[derive(Debug)] pub struct NewPublishToken(pub PublishToken); diff --git a/quickwit/quickwit-indexing/src/models/split_fields.rs b/quickwit/quickwit-indexing/src/models/split_fields.rs deleted file mode 100644 index 0e6749494c2..00000000000 --- a/quickwit/quickwit-indexing/src/models/split_fields.rs +++ /dev/null @@ -1,268 +0,0 @@ -// Copyright (C) 2023 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. 
-// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::io::{self, ErrorKind, Read}; - -use tantivy::schema::Type; -use tantivy::FieldMetadata; - -#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] -pub struct FieldConfig { - pub typ: Type, - pub indexed: bool, - pub stored: bool, - pub fast: bool, -} - -impl FieldConfig { - fn serialize(&self) -> [u8; 2] { - let typ = self.typ.to_code(); - let flags = (self.indexed as u8) << 2 | (self.stored as u8) << 1 | (self.fast as u8); - [typ, flags] - } - fn deserialize_from(data: [u8; 2]) -> io::Result { - let typ = Type::from_code(data[0]).ok_or_else(|| { - io::Error::new( - ErrorKind::InvalidData, - format!("could not deserialize type {}", data[0]), - ) - })?; - - let data = data[1]; - let indexed = (data & 0b100) != 0; - let stored = (data & 0b010) != 0; - let fast = (data & 0b001) != 0; - - Ok(FieldConfig { - typ, - indexed, - stored, - fast, - }) - } -} - -/// Serializes the Split fields. -/// -/// `fields_metadata` has to be sorted. -pub fn serialize_split_fields(fields_metadata: &[FieldMetadata]) -> Vec { - // ensure that fields_metadata is strictly sorted. 
- debug_assert!(fields_metadata.windows(2).all(|w| w[0] < w[1])); - let mut payload = Vec::new(); - // Write Num Fields - let length = fields_metadata.len() as u32; - payload.extend_from_slice(&length.to_le_bytes()); - - for field_metadata in fields_metadata { - write_field(field_metadata, &mut payload); - } - let compression_level = 3; - let payload_compressed = zstd::stream::encode_all(&mut &payload[..], compression_level) - .expect("zstd encoding failed"); - let mut out = Vec::new(); - // Write Header -- Format Version - let format_version = 1u8; - out.push(format_version); - // Write Payload - out.extend_from_slice(&payload_compressed); - out -} - -fn write_field(field_metadata: &FieldMetadata, out: &mut Vec) { - let field_config = FieldConfig { - typ: field_metadata.typ, - indexed: field_metadata.indexed, - stored: field_metadata.stored, - fast: field_metadata.fast, - }; - - // Write Config 2 bytes - out.extend_from_slice(&field_config.serialize()); - let str_length = field_metadata.field_name.len() as u16; - // Write String length 2 bytes - out.extend_from_slice(&str_length.to_le_bytes()); - out.extend_from_slice(field_metadata.field_name.as_bytes()); -} - -/// Reads a fixed number of bytes into an array and returns the array. 
-fn read_exact_array(reader: &mut R) -> io::Result<[u8; N]> { - let mut buffer = [0u8; N]; - reader.read_exact(&mut buffer)?; - Ok(buffer) -} - -/// Reads the Split fields from a zstd compressed stream of bytes -pub fn read_split_fields( - mut reader: R, -) -> io::Result>> { - let format_version = read_exact_array::<_, 1>(&mut reader)?[0]; - assert_eq!(format_version, 1); - let reader = zstd::Decoder::new(reader)?; - read_split_fields_from_zstd(reader) -} - -fn read_field(reader: &mut R) -> io::Result { - // Read FieldConfig (2 bytes) - let config_bytes = read_exact_array::<_, 2>(reader)?; - let field_config = FieldConfig::deserialize_from(config_bytes)?; // Assuming this returns a Result - - // Read field name length and the field name - let name_len = u16::from_le_bytes(read_exact_array::<_, 2>(reader)?) as usize; - - let mut data = Vec::new(); - data.resize(name_len, 0); - reader.read_exact(&mut data)?; - - let field_name = String::from_utf8(data).map_err(|err| { - io::Error::new( - ErrorKind::InvalidData, - format!( - "Encountered invalid utf8 when deserializing field name: {}", - err - ), - ) - })?; - Ok(FieldMetadata { - field_name, - typ: field_config.typ, - indexed: field_config.indexed, - stored: field_config.stored, - fast: field_config.fast, - }) -} - -/// Reads the Split fields from a stream of bytes -fn read_split_fields_from_zstd( - mut reader: R, -) -> io::Result>> { - let mut num_fields = u32::from_le_bytes(read_exact_array::<_, 4>(&mut reader)?); - - Ok(std::iter::from_fn(move || { - if num_fields == 0 { - return None; - } - num_fields -= 1; - - Some(read_field(&mut reader)) - })) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn field_config_deser_test() { - let field_config = FieldConfig { - typ: Type::Str, - indexed: true, - stored: false, - fast: true, - }; - let serialized = field_config.serialize(); - let deserialized = FieldConfig::deserialize_from(serialized).unwrap(); - assert_eq!(field_config, deserialized); - } - #[test] - 
fn write_read_field_test() { - for typ in Type::iter_values() { - let field_metadata = FieldMetadata { - field_name: "test".to_string(), - typ, - indexed: true, - stored: true, - fast: true, - }; - let mut out = Vec::new(); - write_field(&field_metadata, &mut out); - let deserialized = read_field(&mut &out[..]).unwrap(); - assert_eq!(field_metadata, deserialized); - } - let field_metadata = FieldMetadata { - field_name: "test".to_string(), - typ: Type::Str, - indexed: false, - stored: true, - fast: true, - }; - let mut out = Vec::new(); - write_field(&field_metadata, &mut out); - let deserialized = read_field(&mut &out[..]).unwrap(); - assert_eq!(field_metadata, deserialized); - - let field_metadata = FieldMetadata { - field_name: "test".to_string(), - typ: Type::Str, - indexed: false, - stored: false, - fast: true, - }; - let mut out = Vec::new(); - write_field(&field_metadata, &mut out); - let deserialized = read_field(&mut &out[..]).unwrap(); - assert_eq!(field_metadata, deserialized); - - let field_metadata = FieldMetadata { - field_name: "test".to_string(), - typ: Type::Str, - indexed: true, - stored: false, - fast: false, - }; - let mut out = Vec::new(); - write_field(&field_metadata, &mut out); - let deserialized = read_field(&mut &out[..]).unwrap(); - assert_eq!(field_metadata, deserialized); - } - #[test] - fn write_split_fields_test() { - let fields_metadata = vec![ - FieldMetadata { - field_name: "test".to_string(), - typ: Type::Str, - indexed: true, - stored: true, - fast: true, - }, - FieldMetadata { - field_name: "test2".to_string(), - typ: Type::Str, - indexed: true, - stored: false, - fast: false, - }, - FieldMetadata { - field_name: "test3".to_string(), - typ: Type::U64, - indexed: true, - stored: false, - fast: true, - }, - ]; - - let out = serialize_split_fields(&fields_metadata); - - let deserialized: Vec = read_split_fields(&mut &out[..]) - .unwrap() - .map(|el| el.unwrap()) - .collect(); - - assert_eq!(fields_metadata, deserialized); - } -} 
diff --git a/quickwit/quickwit-proto/Cargo.toml b/quickwit/quickwit-proto/Cargo.toml index 470c8414f38..4d81ac71eeb 100644 --- a/quickwit/quickwit-proto/Cargo.toml +++ b/quickwit/quickwit-proto/Cargo.toml @@ -33,6 +33,7 @@ tracing = { workspace = true } tracing-opentelemetry = { workspace = true } ulid = { workspace = true } utoipa = { workspace = true } +zstd = { workspace = true } quickwit-actors = { workspace = true } quickwit-common = { workspace = true } diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs index 5bf71bc4f3d..5f8a192ef8e 100644 --- a/quickwit/quickwit-proto/build.rs +++ b/quickwit/quickwit-proto/build.rs @@ -128,6 +128,7 @@ fn main() -> Result<(), Box> { .type_attribute("PartialHit", "#[derive(Eq, Hash)]") .type_attribute("PartialHit.sort_value", "#[derive(Copy)]") .type_attribute("SearchRequest", "#[derive(Eq, Hash)]") + .type_attribute("ListFieldSerialized", "#[derive(Eq)]") .type_attribute("SortByValue", "#[derive(Ord, PartialOrd)]") .type_attribute("SortField", "#[derive(Eq, Hash)]") .out_dir("src/codegen/quickwit") diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 90829dcaa46..7f5b05231bc 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -72,6 +72,10 @@ service SearchService { rpc GetKV(GetKVRequest) returns (GetKVResponse); rpc ReportSplits(ReportSplitsRequest) returns (ReportSplitsResponse); + + rpc ListFields(ListFieldsRequest) returns (ListFieldsResponse); + + rpc LeafListFields(LeafListFieldsRequest) returns (ListFieldsResponse); } /// Scroll Request @@ -111,7 +115,70 @@ message ReportSplitsRequest { message ReportSplitsResponse {} +// -- ListFields ------------------- + +message ListFieldsRequest { + // Optional limit query to a set of indexes. 
+ repeated string index_ids = 1; + // Optional limit query to a list of fields + // Wildcard expressions are supported. + repeated string fields = 2; + + // Control if the request will fail if split_ids contains a split that does not exist. + // optional bool fail_on_missing_index = 3; +} + +message LeafListFieldsRequest { + // The index id + string index_id = 1; + // The index uri + string index_uri = 2; + // Index split ids to apply the query on. + // This ids are resolved from the index_uri defined in the search_request. + repeated SplitIdAndFooterOffsets split_offsets = 3; + + // Optional limit query to a list of fields + // Wildcard expressions are supported. + repeated string fields = 4; + +} + +message ListFieldsResponse { + repeated ListFieldsEntryResponse fields = 1; +} +message ListFieldsEntryResponse { + string field_name = 1; + ListFieldType field_type = 2; + // The index ids the field exists + repeated string index_ids = 3; + // True means the field is searchable (indexed) in at least some indices. + // False means the field is not searchable in any indices. + bool searchable = 4; + // True means the field is aggregatable (fast) in at least some indices. + // False means the field is not aggregatable in any indices. + bool aggregatable = 5; + // The index ids the field exists, but is not searchable. 
+ repeated string non_searchable_index_ids = 6; + // The index ids the field exists, but is not aggregatable + repeated string non_aggregatable_index_ids = 7; +} + +enum ListFieldType { + STR = 0; + U64 = 1; + I64 = 2; + F64 = 3; + BOOL = 4; + DATE = 5; + FACET = 6; + BYTES = 7; + IP_ADDR = 8; + JSON = 9; +} +message ListFields { + repeated ListFieldsEntryResponse fields = 1; +} // -- Search ------------------- message SearchRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 92e63827b95..2c55a308fa5 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -61,6 +61,81 @@ pub struct ReportSplitsRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct ReportSplitsResponse {} #[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListFieldsRequest { + /// Optional limit query to a set of indexes. + #[prost(string, repeated, tag = "1")] + pub index_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// Optional limit query to a list of fields + /// Wildcard expressions are supported. + #[prost(string, repeated, tag = "2")] + pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LeafListFieldsRequest { + /// The index id + #[prost(string, tag = "1")] + pub index_id: ::prost::alloc::string::String, + /// The index uri + #[prost(string, tag = "2")] + pub index_uri: ::prost::alloc::string::String, + /// Index split ids to apply the query on. + /// This ids are resolved from the index_uri defined in the search_request. 
+ #[prost(message, repeated, tag = "3")] + pub split_offsets: ::prost::alloc::vec::Vec, + /// Optional limit query to a list of fields + /// Wildcard expressions are supported. + #[prost(string, repeated, tag = "4")] + pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListFieldsResponse { + #[prost(message, repeated, tag = "1")] + pub fields: ::prost::alloc::vec::Vec, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListFieldsEntryResponse { + #[prost(string, tag = "1")] + pub field_name: ::prost::alloc::string::String, + #[prost(enumeration = "ListFieldType", tag = "2")] + pub field_type: i32, + /// The index ids the field exists + #[prost(string, repeated, tag = "3")] + pub index_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// True means the field is searchable (indexed) in at least some indices. + /// False means the field is not searchable in any indices. + #[prost(bool, tag = "4")] + pub searchable: bool, + /// True means the field is aggregatable (fast) in at least some indices. + /// False means the field is not aggregatable in any indices. + #[prost(bool, tag = "5")] + pub aggregatable: bool, + /// The index ids the field exists, but is not searchable. 
+ #[prost(string, repeated, tag = "6")] + pub non_searchable_index_ids: ::prost::alloc::vec::Vec< + ::prost::alloc::string::String, + >, + /// The index ids the field exists, but is not aggregatable + #[prost(string, repeated, tag = "7")] + pub non_aggregatable_index_ids: ::prost::alloc::vec::Vec< + ::prost::alloc::string::String, + >, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListFields { + #[prost(message, repeated, tag = "1")] + pub fields: ::prost::alloc::vec::Vec, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] #[derive(Eq, Hash)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -508,6 +583,58 @@ pub struct LeafSearchStreamResponse { #[serde(rename_all = "snake_case")] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] +pub enum ListFieldType { + Str = 0, + U64 = 1, + I64 = 2, + F64 = 3, + Bool = 4, + Date = 5, + Facet = 6, + Bytes = 7, + IpAddr = 8, + Json = 9, +} +impl ListFieldType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + ListFieldType::Str => "STR", + ListFieldType::U64 => "U64", + ListFieldType::I64 => "I64", + ListFieldType::F64 => "F64", + ListFieldType::Bool => "BOOL", + ListFieldType::Date => "DATE", + ListFieldType::Facet => "FACET", + ListFieldType::Bytes => "BYTES", + ListFieldType::IpAddr => "IP_ADDR", + ListFieldType::Json => "JSON", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. 
+ pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "STR" => Some(Self::Str), + "U64" => Some(Self::U64), + "I64" => Some(Self::I64), + "F64" => Some(Self::F64), + "BOOL" => Some(Self::Bool), + "DATE" => Some(Self::Date), + "FACET" => Some(Self::Facet), + "BYTES" => Some(Self::Bytes), + "IP_ADDR" => Some(Self::IpAddr), + "JSON" => Some(Self::Json), + _ => None, + } + } +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[serde(rename_all = "snake_case")] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] pub enum CountHits { /// Count all hits, querying all splits. CountAll = 0, @@ -991,6 +1118,58 @@ pub mod search_service_client { ); self.inner.unary(req, path, codec).await } + pub async fn list_fields( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/ListFields", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("quickwit.search.SearchService", "ListFields")); + self.inner.unary(req, path, codec).await + } + pub async fn leaf_list_fields( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/LeafListFields", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + 
GrpcMethod::new("quickwit.search.SearchService", "LeafListFields"), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -1098,6 +1277,20 @@ pub mod search_service_server { tonic::Response, tonic::Status, >; + async fn list_fields( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn leaf_list_fields( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct SearchServiceServer { @@ -1626,6 +1819,96 @@ pub mod search_service_server { }; Box::pin(fut) } + "/quickwit.search.SearchService/ListFields" => { + #[allow(non_camel_case_types)] + struct ListFieldsSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService + for ListFieldsSvc { + type Response = super::ListFieldsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { (*inner).list_fields(request).await }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ListFieldsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/quickwit.search.SearchService/LeafListFields" => { + #[allow(non_camel_case_types)] + 
struct LeafListFieldsSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService + for LeafListFieldsSvc { + type Response = super::ListFieldsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).leaf_list_fields(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = LeafListFieldsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/quickwit/quickwit-proto/src/search/mod.rs b/quickwit/quickwit-proto/src/search/mod.rs index e55d3ce640d..3f3ba89855a 100644 --- a/quickwit/quickwit-proto/src/search/mod.rs +++ b/quickwit/quickwit-proto/src/search/mod.rs @@ -19,7 +19,9 @@ use std::cmp::Ordering; use std::fmt; +use std::io::{self, Read}; +use prost::Message; pub use sort_by_value::SortValue; include!("../codegen/quickwit/quickwit.search.rs"); @@ -243,3 +245,48 @@ impl PartialHit { } } } + +/// Serializes the Split fields. +/// +/// `fields_metadata` has to be sorted. 
+pub fn serialize_split_fields(list_fields: ListFields) -> Vec { + let payload = list_fields.encode_to_vec(); + let compression_level = 3; + let payload_compressed = zstd::stream::encode_all(&mut &payload[..], compression_level) + .expect("zstd encoding failed"); + let mut out = Vec::new(); + // Write Header -- Format Version 2 + let format_version = 2u8; + out.push(format_version); + // Write Payload + out.extend_from_slice(&payload_compressed); + out +} + +/// Reads a fixed number of bytes into an array and returns the array. +fn read_exact_array(reader: &mut impl Read) -> io::Result<[u8; N]> { + let mut buffer = [0u8; N]; + reader.read_exact(&mut buffer)?; + Ok(buffer) +} + +/// Reads the Split fields from a stream of bytes: a one-byte format version (must be 2) followed by the zstd compressed payload +pub fn deserialize_split_fields(mut reader: R) -> io::Result { + let format_version = read_exact_array::<1>(&mut reader)?[0]; + if format_version != 2 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Unsupported split field format version: {}", format_version), + )); + } + let reader = zstd::Decoder::new(reader)?; + read_split_fields_from_zstd(reader) +} + +/// Decodes the protobuf-encoded Split fields from an already decompressed stream of bytes +fn read_split_fields_from_zstd(reader: R) -> io::Result { + let all_bytes: Vec<_> = reader.bytes().collect::>()?; + let serialized_list_fields: ListFields = prost::Message::decode(&all_bytes[..])?; + + Ok(serialized_list_fields) +} diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 0ed5c214e83..0cd1c1eb1b1 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -140,6 +140,24 @@ impl SearchServiceClient { } } + /// Perform leaf list fields.
+ pub async fn leaf_list_fields( + &mut self, + request: quickwit_proto::search::LeafListFieldsRequest, + ) -> crate::Result { + match &mut self.client_impl { + SearchServiceClientImpl::Grpc(grpc_client) => { + let tonic_request = Request::new(request); + let tonic_response = grpc_client + .leaf_list_fields(tonic_request) + .await + .map_err(|tonic_error| parse_grpc_error(&tonic_error))?; + Ok(tonic_response.into_inner()) + } + SearchServiceClientImpl::Local(service) => service.leaf_list_fields(request).await, + } + } + /// Perform leaf stream. pub async fn leaf_search_stream( &mut self, diff --git a/quickwit/quickwit-search/src/cluster_client.rs b/quickwit/quickwit-search/src/cluster_client.rs index b6ee4d14103..b1c0d63d07f 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -23,9 +23,9 @@ use base64::Engine; use futures::future::ready; use futures::{Future, StreamExt}; use quickwit_proto::search::{ - FetchDocsRequest, FetchDocsResponse, GetKvRequest, LeafListTermsRequest, LeafListTermsResponse, - LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, LeafSearchStreamResponse, - PutKvRequest, + FetchDocsRequest, FetchDocsResponse, GetKvRequest, LeafListFieldsRequest, LeafListTermsRequest, + LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, + LeafSearchStreamResponse, ListFieldsResponse, PutKvRequest, }; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tokio::sync::mpsc::error::SendError; @@ -112,6 +112,15 @@ impl ClusterClient { response_res } + /// Leaf list fields, executed on the given node client (no retry). + pub async fn leaf_list_fields( + &self, + request: LeafListFieldsRequest, + mut client: SearchServiceClient, + ) -> crate::Result { + client.leaf_list_fields(request.clone()).await + } + /// Leaf search stream with retry on another node client.
pub async fn leaf_search_stream( &self, diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index d64fc3b74aa..3e371059629 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -83,18 +83,14 @@ async fn get_split_footer_from_cache_or_fetch( Ok(footer_data_opt) } -/// Opens a `tantivy::Index` for the given split with several cache layers: +/// Returns hotcache_bytes and the split directory (`BundleStorage`) with cache layer: /// - A split footer cache given by `SearcherContext.split_footer_cache`. -/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. -/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`. #[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] -pub(crate) async fn open_index_with_caches( +pub(crate) async fn open_split_bundle( searcher_context: &SearcherContext, index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, - tokenizer_manager: Option<&TokenizerManager>, - ephemeral_unbounded_cache: bool, -) -> anyhow::Result { +) -> anyhow::Result<(FileSlice, BundleStorage)> { let split_file = PathBuf::from(format!("{}.split", split_and_footer_offsets.split_id)); let footer_data = get_split_footer_from_cache_or_fetch( index_storage.clone(), @@ -117,17 +113,38 @@ pub(crate) async fn open_index_with_caches( split_file, FileSlice::new(Arc::new(footer_data)), )?; + + Ok((hotcache_bytes, bundle_storage)) +} + +/// Opens a `tantivy::Index` for the given split with several cache layers: +/// - A split footer cache given by `SearcherContext.split_footer_cache`. +/// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. +/// - An ephemeral unbounded cache directory whose lifetime is tied to the returned `Index`. 
+#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] +pub(crate) async fn open_index_with_caches( + searcher_context: &SearcherContext, + index_storage: Arc, + split_and_footer_offsets: &SplitIdAndFooterOffsets, + tokenizer_manager: Option<&TokenizerManager>, + ephemeral_unbounded_cache: bool, +) -> anyhow::Result { + let (hotcache_bytes, bundle_storage) = + open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?; + let bundle_storage_with_cache = wrap_storage_with_cache( searcher_context.fast_fields_cache.clone(), Arc::new(bundle_storage), ); let directory = StorageDirectory::new(bundle_storage_with_cache); + let hot_directory = if ephemeral_unbounded_cache { let caching_directory = CachingDirectory::new_unbounded(Arc::new(directory)); HotDirectory::open(caching_directory, hotcache_bytes.read_bytes()?)? } else { HotDirectory::open(directory, hotcache_bytes.read_bytes()?)? }; + let mut index = Index::open(hot_directory)?; if let Some(tokenizer_manager) = tokenizer_manager { index.set_tokenizers(tokenizer_manager.tantivy_manager().clone()); diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 65fc51ff295..1223dbed578 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -31,6 +31,8 @@ mod filters; mod find_trace_ids_collector; mod leaf; mod leaf_cache; +mod list_fields; +mod list_fields_cache; mod retry; mod root; mod scroll_context; @@ -166,7 +168,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn } } -/// Extract the list of relevant splits for a given search request. +/// Extract the list of relevant splits for a given request. 
async fn list_relevant_splits( index_uids: Vec, start_timestamp: Option, diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs new file mode 100644 index 00000000000..ebb052e787b --- /dev/null +++ b/quickwit/quickwit-search/src/list_fields.rs @@ -0,0 +1,700 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; +use std::path::Path; +use std::sync::Arc; + +use anyhow::Context; +use futures::future; +use futures::future::try_join_all; +use itertools::Itertools; +use quickwit_common::shared_consts::SPLIT_FIELDS_FILE_NAME; +use quickwit_common::uri::Uri; +use quickwit_metastore::{ListIndexesMetadataResponseExt, SplitMetadata}; +use quickwit_proto::metastore::{ + ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, +}; +use quickwit_proto::search::{ + deserialize_split_fields, LeafListFieldsRequest, ListFields, ListFieldsEntryResponse, + ListFieldsRequest, ListFieldsResponse, SplitIdAndFooterOffsets, +}; +use quickwit_proto::types::IndexUid; +use quickwit_storage::Storage; + +use crate::leaf::open_split_bundle; +use crate::service::SearcherContext; +use crate::{list_relevant_splits, ClusterClient, SearchError, SearchJob}; + +/// Get the list of fields of a split, served from the list fields cache when present and read from the split's fields file otherwise. +pub async fn get_fields_from_split<'a>( + searcher_context: &SearcherContext, + index_id: String, + split_and_footer_offsets: &'a SplitIdAndFooterOffsets, + index_storage: Arc, +) -> anyhow::Result + Send>> { + if let Some(list_fields) = searcher_context + .list_fields_cache + .get(split_and_footer_offsets.clone()) + { + return Ok(Box::new(list_fields.fields.into_iter())); + } + let (_, split_bundle) = + open_split_bundle(searcher_context, index_storage, split_and_footer_offsets).await?; + + let serialized_split_fields = split_bundle + .get_all(Path::new(SPLIT_FIELDS_FILE_NAME)) + .await?; + let serialized_split_fields_len = serialized_split_fields.len(); + let mut list_fields = deserialize_split_fields(serialized_split_fields) + .with_context(|| { + format!( + "could not read split fields (serialized len: {})", + serialized_split_fields_len, + ) + })?
+ .fields; + for list_field_entry in list_fields.iter_mut() { + list_field_entry.index_ids = vec![index_id.to_string()]; + } + // Prepare for grouping by field name and type + list_fields.sort_by(|left, right| match left.field_name.cmp(&right.field_name) { + Ordering::Equal => left.field_type.cmp(&right.field_type), + other => other, + }); + // Put result into cache + searcher_context.list_fields_cache.put( + split_and_footer_offsets.clone(), + ListFields { + fields: list_fields.clone(), + }, + ); + + Ok(Box::new(list_fields.into_iter())) +} + +/// `current_group` needs to contain at least one element. +/// The group needs to be of the same field name and type. +fn merge_same_field_group( + current_group: &mut Vec, +) -> ListFieldsEntryResponse { + // Make sure all fields have the same name and type in current_group + assert!(!current_group.is_empty()); + assert!(current_group + .windows(2) + .all(|window| window[0].field_name == window[1].field_name + && window[0].field_type == window[1].field_type)); + + if current_group.len() == 1 { + return current_group.pop().unwrap(); + } + let metadata = &current_group.last().unwrap(); + let searchable = current_group.iter().any(|entry| entry.searchable); + let aggregatable = current_group.iter().any(|entry| entry.aggregatable); + let field_name = metadata.field_name.to_string(); + let field_type = metadata.field_type; + let mut non_searchable_index_ids = if searchable { + // We need to combine the non_searchable_index_ids + index_ids where searchable is set to + // false (as they are all non_searchable) + current_group + .iter() + .flat_map(|entry| { + if !entry.searchable { + entry.index_ids.iter().cloned() + } else { + entry.non_searchable_index_ids.iter().cloned() + } + }) + .collect() + } else { + // Not searchable => no need to list all the indices + Vec::new() + }; + non_searchable_index_ids.sort(); + non_searchable_index_ids.dedup(); + + let mut non_aggregatable_index_ids = if aggregatable { + // We need to combine the
non_aggregatable_index_ids + index_ids where aggregatable is set + // to false (as they are all non_aggregatable) + current_group + .iter() + .flat_map(|entry| { + if !entry.aggregatable { + entry.index_ids.iter().cloned() + } else { + entry.non_aggregatable_index_ids.iter().cloned() + } + }) + .collect() + } else { + // Not aggregatable => no need to list all the indices + Vec::new() + }; + non_aggregatable_index_ids.sort(); + non_aggregatable_index_ids.dedup(); + let mut index_ids: Vec = current_group + .drain(..) + .flat_map(|entry| entry.index_ids.into_iter()) + .collect(); + index_ids.sort(); + index_ids.dedup(); + ListFieldsEntryResponse { + field_name, + field_type, + searchable, + aggregatable, + non_searchable_index_ids, + non_aggregatable_index_ids, + index_ids, + } +} + +/// Merge iterators of ListFieldsEntryResponse into a Vec. +/// +/// The iterators need to be sorted by (field_name, fieldtype) +fn merge_leaf_list_fields( + iterators: Vec>, +) -> crate::Result> { + let merged = iterators + .into_iter() + .kmerge_by(|a, b| (&a.field_name, a.field_type) <= (&b.field_name, b.field_type)); + let mut responses = Vec::new(); + + let mut current_group: Vec = Vec::new(); + // Build ListFieldsEntryResponse from current group + let flush_group = |responses: &mut Vec<_>, current_group: &mut Vec| { + let entry = merge_same_field_group(current_group); + responses.push(entry); + current_group.clear(); + }; + + for entry in merged { + if let Some(last) = current_group.last() { + if last.field_name != entry.field_name || last.field_type != entry.field_type { + flush_group(&mut responses, &mut current_group); + } + } + current_group.push(entry); + } + if !current_group.is_empty() { + flush_group(&mut responses, &mut current_group); + } + + Ok(responses) +} + +fn matches_any_pattern(field_name: &str, field_patterns: &[String]) -> bool { + if field_patterns.is_empty() { + return true; + } + field_patterns + .iter() + .any(|pattern| matches_pattern(pattern, field_name)) 
+} + +/// Supports up to 1 wildcard; only the first `*` is treated as a wildcard. NOTE(review): for a wildcard in the middle, the prefix and suffix are checked independently, so `ab*ba` also matches `aba`. +fn matches_pattern(field_pattern: &str, field_name: &str) -> bool { + match field_pattern.find('*') { + None => field_pattern == field_name, + Some(index) => { + if index == 0 { + // "*field" + field_name.ends_with(&field_pattern[1..]) + } else if index == field_pattern.len() - 1 { + // "field*" + field_name.starts_with(&field_pattern[..index]) + } else { + // "fi*eld" + field_name.starts_with(&field_pattern[..index]) + && field_name.ends_with(&field_pattern[index + 1..]) + } + } + } +} +/// Lists the fields of the given splits, keeps only the fields matching `field_patterns` and merges them into a single response. Splits whose fields cannot be read are skipped. +pub async fn leaf_list_fields( + index_id: String, + index_storage: Arc, + searcher_context: &SearcherContext, + split_ids: &[SplitIdAndFooterOffsets], + field_patterns: &[String], +) -> crate::Result { + let mut iter_per_split = Vec::new(); + let get_field_futures: Vec<_> = split_ids + .iter() + .map(|split_id| { + get_fields_from_split( + searcher_context, + index_id.to_string(), + split_id, + index_storage.clone(), + ) + }) + .collect(); + let result = future::join_all(get_field_futures).await; + // This only works well, if the field data is in a local cache. + for fields in result { + let list_fields_iter = match fields { + Ok(fields) => fields, + Err(_err) => Box::new(std::iter::empty()), + }; + let list_fields_iter = list_fields_iter + .map(|mut entry| { + // We don't want to leak the _dynamic hack to the user API. + if entry.field_name.starts_with("_dynamic.") { + entry.field_name.replace_range(.."_dynamic.".len(), ""); + } + entry + }) + .filter(|field| matches_any_pattern(&field.field_name, field_patterns)); + iter_per_split.push(list_fields_iter); + } + let fields = merge_leaf_list_fields(iter_per_split)?; + Ok(ListFieldsResponse { fields }) +} + +/// Index metas needed for executing a leaf list fields request. +#[derive(Clone, Debug)] +pub struct IndexMetasForLeafSearch { + /// Index id. + pub index_id: String, + /// Index URI. + pub index_uri: Uri, +} + +/// Performs a distributed list fields request. +/// 1.
Sends leaf request over gRPC to multiple leaf nodes. +/// 2. Merges the search results. +/// 3. Builds the response and returns. +pub async fn root_list_fields( + list_fields_req: ListFieldsRequest, + cluster_client: &ClusterClient, + mut metastore: MetastoreServiceClient, +) -> crate::Result { + let list_indexes_metadata_request = if list_fields_req.index_ids.is_empty() { + ListIndexesMetadataRequest::all() + } else { + ListIndexesMetadataRequest { + // TODO: Check index id pattern + index_id_patterns: list_fields_req.index_ids.clone(), + } + }; + + // Get the index ids from the request + let indexes_metadatas = metastore + .clone() + .list_indexes_metadata(list_indexes_metadata_request) + .await? + .deserialize_indexes_metadata()?; + if indexes_metadatas.is_empty() { + return Err(SearchError::IndexesNotFound { + index_ids: list_fields_req.index_ids.clone(), + }); + } + let index_uid_to_index_meta: HashMap = indexes_metadatas + .iter() + .map(|index_metadata| { + let index_metadata_for_leaf_search = IndexMetasForLeafSearch { + index_uri: index_metadata.index_uri().clone(), + index_id: index_metadata.index_config.index_id.to_string(), + }; + + ( + index_metadata.index_uid.clone(), + index_metadata_for_leaf_search, + ) + }) + .collect(); + let index_uids: Vec = indexes_metadatas + .into_iter() + .map(|index_metadata| index_metadata.index_uid) + .collect(); + + let split_metadatas: Vec = + list_relevant_splits(index_uids, None, None, None, &mut metastore).await?; + + // Build requests for each index id + let jobs: Vec = split_metadatas.iter().map(SearchJob::from).collect(); + let assigned_leaf_search_jobs = cluster_client + .search_job_placer + .assign_jobs(jobs, &HashSet::default()) + .await?; + let mut leaf_request_tasks = Vec::new(); + // For each node, forward to a node with an affinity for that index id. 
+ for (client, client_jobs) in assigned_leaf_search_jobs { + let leaf_requests = + jobs_to_leaf_requests(&list_fields_req, &index_uid_to_index_meta, client_jobs)?; + for leaf_request in leaf_requests { + leaf_request_tasks.push(cluster_client.leaf_list_fields(leaf_request, client.clone())); + } + } + let leaf_search_responses: Vec = try_join_all(leaf_request_tasks).await?; + let fields = merge_leaf_list_fields( + leaf_search_responses + .into_iter() + .map(|resp| resp.fields.into_iter()) + .collect_vec(), + )?; + Ok(ListFieldsResponse { fields }) +} + +/// Builds a list of [`LeafListFieldsRequest`], one per index, from a list of [`SearchJob`]. +pub fn jobs_to_leaf_requests( + request: &ListFieldsRequest, + index_uid_to_id: &HashMap, + jobs: Vec, +) -> crate::Result> { + let search_request_for_leaf = request.clone(); + let mut leaf_search_requests = Vec::new(); + // Group jobs by index uid. + for (index_uid, job_group) in &jobs.into_iter().group_by(|job| job.index_uid.clone()) { + let index_meta = index_uid_to_id.get(&index_uid).ok_or_else(|| { + SearchError::Internal(format!( + "received list fields job for an unknown index {index_uid}. 
it should never happen" + )) + })?; + let leaf_search_request = LeafListFieldsRequest { + index_id: index_meta.index_id.to_string(), + index_uri: index_meta.index_uri.to_string(), + fields: search_request_for_leaf.fields.clone(), + split_offsets: job_group.into_iter().map(|job| job.offsets).collect(), + }; + leaf_search_requests.push(leaf_search_request); + } + Ok(leaf_search_requests) +} + +#[cfg(test)] +mod tests { + use quickwit_proto::search::{ListFieldType, ListFieldsEntryResponse}; + + use super::*; + + #[test] + fn test_pattern() { + assert!(matches_any_pattern("field", &["field".to_string()])); + assert!(matches_any_pattern("field", &["fi*eld".to_string()])); + assert!(matches_any_pattern("field", &["*field".to_string()])); + assert!(matches_any_pattern("field", &["field*".to_string()])); + + assert!(matches_any_pattern("field1", &["field*".to_string()])); + assert!(!matches_any_pattern("field1", &["*field".to_string()])); + assert!(!matches_any_pattern("field1", &["fi*eld".to_string()])); + assert!(!matches_any_pattern("field1", &["field".to_string()])); + + // 2.nd pattern matches + assert!(matches_any_pattern( + "field", + &["a".to_string(), "field".to_string()] + )); + assert!(matches_any_pattern( + "field", + &["a".to_string(), "fi*eld".to_string()] + )); + assert!(matches_any_pattern( + "field", + &["a".to_string(), "*field".to_string()] + )); + assert!(matches_any_pattern( + "field", + &["a".to_string(), "field*".to_string()] + )); + } + + #[test] + fn merge_leaf_list_fields_identical_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + 
non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter(), + vec![entry2.clone()].into_iter(), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1]); + } + #[test] + fn merge_leaf_list_fields_different_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field2".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter(), + vec![entry2.clone()].into_iter(), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1, entry2]); + } + #[test] + fn merge_leaf_list_fields_non_searchable_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: false, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index2".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter(), + vec![entry2.clone()].into_iter(), + ]) + .unwrap(); + let expected = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + 
searchable: true, + aggregatable: true, + non_searchable_index_ids: vec!["index2".to_string()], + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string(), "index2".to_string()], + }; + assert_eq!(resp, vec![expected]); + } + #[test] + fn merge_leaf_list_fields_non_aggregatable_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: false, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index2".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter(), + vec![entry2.clone()].into_iter(), + ]) + .unwrap(); + let expected = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: vec!["index2".to_string()], + index_ids: vec!["index1".to_string(), "index2".to_string()], + }; + assert_eq!(resp, vec![expected]); + } + #[test] + fn merge_leaf_list_fields_mixed_types1() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: 
vec!["index1".to_string()], + }; + let entry3 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::U64 as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone(), entry2.clone()].into_iter(), + vec![entry3.clone()].into_iter(), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); + } + #[test] + fn merge_leaf_list_fields_mixed_types2() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry3 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::U64 as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone(), entry3.clone()].into_iter(), + vec![entry2.clone()].into_iter(), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); + } + #[test] + fn merge_leaf_list_fields_multiple_field_names() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: 
vec!["index1".to_string()], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let entry3 = ListFieldsEntryResponse { + field_name: "field2".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index1".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone(), entry3.clone()].into_iter(), + vec![entry2.clone()].into_iter(), + ]) + .unwrap(); + assert_eq!(resp, vec![entry1.clone(), entry3.clone()]); + } + #[test] + fn merge_leaf_list_fields_non_aggregatable_list_test() { + let entry1 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: vec!["index1".to_string()], + non_aggregatable_index_ids: Vec::new(), + index_ids: vec![ + "index1".to_string(), + "index2".to_string(), + "index3".to_string(), + ], + }; + let entry2 = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: false, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index4".to_string()], + }; + let resp = merge_leaf_list_fields(vec![ + vec![entry1.clone()].into_iter(), + vec![entry2.clone()].into_iter(), + ]) + .unwrap(); + let expected = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + non_searchable_index_ids: vec!["index1".to_string(), "index4".to_string()], + non_aggregatable_index_ids: Vec::new(), + index_ids: vec![ + "index1".to_string(), + 
"index2".to_string(), + "index3".to_string(), + "index4".to_string(), + ], + }; + assert_eq!(resp, vec![expected]); + } +} diff --git a/quickwit/quickwit-search/src/list_fields_cache.rs b/quickwit/quickwit-search/src/list_fields_cache.rs new file mode 100644 index 00000000000..b5e5b8d170d --- /dev/null +++ b/quickwit/quickwit-search/src/list_fields_cache.rs @@ -0,0 +1,119 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use quickwit_proto::search::{ + deserialize_split_fields, serialize_split_fields, ListFields, SplitIdAndFooterOffsets, +}; +use quickwit_storage::{MemorySizedCache, OwnedBytes}; + +/// A cache to memoize `leaf_search_single_split` results. +pub struct ListFieldsCache { + content: MemorySizedCache, +} + +// TODO For now this simply caches the whole ListFieldsEntryResponse. We could +// be more clever and cache aggregates instead. 
+impl ListFieldsCache { + pub fn new(capacity: usize) -> ListFieldsCache { + ListFieldsCache { + content: MemorySizedCache::with_capacity_in_bytes( + capacity, + &quickwit_storage::STORAGE_METRICS.partial_request_cache, + ), + } + } + pub fn get(&self, split_info: SplitIdAndFooterOffsets) -> Option { + let key = CacheKey::from_split_meta(split_info); + let encoded_result = self.content.get(&key)?; + // this should never fail + deserialize_split_fields(encoded_result).ok() + } + + pub fn put(&self, split_info: SplitIdAndFooterOffsets, list_fields: ListFields) { + let key = CacheKey::from_split_meta(split_info); + + let encoded_result = serialize_split_fields(list_fields); + self.content.put(key, OwnedBytes::new(encoded_result)); + } +} + +/// A key inside a [`ListFieldsCache`]. +#[derive(Debug, Hash, PartialEq, Eq)] +struct CacheKey { + /// The split this entry refers to + split_id: String, +} + +impl CacheKey { + fn from_split_meta(split_info: SplitIdAndFooterOffsets) -> Self { + CacheKey { + split_id: split_info.split_id, + } + } +} + +#[cfg(test)] +mod tests { + use quickwit_proto::search::{ + ListFieldType, ListFields, ListFieldsEntryResponse, SplitIdAndFooterOffsets, + }; + + use super::ListFieldsCache; + + #[test] + fn test_list_fields_cache() { + let cache = ListFieldsCache::new(64_000_000); + + let split_1 = SplitIdAndFooterOffsets { + split_id: "split_1".to_string(), + split_footer_start: 0, + split_footer_end: 100, + timestamp_start: None, + timestamp_end: None, + }; + + let split_2 = SplitIdAndFooterOffsets { + split_id: "split_2".to_string(), + split_footer_start: 0, + split_footer_end: 100, + timestamp_start: None, + timestamp_end: None, + }; + + let result = ListFieldsEntryResponse { + field_name: "field1".to_string(), + field_type: ListFieldType::Str as i32, + searchable: false, + aggregatable: true, + non_searchable_index_ids: Vec::new(), + non_aggregatable_index_ids: Vec::new(), + index_ids: vec!["index4".to_string()], + }; + +
assert!(cache.get(split_1.clone()).is_none()); + + let list_fields = ListFields { + fields: vec![result.clone()], + }; + + cache.put(split_1.clone(), list_fields.clone()); + assert_eq!(cache.get(split_1.clone()).unwrap(), list_fields); + assert!(cache.get(split_2).is_none()); + } +} diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index d4e23cc2396..9cbf017873b 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -72,9 +72,11 @@ const MAX_SCROLL_TTL: Duration = Duration::from_secs(DELETION_GRACE_PERIOD.as_se /// SearchJob to be assigned to search clients by the [`SearchJobPlacer`]. #[derive(Debug, Clone, PartialEq)] pub struct SearchJob { - index_uid: IndexUid, + /// The index UID. + pub index_uid: IndexUid, cost: usize, - offsets: SplitIdAndFooterOffsets, + /// The split ID and footer offsets of the split. + pub offsets: SplitIdAndFooterOffsets, } impl SearchJob { diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index fcb2352de33..ae9b494f5a3 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -29,11 +29,11 @@ use quickwit_config::SearcherConfig; use quickwit_doc_mapper::DocMapper; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::search::{ - FetchDocsRequest, FetchDocsResponse, GetKvRequest, Hit, LeafListTermsRequest, - LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, - LeafSearchStreamResponse, ListTermsRequest, ListTermsResponse, PutKvRequest, - ReportSplitsRequest, ReportSplitsResponse, ScrollRequest, SearchRequest, SearchResponse, - SearchStreamRequest, SnippetRequest, + FetchDocsRequest, FetchDocsResponse, GetKvRequest, Hit, LeafListFieldsRequest, + LeafListTermsRequest, LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, + LeafSearchStreamRequest, LeafSearchStreamResponse, ListFieldsRequest, 
ListFieldsResponse, + ListTermsRequest, ListTermsResponse, PutKvRequest, ReportSplitsRequest, ReportSplitsResponse, + ScrollRequest, SearchRequest, SearchResponse, SearchStreamRequest, SnippetRequest, }; use quickwit_storage::{ MemorySizedCache, QuickwitCache, SplitCache, StorageCache, StorageResolver, @@ -43,6 +43,8 @@ use tokio::sync::Semaphore; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::leaf_cache::LeafSearchCache; +use crate::list_fields::{leaf_list_fields, root_list_fields}; +use crate::list_fields_cache::ListFieldsCache; use crate::root::fetch_docs_phase; use crate::scroll_context::{MiniKV, ScrollContext, ScrollKeyAndStartOffset}; use crate::search_stream::{leaf_search_stream, root_search_stream}; @@ -136,6 +138,18 @@ pub trait SearchService: 'static + Send + Sync { /// Indexers call report_splits to inform searchers node about the presence of a split, which /// would then be considered as a candidate for the searcher split cache. async fn report_splits(&self, report_splits: ReportSplitsRequest) -> ReportSplitsResponse; + + /// Return the list of fields for a given or multiple indices. + async fn root_list_fields( + &self, + list_fields: ListFieldsRequest, + ) -> crate::Result; + + /// Return the list of fields for one index. 
+ async fn leaf_list_fields( + &self, + list_fields: LeafListFieldsRequest, + ) -> crate::Result; } impl SearchServiceImpl { @@ -314,6 +328,36 @@ impl SearchService for SearchServiceImpl { } ReportSplitsResponse {} } + + async fn root_list_fields( + &self, + list_fields_req: ListFieldsRequest, + ) -> crate::Result { + root_list_fields( + list_fields_req, + &self.cluster_client, + self.metastore.clone(), + ) + .await + } + + async fn leaf_list_fields( + &self, + list_fields_req: LeafListFieldsRequest, + ) -> crate::Result { + let index_uri = Uri::from_str(&list_fields_req.index_uri)?; + let storage = self.storage_resolver.resolve(&index_uri).await?; + let index_id = list_fields_req.index_id; + let split_ids = list_fields_req.split_offsets; + leaf_list_fields( + index_id, + storage, + &self.searcher_context, + &split_ids[..], + &list_fields_req.fields, + ) + .await + } } pub(crate) async fn scroll( @@ -404,6 +448,8 @@ pub struct SearcherContext { pub leaf_search_cache: LeafSearchCache, /// Search split cache. `None` if no split cache is configured. pub split_cache_opt: Option>, + /// List fields cache. Caches the list fields response for a given split. 
+ pub list_fields_cache: ListFieldsCache, } impl std::fmt::Debug for SearcherContext { @@ -442,6 +488,8 @@ impl SearcherContext { let storage_long_term_cache = Arc::new(QuickwitCache::new(fast_field_cache_capacity)); let leaf_search_cache = LeafSearchCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize); + let list_fields_cache = + ListFieldsCache::new(searcher_config.partial_request_cache_capacity.as_u64() as usize); Self { searcher_config, @@ -450,6 +498,7 @@ impl SearcherContext { split_footer_cache: global_split_footer_cache, split_stream_semaphore, leaf_search_cache, + list_fields_cache, split_cache_opt, } } diff --git a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs index 3208d7e3e8f..b0f4079809e 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/filter.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/filter.rs @@ -23,7 +23,9 @@ use serde::de::DeserializeOwned; use warp::reject::LengthRequired; use warp::{Filter, Rejection}; -use super::model::MultiSearchQueryParams; +use super::model::{ + FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams, +}; use crate::elastic_search_api::model::{ ElasticIngestOptions, ScrollQueryParams, SearchBody, SearchQueryParams, }; @@ -100,6 +102,22 @@ fn json_or_empty( .unify() } +#[utoipa::path(get, tag = "metadata", path = "/{index}/_field_caps")] +pub(crate) fn elastic_index_field_capabilities_filter() -> impl Filter< + Extract = ( + Vec, + FieldCapabilityQueryParams, + FieldCapabilityRequestBody, + ), + Error = Rejection, +> + Clone { + warp::path!("_elastic" / String / "_field_caps") + .and_then(extract_index_id_patterns) + .and(warp::get().or(warp::post()).unify()) + .and(serde_qs::warp::query(serde_qs::Config::default())) + .and(json_or_empty()) +} + #[utoipa::path(get, tag = "Search", path = "/{index}/_search")] pub(crate) fn elastic_index_search_filter( ) -> impl Filter, SearchQueryParams, 
SearchBody), Error = Rejection> + Clone diff --git a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs index ab5fecde5da..c514e8ed5bc 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/mod.rs @@ -37,6 +37,7 @@ use rest_handler::{ use serde::{Deserialize, Serialize}; use warp::{Filter, Rejection}; +use self::rest_handler::es_compat_index_field_capabilities_handler; use crate::elastic_search_api::model::ElasticSearchError; use crate::json_api_response::JsonApiResponse; use crate::{BodyFormat, BuildInfo}; @@ -52,6 +53,9 @@ pub fn elastic_api_handlers( ) -> impl Filter + Clone { es_compat_cluster_info_handler(node_config, BuildInfo::get()) .or(es_compat_search_handler(search_service.clone())) + .or(es_compat_index_field_capabilities_handler( + search_service.clone(), + )) .or(es_compat_index_search_handler(search_service.clone())) .or(es_compat_scroll_handler(search_service.clone())) .or(es_compat_index_multi_search_handler(search_service)) diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs new file mode 100644 index 00000000000..03edfdd9a99 --- /dev/null +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/field_capability.rs @@ -0,0 +1,163 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashMap; + +use quickwit_proto::search::{ListFieldType, ListFieldsEntryResponse, ListFieldsResponse}; +use serde::{Deserialize, Serialize}; + +use super::search_query_params::*; +use super::ElasticSearchError; +use crate::simple_list::{from_simple_list, to_simple_list}; + +#[serde_with::skip_serializing_none] +#[derive(Default, Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct FieldCapabilityQueryParams { + #[serde(default)] + pub allow_no_indices: Option, + #[serde(serialize_with = "to_simple_list")] + #[serde(deserialize_with = "from_simple_list")] + #[serde(default)] + pub expand_wildcards: Option>, + #[serde(serialize_with = "to_simple_list")] + #[serde(deserialize_with = "from_simple_list")] + #[serde(default)] + pub fields: Option>, +} + +#[derive(Debug, Default, Clone, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct FieldCapabilityRequestBody { + #[serde(default)] + // unsupported currently + pub index_filter: serde_json::Value, + #[serde(default)] + // unsupported currently + pub runtime_mappings: serde_json::Value, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct FieldCapabilityResponse { + indices: Vec, + fields: HashMap, +} +#[derive(Serialize, Deserialize, Debug, Default)] +struct FieldCapabilityFieldTypesResponse { + #[serde(skip_serializing_if = "Option::is_none")] + long: Option, + #[serde(skip_serializing_if = "Option::is_none")] + keyword: Option, + #[serde(skip_serializing_if = "Option::is_none")] + text: Option, + #[serde(skip_serializing_if = "Option::is_none")] + date_nanos: Option, + 
#[serde(skip_serializing_if = "Option::is_none")] + binary: Option, + #[serde(skip_serializing_if = "Option::is_none")] + double: Option, // Do we need float ? + #[serde(skip_serializing_if = "Option::is_none")] + boolean: Option, + #[serde(skip_serializing_if = "Option::is_none")] + ip: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +struct FieldCapabilityEntryResponse { + metadata_field: bool, // Always false + searchable: bool, + aggregatable: bool, + indices: Vec, // [ "index1", "index2" ], + #[serde(skip_serializing_if = "Vec::is_empty")] + non_aggregatable_indices: Vec, // [ "index1" ] + #[serde(skip_serializing_if = "Vec::is_empty")] + non_searchable_indices: Vec, // [ "index1" ] +} +impl FieldCapabilityEntryResponse { + fn from_list_field_entry_response(entry: ListFieldsEntryResponse) -> Self { + Self { + metadata_field: false, + searchable: entry.searchable, + aggregatable: entry.aggregatable, + indices: entry.index_ids.clone(), + non_aggregatable_indices: entry.non_aggregatable_index_ids, + non_searchable_indices: entry.non_searchable_index_ids, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct FieldCapabilityEntry { + searchable: bool, + aggregatable: bool, +} + +pub fn convert_to_es_field_capabilities_response( + resp: ListFieldsResponse, +) -> FieldCapabilityResponse { + let mut indices = resp + .fields + .iter() + .flat_map(|entry| entry.index_ids.iter().cloned()) + .collect::>(); + indices.sort(); + indices.dedup(); + + let mut fields: HashMap = HashMap::new(); + for list_field_resp in resp.fields { + let entry = fields + .entry(list_field_resp.field_name.to_string()) + .or_default(); + + let field_type = ListFieldType::from_i32(list_field_resp.field_type).unwrap(); + let add_entry = + FieldCapabilityEntryResponse::from_list_field_entry_response(list_field_resp); + match field_type { + ListFieldType::Str => { + if add_entry.aggregatable { + entry.keyword = Some(add_entry.clone()); + } + if add_entry.searchable { + 
entry.text = Some(add_entry); + } + } + ListFieldType::U64 => entry.long = Some(add_entry), + ListFieldType::I64 => entry.long = Some(add_entry), + ListFieldType::F64 => entry.double = Some(add_entry), + ListFieldType::Bool => entry.boolean = Some(add_entry), + ListFieldType::Date => entry.date_nanos = Some(add_entry), + ListFieldType::Facet => continue, + ListFieldType::Json => continue, + ListFieldType::Bytes => entry.binary = Some(add_entry), // Is this mapping correct? + ListFieldType::IpAddr => entry.ip = Some(add_entry), + } + } + FieldCapabilityResponse { indices, fields } +} + +pub fn build_list_field_request_for_es_api( + index_id_patterns: Vec, + search_params: FieldCapabilityQueryParams, + _search_body: FieldCapabilityRequestBody, +) -> Result { + Ok(quickwit_proto::search::ListFieldsRequest { + index_ids: index_id_patterns, + fields: search_params.fields.unwrap_or_default(), + }) +} diff --git a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs index 8a87d78b99b..f10263eaa25 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs @@ -20,6 +20,7 @@ mod bulk_body; mod bulk_query_params; mod error; +mod field_capability; mod multi_search; mod scroll; mod search_body; @@ -28,6 +29,10 @@ mod search_query_params; pub use bulk_body::{BulkAction, BulkActionMeta}; pub use bulk_query_params::{ElasticIngestOptions, ElasticRefresh}; pub use error::ElasticSearchError; +pub use field_capability::{ + build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, + FieldCapabilityQueryParams, FieldCapabilityRequestBody, FieldCapabilityResponse, +}; pub use multi_search::{ MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, }; diff --git a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs 
b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs index 490e4330c8a..0b1d4c2ab5d 100644 --- a/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs @@ -31,7 +31,8 @@ use itertools::Itertools; use quickwit_common::truncate_str; use quickwit_config::{validate_index_id_pattern, NodeConfig}; use quickwit_proto::search::{ - CountHits, PartialHit, ScrollRequest, SearchResponse, SortByValue, SortDatetimeFormat, + CountHits, ListFieldsResponse, PartialHit, ScrollRequest, SearchResponse, SortByValue, + SortDatetimeFormat, }; use quickwit_proto::ServiceErrorCode; use quickwit_query::query_ast::{QueryAst, UserInputQuery}; @@ -41,11 +42,14 @@ use serde_json::json; use warp::{Filter, Rejection}; use super::filter::{ - elastic_cluster_info_filter, elastic_index_search_filter, elastic_multi_search_filter, - elastic_scroll_filter, elastic_search_filter, + elastic_cluster_info_filter, elastic_index_field_capabilities_filter, + elastic_index_search_filter, elastic_multi_search_filter, elastic_scroll_filter, + elastic_search_filter, }; use super::model::{ - ElasticSearchError, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, + build_list_field_request_for_es_api, convert_to_es_field_capabilities_response, + ElasticSearchError, FieldCapabilityQueryParams, FieldCapabilityRequestBody, + FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams, }; use super::{make_elastic_api_response, TrackTotalHits}; @@ -93,6 +97,17 @@ pub fn es_compat_search_handler( }) } +/// GET or POST _elastic/{index}/_field_caps +/// TODO: add route handling for _elastic/_field_caps +pub fn es_compat_index_field_capabilities_handler( + search_service: Arc, +) -> impl Filter + Clone { + elastic_index_field_capabilities_filter() + .and(with_arg(search_service)) + 
.then(es_compat_index_field_capabilities) + .map(|result| make_elastic_api_response(result, BodyFormat::default())) +} + /// GET or POST _elastic/{index}/_search pub fn es_compat_index_search_handler( search_service: Arc, @@ -288,6 +303,21 @@ async fn es_compat_index_search( Ok(search_response_rest) } +async fn es_compat_index_field_capabilities( + index_id_patterns: Vec, + search_params: FieldCapabilityQueryParams, + search_body: FieldCapabilityRequestBody, + search_service: Arc, +) -> Result { + let search_request = + build_list_field_request_for_es_api(index_id_patterns, search_params, search_body)?; + let search_response: ListFieldsResponse = + search_service.root_list_fields(search_request).await?; + let search_response_rest: FieldCapabilityResponse = + convert_to_es_field_capabilities_response(search_response); + Ok(search_response_rest) +} + fn convert_hit(hit: quickwit_proto::search::Hit, append_shard_doc: bool) -> ElasticHit { let fields: BTreeMap = serde_json::from_str(&hit.json).unwrap_or_default(); diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index 1bb2607f567..45227a5c55c 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -23,8 +23,9 @@ use async_trait::async_trait; use futures::TryStreamExt; use quickwit_proto::error::convert_to_grpc_result; use quickwit_proto::search::{ - search_service_server as grpc, GetKvRequest, GetKvResponse, LeafSearchStreamRequest, - LeafSearchStreamResponse, ReportSplitsRequest, ReportSplitsResponse, + search_service_server as grpc, GetKvRequest, GetKvResponse, LeafListFieldsRequest, + LeafSearchStreamRequest, LeafSearchStreamResponse, ListFieldsRequest, ListFieldsResponse, + ReportSplitsRequest, ReportSplitsResponse, }; use quickwit_proto::{set_parent_span_from_request_metadata, tonic, ServiceError}; use quickwit_search::SearchService; @@ -163,4 +164,23 @@ impl 
grpc::SearchService for GrpcSearchAdapter { self.0.report_splits(get_search_after_context_request).await; Ok(tonic::Response::new(ReportSplitsResponse {})) } + + #[instrument(skip(self, request))] + async fn list_fields( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let resp = self.0.root_list_fields(request.into_inner()).await; + convert_to_grpc_result(resp) + } + #[instrument(skip(self, request))] + async fn leaf_list_fields( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let resp = self.0.leaf_list_fields(request.into_inner()).await; + convert_to_grpc_result(resp) + } } diff --git a/quickwit/quickwit-storage/src/bundle_storage.rs b/quickwit/quickwit-storage/src/bundle_storage.rs index 13dfbd9a32d..4b991cf1bcf 100644 --- a/quickwit/quickwit-storage/src/bundle_storage.rs +++ b/quickwit/quickwit-storage/src/bundle_storage.rs @@ -45,6 +45,7 @@ use crate::{ /// with some metadata pub struct BundleStorage { storage: Arc, + /// The file path of the bundle in the storage. bundle_filepath: PathBuf, metadata: BundleStorageFileOffsets, } diff --git a/quickwit/rest-api-tests/run_tests.py b/quickwit/rest-api-tests/run_tests.py index 471000c6a46..be4195baf4a 100755 --- a/quickwit/rest-api-tests/run_tests.py +++ b/quickwit/rest-api-tests/run_tests.py @@ -65,7 +65,7 @@ def run_request_with_retry(run_req, expected_status_code=None, num_retries=10, w if try_number < num_retries: print("Retrying...") time.sleep(wait_time) - raise Exception("Wrong status code. Got %s, expected %s" % (r.status_code, expected_status_code)) + raise Exception("Wrong status code. 
Got %s, expected %s, url %s" % (r.status_code, expected_status_code, run_req().url)) def resolve_previous_result(c, previous_result): diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml new file mode 100644 index 00000000000..c006adb3a08 --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/0001-field-capabilities.yaml @@ -0,0 +1,267 @@ +# Test _field_caps API +method: [GET] +engines: + - quickwit +endpoint: fieldcaps/_field_caps +expected: + indices: + - fieldcaps + fields: + nested.response: + long: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + nested.name: + keyword: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + text: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + host: + ip: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + mixed: # This is a little weird case (values [5, -5.5]), since coercion happens only on the columnar side. That's why `long` is not aggregatable. 
+ long: + metadata_field: false + searchable: true + aggregatable: false + indices: + - fieldcaps + double: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + date: + date_nanos: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + _field_presence: + long: + metadata_field: false + searchable: true + aggregatable: false + indices: + - fieldcaps + response: + long: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + id: + long: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + double: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + name: + keyword: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + text: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + tags: + keyword: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + text: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps +--- +# Test fields parameter with a comma-separated list of field names +engines: + - quickwit +method: [GET] +engines: + - quickwit +endpoint: fieldcaps/_field_caps?fields=nested.response,nested.name +expected: + indices: + - fieldcaps + fields: + nested.response: + long: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + nested.name: + keyword: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + text: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps +--- +# Test fields parameter with wildcard +method: [GET] +engines: + - quickwit +endpoint: fieldcaps/_field_caps?fields=nest* +expected: + indices: + - fieldcaps + fields: + nested.response: + long: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps +
nested.name: + keyword: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + text: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps +--- +# Test fields parameter with wildcard +method: [GET] +engines: + - quickwit +endpoint: fieldcaps/_field_caps?fields=nest* +expected: + indices: + - fieldcaps + fields: + nested.response: + long: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + nested.name: + keyword: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps + text: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps +--- +# Test fields parameter with wildcard +method: [GET] +engines: + - quickwit +endpoint: fieldcaps/_field_caps?fields=nested.*ponse +expected: + indices: + - fieldcaps + fields: + nested.response: + long: + metadata_field: false + searchable: true + aggregatable: true + indices: + - fieldcaps +--- +# Compare with elastic search +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: fieldcaps/_field_caps?fields=nested.*ponse +expected: + indices: + - fieldcaps + fields: + nested.response: + long: + metadata_field: false + searchable: true + aggregatable: true +--- +# Compare ip field with elastic search +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: fieldcaps/_field_caps?fields=host +expected: + indices: + - fieldcaps + fields: + host: + ip: + metadata_field: false + searchable: true + aggregatable: true +--- +# Compare date_nanos field with elastic search +method: [GET] +engines: + - quickwit + - elasticsearch +endpoint: fieldcaps/_field_caps?fields=date +expected: + indices: + - fieldcaps + fields: + date: + date_nanos: + metadata_field: false + searchable: true + aggregatable: true + diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.elasticsearch.yaml
b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.elasticsearch.yaml new file mode 100644 index 00000000000..d83aa5b5e8d --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.elasticsearch.yaml @@ -0,0 +1,2 @@ +api_root: http://localhost:9200/ + diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.quickwit.yaml new file mode 100644 index 00000000000..e0028a1af4b --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.quickwit.yaml @@ -0,0 +1 @@ +api_root: "http://localhost:7280/api/v1/_elastic/" diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.yaml new file mode 100644 index 00000000000..09cb0081b4f --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_ctx.yaml @@ -0,0 +1,3 @@ +method: [GET, POST] +headers: + Content-Type: application/json diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml new file mode 100644 index 00000000000..200e60cb3c0 --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.elasticsearch.yaml @@ -0,0 +1,34 @@ +# Delete possibly remaining index +method: DELETE +endpoint: fieldcaps +status_code: null +--- +# Create index 1 +method: PUT +endpoint: fieldcaps +json: { + "mappings": { + "properties": { + "host": { + "type": "ip", + "store": true + }, + "date": { + "type": "date_nanos" + }, + } + } +} +--- +# Ingest documents in fieldcaps +method: POST +endpoint: _bulk +params: + refresh: "true" +headers: {"Content-Type": "application/json"} +ndjson: + - "index": { "_index": "fieldcaps" } + - {"name": "Fritz", "response": 30, "id": 5, "host": "192.168.0.1", "tags": ["nice", "cool"]} + - "index": { "_index": "fieldcaps" } + - {"nested": 
{"name": "Fritz", "response": 30}, "date": "2015-01-11T12:10:30Z", "host": "192.168.0.11", "tags": ["nice"]} + diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml new file mode 100644 index 00000000000..0af4612d051 --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_setup.quickwit.yaml @@ -0,0 +1,59 @@ +# Delete possibly remaining index +method: DELETE +api_root: http://localhost:7280/api/v1/ +endpoint: indexes/fieldcaps +status_code: null +--- +# Create index +method: POST +api_root: http://localhost:7280/api/v1/ +endpoint: indexes/ +json: + version: "0.6" + index_id: fieldcaps + doc_mapping: + mode: dynamic + dynamic_mapping: + tokenizer: default + fast: true + field_mappings: + - name: date + type: datetime + input_formats: + - rfc3339 + fast_precision: seconds + fast: true + - name: host + type: ip + fast: true +--- +# Ingest documents +method: POST +api_root: http://localhost:7280/api/v1/ +endpoint: fieldcaps/ingest +num_retries: 10 +params: + commit: force +ndjson: + - {"name": "Fritz", "response": 30, "id": 5, "host": "192.168.0.1", "tags": ["nice", "cool"]} + - {"nested": {"name": "Fritz", "response": 30}, "date": "2015-01-11T12:10:30Z", "host": "192.168.0.11", "tags": ["nice"]} +--- +# Ingest documents split #2 +method: POST +api_root: http://localhost:7280/api/v1/ +endpoint: fieldcaps/ingest +params: + commit: force +ndjson: + - {"id": -5.5} # cross split mixed type +--- +# Ingest documents split #3 +method: POST +api_root: http://localhost:7280/api/v1/ +endpoint: fieldcaps/ingest +params: + commit: force +ndjson: + - {"mixed": 5} # inter split mixed type + - {"mixed": -5.5} + diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml new file mode 100644 index 00000000000..9d3c1723eed --- 
/dev/null +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.elasticsearch.yaml @@ -0,0 +1,3 @@ +# Delete index +method: DELETE +endpoint: fieldcaps diff --git a/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml new file mode 100644 index 00000000000..66330c067ad --- /dev/null +++ b/quickwit/rest-api-tests/scenarii/es_field_capabilities/_teardown.quickwit.yaml @@ -0,0 +1,4 @@ +# Delete index +method: DELETE +api_root: http://localhost:7280/api/v1/ +endpoint: indexes/fieldcaps