feat(log-query): implement pagination with limit and offset parameters #5241

Open · wants to merge 1 commit into main
15 changes: 12 additions & 3 deletions src/log-query/src/log_query.rs
@@ -30,8 +30,8 @@ pub struct LogQuery {
     pub time_filter: TimeFilter,
     /// Columns with filters to query.
     pub columns: Vec<ColumnFilters>,
-    /// Maximum number of logs to return. If not provided, it will return all matched logs.
-    pub limit: Option<usize>,
+    /// Controls row skipping and fetch count for logs.
+    pub limit: Limit,
     /// Adjacent lines to return.
     pub context: Context,
 }
@@ -42,7 +42,7 @@ impl Default for LogQuery {
             table: TableName::new("", "", ""),
             time_filter: Default::default(),
             columns: vec![],
-            limit: None,
+            limit: Limit::default(),
             context: Default::default(),
         }
     }
@@ -266,6 +266,15 @@ pub enum Context {
     Seconds(usize, usize),
 }

+/// Represents limit and offset parameters for query pagination.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct Limit {
+    /// Optional number of items to skip before starting to return results
+    pub skip: Option<usize>,
+    /// Optional number of items to return after skipping
+    pub fetch: Option<usize>,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
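For reference, the new `Limit` struct maps onto SQL-style OFFSET/LIMIT semantics: `skip` is the number of rows to pass over and `fetch` is the number of rows to return. The sketch below is a minimal, standalone illustration of how such a value round-trips through serde; it redeclares the struct locally instead of importing it from `log_query`, assumes the `serde` and `serde_json` crates, and is not part of the PR itself:

use serde::{Deserialize, Serialize};

/// Local stand-in for the `Limit` struct added above, for illustration only.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Limit {
    pub skip: Option<usize>,
    pub fetch: Option<usize>,
}

fn main() {
    // "Page 3" with 100 rows per page: skip the first 200 rows, fetch the next 100.
    let page = Limit { skip: Some(200), fetch: Some(100) };
    assert_eq!(serde_json::to_string(&page).unwrap(), r#"{"skip":200,"fetch":100}"#);

    // Missing fields deserialize to `None`, which the planner later maps to its defaults.
    let empty: Limit = serde_json::from_str("{}").unwrap();
    assert!(empty.skip.is_none() && empty.fetch.is_none());
}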
78 changes: 75 additions & 3 deletions src/query/src/log_query/planner.rs
@@ -93,7 +93,10 @@ impl LogQueryPlanner {

         // Apply limit
         plan_builder = plan_builder
-            .limit(0, query.limit.or(Some(DEFAULT_LIMIT)))
+            .limit(
+                query.limit.skip.unwrap_or(0),
+                Some(query.limit.fetch.unwrap_or(DEFAULT_LIMIT)),
+            )
             .context(DataFusionPlanningSnafu)?;

         // Build the final plan
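The effective skip/fetch pair handed to DataFusion's `LogicalPlanBuilder::limit` above can be reasoned about in isolation: a missing `skip` falls back to 0 and a missing `fetch` falls back to `DEFAULT_LIMIT`. A minimal sketch of that mapping; the helper `resolve_limit` is hypothetical, and the constant value 1000 is assumed from the `fetch=1000` strings in the expected plans of the tests below:

/// Assumed value, taken from the `fetch=1000` strings in the expected test plans.
const DEFAULT_LIMIT: usize = 1000;

/// Hypothetical helper mirroring the expression passed to `.limit(...)` above.
fn resolve_limit(skip: Option<usize>, fetch: Option<usize>) -> (usize, Option<usize>) {
    (skip.unwrap_or(0), Some(fetch.unwrap_or(DEFAULT_LIMIT)))
}

fn main() {
    // Offset-only query: skip 10 rows, fetch capped at the default.
    assert_eq!(resolve_limit(Some(10), None), (10, Some(1000)));
    // No pagination parameters: same result as the old default path, first 1000 rows.
    assert_eq!(resolve_limit(None, None), (0, Some(1000)));
    // A full page request: rows 200..300.
    assert_eq!(resolve_limit(Some(200), Some(100)), (200, Some(100)));
}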
@@ -179,7 +182,7 @@ mod tests {
     use common_query::test_util::DummyDecoder;
     use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::{ColumnSchema, SchemaRef};
-    use log_query::{ContentFilter, Context};
+    use log_query::{ContentFilter, Context, Limit};
     use session::context::QueryContext;
     use table::metadata::{TableInfoBuilder, TableMetaBuilder};
     use table::table_name::TableName;
@@ -268,7 +271,10 @@ mod tests {
                 column_name: "message".to_string(),
                 filters: vec![ContentFilter::Contains("error".to_string())],
             }],
-            limit: Some(100),
+            limit: Limit {
+                skip: None,
+                fetch: Some(100),
+            },
             context: Context::None,
         };

@@ -361,6 +367,72 @@
         assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
     }

+    #[tokio::test]
+    async fn test_query_to_plan_with_only_skip() {
+        let table_provider =
+            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
+        let mut planner = LogQueryPlanner::new(table_provider);
+
+        let log_query = LogQuery {
+            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
+            time_filter: TimeFilter {
+                start: Some("2021-01-01T00:00:00Z".to_string()),
+                end: Some("2021-01-02T00:00:00Z".to_string()),
+                span: None,
+            },
+            columns: vec![ColumnFilters {
+                column_name: "message".to_string(),
+                filters: vec![ContentFilter::Contains("error".to_string())],
+            }],
+            limit: Limit {
+                skip: Some(10),
+                fetch: None,
+            },
+            context: Context::None,
+        };
+
+        let plan = planner.query_to_plan(log_query).await.unwrap();
+        let expected = "Limit: skip=10, fetch=1000 [message:Utf8]\
+        \n Projection: greptime.public.test_table.message [message:Utf8]\
+        \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
+        \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
+
+        assert_eq!(plan.display_indent_schema().to_string(), expected);
+    }
+
+    #[tokio::test]
+    async fn test_query_to_plan_without_limit() {
+        let table_provider =
+            build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
+        let mut planner = LogQueryPlanner::new(table_provider);
+
+        let log_query = LogQuery {
+            table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
+            time_filter: TimeFilter {
+                start: Some("2021-01-01T00:00:00Z".to_string()),
+                end: Some("2021-01-02T00:00:00Z".to_string()),
+                span: None,
+            },
+            columns: vec![ColumnFilters {
+                column_name: "message".to_string(),
+                filters: vec![ContentFilter::Contains("error".to_string())],
+            }],
+            limit: Limit {
+                skip: None,
+                fetch: None,
+            },
+            context: Context::None,
+        };
+
+        let plan = planner.query_to_plan(log_query).await.unwrap();
+        let expected = "Limit: skip=0, fetch=1000 [message:Utf8]\
+        \n Projection: greptime.public.test_table.message [message:Utf8]\
+        \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
+        \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
+
+        assert_eq!(plan.display_indent_schema().to_string(), expected);
+    }
+
     #[test]
     fn test_escape_pattern() {
         assert_eq!(escape_like_pattern("test"), "test");