From 97f1010e84e708a062b6b5c7938d7c53d5c7157e Mon Sep 17 00:00:00 2001 From: Kamalesh Palanisamy Date: Sun, 15 Oct 2023 22:39:49 -0400 Subject: [PATCH] Update sql strings with sea_query sql statements --- quickwit/Cargo.lock | 46 ++ quickwit/Cargo.toml | 2 + quickwit/quickwit-metastore/Cargo.toml | 2 + .../src/metastore/postgresql_metastore.rs | 443 +++++++++++------- .../src/metastore/postgresql_model.rs | 26 + 5 files changed, 341 insertions(+), 178 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 2310258e725..6e9be3245ec 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3019,6 +3019,17 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" +[[package]] +name = "inherent" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce243b1bfa62ffc028f1cc3b6034ec63d649f3031bc8a4fbbb004e1ac17d1f68" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "inout" version = "0.1.3" @@ -5526,6 +5537,8 @@ dependencies = [ "quickwit-storage", "rand 0.8.5", "regex", + "sea-query", + "sea-query-binder", "serde", "serde_json", "serde_with 3.4.0", @@ -6485,6 +6498,39 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "sea-query" +version = "0.30.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb3e6bba153bb198646c8762c48414942a38db27d142e44735a133cabddcc820" +dependencies = [ + "inherent", + "sea-query-derive", +] + +[[package]] +name = "sea-query-binder" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36bbb68df92e820e4d5aeb17b4acd5cc8b5d18b2c36a4dd6f4626aabfa7ab1b9" +dependencies = [ + "sea-query", + "sqlx", +] + +[[package]] +name = "sea-query-derive" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd78f2e0ee8e537e9195d1049b752e0433e2cac125426bccb7b5c3e508096117" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 1.0.109", + "thiserror", +] + [[package]] name = "seahash" version = "4.1.0" diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index cee159ab6b3..a812b06f827 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -142,6 +142,8 @@ reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls", ] } rust-embed = "6.8.1" +sea-query = { version = "0" } +sea-query-binder = { version = "0", features = ["sqlx-postgres", "runtime-tokio-rustls",] } serde = { version = "= 1.0.171", features = ["derive", "rc"] } serde_json = "1.0" serde_qs = { version = "0.12", features = ["warp"] } diff --git a/quickwit/quickwit-metastore/Cargo.toml b/quickwit/quickwit-metastore/Cargo.toml index 0dd51313690..26cfa4d9f5c 100644 --- a/quickwit/quickwit-metastore/Cargo.toml +++ b/quickwit/quickwit-metastore/Cargo.toml @@ -24,6 +24,8 @@ serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } sqlx = { workspace = true, optional = true } +sea-query = { workspace = true } +sea-query-binder = { workspace = true } tempfile = { workspace = true, optional = true } thiserror = { workspace = true } time = { workspace = true } diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs index 5f9648296e1..b47f87d7229 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs @@ -25,7 +25,6 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use itertools::Itertools; use quickwit_common::uri::Uri; use quickwit_common::PrettySample; use quickwit_config::{ @@ -45,6 +44,10 @@ use quickwit_proto::metastore::{ ToggleSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, }; use quickwit_proto::types::IndexUid; +use sea_query::{ + all, any, Asterisk, Cond, Expr, Func, PostgresQueryBuilder, Query, SelectStatement, +}; +use sea_query_binder::SqlxBinder; use sqlx::migrate::Migrator; use sqlx::postgres::{PgConnectOptions, PgDatabaseError, PgPoolOptions}; use sqlx::{ConnectOptions, Pool, Postgres, Transaction}; @@ -53,7 +56,7 @@ use tracing::log::LevelFilter; use tracing::{debug, error, info, instrument, warn}; use crate::checkpoint::IndexCheckpointDelta; -use crate::metastore::postgresql_model::{PgDeleteTask, PgIndex, PgSplit}; +use crate::metastore::postgresql_model::{PgDeleteTask, PgIndex, PgSplit, Splits, ToTimestampFunc}; use crate::metastore::{instrument_metastore, FilterRange, PublishSplitsRequestExt}; use crate::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, @@ -222,131 +225,124 @@ async fn index_metadata( /// Extends an existing SQL string with the generated filter range appended to the query. /// /// This method is **not** SQL injection proof and should not be used with user-defined values. -fn write_sql_filter( - sql: &mut String, - field_name: impl Display, +fn append_range_filters( + sql: &mut SelectStatement, + field_name: Splits, filter_range: &FilterRange, - value_formatter: impl Fn(&V) -> String, + value_formatter: impl Fn(&V) -> Expr, ) { - match &filter_range.start { - Bound::Included(value) => { - let _ = write!(sql, " AND {} >= {}", field_name, (value_formatter)(value)); - } - Bound::Excluded(value) => { - let _ = write!(sql, " AND {} > {}", field_name, (value_formatter)(value)); - } - Bound::Unbounded => {} + if let Bound::Included(value) = &filter_range.start { + sql.cond_where(Expr::col(field_name).gte((value_formatter)(value))); }; - match &filter_range.end { - Bound::Included(value) => { - let _ = write!(sql, " AND {} <= {}", field_name, (value_formatter)(value)); - } - Bound::Excluded(value) => { - let _ = write!(sql, " AND {} < {}", field_name, (value_formatter)(value)); - } - Bound::Unbounded => {} + if let Bound::Excluded(value) = &filter_range.start { + sql.cond_where(Expr::col(field_name).gt((value_formatter)(value))); + }; + + if let Bound::Included(value) = &filter_range.end { + sql.cond_where(Expr::col(field_name).lte((value_formatter)(value))); + }; + + if let Bound::Excluded(value) = &filter_range.end { + sql.cond_where(Expr::col(field_name).lt((value_formatter)(value))); }; } -fn build_query_filter(mut sql: String, query: &ListSplitsQuery) -> String { +fn append_query_filters(sql: &mut SelectStatement, query: &ListSplitsQuery) { // Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list. - let where_predicate: String = query + + let or_condition = query .index_uids .iter() - .map(|index_uid| format!("index_uid = '{index_uid}'")) - .join(" OR "); - sql.push_str(&format!(" WHERE ({where_predicate})")); + .fold(Cond::any(), |cond, index_uid| { + cond.add(Expr::col(Splits::IndexUid).eq(Expr::val(index_uid.to_string()))) + }); + sql.cond_where(or_condition); if !query.split_states.is_empty() { - let params = query - .split_states - .iter() - .map(|v| format!("'{}'", v.as_str())) - .join(", "); - let _ = write!(sql, " AND split_state IN ({params})"); - } + sql.cond_where( + Expr::col(Splits::SplitState) + .is_in(query.split_states.iter().map(|val| val.to_string())), + ); + }; if let Some(tags) = query.tags.as_ref() { - sql.push_str(" AND ("); - sql.push_str(&tags_filter_expression_helper(tags)); - sql.push(')'); - } + sql.cond_where(tags_filter_expression_helper(tags)); + }; match query.time_range.start { Bound::Included(v) => { - let _ = write!( - sql, - " AND (time_range_end >= {v} OR time_range_end IS NULL)" - ); + sql.cond_where(any![ + Expr::col(Splits::TimeRangeEnd).gte(v), + Expr::col(Splits::TimeRangeEnd).is_null() + ]); } Bound::Excluded(v) => { - let _ = write!(sql, " AND (time_range_end > {v} OR time_range_end IS NULL)"); + sql.cond_where(any![ + Expr::col(Splits::TimeRangeEnd).gt(v), + Expr::col(Splits::TimeRangeEnd).is_null() + ]); } Bound::Unbounded => {} }; match query.time_range.end { Bound::Included(v) => { - let _ = write!( - sql, - " AND (time_range_start <= {v} OR time_range_start IS NULL)" - ); + sql.cond_where(any![ + Expr::col(Splits::TimeRangeStart).lte(v), + Expr::col(Splits::TimeRangeStart).is_null() + ]); } Bound::Excluded(v) => { - let _ = write!( - sql, - " AND (time_range_start < {v} OR time_range_start IS NULL)" - ); + sql.cond_where(any![ + Expr::col(Splits::TimeRangeStart).lt(v), + Expr::col(Splits::TimeRangeStart).is_null() + ]); } Bound::Unbounded => {} }; match &query.mature { Bound::Included(evaluation_datetime) => { - let _ = write!( - sql, - " AND (maturity_timestamp = to_timestamp(0) OR to_timestamp({}) >= \ - maturity_timestamp)", - evaluation_datetime.unix_timestamp() - ); + sql.cond_where(any![ + Expr::col(Splits::MaturityTimestamp) + .eq(Func::cust(ToTimestampFunc).arg(Expr::val(0))), + Expr::col(Splits::MaturityTimestamp).lte( + Func::cust(ToTimestampFunc) + .arg(Expr::val(evaluation_datetime.unix_timestamp())) + ) + ]); } Bound::Excluded(evaluation_datetime) => { - let _ = write!( - sql, - " AND to_timestamp({}) < maturity_timestamp", - evaluation_datetime.unix_timestamp() - ); + sql.cond_where(Expr::col(Splits::MaturityTimestamp).gt( + Func::cust(ToTimestampFunc).arg(Expr::val(evaluation_datetime.unix_timestamp())), + )); } Bound::Unbounded => {} - } - - // WARNING: Not SQL injection proof - write_sql_filter( - &mut sql, - "update_timestamp", + }; + append_range_filters( + sql, + Splits::UpdateTimestamp, &query.update_timestamp, - |val| format!("to_timestamp({val})"), + |&val| Expr::expr(Func::cust(ToTimestampFunc).arg(Expr::val(val))), ); - write_sql_filter( - &mut sql, - "create_timestamp", + append_range_filters( + sql, + Splits::CreateTimestamp, &query.create_timestamp, - |val| format!("to_timestamp({val})"), + |&val| Expr::expr(Func::cust(ToTimestampFunc).arg(Expr::val(val))), ); - write_sql_filter(&mut sql, "delete_opstamp", &query.delete_opstamp, |val| { - val.to_string() + append_range_filters(sql, Splits::DeleteOpstamp, &query.delete_opstamp, |&val| { + Expr::expr(val) }); if let Some(limit) = query.limit { - let _ = write!(sql, " LIMIT {limit}"); + sql.limit(limit as u64); } if let Some(offset) = query.offset { - let _ = write!(sql, " OFFSET {offset}"); + sql.offset(offset as u64); } - - sql } /// Returns the unix timestamp at which the split becomes mature. @@ -813,10 +809,13 @@ impl MetastoreService for PostgresqlMetastore { request: ListSplitsRequest, ) -> MetastoreResult { let query = request.deserialize_list_splits_query()?; - let sql_base = "SELECT * FROM splits".to_string(); - let sql = build_query_filter(sql_base, &query); + let mut sql = Query::select(); + sql.column(Asterisk).from(Splits::Table); + append_query_filters(&mut sql, &query); + + let (sql, values) = sql.build_sqlx(PostgresQueryBuilder); - let pg_splits = sqlx::query_as::<_, PgSplit>(&sql) + let pg_splits = sqlx::query_as_with::<_, PgSplit, _>(&sql, values) .fetch_all(&self.connection_pool) .await?; @@ -1350,35 +1349,38 @@ fn generate_dollar_guard(s: &str) -> String { /// Takes a tag filters AST and returns a sql expression that can be used as /// a filter. -fn tags_filter_expression_helper(tags: &TagFilterAst) -> String { +fn tags_filter_expression_helper(tags: &TagFilterAst) -> Cond { match tags { TagFilterAst::And(child_asts) => { if child_asts.is_empty() { - return "TRUE".to_string(); + return all![Expr::cust("TRUE")]; } - let expr_without_parenthesis = child_asts + + child_asts .iter() .map(tags_filter_expression_helper) - .join(" AND "); - format!("({expr_without_parenthesis})") + .fold(Cond::all(), |cond, child_cond| cond.add(child_cond)) } TagFilterAst::Or(child_asts) => { if child_asts.is_empty() { - return "TRUE".to_string(); + return all![Expr::cust("TRUE")]; } - let expr_without_parenthesis = child_asts + + child_asts .iter() .map(tags_filter_expression_helper) - .join(" OR "); - format!("({expr_without_parenthesis})") + .fold(Cond::any(), |cond, child_cond| cond.add(child_cond)) } + TagFilterAst::Tag { is_present, tag } => { let dollar_guard = generate_dollar_guard(tag); - if *is_present { - format!("${dollar_guard}${tag}${dollar_guard}$ = ANY(tags)") + let expr_str = format!("${dollar_guard}${tag}${dollar_guard}$ = ANY(tags)"); + let expr = if *is_present { + Expr::cust(&expr_str) } else { - format!("NOT (${dollar_guard}${tag}${dollar_guard}$ = ANY(tags))") - } + Expr::cust(&expr_str).not() + }; + all![expr] } } } @@ -1512,10 +1514,12 @@ mod tests { use quickwit_doc_mapper::tag_pruning::{no_tag, tag, TagFilterAst}; use quickwit_proto::metastore::MetastoreService; use quickwit_proto::types::IndexUid; + use sea_query::{all, any, Asterisk, Cond, Expr, PostgresQueryBuilder, Query}; use time::OffsetDateTime; - use super::{build_query_filter, tags_filter_expression_helper, PostgresqlMetastore}; + use super::{append_query_filters, tags_filter_expression_helper, PostgresqlMetastore}; use crate::metastore::postgresql_metastore::build_index_id_patterns_sql_query; + use crate::metastore::postgresql_model::Splits; use crate::tests::DefaultForTest; use crate::{metastore_test_suite, ListSplitsQuery, SplitState}; @@ -1528,31 +1532,37 @@ mod tests { assert!(metastore.endpoints()[0].protocol().is_postgresql()); } - fn test_tags_filter_expression_helper(tags_ast: TagFilterAst, expected: &str) { + fn test_tags_filter_expression_helper(tags_ast: TagFilterAst, expected: Cond) { assert_eq!(tags_filter_expression_helper(&tags_ast), expected); } #[test] fn test_tags_filter_expression_single_tag() { let tags_ast = tag("my_field:titi"); - test_tags_filter_expression_helper(tags_ast, r#"$$my_field:titi$$ = ANY(tags)"#); + + let expected = all![Expr::cust("$$my_field:titi$$ = ANY(tags)")]; + + test_tags_filter_expression_helper(tags_ast, expected); } #[test] fn test_tags_filter_expression_not_tag() { - test_tags_filter_expression_helper( - no_tag("my_field:titi"), - r#"NOT ($$my_field:titi$$ = ANY(tags))"#, - ); + let expected = all![Expr::cust("$$my_field:titi$$ = ANY(tags)").not()]; + + test_tags_filter_expression_helper(no_tag("my_field:titi"), expected); } #[test] fn test_tags_filter_expression_ands() { let tags_ast = TagFilterAst::And(vec![tag("tag:val1"), tag("tag:val2"), tag("tag:val3")]); - test_tags_filter_expression_helper( - tags_ast, - "($$tag:val1$$ = ANY(tags) AND $$tag:val2$$ = ANY(tags) AND $$tag:val3$$ = ANY(tags))", - ); + + let expected = all![ + Expr::cust("$$tag:val1$$ = ANY(tags)"), + Expr::cust("$$tag:val2$$ = ANY(tags)"), + Expr::cust("$$tag:val3$$ = ANY(tags)"), + ]; + + test_tags_filter_expression_helper(tags_ast, expected); } #[test] @@ -1561,10 +1571,16 @@ mod tests { TagFilterAst::And(vec![tag("tag:val1"), tag("tag:val2")]), tag("tag:val3"), ]); - test_tags_filter_expression_helper( - tags_ast, - "(($$tag:val1$$ = ANY(tags) AND $$tag:val2$$ = ANY(tags)) OR $$tag:val3$$ = ANY(tags))", - ); + + let expected = any![ + all![ + Expr::cust("$$tag:val1$$ = ANY(tags)"), + Expr::cust("$$tag:val2$$ = ANY(tags)"), + ], + Expr::cust("$$tag:val3$$ = ANY(tags)"), + ]; + + test_tags_filter_expression_helper(tags_ast, expected); } #[test] @@ -1573,192 +1589,263 @@ mod tests { TagFilterAst::Or(vec![tag("tag:val1"), tag("tag:val2")]), tag("tag:val3"), ]); - test_tags_filter_expression_helper( - tags_ast, - r#"(($$tag:val1$$ = ANY(tags) OR $$tag:val2$$ = ANY(tags)) AND $$tag:val3$$ = ANY(tags))"#, - ); + + let expected = all![ + any![ + Expr::cust("$$tag:val1$$ = ANY(tags)"), + Expr::cust("$$tag:val2$$ = ANY(tags)"), + ], + Expr::cust("$$tag:val3$$ = ANY(tags)"), + ]; + + test_tags_filter_expression_helper(tags_ast, expected); } #[test] fn test_tags_sql_injection_attempt() { let tags_ast = tag("tag:$$;DELETE FROM something_evil"); - test_tags_filter_expression_helper( - tags_ast, - "$QuickwitGuard$tag:$$;DELETE FROM something_evil$QuickwitGuard$ = ANY(tags)", - ); + + let expected = all![Expr::cust( + "$QuickwitGuard$tag:$$;DELETE FROM something_evil$QuickwitGuard$ = ANY(tags)" + ),]; + + test_tags_filter_expression_helper(tags_ast, expected); + let tags_ast = tag("tag:$QuickwitGuard$;DELETE FROM something_evil"); - test_tags_filter_expression_helper( - tags_ast, + + let expected = all![Expr::cust( "$QuickwitGuardQuickwitGuard$tag:$QuickwitGuard$;DELETE FROM \ - something_evil$QuickwitGuardQuickwitGuard$ = ANY(tags)", - ); + something_evil$QuickwitGuardQuickwitGuard$ = ANY(tags)" + )]; + + test_tags_filter_expression_helper(tags_ast, expected); } #[test] fn test_single_sql_query_builder() { + let mut select_statement = Query::select(); + + let sql = select_statement.column(Asterisk).from(Splits::Table); let index_uid = IndexUid::new("test-index"); let query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Staged); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); + assert_eq!( - sql, - format!(" WHERE (index_uid = '{index_uid}') AND split_state IN ('Staged')") + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "split_state" IN ('Staged')"# + ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Published); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); + assert_eq!( - sql, - format!(" WHERE (index_uid = '{index_uid}') AND split_state IN ('Published')") + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "split_state" IN ('Published')"# + ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()) .with_split_states([SplitState::Published, SplitState::MarkedForDeletion]); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, + sql.to_string(PostgresQueryBuilder), format!( - " WHERE (index_uid = '{index_uid}') AND split_state IN ('Published', \ - 'MarkedForDeletion')" + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "split_state" IN ('Published', 'MarkedForDeletion')"# ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()).with_update_timestamp_lt(51); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, - format!(" WHERE (index_uid = '{index_uid}') AND update_timestamp < to_timestamp(51)") + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "update_timestamp" < TO_TIMESTAMP(51)"# + ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()).with_create_timestamp_lte(55); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, - format!(" WHERE (index_uid = '{index_uid}') AND create_timestamp <= to_timestamp(55)") + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "create_timestamp" <= TO_TIMESTAMP(55)"# + ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let maturity_evaluation_datetime = OffsetDateTime::from_unix_timestamp(55).unwrap(); let query = ListSplitsQuery::for_index(index_uid.clone()) .retain_mature(maturity_evaluation_datetime); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); + assert_eq!( - sql, + sql.to_string(PostgresQueryBuilder), format!( - " WHERE (index_uid = '{index_uid}') AND (maturity_timestamp = to_timestamp(0) OR \ - to_timestamp(55) >= maturity_timestamp)" + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"# ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()) .retain_immature(maturity_evaluation_datetime); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, - format!(" WHERE (index_uid = '{index_uid}') AND to_timestamp(55) < maturity_timestamp") + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "maturity_timestamp" > TO_TIMESTAMP(55)"# + ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()).with_delete_opstamp_gte(4); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, - format!(" WHERE (index_uid = '{index_uid}') AND delete_opstamp >= 4") + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "delete_opstamp" >= 4"# + ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()).with_time_range_start_gt(45); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, + sql.to_string(PostgresQueryBuilder), format!( - " WHERE (index_uid = '{index_uid}') AND (time_range_end > 45 OR time_range_end IS \ - NULL)" + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"# ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()).with_time_range_end_lt(45); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, + sql.to_string(PostgresQueryBuilder), format!( - " WHERE (index_uid = '{index_uid}') AND (time_range_start < 45 OR \ - time_range_start IS NULL)" + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"# ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()).with_tags_filter(TagFilterAst::Tag { is_present: false, tag: "tag-2".to_string(), }); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); + assert_eq!( - sql, - format!(" WHERE (index_uid = '{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))") + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND NOT ($$tag-2$$ = ANY(tags))"# + ) ); } #[test] fn test_combination_sql_query_builder() { + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let index_uid = IndexUid::new("test-index"); let query = ListSplitsQuery::for_index(index_uid.clone()) .with_time_range_start_gt(0) .with_time_range_end_lt(40); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, + sql.to_string(PostgresQueryBuilder), format!( - " WHERE (index_uid = '{index_uid}') AND (time_range_end > 0 OR time_range_end IS \ - NULL) AND (time_range_start < 40 OR time_range_start IS NULL)" + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"# ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()) .with_time_range_start_gt(45) .with_delete_opstamp_gt(0); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, + sql.to_string(PostgresQueryBuilder), format!( - " WHERE (index_uid = '{index_uid}') AND (time_range_end > 45 OR time_range_end IS \ - NULL) AND delete_opstamp > 0" + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"# ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()) .with_update_timestamp_lt(51) .with_create_timestamp_lte(63); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, + sql.to_string(PostgresQueryBuilder), format!( - " WHERE (index_uid = '{index_uid}') AND update_timestamp < to_timestamp(51) AND \ - create_timestamp <= to_timestamp(63)" + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"# ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let query = ListSplitsQuery::for_index(index_uid.clone()) .with_time_range_start_gt(90) .with_tags_filter(TagFilterAst::Tag { is_present: true, tag: "tag-1".to_string(), }); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, + sql.to_string(PostgresQueryBuilder), format!( - " WHERE (index_uid = '{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND \ - (time_range_end > 90 OR time_range_end IS NULL)" + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND $$tag-1$$ = ANY(tags) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"# ) ); + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + let index_uid_2 = IndexUid::new("test-index-2"); let query = ListSplitsQuery::try_from_index_uids(vec![index_uid.clone(), index_uid_2.clone()]) .unwrap(); - let sql = build_query_filter(String::new(), &query); + append_query_filters(sql, &query); assert_eq!( - sql, - format!(" WHERE (index_uid = '{index_uid}' OR index_uid = '{index_uid_2}')") + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' OR "index_uid" = '{index_uid_2}'"# + ) ); } diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_model.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_model.rs index 25454dc9261..4dbae9a54a6 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_model.rs @@ -22,6 +22,7 @@ use std::str::FromStr; use quickwit_proto::metastore::{DeleteQuery, DeleteTask, MetastoreError, MetastoreResult}; use quickwit_proto::types::IndexUid; +use sea_query::{Iden, Write}; use tracing::error; use crate::{IndexMetadata, Split, SplitMetadata, SplitState}; @@ -62,6 +63,31 @@ impl PgIndex { } } +#[derive(Iden, Clone, Copy)] +#[allow(dead_code)] +pub enum Splits { + Table, + SplitState, + TimeRangeStart, + TimeRangeEnd, + CreateTimestamp, + UpdateTimestamp, + PublishTimestamp, + MaturityTimestamp, + Tags, + SplitMetadataJson, + IndexUid, + DeleteOpstamp, +} + +pub struct ToTimestampFunc; + +impl Iden for ToTimestampFunc { + fn unquoted(&self, s: &mut dyn Write) { + write!(s, "TO_TIMESTAMP").unwrap() + } +} + /// A model structure for handling split metadata in a database. #[derive(sqlx::FromRow)] pub struct PgSplit {