diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 63150fc1f896..6d944a44c00a 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -98,6 +98,7 @@ impl WindowedSortPhysicalRule { } else { Arc::new(PartSortExec::new( first_sort_expr.clone(), + sort_exec.fetch(), scanner_info.partition_ranges.clone(), sort_exec.input().clone(), )) @@ -149,7 +150,7 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult() { partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges()); - time_index = region_scan_exec.time_index(); + time_index = Some(region_scan_exec.time_index()); tag_columns = Some(region_scan_exec.tag_columns()); // set distinguish_partition_ranges to true, this is an incorrect workaround diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 2b258187b5b6..2828db202d9d 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -47,6 +47,7 @@ use crate::downcast_ts_array; pub struct PartSortExec { /// Physical sort expressions(that is, sort by timestamp) expression: PhysicalSortExpr, + limit: Option, input: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, @@ -57,6 +58,7 @@ pub struct PartSortExec { impl PartSortExec { pub fn new( expression: PhysicalSortExpr, + limit: Option, partition_ranges: Vec>, input: Arc, ) -> Self { @@ -69,6 +71,7 @@ impl PartSortExec { Self { expression, + limit, input, metrics, partition_ranges, @@ -95,6 +98,7 @@ impl PartSortExec { let df_stream = Box::pin(PartSortStream::new( context, self, + self.limit, input_stream, self.partition_ranges[partition].clone(), partition, @@ -106,7 +110,16 @@ impl PartSortExec { impl DisplayAs for PartSortExec { fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "PartSortExec {}", self.expression) + write!( + f, + "PartSortExec: expr={} num_ranges={}", + self.expression, + self.partition_ranges.len(), + )?; + if let Some(limit) = self.limit { + write!(f, " limit={}", limit)?; + } + Ok(()) } } @@ -138,6 +151,7 @@ impl ExecutionPlan for PartSortExec { }; Ok(Arc::new(Self::new( self.expression.clone(), + self.limit, self.partition_ranges.clone(), new_input.clone(), ))) @@ -170,6 +184,7 @@ struct PartSortStream { reservation: MemoryReservation, buffer: Vec, expression: PhysicalSortExpr, + limit: Option, produced: usize, input: DfSendableRecordBatchStream, input_complete: bool, @@ -185,6 +200,7 @@ impl PartSortStream { fn new( context: Arc, sort: &PartSortExec, + limit: Option, input: DfSendableRecordBatchStream, partition_ranges: Vec, partition: usize, @@ -194,6 +210,7 @@ impl PartSortStream { .register(&context.runtime_env().memory_pool), buffer: Vec::new(), expression: sort.expression.clone(), + limit, produced: 0, input, input_complete: false, @@ -294,7 +311,7 @@ impl PartSortStream { ) })?; - let indices = sort_to_indices(&sort_column, opt, None).map_err(|e| { + let indices = sort_to_indices(&sort_column, opt, self.limit).map_err(|e| { DataFusionError::ArrowError( e, Some(format!("Fail to sort to indices at {}", location!())), @@ -674,6 +691,7 @@ mod test { expr: Arc::new(Column::new("ts", 0)), options: opt, }, + None, vec![ranges], Arc::new(mock_input), ); diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs index 38b64e29aaa0..435a255beb95 100644 --- a/src/query/src/window_sort.rs +++ b/src/query/src/window_sort.rs @@ -169,7 +169,16 @@ impl WindowedSortExec { impl DisplayAs for WindowedSortExec { fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "WindowedSortExec") + write!( + f, + "WindowedSortExec: expr={} num_ranges={}", + self.expression, + self.ranges.len() + )?; + if let Some(fetch) = self.fetch { + write!(f, " fetch={}", fetch)?; + } + Ok(()) } } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index cc94a054de84..0eac7c0c354f 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -146,13 +146,15 @@ impl RegionScanExec { let _ = scanner.prepare(partition_ranges, distinguish_partition_range); } - pub fn time_index(&self) -> Option { + pub fn time_index(&self) -> String { self.scanner .lock() .unwrap() - .schema() - .timestamp_column() - .map(|x| x.name.clone()) + .metadata() + .time_index_column() + .column_schema + .name + .clone() } pub fn tag_columns(&self) -> Vec { diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result index 9ecec83d2053..13b3503fb943 100644 --- a/tests/cases/standalone/common/order/windowed_sort.result +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -69,7 +69,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST] REDACTED -|_|_|_WindowedSortExec REDACTED +|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -101,8 +101,8 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@1 DESC] REDACTED -|_|_|_WindowedSortExec REDACTED -|_|_|_PartSortExec t@1 DESC REDACTED +|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=5 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -183,8 +183,8 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST] REDACTED -|_|_|_WindowedSortExec REDACTED -|_|_|_PartSortExec t@2 ASC NULLS LAST REDACTED +|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 limit=5 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -216,8 +216,8 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@2 DESC] REDACTED -|_|_|_WindowedSortExec REDACTED -|_|_|_PartSortExec t@2 DESC REDACTED +|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=2 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@2 DESC num_ranges=2 limit=5 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_|