Skip to content

Commit

Permalink
Merge branch 'main' into avoid-calling-truncate-on-begin
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Mar 5, 2024
2 parents 8ae471e + 1d8c420 commit 8829128
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 27 deletions.
4 changes: 2 additions & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Removes the `splits_index_uid_idx` index; a no-op when the index does not exist.
DROP INDEX IF EXISTS splits_index_uid_idx;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Adds a hash index on `splits.index_uid`; a no-op when the index already exists.
CREATE INDEX IF NOT EXISTS splits_index_uid_idx ON splits USING HASH(index_uid);
34 changes: 17 additions & 17 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1667,7 +1667,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "split_state" IN ('Staged')"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Staged')"#
)
);

Expand All @@ -1681,7 +1681,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "split_state" IN ('Published')"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published')"#
)
);

Expand All @@ -1694,7 +1694,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "split_state" IN ('Published', 'MarkedForDeletion')"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "split_state" IN ('Published', 'MarkedForDeletion')"#
)
);

Expand All @@ -1706,7 +1706,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "update_timestamp" < TO_TIMESTAMP(51)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51)"#
)
);

Expand All @@ -1718,7 +1718,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "create_timestamp" <= TO_TIMESTAMP(55)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "create_timestamp" <= TO_TIMESTAMP(55)"#
)
);

Expand All @@ -1733,7 +1733,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("maturity_timestamp" = TO_TIMESTAMP(0) OR "maturity_timestamp" <= TO_TIMESTAMP(55))"#
)
);

Expand All @@ -1746,7 +1746,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "maturity_timestamp" > TO_TIMESTAMP(55)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "maturity_timestamp" > TO_TIMESTAMP(55)"#
)
);

Expand All @@ -1758,7 +1758,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "delete_opstamp" >= 4"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "delete_opstamp" >= 4"#
)
);

Expand All @@ -1770,7 +1770,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL)"#
)
);

Expand All @@ -1782,7 +1782,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_start" < 45 OR "time_range_start" IS NULL)"#
)
);

Expand All @@ -1799,7 +1799,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND (NOT ($$tag-2$$ = ANY(tags)))"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND (NOT ($$tag-2$$ = ANY(tags)))"#
)
);

Expand All @@ -1812,7 +1812,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' ORDER BY "split_id" ASC OFFSET 4"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "split_id" ASC OFFSET 4"#
)
);
}
Expand All @@ -1830,7 +1830,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 0 OR "time_range_end" IS NULL) AND ("time_range_start" < 40 OR "time_range_start" IS NULL)"#
)
);

Expand All @@ -1844,7 +1844,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("time_range_end" > 45 OR "time_range_end" IS NULL) AND "delete_opstamp" > 0"#
)
);

Expand All @@ -1858,7 +1858,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND "update_timestamp" < TO_TIMESTAMP(51) AND "create_timestamp" <= TO_TIMESTAMP(63)"#
)
);

Expand All @@ -1875,7 +1875,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ($$tag-1$$ = ANY(tags)) AND ("time_range_end" > 90 OR "time_range_end" IS NULL)"#
)
);

Expand All @@ -1890,7 +1890,7 @@ mod tests {
assert_eq!(
sql.to_string(PostgresQueryBuilder),
format!(
r#"SELECT * FROM "splits" WHERE "index_uid" = '{index_uid}' OR "index_uid" = '{index_uid_2}'"#
r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}', '{index_uid_2}')"#
)
);
}
Expand Down
12 changes: 4 additions & 8 deletions quickwit/quickwit-metastore/src/metastore/postgres/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::time::Duration;

use quickwit_common::uri::Uri;
use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
use sea_query::{any, Cond, Expr, Func, Order, SelectStatement};
use sea_query::{any, Expr, Func, Order, SelectStatement};
use sqlx::postgres::{PgConnectOptions, PgPoolOptions};
use sqlx::{ConnectOptions, Pool, Postgres};
use tracing::error;
Expand Down Expand Up @@ -93,13 +93,9 @@ pub(super) fn append_range_filters<V: Display>(
pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplitsQuery) {
// Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list.

let or_condition = query
.index_uids
.iter()
.fold(Cond::any(), |cond, index_uid| {
cond.add(Expr::col(Splits::IndexUid).eq(Expr::val(index_uid.to_string())))
});
sql.cond_where(or_condition);
sql.cond_where(
Expr::col(Splits::IndexUid).is_in(query.index_uids.iter().map(|val| val.to_string())),
);

if !query.split_states.is_empty() {
sql.cond_where(
Expand Down
67 changes: 67 additions & 0 deletions quickwit/quickwit-serve/src/index_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use quickwit_proto::metastore::{
MetastoreService, MetastoreServiceClient, ResetSourceCheckpointRequest, ToggleSourceRequest,
};
use quickwit_proto::types::IndexUid;
use quickwit_query::query_ast::{query_ast_from_user_text, QueryAst};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tracing::info;
Expand Down Expand Up @@ -88,6 +89,8 @@ pub fn index_management_handlers(
.or(delete_source_handler(index_service.metastore()))
// Tokenizer handlers.
.or(analyze_request_handler())
// Parse query into query AST handler.
.or(parse_query_request_handler())
}

fn json_body<T: DeserializeOwned + Send>(
Expand Down Expand Up @@ -866,6 +869,50 @@ async fn analyze_request(request: AnalyzeRequest) -> Result<serde_json::Value, I
Ok(json_value)
}

/// Request payload accepted by the parse-query endpoint.
#[derive(Debug, Deserialize, utoipa::IntoParams, utoipa::ToSchema)]
struct ParseQueryRequest {
    /// Query text. The query language is that of tantivy.
    pub query: String,
    /// Fields to search on.
    ///
    /// Read from the `search_field` key through `from_simple_list`; `None` when the key is
    /// absent.
    #[param(rename = "search_field")]
    #[serde(default)]
    #[serde(rename(deserialize = "search_field"))]
    #[serde(deserialize_with = "from_simple_list")]
    pub search_fields: Option<Vec<String>>,
}

/// Matches `POST parse-query` requests and extracts the JSON body as a [`ParseQueryRequest`].
fn parse_query_request_filter(
) -> impl Filter<Extract = (ParseQueryRequest,), Error = Rejection> + Clone {
    // The path is matched first, then the method, then the body is decoded.
    let post_on_route = warp::path!("parse-query").and(warp::post());
    post_on_route.and(warp::body::json())
}

/// Assembles the parse-query route: request extraction, query parsing, and conversion of the
/// outcome into a REST response using the format taken from the query string.
fn parse_query_request_handler(
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
    let parsed_query = parse_query_request_filter().then(parse_query_request);
    parsed_query
        .and(extract_format_from_qs())
        .map(into_rest_api_response)
}

/// Parses a user query string into its query AST.
///
/// The query text and the optional `search_fields` of the request are handed to
/// `query_ast_from_user_text`, and the result is resolved with `parse_user_query` called with an
/// empty slice.
///
/// # Errors
///
/// Any parsing failure is returned as `IndexServiceError::Internal` carrying the error message.
#[utoipa::path(
    post,
    tag = "parse_query",
    // Must match the route declared in `parse_query_request_filter`.
    path = "/parse-query",
    request_body = ParseQueryRequest,
    responses(
        (status = 200, description = "Successfully parsed query into AST.")
    ),
)]
async fn parse_query_request(request: ParseQueryRequest) -> Result<QueryAst, IndexServiceError> {
    query_ast_from_user_text(&request.query, request.search_fields)
        .parse_user_query(&[])
        .map_err(|err| IndexServiceError::Internal(err.to_string()))
}

#[cfg(test)]
mod tests {
use std::ops::{Bound, RangeInclusive};
Expand Down Expand Up @@ -1917,4 +1964,24 @@ mod tests {
expected: expected_response_json
);
}

/// Checks that a well-formed query posted to `/parse-query` is answered with HTTP 200.
#[tokio::test]
async fn test_parse_query_request() {
    let index_service = IndexService::new(
        MetastoreServiceClient::from(MetastoreServiceClient::mock()),
        StorageResolver::unconfigured(),
    );
    let node_config = Arc::new(NodeConfig::for_test());
    let routes = super::index_management_handlers(index_service, node_config).recover(recover_fn);
    let payload = r#"{"query": "field:this AND field:that"}"#;
    let response = warp::test::request()
        .path("/parse-query")
        .method("POST")
        // NOTE(review): `json(&true)` is presumably only here to mark the request as JSON;
        // the `body` call right after supplies the actual payload.
        .json(&true)
        .body(payload)
        .reply(&routes)
        .await;
    assert_eq!(response.status(), 200);
}
}

0 comments on commit 8829128

Please sign in to comment.