Skip to content

Commit

Permalink
add list_fields api (#4242)
Browse files Browse the repository at this point in the history
* add list_fields api

* address comments

* add and connect elastic search api

* add schema fallback for old indices

* add rest API test

* add support for fields parameter

handle wildcards
handle _dynamic special case

* rename index_id to index_ids

* use protobuf

* use ListFieldEntryResponse, remove _dynamic from API

* parallel fetch

* cache results, improve error message

* move prost dependency

* remove schema fallback

* add elastic search compatibility test

* add ip type to es comparison

* add date_nanos type to es comparison
  • Loading branch information
PSeitz authored Dec 18, 2023
1 parent 35eba19 commit 13f34b5
Show file tree
Hide file tree
Showing 34 changed files with 2,056 additions and 303 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ bytes = { workspace = true }
criterion = { workspace = true, features = ["async_tokio"] }
mockall = { workspace = true }
proptest = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
tempfile = { workspace = true }
prost = { workspace = true }

quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-cluster = { workspace = true, features = ["testsuite"] }
Expand Down
105 changes: 99 additions & 6 deletions quickwit/quickwit-indexing/src/actors/packager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ use quickwit_common::temp_dir::TempDirectory;
use quickwit_directories::write_hotcache;
use quickwit_doc_mapper::tag_pruning::append_to_tag_set;
use quickwit_doc_mapper::NamedField;
use tantivy::schema::FieldType;
use tantivy::{InvertedIndexReader, ReloadPolicy, SegmentMeta};
use quickwit_proto::search::{
serialize_split_fields, ListFieldType, ListFields, ListFieldsEntryResponse,
};
use tantivy::schema::{FieldType, Type};
use tantivy::{FieldMetadata, InvertedIndexReader, ReloadPolicy, SegmentMeta};
use tokio::runtime::Handle;
use tracing::{debug, info, instrument, warn};

Expand All @@ -46,8 +49,7 @@ const MAX_VALUES_PER_TAG_FIELD: usize = if cfg!(any(test, feature = "testsuite")

use crate::actors::Uploader;
use crate::models::{
serialize_split_fields, EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit,
PackagedSplitBatch,
EmptySplit, IndexedSplit, IndexedSplitBatch, PackagedSplit, PackagedSplitBatch,
};

/// The role of the packager is to get an index writer and
Expand Down Expand Up @@ -315,7 +317,8 @@ fn create_packaged_split(
let mut hotcache_bytes = Vec::new();
build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?;
ctx.record_progress();
let serialized_split_fields = serialize_split_fields(&fields_metadata);

let serialized_split_fields = serialize_field_metadata(&fields_metadata);

let packaged_split = PackagedSplit {
serialized_split_fields,
Expand All @@ -328,6 +331,47 @@ fn create_packaged_split(
Ok(packaged_split)
}

/// Serializes the Split fields.
///
/// `fields_metadata` has to be sorted.
fn serialize_field_metadata(fields_metadata: &[FieldMetadata]) -> Vec<u8> {
let fields = fields_metadata
.iter()
.map(field_metadata_to_list_field_serialized)
.collect::<Vec<_>>();

serialize_split_fields(ListFields { fields })
}

fn tantivy_type_to_list_field_type(typ: Type) -> ListFieldType {
match typ {
Type::Str => ListFieldType::Str,
Type::U64 => ListFieldType::U64,
Type::I64 => ListFieldType::I64,
Type::F64 => ListFieldType::F64,
Type::Bool => ListFieldType::Bool,
Type::Date => ListFieldType::Date,
Type::Facet => ListFieldType::Facet,
Type::Bytes => ListFieldType::Bytes,
Type::Json => ListFieldType::Json,
Type::IpAddr => ListFieldType::IpAddr,
}
}

fn field_metadata_to_list_field_serialized(
field_metadata: &FieldMetadata,
) -> ListFieldsEntryResponse {
ListFieldsEntryResponse {
field_name: field_metadata.field_name.to_string(),
field_type: tantivy_type_to_list_field_type(field_metadata.typ) as i32,
searchable: field_metadata.indexed,
aggregatable: field_metadata.fast,
index_ids: vec![],
non_searchable_index_ids: vec![],
non_aggregatable_index_ids: vec![],
}
}

/// Reads u64 from stored term data.
fn u64_from_term_data(data: &[u8]) -> anyhow::Result<u64> {
let u64_bytes: [u8; 8] = data[0..8]
Expand All @@ -343,15 +387,64 @@ mod tests {
use quickwit_actors::{ObservationType, Universe};
use quickwit_metastore::checkpoint::IndexCheckpointDelta;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::search::{deserialize_split_fields, ListFieldsEntryResponse};
use quickwit_proto::types::{IndexUid, PipelineUid};
use tantivy::directory::MmapDirectory;
use tantivy::schema::{NumericOptions, Schema, FAST, STRING, TEXT};
use tantivy::schema::{NumericOptions, Schema, Type, FAST, STRING, TEXT};
use tantivy::{doc, DateTime, IndexBuilder, IndexSettings};
use tracing::Span;

use super::*;
use crate::models::{PublishLock, SplitAttrs};

#[test]
fn serialize_field_metadata_test() {
let fields_metadata = vec![
FieldMetadata {
field_name: "test".to_string(),
typ: Type::Str,
indexed: true,
stored: true,
fast: true,
},
FieldMetadata {
field_name: "test2".to_string(),
typ: Type::Str,
indexed: true,
stored: false,
fast: false,
},
FieldMetadata {
field_name: "test3".to_string(),
typ: Type::U64,
indexed: true,
stored: false,
fast: true,
},
];

let out = serialize_field_metadata(&fields_metadata);

let deserialized: Vec<ListFieldsEntryResponse> =
deserialize_split_fields(&mut &out[..]).unwrap().fields;

assert_eq!(fields_metadata.len(), deserialized.len());
assert_eq!(deserialized[0].field_name, "test");
assert_eq!(deserialized[0].field_type, ListFieldType::Str as i32);
assert!(deserialized[0].searchable);
assert!(deserialized[0].aggregatable);

assert_eq!(deserialized[1].field_name, "test2");
assert_eq!(deserialized[1].field_type, ListFieldType::Str as i32);
assert!(deserialized[1].searchable);
assert!(!deserialized[1].aggregatable);

assert_eq!(deserialized[2].field_name, "test3");
assert_eq!(deserialized[2].field_type, ListFieldType::U64 as i32);
assert!(deserialized[2].searchable);
assert!(deserialized[2].aggregatable);
}

fn make_indexed_split_for_test(
segment_timestamps: &[DateTime],
) -> anyhow::Result<IndexedSplit> {
Expand Down
2 changes: 0 additions & 2 deletions quickwit/quickwit-indexing/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ mod publisher_message;
mod raw_doc_batch;
mod shard_positions;
mod split_attrs;
mod split_fields;

pub use indexed_split::{
CommitTrigger, EmptySplit, IndexedSplit, IndexedSplitBatch, IndexedSplitBatchBuilder,
Expand All @@ -54,7 +53,6 @@ pub use raw_doc_batch::RawDocBatch;
pub(crate) use shard_positions::LocalShardPositionsUpdate;
pub use shard_positions::ShardPositionsService;
pub use split_attrs::{create_split_metadata, SplitAttrs};
pub use split_fields::{read_split_fields, serialize_split_fields, FieldConfig};

#[derive(Debug)]
pub struct NewPublishToken(pub PublishToken);
Loading

0 comments on commit 13f34b5

Please sign in to comment.