Skip to content

Commit

Permalink
feat(index): support SQL to specify inverted index columns (GreptimeT…
Browse files Browse the repository at this point in the history
…eam#4929)

* feat(index): support building inverted index for the field column

Signed-off-by: Zhenchi <[email protected]>

* feat(index): support SQL to specify inverted index columns

Signed-off-by: Zhenchi <[email protected]>

* test: fix sqlness

Signed-off-by: Zhenchi <[email protected]>

* fix: consider compatibility

Signed-off-by: Zhenchi <[email protected]>

* polish

Signed-off-by: Zhenchi <[email protected]>

* compatibility

Signed-off-by: Zhenchi <[email protected]>

* fix

Signed-off-by: Zhenchi <[email protected]>

* fix: ignore case

Signed-off-by: Zhenchi <[email protected]>

* refactor: reduce dup

Signed-off-by: Zhenchi <[email protected]>

* fix: clippy

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Nov 11, 2024
1 parent 0e0c4fa commit 6248a6c
Show file tree
Hide file tree
Showing 14 changed files with 442 additions and 212 deletions.
40 changes: 30 additions & 10 deletions src/api/src/v1/column_def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;

use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, FulltextOptions, COMMENT_KEY, FULLTEXT_KEY,
INVERTED_INDEX_KEY,
};
use snafu::ResultExt;

Expand All @@ -25,6 +26,8 @@ use crate::v1::{ColumnDef, ColumnOptions, SemanticType};

/// Key used to store fulltext options in gRPC column options.
const FULLTEXT_GRPC_KEY: &str = "fulltext";
/// Key used to store inverted index options in gRPC column options.
const INVERTED_INDEX_GRPC_KEY: &str = "inverted_index";

/// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
Expand All @@ -49,10 +52,13 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
if !column_def.comment.is_empty() {
metadata.insert(COMMENT_KEY.to_string(), column_def.comment.clone());
}
if let Some(options) = column_def.options.as_ref()
&& let Some(fulltext) = options.options.get(FULLTEXT_GRPC_KEY)
{
metadata.insert(FULLTEXT_KEY.to_string(), fulltext.to_string());
if let Some(options) = column_def.options.as_ref() {
if let Some(fulltext) = options.options.get(FULLTEXT_GRPC_KEY) {
metadata.insert(FULLTEXT_KEY.to_string(), fulltext.clone());
}
if let Some(inverted_index) = options.options.get(INVERTED_INDEX_GRPC_KEY) {
metadata.insert(INVERTED_INDEX_KEY.to_string(), inverted_index.clone());
}
}

ColumnSchema::new(&column_def.name, data_type.into(), column_def.is_nullable)
Expand All @@ -70,7 +76,12 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<Column
if let Some(fulltext) = column_schema.metadata().get(FULLTEXT_KEY) {
options
.options
.insert(FULLTEXT_GRPC_KEY.to_string(), fulltext.to_string());
.insert(FULLTEXT_GRPC_KEY.to_string(), fulltext.clone());
}
if let Some(inverted_index) = column_schema.metadata().get(INVERTED_INDEX_KEY) {
options
.options
.insert(INVERTED_INDEX_GRPC_KEY.to_string(), inverted_index.clone());
}

(!options.options.is_empty()).then_some(options)
Expand Down Expand Up @@ -115,10 +126,13 @@ mod tests {
comment: "test_comment".to_string(),
datatype_extension: None,
options: Some(ColumnOptions {
options: HashMap::from([(
FULLTEXT_GRPC_KEY.to_string(),
"{\"enable\":true}".to_string(),
)]),
options: HashMap::from([
(
FULLTEXT_GRPC_KEY.to_string(),
"{\"enable\":true}".to_string(),
),
(INVERTED_INDEX_GRPC_KEY.to_string(), "true".to_string()),
]),
}),
};

Expand All @@ -139,6 +153,7 @@ mod tests {
..Default::default()
}
);
assert!(schema.is_inverted_indexed());
}

#[test]
Expand All @@ -153,12 +168,17 @@ mod tests {
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
})
.unwrap();
.unwrap()
.set_inverted_index(true);
let options = options_from_column_schema(&schema).unwrap();
assert_eq!(
options.options.get(FULLTEXT_GRPC_KEY).unwrap(),
"{\"enable\":true,\"analyzer\":\"English\",\"case-sensitive\":false}"
);
assert_eq!(
options.options.get(INVERTED_INDEX_GRPC_KEY).unwrap(),
"true"
);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, R
use crate::prelude::DataType;
pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, COMMENT_KEY, FULLTEXT_KEY,
TIME_INDEX_KEY,
INVERTED_INDEX_KEY, TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;
Expand Down
21 changes: 20 additions & 1 deletion src/datatypes/src/schema/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ pub const COMMENT_KEY: &str = "greptime:storage:comment";
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
/// Key used to store fulltext options in arrow field's metadata.
pub const FULLTEXT_KEY: &str = "greptime:fulltext";

/// Key used to store whether the column has inverted index in arrow field's metadata.
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Schema of a column, used as an immutable struct.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ColumnSchema {
Expand Down Expand Up @@ -134,6 +135,24 @@ impl ColumnSchema {
self
}

pub fn set_inverted_index(mut self, value: bool) -> Self {
let _ = self
.metadata
.insert(INVERTED_INDEX_KEY.to_string(), value.to_string());
self
}

pub fn is_inverted_indexed(&self) -> bool {
self.metadata
.get(INVERTED_INDEX_KEY)
.map(|v| v.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}

pub fn has_inverted_index_key(&self) -> bool {
self.metadata.contains_key(INVERTED_INDEX_KEY)
}

/// Set default constraint.
///
/// If a default constraint exists for the column, this method will
Expand Down
25 changes: 8 additions & 17 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,29 +393,20 @@ impl ScanRegion {
.and_then(|c| c.index_cache())
.cloned();

// TODO(zhongzc): currently we only index tag columns, need to support field columns.
let ignore_column_ids = &self
.version
.options
.index_options
.inverted_index
.ignore_column_ids;
let indexed_column_ids = self
.version
.metadata
.primary_key
.iter()
.filter(|id| !ignore_column_ids.contains(id))
.copied()
.collect::<HashSet<_>>();

InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
file_cache,
index_cache,
self.version.metadata.as_ref(),
indexed_column_ids,
self.version.metadata.inverted_indexed_column_ids(
self.version
.options
.index_options
.inverted_index
.ignore_column_ids
.iter(),
),
self.access_layer.puffin_manager_factory().clone(),
)
.build(&self.request.filters)
Expand Down
20 changes: 3 additions & 17 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub(crate) mod puffin_manager;
mod statistics;
mod store;

use std::collections::HashSet;
use std::num::NonZeroUsize;

use common_telemetry::{debug, warn};
Expand Down Expand Up @@ -213,28 +212,15 @@ impl<'a> IndexerBuilder<'a> {
segment_row_count = row_group_size;
}

// TODO(zhongzc): currently we only index tag columns, need to support field columns.
let indexed_column_ids = self
.metadata
.primary_key
.iter()
.filter(|id| {
!self
.index_options
.inverted_index
.ignore_column_ids
.contains(id)
})
.copied()
.collect::<HashSet<_>>();

let indexer = InvertedIndexer::new(
self.file_id,
self.metadata,
self.intermediate_manager.clone(),
self.inverted_index_config.mem_threshold_on_create(),
segment_row_count,
indexed_column_ids,
self.metadata.inverted_indexed_column_ids(
self.index_options.inverted_index.ignore_column_ids.iter(),
),
);

Some(indexer)
Expand Down
84 changes: 63 additions & 21 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ use query::sql::{
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{ColumnOption, TableConstraint};
use sql::ast::ColumnOption;
use sql::statements::alter::{AlterTable, AlterTableOperation};
use sql::statements::create::{
Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TIME_INDEX,
Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
};
use sql::statements::{
column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
Expand Down Expand Up @@ -130,8 +130,14 @@ pub(crate) async fn create_external_expr(
// expanded form
let time_index = find_time_index(&create.constraints)?;
let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
let column_schemas =
columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
let inverted_index_cols = find_inverted_index_cols(&create.columns, &create.constraints)?;
let column_schemas = columns_to_column_schemas(
&create.columns,
&time_index,
&inverted_index_cols,
&primary_keys,
Some(&query_ctx.timezone()),
)?;
(time_index, primary_keys, column_schemas)
} else {
// inferred form
Expand Down Expand Up @@ -186,6 +192,7 @@ pub fn create_to_expr(
);

let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
let inverted_index_cols = find_inverted_index_cols(&create.columns, &create.constraints)?;

let expr = CreateTableExpr {
catalog_name,
Expand All @@ -196,6 +203,7 @@ pub fn create_to_expr(
&create.columns,
&time_index,
&primary_keys,
&inverted_index_cols,
Some(&query_ctx.timezone()),
)?,
time_index,
Expand Down Expand Up @@ -304,9 +312,9 @@ fn find_primary_keys(
let constraints_pk = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::PrimaryKey {
name: _, columns, ..
} => Some(columns.iter().map(|ident| ident.value.clone())),
TableConstraint::PrimaryKey { columns, .. } => {
Some(columns.iter().map(|ident| ident.value.clone()))
}
_ => None,
})
.flatten()
Expand All @@ -329,20 +337,9 @@ pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
let time_index = constraints
.iter()
.filter_map(|constraint| match constraint {
TableConstraint::Unique {
name: Some(name),
columns,
..
} => {
if name.value == TIME_INDEX {
Some(columns.iter().map(|ident| &ident.value))
} else {
None
}
}
TableConstraint::TimeIndex { column, .. } => Some(&column.value),
_ => None,
})
.flatten()
.collect::<Vec<&String>>();
ensure!(
time_index.len() == 1,
Expand All @@ -353,25 +350,70 @@ pub fn find_time_index(constraints: &[TableConstraint]) -> Result<String> {
Ok(time_index.first().unwrap().to_string())
}

/// Finds the inverted index columns from the constraints. If no inverted index
/// columns are provided in the constraints, return `None`.
fn find_inverted_index_cols(
columns: &[SqlColumn],
constraints: &[TableConstraint],
) -> Result<Option<Vec<String>>> {
let inverted_index_cols = constraints.iter().find_map(|constraint| {
if let TableConstraint::InvertedIndex { columns } = constraint {
Some(
columns
.iter()
.map(|ident| ident.value.clone())
.collect::<Vec<_>>(),
)
} else {
None
}
});

let Some(inverted_index_cols) = inverted_index_cols else {
return Ok(None);
};

for col in &inverted_index_cols {
if !columns.iter().any(|c| c.name().value == *col) {
return InvalidSqlSnafu {
err_msg: format!("inverted index column `{}` not found in column list", col),
}
.fail();
}
}

Ok(Some(inverted_index_cols))
}

fn columns_to_expr(
column_defs: &[SqlColumn],
time_index: &str,
primary_keys: &[String],
invereted_index_cols: &Option<Vec<String>>,
timezone: Option<&Timezone>,
) -> Result<Vec<api::v1::ColumnDef>> {
let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
let column_schemas = columns_to_column_schemas(
column_defs,
time_index,
invereted_index_cols,
primary_keys,
timezone,
)?;
column_schemas_to_defs(column_schemas, primary_keys)
}

fn columns_to_column_schemas(
columns: &[SqlColumn],
time_index: &str,
invereted_index_cols: &Option<Vec<String>>,
primary_keys: &[String],
timezone: Option<&Timezone>,
) -> Result<Vec<ColumnSchema>> {
columns
.iter()
.map(|c| {
column_to_schema(c, c.name().to_string() == time_index, timezone).context(ParseSqlSnafu)
column_to_schema(c, time_index, invereted_index_cols, primary_keys, timezone)
.context(ParseSqlSnafu)
})
.collect::<Result<Vec<ColumnSchema>>>()
}
Expand Down
Loading

0 comments on commit 6248a6c

Please sign in to comment.