From 5c0c4e9190a776f7a2c5475df571e88fc9daad6b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 26 Dec 2024 17:00:40 +0800 Subject: [PATCH] feat(log-query): implement pagination with limit and offset parameters Signed-off-by: Ruihang Xia --- src/log-query/src/log_query.rs | 15 ++++-- src/query/src/log_query/planner.rs | 78 ++++++++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/src/log-query/src/log_query.rs b/src/log-query/src/log_query.rs index 24ad3e622042..a79efc78eeca 100644 --- a/src/log-query/src/log_query.rs +++ b/src/log-query/src/log_query.rs @@ -30,8 +30,8 @@ pub struct LogQuery { pub time_filter: TimeFilter, /// Columns with filters to query. pub columns: Vec<ColumnFilters>, - /// Maximum number of logs to return. If not provided, it will return all matched logs. - pub limit: Option<usize>, + /// Controls row skipping and fetch count for logs. + pub limit: Limit, /// Adjacent lines to return. pub context: Context, } @@ -42,7 +42,7 @@ impl Default for LogQuery { table: TableName::new("", "", ""), time_filter: Default::default(), columns: vec![], - limit: None, + limit: Limit::default(), context: Default::default(), } } @@ -266,6 +266,15 @@ pub enum Context { Seconds(usize, usize), } +/// Represents limit and offset parameters for query pagination. 
+#[derive(Debug, Default, Serialize, Deserialize)] +pub struct Limit { + /// Optional number of items to skip before starting to return results + pub skip: Option<usize>, + /// Optional number of items to return after skipping + pub fetch: Option<usize>, +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/query/src/log_query/planner.rs b/src/query/src/log_query/planner.rs index b5d4e385fbcb..e19356e44400 100644 --- a/src/query/src/log_query/planner.rs +++ b/src/query/src/log_query/planner.rs @@ -93,7 +93,10 @@ impl LogQueryPlanner { // Apply limit plan_builder = plan_builder - .limit(0, query.limit.or(Some(DEFAULT_LIMIT))) + .limit( + query.limit.skip.unwrap_or(0), + Some(query.limit.fetch.unwrap_or(DEFAULT_LIMIT)), + ) .context(DataFusionPlanningSnafu)?; // Build the final plan @@ -179,7 +182,7 @@ mod tests { use common_query::test_util::DummyDecoder; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaRef}; - use log_query::{ContentFilter, Context}; + use log_query::{ContentFilter, Context, Limit}; use session::context::QueryContext; use table::metadata::{TableInfoBuilder, TableMetaBuilder}; use table::table_name::TableName; @@ -268,7 +271,10 @@ mod tests { column_name: "message".to_string(), filters: vec![ContentFilter::Contains("error".to_string())], }], - limit: Some(100), + limit: Limit { + skip: None, + fetch: Some(100), + }, context: Context::None, }; @@ -361,6 +367,72 @@ mod tests { assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr)); } + #[tokio::test] + async fn test_query_to_plan_with_only_skip() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let mut planner = LogQueryPlanner::new(table_provider); + + let log_query = LogQuery { + table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"), + time_filter: TimeFilter { + start: Some("2021-01-01T00:00:00Z".to_string()), + end: Some("2021-01-02T00:00:00Z".to_string()), + span: None, + }, 
columns: vec![ColumnFilters { + column_name: "message".to_string(), + filters: vec![ContentFilter::Contains("error".to_string())], + }], + limit: Limit { + skip: Some(10), + fetch: None, + }, + context: Context::None, + }; + + let plan = planner.query_to_plan(log_query).await.unwrap(); + let expected = "Limit: skip=10, fetch=1000 [message:Utf8]\ +\n Projection: greptime.public.test_table.message [message:Utf8]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + + #[tokio::test] + async fn test_query_to_plan_without_limit() { + let table_provider = + build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await; + let mut planner = LogQueryPlanner::new(table_provider); + + let log_query = LogQuery { + table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"), + time_filter: TimeFilter { + start: Some("2021-01-01T00:00:00Z".to_string()), + end: Some("2021-01-02T00:00:00Z".to_string()), + span: None, + }, + columns: vec![ColumnFilters { + column_name: "message".to_string(), + filters: vec![ContentFilter::Contains("error".to_string())], + }], + limit: Limit { + skip: None, + fetch: None, + }, + context: Context::None, + }; + + let plan = planner.query_to_plan(log_query).await.unwrap(); + let expected = "Limit: skip=0, fetch=1000 [message:Utf8]\ +\n Projection: greptime.public.test_table.message [message:Utf8]\ +\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE 
Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\ +\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]"; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + #[test] fn test_escape_pattern() { assert_eq!(escape_like_pattern("test"), "test");